[tracker/wal: 5/18] tracker-store: Allow running queries while update is running
- From: Jürg Billeter <juergbi src gnome org>
- To: commits-list gnome org
- Cc:
- Subject: [tracker/wal: 5/18] tracker-store: Allow running queries while update is running
- Date: Fri, 13 Aug 2010 15:16:05 +0000 (UTC)
commit c06b0442f51f887614eba43d5ff140ed1b97923b
Author: Jürg Billeter <j bitron ch>
Date: Wed Jun 30 16:25:57 2010 +0200
tracker-store: Allow running queries while update is running
This will break order guarantees. Check with client applications and
libraries before merging this commit.
src/tracker-store/tracker-store.c | 398 ++++++++++++++-----------------------
1 files changed, 145 insertions(+), 253 deletions(-)
---
diff --git a/src/tracker-store/tracker-store.c b/src/tracker-store/tracker-store.c
index a68631d..f21fc05 100644
--- a/src/tracker-store/tracker-store.c
+++ b/src/tracker-store/tracker-store.c
@@ -41,20 +41,17 @@
#define TRACKER_STORE_MAX_CONCURRENT_QUERIES 2
-#define TRACKER_STORE_N_TURTLE_STATEMENTS 50
-
#define TRACKER_STORE_QUERY_WATCHDOG_TIMEOUT 10
#define TRACKER_STORE_MAX_TASK_TIME 30
typedef struct {
- gboolean have_handler, have_sync_handler;
gboolean start_log;
- GQueue *queues[TRACKER_STORE_N_PRIORITIES];
- guint handler, sync_handler;
+ GQueue *query_queues[TRACKER_STORE_N_PRIORITIES];
+ GQueue *update_queues[TRACKER_STORE_N_PRIORITIES];
guint n_queries_running;
gboolean update_running;
- GThreadPool *main_pool;
- GThreadPool *global_pool;
+ GThreadPool *update_pool;
+ GThreadPool *query_pool;
GSList *running_tasks;
guint watchdog_id;
guint max_task_time;
@@ -82,8 +79,6 @@ typedef struct {
GPtrArray *blank_nodes;
} update;
struct {
- TrackerTurtleReader *reader;
- gboolean in_progress;
gchar *path;
} turtle;
} data;
@@ -109,8 +104,6 @@ static GStaticPrivate private_key = G_STATIC_PRIVATE_INIT;
static int main_cpu;
#endif /* __USE_GNU */
-static void start_handler (TrackerStorePrivate *private);
-
static void
private_free (gpointer data)
{
@@ -118,7 +111,8 @@ private_free (gpointer data)
gint i;
for (i = 0; i < TRACKER_STORE_N_PRIORITIES; i++) {
- g_queue_free (private->queues[i]);
+ g_queue_free (private->query_queues[i]);
+ g_queue_free (private->update_queues[i]);
}
g_free (private);
}
@@ -127,7 +121,6 @@ static void
store_task_free (TrackerStoreTask *task)
{
if (task->type == TRACKER_STORE_TASK_TYPE_TURTLE) {
- g_object_unref (task->data.turtle.reader);
g_free (task->data.turtle.path);
} else if (task->type == TRACKER_STORE_TASK_TYPE_QUERY) {
g_free (task->data.query.query);
@@ -145,26 +138,15 @@ store_task_free (TrackerStoreTask *task)
g_slice_free (TrackerStoreTask, task);
}
-static gboolean
-process_turtle_file_part (TrackerTurtleReader *reader, GError **error)
+static void
+process_turtle_file (TrackerTurtleReader *reader, GError **error)
{
- int i;
GError *new_error = NULL;
- /* process 10 statements at once before returning to main loop */
-
- i = 0;
-
- /* There is no logical structure in turtle files, so we have no choice
- * but fallback to fixed number of statements per transaction to avoid
- * blocking tracker-store.
- * Real applications should all use SPARQL update instead of turtle
- * import to avoid this issue.
- */
tracker_data_begin_transaction (&new_error);
if (new_error) {
g_propagate_error (error, new_error);
- return FALSE;
+ return;
}
while (new_error == NULL && tracker_turtle_reader_next (reader, &new_error)) {
@@ -184,87 +166,19 @@ process_turtle_file_part (TrackerTurtleReader *reader, GError **error)
tracker_turtle_reader_get_object (reader),
&new_error);
}
-
- i++;
- if (!new_error && i >= TRACKER_STORE_N_TURTLE_STATEMENTS) {
- tracker_data_commit_transaction (&new_error);
- if (new_error) {
- tracker_data_rollback_transaction ();
- g_propagate_error (error, new_error);
- return FALSE;
- }
- /* return to main loop */
- return TRUE;
- }
}
if (new_error) {
tracker_data_rollback_transaction ();
g_propagate_error (error, new_error);
- return FALSE;
+ return;
}
tracker_data_commit_transaction (&new_error);
if (new_error) {
tracker_data_rollback_transaction ();
g_propagate_error (error, new_error);
- return FALSE;
- }
-
- return FALSE;
-}
-
-static gboolean
-task_ready (TrackerStorePrivate *private)
-{
- TrackerStoreTask *task;
- gint i;
-
- /* return TRUE if at least one queue is not empty (to keep idle handler running) */
-
- if (private->n_queries_running >= TRACKER_STORE_MAX_CONCURRENT_QUERIES) {
- /* maximum number of queries running already, cannot schedule anything else */
- return FALSE;
- } else if (private->update_running) {
- /* update running already, cannot schedule anything else */
- return FALSE;
- }
-
- for (i = 0; i < TRACKER_STORE_N_PRIORITIES; i++) {
- /* check next task of highest priority */
- task = g_queue_peek_head (private->queues[i]);
- if (task != NULL) {
- if (task->type == TRACKER_STORE_TASK_TYPE_QUERY) {
- /* we know that the maximum number of concurrent queries has not been reached yet,
- query can be scheduled */
- return TRUE;
- } else if (private->n_queries_running == 0) {
- /* no queries running, updates can be scheduled */
- return TRUE;
- } else {
- /* queries running, wait for them to finish before scheduling updates */
- return FALSE;
- }
- }
- }
-
- return FALSE;
-}
-
-static void
-check_handler (TrackerStorePrivate *private)
-{
- if (private->active && task_ready (private)) {
- /* handler should be running */
- if (!private->have_handler) {
- start_handler (private);
- }
- } else {
- /* handler should not be running */
- if (private->have_handler) {
- g_source_remove (private->handler);
- private->have_handler = FALSE;
- }
+ return;
}
}
@@ -318,6 +232,55 @@ check_running_tasks_watchdog (TrackerStorePrivate *private)
}
}
+static void
+sched (TrackerStorePrivate *private)
+{
+ GQueue *queue;
+ TrackerStoreTask *task;
+ gint i;
+
+ if (!private->active) {
+ return;
+ }
+
+ while (private->n_queries_running < TRACKER_STORE_MAX_CONCURRENT_QUERIES) {
+ for (i = 0; i < TRACKER_STORE_N_PRIORITIES; i++) {
+ queue = private->query_queues[i];
+ task = g_queue_pop_head (queue);
+ if (task != NULL) {
+ break;
+ }
+ }
+ if (task == NULL) {
+ /* no pending query */
+ break;
+ }
+
+ private->running_tasks = g_slist_prepend (private->running_tasks, task);
+ ensure_running_tasks_watchdog (private);
+ private->n_queries_running++;
+
+ task->data.query.timer = g_timer_new ();
+
+ g_thread_pool_push (private->query_pool, task, NULL);
+ }
+
+ if (!private->update_running) {
+ for (i = 0; i < TRACKER_STORE_N_PRIORITIES; i++) {
+ queue = private->update_queues[i];
+ task = g_queue_pop_head (queue);
+ if (task != NULL) {
+ break;
+ }
+ }
+ if (task != NULL) {
+ private->update_running = TRUE;
+
+ g_thread_pool_push (private->update_pool, task, NULL);
+ }
+ }
+}
+
static gboolean
task_finish_cb (gpointer data)
{
@@ -388,24 +351,19 @@ task_finish_cb (gpointer data)
private->update_running = FALSE;
} else if (task->type == TRACKER_STORE_TASK_TYPE_TURTLE) {
- private->update_running = FALSE;
-
- if (task->data.turtle.in_progress) {
- /* Task still in progress */
- check_handler (private);
- return FALSE;
- } else {
- if (task->callback.turtle_callback) {
- task->callback.turtle_callback (task->error, task->user_data);
- }
+ if (!task->error) {
+ tracker_data_notify_transaction ();
+ }
- if (task->error) {
- g_clear_error (&task->error);
- }
+ if (task->callback.turtle_callback) {
+ task->callback.turtle_callback (task->error, task->user_data);
+ }
- /* Remove the task now that we're done with it */
- g_queue_pop_head (private->queues[TRACKER_STORE_PRIORITY_TURTLE]);
+ if (task->error) {
+ g_clear_error (&task->error);
}
+
+ private->update_running = FALSE;
}
if (task->destroy) {
@@ -414,7 +372,7 @@ task_finish_cb (gpointer data)
store_task_free (task);
- check_handler (private);
+ sched (private);
return FALSE;
}
@@ -457,89 +415,16 @@ pool_dispatch_cb (gpointer data,
} else if (task->type == TRACKER_STORE_TASK_TYPE_UPDATE_BLANK) {
task->data.update.blank_nodes = tracker_data_update_sparql_blank (task->data.update.query, &task->error);
} else if (task->type == TRACKER_STORE_TASK_TYPE_TURTLE) {
- if (!task->data.turtle.in_progress) {
- task->data.turtle.reader = tracker_turtle_reader_new (task->data.turtle.path, &task->error);
-
- if (task->error) {
- g_idle_add (task_finish_cb, task);
- return;
- }
+ TrackerTurtleReader *reader;
- task->data.turtle.in_progress = TRUE;
- }
-
- if (process_turtle_file_part (task->data.turtle.reader, &task->error)) {
- /* import still in progress */
- } else {
- /* import finished */
- task->data.turtle.in_progress = FALSE;
- }
+ reader = tracker_turtle_reader_new (task->data.turtle.path, &task->error);
+ process_turtle_file (reader, &task->error);
+ g_object_unref (reader);
}
g_idle_add (task_finish_cb, task);
}
-static void
-task_run_async (TrackerStorePrivate *private,
- TrackerStoreTask *task)
-{
- if (private->n_queries_running > 1) {
- /* use global pool if main pool might already be occupied */
- g_thread_pool_push (private->global_pool, task, NULL);
- } else {
- /* use main pool for updates and non-parallel queries */
- g_thread_pool_push (private->main_pool, task, NULL);
- }
-}
-
-static gboolean
-queue_idle_handler (gpointer user_data)
-{
- TrackerStorePrivate *private = user_data;
- GQueue *queue;
- TrackerStoreTask *task = NULL;
- gint i;
-
- for (i = 0; task == NULL && i < TRACKER_STORE_N_PRIORITIES; i++) {
- queue = private->queues[i];
- task = g_queue_peek_head (queue);
- }
- g_return_val_if_fail (task != NULL, FALSE);
-
- if (task->type == TRACKER_STORE_TASK_TYPE_QUERY) {
- /* pop task now, otherwise further queries won't be scheduled */
- g_queue_pop_head (queue);
-
- private->running_tasks = g_slist_prepend (private->running_tasks, task);
- ensure_running_tasks_watchdog (private);
- private->n_queries_running++;
-
- task->data.query.timer = g_timer_new ();
-
- task_run_async (private, task);
- } else if (task->type == TRACKER_STORE_TASK_TYPE_UPDATE ||
- task->type == TRACKER_STORE_TASK_TYPE_UPDATE_BLANK) {
- g_queue_pop_head (queue);
-
- private->update_running = TRUE;
-
- task_run_async (private, task);
- } else if (task->type == TRACKER_STORE_TASK_TYPE_TURTLE) {
- private->update_running = TRUE;
- task_run_async (private, task);
- }
-
- return task_ready (private);
-}
-
-static void
-queue_idle_destroy (gpointer user_data)
-{
- TrackerStorePrivate *private = user_data;
-
- private->have_handler = FALSE;
-}
-
void
tracker_store_init (void)
{
@@ -559,15 +444,16 @@ tracker_store_init (void)
}
for (i = 0; i < TRACKER_STORE_N_PRIORITIES; i++) {
- private->queues[i] = g_queue_new ();
+ private->query_queues[i] = g_queue_new ();
+ private->update_queues[i] = g_queue_new ();
}
- private->main_pool = g_thread_pool_new (pool_dispatch_cb,
- private, 1,
- TRUE, NULL);
- private->global_pool = g_thread_pool_new (pool_dispatch_cb,
- private, TRACKER_STORE_MAX_CONCURRENT_QUERIES,
- FALSE, NULL);
+ private->update_pool = g_thread_pool_new (pool_dispatch_cb,
+ private, 1,
+ TRUE, NULL);
+ private->query_pool = g_thread_pool_new (pool_dispatch_cb,
+ private, TRACKER_STORE_MAX_CONCURRENT_QUERIES,
+ FALSE, NULL);
/* as the following settings are global for unknown reasons,
let's use the same settings as gio, otherwise the used settings
@@ -584,7 +470,7 @@ tracker_store_init (void)
pthread_setaffinity_np (pthread_self (), sizeof (cpu_set_t), &cpuset);
/* lock main update/query thread to same cpu to improve overall performance
main loop thread is essentially idle during query execution */
- g_thread_pool_push (private->main_pool, GINT_TO_POINTER (1), NULL);
+ g_thread_pool_push (private->update_pool, GINT_TO_POINTER (1), NULL);
#endif /* __USE_GNU */
g_static_private_set (&private_key,
@@ -600,33 +486,12 @@ tracker_store_shutdown (void)
private = g_static_private_get (&private_key);
g_return_if_fail (private != NULL);
- g_thread_pool_free (private->global_pool, FALSE, TRUE);
- g_thread_pool_free (private->main_pool, FALSE, TRUE);
-
- if (private->have_handler) {
- g_source_remove (private->handler);
- private->have_handler = FALSE;
- }
-
- if (private->have_sync_handler) {
- g_source_remove (private->sync_handler);
- private->have_sync_handler = FALSE;
- }
+ g_thread_pool_free (private->query_pool, FALSE, TRUE);
+ g_thread_pool_free (private->update_pool, FALSE, TRUE);
g_static_private_set (&private_key, NULL, NULL);
}
-static void
-start_handler (TrackerStorePrivate *private)
-{
- private->have_handler = TRUE;
-
- private->handler = g_idle_add_full (G_PRIORITY_LOW,
- queue_idle_handler,
- private,
- queue_idle_destroy);
-}
-
void
tracker_store_sparql_query (const gchar *sparql,
TrackerStorePriority priority,
@@ -654,9 +519,9 @@ tracker_store_sparql_query (const gchar *sparql,
task->destroy = destroy;
task->client_id = g_strdup (client_id);
- g_queue_push_tail (private->queues[priority], task);
+ g_queue_push_tail (private->query_queues[priority], task);
- check_handler (private);
+ sched (private);
}
void
@@ -683,9 +548,9 @@ tracker_store_sparql_update (const gchar *sparql,
task->destroy = destroy;
task->client_id = g_strdup (client_id);
- g_queue_push_tail (private->queues[priority], task);
+ g_queue_push_tail (private->update_queues[priority], task);
- check_handler (private);
+ sched (private);
}
void
@@ -712,9 +577,9 @@ tracker_store_sparql_update_blank (const gchar *sparql,
task->destroy = destroy;
task->client_id = g_strdup (client_id);
- g_queue_push_tail (private->queues[priority], task);
+ g_queue_push_tail (private->update_queues[priority], task);
- check_handler (private);
+ sched (private);
}
void
@@ -738,9 +603,9 @@ tracker_store_queue_turtle_import (GFile *file,
task->callback.update_callback = callback;
task->destroy = destroy;
- g_queue_push_tail (private->queues[TRACKER_STORE_PRIORITY_TURTLE], task);
+ g_queue_push_tail (private->update_queues[TRACKER_STORE_PRIORITY_TURTLE], task);
- check_handler (private);
+ sched (private);
}
guint
@@ -754,11 +619,30 @@ tracker_store_get_queue_size (void)
g_return_val_if_fail (private != NULL, 0);
for (i = 0; i < TRACKER_STORE_N_PRIORITIES; i++) {
- result += g_queue_get_length (private->queues[i]);
+ result += g_queue_get_length (private->query_queues[i]);
+ result += g_queue_get_length (private->update_queues[i]);
}
return result;
}
+static void
+unreg_task (TrackerStoreTask *task,
+ GError *error)
+{
+ if (task->type == TRACKER_STORE_TASK_TYPE_QUERY) {
+ task->callback.query.query_callback (NULL, error, task->user_data);
+ } else if (task->type == TRACKER_STORE_TASK_TYPE_UPDATE) {
+ task->callback.update_callback (error, task->user_data);
+ } else if (task->type == TRACKER_STORE_TASK_TYPE_UPDATE_BLANK) {
+ task->callback.update_blank_callback (NULL, error, task->user_data);
+ } else if (task->type == TRACKER_STORE_TASK_TYPE_TURTLE) {
+ task->callback.turtle_callback (error, task->user_data);
+ }
+ task->destroy (task->user_data);
+
+ store_task_free (task);
+}
+
void
tracker_store_unreg_batches (const gchar *client_id)
{
@@ -784,10 +668,29 @@ tracker_store_unreg_batches (const gchar *client_id)
}
for (i = 0; i < TRACKER_STORE_N_PRIORITIES; i++) {
- queue = private->queues[i];
-
+ queue = private->query_queues[i];
list = queue->head;
+ while (list) {
+ TrackerStoreTask *task;
+
+ cur = list;
+ list = list->next;
+ task = cur->data;
+
+ if (task && g_strcmp0 (task->client_id, client_id) == 0) {
+ g_queue_delete_link (queue, cur);
+
+ if (!error) {
+ g_set_error (&error, TRACKER_DBUS_ERROR, 0,
+ "Client disappeared");
+ }
+
+ unreg_task (task, error);
+ }
+ }
+ queue = private->update_queues[i];
+ list = queue->head;
while (list) {
TrackerStoreTask *task;
@@ -795,26 +698,15 @@ tracker_store_unreg_batches (const gchar *client_id)
list = list->next;
task = cur->data;
- if (task && task->type != TRACKER_STORE_TASK_TYPE_TURTLE) {
- if (g_strcmp0 (task->client_id, client_id) == 0) {
- if (!error) {
- g_set_error (&error, TRACKER_DBUS_ERROR, 0,
- "Client disappeared");
- }
-
- if (task->type == TRACKER_STORE_TASK_TYPE_QUERY) {
- task->callback.query.query_callback (NULL, error, task->user_data);
- } else if (task->type == TRACKER_STORE_TASK_TYPE_UPDATE) {
- task->callback.update_callback (error, task->user_data);
- } else if (task->type == TRACKER_STORE_TASK_TYPE_UPDATE_BLANK) {
- task->callback.update_blank_callback (NULL, error, task->user_data);
- }
- task->destroy (task->user_data);
-
- g_queue_delete_link (queue, cur);
-
- store_task_free (task);
+ if (task && g_strcmp0 (task->client_id, client_id) == 0) {
+ g_queue_delete_link (queue, cur);
+
+ if (!error) {
+ g_set_error (&error, TRACKER_DBUS_ERROR, 0,
+ "Client disappeared");
}
+
+ unreg_task (task, error);
}
}
}
@@ -823,7 +715,7 @@ tracker_store_unreg_batches (const gchar *client_id)
g_clear_error (&error);
}
- check_handler (private);
+ sched (private);
}
void
@@ -834,5 +726,5 @@ tracker_store_set_active (gboolean active)
private = g_static_private_get (&private_key);
private->active = active;
- check_handler (private);
+ sched (private);
}
[Date Prev][Date Next] [Thread Prev][Thread Next] [Thread Index] [Date Index] [Author Index]