[gnome-continuous-yocto/gnomeostree-3.28-rocko: 1939/8267] bitbake: runqueue: Abstract worker functionality to an object/array
- From: Emmanuele Bassi <ebassi src gnome org>
- To: commits-list gnome org
- Cc:
- Subject: [gnome-continuous-yocto/gnomeostree-3.28-rocko: 1939/8267] bitbake: runqueue: Abstract worker functionality to an object/array
- Date: Sat, 16 Dec 2017 22:31:48 +0000 (UTC)
commit 0ef16f083eddb0eccd5fd1604e6e922a38705ae5
Author: Richard Purdie <richard purdie linuxfoundation org>
Date: Mon Aug 15 17:58:39 2016 +0100
bitbake: runqueue: Abstract worker functionality to an object/array
With the introduction of multi-config and the possibility of distributed
builds we need arrays of workers rather than the existing two.
This refactors the code to have a dict() of workers and a dict of
fakeworkers, represented by objects. The code can iterate over these.
This is separated out from the multi-config changes since it's separable
and clearer this way.
(Bitbake rev: 8181d96e0a4df0aa47287669681116fa65bcae16)
Signed-off-by: Richard Purdie <richard purdie linuxfoundation org>
bitbake/lib/bb/runqueue.py | 122 ++++++++++++++++++++++++--------------------
1 files changed, 66 insertions(+), 56 deletions(-)
---
diff --git a/bitbake/lib/bb/runqueue.py b/bitbake/lib/bb/runqueue.py
index 3a593b6..6a953b8 100644
--- a/bitbake/lib/bb/runqueue.py
+++ b/bitbake/lib/bb/runqueue.py
@@ -922,6 +922,11 @@ class RunQueueData:
self.runtaskentries[tid].depends,
self.runtaskentries[tid].revdeps)
+class RunQueueWorker():
+ def __init__(self, process, pipe):
+ self.process = process
+ self.pipe = pipe
+
class RunQueue:
def __init__(self, cooker, cfgData, dataCache, taskData, targets):
@@ -940,10 +945,8 @@ class RunQueue:
self.dm = monitordisk.diskMonitor(cfgData)
self.rqexe = None
- self.worker = None
- self.workerpipe = None
- self.fakeworker = None
- self.fakeworkerpipe = None
+ self.worker = {}
+ self.fakeworker = {}
def _start_worker(self, fakeroot = False, rqexec = None):
logger.debug(1, "Starting bitbake-worker")
@@ -988,55 +991,56 @@ class RunQueue:
worker.stdin.write(b"<workerdata>" + pickle.dumps(workerdata) + b"</workerdata>")
worker.stdin.flush()
- return worker, workerpipe
+ return RunQueueWorker(worker, workerpipe)
- def _teardown_worker(self, worker, workerpipe):
+ def _teardown_worker(self, worker):
if not worker:
return
logger.debug(1, "Teardown for bitbake-worker")
try:
- worker.stdin.write(b"<quit></quit>")
- worker.stdin.flush()
- worker.stdin.close()
+ worker.process.stdin.write(b"<quit></quit>")
+ worker.process.stdin.flush()
+ worker.process.stdin.close()
except IOError:
pass
- while worker.returncode is None:
- workerpipe.read()
- worker.poll()
- while workerpipe.read():
+ while worker.process.returncode is None:
+ worker.pipe.read()
+ worker.process.poll()
+ while worker.pipe.read():
continue
- workerpipe.close()
+ worker.pipe.close()
def start_worker(self):
if self.worker:
self.teardown_workers()
self.teardown = False
- self.worker, self.workerpipe = self._start_worker()
+ self.worker[''] = self._start_worker()
def start_fakeworker(self, rqexec):
if not self.fakeworker:
- self.fakeworker, self.fakeworkerpipe = self._start_worker(True, rqexec)
+ self.fakeworker[''] = self._start_worker(True, rqexec)
def teardown_workers(self):
self.teardown = True
- self._teardown_worker(self.worker, self.workerpipe)
- self.worker = None
- self.workerpipe = None
- self._teardown_worker(self.fakeworker, self.fakeworkerpipe)
- self.fakeworker = None
- self.fakeworkerpipe = None
+ for mc in self.worker:
+ self._teardown_worker(self.worker[mc])
+ self.worker = {}
+ for mc in self.fakeworker:
+ self._teardown_worker(self.fakeworker[mc])
+ self.fakeworker = {}
def read_workers(self):
- self.workerpipe.read()
- if self.fakeworkerpipe:
- self.fakeworkerpipe.read()
+ for mc in self.worker:
+ self.worker[mc].pipe.read()
+ for mc in self.fakeworker:
+ self.fakeworker[mc].pipe.read()
def active_fds(self):
fds = []
- if self.workerpipe:
- fds.append(self.workerpipe.input)
- if self.fakeworkerpipe:
- fds.append(self.fakeworkerpipe.input)
+ for mc in self.worker:
+ fds.append(self.worker[mc].pipe.input)
+ for mc in self.fakeworker:
+ fds.append(self.fakeworker[mc].pipe.input)
return fds
def check_stamp_task(self, tid, taskname = None, recurse = False, cache = None):
@@ -1393,9 +1397,10 @@ class RunQueueExecute:
self.stampcache = {}
- rq.workerpipe.setrunqueueexec(self)
- if rq.fakeworkerpipe:
- rq.fakeworkerpipe.setrunqueueexec(self)
+ for mc in rq.worker:
+ rq.worker[mc].pipe.setrunqueueexec(self)
+ for mc in rq.fakeworker:
+ rq.fakeworker[mc].pipe.setrunqueueexec(self)
if self.number_tasks <= 0:
bb.fatal("Invalid BB_NUMBER_THREADS %s" % self.number_tasks)
@@ -1414,15 +1419,21 @@ class RunQueueExecute:
return True
def finish_now(self):
- for worker in [self.rq.worker, self.rq.fakeworker]:
- if not worker:
- continue
+ for mc in self.rq.worker:
+ try:
+ self.rq.worker[mc].process.stdin.write(b"<finishnow></finishnow>")
+ self.rq.worker[mc].process.stdin.flush()
+ except IOError:
+ # worker must have died?
+ pass
+ for mc in self.rq.fakeworker:
try:
- worker.stdin.write(b"<finishnow></finishnow>")
- worker.stdin.flush()
+ self.rq.fakeworker[mc].process.stdin.write(b"<finishnow></finishnow>")
+ self.rq.fakeworker[mc].process.stdin.flush()
except IOError:
# worker must have died?
pass
+
if len(self.failed_fns) != 0:
self.rq.state = runQueueFailed
return
@@ -1733,11 +1744,11 @@ class RunQueueExecuteTasks(RunQueueExecute):
logger.critical("Failed to spawn fakeroot worker to run %s: %s" % (task, str(exc)))
self.rq.state = runQueueFailed
return True
- self.rq.fakeworker.stdin.write(b"<runtask>" + pickle.dumps((fn, task, taskname, False, self.cooker.collection.get_file_appends(fn), taskdepdata)) + b"</runtask>")
- self.rq.fakeworker.stdin.flush()
+ self.rq.fakeworker[''].process.stdin.write(b"<runtask>" + pickle.dumps((fn, task, taskname, False, self.cooker.collection.get_file_appends(fn), taskdepdata)) + b"</runtask>")
+ self.rq.fakeworker[''].process.stdin.flush()
else:
- self.rq.worker.stdin.write(b"<runtask>" + pickle.dumps((fn, task, taskname, False, self.cooker.collection.get_file_appends(fn), taskdepdata)) + b"</runtask>")
- self.rq.worker.stdin.flush()
+ self.rq.worker[''].process.stdin.write(b"<runtask>" + pickle.dumps((fn, task, taskname, False, self.cooker.collection.get_file_appends(fn), taskdepdata)) + b"</runtask>")
+ self.rq.worker[''].process.stdin.flush()
self.build_stamps[task] = bb.build.stampfile(taskname, self.rqdata.dataCache, fn)
self.build_stamps2.append(self.build_stamps[task])
@@ -2143,11 +2154,11 @@ class RunQueueExecuteScenequeue(RunQueueExecute):
if 'fakeroot' in taskdep and taskname in taskdep['fakeroot'] and not self.cooker.configuration.dry_run:
if not self.rq.fakeworker:
self.rq.start_fakeworker(self)
- self.rq.fakeworker.stdin.write(b"<runtask>" + pickle.dumps((fn, task, taskname, True, self.cooker.collection.get_file_appends(fn), None)) + b"</runtask>")
- self.rq.fakeworker.stdin.flush()
+ self.rq.fakeworker[''].process.stdin.write(b"<runtask>" + pickle.dumps((fn, task, taskname, True, self.cooker.collection.get_file_appends(fn), None)) + b"</runtask>")
+ self.rq.fakeworker[''].process.stdin.flush()
else:
- self.rq.worker.stdin.write(b"<runtask>" + pickle.dumps((fn, task, taskname, True, self.cooker.collection.get_file_appends(fn), None)) + b"</runtask>")
- self.rq.worker.stdin.flush()
+ self.rq.worker[''].process.stdin.write(b"<runtask>" + pickle.dumps((fn, task, taskname, True, self.cooker.collection.get_file_appends(fn), None)) + b"</runtask>")
+ self.rq.worker[''].process.stdin.flush()
self.runq_running.add(task)
self.stats.taskActive()
@@ -2301,17 +2312,16 @@ class runQueuePipe():
def read(self):
for w in [self.rq.worker, self.rq.fakeworker]:
- if not w:
- continue
- w.poll()
- if w.returncode is not None and not self.rq.teardown:
- name = None
- if self.rq.worker and w.pid == self.rq.worker.pid:
- name = "Worker"
- elif self.rq.fakeworker and w.pid == self.rq.fakeworker.pid:
- name = "Fakeroot"
- bb.error("%s process (%s) exited unexpectedly (%s), shutting down..." % (name, w.pid, str(w.returncode)))
- self.rq.finish_runqueue(True)
+ for mc in w:
+ w[mc].process.poll()
+ if w[mc].process.returncode is not None and not self.rq.teardown:
+ name = None
+ if w in self.rq.worker:
+ name = "Worker"
+ elif w in self.rq.fakeworker:
+ name = "Fakeroot"
+ bb.error("%s process (%s) exited unexpectedly (%s), shutting down..." % (name, w.pid, str(w.returncode)))
+ self.rq.finish_runqueue(True)
start = len(self.queue)
try:
[Date Prev][Date Next] [Thread Prev][Thread Next]
[Thread Index] [Date Index] [Author Index]