[gegl] buffer: improved and fixed a bug in the async file backend
- From: Ville Sokk <villesokk src gnome org>
- To: commits-list gnome org
- Cc:
- Subject: [gegl] buffer: improved and fixed a bug in the async file backend
- Date: Mon, 13 Aug 2012 09:16:02 +0000 (UTC)
commit f0340d69f227bb20383f202937ed2dd137c6bbbd
Author: Ville Sokk <ville sokk gmail com>
Date: Mon Aug 13 12:09:50 2012 +0300
buffer: improved and fixed a bug in the async file backend
gegl/buffer/gegl-tile-backend-file-async.c | 135 ++++++++++++++++++----------
gegl/buffer/gegl-tile-backend-file.h | 2 +-
2 files changed, 90 insertions(+), 47 deletions(-)
---
diff --git a/gegl/buffer/gegl-tile-backend-file-async.c b/gegl/buffer/gegl-tile-backend-file-async.c
index 061c051..b0f9b1b 100644
--- a/gegl/buffer/gegl-tile-backend-file-async.c
+++ b/gegl/buffer/gegl-tile-backend-file-async.c
@@ -151,19 +151,20 @@ static gint file_size = 0;
static gint peak_allocs = 0;
static gint peak_file_size = 0;
-static GQueue queue = G_QUEUE_INIT;
-static GMutex *mutex = NULL;
-static GCond *queue_cond = NULL;
-static GCond *max_cond = NULL;
+static GQueue queue = G_QUEUE_INIT;
+static GMutex *queue_mutex = NULL;
+static GCond *queue_cond = NULL;
+static GCond *max_cond = NULL;
+static GMutex *write_mutex = NULL;
static void
gegl_tile_backend_file_finish_writing (GeglTileBackendFile *self)
{
- g_mutex_lock (mutex);
+ g_mutex_lock (queue_mutex);
while (self->pending_ops != 0)
- g_cond_wait (self->cond, mutex);
- g_mutex_unlock (mutex);
+ g_cond_wait (self->cond, queue_mutex);
+ g_mutex_unlock (queue_mutex);
}
static void
@@ -171,19 +172,24 @@ gegl_tile_backend_file_push_queue (GeglFileBackendThreadParams *params)
{
guint length;
- g_mutex_lock (mutex);
+ g_mutex_lock (queue_mutex);
length = g_queue_get_length (&queue);
+ /* block if the queue has gotten too big */
if (length > gegl_config ()->queue_limit)
- g_cond_wait (max_cond, mutex);
+ g_cond_wait (max_cond, queue_mutex);
params->file->pending_ops += 1;
g_queue_push_tail (&queue, params);
+
+ if (params->entry)
+ params->entry->link = g_queue_peek_tail_link (&queue);
+
if (length == 0) /* wake up the writer thread */
g_cond_signal (queue_cond);
- g_mutex_unlock (mutex);
+ g_mutex_unlock (queue_mutex);
}
static inline void
@@ -231,13 +237,21 @@ gegl_tile_backend_file_writer_thread (gpointer ignored)
{
GeglFileBackendThreadParams *params;
- g_mutex_lock (mutex);
+ g_mutex_lock (queue_mutex);
while (g_queue_is_empty (&queue))
- g_cond_wait (queue_cond, mutex);
+ g_cond_wait (queue_cond, queue_mutex);
+
+ g_mutex_lock (write_mutex);
+ if (g_queue_is_empty (&queue))
+ {
+ g_mutex_unlock (queue_mutex);
+ g_mutex_unlock (write_mutex);
+ continue;
+ }
params = (GeglFileBackendThreadParams *)g_queue_peek_head (&queue);
- g_mutex_unlock (mutex);
+ g_mutex_unlock (queue_mutex);
switch (params->operation)
{
@@ -252,25 +266,28 @@ gegl_tile_backend_file_writer_thread (gpointer ignored)
break;
}
- g_mutex_lock (mutex);
+ g_mutex_lock (queue_mutex);
g_queue_pop_head (&queue);
+ /* the file maybe waiting for its file operations to finish */
params->file->pending_ops -= 1;
if (params->file->pending_ops == 0)
g_cond_signal (params->file->cond);
+ /* unblock the main thread if the queue had gotten too big */
if (g_queue_get_length (&queue) < gegl_config ()->queue_limit)
g_cond_signal (max_cond);
if (params->entry)
- params->entry->in_queue = FALSE;
+ params->entry->link = NULL;
if (params->operation == OP_WRITE)
g_free (params->source);
g_free (params);
- g_mutex_unlock (mutex);
+ g_mutex_unlock (queue_mutex);
+ g_mutex_unlock (write_mutex);
}
return NULL;
@@ -287,30 +304,17 @@ gegl_tile_backend_file_entry_read (GeglTileBackendFile *self,
gegl_tile_backend_file_ensure_exist (self);
- if (entry->in_queue)
+ g_mutex_lock (queue_mutex);
+ if (entry->link)
{
- GList *link;
-
- g_mutex_lock (mutex);
-
- link = g_queue_peek_tail_link (&queue);
-
- while (link)
- {
- GeglFileBackendThreadParams *queued_op =
- (GeglFileBackendThreadParams *)link->data;
-
- if (queued_op->file == self && queued_op->offset == offset)
- {
- memcpy (dest, queued_op->source, to_be_read);
- g_mutex_unlock (mutex);
- GEGL_NOTE (GEGL_DEBUG_TILE_BACKEND, "read entry %i,%i,%i from write queue", entry->tile->x, entry->tile->y, entry->tile->z);
- return;
- }
- link = link->prev;
- }
- g_mutex_unlock (mutex);
+ memcpy (dest,
+ ((GeglFileBackendThreadParams *)entry->link->data)->source,
+ to_be_read);
+ g_mutex_unlock (queue_mutex);
+ GEGL_NOTE (GEGL_DEBUG_TILE_BACKEND, "read entry %i,%i,%i from write queue", entry->tile->x, entry->tile->y, entry->tile->z);
+ return;
}
+ g_mutex_unlock (queue_mutex);
if (self->in_offset != offset)
{
@@ -347,7 +351,7 @@ gegl_tile_backend_file_entry_write (GeglTileBackendFile *self,
GeglFileBackendEntry *entry,
guchar *source)
{
- GeglFileBackendThreadParams *params = g_new0 (GeglFileBackendThreadParams, 1);
+ GeglFileBackendThreadParams *params;
gint length = gegl_tile_backend_get_tile_size (GEGL_TILE_BACKEND (self));
guchar *new_source = g_malloc (length);
@@ -355,6 +359,24 @@ gegl_tile_backend_file_entry_write (GeglTileBackendFile *self,
memcpy (new_source, source, length);
+ if (entry->link)
+ {
+ g_mutex_lock (write_mutex);
+
+ if (entry->link)
+ {
+ params = (GeglFileBackendThreadParams *)entry->link->data;
+
+ g_free (params->source);
+ params->source = new_source;
+ g_mutex_unlock (write_mutex);
+ return;
+ }
+
+ g_mutex_unlock (write_mutex);
+ }
+
+ params = g_new0 (GeglFileBackendThreadParams, 1);
params->operation = OP_WRITE;
params->length = length;
params->offset = entry->tile->offset;
@@ -362,8 +384,6 @@ gegl_tile_backend_file_entry_write (GeglTileBackendFile *self,
params->source = new_source;
params->entry = entry;
- entry->in_queue = TRUE;
-
gegl_tile_backend_file_push_queue (params);
GEGL_NOTE (GEGL_DEBUG_TILE_BACKEND, "pushed entry write %i,%i,%i at %i", entry->tile->x, entry->tile->y, entry->tile->z, (gint)entry->tile->offset);
@@ -376,8 +396,8 @@ gegl_tile_backend_file_file_entry_create (gint x,
{
GeglFileBackendEntry *entry = g_new0 (GeglFileBackendEntry, 1);
- entry->tile = gegl_tile_entry_new (x, y, z);
- entry->in_queue = FALSE;
+ entry->tile = gegl_tile_entry_new (x, y, z);
+ entry->link = NULL;
return entry;
}
@@ -439,6 +459,28 @@ gegl_tile_backend_file_file_entry_destroy (GeglFileBackendEntry *entry,
{
/* XXX: EEEk, throwing away bits */
guint offset = entry->tile->offset;
+
+ /* Write mutex is used to completely stop the writer thread from working,
+ * which is required to remove the entry from the queue. entry->link is
+ * checked twice because while waiting for the mutex the writer thread
+ * may have already removed it. If we would lock the mutex before the
+ * first check we would spend too much time since it's not that common to
+ * have the entry in queue.
+ */
+ if (entry->link)
+ {
+ g_mutex_lock (write_mutex);
+
+ if (entry->link)
+ {
+ g_free (((GeglFileBackendThreadParams *)entry->link->data)->source);
+ g_free (entry->link->data);
+ g_queue_delete_link (&queue, entry->link);
+ }
+
+ g_mutex_unlock (write_mutex);
+ }
+
self->free_list = g_slist_prepend (self->free_list,
GUINT_TO_POINTER (offset));
g_hash_table_remove (self->index, entry);
@@ -1130,9 +1172,10 @@ gegl_tile_backend_file_class_init (GeglTileBackendFileClass *klass)
gobject_class->constructor = gegl_tile_backend_file_constructor;
gobject_class->finalize = gegl_tile_backend_file_finalize;
- queue_cond = g_cond_new ();
- max_cond = g_cond_new ();
- mutex = g_mutex_new ();
+ queue_cond = g_cond_new ();
+ max_cond = g_cond_new ();
+ queue_mutex = g_mutex_new ();
+ write_mutex = g_mutex_new ();
g_thread_create_full (gegl_tile_backend_file_writer_thread,
NULL, 0, TRUE, TRUE, G_THREAD_PRIORITY_NORMAL, NULL);
diff --git a/gegl/buffer/gegl-tile-backend-file.h b/gegl/buffer/gegl-tile-backend-file.h
index 1151889..d9348c6 100644
--- a/gegl/buffer/gegl-tile-backend-file.h
+++ b/gegl/buffer/gegl-tile-backend-file.h
@@ -49,7 +49,7 @@ typedef enum
typedef struct
{
GeglBufferTile *tile;
- gchar in_queue;
+ GList *link;
} GeglFileBackendEntry;
typedef struct
[
Date Prev][
Date Next] [
Thread Prev][
Thread Next]
[
Thread Index]
[
Date Index]
[
Author Index]