[gegl/threaded-base-classes: 1/13] Make GeglOperation base classes threadable
- From: Øyvind Kolås <ok src gnome org>
- To: commits-list gnome org
- Cc:
- Subject: [gegl/threaded-base-classes: 1/13] Make GeglOperation base classes threadable
- Date: Thu, 26 Jun 2014 22:09:00 +0000 (UTC)
commit e2a5bd04bf95e7f139f4a9a3c47f41c89327c4d3
Author: Øyvind Kolås <pippin gimp org>
Date: Tue Jun 24 01:54:20 2014 +0200
Make GeglOperation base classes threadable
All ops are force opted in by now; some give broken results like gaussian
blur; others crash. Point filters/composers ops seem to work correctly -
but might be limited by memory bandwidth.
gegl/gegl-types.h | 1 +
gegl/graph/gegl-node-private.h | 2 +-
gegl/operation/gegl-operation-composer.c | 92 ++++++++++++++++++++++++++++-
gegl/operation/gegl-operation-composer3.c | 94 ++++++++++++++++++++++++++++-
gegl/operation/gegl-operation-filter.c | 91 +++++++++++++++++++++++++++-
gegl/operation/gegl-operation-source.c | 91 +++++++++++++++++++++++++++-
gegl/operation/gegl-operation.h | 3 +-
7 files changed, 366 insertions(+), 8 deletions(-)
---
diff --git a/gegl/gegl-types.h b/gegl/gegl-types.h
index 714bdd3..8442911 100644
--- a/gegl/gegl-types.h
+++ b/gegl/gegl-types.h
@@ -25,6 +25,7 @@
G_BEGIN_DECLS
#define GEGL_AUTO_ROWSTRIDE 0
+#define GEGL_MAX_THREADS 16
typedef enum
{
diff --git a/gegl/graph/gegl-node-private.h b/gegl/graph/gegl-node-private.h
index 446a0bb..d80fce5 100644
--- a/gegl/graph/gegl-node-private.h
+++ b/gegl/graph/gegl-node-private.h
@@ -26,6 +26,7 @@
G_BEGIN_DECLS
+
#define GEGL_NODE_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST ((klass), GEGL_TYPE_NODE, GeglNodeClass))
#define GEGL_IS_NODE_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE ((klass), GEGL_TYPE_NODE))
#define GEGL_NODE_GET_CLASS(obj) (G_TYPE_INSTANCE_GET_CLASS ((obj), GEGL_TYPE_NODE, GeglNodeClass))
@@ -131,7 +132,6 @@ void
gegl_node_emit_computed (GeglNode *node,
const GeglRectangle *rect);
-#define GEGL_MAX_THREADS 16
G_END_DECLS
diff --git a/gegl/operation/gegl-operation-composer.c b/gegl/operation/gegl-operation-composer.c
index 739e914..de6beca 100644
--- a/gegl/operation/gegl-operation-composer.c
+++ b/gegl/operation/gegl-operation-composer.c
@@ -24,6 +24,7 @@
#include "gegl.h"
#include "gegl-operation-composer.h"
#include "gegl-operation-context.h"
+#include "gegl-config.h"
static gboolean gegl_operation_composer_process (GeglOperation *operation,
GeglOperationContext *context,
@@ -95,6 +96,39 @@ attach (GeglOperation *self)
g_param_spec_sink (pspec);
}
+typedef struct ThreadData
+{
+ GeglOperationComposerClass *klass;
+ GeglOperation *operation;
+ GeglBuffer *input;
+ GeglBuffer *aux;
+ GeglBuffer *output;
+ gint *pending;
+ gint level;
+ gboolean success;
+ GeglRectangle roi;
+} ThreadData;
+
+static void thread_process (gpointer thread_data, gpointer unused)
+{
+ ThreadData *data = thread_data;
+ if (!data->klass->process (data->operation,
+ data->input, data->aux, data->output, &data->roi, data->level))
+ data->success = FALSE;
+ g_atomic_int_add (data->pending, -1);
+}
+
+static GThreadPool *thread_pool (void)
+{
+ static GThreadPool *pool = NULL;
+ if (!pool)
+ {
+ pool = g_thread_pool_new (thread_process, NULL, gegl_config()->threads,
+ FALSE, NULL);
+ }
+ return pool;
+}
+
static gboolean
gegl_operation_composer_process (GeglOperation *operation,
GeglOperationContext *context,
@@ -103,6 +137,7 @@ gegl_operation_composer_process (GeglOperation *operation,
gint level)
{
GeglOperationComposerClass *klass = GEGL_OPERATION_COMPOSER_GET_CLASS (operation);
+ GeglOperationClass *op_class = GEGL_OPERATION_CLASS (klass);
GeglBuffer *input;
GeglBuffer *aux;
GeglBuffer *output;
@@ -124,7 +159,62 @@ gegl_operation_composer_process (GeglOperation *operation,
if (input != NULL ||
aux != NULL)
{
- success = klass->process (operation, input, aux, output, result, level);
+// success = klass->process (operation, input, aux, output, result, level);
+
+ op_class->parallelize = 1;
+ if (op_class->parallelize && result->width * result->height > 64*64)
+ {
+ GThreadPool *pool = thread_pool ();
+ gint threads = gegl_config ()->threads;
+ ThreadData thread_data[GEGL_MAX_THREADS];
+ gint pending = threads;
+
+ if (result->width > result->height)
+ {
+ gint bit = result->width / threads;
+ for (gint j = 0; j < threads; j++)
+ {
+ thread_data[j].roi.y = result->y;
+ thread_data[j].roi.height = result->height;
+ thread_data[j].roi.x = result->x + bit * j;
+ thread_data[j].roi.width = bit;
+ }
+ thread_data[threads-1].roi.width = result->width - (bit * (threads-1));
+ }
+ else
+ {
+ gint bit = result->height / threads;
+ for (gint j = 0; j < threads; j++)
+ {
+ thread_data[j].roi.x = result->x;
+ thread_data[j].roi.width = result->width;
+ thread_data[j].roi.y = result->y + bit * j;
+ thread_data[j].roi.height = bit;
+ }
+ thread_data[threads-1].roi.height = result->height - (bit * (threads-1));
+ }
+ for (gint i = 0; i < threads; i++)
+ {
+ thread_data[i].klass = klass;
+ thread_data[i].operation = operation;
+ thread_data[i].input = input;
+ thread_data[i].aux = aux;
+ thread_data[i].output = output;
+ thread_data[i].pending = &pending;
+ thread_data[i].level = level;
+ thread_data[i].success = TRUE;
+ }
+
+ for (gint i = 1; i < threads; i++)
+ g_thread_pool_push (pool, &thread_data[i], NULL);
+ thread_process (&thread_data[0], NULL);
+
+ while (pending > 0) {g_usleep(3);};
+ }
+ else
+ {
+ success = klass->process (operation, input, aux, output, result, level);
+ }
if (input)
g_object_unref (input);
diff --git a/gegl/operation/gegl-operation-composer3.c b/gegl/operation/gegl-operation-composer3.c
index 339c813..8130702 100644
--- a/gegl/operation/gegl-operation-composer3.c
+++ b/gegl/operation/gegl-operation-composer3.c
@@ -24,6 +24,7 @@
#include "gegl.h"
#include "gegl-operation-composer3.h"
#include "gegl-operation-context.h"
+#include "gegl-config.h"
static gboolean gegl_operation_composer3_process
(GeglOperation *operation,
@@ -105,6 +106,42 @@ attach (GeglOperation *self)
g_param_spec_sink (pspec);
}
+typedef struct ThreadData
+{
+ GeglOperationComposer3Class *klass;
+ GeglOperation *operation;
+ GeglBuffer *input;
+ GeglBuffer *aux;
+ GeglBuffer *aux2;
+ GeglBuffer *output;
+ gint *pending;
+ gint level;
+ gboolean success;
+ GeglRectangle roi;
+} ThreadData;
+
+static void thread_process (gpointer thread_data, gpointer unused)
+{
+ ThreadData *data = thread_data;
+ if (!data->klass->process (data->operation,
+ data->input, data->aux, data->aux2,
+ data->output, &data->roi, data->level))
+ data->success = FALSE;
+ g_atomic_int_add (data->pending, -1);
+}
+
+static GThreadPool *thread_pool (void)
+{
+ static GThreadPool *pool = NULL;
+ if (!pool)
+ {
+ pool = g_thread_pool_new (thread_process, NULL, gegl_config()->threads,
+ FALSE, NULL);
+ }
+ return pool;
+}
+
+
static gboolean
gegl_operation_composer3_process (GeglOperation *operation,
GeglOperationContext *context,
@@ -113,6 +150,7 @@ gegl_operation_composer3_process (GeglOperation *operation,
gint level)
{
GeglOperationComposer3Class *klass = GEGL_OPERATION_COMPOSER3_GET_CLASS (operation);
+ GeglOperationClass *op_class = GEGL_OPERATION_CLASS (klass);
GeglBuffer *input;
GeglBuffer *aux;
GeglBuffer *aux2;
@@ -137,7 +175,61 @@ gegl_operation_composer3_process (GeglOperation *operation,
aux != NULL ||
aux2 != NULL)
{
- success = klass->process (operation, input, aux, aux2, output, result, level);
+ op_class->parallelize = 1;
+ if (op_class->parallelize && result->width * result->height > 64*64)
+ {
+ GThreadPool *pool = thread_pool ();
+ gint threads = gegl_config ()->threads;
+ ThreadData thread_data[GEGL_MAX_THREADS];
+ gint pending = threads;
+
+ if (result->width > result->height)
+ {
+ gint bit = result->width / threads;
+ for (gint j = 0; j < threads; j++)
+ {
+ thread_data[j].roi.y = result->y;
+ thread_data[j].roi.height = result->height;
+ thread_data[j].roi.x = result->x + bit * j;
+ thread_data[j].roi.width = bit;
+ }
+ thread_data[threads-1].roi.width = result->width - (bit * (threads-1));
+ }
+ else
+ {
+ gint bit = result->height / threads;
+ for (gint j = 0; j < threads; j++)
+ {
+ thread_data[j].roi.x = result->x;
+ thread_data[j].roi.width = result->width;
+ thread_data[j].roi.y = result->y + bit * j;
+ thread_data[j].roi.height = bit;
+ }
+ thread_data[threads-1].roi.height = result->height - (bit * (threads-1));
+ }
+ for (gint i = 0; i < threads; i++)
+ {
+ thread_data[i].klass = klass;
+ thread_data[i].operation = operation;
+ thread_data[i].input = input;
+ thread_data[i].aux = aux;
+ thread_data[i].aux2 = aux2;
+ thread_data[i].output = output;
+ thread_data[i].pending = &pending;
+ thread_data[i].level = level;
+ thread_data[i].success = TRUE;
+ }
+
+ for (gint i = 1; i < threads; i++)
+ g_thread_pool_push (pool, &thread_data[i], NULL);
+ thread_process (&thread_data[0], NULL);
+
+ while (pending > 0) {g_usleep(3);};
+ }
+ else
+ {
+ success = klass->process (operation, input, aux, aux2, output, result, level);
+ }
if (input)
g_object_unref (input);
diff --git a/gegl/operation/gegl-operation-filter.c b/gegl/operation/gegl-operation-filter.c
index f3c924c..ea3e9bf 100644
--- a/gegl/operation/gegl-operation-filter.c
+++ b/gegl/operation/gegl-operation-filter.c
@@ -13,7 +13,7 @@
* You should have received a copy of the GNU Lesser General Public
* License along with GEGL; if not, see <http://www.gnu.org/licenses/>.
*
- * Copyright 2006 Øyvind Kolås
+ * Copyright 2006, 2014 Øyvind Kolås
*/
#include "config.h"
@@ -24,6 +24,7 @@
#include "gegl.h"
#include "gegl-operation-filter.h"
#include "gegl-operation-context.h"
+#include "gegl-config.h"
static gboolean gegl_operation_filter_process
(GeglOperation *operation,
@@ -103,6 +104,38 @@ detect (GeglOperation *operation,
return operation->node;
}
+typedef struct ThreadData
+{
+ GeglOperationFilterClass *klass;
+ GeglOperation *operation;
+ GeglBuffer *input;
+ GeglBuffer *output;
+ gint *pending;
+ gint level;
+ gboolean success;
+ GeglRectangle roi;
+} ThreadData;
+
+static void thread_process (gpointer thread_data, gpointer unused)
+{
+ ThreadData *data = thread_data;
+ if (!data->klass->process (data->operation,
+ data->input, data->output, &data->roi, data->level))
+ data->success = FALSE;
+ g_atomic_int_add (data->pending, -1);
+}
+
+static GThreadPool *thread_pool (void)
+{
+ static GThreadPool *pool = NULL;
+ if (!pool)
+ {
+ pool = g_thread_pool_new (thread_process, NULL, gegl_config()->threads,
+ FALSE, NULL);
+ }
+ return pool;
+}
+
static gboolean
gegl_operation_filter_process (GeglOperation *operation,
GeglOperationContext *context,
@@ -111,11 +144,13 @@ gegl_operation_filter_process (GeglOperation *operation,
gint level)
{
GeglOperationFilterClass *klass;
+ GeglOperationClass *op_class;
GeglBuffer *input;
GeglBuffer *output;
gboolean success = FALSE;
klass = GEGL_OPERATION_FILTER_GET_CLASS (operation);
+ op_class = GEGL_OPERATION_CLASS (klass);
g_assert (klass->process);
@@ -128,7 +163,59 @@ gegl_operation_filter_process (GeglOperation *operation,
input = gegl_operation_context_get_source (context, "input");
output = gegl_operation_context_get_target (context, "output");
- success = klass->process (operation, input, output, result, level);
+ op_class->parallelize = 1;
+ if (op_class->parallelize && result->width * result->height > 64*64)
+ {
+ GThreadPool *pool = thread_pool ();
+ gint threads = gegl_config ()->threads;
+ ThreadData thread_data[GEGL_MAX_THREADS];
+ gint pending = threads;
+
+ if (result->width > result->height)
+ {
+ gint bit = result->width / threads;
+ for (gint j = 0; j < threads; j++)
+ {
+ thread_data[j].roi.y = result->y;
+ thread_data[j].roi.height = result->height;
+ thread_data[j].roi.x = result->x + bit * j;
+ thread_data[j].roi.width = bit;
+ }
+ thread_data[threads-1].roi.width = result->width - (bit * (threads-1));
+ }
+ else
+ {
+ gint bit = result->height / threads;
+ for (gint j = 0; j < threads; j++)
+ {
+ thread_data[j].roi.x = result->x;
+ thread_data[j].roi.width = result->width;
+ thread_data[j].roi.y = result->y + bit * j;
+ thread_data[j].roi.height = bit;
+ }
+ thread_data[threads-1].roi.height = result->height - (bit * (threads-1));
+ }
+ for (gint i = 0; i < threads; i++)
+ {
+ thread_data[i].klass = klass;
+ thread_data[i].operation = operation;
+ thread_data[i].input = input;
+ thread_data[i].output = output;
+ thread_data[i].pending = &pending;
+ thread_data[i].level = level;
+ thread_data[i].success = TRUE;
+ }
+
+ for (gint i = 1; i < threads; i++)
+ g_thread_pool_push (pool, &thread_data[i], NULL);
+ thread_process (&thread_data[0], NULL);
+
+ while (pending > 0) {g_usleep(3);};
+ }
+ else
+ {
+ success = klass->process (operation, input, output, result, level);
+ }
if (input != NULL)
g_object_unref (input);
diff --git a/gegl/operation/gegl-operation-source.c b/gegl/operation/gegl-operation-source.c
index 38a3b9c..ad102ec 100644
--- a/gegl/operation/gegl-operation-source.c
+++ b/gegl/operation/gegl-operation-source.c
@@ -24,6 +24,7 @@
#include "gegl.h"
#include "gegl-operation-source.h"
#include "gegl-operation-context.h"
+#include "gegl-config.h"
static gboolean gegl_operation_source_process
(GeglOperation *operation,
@@ -79,6 +80,37 @@ attach (GeglOperation *self)
g_param_spec_sink (pspec);
}
+typedef struct ThreadData
+{
+ GeglOperationSourceClass *klass;
+ GeglOperation *operation;
+ GeglBuffer *output;
+ gint *pending;
+ gint level;
+ gboolean success;
+ GeglRectangle roi;
+} ThreadData;
+
+static void thread_process (gpointer thread_data, gpointer unused)
+{
+ ThreadData *data = thread_data;
+ if (!data->klass->process (data->operation,
+ data->output, &data->roi, data->level))
+ data->success = FALSE;
+ g_atomic_int_add (data->pending, -1);
+}
+
+static GThreadPool *thread_pool (void)
+{
+ static GThreadPool *pool = NULL;
+ if (!pool)
+ {
+ pool = g_thread_pool_new (thread_process, NULL, gegl_config()->threads,
+ FALSE, NULL);
+ }
+ return pool;
+}
+
static gboolean
gegl_operation_source_process (GeglOperation *operation,
GeglOperationContext *context,
@@ -87,8 +119,11 @@ gegl_operation_source_process (GeglOperation *operation,
gint level)
{
GeglOperationSourceClass *klass = GEGL_OPERATION_SOURCE_GET_CLASS (operation);
+ GeglOperationClass *op_class;
GeglBuffer *output;
- gboolean success;
+ gboolean success = FALSE;
+
+ op_class = GEGL_OPERATION_CLASS (klass);
if (strcmp (output_prop, "output"))
{
@@ -98,7 +133,59 @@ gegl_operation_source_process (GeglOperation *operation,
g_assert (klass->process);
output = gegl_operation_context_get_target (context, "output");
- success = klass->process (operation, output, result, level);
+
+ op_class->parallelize= 1;
+ if (op_class->parallelize && result->width * result->height > 64*64)
+ {
+ GThreadPool *pool = thread_pool ();
+ gint threads = gegl_config ()->threads;
+ ThreadData thread_data[GEGL_MAX_THREADS];
+ gint pending = threads;
+
+ if (result->width > result->height)
+ {
+ gint bit = result->width / threads;
+ for (gint j = 0; j < threads; j++)
+ {
+ thread_data[j].roi.y = result->y;
+ thread_data[j].roi.height = result->height;
+ thread_data[j].roi.x = result->x + bit * j;
+ thread_data[j].roi.width = bit;
+ }
+ thread_data[threads-1].roi.width = result->width - (bit * (threads-1));
+ }
+ else
+ {
+ gint bit = result->height / threads;
+ for (gint j = 0; j < threads; j++)
+ {
+ thread_data[j].roi.x = result->x;
+ thread_data[j].roi.width = result->width;
+ thread_data[j].roi.y = result->y + bit * j;
+ thread_data[j].roi.height = bit;
+ }
+ thread_data[threads-1].roi.height = result->height - (bit * (threads-1));
+ }
+ for (gint i = 0; i < threads; i++)
+ {
+ thread_data[i].klass = klass;
+ thread_data[i].operation = operation;
+ thread_data[i].output = output;
+ thread_data[i].pending = &pending;
+ thread_data[i].level = level;
+ thread_data[i].success = TRUE;
+ }
+
+ for (gint i = 1; i < threads; i++)
+ g_thread_pool_push (pool, &thread_data[i], NULL);
+ thread_process (&thread_data[0], NULL);
+
+ while (pending > 0) {g_usleep(3);};
+ }
+ else
+ {
+ success = klass->process (operation, output, result, level);
+ }
return success;
}
diff --git a/gegl/operation/gegl-operation.h b/gegl/operation/gegl-operation.h
index 45b582f..fb3b739 100644
--- a/gegl/operation/gegl-operation.h
+++ b/gegl/operation/gegl-operation.h
@@ -78,7 +78,8 @@ struct _GeglOperationClass
guint no_cache :1; /* do not create a cache for this operation */
guint opencl_support:1;
- guint64 bit_pad:62;
+ guint parallelize:1;
+ guint64 bit_pad:61;
/* attach this operation with a GeglNode, override this if you are creating a
* GeglGraph, it is already defined for Filters/Sources/Composers.
[
Date Prev][
Date Next] [
Thread Prev][
Thread Next]
[
Thread Index]
[
Date Index]
[
Author Index]