Martin Blanchard pushed to branch mablanch/75-requests-multiplexing at BuildGrid / buildgrid
Commits:
-
158760b1
by Martin Blanchard at 2018-10-29T17:13:26Z
-
1c106889
by Martin Blanchard at 2018-10-30T09:47:53Z
-
5c3d1f04
by Martin Blanchard at 2018-10-30T11:13:26Z
-
adccb84c
by Martin Blanchard at 2018-10-30T13:08:06Z
6 changed files:
- buildgrid/server/execution/instance.py
- buildgrid/server/execution/service.py
- buildgrid/server/job.py
- buildgrid/server/operations/instance.py
- buildgrid/server/scheduler.py
- setup.py
Changes:
| ... | ... | @@ -21,11 +21,9 @@ An instance of the Remote Execution Service. |
| 21 | 21 |
|
| 22 | 22 |
import logging
|
| 23 | 23 |
|
| 24 |
-from buildgrid._exceptions import FailedPreconditionError, InvalidArgumentError
|
|
| 24 |
+from buildgrid._exceptions import FailedPreconditionError, InvalidArgumentError, NotFoundError
|
|
| 25 | 25 |
from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Action
|
| 26 | 26 |
|
| 27 |
-from ..job import Job
|
|
| 28 |
- |
|
| 29 | 27 |
|
| 30 | 28 |
class ExecutionInstance:
|
| 31 | 29 |
|
| ... | ... | @@ -37,7 +35,7 @@ class ExecutionInstance: |
| 37 | 35 |
def register_instance_with_server(self, instance_name, server):
|
| 38 | 36 |
server.add_execution_instance(self, instance_name)
|
| 39 | 37 |
|
| 40 |
- def execute(self, action_digest, skip_cache_lookup, message_queue=None):
|
|
| 38 |
+ def execute(self, action_digest, skip_cache_lookup, peer=None, message_queue=None):
|
|
| 41 | 39 |
""" Sends a job for execution.
|
| 42 | 40 |
Queues an action and creates an Operation instance to be associated with
|
| 43 | 41 |
this action.
|
| ... | ... | @@ -48,28 +46,27 @@ class ExecutionInstance: |
| 48 | 46 |
if not action:
|
| 49 | 47 |
raise FailedPreconditionError("Could not get action from storage.")
|
| 50 | 48 |
|
| 51 |
- job = Job(action, action_digest)
|
|
| 52 |
- if message_queue is not None:
|
|
| 53 |
- job.register_client(message_queue)
|
|
| 49 |
+ job = self._scheduler.queue_job(action, action_digest, skip_cache_lookup)
|
|
| 54 | 50 |
|
| 55 | 51 |
self.logger.info("Operation name: [{}]".format(job.name))
|
| 56 | 52 |
|
| 57 |
- self._scheduler.queue_job(job, skip_cache_lookup)
|
|
| 53 |
+ if peer is not None and message_queue is not None:
|
|
| 54 |
+ job.register_client(peer, message_queue)
|
|
| 58 | 55 |
|
| 59 | 56 |
return job.operation
|
| 60 | 57 |
|
| 61 |
- def register_message_client(self, name, queue):
|
|
| 58 |
+ def register_message_client(self, job_name, peer, message_queue):
|
|
| 62 | 59 |
try:
|
| 63 |
- self._scheduler.register_client(name, queue)
|
|
| 60 |
+ self._scheduler.register_client(job_name, peer, message_queue)
|
|
| 64 | 61 |
|
| 65 |
- except KeyError:
|
|
| 62 |
+ except NotFoundError:
|
|
| 66 | 63 |
raise InvalidArgumentError("Operation name does not exist: [{}]".format(name))
|
| 67 | 64 |
|
| 68 |
- def unregister_message_client(self, name, queue):
|
|
| 65 |
+ def unregister_message_client(self, job_name, peer):
|
|
| 69 | 66 |
try:
|
| 70 |
- self._scheduler.unregister_client(name, queue)
|
|
| 67 |
+ self._scheduler.unregister_client(job_name, peer)
|
|
| 71 | 68 |
|
| 72 |
- except KeyError:
|
|
| 69 |
+ except NotFoundError:
|
|
| 73 | 70 |
raise InvalidArgumentError("Operation name does not exist: [{}]".format(name))
|
| 74 | 71 |
|
| 75 | 72 |
def stream_operation_updates(self, message_queue, operation_name):
|
| ... | ... | @@ -47,10 +47,11 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer): |
| 47 | 47 |
instance = self._get_instance(request.instance_name)
|
| 48 | 48 |
operation = instance.execute(request.action_digest,
|
| 49 | 49 |
request.skip_cache_lookup,
|
| 50 |
- message_queue)
|
|
| 50 |
+ peer=context.peer(),
|
|
| 51 |
+ message_queue=message_queue)
|
|
| 51 | 52 |
|
| 52 | 53 |
context.add_callback(partial(instance.unregister_message_client,
|
| 53 |
- operation.name, message_queue))
|
|
| 54 |
+ operation.name, context.peer()))
|
|
| 54 | 55 |
|
| 55 | 56 |
yield from instance.stream_operation_updates(message_queue,
|
| 56 | 57 |
operation.name)
|
| ... | ... | @@ -26,10 +26,11 @@ from buildgrid._protos.google.longrunning import operations_pb2 |
| 26 | 26 |
|
| 27 | 27 |
class Job:
|
| 28 | 28 |
|
| 29 |
- def __init__(self, action, action_digest):
|
|
| 29 |
+ def __init__(self, action, action_digest, priority=0):
|
|
| 30 | 30 |
self.logger = logging.getLogger(__name__)
|
| 31 | 31 |
|
| 32 | 32 |
self._name = str(uuid.uuid4())
|
| 33 |
+ self._priority = priority
|
|
| 33 | 34 |
self._action = remote_execution_pb2.Action()
|
| 34 | 35 |
self._operation = operations_pb2.Operation()
|
| 35 | 36 |
self._lease = None
|
| ... | ... | @@ -45,7 +46,7 @@ class Job: |
| 45 | 46 |
|
| 46 | 47 |
self._action.CopyFrom(action)
|
| 47 | 48 |
self._do_not_cache = self._action.do_not_cache
|
| 48 |
- self._operation_update_queues = []
|
|
| 49 |
+ self._operation_update_queues = {}
|
|
| 49 | 50 |
self._operation.name = self._name
|
| 50 | 51 |
self._operation.done = False
|
| 51 | 52 |
self._n_tries = 0
|
| ... | ... | @@ -54,6 +55,10 @@ class Job: |
| 54 | 55 |
def name(self):
|
| 55 | 56 |
return self._name
|
| 56 | 57 |
|
| 58 |
+ @property
|
|
| 59 |
+ def priority(self):
|
|
| 60 |
+ return self._priority
|
|
| 61 |
+ |
|
| 57 | 62 |
@property
|
| 58 | 63 |
def do_not_cache(self):
|
| 59 | 64 |
return self._do_not_cache
|
| ... | ... | @@ -100,22 +105,26 @@ class Job: |
| 100 | 105 |
def n_clients(self):
|
| 101 | 106 |
return len(self._operation_update_queues)
|
| 102 | 107 |
|
| 103 |
- def register_client(self, queue):
|
|
| 104 |
- """Subscribes to the job's :class:`Operation` stage change events.
|
|
| 108 |
+ def register_client(self, peer, message_queue):
|
|
| 109 |
+ """Subscribes to the job's :class:`Operation` stage changes.
|
|
| 105 | 110 |
|
| 106 | 111 |
Args:
|
| 107 |
- queue (queue.Queue): the event queue to register.
|
|
| 112 |
+ peer (str): a unique string identifying the client.
|
|
| 113 |
+ message_queue (queue.Queue): the event queue to register.
|
|
| 108 | 114 |
"""
|
| 109 |
- self._operation_update_queues.append(queue)
|
|
| 110 |
- queue.put(self._operation)
|
|
| 115 |
+ if peer not in self._operation_update_queues:
|
|
| 116 |
+ self._operation_update_queues[peer] = message_queue
|
|
| 117 |
+ |
|
| 118 |
+ message_queue.put(self._operation)
|
|
| 111 | 119 |
|
| 112 |
- def unregister_client(self, queue):
|
|
| 113 |
- """Unsubscribes to the job's :class:`Operation` stage change events.
|
|
| 120 |
+ def unregister_client(self, peer):
|
|
| 121 |
+ """Unsubscribes to the job's :class:`Operation` stage change.
|
|
| 114 | 122 |
|
| 115 | 123 |
Args:
|
| 116 |
- queue (queue.Queue): the event queue to unregister.
|
|
| 124 |
+ peer (str): a unique string identifying the client.
|
|
| 117 | 125 |
"""
|
| 118 |
- self._operation_update_queues.remove(queue)
|
|
| 126 |
+ if peer not in self._operation_update_queues:
|
|
| 127 |
+ del self._operation_update_queues[peer]
|
|
| 119 | 128 |
|
| 120 | 129 |
def set_cached_result(self, action_result):
|
| 121 | 130 |
"""Allows specifying an action result form the action cache for the job.
|
| ... | ... | @@ -211,5 +220,5 @@ class Job: |
| 211 | 220 |
|
| 212 | 221 |
self._operation.metadata.Pack(self.__operation_metadata)
|
| 213 | 222 |
|
| 214 |
- for queue in self._operation_update_queues:
|
|
| 223 |
+ for queue in self._operation_update_queues.values():
|
|
| 215 | 224 |
queue.put(self._operation)
|
| ... | ... | @@ -58,18 +58,18 @@ class OperationsInstance: |
| 58 | 58 |
except KeyError:
|
| 59 | 59 |
raise InvalidArgumentError("Operation name does not exist: [{}]".format(name))
|
| 60 | 60 |
|
| 61 |
- def register_message_client(self, name, queue):
|
|
| 61 |
+ def register_message_client(self, job_name, peer, message_queue):
|
|
| 62 | 62 |
try:
|
| 63 |
- self._scheduler.register_client(name, queue)
|
|
| 63 |
+ self._scheduler.register_client(job_name, peer, message_queue)
|
|
| 64 | 64 |
|
| 65 |
- except KeyError:
|
|
| 65 |
+ except NotFoundError:
|
|
| 66 | 66 |
raise InvalidArgumentError("Operation name does not exist: [{}]".format(name))
|
| 67 | 67 |
|
| 68 |
- def unregister_message_client(self, name, queue):
|
|
| 68 |
+ def unregister_message_client(self, job_name, peer):
|
|
| 69 | 69 |
try:
|
| 70 |
- self._scheduler.unregister_client(name, queue)
|
|
| 70 |
+ self._scheduler.unregister_client(job_name, peer)
|
|
| 71 | 71 |
|
| 72 |
- except KeyError:
|
|
| 72 |
+ except NotFoundError:
|
|
| 73 | 73 |
raise InvalidArgumentError("Operation name does not exist: [{}]".format(name))
|
| 74 | 74 |
|
| 75 | 75 |
def stream_operation_updates(self, message_queue, operation_name):
|
| ... | ... | @@ -23,7 +23,7 @@ from collections import deque |
| 23 | 23 |
|
| 24 | 24 |
from buildgrid._exceptions import NotFoundError
|
| 25 | 25 |
|
| 26 |
-from .job import OperationStage, LeaseState
|
|
| 26 |
+from .job import Job, OperationStage, LeaseState
|
|
| 27 | 27 |
|
| 28 | 28 |
|
| 29 | 29 |
class Scheduler:
|
| ... | ... | @@ -32,25 +32,76 @@ class Scheduler: |
| 32 | 32 |
|
| 33 | 33 |
def __init__(self, action_cache=None):
|
| 34 | 34 |
self._action_cache = action_cache
|
| 35 |
- self.jobs = {}
|
|
| 35 |
+ self.__jobs_by_action = {}
|
|
| 36 |
+ self.__jobs_by_name = {}
|
|
| 36 | 37 |
self.queue = deque()
|
| 37 | 38 |
|
| 38 |
- def register_client(self, job_name, queue):
|
|
| 39 |
- self.jobs[job_name].register_client(queue)
|
|
| 39 |
+ def register_client(self, job_name, peer, message_queue):
|
|
| 40 |
+ """Subscribes to one of the job's :class:`Operation` stage changes.
|
|
| 40 | 41 |
|
| 41 |
- def unregister_client(self, job_name, queue):
|
|
| 42 |
- self.jobs[job_name].unregister_client(queue)
|
|
| 42 |
+ Args:
|
|
| 43 |
+ job_name (str): name of the job to subcribe to.
|
|
| 44 |
+ peer (str): a unique string identifying the client.
|
|
| 45 |
+ message_queue (queue.Queue): the event queue to register.
|
|
| 46 |
+ |
|
| 47 |
+ Raises:
|
|
| 48 |
+ NotFoundError: If no job with `job_name` exists.
|
|
| 49 |
+ """
|
|
| 50 |
+ try:
|
|
| 51 |
+ self.__jobs_by_name[job_name].register_client(peer, message_queue)
|
|
| 52 |
+ |
|
| 53 |
+ except KeyError:
|
|
| 54 |
+ raise NotFoundError('No job named {} found.'.format(job_name))
|
|
| 55 |
+ |
|
| 56 |
+ def unregister_client(self, job_name, peer):
|
|
| 57 |
+ """Unsubscribes to one of the job's :class:`Operation` stage changes.
|
|
| 58 |
+ |
|
| 59 |
+ Args:
|
|
| 60 |
+ job_name (str): name of the job to unsubcribe from.
|
|
| 61 |
+ peer (str): a unique string identifying the client.
|
|
| 62 |
+ |
|
| 63 |
+ Raises:
|
|
| 64 |
+ NotFoundError: If no job with `job_name` exists.
|
|
| 65 |
+ """
|
|
| 66 |
+ try:
|
|
| 67 |
+ self.__jobs_by_name[job_name].unregister_client(peer)
|
|
| 68 |
+ |
|
| 69 |
+ except KeyError:
|
|
| 70 |
+ raise NotFoundError('No job named {} found.'.format(job_name))
|
|
| 43 | 71 |
|
| 44 |
- if not self.jobs[job_name].n_clients and self.jobs[job_name].operation.done:
|
|
| 45 |
- del self.jobs[job_name]
|
|
| 72 |
+ if (self.__jobs_by_name[job_name].n_clients == 0 and
|
|
| 73 |
+ self.__jobs_by_name[job_name].operation.done):
|
|
| 74 |
+ del self.__jobs_by_name[job_name]
|
|
| 46 | 75 |
|
| 47 |
- def queue_job(self, job, skip_cache_lookup=False):
|
|
| 48 |
- self.jobs[job.name] = job
|
|
| 76 |
+ def queue_job(self, action, action_digest, priority=0, skip_cache_lookup=False):
|
|
| 77 |
+ """Inserts a newly created job into the execution queue.
|
|
| 78 |
+ |
|
| 79 |
+ Args:
|
|
| 80 |
+ action (Action): the given action to queue for execution.
|
|
| 81 |
+ action_digest (Digest): the digest of the given action.
|
|
| 82 |
+ priority (int): the execution job's priority.
|
|
| 83 |
+ skip_cache_lookup (bool): whether or not to look for pre-computed
|
|
| 84 |
+ result for the given action.
|
|
| 85 |
+ """
|
|
| 86 |
+ if action_digest.hash in self.__jobs_by_action:
|
|
| 87 |
+ job = self.__jobs_by_action[action_digest.hash]
|
|
| 88 |
+ |
|
| 89 |
+ if priority < job.priority:
|
|
| 90 |
+ #TODO: We need to requeue here
|
|
| 91 |
+ job.priority = priority
|
|
| 92 |
+ |
|
| 93 |
+ return job
|
|
| 94 |
+ |
|
| 95 |
+ job = Job(action, action_digest, priority=priority)
|
|
| 96 |
+ |
|
| 97 |
+ self.__jobs_by_action[job.action_digest.hash] = job
|
|
| 98 |
+ self.__jobs_by_name[job.name] = job
|
|
| 49 | 99 |
|
| 50 | 100 |
operation_stage = None
|
| 51 | 101 |
if self._action_cache is not None and not skip_cache_lookup:
|
| 52 | 102 |
try:
|
| 53 | 103 |
action_result = self._action_cache.get_action_result(job.action_digest)
|
| 104 |
+ |
|
| 54 | 105 |
except NotFoundError:
|
| 55 | 106 |
operation_stage = OperationStage.QUEUED
|
| 56 | 107 |
self.queue.append(job)
|
| ... | ... | @@ -65,9 +116,11 @@ class Scheduler: |
| 65 | 116 |
|
| 66 | 117 |
job.update_operation_stage(operation_stage)
|
| 67 | 118 |
|
| 119 |
+ return job
|
|
| 120 |
+ |
|
| 68 | 121 |
def retry_job(self, job_name):
|
| 69 |
- if job_name in self.jobs:
|
|
| 70 |
- job = self.jobs[job_name]
|
|
| 122 |
+ if job_name in self.__jobs_by_name:
|
|
| 123 |
+ job = self.__jobs_by_name[job_name]
|
|
| 71 | 124 |
if job.n_tries >= self.MAX_N_TRIES:
|
| 72 | 125 |
# TODO: Decide what to do with these jobs
|
| 73 | 126 |
job.update_operation_stage(OperationStage.COMPLETED)
|
| ... | ... | @@ -77,7 +130,7 @@ class Scheduler: |
| 77 | 130 |
self.queue.appendleft(job)
|
| 78 | 131 |
|
| 79 | 132 |
def list_jobs(self):
|
| 80 |
- return self.jobs.values()
|
|
| 133 |
+ return self.__jobs_by_name.values()
|
|
| 81 | 134 |
|
| 82 | 135 |
def request_job_leases(self, worker_capabilities):
|
| 83 | 136 |
"""Generates a list of the highest priority leases to be run.
|
| ... | ... | @@ -107,7 +160,7 @@ class Scheduler: |
| 107 | 160 |
lease_result (google.protobuf.Any): the lease execution result, only
|
| 108 | 161 |
required if `lease_state` is `COMPLETED`.
|
| 109 | 162 |
"""
|
| 110 |
- job = self.jobs[job_name]
|
|
| 163 |
+ job = self.__jobs_by_name[job_name]
|
|
| 111 | 164 |
|
| 112 | 165 |
if lease_state == LeaseState.PENDING:
|
| 113 | 166 |
job.update_lease_state(LeaseState.PENDING)
|
| ... | ... | @@ -128,8 +181,8 @@ class Scheduler: |
| 128 | 181 |
|
| 129 | 182 |
def get_job_lease(self, job_name):
|
| 130 | 183 |
"""Returns the lease associated to job, if any have been emitted yet."""
|
| 131 |
- return self.jobs[job_name].lease
|
|
| 184 |
+ return self.__jobs_by_name[job_name].lease
|
|
| 132 | 185 |
|
| 133 | 186 |
def get_job_operation(self, job_name):
|
| 134 | 187 |
"""Returns the operation associated to job."""
|
| 135 |
- return self.jobs[job_name].operation
|
|
| 188 |
+ return self.__jobs_by_name[job_name].operation
|
| ... | ... | @@ -86,7 +86,7 @@ def get_cmdclass(): |
| 86 | 86 |
return cmdclass
|
| 87 | 87 |
|
| 88 | 88 |
tests_require = [
|
| 89 |
- 'coverage == 4.4.0',
|
|
| 89 |
+ 'coverage >= 4.5.0',
|
|
| 90 | 90 |
'moto',
|
| 91 | 91 |
'pep8',
|
| 92 | 92 |
'psutil',
|
