[gegl] gegl: add gegl-parallel
- From: Ell <ell src gnome org>
- To: commits-list gnome org
- Cc:
- Subject: [gegl] gegl: add gegl-parallel
- Date: Sat, 10 Nov 2018 20:57:06 +0000 (UTC)
commit 4351962035ebd47a176285d1c4064afe60c63543
Author: Ell <ell_se yahoo com>
Date: Sat Nov 10 15:30:07 2018 -0500
gegl: add gegl-parallel
gegl-parallel provides various parallel algorithms. Currently,
this is limited to the gegl_parallel_distribute() family of
functions, which distribute work across multiple threads, migrated
from GIMP.
The following commits use these functions to replace the various
thread-pools we use to auto-parallelize operations with simpler
code, fixing potential deadlocks as a result of nested operation
processing, as in bug #790810, along the way.
Since gegl-parallel is public API, it also eases manual
parallelization of operations, inside and outside of GEGL.
gegl/Makefile.am | 3 +
gegl/gegl-init.c | 3 +
gegl/gegl-parallel-private.h | 33 ++++
gegl/gegl-parallel.c | 418 +++++++++++++++++++++++++++++++++++++++++++
gegl/gegl-parallel.h | 191 ++++++++++++++++++++
gegl/gegl.h | 1 +
6 files changed, 649 insertions(+)
---
diff --git a/gegl/Makefile.am b/gegl/Makefile.am
index cf14f2416..c3dca9f2b 100644
--- a/gegl/Makefile.am
+++ b/gegl/Makefile.am
@@ -55,6 +55,7 @@ GEGL_introspectable_headers = \
gegl-matrix.h \
gegl-lookup.h \
gegl-random.h \
+ gegl-parallel.h \
gegl-init.h \
gegl-version.h \
buffer/gegl-buffer.h \
@@ -100,6 +101,7 @@ GEGL_sources = \
gegl-xml.c \
gegl-gio.c \
gegl-random.c \
+ gegl-parallel.c \
gegl-serialize.c \
gegl-stats.c \
gegl-matrix.c \
@@ -118,6 +120,7 @@ GEGL_sources = \
gegl-op.h \
gegl-plugin.h \
gegl-random-private.h \
+ gegl-parallel-private.h \
gegl-stats.h \
gegl-gio-private.h \
gegl-types-internal.h \
diff --git a/gegl/gegl-init.c b/gegl/gegl-init.c
index 39bb6c323..414ebeb1f 100644
--- a/gegl/gegl-init.c
+++ b/gegl/gegl-init.c
@@ -105,6 +105,7 @@ guint gegl_debug_flags = 0;
#include "gegl-stats.h"
#include "graph/gegl-node-private.h"
#include "gegl-random-private.h"
+#include "gegl-parallel-private.h"
static gboolean gegl_post_parse_hook (GOptionContext *context,
GOptionGroup *group,
@@ -489,6 +490,7 @@ gegl_exit (void)
gegl_operation_gtype_cleanup ();
gegl_operation_handlers_cleanup ();
gegl_random_cleanup ();
+ gegl_parallel_cleanup ();
gegl_cl_cleanup ();
gegl_temp_buffer_free ();
@@ -702,6 +704,7 @@ gegl_post_parse_hook (GOptionContext *context,
GEGL_INSTRUMENT_START();
+ gegl_parallel_init ();
gegl_operation_gtype_init ();
gegl_tile_cache_init ();
diff --git a/gegl/gegl-parallel-private.h b/gegl/gegl-parallel-private.h
new file mode 100644
index 000000000..3c13ec85d
--- /dev/null
+++ b/gegl/gegl-parallel-private.h
@@ -0,0 +1,33 @@
+/* This file is part of GEGL.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with GEGL; if not, see <https://www.gnu.org/licenses/>.
+ *
+ * Copyright 2018 Ell
+ */
+
+#ifndef __GEGL_PARALLEL_PRIVATE_H__
+#define __GEGL_PARALLEL_PRIVATE_H__
+
+
+G_BEGIN_DECLS
+
+
+/* Connects to the "notify::threads" signal of the GEGL config object and
+ * applies its current "threads" value.  Called from gegl_post_parse_hook().
+ */
+void gegl_parallel_init (void);
+/* Disconnects the "notify::threads" handler and stops all worker threads.
+ * Called from gegl_exit().
+ */
+void gegl_parallel_cleanup (void);
+
+
+G_END_DECLS
+
+
+#endif /* __GEGL_PARALLEL_PRIVATE_H__ */
diff --git a/gegl/gegl-parallel.c b/gegl/gegl-parallel.c
new file mode 100644
index 000000000..38f564da3
--- /dev/null
+++ b/gegl/gegl-parallel.c
@@ -0,0 +1,418 @@
+/* This file is part of GEGL.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with GEGL; if not, see <https://www.gnu.org/licenses/>.
+ *
+ * Copyright 2018 Ell
+ */
+
+#include "config.h"
+
+#include <glib.h>
+
+#include "gegl.h"
+#include "gegl-config.h"
+#include "gegl-parallel.h"
+#include "gegl-parallel-private.h"
+
+
+#define GEGL_PARALLEL_DISTRIBUTE_MAX_THREADS GEGL_MAX_THREADS
+
+
+/* Describes a single gegl_parallel_distribute() call; one instance is
+ * shared by all the threads taking part in that call.
+ */
+typedef struct
+{
+ GeglParallelDistributeFunc func; /* function called by each participating thread */
+ gint n; /* number of threads the work is split across */
+ gpointer user_data; /* opaque pointer passed through to func */
+} GeglParallelDistributeTask;
+
+/* State of a single worker thread. */
+typedef struct
+{
+ GThread *thread; /* thread handle; joined when the worker is stopped */
+ GMutex mutex; /* held by the worker, except while it waits on cond */
+ GCond cond; /* signaled when task or quit is set */
+
+ gboolean quit; /* set to TRUE to ask the worker to exit */
+
+ GeglParallelDistributeTask *volatile task; /* pending task, or NULL when idle */
+ volatile gint i; /* index passed to the task's function */
+} GeglParallelDistributeThread;
+
+
+/* local function prototypes */
+
+/* "notify::threads" handler: applies the configured thread count */
+static void gegl_parallel_notify_threads (GeglConfig *config);
+
+/* changes the number of threads used by all the parallel algorithms */
+static void gegl_parallel_set_n_threads (gint n_threads,
+                                         gboolean finish_tasks);
+
+/* starts/stops worker threads to match the requested thread count */
+static void gegl_parallel_distribute_set_n_threads (gint n_threads);
+/* main function of the worker threads */
+static gpointer gegl_parallel_distribute_thread_func (GeglParallelDistributeThread *thread);
+
+
+/* local variables */
+
+/* total number of threads work is distributed across, including the
+ * calling thread; hence the number of worker threads is one less
+ */
+static gint gegl_parallel_distribute_n_threads = 1;
+/* worker-thread slots; the calling thread is the remaining participant */
+static GeglParallelDistributeThread gegl_parallel_distribute_threads[GEGL_PARALLEL_DISTRIBUTE_MAX_THREADS - 1];
+
+/* used by the last worker to finish to wake up the distributing thread */
+static GMutex gegl_parallel_distribute_completion_mutex;
+static GCond gegl_parallel_distribute_completion_cond;
+/* number of workers that haven't finished the current task yet */
+static volatile gint gegl_parallel_distribute_completion_counter;
+/* non-zero while a distribution, or a thread-count change, is in progress */
+static volatile gint gegl_parallel_distribute_busy;
+
+
+/* public functions */
+
+
+/* Starts tracking the "threads" property of the GEGL config object, and
+ * applies its current value right away.
+ */
+void
+gegl_parallel_init (void)
+{
+  GeglConfig *config = gegl_config ();
+
+  g_signal_connect (config, "notify::threads",
+                    G_CALLBACK (gegl_parallel_notify_threads),
+                    NULL);
+
+  /* no notification is emitted for the initial value, so apply it by hand */
+  gegl_parallel_notify_threads (config);
+}
+
+/* Stops tracking the "threads" property of the GEGL config object, and
+ * shuts down the worker threads.
+ */
+void
+gegl_parallel_cleanup (void)
+{
+  GeglConfig *config = gegl_config ();
+
+  g_signal_handlers_disconnect_by_func (config,
+                                        gegl_parallel_notify_threads,
+                                        NULL);
+
+  /* a thread count of 0 is clamped to 1, i.e., only the calling thread
+   * remains and every worker is stopped
+   */
+  gegl_parallel_set_n_threads (0, /* finish_tasks = */ FALSE);
+}
+
+/* Calls func on up to max_n threads: worker threads take the indices
+ * 0..n-2, and the calling thread takes index n-1 and then waits for all
+ * the workers to finish.
+ *
+ * max_n == 0 is a no-op; a negative max_n means "use all the configured
+ * threads"; otherwise, max_n is capped to the configured thread count.
+ * When only a single thread would be used, or when the busy flag is
+ * already taken (e.g., by an enclosing distribution), func is called once,
+ * as func (0, 1, user_data), on the calling thread.
+ */
+void
+gegl_parallel_distribute (gint max_n,
+ GeglParallelDistributeFunc func,
+ gpointer user_data)
+{
+ GeglParallelDistributeTask task;
+ gint i;
+
+ g_return_if_fail (func != NULL);
+
+ if (max_n == 0)
+ return;
+
+ /* NOTE(review): the thread count is read here before the busy flag is
+ * acquired below -- confirm it can't be changed concurrently by
+ * gegl_parallel_distribute_set_n_threads() in between.
+ */
+ if (max_n < 0)
+ max_n = gegl_parallel_distribute_n_threads;
+ else
+ max_n = MIN (max_n, gegl_parallel_distribute_n_threads);
+
+ /* serial fallback: nothing to distribute, or the workers are in use */
+ if (max_n == 1 ||
+ ! g_atomic_int_compare_and_exchange (&gegl_parallel_distribute_busy,
+ 0, 1))
+ {
+ func (0, 1, user_data);
+
+ return;
+ }
+
+ /* the task lives on our stack; this is fine, since we don't return
+ * before all the workers are done with it
+ */
+ task.n = max_n;
+ task.func = func;
+ task.user_data = user_data;
+
+ /* one count per worker; the calling thread is not counted */
+ g_atomic_int_set (&gegl_parallel_distribute_completion_counter, task.n - 1);
+
+ /* hand the task to the first n-1 workers */
+ for (i = 0; i < task.n - 1; i++)
+ {
+ GeglParallelDistributeThread *thread =
+ &gegl_parallel_distribute_threads[i];
+
+ g_mutex_lock (&thread->mutex);
+
+ thread->task = &task;
+ thread->i = i;
+
+ g_cond_signal (&thread->cond);
+
+ g_mutex_unlock (&thread->mutex);
+ }
+
+ /* i == task.n - 1 here: process the last part on the calling thread */
+ func (i, task.n, user_data);
+
+ /* wait for the workers; skip the locking if they're already done */
+ if (g_atomic_int_get (&gegl_parallel_distribute_completion_counter))
+ {
+ g_mutex_lock (&gegl_parallel_distribute_completion_mutex);
+
+ while (g_atomic_int_get (&gegl_parallel_distribute_completion_counter))
+ {
+ g_cond_wait (&gegl_parallel_distribute_completion_cond,
+ &gegl_parallel_distribute_completion_mutex);
+ }
+
+ g_mutex_unlock (&gegl_parallel_distribute_completion_mutex);
+ }
+
+ /* release the workers for the next distribution */
+ g_atomic_int_set (&gegl_parallel_distribute_busy, 0);
+}
+
+/* Data passed from gegl_parallel_distribute_range() to
+ * gegl_parallel_distribute_range_func().
+ */
+typedef struct
+{
+ gsize size; /* total size of the range */
+ GeglParallelDistributeRangeFunc func; /* user function called for each sub-range */
+ gpointer user_data; /* opaque pointer passed through to func */
+} GeglParallelDistributeRangeData;
+
+/* Calls the user function of gegl_parallel_distribute_range() with the
+ * i-th sub-range, out of n, of [0, data->size).
+ *
+ * The k-th boundary is k * size / n, rounded to the nearest integer.  It's
+ * computed as
+ *
+ *   k * (size / n) + (2 * k * (size % n) + n) / (2 * n)
+ *
+ * which yields the same value as (2 * k * size + n) / (2 * n), but can't
+ * overflow gsize for large sizes, since no intermediate term exceeds size
+ * (plus a small constant).
+ */
+static void
+gegl_parallel_distribute_range_func (gint                             i,
+                                     gint                             n,
+                                     GeglParallelDistributeRangeData *data)
+{
+  const gsize quot = data->size / n;
+  const gsize rem  = data->size % n;
+  gsize       offset;
+  gsize       end;
+
+  offset = i       * quot + (2 * i       * rem + n) / (2 * n);
+  end    = (i + 1) * quot + (2 * (i + 1) * rem + n) / (2 * n);
+
+  data->func (offset, end - offset, data->user_data);
+}
+
+/* Splits the range [0, size) into sub-ranges of roughly min_sub_size
+ * elements or more, as far as the thread count allows, and calls func for
+ * each of them through gegl_parallel_distribute().  Does nothing when size
+ * is 0.
+ */
+void
+gegl_parallel_distribute_range (gsize                           size,
+                                gsize                           min_sub_size,
+                                GeglParallelDistributeRangeFunc func,
+                                gpointer                        user_data)
+{
+  GeglParallelDistributeRangeData range_data;
+  gsize                           n_parts;
+
+  g_return_if_fail (func != NULL);
+
+  if (size == 0)
+    return;
+
+  /* a min_sub_size of 0 or 1 imposes no limit on the number of parts */
+  n_parts = (min_sub_size > 1) ? size / min_sub_size : size;
+  n_parts = CLAMP (n_parts, 1, gegl_parallel_distribute_n_threads);
+
+  range_data.func      = func;
+  range_data.user_data = user_data;
+  range_data.size      = size;
+
+  gegl_parallel_distribute (
+    n_parts,
+    (GeglParallelDistributeFunc) gegl_parallel_distribute_range_func,
+    &range_data);
+}
+
+/* Data passed from gegl_parallel_distribute_area() to
+ * gegl_parallel_distribute_area_func().
+ */
+typedef struct
+{
+ const GeglRectangle *area; /* the full region to process */
+ GeglSplitStrategy split_strategy; /* direction along which area is split */
+ GeglParallelDistributeAreaFunc func; /* user function called for each sub-region */
+ gpointer user_data; /* opaque pointer passed through to func */
+} GeglParallelDistributeAreaData;
+
+/* Calls the user function of gegl_parallel_distribute_area() with the
+ * i-th sub-region, out of n, of data->area.
+ *
+ * With GEGL_SPLIT_STRATEGY_HORIZONTAL the area is split along the y-axis
+ * (into full-width strips), and with GEGL_SPLIT_STRATEGY_VERTICAL along
+ * the x-axis (into full-height strips).
+ *
+ * The k-th boundary along the split axis is k * extent / n, rounded to
+ * the nearest integer.  It's computed as
+ *
+ *   k * (extent / n) + (2 * k * (extent % n) + n) / (2 * n)
+ *
+ * which, for the positive extents gegl_parallel_distribute_area() lets
+ * through, yields the same value as (2 * k * extent + n) / (2 * n), but
+ * can't overflow gint for very large regions.
+ */
+static void
+gegl_parallel_distribute_area_func (gint                            i,
+                                    gint                            n,
+                                    GeglParallelDistributeAreaData *data)
+{
+  GeglRectangle sub_area;
+  gint          extent;
+  gint          quot;
+  gint          rem;
+  gint          start;
+  gint          end;
+
+  switch (data->split_strategy)
+    {
+    case GEGL_SPLIT_STRATEGY_HORIZONTAL:
+      extent = data->area->height;
+      break;
+
+    case GEGL_SPLIT_STRATEGY_VERTICAL:
+      extent = data->area->width;
+      break;
+
+    default:
+      g_return_if_reached ();
+    }
+
+  quot = extent / n;
+  rem  = extent % n;
+
+  start = i       * quot + (2 * i       * rem + n) / (2 * n);
+  end   = (i + 1) * quot + (2 * (i + 1) * rem + n) / (2 * n);
+
+  if (data->split_strategy == GEGL_SPLIT_STRATEGY_HORIZONTAL)
+    {
+      sub_area.x      = data->area->x;
+      sub_area.width  = data->area->width;
+
+      sub_area.y      = data->area->y + start;
+      sub_area.height = end - start;
+    }
+  else
+    {
+      sub_area.y      = data->area->y;
+      sub_area.height = data->area->height;
+
+      sub_area.x      = data->area->x + start;
+      sub_area.width  = end - start;
+    }
+
+  data->func (&sub_area, data->user_data);
+}
+
+/* Splits area into sub-regions of roughly min_sub_area pixels or more, as
+ * far as the thread count allows, and calls func for each of them through
+ * gegl_parallel_distribute().  Does nothing when area is empty.
+ */
+void
+gegl_parallel_distribute_area (const GeglRectangle            *area,
+                               gsize                           min_sub_area,
+                               GeglSplitStrategy               split_strategy,
+                               GeglParallelDistributeAreaFunc  func,
+                               gpointer                        user_data)
+{
+  GeglParallelDistributeAreaData area_data;
+  gsize                          n_parts;
+
+  g_return_if_fail (area != NULL);
+  g_return_if_fail (func != NULL);
+
+  if (area->width <= 0)
+    return;
+
+  if (area->height <= 0)
+    return;
+
+  /* by default, split along the longer dimension */
+  if (split_strategy == GEGL_SPLIT_STRATEGY_AUTO)
+    {
+      split_strategy = (area->width > area->height) ?
+                         GEGL_SPLIT_STRATEGY_VERTICAL :
+                         GEGL_SPLIT_STRATEGY_HORIZONTAL;
+    }
+
+  n_parts = (gsize) area->width * (gsize) area->height;
+
+  /* a min_sub_area of 0 or 1 imposes no limit on the number of parts */
+  if (min_sub_area > 1)
+    n_parts /= min_sub_area;
+
+  n_parts = CLAMP (n_parts, 1, gegl_parallel_distribute_n_threads);
+
+  area_data.func           = func;
+  area_data.user_data      = user_data;
+  area_data.area           = area;
+  area_data.split_strategy = split_strategy;
+
+  gegl_parallel_distribute (
+    n_parts,
+    (GeglParallelDistributeFunc) gegl_parallel_distribute_area_func,
+    &area_data);
+}
+
+
+/* private functions */
+
+
+/* Reads the "threads" property of config, and adjusts the number of
+ * threads accordingly.  Used both as the "notify::threads" signal handler
+ * and for applying the initial value.
+ */
+static void
+gegl_parallel_notify_threads (GeglConfig *config)
+{
+  gint threads;
+
+  g_object_get (config, "threads", &threads, NULL);
+
+  gegl_parallel_set_n_threads (threads, /* finish_tasks = */ TRUE);
+}
+
+/* Sets the number of threads used by the parallel algorithms; currently,
+ * this only forwards to gegl_parallel_distribute_set_n_threads().
+ *
+ * finish_tasks is not used by this function at the moment.
+ */
+static void
+gegl_parallel_set_n_threads (gint n_threads,
+ gboolean finish_tasks)
+{
+ gegl_parallel_distribute_set_n_threads (n_threads);
+}
+
+/* Starts or stops worker threads, so that n_threads threads in total
+ * (the workers, plus the calling thread of gegl_parallel_distribute())
+ * take part in subsequent distributions.  n_threads is clamped to
+ * [1, GEGL_PARALLEL_DISTRIBUTE_MAX_THREADS].
+ */
+static void
+gegl_parallel_distribute_set_n_threads (gint n_threads)
+{
+ gint i;
+
+ /* spin until no distribution is in progress, and take the busy flag, so
+ * that none can start while the set of workers is being changed
+ */
+ while (! g_atomic_int_compare_and_exchange (&gegl_parallel_distribute_busy,
+ 0, 1));
+
+ n_threads = CLAMP (n_threads, 1, GEGL_PARALLEL_DISTRIBUTE_MAX_THREADS);
+
+ if (n_threads > gegl_parallel_distribute_n_threads) /* need more threads */
+ {
+ /* worker slots [0, current - 1) are already running; start the rest */
+ for (i = gegl_parallel_distribute_n_threads - 1; i < n_threads - 1; i++)
+ {
+ GeglParallelDistributeThread *thread =
+ &gegl_parallel_distribute_threads[i];
+
+ thread->quit = FALSE;
+ thread->task = NULL;
+
+ thread->thread = g_thread_new (
+ "worker",
+ (GThreadFunc) gegl_parallel_distribute_thread_func,
+ thread);
+ }
+ }
+ else if (n_threads < gegl_parallel_distribute_n_threads) /* need fewer threads */
+ {
+ /* first ask all the surplus workers to quit ... */
+ for (i = n_threads - 1; i < gegl_parallel_distribute_n_threads - 1; i++)
+ {
+ GeglParallelDistributeThread *thread =
+ &gegl_parallel_distribute_threads[i];
+
+ g_mutex_lock (&thread->mutex);
+
+ thread->quit = TRUE;
+ g_cond_signal (&thread->cond);
+
+ g_mutex_unlock (&thread->mutex);
+ }
+
+ /* ... and only then wait for each of them to exit */
+ for (i = n_threads - 1; i < gegl_parallel_distribute_n_threads - 1; i++)
+ {
+ GeglParallelDistributeThread *thread =
+ &gegl_parallel_distribute_threads[i];
+
+ g_thread_join (thread->thread);
+ }
+ }
+
+ gegl_parallel_distribute_n_threads = n_threads;
+
+ /* allow distributions again */
+ g_atomic_int_set (&gegl_parallel_distribute_busy, 0);
+}
+
+/* Main function of the worker threads: waits on thread->cond until either
+ * a task is assigned, in which case the task's function is called with
+ * the thread's index, or thread->quit is set, in which case the function
+ * returns.  Always returns NULL.
+ *
+ * thread->mutex is held the whole time, including while a task runs; it's
+ * only released while waiting on thread->cond.
+ */
+static gpointer
+gegl_parallel_distribute_thread_func (GeglParallelDistributeThread *thread)
+{
+ g_mutex_lock (&thread->mutex);
+
+ while (TRUE)
+ {
+ /* the state is checked before waiting, so a task or quit request set
+ * before this point is not missed, and a wakeup that finds neither
+ * simply goes back to waiting
+ */
+ if (thread->quit)
+ {
+ break;
+ }
+ else if (thread->task)
+ {
+ thread->task->func (thread->i, thread->task->n,
+ thread->task->user_data);
+
+ /* the last worker to finish wakes up the distributing thread */
+ if (g_atomic_int_dec_and_test (
+ &gegl_parallel_distribute_completion_counter))
+ {
+ g_mutex_lock (&gegl_parallel_distribute_completion_mutex);
+
+ g_cond_signal (&gegl_parallel_distribute_completion_cond);
+
+ g_mutex_unlock (&gegl_parallel_distribute_completion_mutex);
+ }
+
+ thread->task = NULL;
+ }
+
+ g_cond_wait (&thread->cond, &thread->mutex);
+ }
+
+ g_mutex_unlock (&thread->mutex);
+
+ return NULL;
+}
diff --git a/gegl/gegl-parallel.h b/gegl/gegl-parallel.h
new file mode 100644
index 000000000..4e8798331
--- /dev/null
+++ b/gegl/gegl-parallel.h
@@ -0,0 +1,191 @@
+/* This file is part of GEGL.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with GEGL; if not, see <https://www.gnu.org/licenses/>.
+ *
+ * Copyright 2018 Ell
+ */
+
+#ifndef __GEGL_PARALLEL_H__
+#define __GEGL_PARALLEL_H__
+
+
+G_BEGIN_DECLS
+
+
+/**
+ * GeglParallelDistributeFunc:
+ * @i: the current thread index, in the range [0,@n)
+ * @n: the number of threads execution is distributed across
+ * @user_data: user data pointer
+ *
+ * Specifies the type of function passed to gegl_parallel_distribute().
+ *
+ * The function should process the @i-th part of the data, out of @n
+ * equal parts. @n may be less-than or equal-to the @max_n argument
+ * passed to gegl_parallel_distribute().
+ *
+ * When execution is not distributed, the function is called exactly
+ * once, on the calling thread, with @i = 0 and @n = 1.
+ */
+typedef void (* GeglParallelDistributeFunc) (gint i,
+ gint n,
+ gpointer user_data);
+
+/**
+ * GeglParallelDistributeRangeFunc:
+ * @offset: the current data offset
+ * @size: the current data size
+ * @user_data: user data pointer
+ *
+ * Specifies the type of function passed to gegl_parallel_distribute_range().
+ *
+ * The function should process @size elements of the data, starting
+ * at @offset. @size may be greater-than or equal-to the @min_sub_size
+ * argument passed to gegl_parallel_distribute_range().
+ *
+ * The sub-ranges passed to the different calls don't overlap, and
+ * together they cover the entire data.
+ */
+typedef void (* GeglParallelDistributeRangeFunc) (gsize offset,
+ gsize size,
+ gpointer user_data);
+
+/**
+ * GeglParallelDistributeAreaFunc:
+ * @area: the current sub-region
+ * @user_data: user data pointer
+ *
+ * Specifies the type of function passed to gegl_parallel_distribute_area().
+ *
+ * The function should process the sub-region specified by @area, whose
+ * area may be greater-than or equal-to the @min_sub_area argument passed
+ * to gegl_parallel_distribute_area().
+ *
+ * @area is only valid for the duration of the call; the function should
+ * copy it if it needs to keep it.
+ */
+typedef void (* GeglParallelDistributeAreaFunc) (const GeglRectangle *area,
+ gpointer user_data);
+
+
+/**
+ * gegl_parallel_distribute:
+ * @max_n: the maximal number of threads to use, or a negative value to
+ *         use as many threads as are available
+ * @func: (closure user_data) (scope call): the function to call
+ * @user_data: user data to pass to the function
+ *
+ * Distributes the execution of a function across multiple threads,
+ * by calling it with a different index on each thread.
+ *
+ * One of the calls to @func is made on the calling thread, and the
+ * function only returns once all the calls to @func have finished.
+ * If @max_n is 0, @func is not called at all.
+ */
+void gegl_parallel_distribute (gint max_n,
+ GeglParallelDistributeFunc func,
+ gpointer user_data);
+
+/**
+ * gegl_parallel_distribute_range:
+ * @size: the total size of the data
+ * @min_sub_size: the minimal data size to be processed by each thread;
+ *                a value of 0 or 1 imposes no limit
+ * @func: (closure user_data) (scope call): the function to call
+ * @user_data: user data to pass to the function
+ *
+ * Distributes the processing of a linear data-structure across
+ * multiple threads, by calling the given function with different
+ * sub-ranges on different threads.
+ *
+ * If @size is 0, @func is not called at all.
+ */
+void gegl_parallel_distribute_range (gsize size,
+ gsize min_sub_size,
+ GeglParallelDistributeRangeFunc func,
+ gpointer user_data);
+
+/**
+ * gegl_parallel_distribute_area:
+ * @area: the region to process
+ * @min_sub_area: the minimal area to be processed by each thread;
+ *                a value of 0 or 1 imposes no limit
+ * @split_strategy: the strategy to use for dividing the region
+ * @func: (closure user_data) (scope call): the function to call
+ * @user_data: user data to pass to the function
+ *
+ * Distributes the processing of a planar data-structure across
+ * multiple threads, by calling the given function with different
+ * sub-regions on different threads.
+ *
+ * With %GEGL_SPLIT_STRATEGY_HORIZONTAL the region is divided into
+ * full-width strips, and with %GEGL_SPLIT_STRATEGY_VERTICAL into
+ * full-height strips; %GEGL_SPLIT_STRATEGY_AUTO divides the region
+ * along its longer dimension.
+ *
+ * If @area is empty, @func is not called at all.
+ */
+void gegl_parallel_distribute_area (const GeglRectangle *area,
+ gsize min_sub_area,
+ GeglSplitStrategy split_strategy,
+ GeglParallelDistributeAreaFunc func,
+ gpointer user_data);
+
+
+#ifdef __cplusplus
+
+/* C++ convenience overloads, taking an arbitrary callable (such as a
+ * lambda with captures) instead of a function pointer and a user-data
+ * pointer.
+ *
+ * Each overload forwards to the corresponding C function, passing the
+ * address of its by-value `func` parameter as the user data, along with a
+ * capture-less lambda as the callback.  The callback invokes a fresh copy
+ * of the callable on each call, rather than the shared original.
+ */
+extern "C++"
+{
+
+/* calls func (i, n) for each thread; see gegl_parallel_distribute() */
+template <class ParallelDistributeFunc>
+inline void
+gegl_parallel_distribute (gint max_n,
+ ParallelDistributeFunc func)
+{
+ gegl_parallel_distribute (max_n,
+ [] (gint i,
+ gint n,
+ gpointer user_data)
+ {
+ ParallelDistributeFunc func_copy (
+ *(const ParallelDistributeFunc *) user_data);
+
+ func_copy (i, n);
+ },
+ &func);
+}
+
+/* calls func (offset, size) for each sub-range; see
+ * gegl_parallel_distribute_range()
+ */
+template <class ParallelDistributeRangeFunc>
+inline void
+gegl_parallel_distribute_range (gsize size,
+ gsize min_sub_size,
+ ParallelDistributeRangeFunc func)
+{
+ gegl_parallel_distribute_range (size, min_sub_size,
+ [] (gsize offset,
+ gsize size,
+ gpointer user_data)
+ {
+ ParallelDistributeRangeFunc func_copy (
+ *(const ParallelDistributeRangeFunc *) user_data);
+
+ func_copy (offset, size);
+ },
+ &func);
+}
+
+/* calls func (area) for each sub-region; see
+ * gegl_parallel_distribute_area()
+ */
+template <class ParallelDistributeAreaFunc>
+inline void
+gegl_parallel_distribute_area (const GeglRectangle *area,
+ gsize min_sub_area,
+ GeglSplitStrategy split_strategy,
+ ParallelDistributeAreaFunc func)
+{
+ gegl_parallel_distribute_area (area, min_sub_area, split_strategy,
+ [] (const GeglRectangle *area,
+ gpointer user_data)
+ {
+ ParallelDistributeAreaFunc func_copy (
+ *(const ParallelDistributeAreaFunc *) user_data);
+
+ func_copy (area);
+ },
+ &func);
+}
+
+}
+
+#endif /* __cplusplus */
+
+
+G_END_DECLS
+
+
+#endif /* __GEGL_PARALLEL_H__ */
diff --git a/gegl/gegl.h b/gegl/gegl.h
index 1d2e34bbb..6bd3324d6 100644
--- a/gegl/gegl.h
+++ b/gegl/gegl.h
@@ -37,6 +37,7 @@
#include <gegl-init.h>
#include <gegl-version.h>
#include <gegl-random.h>
+#include <gegl-parallel.h>
#include <gegl-node.h>
#include <gegl-processor.h>
#include <gegl-apply.h>
[
Date Prev][
Date Next] [
Thread Prev][
Thread Next]
[
Thread Index]
[
Date Index]
[
Author Index]