[tracker/tracker-store] Introduce TrackerStore, a queue for updating
- From: Jürg Billeter <juergbi src gnome org>
- To: svn-commits-list gnome org
- Subject: [tracker/tracker-store] Introduce TrackerStore, a queue for updating
- Date: Tue, 26 May 2009 10:46:36 -0400 (EDT)
commit a3d8a58019659273071cfb3a6311377fbd946068
Author: Philip Van Hoof <philip codeminded be>
Date: Thu May 21 10:40:37 2009 +0200
Introduce TrackerStore, a queue for updating
TrackerStoreQueue is or will be the internal API for most data access and
storage. This implementation uses the GMainLoop and a GQueue for all batch
requests.
Due to its use of the GMainLoop the API is going to block the mainloop, but it
won't block the caller's code. It will instead find a nice slot in the
GMainLoop to execute itself in. When finished it'll first execute your
callback and then your GDestroyNotify. All parameters that you receive in the
callback that aren't user_data must not be freed (they are to be considered
read-only in your callback).
---
src/tracker-store/Makefile.am | 4 +-
src/tracker-store/tracker-main.c | 3 +
src/tracker-store/tracker-resources.c | 102 ++++++-----
src/tracker-store/tracker-store.c | 328 +++++++++++++++++++++++++++++++++
src/tracker-store/tracker-store.h | 60 ++++++
5 files changed, 448 insertions(+), 49 deletions(-)
diff --git a/src/tracker-store/Makefile.am b/src/tracker-store/Makefile.am
index 3435d76..10e2c7a 100644
--- a/src/tracker-store/Makefile.am
+++ b/src/tracker-store/Makefile.am
@@ -51,7 +51,9 @@ tracker_store_SOURCES = \
tracker-push-registrar.c \
tracker-push-registrar.h \
tracker-resource-class.c \
- tracker-resource-class.h
+ tracker-resource-class.h \
+ tracker-store.c \
+ tracker-store.h
if OS_WIN32
tracker_store_win_libs = -lws2_32 -lkernel32
diff --git a/src/tracker-store/tracker-main.c b/src/tracker-store/tracker-main.c
index fe885da..5c23125 100644
--- a/src/tracker-store/tracker-main.c
+++ b/src/tracker-store/tracker-main.c
@@ -72,6 +72,7 @@
#include "tracker-volume-cleanup.h"
#include "tracker-backup.h"
#include "tracker-daemon.h"
+#include "tracker-store.h"
#ifdef G_OS_WIN32
#include <windows.h>
@@ -898,6 +899,7 @@ main (gint argc, gchar *argv[])
NULL);
#endif /* HAVE_HAL */
+ tracker_store_init ();
tracker_status_init (config, hal_power);
tracker_module_config_init ();
@@ -1009,6 +1011,7 @@ main (gint argc, gchar *argv[])
tracker_nfs_lock_shutdown ();
tracker_status_shutdown ();
tracker_turtle_shutdown ();
+ tracker_store_shutdown ();
tracker_thumbnailer_shutdown ();
tracker_log_shutdown ();
diff --git a/src/tracker-store/tracker-resources.c b/src/tracker-store/tracker-resources.c
index 700ddcc..dcb4367 100644
--- a/src/tracker-store/tracker-resources.c
+++ b/src/tracker-store/tracker-resources.c
@@ -42,12 +42,11 @@
#include "tracker-resources.h"
#include "tracker-resource-class.h"
#include "tracker-events.h"
+#include "tracker-store.h"
#define RDF_PREFIX TRACKER_RDF_PREFIX
#define RDF_TYPE RDF_PREFIX "type"
-/* Transaction every 'x' batch items */
-#define TRACKER_STORE_TRANSACTION_MAX 4000
G_DEFINE_TYPE(TrackerResources, tracker_resources, G_TYPE_OBJECT)
@@ -55,11 +54,14 @@ G_DEFINE_TYPE(TrackerResources, tracker_resources, G_TYPE_OBJECT)
typedef struct {
- gboolean batch_mode;
- gint batch_count;
GSList *event_sources;
} TrackerResourcesPrivate;
+typedef struct { /* Per-call state handed to the store callbacks as user_data */
+ DBusGMethodInvocation *context; /* invocation answered via dbus_g_method_return() or dbus_g_method_return_error() */
+ guint request_id; /* id reported through tracker_dbus_request_success()/tracker_dbus_request_failed() */
+} TrackerDBusMethodInfo;
+
static void
free_event_sources (TrackerResourcesPrivate *priv)
{
@@ -112,6 +114,13 @@ tracker_resources_new (void)
* Functions
*/
+static void /* GDestroyNotify passed to the store queue; releases the TrackerDBusMethodInfo slice */
+destroy_method_info (gpointer user_data)
+{
+ g_slice_free (TrackerDBusMethodInfo, user_data); /* pairs with g_slice_new (TrackerDBusMethodInfo) in the batch entry points */
+}
+
+
void
tracker_resources_insert (TrackerResources *self,
const gchar *subject,
@@ -133,7 +142,7 @@ tracker_resources_insert (TrackerResources *self,
"'%s' '%s' '%s'",
subject, predicate, object);
- tracker_data_insert_statement (subject, predicate, object);
+ tracker_store_insert_statement (subject, predicate, object);
dbus_g_method_return (context);
@@ -161,7 +170,7 @@ tracker_resources_delete (TrackerResources *self,
"'%s' '%s' '%s'",
subject, predicate, object);
- tracker_data_delete_statement (subject, predicate, object);
+ tracker_store_delete_statement (subject, predicate, object);
dbus_g_method_return (context);
@@ -230,7 +239,7 @@ tracker_resources_sparql_query (TrackerResources *self,
"query:'%s'",
query);
- result_set = tracker_data_query_sparql (query, &actual_error);
+ result_set = tracker_store_sparql_query (query, &actual_error);
if (actual_error) {
tracker_dbus_request_failed (request_id,
@@ -270,19 +279,12 @@ tracker_resources_sparql_update (TrackerResources *self,
tracker_dbus_async_return_if_fail (update != NULL, context);
- if (priv->batch_mode) {
- /* commit pending batch items */
- tracker_data_commit_transaction ();
- priv->batch_mode = FALSE;
- priv->batch_count = 0;
- }
-
tracker_dbus_request_new (request_id,
"DBus request for SPARQL Update, "
"update:'%s'",
update);
- tracker_data_update_sparql (update, &actual_error);
+ tracker_store_sparql_update (update, &actual_error);
if (actual_error) {
tracker_dbus_request_failed (request_id,
@@ -298,14 +300,32 @@ tracker_resources_sparql_update (TrackerResources *self,
tracker_dbus_request_success (request_id);
}
+static void /* Store callback for a queued SPARQL update: answers the D-Bus invocation kept in user_data */
+batch_update_callback (GError *error, gpointer user_data)
+{
+ TrackerDBusMethodInfo *info = user_data;
+
+ if (error) { /* the update failed: log the failure and send an error reply instead of a normal return */
+ tracker_dbus_request_failed (info->request_id,
+ &error,
+ NULL);
+ dbus_g_method_return_error (info->context, error);
+ return; /* error is not freed here; info is not freed here either */
+ }
+
+ dbus_g_method_return (info->context);
+
+ tracker_dbus_request_success (info->request_id);
+}
+
void
tracker_resources_batch_sparql_update (TrackerResources *self,
const gchar *update,
DBusGMethodInvocation *context,
GError **error)
{
+ TrackerDBusMethodInfo *info;
TrackerResourcesPrivate *priv;
- GError *actual_error = NULL;
guint request_id;
priv = TRACKER_RESOURCES_GET_PRIVATE (self);
@@ -319,35 +339,23 @@ tracker_resources_batch_sparql_update (TrackerResources *self,
"update:'%s'",
update);
- if (!priv->batch_mode) {
- /* switch to batch mode
- delays database commits to improve performance */
- priv->batch_mode = TRUE;
- priv->batch_count = 0;
- tracker_data_begin_transaction ();
- }
+ info = g_slice_new (TrackerDBusMethodInfo);
- tracker_data_update_sparql (update, &actual_error);
+ info->request_id = request_id;
+ info->context = context;
- if (actual_error) {
- tracker_dbus_request_failed (request_id,
- &actual_error,
- NULL);
- dbus_g_method_return_error (context, actual_error);
- g_error_free (actual_error);
- return;
- }
+ tracker_store_queue_sparql_update (update, batch_update_callback,
+ info, destroy_method_info);
- if (++priv->batch_count >= TRACKER_STORE_TRANSACTION_MAX) {
- /* commit pending batch items */
- tracker_data_commit_transaction ();
- priv->batch_mode = FALSE;
- priv->batch_count = 0;
- }
+}
- dbus_g_method_return (context);
+static void /* Store callback for a queued commit: answers the D-Bus invocation kept in user_data */
+batch_commit_callback (gpointer user_data)
+{
+ TrackerDBusMethodInfo *info = user_data;
- tracker_dbus_request_success (request_id);
+ dbus_g_method_return (info->context); /* reply first, then mark the request as successful */
+ tracker_dbus_request_success (info->request_id);
}
void
@@ -355,6 +363,7 @@ tracker_resources_batch_commit (TrackerResources *self,
DBusGMethodInvocation *context,
GError **error)
{
+ TrackerDBusMethodInfo *info;
TrackerResourcesPrivate *priv;
guint request_id;
@@ -365,16 +374,13 @@ tracker_resources_batch_commit (TrackerResources *self,
tracker_dbus_request_new (request_id,
"DBus request for batch commit");
- if (priv->batch_mode) {
- /* commit pending batch items */
- tracker_data_commit_transaction ();
- priv->batch_mode = FALSE;
- priv->batch_count = 0;
- }
+ info = g_slice_new (TrackerDBusMethodInfo);
- dbus_g_method_return (context);
+ info->request_id = request_id;
+ info->context = context;
- tracker_dbus_request_success (request_id);
+ tracker_store_queue_commit (batch_commit_callback, info,
+ destroy_method_info);
}
diff --git a/src/tracker-store/tracker-store.c b/src/tracker-store/tracker-store.c
new file mode 100644
index 0000000..8295284
--- /dev/null
+++ b/src/tracker-store/tracker-store.c
@@ -0,0 +1,328 @@
+/* -*- Mode: C; tab-width: 8; indent-tabs-mode: t; c-basic-offset: 8 -*- */
+/*
+ * Copyright (C) 2009, Nokia
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
+ * Boston, MA 02110-1301, USA.
+ *
+ * Author: Philip Van Hoof <philip codeminded be>
+ */
+
+#include "config.h"
+
+#include <libtracker-common/tracker-dbus.h>
+#include <libtracker-db/tracker-db-dbus.h>
+
+#include <libtracker-data/tracker-data-update.h>
+#include <libtracker-data/tracker-data-query.h>
+
+#include "tracker-store.h"
+
+#define TRACKER_STORE_TRANSACTION_MAX 4000 /* batch_count value at which the queue handler commits the open transaction */
+
+typedef struct { /* State shared by the queue handler and the synchronous entry points */
+ gboolean have_handler; /* TRUE from start_handler() until queue_idle_destroy() runs */
+ gboolean batch_mode; /* TRUE after tracker_data_begin_transaction() until the matching commit */
+ guint batch_count; /* queued updates that reported no error since the transaction began */
+ GQueue *queue; /* pending TrackerStoreTask items; pushed at the tail, popped from the head */
+} TrackerStorePrivate;
+
+typedef enum { /* Kind of work a queued TrackerStoreTask carries */
+ TRACKER_STORE_TASK_TYPE_UPDATE = 0, /* run data.query through tracker_data_update_sparql() */
+ TRACKER_STORE_TASK_TYPE_COMMIT = 1 /* commit the open batch transaction, if there is one */
+} TrackerStoreTaskType;
+
+typedef struct { /* One queued request; allocated with g_slice_new0() by the queue functions */
+ TrackerStoreTaskType type; /* selects which member of the callback union is used */
+ union {
+ gchar *query; /* g_strdup() copy of the SPARQL text; left NULL for commit tasks */
+ } data;
+ gpointer user_data; /* passed to the callback, then to destroy */
+ GDestroyNotify destroy; /* may be NULL; called on user_data after the callback */
+ union {
+ TrackerStoreSparqlUpdateCallback update_callback; /* used by UPDATE tasks; may be NULL */
+ TrackerStoreCommitCallback commit_callback; /* used by COMMIT tasks; may be NULL */
+ } callback;
+} TrackerStoreTask;
+
+static GStaticPrivate private_key = G_STATIC_PRIVATE_INIT; /* holds the TrackerStorePrivate set by tracker_store_init(); NOTE(review): GStaticPrivate data is per-thread — confirm every caller runs on the initializing thread */
+/* Destroy notify registered with g_static_private_set(): frees the queue and the private struct. */
+static void
+private_free (gpointer data)
+{
+ TrackerStorePrivate *private = data;
+
+ g_queue_free (private->queue); /* NOTE(review): frees the GQueue only; tasks still queued would not be released here */
+ g_free (private); /* pairs with g_new0() in tracker_store_init() */
+}
+/* Releases a task: its query copy (if any) and the slice itself; user_data is not touched. */
+static void
+tracker_store_task_free (TrackerStoreTask *task)
+{
+ g_free (task->data.query); /* NULL for commit tasks, which g_free() accepts */
+ g_slice_free (TrackerStoreTask, task);
+}
+/* Idle callback: takes one task off the queue, runs it, then invokes its callback and destroy notify. */
+/* Returns FALSE only when the queue was found empty (removing the idle source), TRUE otherwise. */
+static gboolean
+queue_idle_handler (gpointer user_data)
+{
+ TrackerStorePrivate *private = user_data;
+ TrackerStoreTask *task;
+ GError *error = NULL;
+
+ task = g_queue_pop_head (private->queue);
+
+ if (!task) {
+ return FALSE; /* NOTE(review): batch_mode can still be TRUE here; nothing commits when the queue drains */
+ }
+
+ /* Implicit transaction start: any non-commit task opens a transaction if none is open */
+
+ if (!private->batch_mode && task->type != TRACKER_STORE_TASK_TYPE_COMMIT) {
+ /* Switch to batch mode:
+ delaying database commits improves performance. */
+ tracker_data_begin_transaction ();
+ private->batch_mode = TRUE;
+ private->batch_count = 0;
+ }
+
+ switch (task->type) {
+ case TRACKER_STORE_TASK_TYPE_COMMIT:
+ if (private->batch_mode) {
+ /* commit pending batch items */
+ tracker_data_commit_transaction ();
+ private->batch_mode = FALSE;
+ private->batch_count = 0;
+ }
+
+ if (task->callback.commit_callback) { /* runs even when there was nothing to commit */
+ task->callback.commit_callback (task->user_data);
+ }
+ break;
+
+ case TRACKER_STORE_TASK_TYPE_UPDATE:
+ tracker_data_update_sparql (task->data.query, &error);
+ if (task->callback.update_callback) { /* called whether or not the update set error */
+ task->callback.update_callback (error, task->user_data);
+ }
+
+ if (!error) { /* only updates that reported no error count toward the commit threshold */
+ private->batch_count++;
+ }
+ break;
+ }
+
+ if (private->batch_count >= TRACKER_STORE_TRANSACTION_MAX) {
+ /* commit pending batch items once TRACKER_STORE_TRANSACTION_MAX updates have accumulated */
+ tracker_data_commit_transaction ();
+ private->batch_mode = FALSE;
+ private->batch_count = 0;
+ }
+
+ if (task->destroy) { /* runs after the callback; user_data is not used again below */
+ task->destroy (task->user_data);
+ }
+
+ if (error) { /* the error stays owned by this function; the callback only borrowed it */
+ g_clear_error (&error);
+ }
+
+ tracker_store_task_free (task);
+
+ return TRUE; /* stay installed; an empty queue is detected on the next invocation */
+}
+/* Destroy notify of the idle source: records that no queue handler is installed any more. */
+static void
+queue_idle_destroy (gpointer user_data)
+{
+ TrackerStorePrivate *private = user_data;
+
+ private->have_handler = FALSE; /* lets the queue functions install a new handler and lets shutdown stop waiting */
+}
+/* Allocates the zeroed store state with an empty queue and registers it under private_key. */
+void
+tracker_store_init (void)
+{
+ TrackerStorePrivate *private;
+
+ private = g_new0 (TrackerStorePrivate, 1); /* have_handler, batch_mode and batch_count start at 0/FALSE */
+
+ private->queue = g_queue_new ();
+
+ g_static_private_set (&private_key,
+ private,
+ private_free);
+}
+/* Runs the default main context until the queue handler has finished, then clears the store state. */
+void
+tracker_store_shutdown (void)
+{
+ TrackerStorePrivate *private;
+
+ private = g_static_private_get (&private_key);
+ g_return_if_fail (private != NULL);
+
+ if (private->have_handler) {
+ g_debug ("Can't exit until store-queue is finished ...");
+ while (private->have_handler) { /* cleared by queue_idle_destroy() once the idle source is removed */
+ g_main_context_iteration (NULL, TRUE);
+ }
+ g_debug ("Store-queue finished");
+ }
+
+ g_static_private_set (&private_key, NULL, NULL); /* NOTE(review): no commit is issued here if batch_mode is still TRUE — confirm intended */
+}
+/* Marks the handler as installed and attaches queue_idle_handler as an idle source. */
+static void
+start_handler (TrackerStorePrivate *private)
+{
+ private->have_handler = TRUE;
+
+ g_idle_add_full (G_PRIORITY_DEFAULT, /* note: default priority, not G_PRIORITY_DEFAULT_IDLE */
+ queue_idle_handler,
+ private,
+ queue_idle_destroy);
+}
+/* Queues a commit task; callback and destroy (both may be NULL) run later from the queue handler. */
+void
+tracker_store_queue_commit (TrackerStoreCommitCallback callback,
+ gpointer user_data,
+ GDestroyNotify destroy)
+{
+ TrackerStorePrivate *private;
+ TrackerStoreTask *task;
+
+ private = g_static_private_get (&private_key);
+ g_return_if_fail (private != NULL); /* on this early return neither callback nor destroy is invoked */
+
+ task = g_slice_new0 (TrackerStoreTask); /* zeroed, so data.query stays NULL for commit tasks */
+ task->type = TRACKER_STORE_TASK_TYPE_COMMIT;
+ task->user_data = user_data;
+ task->callback.commit_callback = callback;
+ task->destroy = destroy;
+
+ g_queue_push_tail (private->queue, task);
+
+ if (!private->have_handler) { /* install the idle handler only if none is installed */
+ start_handler (private);
+ }
+}
+/* Queues a SPARQL update; the string is copied, so the caller keeps ownership of sparql. */
+/* callback and destroy (both may be NULL) run later from the queue handler. */
+void
+tracker_store_queue_sparql_update (const gchar *sparql,
+ TrackerStoreSparqlUpdateCallback callback,
+ gpointer user_data,
+ GDestroyNotify destroy)
+{
+ TrackerStorePrivate *private;
+ TrackerStoreTask *task;
+
+ g_return_if_fail (sparql != NULL); /* on the early returns neither callback nor destroy is invoked */
+
+ private = g_static_private_get (&private_key);
+ g_return_if_fail (private != NULL);
+
+ task = g_slice_new0 (TrackerStoreTask);
+ task->type = TRACKER_STORE_TASK_TYPE_UPDATE;
+ task->data.query = g_strdup (sparql); /* released by tracker_store_task_free() */
+ task->user_data = user_data;
+ task->callback.update_callback = callback;
+ task->destroy = destroy;
+
+ g_queue_push_tail (private->queue, task);
+
+ if (!private->have_handler) { /* install the idle handler only if none is installed */
+ start_handler (private);
+ }
+}
+/* Synchronous update: commits any open batch transaction first, then runs the update immediately. */
+void
+tracker_store_sparql_update (const gchar *sparql,
+ GError **error)
+{
+ TrackerStorePrivate *private;
+
+ g_return_if_fail (sparql != NULL);
+
+ private = g_static_private_get (&private_key);
+ g_return_if_fail (private != NULL);
+
+ if (private->batch_mode) {
+ /* commit pending batch items; NOTE(review): tasks still waiting in the queue are not run first */
+ tracker_data_commit_transaction ();
+ private->batch_mode = FALSE;
+ private->batch_count = 0;
+ }
+
+ tracker_data_update_sparql (sparql, error); /* error is handed through to the caller unchanged */
+}
+/* Pass-through to tracker_data_query_sparql(); does not look at the queue or the batch state. */
+TrackerDBResultSet*
+tracker_store_sparql_query (const gchar *sparql,
+ GError **error)
+{
+ return tracker_data_query_sparql (sparql, error); /* result and error are returned to the caller as-is */
+}
+/* Synchronous insert: commits any open batch transaction first, then inserts the statement. */
+void
+tracker_store_insert_statement (const gchar *subject,
+ const gchar *predicate,
+ const gchar *object)
+{
+ TrackerStorePrivate *private;
+
+ g_return_if_fail (subject != NULL);
+ g_return_if_fail (predicate != NULL);
+ g_return_if_fail (object != NULL);
+
+ private = g_static_private_get (&private_key);
+ g_return_if_fail (private != NULL);
+
+ if (private->batch_mode) {
+ /* commit pending batch items; tasks still waiting in the queue are left queued */
+ tracker_data_commit_transaction ();
+ private->batch_mode = FALSE;
+ private->batch_count = 0;
+ }
+
+ tracker_data_insert_statement (subject, predicate, object);
+}
+/* Synchronous delete: commits any open batch transaction first, then deletes the statement. */
+void
+tracker_store_delete_statement (const gchar *subject,
+ const gchar *predicate,
+ const gchar *object)
+{
+ TrackerStorePrivate *private;
+
+ g_return_if_fail (subject != NULL);
+ g_return_if_fail (predicate != NULL);
+ g_return_if_fail (object != NULL);
+
+ private = g_static_private_get (&private_key);
+ g_return_if_fail (private != NULL);
+
+ if (private->batch_mode) {
+ /* commit pending batch items; tasks still waiting in the queue are left queued */
+ tracker_data_commit_transaction ();
+ private->batch_mode = FALSE;
+ private->batch_count = 0;
+ }
+
+ tracker_data_delete_statement (subject, predicate, object);
+}
+
diff --git a/src/tracker-store/tracker-store.h b/src/tracker-store/tracker-store.h
new file mode 100644
index 0000000..c0253e4
--- /dev/null
+++ b/src/tracker-store/tracker-store.h
@@ -0,0 +1,60 @@
+/* -*- Mode: C; tab-width: 8; indent-tabs-mode: t; c-basic-offset: 8 -*- */
+/*
+ * Copyright (C) 2009, Nokia
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
+ * Boston, MA 02110-1301, USA.
+ *
+ * Author: Philip Van Hoof <philip codeminded be>
+ */
+
+#ifndef __TRACKER_STORE_H__
+#define __TRACKER_STORE_H__
+
+#include <stdio.h>
+
+#include <libtracker-common/tracker-common.h>
+#include <libtracker-db/tracker-db-interface.h>
+
+G_BEGIN_DECLS
+/* Callback types run by the store's queue handler; the store clears error afterwards, so callbacks must not free it. */
+typedef void (* TrackerStoreSparqlUpdateCallback) (GError *error,
+ gpointer user_data);
+typedef void (* TrackerStoreCommitCallback) (gpointer user_data);
+/* Public entry points; all but tracker_store_sparql_query() bail out unless tracker_store_init() has run. */
+void tracker_store_init (void);
+void tracker_store_shutdown (void);
+void tracker_store_queue_commit (TrackerStoreCommitCallback callback,
+ gpointer user_data,
+ GDestroyNotify destroy);
+void tracker_store_queue_sparql_update (const gchar *sparql,
+ TrackerStoreSparqlUpdateCallback callback,
+ gpointer user_data,
+ GDestroyNotify destroy);
+void tracker_store_sparql_update (const gchar *sparql,
+ GError **error);
+void tracker_store_insert_statement (const gchar *subject,
+ const gchar *predicate,
+ const gchar *object);
+void tracker_store_delete_statement (const gchar *subject,
+ const gchar *predicate,
+ const gchar *object);
+TrackerDBResultSet*
+ tracker_store_sparql_query (const gchar *sparql,
+ GError **error);
+
+G_END_DECLS
+
+#endif /* __TRACKER_STORE_H__ */
[Date Prev][Date Next] [Thread Prev][Thread Next] [Thread Index] [Date Index] [Author Index]