[ocrfeeder/new_fixes: 9/10] add a multiprocess Pool to the async worker.
- From: Joaquim Manuel Pereira Rocha <jrocha src gnome org>
- To: commits-list gnome org
- Cc:
- Subject: [ocrfeeder/new_fixes: 9/10] add a multiprocess Pool to the async worker.
- Date: Mon, 1 Oct 2012 19:51:33 +0000 (UTC)
commit b45eb02ae0810ee28547b314ea321b6e3d4dd420
Author: Jan Losinski <losinski wh2 tu-dresden de>
Date: Fri Jul 27 15:25:38 2012 +0200
add a multiprocess Pool to the async worker.
The multiprocessing.Pool is a process pool for doing parallel work without
a limiting GIL - so it's really parallel if you have more than one CPU.
The Pool usage is limited to cases where the "parallel" argument of the
AsyncWorker is >1 - in other cases this makes no sense and is unnecessary
overhead.
Another benefit is that there are fewer shared-state problems that
threads normally introduce - because of the real process isolation.
Signed-off-by: Jan Losinski <losinski wh2 tu-dresden de>
src/ocrfeeder/util/asyncworker.py | 26 +++++++++++++++++++++++---
1 files changed, 23 insertions(+), 3 deletions(-)
---
diff --git a/src/ocrfeeder/util/asyncworker.py b/src/ocrfeeder/util/asyncworker.py
index b2731a6..ffee92f 100644
--- a/src/ocrfeeder/util/asyncworker.py
+++ b/src/ocrfeeder/util/asyncworker.py
@@ -26,23 +26,29 @@ from threading import Thread, BoundedSemaphore, RLock
import Queue
import gobject
from lib import debug
+from multiprocessing import Pool
class AsyncItem(object):
- def __init__(self, target_method, target_method_args, finish_callback = None, finish_callback_args = ()):
+ def __init__(self, target_method, target_method_args, finish_callback = None, finish_callback_args = (), process_pool=None):
self.target_method = target_method
self.target_method_args = target_method_args
self.finish_callback = finish_callback
self.finish_callback_args = finish_callback_args
self.canceled = False
+ self.process_pool = process_pool
def run(self):
if self.canceled:
return
results = error = None
try:
- results = self.target_method(*self.target_method_args)
+ if self.process_pool is None:
+ results = self.target_method(*self.target_method_args)
+ else:
+ results = self.process_pool.apply(self.target_method, self.target_method_args)
+
except Exception, exception:
debug(str(exception))
error = exception
@@ -64,6 +70,7 @@ class AsyncWorker(Thread):
def __init__(self, parallel=1):
Thread.__init__(self)
self.queue = Queue.Queue(0)
+ self.daemon = True
self.stopped = False
self.item_number = -1
@@ -74,19 +81,28 @@ class AsyncWorker(Thread):
self.done = False
self.queue_processing = True
+ self.process_pool = None
+ if parallel > 1:
+ self.process_pool = Pool(parallel)
+
+
def run(self):
try:
- while not self.stopped or not self.queue.empty():
+ while not self.stopped and not self.queue.empty():
try:
async_item = self.queue.get(False)
self.item_number += 1
thread = Thread(target=self._run_item, args=(async_item, ))
+ self.running_items.append(async_item)
self.thread_sem.acquire()
+ async_item.process_pool = self.process_pool
self.worker_threads.append(thread)
thread.start()
except Queue.Empty:
+ if self.process_pool is not None:
+ self.process_pool.close()
break
except Exception, exception:
@@ -98,6 +114,8 @@ class AsyncWorker(Thread):
for thread in self.worker_threads:
thread.join()
self.stopped = True
+ if self.process_pool is not None:
+ self.process_pool.terminate()
def stop(self):
@@ -105,6 +123,8 @@ class AsyncWorker(Thread):
if len(self.running_items):
for async_item in self.running_items:
async_item.cancel()
+ if self.process_pool is not None:
+ self.process_pool.close()
def _run_item(self, async_item):
[
Date Prev][
Date Next] [
Thread Prev][
Thread Next]
[
Thread Index]
[
Date Index]
[
Author Index]