Martin Blanchard pushed to branch mablanch/74-operation-cancelation at BuildGrid / buildgrid
Commits:
-
31397f9e
by Martin Blanchard at 2018-10-22T10:44:07Z
-
1f186eec
by Martin Blanchard at 2018-10-22T10:44:08Z
-
95172f85
by Martin Blanchard at 2018-10-22T10:44:08Z
-
8a619f7e
by Martin Blanchard at 2018-10-22T10:44:08Z
-
81aac437
by Martin Blanchard at 2018-10-22T11:03:58Z
5 changed files:
- buildgrid/_exceptions.py
- buildgrid/server/execution/instance.py
- buildgrid/server/execution/service.py
- buildgrid/server/job.py
- buildgrid/server/scheduler.py
Changes:
| ... | ... | @@ -52,6 +52,11 @@ class BotError(BgdError): |
| 52 | 52 |
super().__init__(message, detail=detail, domain=ErrorDomain.BOT, reason=reason)
|
| 53 | 53 |
|
| 54 | 54 |
|
| 55 |
+class CancelledError(BgdError):
|
|
| 56 |
+ def __init__(self, message, detail=None, reason=None):
|
|
| 57 |
+ super().__init__(message, detail=detail, domain=ErrorDomain.SERVER, reason=reason)
|
|
| 58 |
+ |
|
| 59 |
+ |
|
| 55 | 60 |
class InvalidArgumentError(BgdError):
|
| 56 | 61 |
"""A bad argument was passed, such as a name which doesn't exist."""
|
| 57 | 62 |
def __init__(self, message, detail=None, reason=None):
|
| ... | ... | @@ -22,7 +22,7 @@ An instance of the Remote Execution Service. |
| 22 | 22 |
import logging
|
| 23 | 23 |
|
| 24 | 24 |
from buildgrid._exceptions import FailedPreconditionError, InvalidArgumentError
|
| 25 |
-from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Action
|
|
| 25 |
+from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
|
|
| 26 | 26 |
|
| 27 | 27 |
from ..job import Job
|
| 28 | 28 |
|
| ... | ... | @@ -43,7 +43,7 @@ class ExecutionInstance: |
| 43 | 43 |
this action.
|
| 44 | 44 |
"""
|
| 45 | 45 |
|
| 46 |
- action = self._storage.get_message(action_digest, Action)
|
|
| 46 |
+ action = self._storage.get_message(action_digest, remote_execution_pb2.Action)
|
|
| 47 | 47 |
|
| 48 | 48 |
if not action:
|
| 49 | 49 |
raise FailedPreconditionError("Could not get action from storage.")
|
| ... | ... | @@ -73,8 +73,11 @@ class ExecutionInstance: |
| 73 | 73 |
raise InvalidArgumentError("Operation name does not exist: [{}]".format(name))
|
| 74 | 74 |
|
| 75 | 75 |
def stream_operation_updates(self, message_queue, operation_name):
|
| 76 |
- operation = message_queue.get()
|
|
| 77 |
- while not operation.done:
|
|
| 78 |
- yield operation
|
|
| 79 |
- operation = message_queue.get()
|
|
| 80 |
- yield operation
|
|
| 76 |
+ job = message_queue.get()
|
|
| 77 |
+ while not job.operation.done:
|
|
| 78 |
+ yield job.operation
|
|
| 79 |
+ job = message_queue.get()
|
|
| 80 |
+ |
|
| 81 |
+ job.check_operation_status()
|
|
| 82 |
+ |
|
| 83 |
+ yield job.operation
|
| ... | ... | @@ -26,7 +26,7 @@ from functools import partial |
| 26 | 26 |
|
| 27 | 27 |
import grpc
|
| 28 | 28 |
|
| 29 |
-from buildgrid._exceptions import FailedPreconditionError, InvalidArgumentError
|
|
| 29 |
+from buildgrid._exceptions import CancelledError, FailedPreconditionError, InvalidArgumentError
|
|
| 30 | 30 |
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2_grpc
|
| 31 | 31 |
from buildgrid._protos.google.longrunning import operations_pb2
|
| 32 | 32 |
|
| ... | ... | @@ -67,6 +67,12 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer): |
| 67 | 67 |
context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
|
| 68 | 68 |
yield operations_pb2.Operation()
|
| 69 | 69 |
|
| 70 |
+ except CancelledError as e:
|
|
| 71 |
+ self.logger.error(e)
|
|
| 72 |
+ context.set_details(str(e))
|
|
| 73 |
+ context.set_code(grpc.StatusCode.CANCELLED)
|
|
| 74 |
+ yield operations_pb2.Operation()
|
|
| 75 |
+ |
|
| 70 | 76 |
def WaitExecution(self, request, context):
|
| 71 | 77 |
try:
|
| 72 | 78 |
names = request.name.split("/")
|
| ... | ... | @@ -17,9 +17,11 @@ import logging |
| 17 | 17 |
import uuid
|
| 18 | 18 |
from enum import Enum
|
| 19 | 19 |
|
| 20 |
+from buildgrid._exceptions import CancelledError
|
|
| 20 | 21 |
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
|
| 21 | 22 |
from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2
|
| 22 | 23 |
from buildgrid._protos.google.longrunning import operations_pb2
|
| 24 |
+from buildgrid._protos.google.rpc import code_pb2
|
|
| 23 | 25 |
|
| 24 | 26 |
|
| 25 | 27 |
class OperationStage(Enum):
|
| ... | ... | @@ -60,6 +62,8 @@ class Job: |
| 60 | 62 |
|
| 61 | 63 |
self.__execute_response = None
|
| 62 | 64 |
self.__operation_metadata = remote_execution_pb2.ExecuteOperationMetadata()
|
| 65 |
+ self.__operation_canceled = False
|
|
| 66 |
+ self.__lease_canceled = False
|
|
| 63 | 67 |
|
| 64 | 68 |
self.__operation_metadata.action_digest.CopyFrom(action_digest)
|
| 65 | 69 |
self.__operation_metadata.stage = OperationStage.UNKNOWN.value
|
| ... | ... | @@ -128,7 +132,8 @@ class Job: |
| 128 | 132 |
queue (queue.Queue): the event queue to register.
|
| 129 | 133 |
"""
|
| 130 | 134 |
self._operation_update_queues.append(queue)
|
| 131 |
- queue.put(self._operation)
|
|
| 135 |
+ |
|
| 136 |
+ queue.put(self)
|
|
| 132 | 137 |
|
| 133 | 138 |
def unregister_client(self, queue):
|
| 134 | 139 |
"""Unsubscribes to the job's :class:`Operation` stage change events.
|
| ... | ... | @@ -153,6 +158,8 @@ class Job: |
| 153 | 158 |
"""
|
| 154 | 159 |
if self._lease is not None:
|
| 155 | 160 |
return None
|
| 161 |
+ elif self.__lease_canceled:
|
|
| 162 |
+ return None
|
|
| 156 | 163 |
|
| 157 | 164 |
self._lease = bots_pb2.Lease()
|
| 158 | 165 |
self._lease.id = self._name
|
| ... | ... | @@ -196,6 +203,15 @@ class Job: |
| 196 | 203 |
self.__execute_response.cached_result = False
|
| 197 | 204 |
self.__execute_response.status.CopyFrom(status)
|
| 198 | 205 |
|
| 206 |
+ def cancel_lease(self):
|
|
| 207 |
+ """Triggers a job's :class:`Lease` cancellation.
|
|
| 208 |
+ |
|
| 209 |
+ This will not cancel the job's :class:`Operation`.
|
|
| 210 |
+ """
|
|
| 211 |
+ self.__lease_canceled = True
|
|
| 212 |
+ if self._lease is not None:
|
|
| 213 |
+ self.update_lease_state(LeaseState.CANCELLED)
|
|
| 214 |
+ |
|
| 199 | 215 |
def update_operation_stage(self, stage):
|
| 200 | 216 |
"""Operates a stage transition for the job's :class:Operation.
|
| 201 | 217 |
|
| ... | ... | @@ -218,4 +234,28 @@ class Job: |
| 218 | 234 |
self._operation.metadata.Pack(self.__operation_metadata)
|
| 219 | 235 |
|
| 220 | 236 |
for queue in self._operation_update_queues:
|
| 221 |
- queue.put(self._operation)
|
|
| 237 |
+ queue.put(self)
|
|
| 238 |
+ |
|
| 239 |
+ def check_operation_status(self):
|
|
| 240 |
+ """Reports errors on unexpected job's :class:`Operation` state.
|
|
| 241 |
+ |
|
| 242 |
+ Raises:
|
|
| 243 |
+ CancelledError: if the job's :class:`Operation` was cancelled.
|
|
| 244 |
+ """
|
|
| 245 |
+ if self.__operation_canceled:
|
|
| 246 |
+ raise CancelledError(self.__execute_response.status.message)
|
|
| 247 |
+ |
|
| 248 |
+ def cancel_operation(self):
|
|
| 249 |
+ """Triggers a job's :class:`Operation` cancellation.
|
|
| 250 |
+ |
|
| 251 |
+ This will also cancel any job's :class:`Lease` that may have been issued.
|
|
| 252 |
+ """
|
|
| 253 |
+ self.__operation_canceled = True
|
|
| 254 |
+ if self._lease is not None:
|
|
| 255 |
+ self.cancel_lease()
|
|
| 256 |
+ |
|
| 257 |
+ self.__execute_response = remote_execution_pb2.ExecuteResponse()
|
|
| 258 |
+ self.__execute_response.status.code = code_pb2.CANCELLED
|
|
| 259 |
+ self.__execute_response.status.message = "The operation was cancelled by the caller."
|
|
| 260 |
+ |
|
| 261 |
+ self.update_operation_stage(OperationStage.COMPLETED)
|
| ... | ... | @@ -128,3 +128,13 @@ class Scheduler: |
| 128 | 128 |
def get_job_operation(self, job_name):
|
| 129 | 129 |
"""Returns the operation associated to job."""
|
| 130 | 130 |
return self.jobs[job_name].operation
|
| 131 |
+ |
|
| 132 |
+ def cancel_job_operation(self, job_name):
|
|
| 133 |
+ """Cancels the underlying operation of a given job.
|
|
| 134 |
+ |
|
| 135 |
+ This will also cancel any job's lease that may have been issued.
|
|
| 136 |
+ |
|
| 137 |
+ Args:
|
|
| 138 |
+ job_name (str): name of the job holding the operation to cancel.
|
|
| 139 |
+ """
|
|
| 140 |
+ self.jobs[job_name].cancel_operation()
|
