[tracker-miners/wip/carlosg/shuffle-libtracker-miner: 20/116] libtracker-miner: Refactor processing queues



commit 024a7de0cc8bb7e593f8c861d857fd0b70495ca8
Author: Carlos Garnacho <carlosg gnome org>
Date:   Thu Oct 12 15:04:04 2017 +0200

    libtracker-miner: Refactor processing queues
    
    Replace all 4 queues for the different create/update/delete/move
    events with a single queue that contains generic QueueEvent
    structs. The GList node of the last event is stored as GFile
    qdata, in order to perform fast lookups when coalescing events.
    
    queue_event_coalesce() will attempt to reduce any two events
    into fewer than two. It relies on the two events being merged
    having no related events in between them; any such events
    should already have been coalesced (or attempted to) when
    they arrived.

 src/libtracker-miner/tracker-miner-fs.c | 730 +++++++++++++-------------------
 1 file changed, 290 insertions(+), 440 deletions(-)
---
diff --git a/src/libtracker-miner/tracker-miner-fs.c b/src/libtracker-miner/tracker-miner-fs.c
index 2ad610da8..104b95914 100644
--- a/src/libtracker-miner/tracker-miner-fs.c
+++ b/src/libtracker-miner/tracker-miner-fs.c
@@ -140,9 +140,10 @@ static gboolean miner_fs_queues_status_trace_timeout_cb (gpointer data);
 #define TRACKER_MINER_FS_GET_PRIVATE(o) (G_TYPE_INSTANCE_GET_PRIVATE ((o), TRACKER_TYPE_MINER_FS, 
TrackerMinerFSPrivate))
 
 typedef struct {
+       TrackerMinerFSEventType type;
        GFile *file;
-       GFile *source_file;
-} ItemMovedData;
+       GFile *dest_file;
+} QueueEvent;
 
 typedef struct {
        GFile *file;
@@ -153,11 +154,7 @@ typedef struct {
 } UpdateProcessingTaskContext;
 
 struct _TrackerMinerFSPrivate {
-       /* File queues for indexer */
-       TrackerPriorityQueue *items_created;
-       TrackerPriorityQueue *items_updated;
-       TrackerPriorityQueue *items_deleted;
-       TrackerPriorityQueue *items_moved;
+       TrackerPriorityQueue *items;
 
        guint item_queues_handler_id;
        GFile *item_queue_blocker;
@@ -229,6 +226,12 @@ typedef enum {
        QUEUE_WAIT,
 } QueueState;
 
+typedef enum {
+       QUEUE_ACTION_NONE           = 0,
+       QUEUE_ACTION_DELETE_FIRST   = 1 << 0,
+       QUEUE_ACTION_DELETE_SECOND  = 1 << 1,
+} QueueCoalesceAction;
+
 enum {
        PROCESS_FILE,
        PROCESS_FILE_ATTRIBUTES,
@@ -266,9 +269,6 @@ static void           miner_started                       (TrackerMiner
 static void           miner_stopped                       (TrackerMiner         *miner);
 static void           miner_paused                        (TrackerMiner         *miner);
 static void           miner_resumed                       (TrackerMiner         *miner);
-static ItemMovedData *item_moved_data_new                 (GFile                *file,
-                                                           GFile                *source_file);
-static void           item_moved_data_free                (ItemMovedData        *data);
 
 static void           indexing_tree_directory_removed     (TrackerIndexingTree  *indexing_tree,
                                                            GFile                *directory,
@@ -308,7 +308,12 @@ static void           task_pool_limit_reached_notify_cb       (GObject        *o
                                                                GParamSpec     *pspec,
                                                                gpointer        user_data);
 
+static void           miner_fs_queue_event                (TrackerMinerFS *fs,
+                                                          QueueEvent     *event,
+                                                          guint           priority);
+
 static GQuark quark_file_iri = 0;
+static GQuark quark_last_queue_event = 0;
 static GInitableIface* miner_fs_initable_parent_iface;
 static guint signals[LAST_SIGNAL] = { 0, };
 
@@ -569,6 +574,7 @@ tracker_miner_fs_class_init (TrackerMinerFSClass *klass)
        g_type_class_add_private (object_class, sizeof (TrackerMinerFSPrivate));
 
        quark_file_iri = g_quark_from_static_string ("tracker-miner-file-iri");
+       quark_last_queue_event = g_quark_from_static_string ("tracker-last-queue-event");
 }
 
 static void
@@ -589,10 +595,7 @@ tracker_miner_fs_init (TrackerMinerFS *object)
        priv->timer_stopped = TRUE;
        priv->extraction_timer_stopped = TRUE;
 
-       priv->items_created = tracker_priority_queue_new ();
-       priv->items_updated = tracker_priority_queue_new ();
-       priv->items_deleted = tracker_priority_queue_new ();
-       priv->items_moved = tracker_priority_queue_new ();
+       priv->items = tracker_priority_queue_new ();
 
 #ifdef EVENT_QUEUE_ENABLE_TRACE
        priv->queue_status_timeout_id = g_timeout_add_seconds (EVENT_QUEUE_STATUS_TIMEOUT_SECS,
@@ -703,6 +706,146 @@ miner_fs_initable_iface_init (GInitableIface *iface)
        iface->init = miner_fs_initable_init;
 }
 
+static QueueEvent *
+queue_event_new (TrackerMinerFSEventType  type,
+                 GFile                   *file)
+{
+       QueueEvent *event;
+
+       g_assert (type != TRACKER_MINER_FS_EVENT_MOVED);
+
+       event = g_new0 (QueueEvent, 1);
+       event->type = type;
+       g_set_object (&event->file, file);
+
+       return event;
+}
+
+static QueueEvent *
+queue_event_moved_new (GFile *source,
+                       GFile *dest)
+{
+       QueueEvent *event;
+
+       event = g_new0 (QueueEvent, 1);
+       event->type = TRACKER_MINER_FS_EVENT_MOVED;
+       g_set_object (&event->dest_file, dest);
+       g_set_object (&event->file, source);
+
+       return event;
+}
+
+static GList *
+queue_event_get_last_event_node (QueueEvent *event)
+{
+       return g_object_get_qdata (G_OBJECT (event->file),
+                                  quark_last_queue_event);
+}
+
+static void
+queue_event_save_node (QueueEvent *event,
+                      GList      *node)
+{
+       g_assert (node->data == event);
+       g_object_set_qdata (G_OBJECT (event->file),
+                           quark_last_queue_event, node);
+}
+
+static void
+queue_event_dispose_node (QueueEvent *event)
+{
+       GList *node;
+
+       node = queue_event_get_last_event_node (event);
+
+       if (node && node->data == event) {
+               g_object_steal_qdata (G_OBJECT (event->file),
+                                     quark_last_queue_event);
+       }
+}
+
+static void
+queue_event_free (QueueEvent *event)
+{
+       queue_event_dispose_node (event);
+
+       g_clear_object (&event->dest_file);
+       g_clear_object (&event->file);
+       g_free (event);
+}
+
+static QueueCoalesceAction
+queue_event_coalesce (const QueueEvent  *first,
+                     const QueueEvent  *second,
+                     QueueEvent       **replacement)
+{
+       *replacement = NULL;
+
+       if (first->type == TRACKER_MINER_FS_EVENT_CREATED) {
+               if (second->type == TRACKER_MINER_FS_EVENT_UPDATED &&
+                   first->file == second->file) {
+                       return QUEUE_ACTION_DELETE_SECOND;
+               } else if (second->type == TRACKER_MINER_FS_EVENT_MOVED &&
+                          first->file == second->file) {
+                       *replacement = queue_event_new (TRACKER_MINER_FS_EVENT_CREATED,
+                                                       second->dest_file);
+                       return (QUEUE_ACTION_DELETE_FIRST |
+                               QUEUE_ACTION_DELETE_SECOND);
+               } else if (second->type == TRACKER_MINER_FS_EVENT_DELETED &&
+                          first->file == second->file) {
+                       /* We can't be sure that "create" is replacing a file
+                        * here. Preserve the second event just in case.
+                        */
+                       return QUEUE_ACTION_DELETE_FIRST;
+               }
+       } else if (first->type == TRACKER_MINER_FS_EVENT_UPDATED) {
+               if (second->type == TRACKER_MINER_FS_EVENT_UPDATED &&
+                   first->file == second->file) {
+                       return QUEUE_ACTION_DELETE_SECOND;
+               } else if (second->type == TRACKER_MINER_FS_EVENT_DELETED &&
+                          first->file == second->file) {
+                       return QUEUE_ACTION_DELETE_FIRST;
+               }
+       } else if (first->type == TRACKER_MINER_FS_EVENT_MOVED) {
+               if (second->type == TRACKER_MINER_FS_EVENT_MOVED &&
+                   first->dest_file == second->file) {
+                       if (first->file != second->dest_file) {
+                               *replacement = queue_event_moved_new (first->file,
+                                                                     second->dest_file);
+                       }
+
+                       return (QUEUE_ACTION_DELETE_FIRST |
+                               QUEUE_ACTION_DELETE_SECOND);
+               } else if (second->type == TRACKER_MINER_FS_EVENT_DELETED &&
+                          first->dest_file == second->file) {
+                       *replacement = queue_event_new (TRACKER_MINER_FS_EVENT_DELETED,
+                                                       first->file);
+                       return (QUEUE_ACTION_DELETE_FIRST |
+                               QUEUE_ACTION_DELETE_SECOND);
+               }
+       } else if (first->type == TRACKER_MINER_FS_EVENT_DELETED &&
+                  second->type == TRACKER_MINER_FS_EVENT_DELETED) {
+               return QUEUE_ACTION_DELETE_SECOND;
+       }
+
+       return QUEUE_ACTION_NONE;
+}
+
+static gboolean
+queue_event_is_descendant (QueueEvent *event,
+                          GFile      *prefix)
+{
+       return g_file_has_prefix (event->file, prefix);
+}
+
+static gboolean
+queue_event_is_equal_or_descendant (QueueEvent *event,
+                                   GFile      *prefix)
+{
+       return (g_file_equal (event->file, prefix) ||
+               g_file_has_prefix (event->file, prefix));
+}
+
 static void
 fs_finalize (GObject *object)
 {
@@ -736,25 +879,10 @@ fs_finalize (GObject *object)
                g_object_unref (priv->sparql_buffer);
        }
 
-       tracker_priority_queue_foreach (priv->items_moved,
-                                       (GFunc) item_moved_data_free,
-                                       NULL);
-       tracker_priority_queue_unref (priv->items_moved);
-
-       tracker_priority_queue_foreach (priv->items_deleted,
-                                       (GFunc) g_object_unref,
-                                       NULL);
-       tracker_priority_queue_unref (priv->items_deleted);
-
-       tracker_priority_queue_foreach (priv->items_updated,
-                                       (GFunc) g_object_unref,
-                                       NULL);
-       tracker_priority_queue_unref (priv->items_updated);
-
-       tracker_priority_queue_foreach (priv->items_created,
-                                       (GFunc) g_object_unref,
-                                       NULL);
-       tracker_priority_queue_unref (priv->items_created);
+       tracker_priority_queue_foreach (priv->items,
+                                       (GFunc) queue_event_free,
+                                       NULL);
+       tracker_priority_queue_unref (priv->items);
 
        if (priv->indexing_tree) {
                g_object_unref (priv->indexing_tree);
@@ -956,16 +1084,6 @@ miner_resumed (TrackerMiner *miner)
        }
 }
 
-static gboolean
-item_moved_data_has_prefix (gpointer data,
-                           gpointer user_data)
-{
-       ItemMovedData *moved_item = data;
-       GFile *prefix = user_data;
-
-       return g_file_has_prefix (moved_item->file, prefix);
-}
-
 static void
 notify_roots_finished (TrackerMinerFS *fs,
                        gboolean        check_queues)
@@ -1000,10 +1118,7 @@ notify_roots_finished (TrackerMinerFS *fs,
                 * too frequently)
                 */
                if (check_queues &&
-                   (tracker_priority_queue_find (fs->priv->items_created, NULL, (GEqualFunc) 
g_file_has_prefix, root) ||
-                    tracker_priority_queue_find (fs->priv->items_updated, NULL, (GEqualFunc) 
g_file_has_prefix, root) ||
-                    tracker_priority_queue_find (fs->priv->items_deleted, NULL, (GEqualFunc) 
g_file_has_prefix, root) ||
-                    tracker_priority_queue_find (fs->priv->items_moved, NULL, (GEqualFunc) 
item_moved_data_has_prefix, root))) {
+                   tracker_priority_queue_find (fs->priv->items, NULL, (GEqualFunc) 
queue_event_is_descendant, root)) {
                        continue;
                }
 
@@ -1087,27 +1202,6 @@ process_stop (TrackerMinerFS *fs)
        fs->priv->been_crawled = TRUE;
 }
 
-static ItemMovedData *
-item_moved_data_new (GFile *file,
-                     GFile *source_file)
-{
-       ItemMovedData *data;
-
-       data = g_slice_new (ItemMovedData);
-       data->file = g_object_ref (file);
-       data->source_file = g_object_ref (source_file);
-
-       return data;
-}
-
-static void
-item_moved_data_free (ItemMovedData *data)
-{
-       g_object_unref (data->file);
-       g_object_unref (data->source_file);
-       g_slice_free (ItemMovedData, data);
-}
-
 static gboolean
 item_queue_is_blocked_by_file (TrackerMinerFS *fs,
                                GFile *file)
@@ -1570,115 +1664,16 @@ should_wait (TrackerMinerFS *fs,
        return FALSE;
 }
 
-static QueueState
-item_queue_get_next_file (TrackerMinerFS  *fs,
-                          GFile          **file,
-                          GFile          **source_file,
-                          gint            *priority_out)
+static gboolean
+item_queue_get_next_file (TrackerMinerFS           *fs,
+                          GFile                   **file,
+                          GFile                   **source_file,
+                         TrackerMinerFSEventType  *type,
+                          gint                     *priority_out)
 {
-       ItemMovedData *data;
-       GFile *queue_file;
+       QueueEvent *event;
        gint priority;
 
-       /* Deleted items second */
-       queue_file = tracker_priority_queue_peek (fs->priv->items_deleted,
-                                                 &priority);
-       if (queue_file) {
-               *source_file = NULL;
-
-               trace_eq_pop_head ("DELETED", queue_file);
-
-               /* Do not ignore DELETED event. We should never see DELETED on update
-                  (atomic rename or in-place update) but we may see DELETED
-                  due to actual file deletion right after update. */
-
-               /* If the same item OR its first parent is currently being processed,
-                * we need to wait for this event */
-               if (should_wait (fs, queue_file)) {
-                       *file = NULL;
-
-                       trace_eq_push_head ("DELETED", queue_file, "Should wait");
-                       return QUEUE_WAIT;
-               }
-
-               tracker_priority_queue_pop (fs->priv->items_deleted, NULL);
-               *file = queue_file;
-               *priority_out = priority;
-               return QUEUE_DELETED;
-       }
-
-       /* Created items next */
-       queue_file = tracker_priority_queue_peek (fs->priv->items_created,
-                                                 &priority);
-       if (queue_file) {
-               *source_file = NULL;
-
-               trace_eq_pop_head ("CREATED", queue_file);
-
-               /* If the same item OR its first parent is currently being processed,
-                * we need to wait for this event */
-               if (should_wait (fs, queue_file)) {
-                       *file = NULL;
-
-                       trace_eq_push_head ("CREATED", queue_file, "Should wait");
-                       return QUEUE_WAIT;
-               }
-
-               tracker_priority_queue_pop (fs->priv->items_created, NULL);
-               *file = queue_file;
-               *priority_out = priority;
-               return QUEUE_CREATED;
-       }
-
-       /* Updated items next */
-       queue_file = tracker_priority_queue_peek (fs->priv->items_updated,
-                                                 &priority);
-       if (queue_file) {
-               *file = queue_file;
-               *source_file = NULL;
-
-               trace_eq_pop_head ("UPDATED", queue_file);
-
-               /* If the same item OR its first parent is currently being processed,
-                * we need to wait for this event */
-               if (should_wait (fs, queue_file)) {
-                       *file = NULL;
-
-                       trace_eq_push_head ("UPDATED", queue_file, "Should wait");
-                       return QUEUE_WAIT;
-               }
-
-               tracker_priority_queue_pop (fs->priv->items_updated, NULL);
-               *priority_out = priority;
-
-               return QUEUE_UPDATED;
-       }
-
-       /* Moved items next */
-       data = tracker_priority_queue_peek (fs->priv->items_moved,
-                                           &priority);
-       if (data) {
-               trace_eq_pop_head_2 ("MOVED", data->file, data->source_file);
-
-               /* If the same item OR its first parent is currently being processed,
-                * we need to wait for this event */
-               if (should_wait (fs, data->file) ||
-                   should_wait (fs, data->source_file)) {
-                       *file = NULL;
-                       *source_file = NULL;
-
-                       trace_eq_push_head_2 ("MOVED", data->source_file, data->file, "Should wait");
-                       return QUEUE_WAIT;
-               }
-
-               tracker_priority_queue_pop (fs->priv->items_moved, NULL);
-               *file = g_object_ref (data->file);
-               *source_file = g_object_ref (data->source_file);
-               *priority_out = priority;
-               item_moved_data_free (data);
-               return QUEUE_MOVED;
-       }
-
        *file = NULL;
        *source_file = NULL;
 
@@ -1693,10 +1688,32 @@ item_queue_get_next_file (TrackerMinerFS  *fs,
                /* There are still pending items to crawl,
                 * or extract pool limit is reached
                 */
-               return QUEUE_WAIT;
+               return FALSE;
        }
 
-       return QUEUE_NONE;
+       event = tracker_priority_queue_peek (fs->priv->items, &priority);
+
+       if (event) {
+               if (should_wait (fs, event->file) ||
+                   (event->dest_file && should_wait (fs, event->dest_file))) {
+                       return FALSE;
+               }
+
+               if (event->type == TRACKER_MINER_FS_EVENT_MOVED) {
+                       g_set_object (file, event->dest_file);
+                       g_set_object (source_file, event->file);
+               } else {
+                       g_set_object (file, event->file);
+               }
+
+               *type = event->type;
+               *priority_out = priority;
+
+               queue_event_free (event);
+               tracker_priority_queue_pop (fs->priv->items, NULL);
+       }
+
+       return TRUE;
 }
 
 static gdouble
@@ -1707,10 +1724,7 @@ item_queue_get_progress (TrackerMinerFS *fs,
        guint items_to_process = 0;
        guint items_total = 0;
 
-       items_to_process += tracker_priority_queue_get_length (fs->priv->items_deleted);
-       items_to_process += tracker_priority_queue_get_length (fs->priv->items_created);
-       items_to_process += tracker_priority_queue_get_length (fs->priv->items_updated);
-       items_to_process += tracker_priority_queue_get_length (fs->priv->items_moved);
+       items_to_process += tracker_priority_queue_get_length (fs->priv->items);
 
        items_total += fs->priv->total_directories_found;
        items_total += fs->priv->total_files_found;
@@ -1739,10 +1753,10 @@ miner_handle_next_item (TrackerMinerFS *fs)
        GFile *file = NULL;
        GFile *source_file = NULL;
        GFile *parent;
-       QueueState queue;
        GTimeVal time_now;
        static GTimeVal time_last = { 0 };
        gboolean keep_processing = TRUE;
+       TrackerMinerFSEventType type;
        gint priority = 0;
 
        if (fs->priv->timer_stopped) {
@@ -1755,9 +1769,7 @@ miner_handle_next_item (TrackerMinerFS *fs)
                return FALSE;
        }
 
-       queue = item_queue_get_next_file (fs, &file, &source_file, &priority);
-
-       if (queue == QUEUE_WAIT) {
+       if (!item_queue_get_next_file (fs, &file, &source_file, &type, &priority)) {
                /* We should flush the processing pool buffer here, because
                 * if there was a previous task on the same file we want to
                 * process now, we want it to get finished before we can go
@@ -1775,7 +1787,7 @@ miner_handle_next_item (TrackerMinerFS *fs)
                return FALSE;
        }
 
-       if (queue == QUEUE_NONE) {
+       if (file == NULL) {
                g_timer_stop (fs->priv->extraction_timer);
                fs->priv->extraction_timer_stopped = TRUE;
        } else if (fs->priv->extraction_timer_stopped) {
@@ -1859,9 +1871,7 @@ miner_handle_next_item (TrackerMinerFS *fs)
                }
        }
 
-       /* Handle queues */
-       switch (queue) {
-       case QUEUE_NONE:
+       if (file == NULL) {
                if (!tracker_file_notifier_is_active (fs->priv->file_notifier) &&
                    tracker_task_pool_get_size (fs->priv->task_pool) == 0) {
                        if (tracker_task_pool_get_size (TRACKER_TASK_POOL (fs->priv->sparql_buffer)) == 0) {
@@ -1878,16 +1888,19 @@ miner_handle_next_item (TrackerMinerFS *fs)
                }
 
                /* No more files left to process */
-               keep_processing = FALSE;
-               break;
-       case QUEUE_MOVED:
+               return FALSE;
+       }
+
+       /* Handle queues */
+       switch (type) {
+       case TRACKER_MINER_FS_EVENT_MOVED:
                keep_processing = item_move (fs, file, source_file);
                break;
-       case QUEUE_DELETED:
+       case TRACKER_MINER_FS_EVENT_DELETED:
                keep_processing = item_remove (fs, file, FALSE);
                break;
-       case QUEUE_CREATED:
-       case QUEUE_UPDATED:
+       case TRACKER_MINER_FS_EVENT_CREATED:
+       case TRACKER_MINER_FS_EVENT_UPDATED:
                parent = g_file_get_parent (file);
 
                if (!parent ||
@@ -2046,17 +2059,6 @@ should_check_file (TrackerMinerFS *fs,
                                                        file, file_type);
 }
 
-static gboolean
-moved_files_equal (gconstpointer a,
-                   gconstpointer b)
-{
-       const ItemMovedData *data = a;
-       GFile *file = G_FILE (b);
-
-       /* Compare with dest file */
-       return g_file_equal (data->file, file);
-}
-
 static gint
 miner_fs_get_queue_priority (TrackerMinerFS *fs,
                              GFile          *file)
@@ -2084,155 +2086,58 @@ miner_fs_cache_file_urn (TrackerMinerFS *fs,
 }
 
 static void
-miner_fs_queue_file (TrackerMinerFS       *fs,
-                     TrackerPriorityQueue *item_queue,
-                     GFile                *file)
-{
-       gint priority;
-
-       miner_fs_cache_file_urn (fs, file, TRUE);
-       priority = miner_fs_get_queue_priority (fs, file);
-       tracker_priority_queue_add (item_queue, g_object_ref (file), priority);
-}
-
-/* Checks previous created/updated/deleted/moved queues for
- * monitor events. Returns TRUE if the item should still
- * be added to the queue.
- */
-static gboolean
-check_item_queues (TrackerMinerFS *fs,
-                   QueueState      queue,
-                   GFile          *file,
-                   GFile          *other_file)
+miner_fs_queue_event (TrackerMinerFS *fs,
+                     QueueEvent     *event,
+                     guint           priority)
 {
-       ItemMovedData *move_data;
+       GList *old = NULL, *link = NULL;
 
-       if (!fs->priv->been_crawled) {
-               /* Only do this after initial crawling, so
-                * we are mostly sure that we won't be doing
-                * checks on huge lists.
-                */
-               return TRUE;
+       if (event->type == TRACKER_MINER_FS_EVENT_MOVED) {
+               /* Remove all children of the dest location from being processed. */
+               tracker_priority_queue_foreach_remove (fs->priv->items,
+                                                      (GEqualFunc) queue_event_is_equal_or_descendant,
+                                                      event->dest_file,
+                                                      (GDestroyNotify) queue_event_free);
        }
 
-       switch (queue) {
-       case QUEUE_CREATED:
-               /* Created items aren't likely to have
-                * anything in other queues for the same
-                * file.
-                */
-               return TRUE;
-       case QUEUE_UPDATED:
-               /* No further updates after a previous created/updated event */
-               if (tracker_priority_queue_find (fs->priv->items_created, NULL,
-                                                (GEqualFunc) g_file_equal, file) ||
-                   tracker_priority_queue_find (fs->priv->items_updated, NULL,
-                                                (GEqualFunc) g_file_equal, file)) {
-                       g_debug ("  Found previous unhandled CREATED/UPDATED event");
-                       return FALSE;
-               }
-               return TRUE;
-       case QUEUE_DELETED:
-               if (tracker_file_notifier_get_file_type (fs->priv->file_notifier,
-                                                        file) == G_FILE_TYPE_DIRECTORY) {
-                       if (tracker_priority_queue_foreach_remove (fs->priv->items_updated,
-                                                                  (GEqualFunc) g_file_has_prefix,
-                                                                  file,
-                                                                  (GDestroyNotify) g_object_unref)) {
-                               g_debug ("  Deleting previous unhandled UPDATED events on children");
-                       }
-
-                       if (tracker_priority_queue_foreach_remove (fs->priv->items_created,
-                                                                  (GEqualFunc) g_file_has_prefix,
-                                                                  file,
-                                                                  (GDestroyNotify) g_object_unref)) {
-                               g_debug ("  Deleting previous unhandled CREATED events on children");
-                       }
-
-                       if (tracker_priority_queue_foreach_remove (fs->priv->items_deleted,
-                                                                  (GEqualFunc) g_file_has_prefix,
-                                                                  file,
-                                                                  (GDestroyNotify) g_object_unref)) {
-                               g_debug ("  Deleting previous unhandled DELETED events on children");
-                       }
-               }
+       old = queue_event_get_last_event_node (event);
 
-               /* Remove all previous updates */
-               if (tracker_priority_queue_foreach_remove (fs->priv->items_updated,
-                                                          (GEqualFunc) g_file_equal,
-                                                          file,
-                                                          (GDestroyNotify) g_object_unref)) {
-                       g_debug ("  Deleting previous unhandled UPDATED event");
-               }
+       if (old) {
+               QueueCoalesceAction action;
+               QueueEvent *replacement = NULL;
 
-               if (tracker_priority_queue_foreach_remove (fs->priv->items_created,
-                                                          (GEqualFunc) g_file_equal,
-                                                          file,
-                                                          (GDestroyNotify) g_object_unref)) {
-                       /* Created event was still in the queue,
-                        * remove it and ignore the current event
-                        */
-                       g_debug ("  Found matching unhandled CREATED event, removing file altogether");
-                       return FALSE;
-               }
+               action = queue_event_coalesce (old->data, event, &replacement);
 
-               return TRUE;
-       case QUEUE_MOVED:
-               /* Kill any events on other_file (The dest one), since it will be rewritten anyway */
-               if (tracker_priority_queue_foreach_remove (fs->priv->items_created,
-                                                          (GEqualFunc) g_file_equal,
-                                                          other_file,
-                                                          (GDestroyNotify) g_object_unref)) {
-                       g_debug ("  Removing previous unhandled CREATED event for dest file, will be 
rewritten anyway");
+               if (action & QUEUE_ACTION_DELETE_FIRST) {
+                       queue_event_free (old->data);
+                       tracker_priority_queue_remove_node (fs->priv->items,
+                                                           old);
                }
 
-               if (tracker_priority_queue_foreach_remove (fs->priv->items_updated,
-                                                          (GEqualFunc) g_file_equal,
-                                                          other_file,
-                                                          (GDestroyNotify) g_object_unref)) {
-                       g_debug ("  Removing previous unhandled UPDATED event for dest file, will be 
rewritten anyway");
+               if (action & QUEUE_ACTION_DELETE_SECOND) {
+                       queue_event_free (event);
+                       event = NULL;
                }
 
-               /* Now check file (Origin one) */
-               if (tracker_priority_queue_foreach_remove (fs->priv->items_created,
-                                                          (GEqualFunc) g_file_equal,
-                                                          file,
-                                                          (GDestroyNotify) g_object_unref)) {
-                       /* If source file was created, replace it with
-                        * a create event for the destination file, and
-                        * discard this event.
-                        *
-                        * We assume all posterior updates
-                        * have been merged together previously by this
-                        * same function.
-                        */
-                       g_debug ("  Found matching unhandled CREATED event "
-                                "for source file, merging both events together");
-                       miner_fs_queue_file (fs, fs->priv->items_created, other_file);
+               if (replacement)
+                       event = replacement;
+       }
 
-                       return FALSE;
+       if (event) {
+               if (event->type == TRACKER_MINER_FS_EVENT_DELETED) {
+                       /* Remove all children of this file from being processed. */
+                       tracker_priority_queue_foreach_remove (fs->priv->items,
+                                                              (GEqualFunc) 
queue_event_is_equal_or_descendant,
+                                                              event->file,
+                                                              (GDestroyNotify) queue_event_free);
                }
 
-               move_data = tracker_priority_queue_find (fs->priv->items_moved, NULL,
-                                                        (GEqualFunc) moved_files_equal, file);
-               if (move_data) {
-                       /* Origin file was the dest of a previous
-                        * move operation, merge these together.
-                        */
-                       g_debug ("  Source file is the destination of a previous "
-                                "unhandled MOVED event, merging both events together");
-                       g_object_unref (move_data->file);
-                       move_data->file = g_object_ref (other_file);
-                       return FALSE;
-               }
+               miner_fs_cache_file_urn (fs, event->file, TRUE);
 
-               return TRUE;
-               break;
-       default:
-               g_assert_not_reached ();
+               link = tracker_priority_queue_add (fs->priv->items, event, priority);
+               queue_event_save_node (event, link);
+               item_queue_handlers_set_up (fs);
        }
-
-       return TRUE;
 }
 
 static gboolean
@@ -2255,14 +2160,13 @@ file_notifier_file_created (TrackerFileNotifier  *notifier,
                             gpointer              user_data)
 {
        TrackerMinerFS *fs = user_data;
+       QueueEvent *event;
 
        if (filter_event (fs, TRACKER_MINER_FS_EVENT_CREATED, file, NULL))
                return;
 
-       if (check_item_queues (fs, QUEUE_CREATED, file, NULL)) {
-               miner_fs_queue_file (fs, fs->priv->items_created, file);
-               item_queue_handlers_set_up (fs);
-       }
+       event = queue_event_new (TRACKER_MINER_FS_EVENT_CREATED, file);
+       miner_fs_queue_event (fs, event, miner_fs_get_queue_priority (fs, file));
 }
 
 static void
@@ -2271,6 +2175,7 @@ file_notifier_file_deleted (TrackerFileNotifier  *notifier,
                             gpointer              user_data)
 {
        TrackerMinerFS *fs = user_data;
+       QueueEvent *event;
 
        if (filter_event (fs, TRACKER_MINER_FS_EVENT_DELETED, file, NULL))
                return;
@@ -2282,10 +2187,8 @@ file_notifier_file_deleted (TrackerFileNotifier  *notifier,
                                           file);
        }
 
-       if (check_item_queues (fs, QUEUE_DELETED, file, NULL)) {
-               miner_fs_queue_file (fs, fs->priv->items_deleted, file);
-               item_queue_handlers_set_up (fs);
-       }
+       event = queue_event_new (TRACKER_MINER_FS_EVENT_DELETED, file);
+       miner_fs_queue_event (fs, event, miner_fs_get_queue_priority (fs, file));
 }
 
 static void
@@ -2295,21 +2198,20 @@ file_notifier_file_updated (TrackerFileNotifier  *notifier,
                             gpointer              user_data)
 {
        TrackerMinerFS *fs = user_data;
+       QueueEvent *event;
 
        if (!attributes_only &&
            filter_event (fs, TRACKER_MINER_FS_EVENT_UPDATED, file, NULL))
                return;
 
-       if (check_item_queues (fs, QUEUE_UPDATED, file, NULL)) {
-               if (attributes_only) {
-                       g_object_set_qdata (G_OBJECT (file),
-                                           fs->priv->quark_attribute_updated,
-                                           GINT_TO_POINTER (TRUE));
-               }
-
-               miner_fs_queue_file (fs, fs->priv->items_updated, file);
-               item_queue_handlers_set_up (fs);
+       if (attributes_only) {
+               g_object_set_qdata (G_OBJECT (file),
+                                   fs->priv->quark_attribute_updated,
+                                   GINT_TO_POINTER (TRUE));
        }
+
+       event = queue_event_new (TRACKER_MINER_FS_EVENT_UPDATED, file);
+       miner_fs_queue_event (fs, event, miner_fs_get_queue_priority (fs, file));
 }
 
 static void
@@ -2319,19 +2221,13 @@ file_notifier_file_moved (TrackerFileNotifier *notifier,
                           gpointer             user_data)
 {
        TrackerMinerFS *fs = user_data;
+       QueueEvent *event;
 
        if (filter_event (fs, TRACKER_MINER_FS_EVENT_MOVED, dest, source))
                return;
 
-       if (check_item_queues (fs, QUEUE_MOVED, source, dest)) {
-               gint priority;
-
-               priority = miner_fs_get_queue_priority (fs, dest);
-               tracker_priority_queue_add (fs->priv->items_moved,
-                                           item_moved_data_new (dest, source),
-                                           priority);
-               item_queue_handlers_set_up (fs);
-       }
+       event = queue_event_moved_new (source, dest);
+       miner_fs_queue_event (fs, event, miner_fs_get_queue_priority (fs, source));
 }
 
 static void
@@ -2427,6 +2323,8 @@ file_notifier_finished (TrackerFileNotifier *notifier,
        if (!tracker_miner_fs_has_items_to_process (fs)) {
                g_info ("Finished all tasks");
                process_stop (fs);
+       } else {
+               item_queue_handlers_set_up (fs);
        }
 }
 
@@ -2455,21 +2353,6 @@ print_file_tree (GNode    *node,
 
 #endif /* CRAWLED_TREE_ENABLE_TRACE */
 
-/* Returns TRUE if file equals to
- * other_file, or is a child of it
- */
-static gboolean
-file_equal_or_descendant (GFile *file,
-                          GFile *prefix)
-{
-       if (g_file_equal (file, prefix) ||
-           g_file_has_prefix (file, prefix)) {
-               return TRUE;
-       }
-
-       return FALSE;
-}
-
 static void
 task_pool_cancel_foreach (gpointer data,
                           gpointer user_data)
@@ -2510,14 +2393,10 @@ indexing_tree_directory_removed (TrackerIndexingTree *indexing_tree,
        /* Remove anything contained in the removed directory
         * from all relevant processing queues.
         */
-       tracker_priority_queue_foreach_remove (priv->items_updated,
-                                              (GEqualFunc) file_equal_or_descendant,
-                                              directory,
-                                              (GDestroyNotify) g_object_unref);
-       tracker_priority_queue_foreach_remove (priv->items_created,
-                                              (GEqualFunc) file_equal_or_descendant,
-                                              directory,
-                                              (GDestroyNotify) g_object_unref);
+       tracker_priority_queue_foreach_remove (priv->items,
+                                              (GEqualFunc) queue_event_is_equal_or_descendant,
+                                              directory,
+                                              (GDestroyNotify) queue_event_free);
 
        g_debug ("  Removed files at %f\n", g_timer_elapsed (timer, NULL));
 
@@ -2531,6 +2410,7 @@ check_file_parents (TrackerMinerFS *fs,
 {
        GFile *parent, *root;
        GList *parents = NULL, *p;
+       QueueEvent *event;
 
        parent = g_file_get_parent (file);
 
@@ -2558,8 +2438,8 @@ check_file_parents (TrackerMinerFS *fs,
        }
 
        for (p = parents; p; p = p->next) {
-               trace_eq_push_tail ("UPDATED", p->data, "checking file parents");
-               miner_fs_queue_file (fs, fs->priv->items_updated, p->data);
+               event = queue_event_new (TRACKER_MINER_FS_EVENT_UPDATED, p->data);
+               miner_fs_queue_event (fs, event, miner_fs_get_queue_priority (fs, p->data));
                g_object_unref (p->data);
        }
 
@@ -2589,6 +2469,7 @@ tracker_miner_fs_check_file (TrackerMinerFS *fs,
                              gboolean        check_parents)
 {
        gboolean should_process = TRUE;
+       QueueEvent *event;
        gchar *uri;
 
        g_return_if_fail (TRACKER_IS_MINER_FS (fs));
@@ -2611,11 +2492,9 @@ tracker_miner_fs_check_file (TrackerMinerFS *fs,
 
                trace_eq_push_tail ("UPDATED", file, "Requested by application");
                miner_fs_cache_file_urn (fs, file, TRUE);
-               tracker_priority_queue_add (fs->priv->items_updated,
-                                           g_object_ref (file),
-                                           priority);
 
-               item_queue_handlers_set_up (fs);
+               event = queue_event_new (TRACKER_MINER_FS_EVENT_UPDATED, file);
+               miner_fs_queue_event (fs, event, priority);
        }
 
        g_free (uri);
@@ -2814,10 +2693,7 @@ tracker_miner_fs_has_items_to_process (TrackerMinerFS *fs)
        g_return_val_if_fail (TRACKER_IS_MINER_FS (fs), FALSE);
 
        if (tracker_file_notifier_is_active (fs->priv->file_notifier) ||
-           !tracker_priority_queue_is_empty (fs->priv->items_deleted) ||
-           !tracker_priority_queue_is_empty (fs->priv->items_created) ||
-           !tracker_priority_queue_is_empty (fs->priv->items_updated) ||
-           !tracker_priority_queue_is_empty (fs->priv->items_moved)) {
+           !tracker_priority_queue_is_empty (fs->priv->items)) {
                return TRUE;
        }
 
@@ -2864,63 +2740,37 @@ tracker_miner_fs_get_data_provider (TrackerMinerFS *fs)
 #ifdef EVENT_QUEUE_ENABLE_TRACE
 
 static void
-trace_files_foreach (gpointer file,
-                     gpointer fs)
+trace_events_foreach (gpointer data,
+                     gpointer fs)
 {
-       gchar *uri;
+       QueueEvent *event = data;
+       gchar *uri, *dest_uri = NULL;
 
-       uri = g_file_get_uri (G_FILE (file));
-       trace_eq ("(%s)     '%s'",
-                 G_OBJECT_TYPE_NAME (G_OBJECT (fs)),
-                 uri);
-       g_free (uri);
-}
+       uri = g_file_get_uri (event->file);
+       if (event->dest_file)
+               dest_uri = g_file_get_uri (event->dest_file);
+
+       trace_eq ("(%d) '%s' '%s'",
+                 event->type, uri, dest_uri);
 
-static void
-trace_moved_foreach (gpointer moved_data,
-                     gpointer fs)
-{
-       ItemMovedData *data = moved_data;
-       gchar *source_uri;
-       gchar *dest_uri;
-
-       source_uri = g_file_get_uri (data->source_file);
-       dest_uri = g_file_get_uri (data->file);
-       trace_eq ("(%s)     '%s->%s'",
-                 G_OBJECT_TYPE_NAME (G_OBJECT (fs)),
-                 source_uri,
-                 dest_uri);
-       g_free (source_uri);
        g_free (dest_uri);
+       g_free (uri);
 }
 
-static void
-miner_fs_trace_queue (TrackerMinerFS       *fs,
-                      const gchar          *queue_name,
-                      TrackerPriorityQueue *queue,
-                      GFunc                 foreach_cb)
+static gboolean
+miner_fs_queues_status_trace_timeout_cb (gpointer data)
 {
+       TrackerMinerFS *fs = data;
+
        trace_eq ("(%s) Queue '%s' has %u elements:",
                  G_OBJECT_TYPE_NAME (fs),
                  queue_name,
                  tracker_priority_queue_get_length (queue));
        tracker_priority_queue_foreach (queue,
-                                       foreach_cb,
+                                       trace_events_foreach,
                                        fs);
-}
 
-static gboolean
-miner_fs_queues_status_trace_timeout_cb (gpointer data)
-{
-       TrackerMinerFS *fs = data;
-
-       trace_eq ("(%s) ------------", G_OBJECT_TYPE_NAME (fs));
-       miner_fs_trace_queue (fs, "CREATED",   fs->priv->items_created,   trace_files_foreach);
-       miner_fs_trace_queue (fs, "UPDATED",   fs->priv->items_updated,   trace_files_foreach);
-       miner_fs_trace_queue (fs, "DELETED",   fs->priv->items_deleted,   trace_files_foreach);
-       miner_fs_trace_queue (fs, "MOVED",     fs->priv->items_moved,     trace_moved_foreach);
-
-       return TRUE;
+       return G_SOURCE_CONTINUE;
 }
 
 #endif /* EVENT_QUEUE_ENABLE_TRACE */


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]