[tracker/wip/miner-priority-queues: 12/19] tracker-miner-fs: Use TrackerTaskPool for the extraction pool
- From: Carlos Garnacho <carlosg src gnome org>
- To: commits-list gnome org
- Cc:
- Subject: [tracker/wip/miner-priority-queues: 12/19] tracker-miner-fs: Use TrackerTaskPool for the extraction pool
- Date: Wed, 13 Jul 2011 16:24:15 +0000 (UTC)
commit 77038048f9c22f34648597420db57de8fee2f41f
Author: Carlos Garnacho <carlos lanedo com>
Date: Mon Jul 4 12:16:31 2011 +0200
tracker-miner-fs: Use TrackerTaskPool for the extraction pool
src/libtracker-miner/tracker-miner-fs.c | 124 ++++++++++++++++---------------
1 files changed, 63 insertions(+), 61 deletions(-)
---
diff --git a/src/libtracker-miner/tracker-miner-fs.c b/src/libtracker-miner/tracker-miner-fs.c
index 66754fd..dff0f5e 100644
--- a/src/libtracker-miner/tracker-miner-fs.c
+++ b/src/libtracker-miner/tracker-miner-fs.c
@@ -33,6 +33,7 @@
#include "tracker-thumbnailer.h"
#include "tracker-miner-fs-processing-pool.h"
#include "tracker-priority-queue.h"
+#include "tracker-task-pool.h"
/* If defined will print the tree from GNode while running */
#ifdef CRAWLED_TREE_ENABLE_TRACE
@@ -132,6 +133,7 @@ typedef struct {
} DirectoryData;
typedef struct {
+ GFile *file;
gchar *urn;
gchar *parent_urn;
GCancellable *cancellable;
@@ -199,6 +201,10 @@ struct _TrackerMinerFSPrivate {
gdouble throttle;
+ /* Extraction tasks */
+ TrackerTaskPool *task_pool;
+
+ /* Sparql insertion tasks */
TrackerProcessingPool *processing_pool;
/* URI mtime cache */
@@ -363,7 +369,7 @@ static void tracker_miner_fs_directory_add_internal (TrackerMinerFS *f
static gboolean miner_fs_has_children_without_parent (TrackerMinerFS *fs,
GFile *file);
-static void processing_pool_cancel_foreach (gpointer data,
+static void task_pool_cancel_foreach (gpointer data,
gpointer user_data);
@@ -688,7 +694,8 @@ tracker_miner_fs_init (TrackerMinerFS *object)
(GDestroyNotify) g_free,
(GDestroyNotify) NULL);
- /* Create processing pool */
+ /* Create processing pools */
+ priv->task_pool = tracker_task_pool_new (DEFAULT_WAIT_POOL_LIMIT);
priv->processing_pool = tracker_processing_pool_new (object,
DEFAULT_WAIT_POOL_LIMIT,
DEFAULT_READY_POOL_LIMIT,
@@ -794,9 +801,11 @@ fs_finalize (GObject *object)
tracker_priority_queue_unref (priv->crawled_directories);
/* Cancel every pending task */
- tracker_processing_pool_foreach (priv->processing_pool,
- processing_pool_cancel_foreach,
- NULL);
+ tracker_task_pool_foreach (priv->task_pool,
+ task_pool_cancel_foreach,
+ NULL);
+ g_object_unref (priv->task_pool);
+
tracker_processing_pool_free (priv->processing_pool);
tracker_priority_queue_foreach (priv->items_moved,
@@ -858,8 +867,8 @@ fs_set_property (GObject *object,
g_value_get_double (value));
break;
case PROP_WAIT_POOL_LIMIT:
- tracker_processing_pool_set_wait_limit (fs->priv->processing_pool,
- g_value_get_uint (value));
+ tracker_task_pool_set_limit (fs->priv->task_pool,
+ g_value_get_uint (value));
break;
case PROP_READY_POOL_LIMIT:
tracker_processing_pool_set_ready_limit (fs->priv->processing_pool,
@@ -897,7 +906,7 @@ fs_get_property (GObject *object,
break;
case PROP_WAIT_POOL_LIMIT:
g_value_set_uint (value,
- tracker_processing_pool_get_wait_limit (fs->priv->processing_pool));
+ tracker_task_pool_get_limit (fs->priv->task_pool));
break;
case PROP_READY_POOL_LIMIT:
g_value_set_uint (value,
@@ -1624,8 +1633,8 @@ update_processing_task_context_free (UpdateProcessingTaskContext *ctxt)
}
static gboolean
-do_process_file (TrackerMinerFS *fs,
- TrackerProcessingTask *task)
+do_process_file (TrackerMinerFS *fs,
+ TrackerTask *task)
{
TrackerMinerFSPrivate *priv;
gboolean processing;
@@ -1634,8 +1643,8 @@ do_process_file (TrackerMinerFS *fs,
GFile *task_file;
UpdateProcessingTaskContext *ctxt;
- ctxt = tracker_processing_task_get_context (task);
- task_file = tracker_processing_task_get_file (task);
+ ctxt = tracker_task_get_data (task);
+ task_file = tracker_task_get_file (task);
uri = g_file_get_uri (task_file);
priv = fs->priv;
@@ -1662,7 +1671,7 @@ do_process_file (TrackerMinerFS *fs,
/* Re-fetch data, since it might have been
* removed in broken implementations
*/
- task = tracker_processing_pool_find_task (priv->processing_pool, task_file, FALSE);
+ task = tracker_task_pool_find (priv->task_pool, task_file);
g_message ("%s refused to process '%s'", G_OBJECT_TYPE_NAME (fs), uri);
@@ -1672,8 +1681,8 @@ do_process_file (TrackerMinerFS *fs,
"tracker_miner_fs_file_notify(), this is an "
"implementation error", G_OBJECT_TYPE_NAME (fs), uri);
} else {
- tracker_processing_pool_remove_task (priv->processing_pool, task);
- tracker_processing_task_unref (task);
+ tracker_task_pool_remove (priv->task_pool, task);
+ tracker_task_unref (task);
}
}
@@ -1683,26 +1692,26 @@ do_process_file (TrackerMinerFS *fs,
}
static void
-item_add_or_update_cb (TrackerMinerFS *fs,
- TrackerProcessingTask *task,
- const GError *error)
+item_add_or_update_cb (TrackerMinerFS *fs,
+ TrackerTask *extraction_task,
+ const GError *error)
{
UpdateProcessingTaskContext *ctxt;
+ TrackerProcessingTask *task;
GFile *task_file;
gchar *uri;
- ctxt = tracker_processing_task_get_context (task);
- task_file = tracker_processing_task_get_file (task);
+ ctxt = tracker_task_get_data (extraction_task);
+ task_file = tracker_task_get_file (extraction_task);
uri = g_file_get_uri (task_file);
+ tracker_task_pool_remove (fs->priv->task_pool, extraction_task);
+
if (error) {
g_message ("Could not process '%s': %s", uri, error->message);
fs->priv->total_files_notified_error++;
- tracker_processing_pool_remove_task (fs->priv->processing_pool, task);
- tracker_processing_task_unref (task);
-
item_queue_handlers_set_up (fs);
} else {
if (ctxt->urn) {
@@ -1770,6 +1779,8 @@ item_add_or_update_cb (TrackerMinerFS *fs,
}
}
+ tracker_task_unref (extraction_task);
+
g_free (uri);
}
@@ -1781,7 +1792,7 @@ item_add_or_update (TrackerMinerFS *fs,
TrackerSparqlBuilder *sparql;
GCancellable *cancellable;
gboolean retval;
- TrackerProcessingTask *task;
+ TrackerTask *task;
GFile *parent;
const gchar *urn;
const gchar *parent_urn = NULL;
@@ -1834,21 +1845,19 @@ item_add_or_update (TrackerMinerFS *fs,
/* Create task and add it to the pool as a WAIT task (we need to extract
* the file metadata and such) */
- task = tracker_processing_task_new (file);
ctxt = update_processing_task_context_new (TRACKER_MINER (fs),
urn,
parent_urn,
cancellable,
sparql);
- tracker_processing_task_set_context (task,
- ctxt,
- (GFreeFunc) update_processing_task_context_free);
- tracker_processing_pool_push_wait_task (priv->processing_pool, task);
+ task = tracker_task_new (file, ctxt,
+ (GDestroyNotify) update_processing_task_context_free);
+ tracker_task_pool_add (priv->task_pool, task);
if (do_process_file (fs, task)) {
fs->priv->total_files_processed++;
- if (tracker_processing_pool_wait_limit_reached (priv->processing_pool)) {
+ if (tracker_task_pool_limit_reached (priv->task_pool)) {
retval = FALSE;
}
}
@@ -2398,9 +2407,7 @@ should_wait (TrackerMinerFS *fs,
GFile *parent;
/* Is the item already being processed? */
- if (tracker_processing_pool_find_task (fs->priv->processing_pool,
- file,
- TRUE)) {
+ if (tracker_task_pool_find (fs->priv->task_pool, file)) {
/* Yes, a previous event on same item currently
* being processed */
return TRUE;
@@ -2409,9 +2416,7 @@ should_wait (TrackerMinerFS *fs,
/* Is the item's parent being processed right now? */
parent = g_file_get_parent (file);
if (parent) {
- if (tracker_processing_pool_find_task (fs->priv->processing_pool,
- parent,
- TRUE)) {
+ if (tracker_task_pool_find (fs->priv->task_pool, parent)) {
/* Yes, a previous event on the parent of this item
* currently being processed */
g_object_unref (parent);
@@ -2466,7 +2471,7 @@ item_queue_get_next_file (TrackerMinerFS *fs,
!tracker_priority_queue_is_empty (fs->priv->crawled_directories)) {
trace_eq ("Created items queue empty, but still crawling (%d tasks in WAIT state)",
- tracker_processing_pool_get_wait_task_count (fs->priv->processing_pool));
+ tracker_task_pool_get_size (fs->priv->task_pool));
/* The items_created queue is empty, but there are pending
* items from the crawler to be processed. We feed the queue
@@ -2474,7 +2479,7 @@ item_queue_get_next_file (TrackerMinerFS *fs,
* info is inserted to the store before the children are
* inspected.
*/
- if (tracker_processing_pool_get_wait_task_count (fs->priv->processing_pool) > 0) {
+ if (tracker_task_pool_get_size (fs->priv->task_pool) > 0) {
/* Items still being processed */
*file = NULL;
*source_file = NULL;
@@ -2813,6 +2818,7 @@ item_queue_handlers_cb (gpointer user_data)
case QUEUE_NONE:
/* Print stats and signal finished */
if (!fs->priv->is_crawling &&
+ tracker_task_pool_get_size (fs->priv->task_pool) == 0 &&
tracker_processing_pool_get_total_task_count (fs->priv->processing_pool) == 0) {
process_stop (fs);
}
@@ -2907,7 +2913,7 @@ item_queue_handlers_set_up (TrackerMinerFS *fs)
}
/* Already sent max number of tasks to tracker-extract? */
- if (tracker_processing_pool_wait_limit_reached (fs->priv->processing_pool)) {
+ if (tracker_task_pool_limit_reached (fs->priv->task_pool)) {
return;
}
@@ -4249,16 +4255,16 @@ tracker_miner_fs_directory_add (TrackerMinerFS *fs,
}
static void
-processing_pool_cancel_foreach (gpointer data,
- gpointer user_data)
+task_pool_cancel_foreach (gpointer data,
+ gpointer user_data)
{
- TrackerProcessingTask *task = data;
+ TrackerTask *task = data;
GFile *file = user_data;
GFile *task_file;
UpdateProcessingTaskContext *ctxt;
- task_file = tracker_processing_task_get_file (task);
- ctxt = tracker_processing_task_get_context (task);
+ ctxt = tracker_task_get_data (task);
+ task_file = tracker_task_get_file (task);
if (ctxt &&
ctxt->cancellable &&
@@ -4298,9 +4304,9 @@ tracker_miner_fs_directory_remove (TrackerMinerFS *fs,
g_debug ("Removing directory");
/* Cancel all pending tasks on files inside the path given by file */
- tracker_processing_pool_foreach (priv->processing_pool,
- processing_pool_cancel_foreach,
- file);
+ tracker_task_pool_foreach (priv->task_pool,
+ task_pool_cancel_foreach,
+ file);
g_debug (" Cancelled processing pool tasks at %f\n", g_timer_elapsed (timer, NULL));
@@ -4604,16 +4610,14 @@ tracker_miner_fs_file_notify (TrackerMinerFS *fs,
GFile *file,
const GError *error)
{
- TrackerProcessingTask *task;
+ TrackerTask *task;
g_return_if_fail (TRACKER_IS_MINER_FS (fs));
g_return_if_fail (G_IS_FILE (file));
fs->priv->total_files_notified++;
- task = tracker_processing_pool_find_task (fs->priv->processing_pool,
- file,
- FALSE);
+ task = tracker_task_pool_find (fs->priv->task_pool, file);
if (!task) {
gchar *uri;
@@ -4716,15 +4720,13 @@ G_CONST_RETURN gchar *
tracker_miner_fs_get_urn (TrackerMinerFS *fs,
GFile *file)
{
- TrackerProcessingTask *task;
+ TrackerTask *task;
g_return_val_if_fail (TRACKER_IS_MINER_FS (fs), NULL);
g_return_val_if_fail (G_IS_FILE (file), NULL);
/* Check if found in currently processed data */
- task = tracker_processing_pool_find_task (fs->priv->processing_pool,
- file,
- FALSE);
+ task = tracker_task_pool_find (fs->priv->task_pool, file);
if (!task) {
gchar *uri;
@@ -4740,7 +4742,8 @@ tracker_miner_fs_get_urn (TrackerMinerFS *fs,
UpdateProcessingTaskContext *ctxt;
/* We are only storing the URN in the created/updated tasks */
- ctxt = tracker_processing_task_get_context (task);
+ ctxt = tracker_task_get_data (task);
+
if (!ctxt) {
gchar *uri;
@@ -4808,15 +4811,13 @@ G_CONST_RETURN gchar *
tracker_miner_fs_get_parent_urn (TrackerMinerFS *fs,
GFile *file)
{
- TrackerProcessingTask *task;
+ TrackerTask *task;
g_return_val_if_fail (TRACKER_IS_MINER_FS (fs), NULL);
g_return_val_if_fail (G_IS_FILE (file), NULL);
/* Check if found in currently processed data */
- task = tracker_processing_pool_find_task (fs->priv->processing_pool,
- file,
- FALSE);
+ task = tracker_task_pool_find (fs->priv->task_pool, file);
if (!task) {
gchar *uri;
@@ -4832,7 +4833,8 @@ tracker_miner_fs_get_parent_urn (TrackerMinerFS *fs,
UpdateProcessingTaskContext *ctxt;
/* We are only storing the URN in the created/updated tasks */
- ctxt = tracker_processing_task_get_context (task);
+ ctxt = tracker_task_get_data (task);
+
if (!ctxt) {
gchar *uri;
[
Date Prev][
Date Next] [
Thread Prev][
Thread Next]
[
Thread Index]
[
Date Index]
[
Author Index]