[gegl] buffer: in async file backend, overwrite GeglBufferBlock queue entries when flushing and don't use two mutexes
- From: Ville Sokk <villesokk src gnome org>
- To: commits-list gnome org
- Cc:
- Subject: [gegl] buffer: in async file backend, overwrite GeglBufferBlock queue entries when flushing and don't use two mutexes
- Date: Thu, 16 Aug 2012 17:26:06 +0000 (UTC)
commit 3907a3a520c7555d07794a662b5807fdc1ed72be
Author: Ville Sokk <ville sokk gmail com>
Date: Thu Aug 16 20:25:16 2012 +0300
buffer: in async file backend, overwrite GeglBufferBlock queue entries when flushing and don't use two mutexes
gegl/buffer/gegl-tile-backend-file-async.c | 239 ++++++++++++++++------------
gegl/buffer/gegl-tile-backend-file.h | 7 +-
2 files changed, 145 insertions(+), 101 deletions(-)
---
diff --git a/gegl/buffer/gegl-tile-backend-file-async.c b/gegl/buffer/gegl-tile-backend-file-async.c
index b27f2be..020b63b 100644
--- a/gegl/buffer/gegl-tile-backend-file-async.c
+++ b/gegl/buffer/gegl-tile-backend-file-async.c
@@ -113,7 +113,7 @@ struct _GeglTileBackendFile
* at all times to be able to keep track of the ->next offsets in
* the blocks.
*/
- GeglBufferBlock *in_holding;
+ GeglFileBackendEntry *in_holding;
/* loading buffer */
GList *tiles;
@@ -137,11 +137,11 @@ struct _GeglTileBackendFile
};
-static void gegl_tile_backend_file_ensure_exist (GeglTileBackendFile *self);
-static gboolean gegl_tile_backend_file_write_block (GeglTileBackendFile *self,
- GeglBufferBlock *block);
-static void gegl_tile_backend_file_dbg_alloc (int size);
-static void gegl_tile_backend_file_dbg_dealloc (int size);
+static void gegl_tile_backend_file_ensure_exist (GeglTileBackendFile *self);
+static gboolean gegl_tile_backend_file_write_block (GeglTileBackendFile *self,
+ GeglFileBackendEntry *block);
+static void gegl_tile_backend_file_dbg_alloc (int size);
+static void gegl_tile_backend_file_dbg_dealloc (int size);
G_DEFINE_TYPE (GeglTileBackendFile, gegl_tile_backend_file, GEGL_TYPE_TILE_BACKEND)
@@ -154,20 +154,20 @@ static gint file_size = 0;
static gint peak_allocs = 0;
static gint peak_file_size = 0;
-static GQueue queue = G_QUEUE_INIT;
-static GMutex *queue_mutex = NULL;
-static GCond *queue_cond = NULL;
-static GCond *max_cond = NULL;
-static GMutex *write_mutex = NULL;
+static GQueue queue = G_QUEUE_INIT;
+static GMutex *mutex = NULL;
+static GCond *queue_cond = NULL;
+static GCond *max_cond = NULL;
+static GeglFileBackendThreadParams *in_progress;
static void
gegl_tile_backend_file_finish_writing (GeglTileBackendFile *self)
{
- g_mutex_lock (queue_mutex);
+ g_mutex_lock (mutex);
while (self->pending_ops != 0)
- g_cond_wait (self->cond, queue_mutex);
- g_mutex_unlock (queue_mutex);
+ g_cond_wait (self->cond, mutex);
+ g_mutex_unlock (mutex);
}
static void
@@ -175,24 +175,29 @@ gegl_tile_backend_file_push_queue (GeglFileBackendThreadParams *params)
{
guint length;
- g_mutex_lock (queue_mutex);
+ g_mutex_lock (mutex);
length = g_queue_get_length (&queue);
/* block if the queue has gotten too big */
if (length > gegl_config ()->queue_limit)
- g_cond_wait (max_cond, queue_mutex);
+ g_cond_wait (max_cond, mutex);
params->file->pending_ops += 1;
g_queue_push_tail (&queue, params);
if (params->entry)
- params->entry->link = g_queue_peek_tail_link (&queue);
+ {
+ if (params->operation == OP_WRITE)
+ params->entry->tile_link = g_queue_peek_tail_link (&queue);
+ else /* OP_WRITE_BLOCK */
+ params->entry->block_link = g_queue_peek_tail_link (&queue);
+ }
if (length == 0) /* wake up the writer thread */
g_cond_signal (queue_cond);
- g_mutex_unlock (queue_mutex);
+ g_mutex_unlock (mutex);
}
static inline void
@@ -240,27 +245,30 @@ gegl_tile_backend_file_writer_thread (gpointer ignored)
{
GeglFileBackendThreadParams *params;
- g_mutex_lock (queue_mutex);
+ g_mutex_lock (mutex);
while (g_queue_is_empty (&queue))
- g_cond_wait (queue_cond, queue_mutex);
+ g_cond_wait (queue_cond, mutex);
- g_mutex_lock (write_mutex);
- if (g_queue_is_empty (&queue))
+ params = (GeglFileBackendThreadParams *)g_queue_pop_head (&queue);
+ if (params->entry)
{
- g_mutex_unlock (queue_mutex);
- g_mutex_unlock (write_mutex);
- continue;
+ in_progress = params;
+ if (params->operation == OP_WRITE)
+ params->entry->tile_link = NULL;
+ else /* OP_WRITE_BLOCK */
+ params->entry->block_link = NULL;
}
-
- params = (GeglFileBackendThreadParams *)g_queue_peek_head (&queue);
- g_mutex_unlock (queue_mutex);
+ g_mutex_unlock (mutex);
switch (params->operation)
{
case OP_WRITE:
gegl_tile_backend_file_write (params);
break;
+ case OP_WRITE_BLOCK:
+ gegl_tile_backend_file_write (params);
+ break;
case OP_TRUNCATE:
ftruncate (params->file->o, params->length);
break;
@@ -269,8 +277,8 @@ gegl_tile_backend_file_writer_thread (gpointer ignored)
break;
}
- g_mutex_lock (queue_mutex);
- g_queue_pop_head (&queue);
+ g_mutex_lock (mutex);
+ in_progress = NULL;
/* the file maybe waiting for its file operations to finish */
params->file->pending_ops -= 1;
@@ -281,16 +289,12 @@ gegl_tile_backend_file_writer_thread (gpointer ignored)
if (g_queue_get_length (&queue) < gegl_config ()->queue_limit)
g_cond_signal (max_cond);
- if (params->entry)
- params->entry->link = NULL;
-
- if (params->operation == OP_WRITE)
+ if (params->operation == OP_WRITE || params->operation == OP_WRITE_BLOCK)
g_free (params->source);
g_free (params);
- g_mutex_unlock (queue_mutex);
- g_mutex_unlock (write_mutex);
+ g_mutex_unlock (mutex);
}
return NULL;
@@ -307,17 +311,24 @@ gegl_tile_backend_file_entry_read (GeglTileBackendFile *self,
gegl_tile_backend_file_ensure_exist (self);
- g_mutex_lock (queue_mutex);
- if (entry->link)
+ if (entry->tile_link)
{
- memcpy (dest,
- ((GeglFileBackendThreadParams *)entry->link->data)->source,
- to_be_read);
- g_mutex_unlock (queue_mutex);
- GEGL_NOTE (GEGL_DEBUG_TILE_BACKEND, "read entry %i,%i,%i from write queue", entry->tile->x, entry->tile->y, entry->tile->z);
- return;
+ g_mutex_lock (mutex);
+
+ if (entry->tile_link)
+ {
+ memcpy (dest,
+ ((GeglFileBackendThreadParams *)entry->tile_link->data)->source,
+ to_be_read);
+ g_mutex_unlock (mutex);
+
+ GEGL_NOTE (GEGL_DEBUG_TILE_BACKEND, "read entry %i,%i,%i from queue", entry->tile->x, entry->tile->y, entry->tile->z);
+
+ return;
+ }
+
+ g_mutex_unlock (mutex);
}
- g_mutex_unlock (queue_mutex);
if (self->in_offset != offset)
{
@@ -360,19 +371,22 @@ gegl_tile_backend_file_entry_write (GeglTileBackendFile *self,
gegl_tile_backend_file_ensure_exist (self);
- if (entry->link)
+ if (entry->tile_link)
{
- g_mutex_lock (write_mutex);
+ g_mutex_lock (mutex);
- if (entry->link)
+ if (entry->tile_link)
{
- params = (GeglFileBackendThreadParams *)entry->link->data;
+ params = entry->tile_link->data;
memcpy (params->source, source, length);
- g_mutex_unlock (write_mutex);
+ g_mutex_unlock (mutex);
+
+ GEGL_NOTE (GEGL_DEBUG_TILE_BACKEND, "overwrote queue entry %i,%i,%i at %i", entry->tile->x, entry->tile->y, entry->tile->z, (gint)entry->tile->offset);
+
return;
}
- g_mutex_unlock (write_mutex);
+ g_mutex_unlock (mutex);
}
new_source = g_malloc (length);
@@ -398,8 +412,9 @@ gegl_tile_backend_file_file_entry_create (gint x,
{
GeglFileBackendEntry *entry = g_new0 (GeglFileBackendEntry, 1);
- entry->tile = gegl_tile_entry_new (x, y, z);
- entry->link = NULL;
+ entry->tile = gegl_tile_entry_new (x, y, z);
+ entry->tile_link = NULL;
+ entry->block_link = NULL;
return entry;
}
@@ -462,27 +477,28 @@ gegl_tile_backend_file_file_entry_destroy (GeglFileBackendEntry *entry,
/* XXX: EEEk, throwing away bits */
guint offset = entry->tile->offset;
- /* Write mutex is used to completely stop the writer thread from working,
- * which is required to remove the entry from the queue. entry->link is
- * checked twice because while waiting for the mutex the writer thread
- * may have already removed it. If we would lock the mutex before the
- * first check we would spend too much time since it's not that common to
- * have the entry in queue.
- */
- if (entry->link)
+ if (entry->tile_link || entry->block_link)
{
- g_mutex_lock (write_mutex);
+ gint i;
+ GList *link;
- if (entry->link)
+ g_mutex_lock (mutex);
+
+ for (i = 0, link = entry->tile_link;
+ i < 2;
+ i++, link = entry->block_link)
{
- GeglFileBackendThreadParams *queued_op = entry->link->data;
- queued_op->file->pending_ops -= 1;
- g_queue_delete_link (&queue, entry->link);
- g_free (queued_op->source);
- g_free (queued_op);
+ if (link)
+ {
+ GeglFileBackendThreadParams *queued_op = link->data;
+ queued_op->file->pending_ops -= 1;
+ g_queue_delete_link (&queue, link);
+ g_free (queued_op->source);
+ g_free (queued_op);
+ }
}
- g_mutex_unlock (write_mutex);
+ g_mutex_unlock (mutex);
}
self->free_list = g_slist_prepend (self->free_list,
@@ -490,6 +506,7 @@ gegl_tile_backend_file_file_entry_destroy (GeglFileBackendEntry *entry,
g_hash_table_remove (self->index, entry);
gegl_tile_backend_file_dbg_dealloc (gegl_tile_backend_get_tile_size (GEGL_TILE_BACKEND (self)));
+
g_free (entry->tile);
g_free (entry);
}
@@ -524,47 +541,72 @@ gegl_tile_backend_file_write_header (GeglTileBackendFile *self)
}
static gboolean
-gegl_tile_backend_file_write_block (GeglTileBackendFile *self,
- GeglBufferBlock *block)
+gegl_tile_backend_file_write_block (GeglTileBackendFile *self,
+ GeglFileBackendEntry *item)
{
gegl_tile_backend_file_ensure_exist (self);
if (self->in_holding)
{
- guint64 next_allocation = self->offset + self->in_holding->length;
+ GeglFileBackendThreadParams *params;
+ GeglBufferBlock *block = &(self->in_holding->tile->block);
+ guint64 next_allocation = self->offset + block->length;
+ gint length = block->length;
+ guchar *new_source;
/* update the next offset pointer in the previous block */
- if (block == NULL)
+ if (item == NULL)
/* the previous block was the last block */
- self->in_holding->next = 0;
+ block->next = 0;
else
- self->in_holding->next = next_allocation;
+ block->next = next_allocation;
/* XXX: should promiscuosuly try to compress here as well,. if revisions
are not matching..
*/
- {
- GeglFileBackendThreadParams *params =
- g_new0 (GeglFileBackendThreadParams, 1);
- gint length = self->in_holding->length;
- guchar *new_source = g_malloc (length);
+ if (self->in_holding->block_link)
+ {
+ g_mutex_lock (mutex);
+
+ if (self->in_holding->block_link)
+ {
+ params = self->in_holding->block_link->data;
+ params->offset = self->offset;
+ memcpy (params->source, block, length);
+ g_mutex_unlock (mutex);
+
+ self->offset = next_allocation;
+ self->in_holding = item;
+ GEGL_NOTE (GEGL_DEBUG_TILE_BACKEND, "Overwrote queue block: length:%i flags:%i next:%i at offset %i",
+ block->length,
+ block->flags,
+ (gint)block->next,
+ (gint)self->offset);
+ return TRUE;
+ }
+
+ g_mutex_unlock (mutex);
+ }
+
+ params = g_new0 (GeglFileBackendThreadParams, 1);
+ new_source = g_malloc (length);
- memcpy (new_source, self->in_holding, length);
+ memcpy (new_source, self->in_holding, length);
- params->operation = OP_WRITE;
- params->length = length;
- params->file = self;
- params->offset = self->offset;
- params->source = new_source;
+ params->operation = OP_WRITE_BLOCK;
+ params->length = length;
+ params->file = self;
+ params->offset = self->offset;
+ params->source = new_source;
+ params->entry = self->in_holding;
- gegl_tile_backend_file_push_queue (params);
+ gegl_tile_backend_file_push_queue (params);
- GEGL_NOTE (GEGL_DEBUG_TILE_BACKEND, "Pushed write of block: length:%i flags:%i next:%i at offset %i",
- self->in_holding->length,
- self->in_holding->flags,
- (gint)self->in_holding->next,
- (gint)self->offset);
- }
+ GEGL_NOTE (GEGL_DEBUG_TILE_BACKEND, "Pushed write of block: length:%i flags:%i next:%i at offset %i",
+ block->length,
+ block->flags,
+ (gint)block->next,
+ (gint)self->offset);
self->offset = next_allocation;
}
@@ -576,7 +618,7 @@ gegl_tile_backend_file_write_block (GeglTileBackendFile *self,
* header inside free list later
*/
}
- self->in_holding = block;
+ self->in_holding = item;
return TRUE;
}
@@ -758,7 +800,7 @@ gegl_tile_backend_file_flush (GeglTileSource *source,
{
GeglFileBackendEntry *item = iter->data;
- gegl_tile_backend_file_write_block (self, &(item->tile->block));
+ gegl_tile_backend_file_write_block (self, item);
}
gegl_tile_backend_file_write_block (self, NULL); /* terminate the index */
g_list_free (tiles);
@@ -1176,10 +1218,9 @@ gegl_tile_backend_file_class_init (GeglTileBackendFileClass *klass)
gobject_class->constructor = gegl_tile_backend_file_constructor;
gobject_class->finalize = gegl_tile_backend_file_finalize;
- queue_cond = g_cond_new ();
- max_cond = g_cond_new ();
- queue_mutex = g_mutex_new ();
- write_mutex = g_mutex_new ();
+ queue_cond = g_cond_new ();
+ max_cond = g_cond_new ();
+ mutex = g_mutex_new ();
g_thread_create_full (gegl_tile_backend_file_writer_thread,
NULL, 0, TRUE, TRUE, G_THREAD_PRIORITY_NORMAL, NULL);
diff --git a/gegl/buffer/gegl-tile-backend-file.h b/gegl/buffer/gegl-tile-backend-file.h
index b9366e9..00ba5fc 100644
--- a/gegl/buffer/gegl-tile-backend-file.h
+++ b/gegl/buffer/gegl-tile-backend-file.h
@@ -42,6 +42,7 @@ typedef struct _GeglTileBackendFileClass GeglTileBackendFileClass;
typedef enum
{
OP_WRITE,
+ OP_WRITE_BLOCK,
OP_TRUNCATE,
OP_SYNC
} GeglFileBackendThreadOp;
@@ -49,8 +50,10 @@ typedef enum
typedef struct
{
GeglBufferTile *tile;
- /* reference to the writer queue link of this entry */
- GList *link;
+ /* reference to the writer queue links of this entry when writing
+ tile data or a GeglBufferBlock*/
+ GList *tile_link;
+ GList *block_link;
} GeglFileBackendEntry;
typedef struct
[Date Prev][Date Next] [Thread Prev][Thread Next] [Thread Index] [Date Index] [Author Index]