[gnome-music/wip/mschraal/prioritypool-v3: 5/5] asyncqueue: Add a slow running task pool
- From: Marinus Schraal <mschraal src gnome org>
- To: commits-list gnome org
- Cc:
- Subject: [gnome-music/wip/mschraal/prioritypool-v3: 5/5] asyncqueue: Add a slow running task pool
- Date: Tue, 1 Feb 2022 23:10:47 +0000 (UTC)
commit fc26f85051fa8591d93dd0f63f9f35e671265dce
Author: Marinus Schraal <mschraal gnome org>
Date: Tue Feb 1 21:49:18 2022 +0100
asyncqueue: Add a slow running task pool
gnomemusic/asyncqueue.py | 30 ++++++++++++++++++++++++++----
1 file changed, 26 insertions(+), 4 deletions(-)
---
diff --git a/gnomemusic/asyncqueue.py b/gnomemusic/asyncqueue.py
index 9b39ab6e5..813f95285 100644
--- a/gnomemusic/asyncqueue.py
+++ b/gnomemusic/asyncqueue.py
@@ -22,7 +22,8 @@
# code, but you are not obligated to do so. If you do not wish to do so,
# delete this exception statement from your version.
-from typing import Any, Dict, List, Optional, Tuple
+from collections import deque
+from typing import Any, Deque, Dict, List, Optional, Tuple
import time
from gi.repository import GObject, GLib
@@ -58,8 +59,10 @@ class AsyncQueue(GObject.GObject):
self._async_pool: Dict[int, Tuple] = {}
self._async_pool_coreobject_hash: Dict = {}
self._async_active_pool: Dict[int, Tuple] = {}
+ self._async_active_slow_pool: Dict[int, Tuple] = {}
self._async_data: Dict[object, Tuple[int, float]] = {}
self._log = MusicLogger()
+ self._task_tracker: Deque[Any] = deque([], 5)
self._max_async = 4
self._priority_pool = PriorityPool()
self._queue_name = queue_name if queue_name else self
@@ -76,7 +79,8 @@ class AsyncQueue(GObject.GObject):
async_obj_id = id(args[0])
if (async_obj_id not in self._async_pool
- and async_obj_id not in self._async_active_pool):
+ and async_obj_id not in self._async_active_pool
+ and async_obj_id not in self._async_active_slow_pool):
self._async_pool[async_obj_id] = (args)
self._async_pool_coreobject_hash[async_obj_id] = id(args[1])
@@ -84,8 +88,9 @@ class AsyncQueue(GObject.GObject):
self._timeout_id = GLib.timeout_add(100, self._dispatch)
def _dispatch(self) -> bool:
- common_ids = self._common_ids()
+ self._track_long_running_tasks()
+ common_ids = self._common_ids()
while len(self._async_active_pool) < self._max_async:
if len(self._async_pool) == 0:
self._timeout_id = 0
@@ -127,8 +132,25 @@ class AsyncQueue(GObject.GObject):
f"{a} active task(s) of {len(self._async_pool) + a}")
obj.disconnect(handler_id)
- self._async_active_pool.pop(id(obj))
+ if id(obj) in self._async_active_pool.keys():
+ self._async_active_pool.pop(id(obj))
+ else:
+ self._async_active_slow_pool.pop(id(obj))
def _common_ids(self) -> List[int]:
return list(set(self._priority_pool.props.pool).intersection(
self._async_pool_coreobject_hash.values()))
+
+ def _track_long_running_tasks(self) -> None:
+ self._task_tracker.append(self._async_active_pool.keys())
+
+ if len(self._task_tracker) < 5:
+ return
+
+ common_task_ids = self._task_tracker[0]
+ for _, task_ids in enumerate(self._task_tracker):
+ common_task_ids = set(common_task_ids).intersection(task_ids)
+
+ for task_id in common_task_ids:
+ self._async_active_slow_pool[
+ task_id] = self._async_active_pool.pop(task_id)
[
Date Prev][
Date Next] [
Thread Prev][
Thread Next]
[
Thread Index]
[
Date Index]
[
Author Index]