[tracker] TrackerMinerFS: make it possible to process files parallelly.
- From: Carlos Garnacho <carlosg src gnome org>
- To: svn-commits-list gnome org
- Cc:
- Subject: [tracker] TrackerMinerFS: make it possible to process files parallelly.
- Date: Thu, 8 Oct 2009 12:42:21 +0000 (UTC)
commit 5d2e503a523978055eb5bec2ee7b25187c48b42c
Author: Carlos Garnacho <carlos lanedo com>
Date: Wed Oct 7 17:35:55 2009 +0200
TrackerMinerFS: make it possible to process files parallelly.
Now there is a pool of files being processed, controlled by the
TrackerMinerFS::process-pool-limit (default value of 1), which
specifies the maximum number of files that can be processed at
the same time. Code flow has changed so no new files are processed
until there is room in the pool.
src/libtracker-miner/tracker-miner-fs.c | 149 ++++++++++++++++++++++--------
1 files changed, 109 insertions(+), 40 deletions(-)
---
diff --git a/src/libtracker-miner/tracker-miner-fs.c b/src/libtracker-miner/tracker-miner-fs.c
index 5731425..a5a009e 100644
--- a/src/libtracker-miner/tracker-miner-fs.c
+++ b/src/libtracker-miner/tracker-miner-fs.c
@@ -51,6 +51,11 @@ typedef struct {
gboolean recurse;
} DirectoryData;
+typedef struct { /* One entry of the processing pool: a file whose processing is in flight */
+ GFile *file; /* File being processed; the entry holds its own reference */
+ GCancellable *cancellable; /* Cancelled to abort processing of this file; the entry holds its own reference */
+} ProcessData;
+
struct TrackerMinerFSPrivate {
TrackerMonitor *monitor;
TrackerCrawler *crawler;
@@ -71,11 +76,11 @@ struct TrackerMinerFSPrivate {
guint crawl_directories_id;
guint item_queues_handler_id;
- GFile *current_file;
- GCancellable *cancellable;
-
gdouble throttle;
+ GList *processing_pool;
+ guint pool_limit;
+
/* Status */
guint been_started : 1;
guint been_crawled : 1;
@@ -114,7 +119,8 @@ enum {
enum {
PROP_0,
- PROP_THROTTLE
+ PROP_THROTTLE,
+ PROP_POOL_LIMIT
};
static void fs_finalize (GObject *object);
@@ -217,6 +223,13 @@ tracker_miner_fs_class_init (TrackerMinerFSClass *klass)
"Modifier for the indexing speed, 0 is max speed",
0, 1, 0,
G_PARAM_READWRITE));
+ g_object_class_install_property (object_class,
+ PROP_POOL_LIMIT,
+ g_param_spec_uint ("process-pool-limit",
+ "Processing pool limit",
+ "Number of files that can be concurrently processed",
+ 1, G_MAXUINT, 1,
+ G_PARAM_READWRITE));
/**
* TrackerMinerFS::check-file:
* @miner_fs: the #TrackerMinerFS
@@ -382,6 +395,44 @@ tracker_miner_fs_init (TrackerMinerFS *object)
priv->quark_ignore_file = g_quark_from_static_string ("tracker-ignore-file");
}
+/*
+ * process_data_new:
+ * @file: file that is about to be tracked in the processing pool
+ * @cancellable: cancellable associated with the processing of @file
+ *
+ * Allocates a ProcessData with the slice allocator and takes a new
+ * reference on both @file and @cancellable, so the caller keeps
+ * ownership of its own references.
+ *
+ * Returns: a newly allocated ProcessData, to be released with
+ * process_data_free().
+ */
+static ProcessData *
+process_data_new (GFile        *file,
+                  GCancellable *cancellable)
+{
+	ProcessData *entry = g_slice_new (ProcessData);
+
+	entry->file = g_object_ref (file);
+	entry->cancellable = g_object_ref (cancellable);
+
+	return entry;
+}
+
+/*
+ * process_data_free:
+ * @data: entry to release; must not be NULL
+ *
+ * Drops the references the entry holds on its file and cancellable,
+ * then returns the entry itself to the slice allocator.
+ */
+static void
+process_data_free (ProcessData *data)
+{
+	GFile *held_file = data->file;
+	GCancellable *held_cancellable = data->cancellable;
+
+	g_object_unref (held_file);
+	g_object_unref (held_cancellable);
+
+	g_slice_free (ProcessData, data);
+}
+
+/*
+ * process_data_find:
+ * @fs: miner whose processing pool is searched
+ * @file: file to look up
+ *
+ * Walks the processing pool looking for the entry whose file compares
+ * equal to @file according to g_file_equal().
+ *
+ * Returns: the matching entry (still owned by the pool), or NULL when
+ * no entry matches.
+ */
+static ProcessData *
+process_data_find (TrackerMinerFS *fs,
+                   GFile          *file)
+{
+	GList *node = fs->private->processing_pool;
+
+	while (node != NULL) {
+		ProcessData *candidate = node->data;
+
+		if (g_file_equal (candidate->file, file)) {
+			return candidate;
+		}
+
+		node = node->next;
+	}
+
+	return NULL;
+}
+
static void
fs_finalize (GObject *object)
{
@@ -403,15 +454,14 @@ fs_finalize (GObject *object)
g_object_unref (priv->crawler);
g_object_unref (priv->monitor);
- if (priv->cancellable) {
- g_object_unref (priv->cancellable);
- }
-
if (priv->directories) {
g_list_foreach (priv->directories, (GFunc) directory_data_free, NULL);
g_list_free (priv->directories);
}
+ g_list_foreach (priv->processing_pool, (GFunc) process_data_free, NULL);
+ g_list_free (priv->processing_pool);
+
g_queue_foreach (priv->items_moved, (GFunc) item_moved_data_free, NULL);
g_queue_free (priv->items_moved);
@@ -433,11 +483,16 @@ fs_set_property (GObject *object,
const GValue *value,
GParamSpec *pspec)
{
+ TrackerMinerFS *fs = TRACKER_MINER_FS (object);
+
switch (prop_id) {
case PROP_THROTTLE:
tracker_miner_fs_set_throttle (TRACKER_MINER_FS (object),
g_value_get_double (value));
break;
+ case PROP_POOL_LIMIT:
+ fs->private->pool_limit = g_value_get_uint (value);
+ break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
@@ -458,6 +513,9 @@ fs_get_property (GObject *object,
case PROP_THROTTLE:
g_value_set_double (value, fs->private->throttle);
break;
+ case PROP_POOL_LIMIT:
+ g_value_set_uint (value, fs->private->pool_limit);
+ break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
@@ -655,6 +713,7 @@ item_add_or_update_cb (TrackerMinerFS *fs,
const GError *error,
gpointer user_data)
{
+ ProcessData *data;
gchar *uri;
uri = g_file_get_uri (file);
@@ -680,14 +739,12 @@ item_add_or_update_cb (TrackerMinerFS *fs,
}
}
- if (fs->private->cancellable) {
- g_object_unref (fs->private->cancellable);
- fs->private->cancellable = NULL;
- }
+ data = process_data_find (fs, file);
- if (fs->private->current_file) {
- g_object_unref (fs->private->current_file);
- fs->private->current_file = NULL;
+ if (data) {
+		/* Unlink the entry from the pool before releasing it, so the
+		 * list code is never handed a pointer to already-freed memory.
+		 */
+		fs->private->processing_pool =
+			g_list_remove (fs->private->processing_pool, data);
+		process_data_free (data);
}
/* Can be NULL on error */
@@ -697,41 +754,51 @@ item_add_or_update_cb (TrackerMinerFS *fs,
g_free (uri);
- /* Processing is now done, continue with other files */
- item_queue_handlers_set_up (fs);
+ if (g_list_length (fs->private->processing_pool) < fs->private->pool_limit) {
+ /* There is room in the pool for more files */
+ item_queue_handlers_set_up (fs);
+ }
}
static gboolean
item_add_or_update (TrackerMinerFS *fs,
GFile *file)
{
+ TrackerMinerFSPrivate *priv;
TrackerSparqlBuilder *sparql;
+ GCancellable *cancellable;
gboolean processing;
- if (fs->private->cancellable) {
- g_debug ("Cancellable for older operation still around, destroying");
- g_object_unref (fs->private->cancellable);
- }
-
- fs->private->cancellable = g_cancellable_new ();
+ priv = fs->private;
+ cancellable = g_cancellable_new ();
sparql = tracker_sparql_builder_new_update ();
processing = TRACKER_MINER_FS_GET_CLASS (fs)->process_file (fs, file, sparql,
- fs->private->cancellable,
+ cancellable,
item_add_or_update_cb,
NULL);
if (!processing) {
g_object_unref (sparql);
- g_object_unref (fs->private->cancellable);
- fs->private->cancellable = NULL;
+ g_object_unref (cancellable);
return TRUE;
} else {
- fs->private->current_file = g_object_ref (file);
- }
+ ProcessData *data;
+ guint length;
- return FALSE;
+ data = process_data_new (file, cancellable);
+ priv->processing_pool = g_list_prepend (priv->processing_pool, data);
+ length = g_list_length (priv->processing_pool);
+
+ g_object_unref (cancellable);
+
+ if (length >= priv->pool_limit) {
+ return FALSE;
+ } else {
+ return TRUE;
+ }
+ }
}
static gboolean
@@ -1015,7 +1082,8 @@ item_queue_handlers_cb (gpointer user_data)
switch (queue) {
case QUEUE_NONE:
/* Print stats and signal finished */
- if (!fs->private->is_crawling) {
+ if (!fs->private->is_crawling &&
+ !fs->private->processing_pool) {
process_stop (fs);
}
@@ -1492,7 +1560,7 @@ crawler_finished_cb (TrackerCrawler *crawler,
fs->private->total_files_ignored += files_ignored;
g_message ("%s crawling files after %2.2f seconds",
- was_interrupted ? "Stoped" : "Finished",
+ was_interrupted ? "Stopped" : "Finished",
g_timer_elapsed (fs->private->timer, NULL));
g_message (" Found %d directories, ignored %d directories",
directories_found,
@@ -1671,7 +1739,7 @@ tracker_miner_fs_remove_directory (TrackerMinerFS *fs,
{
TrackerMinerFSPrivate *priv;
gboolean return_val = FALSE;
- GList *dirs;
+ GList *dirs, *pool;
g_return_val_if_fail (TRACKER_IS_MINER_FS (fs), FALSE);
g_return_val_if_fail (G_IS_FILE (file), FALSE);
@@ -1713,14 +1781,15 @@ tracker_miner_fs_remove_directory (TrackerMinerFS *fs,
check_files_removal (priv->items_updated, file);
check_files_removal (priv->items_created, file);
- if (priv->current_file &&
- priv->cancellable &&
- (g_file_equal (priv->current_file, file) ||
- g_file_has_prefix (priv->current_file, file))) {
- /* Cancel processing if currently processed file is
- * inside the removed directory.
- */
- g_cancellable_cancel (priv->cancellable);
+	pool = fs->private->processing_pool;
+
+	/* Cancel processing of every pooled file that is the removed
+	 * directory itself or is located inside it.
+	 */
+	while (pool) {
+		ProcessData *data = pool->data;
+		/* Fetch the successor up front: the cursor must advance on
+		 * every iteration (otherwise the loop never terminates), and
+		 * doing it before cancelling keeps the walk valid should the
+		 * cancellation end up unlinking this entry.
+		 * NOTE(review): whether cancellation can unlink entries
+		 * synchronously depends on the process_file implementation.
+		 */
+		GList *next = pool->next;
+
+		if (g_file_equal (data->file, file) ||
+		    g_file_has_prefix (data->file, file)) {
+			g_cancellable_cancel (data->cancellable);
+		}
+
+		pool = next;
}
return return_val;
[Date Prev][Date Next] [Thread Prev][Thread Next] [Thread Index] [Date Index] [Author Index]