[tracker] libtracker-miner: Fix reference leak with TrackerTaskPool



commit 054509794b3afeb43f76625d96d8e4db5c7515a1
Author: Martyn Russell <martyn lanedo com>
Date:   Tue Aug 26 17:16:52 2014 +0100

    libtracker-miner: Fix reference leak with TrackerTaskPool
    
    The leak occurred because tracker_sparql_task_new_with_sparql() was being
    called but the returned TrackerTask* was never unreferenced anywhere, while
    the call to tracker_sparql_buffer_push() with the new task was taking its own
    references internally.
    
    Took this opportunity to make the code here easier to follow:
    - do_process_file() is now merged into item_add_or_update()
    - item_add_or_update_cb() is renamed to item_add_or_update_continue() so it's
      obvious it is called from tracker_miner_fs_file_notify().
    - renamed various variables to make the code easier to follow.

 src/libtracker-miner/tracker-miner-fs.c      |  165 ++++++++++++-------------
 src/libtracker-miner/tracker-sparql-buffer.c |   10 ++-
 2 files changed, 87 insertions(+), 88 deletions(-)
---
diff --git a/src/libtracker-miner/tracker-miner-fs.c b/src/libtracker-miner/tracker-miner-fs.c
index 69a05ad..ab4c2ce 100644
--- a/src/libtracker-miner/tracker-miner-fs.c
+++ b/src/libtracker-miner/tracker-miner-fs.c
@@ -1338,80 +1338,19 @@ update_processing_task_context_free (UpdateProcessingTaskContext *ctxt)
        g_slice_free (UpdateProcessingTaskContext, ctxt);
 }
 
-static gboolean
-do_process_file (TrackerMinerFS *fs,
-                 TrackerTask    *task)
-{
-       TrackerMinerFSPrivate *priv;
-       gboolean processing;
-       gboolean attribute_update_only;
-       gchar *uri;
-       GFile *task_file;
-       UpdateProcessingTaskContext *ctxt;
-
-       ctxt = tracker_task_get_data (task);
-       task_file = tracker_task_get_file (task);
-       uri = g_file_get_uri (task_file);
-       priv = fs->priv;
-
-       attribute_update_only = GPOINTER_TO_INT (g_object_get_qdata (G_OBJECT (task_file),
-                                                                    priv->quark_attribute_updated));
-
-       if (!attribute_update_only) {
-               g_debug ("Processing file '%s'...", uri);
-               g_signal_emit (fs, signals[PROCESS_FILE], 0,
-                              task_file,
-                              ctxt->builder,
-                              ctxt->cancellable,
-                              &processing);
-       } else {
-               g_debug ("Processing attributes in file '%s'...", uri);
-               g_signal_emit (fs, signals[PROCESS_FILE_ATTRIBUTES], 0,
-                              task_file,
-                              ctxt->builder,
-                              ctxt->cancellable,
-                              &processing);
-       }
-
-       if (!processing) {
-               /* Re-fetch data, since it might have been
-                * removed in broken implementations
-                */
-               task = tracker_task_pool_find (priv->task_pool, task_file);
-
-               g_message ("%s refused to process '%s'", G_OBJECT_TYPE_NAME (fs), uri);
-
-               if (!task) {
-                       g_critical ("%s has returned FALSE in ::process-file for '%s', "
-                                   "but it seems that this file has been processed through "
-                                   "tracker_miner_fs_file_notify(), this is an "
-                                   "implementation error", G_OBJECT_TYPE_NAME (fs), uri);
-               } else {
-                       tracker_task_pool_remove (priv->task_pool, task);
-                       tracker_task_unref (task);
-               }
-       }
-
-       g_free (uri);
-
-       return processing;
-}
-
 static void
-item_add_or_update_cb (TrackerMinerFS *fs,
-                       TrackerTask    *extraction_task,
-                       const GError   *error)
+item_add_or_update_continue (TrackerMinerFS *fs,
+                             TrackerTask    *task,
+                             const GError   *error)
 {
        UpdateProcessingTaskContext *ctxt;
        TrackerTask *sparql_task = NULL;
-       GFile *task_file;
+       GFile *file;
        gchar *uri;
 
-       ctxt = tracker_task_get_data (extraction_task);
-       task_file = tracker_task_get_file (extraction_task);
-       uri = g_file_get_uri (task_file);
-
-       tracker_task_pool_remove (fs->priv->task_pool, extraction_task);
+       ctxt = tracker_task_get_data (task);
+       file = tracker_task_get_file (task);
+       uri = g_file_get_uri (file);
 
        if (error) {
                g_message ("Could not process '%s': %s", uri, error->message);
@@ -1420,15 +1359,13 @@ item_add_or_update_cb (TrackerMinerFS *fs,
 
                if (!g_error_matches (error, G_IO_ERROR, G_IO_ERROR_NOT_FOUND) &&
                    !g_error_matches (error, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
-                       sparql_task = tracker_sparql_task_new_with_sparql (task_file,
-                                                                          ctxt->builder);
+                       sparql_task = tracker_sparql_task_new_with_sparql (file, ctxt->builder);
                }
        } else {
                if (ctxt->urn) {
                        gboolean attribute_update_only;
 
-                       attribute_update_only = GPOINTER_TO_INT (g_object_steal_qdata (G_OBJECT (task_file),
-                                                                                      fs->priv->quark_attribute_updated));
+                       attribute_update_only = GPOINTER_TO_INT (g_object_steal_qdata (G_OBJECT (file), fs->priv->quark_attribute_updated));
                        g_debug ("Updating item '%s' with urn '%s'%s",
                                 uri,
                                 ctxt->urn,
@@ -1466,16 +1403,16 @@ item_add_or_update_cb (TrackerMinerFS *fs,
                                                               ctxt->urn, ctxt->urn,
                                                               tracker_sparql_builder_get_result (ctxt->builder));
 
-                               sparql_task = tracker_sparql_task_new_take_sparql_str (task_file, full_sparql);
+                               sparql_task = tracker_sparql_task_new_take_sparql_str (file, full_sparql);
                        } else {
                                /* Do not drop graph if only updating attributes, the SPARQL builder
                                 * will already contain the necessary DELETE statements for the properties
                                 * being updated */
-                               sparql_task = tracker_sparql_task_new_with_sparql (task_file, ctxt->builder);
+                               sparql_task = tracker_sparql_task_new_with_sparql (file, ctxt->builder);
                        }
                } else {
                        g_debug ("Creating new item '%s'", uri);
-                       sparql_task = tracker_sparql_task_new_with_sparql (task_file, ctxt->builder);
+                       sparql_task = tracker_sparql_task_new_with_sparql (file, ctxt->builder);
                }
        }
 
@@ -1486,14 +1423,20 @@ item_add_or_update_cb (TrackerMinerFS *fs,
                                            sparql_buffer_task_finished_cb,
                                            fs);
 
-               if (item_queue_is_blocked_by_file (fs, task_file)) {
+               if (item_queue_is_blocked_by_file (fs, file)) {
                        tracker_sparql_buffer_flush (fs->priv->sparql_buffer, "Current file is blocking item queue");
 
                        /* Check if we've finished inserting for given prefixes ... */
                        notify_roots_finished (fs, TRUE);
                }
+
+               /* We can let go of our reference here because the
+                * sparql buffer takes its own reference when adding
+                * it to the task pool.
+                */
+               tracker_task_unref (sparql_task);
        } else {
-               if (item_queue_is_blocked_by_file (fs, task_file)) {
+               if (item_queue_is_blocked_by_file (fs, file)) {
                        /* Make sure that we don't stall the item queue, although we could
                         * expect the file to be reenqueued until the loop detector makes
                         * us drop it since we were specifically waiting for it to complete.
@@ -1510,7 +1453,14 @@ item_add_or_update_cb (TrackerMinerFS *fs,
                item_queue_handlers_set_up (fs);
        }
 
-       tracker_task_unref (extraction_task);
+       /* Last reference is kept by the pool, removing the task from
+        * the pool cleans up the task too!
+        *
+        * NOTE that calling this any earlier actually causes invalid
+        * reads because the task frees up the
+        * UpdateProcessingTaskContext and GFile.
+        */
+       tracker_task_pool_remove (fs->priv->task_pool, task);
 
        g_free (uri);
 }
@@ -1541,15 +1491,17 @@ item_add_or_update (TrackerMinerFS *fs,
 {
        TrackerMinerFSPrivate *priv;
        TrackerSparqlBuilder *sparql;
+       UpdateProcessingTaskContext *ctxt;
        GCancellable *cancellable;
-       gboolean retval;
+       gboolean processing;
+       gboolean keep_processing;
+       gboolean attribute_update_only;
        TrackerTask *task;
-       const gchar *parent_urn, *urn = NULL;
-       UpdateProcessingTaskContext *ctxt;
+       const gchar *parent_urn, *urn;
+       gchar *uri;
        GFile *parent;
 
        priv = fs->priv;
-       retval = TRUE;
 
        cancellable = g_cancellable_new ();
        sparql = tracker_sparql_builder_new_update ();
@@ -1581,20 +1533,61 @@ item_add_or_update (TrackerMinerFS *fs,
                                 (GDestroyNotify) update_processing_task_context_free);
 
        tracker_task_pool_add (priv->task_pool, task);
+       tracker_task_unref (task);
+
+       /* Call ::process-file to see if we handle this resource or not */
+       uri = g_file_get_uri (file);
+
+       attribute_update_only = GPOINTER_TO_INT (g_object_get_qdata (G_OBJECT (file), priv->quark_attribute_updated));
+
+       if (!attribute_update_only) {
+               g_debug ("Processing file '%s'...", uri);
+               g_signal_emit (fs, signals[PROCESS_FILE], 0,
+                              file,
+                              ctxt->builder,
+                              ctxt->cancellable,
+                              &processing);
+       } else {
+               g_debug ("Processing attributes in file '%s'...", uri);
+               g_signal_emit (fs, signals[PROCESS_FILE_ATTRIBUTES], 0,
+                              file,
+                              ctxt->builder,
+                              ctxt->cancellable,
+                              &processing);
+       }
+
+       keep_processing = TRUE;
+
+       if (!processing) {
+               /* Re-fetch data, since it might have been
+                * removed in broken implementations
+                */
+               task = tracker_task_pool_find (priv->task_pool, file);
 
-       if (do_process_file (fs, task)) {
+               g_message ("%s refused to process '%s'", G_OBJECT_TYPE_NAME (fs), uri);
+
+               if (!task) {
+                       g_critical ("%s has returned FALSE in ::process-file for '%s', "
+                                   "but it seems that this file has been processed through "
+                                   "tracker_miner_fs_file_notify(), this is an "
+                                   "implementation error", G_OBJECT_TYPE_NAME (fs), uri);
+               } else {
+                       tracker_task_pool_remove (priv->task_pool, task);
+               }
+       } else {
                fs->priv->total_files_processed++;
 
                if (tracker_task_pool_limit_reached (priv->task_pool)) {
-                       retval = FALSE;
+                       keep_processing = FALSE;
                }
        }
 
+       g_free (uri);
        g_object_unref (file);
        g_object_unref (cancellable);
        g_object_unref (sparql);
 
-       return retval;
+       return keep_processing;
 }
 
 static gboolean
@@ -3740,7 +3733,7 @@ tracker_miner_fs_file_notify (TrackerMinerFS *fs,
                return;
        }
 
-       item_add_or_update_cb (fs, task, error);
+       item_add_or_update_continue (fs, task, error);
 }
 
 /**
diff --git a/src/libtracker-miner/tracker-sparql-buffer.c b/src/libtracker-miner/tracker-sparql-buffer.c
index 418d3d7..43a43b4 100644
--- a/src/libtracker-miner/tracker-sparql-buffer.c
+++ b/src/libtracker-miner/tracker-sparql-buffer.c
@@ -693,14 +693,16 @@ sparql_buffer_push_to_pool (TrackerSparqlBuffer *buffer,
                reset_flush_timeout (buffer);
        }
 
-       /* Task pool addition adds a reference (below) */
+       /* Task pool addition increments reference */
        tracker_task_pool_add (TRACKER_TASK_POOL (buffer), task);
 
        if (!priv->tasks) {
                priv->tasks = g_ptr_array_new_with_free_func ((GDestroyNotify) tracker_task_unref);
        }
 
-       g_ptr_array_add (priv->tasks, task);
+       /* We add a reference here because we unref when removed from
+        * the GPtrArray. */
+       g_ptr_array_add (priv->tasks, tracker_task_ref (task));
 
        if (tracker_task_pool_limit_reached (TRACKER_TASK_POOL (buffer))) {
                tracker_sparql_buffer_flush (buffer, "SPARQL buffer limit reached");
@@ -722,6 +724,10 @@ tracker_sparql_buffer_push (TrackerSparqlBuffer *buffer,
        g_return_if_fail (TRACKER_IS_SPARQL_BUFFER (buffer));
        g_return_if_fail (task != NULL);
 
+       /* NOTE: We don't own the task and if we want it we have to
+        * reference it, each function below references task in
+        * different ways.
+        */
        data = tracker_task_get_data (task);
 
        if (!data->result) {


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]