[gegl] operation: use gegl_parallel_distribute_area() for parallelization



commit f14b731d8d94060d766ea79feb2c60975dfdf26f
Author: Ell <ell_se yahoo com>
Date:   Sat Nov 10 15:38:55 2018 -0500

    operation: use gegl_parallel_distribute_area() for parallelization
    
    In all operation types that support auto-threading, use
    gegl_parallel_distribute_area(), added in the previous commit, to
    parallelize processing, instead of local thread pools.

 gegl/operation/gegl-operation-composer.c        | 101 +++++++---------------
 gegl/operation/gegl-operation-composer3.c       | 101 +++++++---------------
 gegl/operation/gegl-operation-filter.c          | 109 +++++++-----------------
 gegl/operation/gegl-operation-point-composer.c  |  96 ++++++---------------
 gegl/operation/gegl-operation-point-composer3.c | 103 ++++++----------------
 gegl/operation/gegl-operation-point-filter.c    |  86 +++++--------------
 gegl/operation/gegl-operation-source.c          |  83 +++++-------------
 gegl/operation/gegl-operation.c                 |  10 ++-
 gegl/operation/gegl-operation.h                 |   2 +
 9 files changed, 199 insertions(+), 492 deletions(-)
---
diff --git a/gegl/operation/gegl-operation-composer.c b/gegl/operation/gegl-operation-composer.c
index 82a1eb92e..a9ff76047 100644
--- a/gegl/operation/gegl-operation-composer.c
+++ b/gegl/operation/gegl-operation-composer.c
@@ -102,43 +102,36 @@ typedef struct ThreadData
   GeglOperationComposerClass *klass;
   GeglOperation              *operation;
   GeglOperationContext       *context;
+  GeglBuffer                 *input;
   GeglBuffer                 *aux;
   GeglBuffer                 *output;
-  gint                       *pending;
+  const GeglRectangle        *roi;
   gint                        level;
   gboolean                    success;
-  GeglRectangle               roi;
 } ThreadData;
 
-static void thread_process (gpointer thread_data, gpointer input)
+static void
+thread_process (const GeglRectangle *area,
+                ThreadData          *data)
 {
-  ThreadData *data = thread_data;
+  GeglBuffer *input;
 
-  if (! input)
+  if (area->x == data->roi->x && area->y == data->roi->y)
+    {
+      input = g_object_ref (data->input);
+    }
+  else
     {
       input = gegl_operation_context_dup_input_maybe_copy (data->context,
-                                                           "input", &data->roi);
+                                                           "input", area);
     }
 
   if (!data->klass->process (data->operation,
                              input, data->aux, data->output,
-                             &data->roi, data->level))
+                             area, data->level))
     data->success = FALSE;
 
   g_object_unref (input);
-
-  g_atomic_int_add (data->pending, -1);
-}
-
-static GThreadPool *thread_pool (void)
-{
-  static GThreadPool *pool = NULL;
-  if (!pool)
-    {
-      pool =  g_thread_pool_new (thread_process, NULL, gegl_config_threads (),
-                                 FALSE, NULL);
-    }
-  return pool;
 }
 
 static gboolean
@@ -175,54 +168,26 @@ gegl_operation_composer_process (GeglOperation        *operation,
     {
       if (gegl_operation_use_threading (operation, result))
       {
-        gint threads = gegl_config_threads ();
-        GThreadPool *pool = thread_pool ();
-        ThreadData thread_data[GEGL_MAX_THREADS];
-        gint pending = threads;
-
-        if (result->width > result->height)
-        {
-          gint bit = result->width / threads;
-          for (gint j = 0; j < threads; j++)
-          {
-            thread_data[j].roi.y = result->y;
-            thread_data[j].roi.height = result->height;
-            thread_data[j].roi.x = result->x + bit * j;
-            thread_data[j].roi.width = bit;
-          }
-          thread_data[threads-1].roi.width = result->width - (bit * (threads-1));
-        }
-        else
-        {
-          gint bit = result->height / threads;
-          for (gint j = 0; j < threads; j++)
-          {
-            thread_data[j].roi.x = result->x;
-            thread_data[j].roi.width = result->width;
-            thread_data[j].roi.y = result->y + bit * j;
-            thread_data[j].roi.height = bit;
-          }
-          thread_data[threads-1].roi.height = result->height - (bit * (threads-1));
-        }
-        for (gint i = 0; i < threads; i++)
-        {
-          thread_data[i].klass = klass;
-          thread_data[i].operation = operation;
-          thread_data[i].context = context;
-          thread_data[i].aux = aux;
-          thread_data[i].output = output;
-          thread_data[i].pending = &pending;
-          thread_data[i].level = level;
-          thread_data[i].success = TRUE;
-        }
-
-        for (gint i = 1; i < threads; i++)
-          g_thread_pool_push (pool, &thread_data[i], NULL);
-        thread_process (&thread_data[0], g_object_ref (input));
-
-        while (g_atomic_int_get (&pending)) {};
-
-        success = thread_data[0].success;
+        ThreadData data;
+
+        data.klass = klass;
+        data.operation = operation;
+        data.context = context;
+        data.input = input;
+        data.aux = aux;
+        data.output = output;
+        data.roi = result;
+        data.level = level;
+        data.success = TRUE;
+
+        gegl_parallel_distribute_area (
+          result,
+          gegl_operation_get_min_threaded_sub_area (operation),
+          GEGL_SPLIT_STRATEGY_AUTO,
+          (GeglParallelDistributeAreaFunc) thread_process,
+          &data);
+
+        success = data.success;
       }
       else
       {
diff --git a/gegl/operation/gegl-operation-composer3.c b/gegl/operation/gegl-operation-composer3.c
index 5c2da7379..25269e8de 100644
--- a/gegl/operation/gegl-operation-composer3.c
+++ b/gegl/operation/gegl-operation-composer3.c
@@ -111,44 +111,37 @@ typedef struct ThreadData
   GeglOperationComposer3Class *klass;
   GeglOperation               *operation;
   GeglOperationContext        *context;
+  GeglBuffer                  *input;
   GeglBuffer                  *aux;
   GeglBuffer                  *aux2;
   GeglBuffer                  *output;
-  gint                        *pending;
+  const GeglRectangle         *roi;
   gint                         level;
   gboolean                     success;
-  GeglRectangle                roi;
 } ThreadData;
 
-static void thread_process (gpointer thread_data, gpointer input)
+static void
+thread_process (const GeglRectangle *area,
+                ThreadData          *data)
 {
-  ThreadData *data = thread_data;
+  GeglBuffer *input;
 
-  if (! input)
+  if (area->x == data->roi->x && area->y == data->roi->y)
+    {
+      input = g_object_ref (data->input);
+    }
+  else
     {
       input = gegl_operation_context_dup_input_maybe_copy (data->context,
-                                                           "input", &data->roi);
+                                                           "input", area);
     }
 
   if (!data->klass->process (data->operation,
                              input, data->aux, data->aux2, 
-                             data->output, &data->roi, data->level))
+                             data->output, area, data->level))
     data->success = FALSE;
 
   g_object_unref (input);
-
-  g_atomic_int_add (data->pending, -1);
-}
-
-static GThreadPool *thread_pool (void)
-{
-  static GThreadPool *pool = NULL;
-  if (!pool)
-  {
-    pool =  g_thread_pool_new (thread_process, NULL, gegl_config_threads (),
-        FALSE, NULL);
-  }
-  return pool;
 }
 
 
@@ -196,55 +189,27 @@ gegl_operation_composer3_process (GeglOperation        *operation,
     {
       if (gegl_operation_use_threading (operation, result))
       {
-        gint threads = gegl_config_threads ();
-        GThreadPool *pool = thread_pool ();
-        ThreadData thread_data[GEGL_MAX_THREADS];
-        gint pending = threads;
-
-        if (result->width > result->height)
-        {
-          gint bit = result->width / threads;
-          for (gint j = 0; j < threads; j++)
-          {
-            thread_data[j].roi.y = result->y;
-            thread_data[j].roi.height = result->height;
-            thread_data[j].roi.x = result->x + bit * j;
-            thread_data[j].roi.width = bit;
-          }
-          thread_data[threads-1].roi.width = result->width - (bit * (threads-1));
-        }
-        else
-        {
-          gint bit = result->height / threads;
-          for (gint j = 0; j < threads; j++)
-          {
-            thread_data[j].roi.x = result->x;
-            thread_data[j].roi.width = result->width;
-            thread_data[j].roi.y = result->y + bit * j;
-            thread_data[j].roi.height = bit;
-          }
-          thread_data[threads-1].roi.height = result->height - (bit * (threads-1));
-        }
-        for (gint i = 0; i < threads; i++)
-        {
-          thread_data[i].klass = klass;
-          thread_data[i].operation = operation;
-          thread_data[i].context = context;
-          thread_data[i].aux = aux;
-          thread_data[i].aux2 = aux2;
-          thread_data[i].output = output;
-          thread_data[i].pending = &pending;
-          thread_data[i].level = level;
-          thread_data[i].success = TRUE;
-        }
-
-        for (gint i = 1; i < threads; i++)
-          g_thread_pool_push (pool, &thread_data[i], NULL);
-        thread_process (&thread_data[0], g_object_ref (input));
-
-        while (g_atomic_int_get (&pending)) {};
+        ThreadData data;
+
+        data.klass = klass;
+        data.operation = operation;
+        data.context = context;
+        data.input = input;
+        data.aux = aux;
+        data.aux2 = aux2;
+        data.output = output;
+        data.roi = result;
+        data.level = level;
+        data.success = TRUE;
+
+        gegl_parallel_distribute_area (
+          result,
+          gegl_operation_get_min_threaded_sub_area (operation),
+          GEGL_SPLIT_STRATEGY_AUTO,
+          (GeglParallelDistributeAreaFunc) thread_process,
+          &data);
         
-        success = thread_data[0].success;
+        success = data.success;
       }
       else
       {
diff --git a/gegl/operation/gegl-operation-filter.c b/gegl/operation/gegl-operation-filter.c
index 92da722a1..a5c7af3ec 100644
--- a/gegl/operation/gegl-operation-filter.c
+++ b/gegl/operation/gegl-operation-filter.c
@@ -120,41 +120,34 @@ typedef struct ThreadData
   GeglOperationFilterClass *klass;
   GeglOperation            *operation;
   GeglOperationContext     *context;
+  GeglBuffer               *input;
   GeglBuffer               *output;
-  gint                     *pending;
+  const GeglRectangle      *roi;
   gint                      level;
   gboolean                  success;
-  GeglRectangle             roi;
 } ThreadData;
 
-static void thread_process (gpointer thread_data, gpointer input)
+static void
+thread_process (const GeglRectangle *area,
+                ThreadData          *data)
 {
-  ThreadData *data = thread_data;
+  GeglBuffer *input;
 
-  if (! input)
+  if (area->x == data->roi->x && area->y == data->roi->y)
+    {
+      input = g_object_ref (data->input);
+    }
+  else
     {
       input = gegl_operation_context_dup_input_maybe_copy (data->context,
-                                                           "input", &data->roi);
+                                                           "input", area);
     }
 
   if (!data->klass->process (data->operation,
-                             input, data->output, &data->roi, data->level))
+                             input, data->output, area, data->level))
     data->success = FALSE;
 
   g_object_unref (input);
-
-  g_atomic_int_add (data->pending, -1);
-}
-
-static GThreadPool *thread_pool (void)
-{
-  static GThreadPool *pool = NULL;
-  if (!pool)
-    {
-      pool =  g_thread_pool_new (thread_process, NULL, gegl_config_threads (),
-                                 FALSE, NULL);
-    }
-  return pool;
 }
 
 static gboolean
@@ -187,11 +180,8 @@ gegl_operation_filter_process (GeglOperation        *operation,
 
   if (gegl_operation_use_threading (operation, result))
   {
-    gint threads = gegl_config_threads ();
+    ThreadData        data;
     GeglSplitStrategy split_strategy = GEGL_SPLIT_STRATEGY_AUTO;
-    GThreadPool *pool = thread_pool ();
-    ThreadData thread_data[GEGL_MAX_THREADS];
-    gint pending = threads;
 
     if (klass->get_split_strategy)
     {
@@ -199,60 +189,23 @@ gegl_operation_filter_process (GeglOperation        *operation,
                                                   output_prop, result, level);
     }
 
-    if (split_strategy == GEGL_SPLIT_STRATEGY_AUTO)
-    {
-      if (result->width > result->height)
-        split_strategy = GEGL_SPLIT_STRATEGY_VERTICAL;
-      else
-        split_strategy = GEGL_SPLIT_STRATEGY_HORIZONTAL;
-    }
-
-    if (split_strategy == GEGL_SPLIT_STRATEGY_VERTICAL)
-    {
-      gint bit = result->width / threads;
-      for (gint j = 0; j < threads; j++)
-      {
-        thread_data[j].roi.y = result->y;
-        thread_data[j].roi.height = result->height;
-        thread_data[j].roi.x = result->x + bit * j;
-        thread_data[j].roi.width = bit;
-      }
-      thread_data[threads-1].roi.width = result->width - (bit * (threads-1));
-    }
-    else if (split_strategy == GEGL_SPLIT_STRATEGY_HORIZONTAL)
-    {
-      gint bit = result->height / threads;
-      for (gint j = 0; j < threads; j++)
-      {
-        thread_data[j].roi.x = result->x;
-        thread_data[j].roi.width = result->width;
-        thread_data[j].roi.y = result->y + bit * j;
-        thread_data[j].roi.height = bit;
-      }
-      thread_data[threads-1].roi.height = result->height - (bit * (threads-1));
-    }
-    else
-    {
-      g_return_val_if_reached (FALSE);
-    }
-    for (gint i = 0; i < threads; i++)
-    {
-      thread_data[i].klass = klass;
-      thread_data[i].operation = operation;
-      thread_data[i].context = context;
-      thread_data[i].output = output;
-      thread_data[i].pending = &pending;
-      thread_data[i].level = level;
-      thread_data[i].success = TRUE;
-    }
-
-    for (gint i = 1; i < threads; i++)
-      g_thread_pool_push (pool, &thread_data[i], NULL);
-    thread_process (&thread_data[0], g_object_ref (input));
-
-    while (g_atomic_int_get (&pending)) {};
-
-    success = thread_data[0].success;
+    data.klass = klass;
+    data.operation = operation;
+    data.context = context;
+    data.input = input;
+    data.output = output;
+    data.roi = result;
+    data.level = level;
+    data.success = TRUE;
+
+    gegl_parallel_distribute_area (
+      result,
+      gegl_operation_get_min_threaded_sub_area (operation),
+      split_strategy,
+      (GeglParallelDistributeAreaFunc) thread_process,
+      &data);
+
+    success = data.success;
   }
   else
   {
diff --git a/gegl/operation/gegl-operation-point-composer.c b/gegl/operation/gegl-operation-point-composer.c
index bacc14688..eb75e151c 100644
--- a/gegl/operation/gegl-operation-point-composer.c
+++ b/gegl/operation/gegl-operation-point-composer.c
@@ -46,23 +46,22 @@ typedef struct ThreadData
   GeglBuffer                      *input;
   GeglBuffer                      *aux;
   GeglBuffer                      *output;
-  gint                            *pending;
   gint                             level;
   gboolean                         success;
-  GeglRectangle                    result;
 
   const Babl *input_format;
   const Babl *aux_format;
   const Babl *output_format;
 } ThreadData;
 
-static void thread_process (gpointer thread_data, gpointer unused)
+static void
+thread_process (const GeglRectangle *area,
+                ThreadData          *data)
 {
-  ThreadData *data = thread_data;
   gint read = 0;
   gint aux  = 0;
   GeglBufferIterator *i = gegl_buffer_iterator_new (data->output,
-                                                    &data->result,
+                                                    area,
                                                     data->level,
                                                     data->output_format,
                                                     GEGL_ACCESS_WRITE,
@@ -70,11 +69,11 @@ static void thread_process (gpointer thread_data, gpointer unused)
                                                     4);
 
   if (data->input)
-    read = gegl_buffer_iterator_add (i, data->input, &data->result, data->level,
+    read = gegl_buffer_iterator_add (i, data->input, area, data->level,
                                      data->input_format,
                                      GEGL_ACCESS_READ, GEGL_ABYSS_NONE);
   if (data->aux)
-    aux = gegl_buffer_iterator_add (i, data->aux, &data->result, data->level,
+    aux = gegl_buffer_iterator_add (i, data->aux, area, data->level,
                                     data->aux_format,
                                     GEGL_ACCESS_READ, GEGL_ABYSS_NONE);
 
@@ -85,19 +84,6 @@ static void thread_process (gpointer thread_data, gpointer unused)
                            data->aux?i->items[aux].data:NULL,
                            i->items[0].data, i->length, &(i->items[0].roi), data->level);
   }
-
-  g_atomic_int_add (data->pending, -1);
-}
-
-static GThreadPool *thread_pool (void)
-{
-  static GThreadPool *pool = NULL;
-  if (!pool)
-    {
-      pool =  g_thread_pool_new (thread_process, NULL, gegl_config_threads (),
-                                 FALSE, NULL);
-    }
-  return pool;
 }
 
 static gboolean
@@ -321,54 +307,19 @@ gegl_operation_point_composer_process (GeglOperation       *operation,
               return TRUE;
         }
 
-      if (gegl_operation_use_threading (operation, result) && result->height > 1)
+      if (gegl_operation_use_threading (operation, result))
       {
-        gint threads = gegl_config_threads ();
-        ThreadData thread_data[GEGL_MAX_THREADS];
-        GThreadPool *pool = thread_pool ();
-        gint pending;
-        gint j;
-
-        if (result->width > result->height)
-          for (j = 0; j < threads; j++)
-          {
-            GeglRectangle rect = *result;
-            rect.width /= threads;
-            rect.x += rect.width * j;
-
-            if (j == threads-1)
-              rect.width = (result->width + result->x) - rect.x;
-
-            thread_data[j].result = rect;
-          }
-        else
-          for (j = 0; j < threads; j++)
-          {
-            GeglRectangle rect = *result;
-            rect = *result;
-            rect.height /= threads;
-            rect.y += rect.height * j;
-
-            if (j == threads-1)
-              rect.height = (result->height + result->y) - rect.y;
-
-            thread_data[j].result = rect;
-          }
-
-        for (j = 0; j < threads; j++)
-        {
-          thread_data[j].klass = point_composer_class;
-          thread_data[j].operation = operation;
-          thread_data[j].input = input;
-          thread_data[j].aux = aux;
-          thread_data[j].output = output;
-          thread_data[j].pending = &pending;
-          thread_data[j].level = level;
-          thread_data[j].input_format = in_format;
-          thread_data[j].aux_format = aux_format;
-          thread_data[j].output_format = out_format;
-        }
-        pending = threads;
+        ThreadData data;
+
+        data.klass = point_composer_class;
+        data.operation = operation;
+        data.input = input;
+        data.aux = aux;
+        data.output = output;
+        data.level = level;
+        data.input_format = in_format;
+        data.aux_format = aux_format;
+        data.output_format = out_format;
 
         if (gegl_cl_is_accelerated ())
         {
@@ -378,11 +329,12 @@ gegl_operation_point_composer_process (GeglOperation       *operation,
             gegl_buffer_flush_ext (aux, result);
         }
 
-        for (gint j = 1; j < threads; j++)
-          g_thread_pool_push (pool, &thread_data[j], NULL);
-
-        thread_process (&thread_data[0], NULL);
-        while (g_atomic_int_get (&pending)) {};
+        gegl_parallel_distribute_area (
+          result,
+          gegl_operation_get_min_threaded_sub_area (operation),
+          GEGL_SPLIT_STRATEGY_AUTO,
+          (GeglParallelDistributeAreaFunc) thread_process,
+          &data);
 
         return TRUE;
       }
diff --git a/gegl/operation/gegl-operation-point-composer3.c b/gegl/operation/gegl-operation-point-composer3.c
index ebfcd22a5..56279396f 100644
--- a/gegl/operation/gegl-operation-point-composer3.c
+++ b/gegl/operation/gegl-operation-point-composer3.c
@@ -42,10 +42,8 @@ typedef struct ThreadData
   GeglBuffer                       *aux;
   GeglBuffer                       *aux2;
   GeglBuffer                       *output;
-  gint                             *pending;
   gint                              level;
   gboolean                          success;
-  GeglRectangle                     result;
 
   const Babl *input_format;
   const Babl *aux_format;
@@ -53,29 +51,30 @@ typedef struct ThreadData
   const Babl *output_format;
 } ThreadData;
 
-static void thread_process (gpointer thread_data, gpointer unused)
+static void
+thread_process (const GeglRectangle *area,
+                ThreadData          *data)
 {
-  ThreadData *data = thread_data;
   gint read = 0;
   gint aux  = 0;
   gint aux2 = 0;
   GeglBufferIterator *i = gegl_buffer_iterator_new (data->output,
-                                                    &data->result,
+                                                    area,
                                                     data->level,
                                                     data->output_format,
                                                     GEGL_ACCESS_WRITE,
                                                     GEGL_ABYSS_NONE, 4);
 
   if (data->input)
-    read = gegl_buffer_iterator_add (i, data->input, &data->result, data->level,
+    read = gegl_buffer_iterator_add (i, data->input, area, data->level,
                                      data->input_format,
                                      GEGL_ACCESS_READ, GEGL_ABYSS_NONE);
   if (data->aux)
-    aux = gegl_buffer_iterator_add (i, data->aux, &data->result, data->level,
+    aux = gegl_buffer_iterator_add (i, data->aux, area, data->level,
                                     data->aux_format,
                                     GEGL_ACCESS_READ, GEGL_ABYSS_NONE);
   if (data->aux2)
-    aux2 = gegl_buffer_iterator_add (i, data->aux2, &data->result, data->level,
+    aux2 = gegl_buffer_iterator_add (i, data->aux2, area, data->level,
                                     data->aux2_format,
                                     GEGL_ACCESS_READ, GEGL_ABYSS_NONE);
 
@@ -88,19 +87,6 @@ static void thread_process (gpointer thread_data, gpointer unused)
                            data->aux2?i->items[aux2].data:NULL,
                            i->items[0].data, i->length, &(i->items[0].roi), data->level);
   }
-
-  g_atomic_int_add (data->pending, -1);
-}
-
-static GThreadPool *thread_pool (void)
-{
-  static GThreadPool *pool = NULL;
-  if (!pool)
-    {
-      pool =  g_thread_pool_new (thread_process, NULL, gegl_config_threads (),
-                                 FALSE, NULL);
-    }
-  return pool;
 }
 
 static gboolean
@@ -235,58 +221,21 @@ gegl_operation_point_composer3_process (GeglOperation       *operation,
 
   if ((result->width > 0) && (result->height > 0))
     {
-      if (gegl_operation_use_threading (operation, result) && result->height > 1)
+      if (gegl_operation_use_threading (operation, result))
       {
-        gint threads = gegl_config_threads ();
-        ThreadData thread_data[GEGL_MAX_THREADS];
-        GThreadPool *pool = thread_pool ();
-        gint pending;
-        gint j;
-
-        if (result->width > result->height)
-          for (j = 0; j < threads; j++)
-          {
-            GeglRectangle rect = *result;
-
-            rect.width /= threads;
-            rect.x += rect.width * j;
-
-            if (j == threads-1)
-              rect.width = (result->width + result->x) - rect.x;
-
-            thread_data[j].result = rect;
-          }
-        else
-          for (j = 0; j < threads; j++)
-          {
-            GeglRectangle rect = *result;
-
-            rect = *result;
-            rect.height /= threads;
-            rect.y += rect.height * j;
-
-            if (j == threads-1)
-              rect.height = (result->height + result->y) - rect.y;
-
-            thread_data[j].result = rect;
-          }
-
-        for (j = 0; j < threads; j++)
-        {
-          thread_data[j].klass = point_composer3_class;
-          thread_data[j].operation = operation;
-          thread_data[j].input = input;
-          thread_data[j].aux = aux;
-          thread_data[j].aux2 = aux2;
-          thread_data[j].output = output;
-          thread_data[j].pending = &pending;
-          thread_data[j].level = level;
-          thread_data[j].input_format = in_format;
-          thread_data[j].aux_format = aux_format;
-          thread_data[j].aux2_format = aux2_format;
-          thread_data[j].output_format = out_format;
-        }
-        pending = threads;
+        ThreadData data;
+
+        data.klass = point_composer3_class;
+        data.operation = operation;
+        data.input = input;
+        data.aux = aux;
+        data.aux2 = aux2;
+        data.output = output;
+        data.level = level;
+        data.input_format = in_format;
+        data.aux_format = aux_format;
+        data.aux2_format = aux2_format;
+        data.output_format = out_format;
 
         if (gegl_cl_is_accelerated ())
         {
@@ -298,10 +247,12 @@ gegl_operation_point_composer3_process (GeglOperation       *operation,
             gegl_buffer_flush_ext (aux2, result);
         }
 
-        for (gint j = 1; j < threads; j++)
-          g_thread_pool_push (pool, &thread_data[j], NULL);
-        thread_process (&thread_data[0], NULL);
-        while (g_atomic_int_get (&pending)) {};
+        gegl_parallel_distribute_area (
+          result,
+          gegl_operation_get_min_threaded_sub_area (operation),
+          GEGL_SPLIT_STRATEGY_AUTO,
+          (GeglParallelDistributeAreaFunc) thread_process,
+          &data);
 
         return TRUE;
       }
diff --git a/gegl/operation/gegl-operation-point-filter.c b/gegl/operation/gegl-operation-point-filter.c
index 6fffa74a1..365cb8c01 100644
--- a/gegl/operation/gegl-operation-point-filter.c
+++ b/gegl/operation/gegl-operation-point-filter.c
@@ -44,26 +44,25 @@ typedef struct ThreadData
   GeglOperation                 *operation;
   GeglBuffer                    *input;
   GeglBuffer                    *output;
-  gint                          *pending;
-  GeglRectangle                  result;
   gint                           level;
   gboolean                       success;
   const Babl                    *input_format;
   const Babl                    *output_format;
 } ThreadData;
 
-static void thread_process (gpointer thread_data, gpointer unused)
+static void
+thread_process (const GeglRectangle *area,
+                ThreadData          *data)
 {
-  ThreadData *data = thread_data;
   GeglBufferIterator *i = gegl_buffer_iterator_new (data->output,
-                                                    &data->result,
+                                                    area,
                                                     data->level,
                                                     data->output_format,
                                                     GEGL_ACCESS_WRITE,
                                                     GEGL_ABYSS_NONE, 4);
   gint read = 0;
   if (data->input)
-    read = gegl_buffer_iterator_add (i, data->input, &data->result, data->level,
+    read = gegl_buffer_iterator_add (i, data->input, area, data->level,
                                      data->input_format,
                                      GEGL_ACCESS_READ, GEGL_ABYSS_NONE);
 
@@ -73,19 +72,6 @@ static void thread_process (gpointer thread_data, gpointer unused)
      data->klass->process (data->operation, data->input?i->items[read].data:NULL,
                            i->items[0].data, i->length, &(i->items[0].roi), data->level);
   }
-
-  g_atomic_int_add (data->pending, -1);
-}
-
-static GThreadPool *thread_pool (void)
-{
-  static GThreadPool *pool = NULL;
-  if (!pool)
-    {
-      pool =  g_thread_pool_new (thread_process, NULL, gegl_config_threads (),
-                                 FALSE, NULL);
-    }
-  return pool;
 }
 
 static gboolean
@@ -286,59 +272,27 @@ gegl_operation_point_filter_process (GeglOperation       *operation,
             return TRUE;
       }
 
-      if (gegl_operation_use_threading (operation, result) && result->height > 1)
+      if (gegl_operation_use_threading (operation, result))
       {
-        gint threads = gegl_config_threads ();
-        ThreadData thread_data[GEGL_MAX_THREADS];
-        GThreadPool *pool = thread_pool ();
-        gint pending;
-        int j;
-
-        if (result->width > result->height)
-          for (j = 0; j < threads; j++)
-          {
-            GeglRectangle rect = *result;
-            rect.width /= threads;
-            rect.x += rect.width * j;
-
-            if (j == threads-1)
-              rect.width = (result->width + result->x) - rect.x;
-
-            thread_data[j].result = rect;
-          }
-        else
-          for (j = 0; j < threads; j++)
-          {
-            GeglRectangle rect = *result;
-            rect = *result;
-            rect.height /= threads;
-            rect.y += rect.height * j;
-            if (j == threads-1)
-              rect.height = (result->height + result->y) - rect.y;
-            thread_data[j].result = rect;
-          }
-
-        for (j = 0; j < threads; j++)
-        {
-          thread_data[j].klass = point_filter_class;
-          thread_data[j].operation = operation;
-          thread_data[j].input = input;
-          thread_data[j].output = output;
-          thread_data[j].pending = &pending;
-          thread_data[j].level = level;
-          thread_data[j].input_format = in_format;
-          thread_data[j].output_format = out_format;
-        }
+        ThreadData data;
 
-        pending = threads;
+        data.klass = point_filter_class;
+        data.operation = operation;
+        data.input = input;
+        data.output = output;
+        data.level = level;
+        data.input_format = in_format;
+        data.output_format = out_format;
 
         if (gegl_cl_is_accelerated () && input)
           gegl_buffer_flush_ext (input, result);
 
-        for (gint j = 1; j < threads; j++)
-          g_thread_pool_push (pool, &thread_data[j], NULL);
-        thread_process (&thread_data[0], NULL);
-        while (g_atomic_int_get (&pending)) {};
+        gegl_parallel_distribute_area (
+          result,
+          gegl_operation_get_min_threaded_sub_area (operation),
+          GEGL_SPLIT_STRATEGY_AUTO,
+          (GeglParallelDistributeAreaFunc) thread_process,
+          &data);
 
         return TRUE;
       }
diff --git a/gegl/operation/gegl-operation-source.c b/gegl/operation/gegl-operation-source.c
index a226018c2..a64881e94 100644
--- a/gegl/operation/gegl-operation-source.c
+++ b/gegl/operation/gegl-operation-source.c
@@ -82,30 +82,17 @@ typedef struct ThreadData
   GeglOperationSourceClass *klass;
   GeglOperation            *operation;
   GeglBuffer               *output;
-  gint                     *pending;
   gint                      level;
   gboolean                  success;
-  GeglRectangle             roi;
 } ThreadData;
 
-static void thread_process (gpointer thread_data, gpointer unused)
+static void
+thread_process (const GeglRectangle *area,
+                ThreadData          *data)
 {
-  ThreadData *data = thread_data;
   if (!data->klass->process (data->operation,
-                       data->output, &data->roi, data->level))
+                       data->output, area, data->level))
     data->success = FALSE;
-  g_atomic_int_add (data->pending, -1);
-}
-
-static GThreadPool *thread_pool (void)
-{
-  static GThreadPool *pool = NULL;
-  if (!pool)
-    {
-      pool =  g_thread_pool_new (thread_process, NULL, gegl_config_threads (),
-                                 FALSE, NULL);
-    }
-  return pool;
 }
 
 static gboolean
@@ -130,52 +117,22 @@ gegl_operation_source_process (GeglOperation        *operation,
 
   if (gegl_operation_use_threading (operation, result))
   {
-    gint threads = gegl_config_threads ();
-    GThreadPool *pool = thread_pool ();
-    ThreadData thread_data[GEGL_MAX_THREADS];
-    gint pending = threads;
-
-    if (result->width > result->height)
-    {
-      gint bit = result->width / threads;
-      for (gint j = 0; j < threads; j++)
-      {
-        thread_data[j].roi.y = result->y;
-        thread_data[j].roi.height = result->height;
-        thread_data[j].roi.x = result->x + bit * j;
-        thread_data[j].roi.width = bit;
-      }
-      thread_data[threads-1].roi.width = result->width - (bit * (threads-1));
-    }
-    else
-    {
-      gint bit = result->height / threads;
-      for (gint j = 0; j < threads; j++)
-      {
-        thread_data[j].roi.x = result->x;
-        thread_data[j].roi.width = result->width;
-        thread_data[j].roi.y = result->y + bit * j;
-        thread_data[j].roi.height = bit;
-      }
-      thread_data[threads-1].roi.height = result->height - (bit * (threads-1));
-    }
-    for (gint i = 0; i < threads; i++)
-    {
-      thread_data[i].klass = klass;
-      thread_data[i].operation = operation;
-      thread_data[i].output = output;
-      thread_data[i].pending = &pending;
-      thread_data[i].level = level;
-      thread_data[i].success = TRUE;
-    }
-
-    for (gint i = 1; i < threads; i++)
-      g_thread_pool_push (pool, &thread_data[i], NULL);
-    thread_process (&thread_data[0], NULL);
-
-    while (g_atomic_int_get (&pending)) {};
-
-    success = thread_data[0].success;
+    ThreadData data;
+
+    data.klass = klass;
+    data.operation = operation;
+    data.output = output;
+    data.level = level;
+    data.success = TRUE;
+
+    gegl_parallel_distribute_area (
+      result,
+      gegl_operation_get_min_threaded_sub_area (operation),
+      GEGL_SPLIT_STRATEGY_AUTO,
+      (GeglParallelDistributeAreaFunc) thread_process,
+      &data);
+
+    success = data.success;
   }
   else
   {
diff --git a/gegl/operation/gegl-operation.c b/gegl/operation/gegl-operation.c
index de7bc9ccd..f60922ce7 100644
--- a/gegl/operation/gegl-operation.c
+++ b/gegl/operation/gegl-operation.c
@@ -816,12 +816,20 @@ gegl_operation_use_threading (GeglOperation *operation,
       return FALSE;
 
     if (op_class->threaded &&
-        roi->width * roi->height > 64*64)
+        roi->width * roi->height >
+        gegl_operation_get_min_threaded_sub_area (operation))
       return TRUE;
   }
   return FALSE;
 }
 
+gsize
+gegl_operation_get_min_threaded_sub_area (GeglOperation *operation)
+{
+  /* FIXME: too arbitrary? */
+  return 64 * 64;
+}
+
 static guchar *gegl_temp_alloc[GEGL_MAX_THREADS * 4]={NULL,};
 static gint    gegl_temp_size[GEGL_MAX_THREADS * 4]={0,};
 
diff --git a/gegl/operation/gegl-operation.h b/gegl/operation/gegl-operation.h
index a48e2adc9..b4d473fb4 100644
--- a/gegl/operation/gegl-operation.h
+++ b/gegl/operation/gegl-operation.h
@@ -248,6 +248,8 @@ gboolean      gegl_operation_use_opencl         (const GeglOperation *operation)
 gboolean
 gegl_operation_use_threading (GeglOperation *operation,
                               const GeglRectangle *roi);
+gsize
+gegl_operation_get_min_threaded_sub_area (GeglOperation *operation);
 
 /* Invalidate a specific rectangle, indicating the any computation depending
  * on this roi is now invalid.


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]