[tracker/wip/carlosg/tracker-3.0-api-breaks: 21/79] libtracker-direct: Hook to TrackerSparqlConnection::create_notifier



commit c196a81b45735793c64afa126b7ed0dfaf2a5b7c
Author: Carlos Garnacho <carlosg gnome org>
Date:   Fri Dec 20 18:16:16 2019 +0100

    libtracker-direct: Hook to TrackerSparqlConnection::create_notifier
    
    Create a notifier that subscribes to events from the TrackerDataManager
    underneath. This makes the TrackerNotifier returned by a direct connection
    listen to events generated by that connection by default.

 src/libtracker-direct/meson.build      |   2 +-
 src/libtracker-direct/tracker-direct.c | 243 +++++++++++++++++++++++++++++++++
 2 files changed, 244 insertions(+), 1 deletion(-)
---
diff --git a/src/libtracker-direct/meson.build b/src/libtracker-direct/meson.build
index aff19a885..94775da1a 100644
--- a/src/libtracker-direct/meson.build
+++ b/src/libtracker-direct/meson.build
@@ -2,7 +2,7 @@ libtracker_direct = static_library('tracker-direct',
     'tracker-direct.c',
     'tracker-direct-statement.c',
     c_args: tracker_c_args,
-    dependencies: [ glib, gio, tracker_data_dep ],
+    dependencies: [ glib, gio, tracker_data_dep, tracker_sparql_intermediate_dep ],
     include_directories: [commoninc, configinc, srcinc],
 )
 
diff --git a/src/libtracker-direct/tracker-direct.c b/src/libtracker-direct/tracker-direct.c
index 85a176805..d357503dd 100644
--- a/src/libtracker-direct/tracker-direct.c
+++ b/src/libtracker-direct/tracker-direct.c
@@ -24,6 +24,7 @@
 #include "tracker-direct-statement.h"
 #include <libtracker-data/tracker-data.h>
 #include <libtracker-data/tracker-sparql.h>
+#include <libtracker-sparql/tracker-notifier-private.h>
 
 static TrackerDBManagerFlags default_flags = 0;
 
@@ -42,6 +43,8 @@ struct _TrackerDirectConnectionPrivate
        GThreadPool *update_thread; /* Contains 1 exclusive thread */
        GThreadPool *select_pool;
 
+       GList *notifiers;
+
        guint initialized : 1;
 };
 
@@ -69,6 +72,8 @@ typedef struct {
 static void tracker_direct_connection_initable_iface_init (GInitableIface *iface);
 static void tracker_direct_connection_async_initable_iface_init (GAsyncInitableIface *iface);
 
+G_DEFINE_QUARK (TrackerDirectNotifier, tracker_direct_notifier)
+
 G_DEFINE_TYPE_WITH_CODE (TrackerDirectConnection, tracker_direct_connection,
                          TRACKER_SPARQL_TYPE_CONNECTION,
                          G_ADD_PRIVATE (TrackerDirectConnection)
@@ -307,6 +312,199 @@ tracker_direct_connection_init (TrackerDirectConnection *conn)
 {
 }
 
+static GHashTable *
+get_event_cache_ht (TrackerNotifier *notifier)
+{
+       GHashTable *events;
+
+       events = g_object_get_qdata (G_OBJECT (notifier), tracker_direct_notifier_quark ());
+       if (!events) {
+               events = g_hash_table_new_full (NULL, NULL, NULL,
+                                               (GDestroyNotify) _tracker_notifier_event_cache_free);
+               g_object_set_qdata_full (G_OBJECT (notifier), tracker_direct_notifier_quark (),
+                                        events, (GDestroyNotify) g_hash_table_unref);
+       }
+
+       return events;
+}
+
+static TrackerNotifierEventCache *
+lookup_event_cache (TrackerNotifier *notifier,
+                    gint             graph_id,
+                    const gchar     *graph)
+{
+       TrackerNotifierEventCache *cache;
+       GHashTable *events;
+
+       events = get_event_cache_ht (notifier);
+       cache = g_hash_table_lookup (events, GINT_TO_POINTER (graph_id));
+
+       if (!cache) {
+               cache = _tracker_notifier_event_cache_new (notifier, NULL, graph);
+               g_hash_table_insert (events, GINT_TO_POINTER (graph_id), cache);
+       }
+
+       return cache;
+}
+
+/* These callbacks will be called from a different thread
+ * (always the same one though), handle with care.
+ */
+static void
+insert_statement_cb (gint         graph_id,
+                     const gchar *graph,
+                     gint         subject_id,
+                     const gchar *subject,
+                     gint         predicate_id,
+                     gint         object_id,
+                     const gchar *object,
+                     GPtrArray   *rdf_types,
+                     gpointer     user_data)
+{
+       TrackerNotifier *notifier = user_data;
+       TrackerSparqlConnection *conn = _tracker_notifier_get_connection (notifier);
+       TrackerDirectConnection *direct = TRACKER_DIRECT_CONNECTION (conn);
+       TrackerDirectConnectionPrivate *priv = tracker_direct_connection_get_instance_private (direct);
+       TrackerOntologies *ontologies = tracker_data_manager_get_ontologies (priv->data_manager);
+       TrackerProperty *rdf_type = tracker_ontologies_get_rdf_type (ontologies);
+       TrackerNotifierEventCache *cache;
+       TrackerClass *new_class = NULL;
+       gint i;
+
+       cache = lookup_event_cache (notifier, graph_id, graph);
+
+       if (predicate_id == tracker_property_get_id (rdf_type)) {
+               const gchar *uri;
+
+               uri = tracker_ontologies_get_uri_by_id (ontologies, predicate_id);
+               new_class = tracker_ontologies_get_class_by_uri (ontologies, uri);
+       }
+
+       for (i = 0; i < rdf_types->len; i++) {
+               TrackerClass *class = g_ptr_array_index (rdf_types, i);
+               TrackerNotifierEventType event_type;
+
+               if (!tracker_class_get_notify (class))
+                       continue;
+
+               if (class == new_class)
+                       event_type = TRACKER_NOTIFIER_EVENT_CREATE;
+               else
+                       event_type = TRACKER_NOTIFIER_EVENT_UPDATE;
+
+               _tracker_notifier_event_cache_push_event (cache, subject_id, event_type);
+       }
+}
+
+static void
+delete_statement_cb (gint         graph_id,
+                     const gchar *graph,
+                     gint         subject_id,
+                     const gchar *subject,
+                     gint         predicate_id,
+                     gint         object_id,
+                     const gchar *object,
+                     GPtrArray   *rdf_types,
+                     gpointer     user_data)
+{
+       TrackerNotifier *notifier = user_data;
+       TrackerSparqlConnection *conn = _tracker_notifier_get_connection (notifier);
+       TrackerDirectConnection *direct = TRACKER_DIRECT_CONNECTION (conn);
+       TrackerDirectConnectionPrivate *priv = tracker_direct_connection_get_instance_private (direct);
+       TrackerOntologies *ontologies = tracker_data_manager_get_ontologies (priv->data_manager);
+       TrackerProperty *rdf_type = tracker_ontologies_get_rdf_type (ontologies);
+       TrackerNotifierEventCache *cache;
+       TrackerClass *new_class = NULL;
+       gint i;
+
+       cache = lookup_event_cache (notifier, graph_id, graph);
+
+       if (predicate_id == tracker_property_get_id (rdf_type)) {
+               const gchar *uri;
+
+               uri = tracker_ontologies_get_uri_by_id (ontologies, predicate_id);
+               new_class = tracker_ontologies_get_class_by_uri (ontologies, uri);
+       }
+
+       for (i = 0; i < rdf_types->len; i++) {
+               TrackerClass *class = g_ptr_array_index (rdf_types, i);
+               TrackerNotifierEventType event_type;
+
+               if (!tracker_class_get_notify (class))
+                       continue;
+
+               if (class == new_class)
+                       event_type = TRACKER_NOTIFIER_EVENT_DELETE;
+               else
+                       event_type = TRACKER_NOTIFIER_EVENT_UPDATE;
+
+               _tracker_notifier_event_cache_push_event (cache, subject_id, event_type);
+       }
+}
+
+static void
+commit_statement_cb (gpointer user_data)
+{
+       TrackerNotifierEventCache *cache;
+       TrackerNotifier *notifier = user_data;
+       GHashTable *events;
+       GHashTableIter iter;
+
+       events = get_event_cache_ht (notifier);
+       g_hash_table_iter_init (&iter, events);
+
+       while (g_hash_table_iter_next (&iter, NULL, (gpointer *) &cache)) {
+               g_hash_table_iter_steal (&iter);
+               _tracker_notifier_event_cache_flush_events (cache);
+               _tracker_notifier_event_cache_free (cache);
+       }
+}
+
+static void
+rollback_statement_cb (gpointer user_data)
+{
+       TrackerNotifier *notifier = user_data;
+       GHashTable *events;
+
+       events = get_event_cache_ht (notifier);
+       g_hash_table_remove_all (events);
+}
+
+static void
+detach_notifier (TrackerDirectConnection *conn,
+                 TrackerNotifier         *notifier)
+{
+       TrackerDirectConnectionPrivate *priv;
+       TrackerData *tracker_data;
+
+       priv = tracker_direct_connection_get_instance_private (conn);
+
+       priv->notifiers = g_list_remove (priv->notifiers, notifier);
+
+       tracker_data = tracker_data_manager_get_data (priv->data_manager);
+       tracker_data_remove_insert_statement_callback (tracker_data,
+                                                      insert_statement_cb,
+                                                      notifier);
+       tracker_data_remove_delete_statement_callback (tracker_data,
+                                                      delete_statement_cb,
+                                                      notifier);
+       tracker_data_remove_commit_statement_callback (tracker_data,
+                                                      commit_statement_cb,
+                                                      notifier);
+       tracker_data_remove_rollback_statement_callback (tracker_data,
+                                                        rollback_statement_cb,
+                                                        notifier);
+}
+
+static void
+weak_ref_notify (gpointer  data,
+                 GObject  *prev_location)
+{
+       TrackerDirectConnection *conn = data;
+
+       detach_notifier (conn, (TrackerNotifier *) prev_location);
+}
+
 static void
 tracker_direct_connection_finalize (GObject *object)
 {
@@ -326,6 +524,15 @@ tracker_direct_connection_finalize (GObject *object)
                g_clear_object (&priv->data_manager);
        }
 
+       while (priv->notifiers) {
+               TrackerNotifier *notifier = priv->notifiers->data;
+
+               g_object_weak_unref (G_OBJECT (notifier),
+                                    weak_ref_notify,
+                                    conn);
+               detach_notifier (conn, notifier);
+       }
+
        g_clear_object (&priv->store);
        g_clear_object (&priv->ontology);
        g_clear_object (&priv->namespace_manager);
@@ -661,6 +868,41 @@ tracker_direct_connection_get_namespace_manager (TrackerSparqlConnection *self)
        return priv->namespace_manager;
 }
 
+static TrackerNotifier *
+tracker_direct_connection_create_notifier (TrackerSparqlConnection *self,
+                                          TrackerNotifierFlags     flags)
+{
+       TrackerDirectConnectionPrivate *priv;
+       TrackerNotifier *notifier;
+       TrackerData *tracker_data;
+
+       priv = tracker_direct_connection_get_instance_private (TRACKER_DIRECT_CONNECTION (self));
+
+       notifier = g_object_new (TRACKER_TYPE_NOTIFIER,
+                                "connection", self,
+                                "flags", flags,
+                                NULL);
+
+       tracker_data = tracker_data_manager_get_data (priv->data_manager);
+       tracker_data_add_insert_statement_callback (tracker_data,
+                                                   insert_statement_cb,
+                                                   notifier);
+       tracker_data_add_delete_statement_callback (tracker_data,
+                                                   delete_statement_cb,
+                                                   notifier);
+       tracker_data_add_commit_statement_callback (tracker_data,
+                                                   commit_statement_cb,
+                                                   notifier);
+       tracker_data_add_rollback_statement_callback (tracker_data,
+                                                     rollback_statement_cb,
+                                                     notifier);
+
+       g_object_weak_ref (G_OBJECT (notifier), weak_ref_notify, self);
+       priv->notifiers = g_list_prepend (priv->notifiers, notifier);
+
+       return notifier;
+}
+
 static void
 tracker_direct_connection_class_init (TrackerDirectConnectionClass *klass)
 {
@@ -687,6 +929,7 @@ tracker_direct_connection_class_init (TrackerDirectConnectionClass *klass)
        sparql_connection_class->update_blank_async = tracker_direct_connection_update_blank_async;
        sparql_connection_class->update_blank_finish = tracker_direct_connection_update_blank_finish;
        sparql_connection_class->get_namespace_manager = tracker_direct_connection_get_namespace_manager;
+       sparql_connection_class->create_notifier = tracker_direct_connection_create_notifier;
 
        props[PROP_FLAGS] =
                g_param_spec_enum ("flags",


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]