[tracker] libtracker-miner: Fix reference leak with TrackerTaskPool
- From: Martyn James Russell <mr src gnome org>
- To: commits-list gnome org
- Cc:
- Subject: [tracker] libtracker-miner: Fix reference leak with TrackerTaskPool
- Date: Tue, 26 Aug 2014 16:57:06 +0000 (UTC)
commit 054509794b3afeb43f76625d96d8e4db5c7515a1
Author: Martyn Russell <martyn lanedo com>
Date: Tue Aug 26 17:16:52 2014 +0100
libtracker-miner: Fix reference leak with TrackerTaskPool
The leak occurred because tracker_sparql_task_new_with_sparql() was being
called but the returned TrackerTask* was never unreferenced anywhere, while
the call to tracker_sparql_buffer_push() with the new task takes its own
references internally.
Took this opportunity to make the code here easier to follow:
- do_process_file() is now merged into item_add_or_update()
- item_add_or_update_cb() is renamed to item_add_or_update_continue() so it's
obvious it is called from tracker_miner_fs_file_notify().
- renamed various variables to make the code easier to follow.
src/libtracker-miner/tracker-miner-fs.c | 165 ++++++++++++-------------
src/libtracker-miner/tracker-sparql-buffer.c | 10 ++-
2 files changed, 87 insertions(+), 88 deletions(-)
---
diff --git a/src/libtracker-miner/tracker-miner-fs.c b/src/libtracker-miner/tracker-miner-fs.c
index 69a05ad..ab4c2ce 100644
--- a/src/libtracker-miner/tracker-miner-fs.c
+++ b/src/libtracker-miner/tracker-miner-fs.c
@@ -1338,80 +1338,19 @@ update_processing_task_context_free (UpdateProcessingTaskContext *ctxt)
g_slice_free (UpdateProcessingTaskContext, ctxt);
}
-static gboolean
-do_process_file (TrackerMinerFS *fs,
- TrackerTask *task)
-{
- TrackerMinerFSPrivate *priv;
- gboolean processing;
- gboolean attribute_update_only;
- gchar *uri;
- GFile *task_file;
- UpdateProcessingTaskContext *ctxt;
-
- ctxt = tracker_task_get_data (task);
- task_file = tracker_task_get_file (task);
- uri = g_file_get_uri (task_file);
- priv = fs->priv;
-
- attribute_update_only = GPOINTER_TO_INT (g_object_get_qdata (G_OBJECT (task_file),
- priv->quark_attribute_updated));
-
- if (!attribute_update_only) {
- g_debug ("Processing file '%s'...", uri);
- g_signal_emit (fs, signals[PROCESS_FILE], 0,
- task_file,
- ctxt->builder,
- ctxt->cancellable,
- &processing);
- } else {
- g_debug ("Processing attributes in file '%s'...", uri);
- g_signal_emit (fs, signals[PROCESS_FILE_ATTRIBUTES], 0,
- task_file,
- ctxt->builder,
- ctxt->cancellable,
- &processing);
- }
-
- if (!processing) {
- /* Re-fetch data, since it might have been
- * removed in broken implementations
- */
- task = tracker_task_pool_find (priv->task_pool, task_file);
-
- g_message ("%s refused to process '%s'", G_OBJECT_TYPE_NAME (fs), uri);
-
- if (!task) {
- g_critical ("%s has returned FALSE in ::process-file for '%s', "
- "but it seems that this file has been processed through "
- "tracker_miner_fs_file_notify(), this is an "
- "implementation error", G_OBJECT_TYPE_NAME (fs), uri);
- } else {
- tracker_task_pool_remove (priv->task_pool, task);
- tracker_task_unref (task);
- }
- }
-
- g_free (uri);
-
- return processing;
-}
-
static void
-item_add_or_update_cb (TrackerMinerFS *fs,
- TrackerTask *extraction_task,
- const GError *error)
+item_add_or_update_continue (TrackerMinerFS *fs,
+ TrackerTask *task,
+ const GError *error)
{
UpdateProcessingTaskContext *ctxt;
TrackerTask *sparql_task = NULL;
- GFile *task_file;
+ GFile *file;
gchar *uri;
- ctxt = tracker_task_get_data (extraction_task);
- task_file = tracker_task_get_file (extraction_task);
- uri = g_file_get_uri (task_file);
-
- tracker_task_pool_remove (fs->priv->task_pool, extraction_task);
+ ctxt = tracker_task_get_data (task);
+ file = tracker_task_get_file (task);
+ uri = g_file_get_uri (file);
if (error) {
g_message ("Could not process '%s': %s", uri, error->message);
@@ -1420,15 +1359,13 @@ item_add_or_update_cb (TrackerMinerFS *fs,
if (!g_error_matches (error, G_IO_ERROR, G_IO_ERROR_NOT_FOUND) &&
!g_error_matches (error, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
- sparql_task = tracker_sparql_task_new_with_sparql (task_file,
- ctxt->builder);
+ sparql_task = tracker_sparql_task_new_with_sparql (file, ctxt->builder);
}
} else {
if (ctxt->urn) {
gboolean attribute_update_only;
- attribute_update_only = GPOINTER_TO_INT (g_object_steal_qdata (G_OBJECT (task_file),
- fs->priv->quark_attribute_updated));
+ attribute_update_only = GPOINTER_TO_INT (g_object_steal_qdata (G_OBJECT (file), fs->priv->quark_attribute_updated));
g_debug ("Updating item '%s' with urn '%s'%s",
uri,
ctxt->urn,
@@ -1466,16 +1403,16 @@ item_add_or_update_cb (TrackerMinerFS *fs,
ctxt->urn, ctxt->urn,
tracker_sparql_builder_get_result
(ctxt->builder));
- sparql_task = tracker_sparql_task_new_take_sparql_str (task_file, full_sparql);
+ sparql_task = tracker_sparql_task_new_take_sparql_str (file, full_sparql);
} else {
/* Do not drop graph if only updating attributes, the SPARQL builder
* will already contain the necessary DELETE statements for the properties
* being updated */
- sparql_task = tracker_sparql_task_new_with_sparql (task_file, ctxt->builder);
+ sparql_task = tracker_sparql_task_new_with_sparql (file, ctxt->builder);
}
} else {
g_debug ("Creating new item '%s'", uri);
- sparql_task = tracker_sparql_task_new_with_sparql (task_file, ctxt->builder);
+ sparql_task = tracker_sparql_task_new_with_sparql (file, ctxt->builder);
}
}
@@ -1486,14 +1423,20 @@ item_add_or_update_cb (TrackerMinerFS *fs,
sparql_buffer_task_finished_cb,
fs);
- if (item_queue_is_blocked_by_file (fs, task_file)) {
+ if (item_queue_is_blocked_by_file (fs, file)) {
tracker_sparql_buffer_flush (fs->priv->sparql_buffer, "Current file is blocking item queue");
/* Check if we've finished inserting for given prefixes ... */
notify_roots_finished (fs, TRUE);
}
+
+ /* We can let go of our reference here because the
+ * sparql buffer takes its own reference when adding
+ * it to the task pool.
+ */
+ tracker_task_unref (sparql_task);
} else {
- if (item_queue_is_blocked_by_file (fs, task_file)) {
+ if (item_queue_is_blocked_by_file (fs, file)) {
/* Make sure that we don't stall the item queue, although we could
* expect the file to be reenqueued until the loop detector makes
* us drop it since we were specifically waiting for it to complete.
@@ -1510,7 +1453,14 @@ item_add_or_update_cb (TrackerMinerFS *fs,
item_queue_handlers_set_up (fs);
}
- tracker_task_unref (extraction_task);
+ /* Last reference is kept by the pool, removing the task from
+ * the pool cleans up the task too!
+ *
+ * NOTE that calling this any earlier actually causes invalid
+ * reads because the task frees up the
+ * UpdateProcessingTaskContext and GFile.
+ */
+ tracker_task_pool_remove (fs->priv->task_pool, task);
g_free (uri);
}
@@ -1541,15 +1491,17 @@ item_add_or_update (TrackerMinerFS *fs,
{
TrackerMinerFSPrivate *priv;
TrackerSparqlBuilder *sparql;
+ UpdateProcessingTaskContext *ctxt;
GCancellable *cancellable;
- gboolean retval;
+ gboolean processing;
+ gboolean keep_processing;
+ gboolean attribute_update_only;
TrackerTask *task;
- const gchar *parent_urn, *urn = NULL;
- UpdateProcessingTaskContext *ctxt;
+ const gchar *parent_urn, *urn;
+ gchar *uri;
GFile *parent;
priv = fs->priv;
- retval = TRUE;
cancellable = g_cancellable_new ();
sparql = tracker_sparql_builder_new_update ();
@@ -1581,20 +1533,61 @@ item_add_or_update (TrackerMinerFS *fs,
(GDestroyNotify) update_processing_task_context_free);
tracker_task_pool_add (priv->task_pool, task);
+ tracker_task_unref (task);
+
+ /* Call ::process-file to see if we handle this resource or not */
+ uri = g_file_get_uri (file);
+
+ attribute_update_only = GPOINTER_TO_INT (g_object_get_qdata (G_OBJECT (file), priv->quark_attribute_updated));
+
+ if (!attribute_update_only) {
+ g_debug ("Processing file '%s'...", uri);
+ g_signal_emit (fs, signals[PROCESS_FILE], 0,
+ file,
+ ctxt->builder,
+ ctxt->cancellable,
+ &processing);
+ } else {
+ g_debug ("Processing attributes in file '%s'...", uri);
+ g_signal_emit (fs, signals[PROCESS_FILE_ATTRIBUTES], 0,
+ file,
+ ctxt->builder,
+ ctxt->cancellable,
+ &processing);
+ }
+
+ keep_processing = TRUE;
+
+ if (!processing) {
+ /* Re-fetch data, since it might have been
+ * removed in broken implementations
+ */
+ task = tracker_task_pool_find (priv->task_pool, file);
- if (do_process_file (fs, task)) {
+ g_message ("%s refused to process '%s'", G_OBJECT_TYPE_NAME (fs), uri);
+
+ if (!task) {
+ g_critical ("%s has returned FALSE in ::process-file for '%s', "
+ "but it seems that this file has been processed through "
+ "tracker_miner_fs_file_notify(), this is an "
+ "implementation error", G_OBJECT_TYPE_NAME (fs), uri);
+ } else {
+ tracker_task_pool_remove (priv->task_pool, task);
+ }
+ } else {
fs->priv->total_files_processed++;
if (tracker_task_pool_limit_reached (priv->task_pool)) {
- retval = FALSE;
+ keep_processing = FALSE;
}
}
+ g_free (uri);
g_object_unref (file);
g_object_unref (cancellable);
g_object_unref (sparql);
- return retval;
+ return keep_processing;
}
static gboolean
@@ -3740,7 +3733,7 @@ tracker_miner_fs_file_notify (TrackerMinerFS *fs,
return;
}
- item_add_or_update_cb (fs, task, error);
+ item_add_or_update_continue (fs, task, error);
}
/**
diff --git a/src/libtracker-miner/tracker-sparql-buffer.c b/src/libtracker-miner/tracker-sparql-buffer.c
index 418d3d7..43a43b4 100644
--- a/src/libtracker-miner/tracker-sparql-buffer.c
+++ b/src/libtracker-miner/tracker-sparql-buffer.c
@@ -693,14 +693,16 @@ sparql_buffer_push_to_pool (TrackerSparqlBuffer *buffer,
reset_flush_timeout (buffer);
}
- /* Task pool addition adds a reference (below) */
+ /* Task pool addition increments reference */
tracker_task_pool_add (TRACKER_TASK_POOL (buffer), task);
if (!priv->tasks) {
priv->tasks = g_ptr_array_new_with_free_func ((GDestroyNotify) tracker_task_unref);
}
- g_ptr_array_add (priv->tasks, task);
+ /* We add a reference here because we unref when removed from
+ * the GPtrArray. */
+ g_ptr_array_add (priv->tasks, tracker_task_ref (task));
if (tracker_task_pool_limit_reached (TRACKER_TASK_POOL (buffer))) {
tracker_sparql_buffer_flush (buffer, "SPARQL buffer limit reached");
@@ -722,6 +724,10 @@ tracker_sparql_buffer_push (TrackerSparqlBuffer *buffer,
g_return_if_fail (TRACKER_IS_SPARQL_BUFFER (buffer));
g_return_if_fail (task != NULL);
+ /* NOTE: We don't own the task and if we want it we have to
+ * reference it, each function below references task in
+ * different ways.
+ */
data = tracker_task_get_data (task);
if (!data->result) {
[Date Prev][Date Next] [Thread Prev][Thread Next] [Thread Index] [Date Index] [Author Index]