[tracker/miner-priority-queues-0.10: 20/22] libtracker-miner: Add priorities to the sparql buffer
- From: Carlos Garnacho <carlosg src gnome org>
- To: commits-list gnome org
- Cc:
- Subject: [tracker/miner-priority-queues-0.10: 20/22] libtracker-miner: Add priorities to the sparql buffer
- Date: Thu, 21 Jul 2011 12:39:11 +0000 (UTC)
commit 3d50ef1d0b996d3731c5a8c333d9f50d637e3a06
Author: Carlos Garnacho <carlos lanedo com>
Date: Thu Jul 14 15:03:19 2011 +0200
libtracker-miner: Add priorities to the sparql buffer
If a task is of priority G_PRIORITY_HIGH, it will issue an Update()
instead of a UpdateArray(), so updates are most immediate.
src/libtracker-miner/tracker-miner-fs.c | 4 +
src/libtracker-miner/tracker-sparql-buffer.c | 84 +++++++++++++++++++++-----
src/libtracker-miner/tracker-sparql-buffer.h | 1 +
3 files changed, 73 insertions(+), 16 deletions(-)
---
diff --git a/src/libtracker-miner/tracker-miner-fs.c b/src/libtracker-miner/tracker-miner-fs.c
index 9ca23a5..229475d 100644
--- a/src/libtracker-miner/tracker-miner-fs.c
+++ b/src/libtracker-miner/tracker-miner-fs.c
@@ -1891,6 +1891,7 @@ item_add_or_update_cb (TrackerMinerFS *fs,
tracker_sparql_buffer_push (fs->priv->sparql_buffer,
sparql_task,
+ ctxt->priority,
sparql_buffer_task_finished_cb,
fs);
@@ -1999,6 +2000,7 @@ item_remove (TrackerMinerFS *fs,
tracker_sparql_buffer_push (fs->priv->sparql_buffer,
task,
+ G_PRIORITY_DEFAULT,
sparql_buffer_task_finished_cb,
fs);
@@ -2017,6 +2019,7 @@ item_remove (TrackerMinerFS *fs,
tracker_sparql_buffer_push (fs->priv->sparql_buffer,
task,
+ G_PRIORITY_DEFAULT,
sparql_buffer_task_finished_cb,
fs);
@@ -2364,6 +2367,7 @@ item_move (TrackerMinerFS *fs,
FALSE));
tracker_sparql_buffer_push (fs->priv->sparql_buffer,
task,
+ G_PRIORITY_DEFAULT,
sparql_buffer_task_finished_cb,
fs);
diff --git a/src/libtracker-miner/tracker-sparql-buffer.c b/src/libtracker-miner/tracker-sparql-buffer.c
index bd3abe9..cb35b44 100644
--- a/src/libtracker-miner/tracker-sparql-buffer.c
+++ b/src/libtracker-miner/tracker-sparql-buffer.c
@@ -26,6 +26,7 @@
typedef struct _TrackerSparqlBufferPrivate TrackerSparqlBufferPrivate;
typedef struct _SparqlTaskData SparqlTaskData;
typedef struct _UpdateArrayData UpdateArrayData;
+typedef struct _UpdateData UpdateData;
typedef struct _BulkOperationMerge BulkOperationMerge;
enum {
@@ -64,6 +65,11 @@ struct _SparqlTaskData
GSimpleAsyncResult *result;
};
+struct _UpdateData {
+ TrackerSparqlBuffer *buffer;
+ TrackerTask *task;
+};
+
struct _UpdateArrayData {
TrackerSparqlBuffer *buffer;
GPtrArray *tasks;
@@ -544,9 +550,30 @@ tracker_sparql_buffer_flush (TrackerSparqlBuffer *buffer,
return TRUE;
}
+static void
+tracker_sparql_buffer_update_cb (GObject *object,
+ GAsyncResult *result,
+ gpointer user_data)
+{
+ UpdateData *update_data = user_data;
+ GError *error = NULL;
+
+ tracker_sparql_connection_update_finish (TRACKER_SPARQL_CONNECTION (object),
+ result, &error);
+ if (error) {
+ g_critical ("Error in prioritized update: %s\n", error->message);
+ g_error_free (error);
+ }
+
+ tracker_task_pool_remove (TRACKER_TASK_POOL (update_data->buffer),
+ update_data->task);
+ g_slice_free (UpdateData, update_data);
+}
+
void
tracker_sparql_buffer_push (TrackerSparqlBuffer *buffer,
TrackerTask *task,
+ gint priority,
GAsyncReadyCallback cb,
gpointer user_data)
{
@@ -558,27 +585,52 @@ tracker_sparql_buffer_push (TrackerSparqlBuffer *buffer,
priv = buffer->priv;
- if (tracker_task_pool_get_size (TRACKER_TASK_POOL (buffer)) == 0) {
- reset_flush_timeout (buffer);
- }
+ data = tracker_task_get_data (task);
+ data->result = g_simple_async_result_new (G_OBJECT (buffer),
+ cb, user_data, NULL);
- tracker_task_pool_add (TRACKER_TASK_POOL (buffer), task);
+ if (priority <= G_PRIORITY_HIGH &&
+ data->type != TASK_TYPE_BULK) {
+ UpdateData *update_data;
+ const gchar *sparql = NULL;
- if (!priv->tasks) {
- priv->tasks = g_ptr_array_new_with_free_func ((GDestroyNotify) tracker_task_unref);
- }
+ /* High priority task */
+ update_data = g_slice_new0 (UpdateData);
+ update_data->buffer = buffer;
+ update_data->task = task;
- g_ptr_array_add (priv->tasks, task);
+ if (data->type == TASK_TYPE_SPARQL_STR) {
+ sparql = data->data.str;
+ } else if (data->type == TASK_TYPE_SPARQL) {
+ sparql = tracker_sparql_builder_get_result (data->data.builder);
+ }
- data = tracker_task_get_data (task);
- data->result = g_simple_async_result_new (G_OBJECT (buffer),
- cb, user_data, NULL);
+ tracker_task_pool_add (TRACKER_TASK_POOL (buffer), task);
+ tracker_sparql_connection_update_async (priv->connection,
+ sparql,
+ G_PRIORITY_HIGH,
+ NULL,
+ tracker_sparql_buffer_update_cb,
+ update_data);
+ } else {
+ if (tracker_task_pool_get_size (TRACKER_TASK_POOL (buffer)) == 0) {
+ reset_flush_timeout (buffer);
+ }
+
+ tracker_task_pool_add (TRACKER_TASK_POOL (buffer), task);
+
+ if (!priv->tasks) {
+ priv->tasks = g_ptr_array_new_with_free_func ((GDestroyNotify) tracker_task_unref);
+ }
+
+ g_ptr_array_add (priv->tasks, task);
- if (tracker_task_pool_limit_reached (TRACKER_TASK_POOL (buffer))) {
- tracker_sparql_buffer_flush (buffer, "SPARQL buffer limit reached");
- } else if (priv->tasks->len > tracker_task_pool_get_limit (TRACKER_TASK_POOL (buffer)) / 2) {
- /* We've filled half of the buffer, flush it as we receive more tasks */
- tracker_sparql_buffer_flush (buffer, "SPARQL buffer half-full");
+ if (tracker_task_pool_limit_reached (TRACKER_TASK_POOL (buffer))) {
+ tracker_sparql_buffer_flush (buffer, "SPARQL buffer limit reached");
+ } else if (priv->tasks->len > tracker_task_pool_get_limit (TRACKER_TASK_POOL (buffer)) / 2) {
+ /* We've filled half of the buffer, flush it as we receive more tasks */
+ tracker_sparql_buffer_flush (buffer, "SPARQL buffer half-full");
+ }
}
}
diff --git a/src/libtracker-miner/tracker-sparql-buffer.h b/src/libtracker-miner/tracker-sparql-buffer.h
index 6ee13b4..919ad9a 100644
--- a/src/libtracker-miner/tracker-sparql-buffer.h
+++ b/src/libtracker-miner/tracker-sparql-buffer.h
@@ -65,6 +65,7 @@ gboolean tracker_sparql_buffer_flush (TrackerSparqlBuffer *buffer,
void tracker_sparql_buffer_push (TrackerSparqlBuffer *buffer,
TrackerTask *task,
+ gint priority,
GAsyncReadyCallback cb,
gpointer user_data);
[
Date Prev][
Date Next] [
Thread Prev][
Thread Next]
[
Thread Index]
[
Date Index]
[
Author Index]