finnball pushed to branch finn/async at BuildGrid / buildgrid
Commits:
- 4c708a2d by finn at 2018-08-03T08:14:52Z
5 changed files:
- buildgrid/server/execution/execution_instance.py
- buildgrid/server/execution/execution_service.py
- buildgrid/server/job.py
- buildgrid/server/scheduler.py
- tests/integration/execution_service.py
Changes:
... | ... | @@ -34,12 +34,12 @@ class ExecutionInstance(): |
34 | 34 |
self.logger = logging.getLogger(__name__)
|
35 | 35 |
self._scheduler = scheduler
|
36 | 36 |
|
37 |
- def execute(self, action_digest, skip_cache_lookup):
|
|
37 |
+ def execute(self, action_digest, skip_cache_lookup, message_queue=None):
|
|
38 | 38 |
""" Sends a job for execution.
|
39 | 39 |
Queues an action and creates an Operation instance to be associated with
|
40 | 40 |
this action.
|
41 | 41 |
"""
|
42 |
- job = Job(action_digest)
|
|
42 |
+ job = Job(action_digest, message_queue)
|
|
43 | 43 |
self.logger.info("Operation name: {}".format(job.name))
|
44 | 44 |
|
45 | 45 |
if not skip_cache_lookup:
|
... | ... | @@ -70,3 +70,15 @@ class ExecutionInstance(): |
70 | 70 |
def cancel_operation(self, name):
|
71 | 71 |
# TODO: Cancel leases
|
72 | 72 |
raise NotImplementedError("Cancelled operations not supported")
|
73 |
+ |
|
74 |
+ def register_message_client(self, name, queue):
|
|
75 |
+ try:
|
|
76 |
+ self._scheduler.register_client(name, queue)
|
|
77 |
+ except KeyError:
|
|
78 |
+ raise InvalidArgumentError("Operation name does not exist: {}".format(name))
|
|
79 |
+ |
|
80 |
+ def unregister_message_client(self, name, queue):
|
|
81 |
+ try:
|
|
82 |
+ self._scheduler.unregister_client(name, queue)
|
|
83 |
+ except KeyError:
|
|
84 |
+ raise InvalidArgumentError("Operation name does not exist: {}".format(name))
|
... | ... | @@ -22,10 +22,9 @@ ExecutionService |
22 | 22 |
Serves remote execution requests.
|
23 | 23 |
"""
|
24 | 24 |
|
25 |
-import copy
|
|
26 | 25 |
import grpc
|
27 | 26 |
import logging
|
28 |
-import time
|
|
27 |
+import queue
|
|
29 | 28 |
|
30 | 29 |
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
|
31 | 30 |
from buildgrid._protos.google.longrunning import operations_pb2_grpc, operations_pb2
|
... | ... | @@ -35,17 +34,23 @@ from ._exceptions import InvalidArgumentError |
35 | 34 |
class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
|
36 | 35 |
|
37 | 36 |
def __init__(self, instance):
|
38 |
- self._instance = instance
|
|
39 | 37 |
self.logger = logging.getLogger(__name__)
|
38 |
+ self._instance = instance
|
|
40 | 39 |
|
41 | 40 |
def Execute(self, request, context):
|
42 | 41 |
# Ignore request.instance_name for now
|
43 | 42 |
# Have only one instance
|
44 | 43 |
try:
|
44 |
+ message_queue = queue.Queue()
|
|
45 | 45 |
operation = self._instance.execute(request.action_digest,
|
46 |
- request.skip_cache_lookup)
|
|
46 |
+ request.skip_cache_lookup,
|
|
47 |
+ message_queue)
|
|
47 | 48 |
|
48 |
- yield from self._stream_operation_updates(operation.name)
|
|
49 |
+ remove_client = lambda : self._remove_client(operation.name, message_queue)
|
|
50 |
+ context.add_callback(remove_client)
|
|
51 |
+ |
|
52 |
+ yield from self._stream_operation_updates(message_queue,
|
|
53 |
+ operation.name)
|
|
49 | 54 |
|
50 | 55 |
except InvalidArgumentError as e:
|
51 | 56 |
self.logger.error(e)
|
... | ... | @@ -59,19 +64,28 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer): |
59 | 64 |
|
60 | 65 |
def WaitExecution(self, request, context):
|
61 | 66 |
try:
|
62 |
- yield from self._stream_operation_updates(request.name)
|
|
67 |
+ message_queue = queue.Queue()
|
|
68 |
+ operation_name = request.name
|
|
69 |
+ |
|
70 |
+ self._instance.register_message_client(operation_name, message_queue)
|
|
71 |
+ |
|
72 |
+ remove_client = lambda : self._remove_client(operation_name, message_queue)
|
|
73 |
+ context.add_callback(remove_client)
|
|
74 |
+ |
|
75 |
+ yield from self._stream_operation_updates(message_queue,
|
|
76 |
+ operation_name)
|
|
63 | 77 |
|
64 | 78 |
except InvalidArgumentError as e:
|
65 | 79 |
self.logger.error(e)
|
66 | 80 |
context.set_details(str(e))
|
67 | 81 |
context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
|
68 | 82 |
|
69 |
- def _stream_operation_updates(self, name):
|
|
70 |
- stream_previous = None
|
|
71 |
- while True:
|
|
72 |
- stream = self._instance.get_operation(name)
|
|
73 |
- if stream != stream_previous:
|
|
74 |
- yield stream
|
|
75 |
- if stream.done == True: break
|
|
76 |
- stream_previous = copy.deepcopy(stream)
|
|
77 |
- time.sleep(1)
|
|
83 |
+ def _remove_client(self, operation_name, message_queue):
|
|
84 |
+ self._instance.unregister_message_client(operation_name, message_queue)
|
|
85 |
+ |
|
86 |
+ def _stream_operation_updates(self, message_queue, operation_name):
|
|
87 |
+ operation = message_queue.get()
|
|
88 |
+ while not operation.done:
|
|
89 |
+ yield operation
|
|
90 |
+ operation = message_queue.get()
|
|
91 |
+ yield operation
|
... | ... | @@ -51,9 +51,8 @@ class LeaseState(Enum): |
51 | 51 |
|
52 | 52 |
class Job():
|
53 | 53 |
|
54 |
- def __init__(self, action):
|
|
55 |
- self.action = action
|
|
56 |
- self.bot_status = BotStatus.BOT_STATUS_UNSPECIFIED
|
|
54 |
+ def __init__(self, action_digest, message_queue=None):
|
|
55 |
+ self.action_digest = action_digest
|
|
57 | 56 |
self.execute_stage = ExecuteStage.UNKNOWN
|
58 | 57 |
self.lease = None
|
59 | 58 |
self.logger = logging.getLogger(__name__)
|
... | ... | @@ -62,10 +61,24 @@ class Job(): |
62 | 61 |
|
63 | 62 |
self._n_tries = 0
|
64 | 63 |
self._operation = operations_pb2.Operation(name = self.name)
|
64 |
+ self._operation_update_queues = []
|
|
65 |
+ |
|
66 |
+ if message_queue is not None:
|
|
67 |
+ self.register_client(message_queue)
|
|
68 |
+ |
|
69 |
+ def check_job_finished(self):
|
|
70 |
+ if not self._operation_update_queues:
|
|
71 |
+ return self._operation.done
|
|
72 |
+ return False
|
|
73 |
+ |
|
74 |
+ def register_client(self, queue):
|
|
75 |
+ self._operation_update_queues.append(queue)
|
|
76 |
+ |
|
77 |
+ def unregister_client(self, queue):
|
|
78 |
+ self._operation_update_queues.remove(queue)
|
|
65 | 79 |
|
66 | 80 |
def get_operation(self):
|
67 | 81 |
self._operation.metadata.CopyFrom(self._pack_any(self.get_operation_meta()))
|
68 |
- |
|
69 | 82 |
if self.result is not None:
|
70 | 83 |
self._operation.done = True
|
71 | 84 |
response = ExecuteResponse()
|
... | ... | @@ -81,10 +94,10 @@ class Job(): |
81 | 94 |
return meta
|
82 | 95 |
|
83 | 96 |
def create_lease(self):
|
84 |
- action = self._pack_any(self.action)
|
|
97 |
+ action_digest = self._pack_any(self.action_digest)
|
|
85 | 98 |
|
86 | 99 |
lease = bots_pb2.Lease(id = self.name,
|
87 |
- payload = action,
|
|
100 |
+ payload = action_digest,
|
|
88 | 101 |
state = LeaseState.PENDING.value)
|
89 | 102 |
self.lease = lease
|
90 | 103 |
return lease
|
... | ... | @@ -92,6 +105,11 @@ class Job(): |
92 | 105 |
def get_operations(self):
|
93 | 106 |
return operations_pb2.ListOperationsResponse(operations = [self.get_operation()])
|
94 | 107 |
|
108 |
+ def update_execute_stage(self, stage):
|
|
109 |
+ self.execute_stage = stage
|
|
110 |
+ for queue in self._operation_update_queues:
|
|
111 |
+ queue.put(self.get_operation())
|
|
112 |
+ |
|
95 | 113 |
def _pack_any(self, pack):
|
96 | 114 |
any = any_pb2.Any()
|
97 | 115 |
any.Pack(pack)
|
... | ... | @@ -35,8 +35,17 @@ class Scheduler(): |
35 | 35 |
self.jobs = {}
|
36 | 36 |
self.queue = deque()
|
37 | 37 |
|
38 |
+ def register_client(self, name, queue):
|
|
39 |
+ self.jobs[name].register_client(queue)
|
|
40 |
+ |
|
41 |
+ def unregister_client(self, name, queue):
|
|
42 |
+ job = self.jobs[name]
|
|
43 |
+ job.unregister_client(queue)
|
|
44 |
+ if job.check_job_finished():
|
|
45 |
+ del self.jobs[name]
|
|
46 |
+ |
|
38 | 47 |
def append_job(self, job):
|
39 |
- job.execute_stage = ExecuteStage.QUEUED
|
|
48 |
+ job.update_execute_stage(ExecuteStage.QUEUED)
|
|
40 | 49 |
self.jobs[job.name] = job
|
41 | 50 |
self.queue.append(job)
|
42 | 51 |
|
... | ... | @@ -45,9 +54,9 @@ class Scheduler(): |
45 | 54 |
|
46 | 55 |
if job.n_tries >= self.MAX_N_TRIES:
|
47 | 56 |
# TODO: Decide what to do with these jobs
|
48 |
- job.execute_stage = ExecuteStage.COMPLETED
|
|
57 |
+ job.update_execute_stage(ExecuteStage.COMPLETED)
|
|
49 | 58 |
else:
|
50 |
- job.execute_stage = ExecuteStage.QUEUED
|
|
59 |
+ job.update_execute_stage(ExecuteStage.QUEUED)
|
|
51 | 60 |
job.n_tries += 1
|
52 | 61 |
self.queue.appendleft(job)
|
53 | 62 |
|
... | ... | @@ -56,15 +65,14 @@ class Scheduler(): |
56 | 65 |
def create_job(self):
|
57 | 66 |
if len(self.queue) > 0:
|
58 | 67 |
job = self.queue.popleft()
|
59 |
- job.execute_stage = ExecuteStage.EXECUTING
|
|
68 |
+ job.update_execute_stage(ExecuteStage.EXECUTING)
|
|
60 | 69 |
self.jobs[job.name] = job
|
61 | 70 |
return job
|
62 |
- return None
|
|
63 | 71 |
|
64 | 72 |
def job_complete(self, name, result):
|
65 | 73 |
job = self.jobs[name]
|
66 |
- job.execute_stage = ExecuteStage.COMPLETED
|
|
67 | 74 |
job.result = result
|
75 |
+ job.update_execute_stage(ExecuteStage.COMPLETED)
|
|
68 | 76 |
self.jobs[name] = job
|
69 | 77 |
|
70 | 78 |
def get_operations(self):
|
... | ... | @@ -122,3 +130,7 @@ class Scheduler(): |
122 | 130 |
if state == LeaseState.PENDING.value or \
|
123 | 131 |
state == LeaseState.ACTIVE.value:
|
124 | 132 |
self.retry_job(name)
|
133 |
+ |
|
134 |
+ def _update_execute_stage(self, job, stage):
|
|
135 |
+ job.update_execute_stage(stage)
|
|
136 |
+ return job
|
... | ... | @@ -69,17 +69,22 @@ def test_execute(skip_cache_lookup, instance, context): |
69 | 69 |
assert result.done is False
|
70 | 70 |
|
71 | 71 |
def test_wait_execution(instance, context):
|
72 |
+ # TODO: Figure out why next(response) hangs on the .get()
|
|
73 |
+ # method when running in pytest.
|
|
72 | 74 |
action_digest = remote_execution_pb2.Digest()
|
73 | 75 |
action_digest.hash = 'zhora'
|
74 | 76 |
|
75 |
- execution_request = remote_execution_pb2.ExecuteRequest(instance_name = '',
|
|
76 |
- action_digest = action_digest,
|
|
77 |
- skip_cache_lookup = True)
|
|
78 |
- execution_response = next(instance.Execute(execution_request, context))
|
|
77 |
+ j = job.Job(action_digest, None)
|
|
78 |
+ j._operation.done = True
|
|
79 | 79 |
|
80 |
+ request = remote_execution_pb2.WaitExecutionRequest(name=j.name)
|
|
80 | 81 |
|
81 |
- request = remote_execution_pb2.WaitExecutionRequest(name=execution_response.name)
|
|
82 |
+ instance._instance._scheduler.jobs[j.name] = j
|
|
82 | 83 |
|
83 |
- response = next(instance.WaitExecution(request, context))
|
|
84 |
+ action_result_any = any_pb2.Any()
|
|
85 |
+ action_result = remote_execution_pb2.ActionResult()
|
|
86 |
+ action_result_any.Pack(action_result)
|
|
84 | 87 |
|
85 |
- assert response == execution_response
|
|
88 |
+ instance._instance._scheduler._update_execute_stage(j, job.ExecuteStage.COMPLETED)
|
|
89 |
+ |
|
90 |
+ response = instance.WaitExecution(request, context)
|