[ocrfeeder/new_fixes: 4/10] Change the async worker to use threads for its job execution
- From: Joaquim Manuel Pereira Rocha <jrocha src gnome org>
- To: commits-list gnome org
- Cc:
- Subject: [ocrfeeder/new_fixes: 4/10] Change the async worker to use threads for its job execution
- Date: Mon, 1 Oct 2012 19:51:07 +0000 (UTC)
commit 6ee1bc5fa106725fd3c8bd304cb400b49d97e446
Author: Jan Losinski <losinski wh2 tu-dresden de>
Date: Thu Jul 26 20:41:23 2012 +0200
Change the async worker to use threads for its job execution
The async worker is only one thread to decouple the job execution from
the ui. This introduces per-job-threads within the async worker. This
allows to execute more than one job at a time in the worker. The maximum
spawned threads are controlled by a semaphore. The count is hardcoded at
the moment.
Signed-off-by: Jan Losinski <losinski wh2 tu-dresden de>
src/ocrfeeder/studio/widgetModeler.py | 4 +-
src/ocrfeeder/studio/widgetPresenter.py | 4 +-
src/ocrfeeder/util/asyncworker.py | 70 +++++++++++++++++++++++--------
3 files changed, 56 insertions(+), 22 deletions(-)
---
diff --git a/src/ocrfeeder/studio/widgetModeler.py b/src/ocrfeeder/studio/widgetModeler.py
index 0e3bcd6..7d27554 100644
--- a/src/ocrfeeder/studio/widgetModeler.py
+++ b/src/ocrfeeder/studio/widgetModeler.py
@@ -585,7 +585,7 @@ class ImageReviewer_Controler:
def recognizeDocument(self):
pages = self.source_images_selector_widget.getAllPages()
- dialog = QueuedEventsProgressDialog(self.main_window.window)
+ dialog = QueuedEventsProgressDialog(self.main_window.window, parallel=4)
items = []
i = 1
total = len(pages)
@@ -645,7 +645,7 @@ class ImageReviewer_Controler:
pages_to_process,
data_boxes, error):
page.data_boxes = data_boxes
- if page == pages_to_process[-1]:
+ if dialog.worker.done:
dialog.cancel()
self.__updateImageReviewers()
diff --git a/src/ocrfeeder/studio/widgetPresenter.py b/src/ocrfeeder/studio/widgetPresenter.py
index c8e1dc3..cd970cf 100644
--- a/src/ocrfeeder/studio/widgetPresenter.py
+++ b/src/ocrfeeder/studio/widgetPresenter.py
@@ -1170,12 +1170,12 @@ class CommandProgressBarDialog(gtk.Dialog):
class QueuedEventsProgressDialog(gtk.Dialog):
- def __init__(self, parent, items_list = []):
+ def __init__(self, parent, items_list = [], parallel=1):
super(QueuedEventsProgressDialog, self).__init__(parent = parent,
flags = gtk.DIALOG_MODAL)
self.set_icon_from_file(WINDOW_ICON)
self.info_list = []
- self.worker = AsyncWorker()
+ self.worker = AsyncWorker(parallel)
self.setItemsList(items_list)
self.label = gtk.Label()
self.__makeProgressBar()
diff --git a/src/ocrfeeder/util/asyncworker.py b/src/ocrfeeder/util/asyncworker.py
index e1ad5a1..b2731a6 100644
--- a/src/ocrfeeder/util/asyncworker.py
+++ b/src/ocrfeeder/util/asyncworker.py
@@ -22,11 +22,12 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
###########################################################################
-from threading import Thread
+from threading import Thread, BoundedSemaphore, RLock
import Queue
import gobject
from lib import debug
+
class AsyncItem(object):
def __init__(self, target_method, target_method_args, finish_callback = None, finish_callback_args = ()):
@@ -54,32 +55,65 @@ class AsyncItem(object):
def cancel(self):
self.canceled = True
+
+ready_lock = RLock()
+
+
class AsyncWorker(Thread):
- def __init__(self):
+ def __init__(self, parallel=1):
Thread.__init__(self)
self.queue = Queue.Queue(0)
self.stopped = False
- self.async_item = None
self.item_number = -1
+ self.parallel = parallel
+ self.running_items = []
+ self.worker_threads = []
+ self.thread_sem = BoundedSemaphore(value=parallel)
+ self.done = False
+ self.queue_processing = True
+
def run(self):
- while not self.stopped:
- if self.queue.empty():
- self.stop()
- break
- try:
- self.async_item = self.queue.get()
- self.item_number += 1
- self.async_item.run()
- self.queue.task_done()
- self.async_item = None
- except Exception, exception:
- debug(str(exception))
- self.stop()
+ try:
+ while not self.stopped or not self.queue.empty():
+ try:
+ async_item = self.queue.get(False)
+ self.item_number += 1
+
+ thread = Thread(target=self._run_item, args=(async_item, ))
+ self.thread_sem.acquire()
+ self.worker_threads.append(thread)
+ thread.start()
+
+ except Queue.Empty:
+ break
+
+ except Exception, exception:
+ debug(str(exception))
+ self.stop()
+ finally:
+ with ready_lock:
+ self.queue_processing = False
+ for thread in self.worker_threads:
+ thread.join()
+ self.stopped = True
+
def stop(self):
self.stopped = True
- if self.async_item:
- self.async_item.cancel()
+ if len(self.running_items):
+ for async_item in self.running_items:
+ async_item.cancel()
+
+ def _run_item(self, async_item):
+ with ready_lock:
+ self.running_items.append(async_item)
+ async_item.run()
+ self.queue.task_done()
+ self.running_items.remove(async_item)
+ with ready_lock:
+ if not self.queue_processing and not self.running_items:
+ self.done = True
+ self.thread_sem.release()
[
Date Prev][
Date Next] [
Thread Prev][
Thread Next]
[
Thread Index]
[
Date Index]
[
Author Index]