[gegl] operation: use gegl_parallel_distribute_area() for parallelization
- From: Ell <ell src gnome org>
- To: commits-list gnome org
- Cc:
- Subject: [gegl] operation: use gegl_parallel_distribute_area() for parallelization
- Date: Sat, 10 Nov 2018 20:57:11 +0000 (UTC)
commit f14b731d8d94060d766ea79feb2c60975dfdf26f
Author: Ell <ell_se yahoo com>
Date: Sat Nov 10 15:38:55 2018 -0500
operation: use gegl_parallel_distribute_area() for parallelization
In all operation types that support auto-threading, use
gegl_parallel_distribute_area(), added in the previous commit, to
parallelize processing, instead of local thread pools.
gegl/operation/gegl-operation-composer.c | 101 +++++++---------------
gegl/operation/gegl-operation-composer3.c | 101 +++++++---------------
gegl/operation/gegl-operation-filter.c | 109 +++++++-----------------
gegl/operation/gegl-operation-point-composer.c | 96 ++++++---------------
gegl/operation/gegl-operation-point-composer3.c | 103 ++++++----------------
gegl/operation/gegl-operation-point-filter.c | 86 +++++--------------
gegl/operation/gegl-operation-source.c | 83 +++++-------------
gegl/operation/gegl-operation.c | 10 ++-
gegl/operation/gegl-operation.h | 2 +
9 files changed, 199 insertions(+), 492 deletions(-)
---
diff --git a/gegl/operation/gegl-operation-composer.c b/gegl/operation/gegl-operation-composer.c
index 82a1eb92e..a9ff76047 100644
--- a/gegl/operation/gegl-operation-composer.c
+++ b/gegl/operation/gegl-operation-composer.c
@@ -102,43 +102,36 @@ typedef struct ThreadData
GeglOperationComposerClass *klass;
GeglOperation *operation;
GeglOperationContext *context;
+ GeglBuffer *input;
GeglBuffer *aux;
GeglBuffer *output;
- gint *pending;
+ const GeglRectangle *roi;
gint level;
gboolean success;
- GeglRectangle roi;
} ThreadData;
-static void thread_process (gpointer thread_data, gpointer input)
+static void
+thread_process (const GeglRectangle *area,
+ ThreadData *data)
{
- ThreadData *data = thread_data;
+ GeglBuffer *input;
- if (! input)
+ if (area->x == data->roi->x && area->y == data->roi->y)
+ {
+ input = g_object_ref (data->input);
+ }
+ else
{
input = gegl_operation_context_dup_input_maybe_copy (data->context,
- "input", &data->roi);
+ "input", area);
}
if (!data->klass->process (data->operation,
input, data->aux, data->output,
- &data->roi, data->level))
+ area, data->level))
data->success = FALSE;
g_object_unref (input);
-
- g_atomic_int_add (data->pending, -1);
-}
-
-static GThreadPool *thread_pool (void)
-{
- static GThreadPool *pool = NULL;
- if (!pool)
- {
- pool = g_thread_pool_new (thread_process, NULL, gegl_config_threads (),
- FALSE, NULL);
- }
- return pool;
}
static gboolean
@@ -175,54 +168,26 @@ gegl_operation_composer_process (GeglOperation *operation,
{
if (gegl_operation_use_threading (operation, result))
{
- gint threads = gegl_config_threads ();
- GThreadPool *pool = thread_pool ();
- ThreadData thread_data[GEGL_MAX_THREADS];
- gint pending = threads;
-
- if (result->width > result->height)
- {
- gint bit = result->width / threads;
- for (gint j = 0; j < threads; j++)
- {
- thread_data[j].roi.y = result->y;
- thread_data[j].roi.height = result->height;
- thread_data[j].roi.x = result->x + bit * j;
- thread_data[j].roi.width = bit;
- }
- thread_data[threads-1].roi.width = result->width - (bit * (threads-1));
- }
- else
- {
- gint bit = result->height / threads;
- for (gint j = 0; j < threads; j++)
- {
- thread_data[j].roi.x = result->x;
- thread_data[j].roi.width = result->width;
- thread_data[j].roi.y = result->y + bit * j;
- thread_data[j].roi.height = bit;
- }
- thread_data[threads-1].roi.height = result->height - (bit * (threads-1));
- }
- for (gint i = 0; i < threads; i++)
- {
- thread_data[i].klass = klass;
- thread_data[i].operation = operation;
- thread_data[i].context = context;
- thread_data[i].aux = aux;
- thread_data[i].output = output;
- thread_data[i].pending = &pending;
- thread_data[i].level = level;
- thread_data[i].success = TRUE;
- }
-
- for (gint i = 1; i < threads; i++)
- g_thread_pool_push (pool, &thread_data[i], NULL);
- thread_process (&thread_data[0], g_object_ref (input));
-
- while (g_atomic_int_get (&pending)) {};
-
- success = thread_data[0].success;
+ ThreadData data;
+
+ data.klass = klass;
+ data.operation = operation;
+ data.context = context;
+ data.input = input;
+ data.aux = aux;
+ data.output = output;
+ data.roi = result;
+ data.level = level;
+ data.success = TRUE;
+
+ gegl_parallel_distribute_area (
+ result,
+ gegl_operation_get_min_threaded_sub_area (operation),
+ GEGL_SPLIT_STRATEGY_AUTO,
+ (GeglParallelDistributeAreaFunc) thread_process,
+ &data);
+
+ success = data.success;
}
else
{
diff --git a/gegl/operation/gegl-operation-composer3.c b/gegl/operation/gegl-operation-composer3.c
index 5c2da7379..25269e8de 100644
--- a/gegl/operation/gegl-operation-composer3.c
+++ b/gegl/operation/gegl-operation-composer3.c
@@ -111,44 +111,37 @@ typedef struct ThreadData
GeglOperationComposer3Class *klass;
GeglOperation *operation;
GeglOperationContext *context;
+ GeglBuffer *input;
GeglBuffer *aux;
GeglBuffer *aux2;
GeglBuffer *output;
- gint *pending;
+ const GeglRectangle *roi;
gint level;
gboolean success;
- GeglRectangle roi;
} ThreadData;
-static void thread_process (gpointer thread_data, gpointer input)
+static void
+thread_process (const GeglRectangle *area,
+ ThreadData *data)
{
- ThreadData *data = thread_data;
+ GeglBuffer *input;
- if (! input)
+ if (area->x == data->roi->x && area->y == data->roi->y)
+ {
+ input = g_object_ref (data->input);
+ }
+ else
{
input = gegl_operation_context_dup_input_maybe_copy (data->context,
- "input", &data->roi);
+ "input", area);
}
if (!data->klass->process (data->operation,
input, data->aux, data->aux2,
- data->output, &data->roi, data->level))
+ data->output, area, data->level))
data->success = FALSE;
g_object_unref (input);
-
- g_atomic_int_add (data->pending, -1);
-}
-
-static GThreadPool *thread_pool (void)
-{
- static GThreadPool *pool = NULL;
- if (!pool)
- {
- pool = g_thread_pool_new (thread_process, NULL, gegl_config_threads (),
- FALSE, NULL);
- }
- return pool;
}
@@ -196,55 +189,27 @@ gegl_operation_composer3_process (GeglOperation *operation,
{
if (gegl_operation_use_threading (operation, result))
{
- gint threads = gegl_config_threads ();
- GThreadPool *pool = thread_pool ();
- ThreadData thread_data[GEGL_MAX_THREADS];
- gint pending = threads;
-
- if (result->width > result->height)
- {
- gint bit = result->width / threads;
- for (gint j = 0; j < threads; j++)
- {
- thread_data[j].roi.y = result->y;
- thread_data[j].roi.height = result->height;
- thread_data[j].roi.x = result->x + bit * j;
- thread_data[j].roi.width = bit;
- }
- thread_data[threads-1].roi.width = result->width - (bit * (threads-1));
- }
- else
- {
- gint bit = result->height / threads;
- for (gint j = 0; j < threads; j++)
- {
- thread_data[j].roi.x = result->x;
- thread_data[j].roi.width = result->width;
- thread_data[j].roi.y = result->y + bit * j;
- thread_data[j].roi.height = bit;
- }
- thread_data[threads-1].roi.height = result->height - (bit * (threads-1));
- }
- for (gint i = 0; i < threads; i++)
- {
- thread_data[i].klass = klass;
- thread_data[i].operation = operation;
- thread_data[i].context = context;
- thread_data[i].aux = aux;
- thread_data[i].aux2 = aux2;
- thread_data[i].output = output;
- thread_data[i].pending = &pending;
- thread_data[i].level = level;
- thread_data[i].success = TRUE;
- }
-
- for (gint i = 1; i < threads; i++)
- g_thread_pool_push (pool, &thread_data[i], NULL);
- thread_process (&thread_data[0], g_object_ref (input));
-
- while (g_atomic_int_get (&pending)) {};
+ ThreadData data;
+
+ data.klass = klass;
+ data.operation = operation;
+ data.context = context;
+ data.input = input;
+ data.aux = aux;
+ data.aux2 = aux2;
+ data.output = output;
+ data.roi = result;
+ data.level = level;
+ data.success = TRUE;
+
+ gegl_parallel_distribute_area (
+ result,
+ gegl_operation_get_min_threaded_sub_area (operation),
+ GEGL_SPLIT_STRATEGY_AUTO,
+ (GeglParallelDistributeAreaFunc) thread_process,
+ &data);
- success = thread_data[0].success;
+ success = data.success;
}
else
{
diff --git a/gegl/operation/gegl-operation-filter.c b/gegl/operation/gegl-operation-filter.c
index 92da722a1..a5c7af3ec 100644
--- a/gegl/operation/gegl-operation-filter.c
+++ b/gegl/operation/gegl-operation-filter.c
@@ -120,41 +120,34 @@ typedef struct ThreadData
GeglOperationFilterClass *klass;
GeglOperation *operation;
GeglOperationContext *context;
+ GeglBuffer *input;
GeglBuffer *output;
- gint *pending;
+ const GeglRectangle *roi;
gint level;
gboolean success;
- GeglRectangle roi;
} ThreadData;
-static void thread_process (gpointer thread_data, gpointer input)
+static void
+thread_process (const GeglRectangle *area,
+ ThreadData *data)
{
- ThreadData *data = thread_data;
+ GeglBuffer *input;
- if (! input)
+ if (area->x == data->roi->x && area->y == data->roi->y)
+ {
+ input = g_object_ref (data->input);
+ }
+ else
{
input = gegl_operation_context_dup_input_maybe_copy (data->context,
- "input", &data->roi);
+ "input", area);
}
if (!data->klass->process (data->operation,
- input, data->output, &data->roi, data->level))
+ input, data->output, area, data->level))
data->success = FALSE;
g_object_unref (input);
-
- g_atomic_int_add (data->pending, -1);
-}
-
-static GThreadPool *thread_pool (void)
-{
- static GThreadPool *pool = NULL;
- if (!pool)
- {
- pool = g_thread_pool_new (thread_process, NULL, gegl_config_threads (),
- FALSE, NULL);
- }
- return pool;
}
static gboolean
@@ -187,11 +180,8 @@ gegl_operation_filter_process (GeglOperation *operation,
if (gegl_operation_use_threading (operation, result))
{
- gint threads = gegl_config_threads ();
+ ThreadData data;
GeglSplitStrategy split_strategy = GEGL_SPLIT_STRATEGY_AUTO;
- GThreadPool *pool = thread_pool ();
- ThreadData thread_data[GEGL_MAX_THREADS];
- gint pending = threads;
if (klass->get_split_strategy)
{
@@ -199,60 +189,23 @@ gegl_operation_filter_process (GeglOperation *operation,
output_prop, result, level);
}
- if (split_strategy == GEGL_SPLIT_STRATEGY_AUTO)
- {
- if (result->width > result->height)
- split_strategy = GEGL_SPLIT_STRATEGY_VERTICAL;
- else
- split_strategy = GEGL_SPLIT_STRATEGY_HORIZONTAL;
- }
-
- if (split_strategy == GEGL_SPLIT_STRATEGY_VERTICAL)
- {
- gint bit = result->width / threads;
- for (gint j = 0; j < threads; j++)
- {
- thread_data[j].roi.y = result->y;
- thread_data[j].roi.height = result->height;
- thread_data[j].roi.x = result->x + bit * j;
- thread_data[j].roi.width = bit;
- }
- thread_data[threads-1].roi.width = result->width - (bit * (threads-1));
- }
- else if (split_strategy == GEGL_SPLIT_STRATEGY_HORIZONTAL)
- {
- gint bit = result->height / threads;
- for (gint j = 0; j < threads; j++)
- {
- thread_data[j].roi.x = result->x;
- thread_data[j].roi.width = result->width;
- thread_data[j].roi.y = result->y + bit * j;
- thread_data[j].roi.height = bit;
- }
- thread_data[threads-1].roi.height = result->height - (bit * (threads-1));
- }
- else
- {
- g_return_val_if_reached (FALSE);
- }
- for (gint i = 0; i < threads; i++)
- {
- thread_data[i].klass = klass;
- thread_data[i].operation = operation;
- thread_data[i].context = context;
- thread_data[i].output = output;
- thread_data[i].pending = &pending;
- thread_data[i].level = level;
- thread_data[i].success = TRUE;
- }
-
- for (gint i = 1; i < threads; i++)
- g_thread_pool_push (pool, &thread_data[i], NULL);
- thread_process (&thread_data[0], g_object_ref (input));
-
- while (g_atomic_int_get (&pending)) {};
-
- success = thread_data[0].success;
+ data.klass = klass;
+ data.operation = operation;
+ data.context = context;
+ data.input = input;
+ data.output = output;
+ data.roi = result;
+ data.level = level;
+ data.success = TRUE;
+
+ gegl_parallel_distribute_area (
+ result,
+ gegl_operation_get_min_threaded_sub_area (operation),
+ split_strategy,
+ (GeglParallelDistributeAreaFunc) thread_process,
+ &data);
+
+ success = data.success;
}
else
{
diff --git a/gegl/operation/gegl-operation-point-composer.c b/gegl/operation/gegl-operation-point-composer.c
index bacc14688..eb75e151c 100644
--- a/gegl/operation/gegl-operation-point-composer.c
+++ b/gegl/operation/gegl-operation-point-composer.c
@@ -46,23 +46,22 @@ typedef struct ThreadData
GeglBuffer *input;
GeglBuffer *aux;
GeglBuffer *output;
- gint *pending;
gint level;
gboolean success;
- GeglRectangle result;
const Babl *input_format;
const Babl *aux_format;
const Babl *output_format;
} ThreadData;
-static void thread_process (gpointer thread_data, gpointer unused)
+static void
+thread_process (const GeglRectangle *area,
+ ThreadData *data)
{
- ThreadData *data = thread_data;
gint read = 0;
gint aux = 0;
GeglBufferIterator *i = gegl_buffer_iterator_new (data->output,
- &data->result,
+ area,
data->level,
data->output_format,
GEGL_ACCESS_WRITE,
@@ -70,11 +69,11 @@ static void thread_process (gpointer thread_data, gpointer unused)
4);
if (data->input)
- read = gegl_buffer_iterator_add (i, data->input, &data->result, data->level,
+ read = gegl_buffer_iterator_add (i, data->input, area, data->level,
data->input_format,
GEGL_ACCESS_READ, GEGL_ABYSS_NONE);
if (data->aux)
- aux = gegl_buffer_iterator_add (i, data->aux, &data->result, data->level,
+ aux = gegl_buffer_iterator_add (i, data->aux, area, data->level,
data->aux_format,
GEGL_ACCESS_READ, GEGL_ABYSS_NONE);
@@ -85,19 +84,6 @@ static void thread_process (gpointer thread_data, gpointer unused)
data->aux?i->items[aux].data:NULL,
i->items[0].data, i->length, &(i->items[0].roi), data->level);
}
-
- g_atomic_int_add (data->pending, -1);
-}
-
-static GThreadPool *thread_pool (void)
-{
- static GThreadPool *pool = NULL;
- if (!pool)
- {
- pool = g_thread_pool_new (thread_process, NULL, gegl_config_threads (),
- FALSE, NULL);
- }
- return pool;
}
static gboolean
@@ -321,54 +307,19 @@ gegl_operation_point_composer_process (GeglOperation *operation,
return TRUE;
}
- if (gegl_operation_use_threading (operation, result) && result->height > 1)
+ if (gegl_operation_use_threading (operation, result))
{
- gint threads = gegl_config_threads ();
- ThreadData thread_data[GEGL_MAX_THREADS];
- GThreadPool *pool = thread_pool ();
- gint pending;
- gint j;
-
- if (result->width > result->height)
- for (j = 0; j < threads; j++)
- {
- GeglRectangle rect = *result;
- rect.width /= threads;
- rect.x += rect.width * j;
-
- if (j == threads-1)
- rect.width = (result->width + result->x) - rect.x;
-
- thread_data[j].result = rect;
- }
- else
- for (j = 0; j < threads; j++)
- {
- GeglRectangle rect = *result;
- rect = *result;
- rect.height /= threads;
- rect.y += rect.height * j;
-
- if (j == threads-1)
- rect.height = (result->height + result->y) - rect.y;
-
- thread_data[j].result = rect;
- }
-
- for (j = 0; j < threads; j++)
- {
- thread_data[j].klass = point_composer_class;
- thread_data[j].operation = operation;
- thread_data[j].input = input;
- thread_data[j].aux = aux;
- thread_data[j].output = output;
- thread_data[j].pending = &pending;
- thread_data[j].level = level;
- thread_data[j].input_format = in_format;
- thread_data[j].aux_format = aux_format;
- thread_data[j].output_format = out_format;
- }
- pending = threads;
+ ThreadData data;
+
+ data.klass = point_composer_class;
+ data.operation = operation;
+ data.input = input;
+ data.aux = aux;
+ data.output = output;
+ data.level = level;
+ data.input_format = in_format;
+ data.aux_format = aux_format;
+ data.output_format = out_format;
if (gegl_cl_is_accelerated ())
{
@@ -378,11 +329,12 @@ gegl_operation_point_composer_process (GeglOperation *operation,
gegl_buffer_flush_ext (aux, result);
}
- for (gint j = 1; j < threads; j++)
- g_thread_pool_push (pool, &thread_data[j], NULL);
-
- thread_process (&thread_data[0], NULL);
- while (g_atomic_int_get (&pending)) {};
+ gegl_parallel_distribute_area (
+ result,
+ gegl_operation_get_min_threaded_sub_area (operation),
+ GEGL_SPLIT_STRATEGY_AUTO,
+ (GeglParallelDistributeAreaFunc) thread_process,
+ &data);
return TRUE;
}
diff --git a/gegl/operation/gegl-operation-point-composer3.c b/gegl/operation/gegl-operation-point-composer3.c
index ebfcd22a5..56279396f 100644
--- a/gegl/operation/gegl-operation-point-composer3.c
+++ b/gegl/operation/gegl-operation-point-composer3.c
@@ -42,10 +42,8 @@ typedef struct ThreadData
GeglBuffer *aux;
GeglBuffer *aux2;
GeglBuffer *output;
- gint *pending;
gint level;
gboolean success;
- GeglRectangle result;
const Babl *input_format;
const Babl *aux_format;
@@ -53,29 +51,30 @@ typedef struct ThreadData
const Babl *output_format;
} ThreadData;
-static void thread_process (gpointer thread_data, gpointer unused)
+static void
+thread_process (const GeglRectangle *area,
+ ThreadData *data)
{
- ThreadData *data = thread_data;
gint read = 0;
gint aux = 0;
gint aux2 = 0;
GeglBufferIterator *i = gegl_buffer_iterator_new (data->output,
- &data->result,
+ area,
data->level,
data->output_format,
GEGL_ACCESS_WRITE,
GEGL_ABYSS_NONE, 4);
if (data->input)
- read = gegl_buffer_iterator_add (i, data->input, &data->result, data->level,
+ read = gegl_buffer_iterator_add (i, data->input, area, data->level,
data->input_format,
GEGL_ACCESS_READ, GEGL_ABYSS_NONE);
if (data->aux)
- aux = gegl_buffer_iterator_add (i, data->aux, &data->result, data->level,
+ aux = gegl_buffer_iterator_add (i, data->aux, area, data->level,
data->aux_format,
GEGL_ACCESS_READ, GEGL_ABYSS_NONE);
if (data->aux2)
- aux2 = gegl_buffer_iterator_add (i, data->aux2, &data->result, data->level,
+ aux2 = gegl_buffer_iterator_add (i, data->aux2, area, data->level,
data->aux2_format,
GEGL_ACCESS_READ, GEGL_ABYSS_NONE);
@@ -88,19 +87,6 @@ static void thread_process (gpointer thread_data, gpointer unused)
data->aux2?i->items[aux2].data:NULL,
i->items[0].data, i->length, &(i->items[0].roi), data->level);
}
-
- g_atomic_int_add (data->pending, -1);
-}
-
-static GThreadPool *thread_pool (void)
-{
- static GThreadPool *pool = NULL;
- if (!pool)
- {
- pool = g_thread_pool_new (thread_process, NULL, gegl_config_threads (),
- FALSE, NULL);
- }
- return pool;
}
static gboolean
@@ -235,58 +221,21 @@ gegl_operation_point_composer3_process (GeglOperation *operation,
if ((result->width > 0) && (result->height > 0))
{
- if (gegl_operation_use_threading (operation, result) && result->height > 1)
+ if (gegl_operation_use_threading (operation, result))
{
- gint threads = gegl_config_threads ();
- ThreadData thread_data[GEGL_MAX_THREADS];
- GThreadPool *pool = thread_pool ();
- gint pending;
- gint j;
-
- if (result->width > result->height)
- for (j = 0; j < threads; j++)
- {
- GeglRectangle rect = *result;
-
- rect.width /= threads;
- rect.x += rect.width * j;
-
- if (j == threads-1)
- rect.width = (result->width + result->x) - rect.x;
-
- thread_data[j].result = rect;
- }
- else
- for (j = 0; j < threads; j++)
- {
- GeglRectangle rect = *result;
-
- rect = *result;
- rect.height /= threads;
- rect.y += rect.height * j;
-
- if (j == threads-1)
- rect.height = (result->height + result->y) - rect.y;
-
- thread_data[j].result = rect;
- }
-
- for (j = 0; j < threads; j++)
- {
- thread_data[j].klass = point_composer3_class;
- thread_data[j].operation = operation;
- thread_data[j].input = input;
- thread_data[j].aux = aux;
- thread_data[j].aux2 = aux2;
- thread_data[j].output = output;
- thread_data[j].pending = &pending;
- thread_data[j].level = level;
- thread_data[j].input_format = in_format;
- thread_data[j].aux_format = aux_format;
- thread_data[j].aux2_format = aux2_format;
- thread_data[j].output_format = out_format;
- }
- pending = threads;
+ ThreadData data;
+
+ data.klass = point_composer3_class;
+ data.operation = operation;
+ data.input = input;
+ data.aux = aux;
+ data.aux2 = aux2;
+ data.output = output;
+ data.level = level;
+ data.input_format = in_format;
+ data.aux_format = aux_format;
+ data.aux2_format = aux2_format;
+ data.output_format = out_format;
if (gegl_cl_is_accelerated ())
{
@@ -298,10 +247,12 @@ gegl_operation_point_composer3_process (GeglOperation *operation,
gegl_buffer_flush_ext (aux2, result);
}
- for (gint j = 1; j < threads; j++)
- g_thread_pool_push (pool, &thread_data[j], NULL);
- thread_process (&thread_data[0], NULL);
- while (g_atomic_int_get (&pending)) {};
+ gegl_parallel_distribute_area (
+ result,
+ gegl_operation_get_min_threaded_sub_area (operation),
+ GEGL_SPLIT_STRATEGY_AUTO,
+ (GeglParallelDistributeAreaFunc) thread_process,
+ &data);
return TRUE;
}
diff --git a/gegl/operation/gegl-operation-point-filter.c b/gegl/operation/gegl-operation-point-filter.c
index 6fffa74a1..365cb8c01 100644
--- a/gegl/operation/gegl-operation-point-filter.c
+++ b/gegl/operation/gegl-operation-point-filter.c
@@ -44,26 +44,25 @@ typedef struct ThreadData
GeglOperation *operation;
GeglBuffer *input;
GeglBuffer *output;
- gint *pending;
- GeglRectangle result;
gint level;
gboolean success;
const Babl *input_format;
const Babl *output_format;
} ThreadData;
-static void thread_process (gpointer thread_data, gpointer unused)
+static void
+thread_process (const GeglRectangle *area,
+ ThreadData *data)
{
- ThreadData *data = thread_data;
GeglBufferIterator *i = gegl_buffer_iterator_new (data->output,
- &data->result,
+ area,
data->level,
data->output_format,
GEGL_ACCESS_WRITE,
GEGL_ABYSS_NONE, 4);
gint read = 0;
if (data->input)
- read = gegl_buffer_iterator_add (i, data->input, &data->result, data->level,
+ read = gegl_buffer_iterator_add (i, data->input, area, data->level,
data->input_format,
GEGL_ACCESS_READ, GEGL_ABYSS_NONE);
@@ -73,19 +72,6 @@ static void thread_process (gpointer thread_data, gpointer unused)
data->klass->process (data->operation, data->input?i->items[read].data:NULL,
i->items[0].data, i->length, &(i->items[0].roi), data->level);
}
-
- g_atomic_int_add (data->pending, -1);
-}
-
-static GThreadPool *thread_pool (void)
-{
- static GThreadPool *pool = NULL;
- if (!pool)
- {
- pool = g_thread_pool_new (thread_process, NULL, gegl_config_threads (),
- FALSE, NULL);
- }
- return pool;
}
static gboolean
@@ -286,59 +272,27 @@ gegl_operation_point_filter_process (GeglOperation *operation,
return TRUE;
}
- if (gegl_operation_use_threading (operation, result) && result->height > 1)
+ if (gegl_operation_use_threading (operation, result))
{
- gint threads = gegl_config_threads ();
- ThreadData thread_data[GEGL_MAX_THREADS];
- GThreadPool *pool = thread_pool ();
- gint pending;
- int j;
-
- if (result->width > result->height)
- for (j = 0; j < threads; j++)
- {
- GeglRectangle rect = *result;
- rect.width /= threads;
- rect.x += rect.width * j;
-
- if (j == threads-1)
- rect.width = (result->width + result->x) - rect.x;
-
- thread_data[j].result = rect;
- }
- else
- for (j = 0; j < threads; j++)
- {
- GeglRectangle rect = *result;
- rect = *result;
- rect.height /= threads;
- rect.y += rect.height * j;
- if (j == threads-1)
- rect.height = (result->height + result->y) - rect.y;
- thread_data[j].result = rect;
- }
-
- for (j = 0; j < threads; j++)
- {
- thread_data[j].klass = point_filter_class;
- thread_data[j].operation = operation;
- thread_data[j].input = input;
- thread_data[j].output = output;
- thread_data[j].pending = &pending;
- thread_data[j].level = level;
- thread_data[j].input_format = in_format;
- thread_data[j].output_format = out_format;
- }
+ ThreadData data;
- pending = threads;
+ data.klass = point_filter_class;
+ data.operation = operation;
+ data.input = input;
+ data.output = output;
+ data.level = level;
+ data.input_format = in_format;
+ data.output_format = out_format;
if (gegl_cl_is_accelerated () && input)
gegl_buffer_flush_ext (input, result);
- for (gint j = 1; j < threads; j++)
- g_thread_pool_push (pool, &thread_data[j], NULL);
- thread_process (&thread_data[0], NULL);
- while (g_atomic_int_get (&pending)) {};
+ gegl_parallel_distribute_area (
+ result,
+ gegl_operation_get_min_threaded_sub_area (operation),
+ GEGL_SPLIT_STRATEGY_AUTO,
+ (GeglParallelDistributeAreaFunc) thread_process,
+ &data);
return TRUE;
}
diff --git a/gegl/operation/gegl-operation-source.c b/gegl/operation/gegl-operation-source.c
index a226018c2..a64881e94 100644
--- a/gegl/operation/gegl-operation-source.c
+++ b/gegl/operation/gegl-operation-source.c
@@ -82,30 +82,17 @@ typedef struct ThreadData
GeglOperationSourceClass *klass;
GeglOperation *operation;
GeglBuffer *output;
- gint *pending;
gint level;
gboolean success;
- GeglRectangle roi;
} ThreadData;
-static void thread_process (gpointer thread_data, gpointer unused)
+static void
+thread_process (const GeglRectangle *area,
+ ThreadData *data)
{
- ThreadData *data = thread_data;
if (!data->klass->process (data->operation,
- data->output, &data->roi, data->level))
+ data->output, area, data->level))
data->success = FALSE;
- g_atomic_int_add (data->pending, -1);
-}
-
-static GThreadPool *thread_pool (void)
-{
- static GThreadPool *pool = NULL;
- if (!pool)
- {
- pool = g_thread_pool_new (thread_process, NULL, gegl_config_threads (),
- FALSE, NULL);
- }
- return pool;
}
static gboolean
@@ -130,52 +117,22 @@ gegl_operation_source_process (GeglOperation *operation,
if (gegl_operation_use_threading (operation, result))
{
- gint threads = gegl_config_threads ();
- GThreadPool *pool = thread_pool ();
- ThreadData thread_data[GEGL_MAX_THREADS];
- gint pending = threads;
-
- if (result->width > result->height)
- {
- gint bit = result->width / threads;
- for (gint j = 0; j < threads; j++)
- {
- thread_data[j].roi.y = result->y;
- thread_data[j].roi.height = result->height;
- thread_data[j].roi.x = result->x + bit * j;
- thread_data[j].roi.width = bit;
- }
- thread_data[threads-1].roi.width = result->width - (bit * (threads-1));
- }
- else
- {
- gint bit = result->height / threads;
- for (gint j = 0; j < threads; j++)
- {
- thread_data[j].roi.x = result->x;
- thread_data[j].roi.width = result->width;
- thread_data[j].roi.y = result->y + bit * j;
- thread_data[j].roi.height = bit;
- }
- thread_data[threads-1].roi.height = result->height - (bit * (threads-1));
- }
- for (gint i = 0; i < threads; i++)
- {
- thread_data[i].klass = klass;
- thread_data[i].operation = operation;
- thread_data[i].output = output;
- thread_data[i].pending = &pending;
- thread_data[i].level = level;
- thread_data[i].success = TRUE;
- }
-
- for (gint i = 1; i < threads; i++)
- g_thread_pool_push (pool, &thread_data[i], NULL);
- thread_process (&thread_data[0], NULL);
-
- while (g_atomic_int_get (&pending)) {};
-
- success = thread_data[0].success;
+ ThreadData data;
+
+ data.klass = klass;
+ data.operation = operation;
+ data.output = output;
+ data.level = level;
+ data.success = TRUE;
+
+ gegl_parallel_distribute_area (
+ result,
+ gegl_operation_get_min_threaded_sub_area (operation),
+ GEGL_SPLIT_STRATEGY_AUTO,
+ (GeglParallelDistributeAreaFunc) thread_process,
+ &data);
+
+ success = data.success;
}
else
{
diff --git a/gegl/operation/gegl-operation.c b/gegl/operation/gegl-operation.c
index de7bc9ccd..f60922ce7 100644
--- a/gegl/operation/gegl-operation.c
+++ b/gegl/operation/gegl-operation.c
@@ -816,12 +816,20 @@ gegl_operation_use_threading (GeglOperation *operation,
return FALSE;
if (op_class->threaded &&
- roi->width * roi->height > 64*64)
+ roi->width * roi->height >
+ gegl_operation_get_min_threaded_sub_area (operation))
return TRUE;
}
return FALSE;
}
+gsize
+gegl_operation_get_min_threaded_sub_area (GeglOperation *operation)
+{
+ /* FIXME: too arbitrary? */
+ return 64 * 64;
+}
+
static guchar *gegl_temp_alloc[GEGL_MAX_THREADS * 4]={NULL,};
static gint gegl_temp_size[GEGL_MAX_THREADS * 4]={0,};
diff --git a/gegl/operation/gegl-operation.h b/gegl/operation/gegl-operation.h
index a48e2adc9..b4d473fb4 100644
--- a/gegl/operation/gegl-operation.h
+++ b/gegl/operation/gegl-operation.h
@@ -248,6 +248,8 @@ gboolean gegl_operation_use_opencl (const GeglOperation *operation)
gboolean
gegl_operation_use_threading (GeglOperation *operation,
const GeglRectangle *roi);
+gsize
+gegl_operation_get_min_threaded_sub_area (GeglOperation *operation);
/* Invalidate a specific rectangle, indicating the any computation depending
* on this roi is now invalid.
[Date Prev][Date Next] [Thread Prev][Thread Next] [Thread Index] [Date Index] [Author Index]