[tracker/miner-fs-limit-requests: 1/7] libtracker-miner: New limit for the number of requests sent to the store



commit 453e5395ccc1d9d116291485080a6e8736be6c28
Author: Aleksander Morgado <aleksander lanedo com>
Date:   Tue Mar 15 12:28:07 2011 +0100

    libtracker-miner: New limit for the number of requests sent to the store

 .../tracker-miner-fs-processing-pool.c             |  246 +++++++++++++++++---
 .../tracker-miner-fs-processing-pool.h             |    7 +-
 src/libtracker-miner/tracker-miner-fs.c            |   21 ++-
 3 files changed, 234 insertions(+), 40 deletions(-)
---
diff --git a/src/libtracker-miner/tracker-miner-fs-processing-pool.c b/src/libtracker-miner/tracker-miner-fs-processing-pool.c
index 9eabaa4..bf22c43 100644
--- a/src/libtracker-miner/tracker-miner-fs-processing-pool.c
+++ b/src/libtracker-miner/tracker-miner-fs-processing-pool.c
@@ -186,6 +186,14 @@ struct _TrackerProcessingPool {
 	/* The processing pool limits */
 	guint limit[TRACKER_PROCESSING_TASK_STATUS_LAST];
 
+	/* The current number of requests sent to the store */
+	guint n_requests;
+	/* The limit for number of requests sent to the store */
+	guint limit_n_requests;
+	/* The list of UpdateArrayData items pending to be flushed, blocked
+	 * because the maximum number of requests was reached */
+	GQueue *pending_requests;
+
 	/* SPARQL buffer to pile up several UPDATEs */
 	GPtrArray *sparql_buffer;
 	GFile *sparql_buffer_current_parent;
@@ -199,7 +207,9 @@ struct _TrackerProcessingPool {
 };
 
 typedef struct {
+	TrackerProcessingPool *pool;
 	GPtrArray *tasks;
+	GArray *sparql_array;
 	GArray *error_map;
 	guint n_bulk_operations;
 } UpdateArrayData;
@@ -360,6 +370,10 @@ pool_status_trace_timeout_cb (gpointer data)
 			l = g_list_next (l);
 		}
 	}
+	trace ("(Processing Pool %s) Requests being currently processed: %u (max: %u)",
+	       G_OBJECT_TYPE_NAME (pool->miner),
+	       pool->n_requests,
+	       pool->limit_n_requests);
 	return TRUE;
 }
 #endif /* PROCESSING_POOL_ENABLE_TRACE */
@@ -377,6 +391,23 @@ pool_queue_free_foreach (gpointer data,
 	}
 }
 
+static void
+update_array_data_free (UpdateArrayData *update_data)
+{
+	if (!update_data)
+		return;
+
+	if (update_data->sparql_array) {
+		/* The array contains pointers to strings in the tasks, so no need to
+		 * deallocate its pointed contents, just the array itself. */
+		g_array_free (update_data->sparql_array, TRUE);
+	}
+
+	g_ptr_array_free (update_data->tasks, TRUE);
+	g_array_free (update_data->error_map, TRUE);
+	g_slice_free (UpdateArrayData, update_data);
+}
+
 void
 tracker_processing_pool_free (TrackerProcessingPool *pool)
 {
@@ -390,6 +421,11 @@ tracker_processing_pool_free (TrackerProcessingPool *pool)
 		g_source_remove (pool->timeout_id);
 #endif /* PROCESSING_POOL_ENABLE_TRACE */
 
+	g_queue_foreach (pool->pending_requests,
+	                 (GFunc)update_array_data_free,
+	                 NULL);
+	g_queue_free (pool->pending_requests);
+
 	/* Free any pending task here... shouldn't really
 	 * be any */
 	for (i = TRACKER_PROCESSING_TASK_STATUS_WAIT;
@@ -410,12 +446,13 @@ tracker_processing_pool_free (TrackerProcessingPool *pool)
 	}
 
 	g_free (pool);
-}
+	                 }
 
 TrackerProcessingPool *
 tracker_processing_pool_new (TrackerMinerFS *miner,
                              guint           limit_wait,
-                             guint           limit_ready)
+                             guint           limit_ready,
+                             guint           limit_n_requests)
 {
 	TrackerProcessingPool *pool;
 
@@ -426,16 +463,21 @@ tracker_processing_pool_new (TrackerMinerFS *miner,
 	pool->limit[TRACKER_PROCESSING_TASK_STATUS_READY] = limit_ready;
 	/* convenience limit, not really used currently */
 	pool->limit[TRACKER_PROCESSING_TASK_STATUS_PROCESSING] = G_MAXUINT;
+	pool->limit_n_requests = limit_n_requests;
 
 	pool->tasks[TRACKER_PROCESSING_TASK_STATUS_WAIT] = g_queue_new ();
 	pool->tasks[TRACKER_PROCESSING_TASK_STATUS_READY] = g_queue_new ();
 	pool->tasks[TRACKER_PROCESSING_TASK_STATUS_PROCESSING] = g_queue_new ();
 
+	pool->pending_requests = g_queue_new ();
+
 	g_debug ("Processing pool created with a limit of "
-	         "%u tasks in WAIT status and "
-	         "%u tasks in READY status",
+	         "%u tasks in WAIT status, "
+	         "%u tasks in READY status and "
+	         "%u requests",
 	         limit_wait,
-	         limit_ready);
+	         limit_ready,
+	         limit_n_requests);
 
 #ifdef PROCESSING_POOL_ENABLE_TRACE
 	pool->timeout_id = g_timeout_add_seconds (POOL_STATUS_TRACE_TIMEOUT_SECS,
@@ -464,6 +506,15 @@ tracker_processing_pool_set_ready_limit (TrackerProcessingPool *pool,
 	pool->limit[TRACKER_PROCESSING_TASK_STATUS_READY] = limit;
 }
 
+void
+tracker_processing_pool_set_n_requests_limit (TrackerProcessingPool *pool,
+                                              guint                  limit)
+{
+	g_message ("Processing pool limit for number of requests set to %u",
+	           limit);
+	pool->limit_n_requests = limit;
+}
+
 guint
 tracker_processing_pool_get_wait_limit (TrackerProcessingPool *pool)
 {
@@ -476,6 +527,12 @@ tracker_processing_pool_get_ready_limit (TrackerProcessingPool *pool)
 	return pool->limit[TRACKER_PROCESSING_TASK_STATUS_READY];
 }
 
+guint
+tracker_processing_pool_get_n_requests_limit (TrackerProcessingPool *pool)
+{
+	return pool->limit_n_requests;
+}
+
 gboolean
 tracker_processing_pool_wait_limit_reached (TrackerProcessingPool *pool)
 {
@@ -492,6 +549,12 @@ tracker_processing_pool_ready_limit_reached (TrackerProcessingPool *pool)
 	        TRUE : FALSE);
 }
 
+static gboolean
+tracker_processing_pool_n_requests_limit_reached (TrackerProcessingPool *pool)
+{
+	return (pool->n_requests >= pool->limit_n_requests ? TRUE : FALSE);
+}
+
 TrackerProcessingTask *
 tracker_processing_pool_find_task (TrackerProcessingPool *pool,
                                    GFile                 *file,
@@ -558,16 +621,28 @@ tracker_processing_pool_sparql_update_cb (GObject      *object,
                                           GAsyncResult *result,
                                           gpointer      user_data)
 {
+	TrackerProcessingPool *pool;
 	TrackerProcessingTask *task;
 	GError *error = NULL;
-
-	tracker_sparql_connection_update_finish (TRACKER_SPARQL_CONNECTION (object), result, &error);
+	gboolean flush_next;
 
 	task = user_data;
+	pool = task->pool;
+
+	/* If we had reached the limit of requests, flush next as this request is
+	 * just finished */
+	flush_next = tracker_processing_pool_n_requests_limit_reached (pool);
+
+	/* Request finished */
+	pool->n_requests--;
 
-	trace ("(Processing Pool) Finished update of task %p for file '%s'",
+	trace ("(Processing Pool) Finished update of task %p for file '%s' "
+	       "(%u requests pending)",
 	       task,
-	       task->file_uri);
+	       task->file_uri,
+	       pool->n_requests);
+
+	tracker_sparql_connection_update_finish (TRACKER_SPARQL_CONNECTION (object), result, &error);
 
 	/* Before calling user-provided callback, REMOVE the task from the pool;
 	 * as the user-provided callback may actually modify the pool again */
@@ -579,6 +654,13 @@ tracker_processing_pool_sparql_update_cb (GObject      *object,
 	/* Deallocate unneeded stuff */
 	tracker_processing_task_free (task);
 	g_clear_error (&error);
+
+	/* Flush if needed */
+	if (flush_next) {
+		tracker_processing_pool_buffer_flush (pool,
+		                                      "Pool request limit was reached and "
+		                                      "request just finished");
+	}
 }
 
 static void
@@ -586,16 +668,28 @@ tracker_processing_pool_sparql_update_array_cb (GObject      *object,
                                                 GAsyncResult *result,
                                                 gpointer      user_data)
 {
+	TrackerProcessingPool *pool;
 	GError *global_error = NULL;
 	GPtrArray *sparql_array_errors;
 	UpdateArrayData *update_data;
+	gboolean flush_next;
 	guint i;
 
 	/* Get arrays of errors and queries */
 	update_data = user_data;
+	pool = update_data->pool;
+
+	/* If we had reached the limit of requests, flush next as this request is
+	 * just finished */
+	flush_next = tracker_processing_pool_n_requests_limit_reached (pool);
+
+	/* Request finished */
+	pool->n_requests--;
 
-	trace ("(Processing Pool) Finished array-update of tasks %p",
-	       update_data->tasks);
+	trace ("(Processing Pool) Finished array-update of tasks %p"
+	       "(%u requests pending)",
+	       update_data->tasks,
+	       pool->n_requests);
 
 	sparql_array_errors = tracker_sparql_connection_update_array_finish (TRACKER_SPARQL_CONNECTION (object),
 	                                                                     result,
@@ -646,13 +740,18 @@ tracker_processing_pool_sparql_update_array_cb (GObject      *object,
 		g_ptr_array_unref (sparql_array_errors);
 
 	/* Note that tasks are actually deallocated here */
-	g_ptr_array_free (update_data->tasks, TRUE);
-	g_array_free (update_data->error_map, TRUE);
-	g_slice_free (UpdateArrayData, update_data);
+	update_array_data_free (update_data);
 
 	if (global_error) {
 		g_error_free (global_error);
 	}
+
+	/* Flush if needed */
+	if (flush_next) {
+		tracker_processing_pool_buffer_flush (pool,
+		                                      "Pool request limit was reached and "
+		                                      "UpdateArrayrequest just finished");
+	}
 }
 
 static void
@@ -755,6 +854,76 @@ bulk_operation_merge_free (BulkOperationMerge *operation)
 	g_slice_free (BulkOperationMerge, operation);
 }
 
+static void
+processing_pool_update_array_flush (TrackerProcessingPool *pool,
+                                    UpdateArrayData       *update_data,
+                                    const gchar           *reason)
+{
+	/* This method will flush the UpdateArrayData passed as
+	 * argument if:
+	 *  - The threshold of requests not reached.
+	 *  - There is no other pending request to flush.
+	 *
+	 * Otherwise, the passed UpdateArrayData will be queued (if any) and the
+	 * first one in the pending queue will get flushed.
+	 */
+	UpdateArrayData *to_flush;
+
+	/* If we cannot flush anything or existing pending requests to flush,
+	 * just queue the UpdateArrayData if any */
+	if (tracker_processing_pool_n_requests_limit_reached (pool)) {
+		/* If we hit the threshold, there's nothing to flush */
+		to_flush = NULL;
+
+		if (update_data) {
+			trace ("(Processing Pool %s) Queueing array-update of tasks %p with %u items "
+			       "(%s, threshold reached)",
+			       G_OBJECT_TYPE_NAME (pool->miner),
+			       update_data->tasks,
+			       update_data->tasks->len,
+			       reason ? reason : "Unknown reason");
+			g_queue_push_tail (pool->pending_requests, update_data);
+		}
+	} else if (pool->pending_requests->length > 0) {
+		/* There are other pending tasks to be flushed, we need to queue this one if any. */
+		to_flush = g_queue_pop_head (pool->pending_requests);
+
+		if (update_data) {
+			trace ("(Processing Pool %s) Queueing array-update of tasks %p with %u items "
+			       "(%s, pending requests first)",
+			       G_OBJECT_TYPE_NAME (pool->miner),
+			       update_data->tasks,
+			       update_data->tasks->len,
+			       reason ? reason : "Unknown reason");
+			g_queue_push_tail (pool->pending_requests, update_data);
+		}
+	} else {
+		/* No pending requests, flush the received UpdateArrayData, if any */
+		to_flush = update_data;
+	}
+
+	/* If nothing to flush, return */
+	if (!to_flush)
+		return;
+
+	trace ("(Processing Pool %s) Flushing array-update of tasks %p with %u items (%s)",
+	       G_OBJECT_TYPE_NAME (pool->miner),
+	       to_flush->tasks,
+	       to_flush->tasks->len,
+	       reason ? reason : "Unknown reason");
+
+	/* New Request */
+	pool->n_requests++;
+
+	tracker_sparql_connection_update_array_async (tracker_miner_get_connection (TRACKER_MINER (pool->miner)),
+	                                              (gchar **) to_flush->sparql_array->data,
+	                                              to_flush->sparql_array->len,
+	                                              G_PRIORITY_DEFAULT,
+	                                              NULL,
+	                                              tracker_processing_pool_sparql_update_array_cb,
+	                                              to_flush);
+}
+
 void
 tracker_processing_pool_buffer_flush (TrackerProcessingPool *pool,
                                       const gchar           *reason)
@@ -764,11 +933,17 @@ tracker_processing_pool_buffer_flush (TrackerProcessingPool *pool,
 	UpdateArrayData *update_data;
 	guint i, j;
 
-	if (!pool->sparql_buffer)
+	/* If no sparql buffer, flush any pending request, if any;
+	 * or just return otherwise */
+	if (!pool->sparql_buffer) {
+		processing_pool_update_array_flush (pool, NULL, reason);
 		return;
+	}
 
 	/* Loop buffer and construct array of strings */
 	sparql_array = g_array_new (FALSE, TRUE, sizeof (gchar *));
+
+	/* TODO: Avoid preallocating this, as we may not have any bulk operation */
 	bulk_ops = g_ptr_array_new_with_free_func ((GDestroyNotify) bulk_operation_merge_free);
 	error_map = g_array_new (TRUE, TRUE, sizeof (gint));
 
@@ -834,24 +1009,21 @@ tracker_processing_pool_buffer_flush (TrackerProcessingPool *pool,
 		}
 	}
 
-	trace ("(Processing Pool %s) Flushing array-update of tasks %p with %u items (%s)",
-	       G_OBJECT_TYPE_NAME (pool->miner),
-	       pool->sparql_buffer,
-	       pool->sparql_buffer->len,
-	       reason ? reason : "Unknown reason");
-
+	/* Create new UpdateArrayData with the contents, which take ownership
+	 * of the SPARQL buffer. */
 	update_data = g_slice_new0 (UpdateArrayData);
+	update_data->pool = pool;
 	update_data->tasks = pool->sparql_buffer;
 	update_data->n_bulk_operations = bulk_ops->len;
 	update_data->error_map = error_map;
+	update_data->sparql_array = sparql_array;
 
-	tracker_sparql_connection_update_array_async (tracker_miner_get_connection (TRACKER_MINER (pool->miner)),
-	                                              (gchar **) sparql_array->data,
-	                                              sparql_array->len,
-	                                              G_PRIORITY_DEFAULT,
-	                                              NULL,
-	                                              tracker_processing_pool_sparql_update_array_cb,
-	                                              update_data);
+	/* Reset buffer in the pool */
+	pool->sparql_buffer = NULL;
+	pool->sparql_buffer_start_time = 0;
+
+	/* Flush or queue... */
+	processing_pool_update_array_flush (pool, update_data, reason);
 
 	/* Clear current parent */
 	if (pool->sparql_buffer_current_parent) {
@@ -859,15 +1031,7 @@ tracker_processing_pool_buffer_flush (TrackerProcessingPool *pool,
 		pool->sparql_buffer_current_parent = NULL;
 	}
 
-	/* Clear temp buffer */
-	g_array_free (sparql_array, TRUE);
-
 	g_ptr_array_free (bulk_ops, TRUE);
-
-	pool->sparql_buffer_start_time = 0;
-	/* Note the whole buffer is passed to the update_array callback,
-	 * so no need to free it. */
-	pool->sparql_buffer = NULL;
 }
 
 gboolean
@@ -898,8 +1062,11 @@ tracker_processing_pool_push_ready_task (TrackerProcessingPool
 	task->finished_user_data = user_data;
 
 	/* If buffering not requested, OR the limit of READY tasks is actually 1,
-	 * flush previous buffer (if any) and then the new update */
-	if (!buffer || pool->limit[TRACKER_PROCESSING_TASK_STATUS_READY] == 1) {
+	 * flush previous buffer (if any) and then the new update (only if n_requests limit
+	 * not reached, otherwise buffer it) */
+	if (!tracker_processing_pool_n_requests_limit_reached (pool) &&
+	    (!buffer ||
+	     pool->limit[TRACKER_PROCESSING_TASK_STATUS_READY] == 1)) {
 		BulkOperationMerge *operation = NULL;
 		const gchar *sparql = NULL;
 
@@ -936,6 +1103,9 @@ tracker_processing_pool_push_ready_task (TrackerProcessingPool
 		}
 
 		if (sparql) {
+			/* New Request */
+			pool->n_requests++;
+
 			tracker_sparql_connection_update_async (tracker_miner_get_connection (TRACKER_MINER (pool->miner)),
 			                                        sparql,
 			                                        G_PRIORITY_DEFAULT,
diff --git a/src/libtracker-miner/tracker-miner-fs-processing-pool.h b/src/libtracker-miner/tracker-miner-fs-processing-pool.h
index 1d73794..f695718 100644
--- a/src/libtracker-miner/tracker-miner-fs-processing-pool.h
+++ b/src/libtracker-miner/tracker-miner-fs-processing-pool.h
@@ -61,14 +61,18 @@ void                   tracker_processing_task_set_bulk_operation (TrackerProces
 
 TrackerProcessingPool *tracker_processing_pool_new                   (TrackerMinerFS          *miner,
                                                                       guint                    limit_wait,
-                                                                      guint                    limit_process);
+                                                                      guint                    limit_process,
+                                                                      guint                    limit_n_requests);
 void                   tracker_processing_pool_free                  (TrackerProcessingPool   *pool);
 void                   tracker_processing_pool_set_wait_limit        (TrackerProcessingPool   *pool,
                                                                       guint                    limit);
 void                   tracker_processing_pool_set_ready_limit       (TrackerProcessingPool   *pool,
                                                                       guint                    limit);
+void                   tracker_processing_pool_set_n_requests_limit  (TrackerProcessingPool   *pool,
+                                                                      guint                    limit);
 guint                  tracker_processing_pool_get_wait_limit        (TrackerProcessingPool   *pool);
 guint                  tracker_processing_pool_get_ready_limit       (TrackerProcessingPool   *pool);
+guint                  tracker_processing_pool_get_n_requests_limit  (TrackerProcessingPool   *pool);
 TrackerProcessingTask *tracker_processing_pool_find_task             (TrackerProcessingPool   *pool,
                                                                       GFile                   *file,
                                                                       gboolean                 path_search);
@@ -82,6 +86,7 @@ gboolean               tracker_processing_pool_push_ready_task       (TrackerPro
                                                                       gboolean                 buffer,
                                                                       TrackerProcessingPoolTaskFinishedCallback finished_handler,
                                                                       gpointer                 user_data);
+guint                  tracker_processing_pool_get_n_requests        (TrackerProcessingPool   *pool);
 guint                  tracker_processing_pool_get_wait_task_count   (TrackerProcessingPool   *pool);
 guint                  tracker_processing_pool_get_total_task_count  (TrackerProcessingPool   *pool);
 TrackerProcessingTask *tracker_processing_pool_get_last_wait         (TrackerProcessingPool   *pool);
diff --git a/src/libtracker-miner/tracker-miner-fs.c b/src/libtracker-miner/tracker-miner-fs.c
index 33453df..5e929e6 100644
--- a/src/libtracker-miner/tracker-miner-fs.c
+++ b/src/libtracker-miner/tracker-miner-fs.c
@@ -97,6 +97,7 @@ static gboolean miner_fs_queues_status_trace_timeout_cb (gpointer data);
 /* Default processing pool limits to be set */
 #define DEFAULT_WAIT_POOL_LIMIT 1
 #define DEFAULT_READY_POOL_LIMIT 1
+#define DEFAULT_N_REQUESTS_POOL_LIMIT 10
 
 /**
  * SECTION:tracker-miner-fs
@@ -262,6 +263,7 @@ enum {
 	PROP_THROTTLE,
 	PROP_WAIT_POOL_LIMIT,
 	PROP_READY_POOL_LIMIT,
+	PROP_N_REQUESTS_POOL_LIMIT,
 	PROP_MTIME_CHECKING,
 	PROP_INITIAL_CRAWLING
 };
@@ -402,6 +404,14 @@ tracker_miner_fs_class_init (TrackerMinerFSClass *klass)
 	                                                    1, G_MAXUINT, DEFAULT_READY_POOL_LIMIT,
 	                                                    G_PARAM_READWRITE | G_PARAM_CONSTRUCT));
 	g_object_class_install_property (object_class,
+	                                 PROP_N_REQUESTS_POOL_LIMIT,
+	                                 g_param_spec_uint ("processing-pool-requests-limit",
+	                                                    "Processing pool limit for number of requests",
+	                                                    "Maximum number of SPARQL requests that can be sent "
+	                                                    "to the store in parallel.",
+	                                                    1, G_MAXUINT, DEFAULT_N_REQUESTS_POOL_LIMIT,
+	                                                    G_PARAM_READWRITE | G_PARAM_CONSTRUCT));
+	g_object_class_install_property (object_class,
 	                                 PROP_MTIME_CHECKING,
 	                                 g_param_spec_boolean ("mtime-checking",
 	                                                       "Mtime checking",
@@ -666,7 +676,8 @@ tracker_miner_fs_init (TrackerMinerFS *object)
 	/* Create processing pool */
 	priv->processing_pool = tracker_processing_pool_new (object,
 	                                                     DEFAULT_WAIT_POOL_LIMIT,
-	                                                     DEFAULT_READY_POOL_LIMIT);
+	                                                     DEFAULT_READY_POOL_LIMIT,
+	                                                     DEFAULT_N_REQUESTS_POOL_LIMIT);
 
 	/* Set up the crawlers now we have config and hal */
 	priv->crawler = tracker_crawler_new ();
@@ -825,6 +836,10 @@ fs_set_property (GObject      *object,
 		tracker_processing_pool_set_ready_limit (fs->private->processing_pool,
 		                                         g_value_get_uint (value));
 		break;
+	case PROP_N_REQUESTS_POOL_LIMIT:
+		tracker_processing_pool_set_n_requests_limit (fs->private->processing_pool,
+		                                              g_value_get_uint (value));
+		break;
 	case PROP_MTIME_CHECKING:
 		fs->private->mtime_checking = g_value_get_boolean (value);
 		break;
@@ -859,6 +874,10 @@ fs_get_property (GObject    *object,
 		g_value_set_uint (value,
 		                  tracker_processing_pool_get_ready_limit (fs->private->processing_pool));
 		break;
+	case PROP_N_REQUESTS_POOL_LIMIT:
+		g_value_set_uint (value,
+		                  tracker_processing_pool_get_n_requests_limit (fs->private->processing_pool));
+		break;
 	case PROP_MTIME_CHECKING:
 		g_value_set_boolean (value, fs->private->mtime_checking);
 		break;



[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]