finn pushed to branch finn/74-operation-cancelation at BuildGrid / buildgrid
Commits:
-
2b182aff
by Finn at 2018-11-22T16:50:48Z
-
31d511e8
by Finn at 2018-11-22T17:19:06Z
-
f9f5c4da
by Finn at 2018-11-22T17:19:53Z
-
6336a84f
by Finn at 2018-11-22T17:19:53Z
-
9faa383c
by Finn at 2018-11-22T17:19:53Z
-
594525fe
by Finn at 2018-11-22T17:19:53Z
-
26b470bf
by Finn at 2018-11-22T17:19:53Z
-
a384cba9
by Finn at 2018-11-22T17:19:53Z
8 changed files:
- buildgrid/_app/commands/cmd_operation.py
- buildgrid/server/execution/instance.py
- buildgrid/server/execution/service.py
- buildgrid/server/job.py
- buildgrid/server/operations/instance.py
- buildgrid/server/operations/service.py
- buildgrid/server/scheduler.py
- tests/integration/operations_service.py
Changes:
| ... | ... | @@ -155,6 +155,26 @@ def status(context, operation_name, json): |
| 155 | 155 |
click.echo(json_format.MessageToJson(operation))
|
| 156 | 156 |
|
| 157 | 157 |
|
| 158 |
+@cli.command('cancel', short_help="Cancel an operation.")
|
|
| 159 |
+@click.argument('operation-name', nargs=1, type=click.STRING, required=True)
|
|
| 160 |
+@pass_context
|
|
| 161 |
+def cancel(context, operation_name):
|
|
| 162 |
+ context.logger.info("Cancelling an operation...")
|
|
| 163 |
+ stub = operations_pb2_grpc.OperationsStub(context.channel)
|
|
| 164 |
+ |
|
| 165 |
+ request = operations_pb2.CancelOperationRequest(name=operation_name)
|
|
| 166 |
+ |
|
| 167 |
+ try:
|
|
| 168 |
+ stub.CancelOperation(request)
|
|
| 169 |
+ except grpc.RpcError as e:
|
|
| 170 |
+ status_code = e.code()
|
|
| 171 |
+ if status_code != grpc.StatusCode.CANCELLED:
|
|
| 172 |
+ click.echo("Operation did not cancel: {}".format(e))
|
|
| 173 |
+ sys.exit(-1)
|
|
| 174 |
+ |
|
| 175 |
+ click.echo("Operation cancelled: [{}]".format(request))
|
|
| 176 |
+ |
|
| 177 |
+ |
|
| 158 | 178 |
@cli.command('list', short_help="List operations.")
|
| 159 | 179 |
@click.option('--json', is_flag=True, show_default=True,
|
| 160 | 180 |
help="Print operations list in JSON format.")
|
| ... | ... | @@ -72,8 +72,10 @@ class ExecutionInstance: |
| 72 | 72 |
raise InvalidArgumentError("Operation name does not exist: [{}]".format(name))
|
| 73 | 73 |
|
| 74 | 74 |
def stream_operation_updates(self, message_queue, operation_name):
|
| 75 |
- operation = message_queue.get()
|
|
| 76 |
- while not operation.done:
|
|
| 77 |
- yield operation
|
|
| 78 |
- operation = message_queue.get()
|
|
| 79 |
- yield operation
|
|
| 75 |
+ job = message_queue.get()
|
|
| 76 |
+ while not job.operation.done:
|
|
| 77 |
+ yield job.operation
|
|
| 78 |
+ job = message_queue.get()
|
|
| 79 |
+ job.check_operation_status()
|
|
| 80 |
+ |
|
| 81 |
+ yield job.operation
|
| ... | ... | @@ -26,7 +26,7 @@ from functools import partial |
| 26 | 26 |
|
| 27 | 27 |
import grpc
|
| 28 | 28 |
|
| 29 |
-from buildgrid._exceptions import FailedPreconditionError, InvalidArgumentError
|
|
| 29 |
+from buildgrid._exceptions import FailedPreconditionError, InvalidArgumentError, CancelledError
|
|
| 30 | 30 |
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2_grpc
|
| 31 | 31 |
from buildgrid._protos.google.longrunning import operations_pb2
|
| 32 | 32 |
|
| ... | ... | @@ -79,6 +79,12 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer): |
| 79 | 79 |
context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
|
| 80 | 80 |
yield operations_pb2.Operation()
|
| 81 | 81 |
|
| 82 |
+ except CancelledError as e:
|
|
| 83 |
+ self.__logger.error(e)
|
|
| 84 |
+ context.set_details(str(e))
|
|
| 85 |
+ context.set_code(grpc.StatusCode.CANCELLED)
|
|
| 86 |
+ yield operations_pb2.Operation()
|
|
| 87 |
+ |
|
| 82 | 88 |
def WaitExecution(self, request, context):
|
| 83 | 89 |
self.__logger.debug("WaitExecution request from [%s]", context.peer())
|
| 84 | 90 |
|
| ... | ... | @@ -111,6 +117,12 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer): |
| 111 | 117 |
context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
|
| 112 | 118 |
yield operations_pb2.Operation()
|
| 113 | 119 |
|
| 120 |
+ except CancelledError as e:
|
|
| 121 |
+ self.__logger.error(e)
|
|
| 122 |
+ context.set_details(str(e))
|
|
| 123 |
+ context.set_code(grpc.StatusCode.CANCELLED)
|
|
| 124 |
+ yield operations_pb2.Operation()
|
|
| 125 |
+ |
|
| 114 | 126 |
def _get_instance(self, name):
|
| 115 | 127 |
try:
|
| 116 | 128 |
return self._instances[name]
|
| ... | ... | @@ -19,6 +19,7 @@ import uuid |
| 19 | 19 |
from google.protobuf import timestamp_pb2
|
| 20 | 20 |
|
| 21 | 21 |
from buildgrid._enums import LeaseState, OperationStage
|
| 22 |
+from buildgrid._exceptions import CancelledError
|
|
| 22 | 23 |
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
|
| 23 | 24 |
from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2
|
| 24 | 25 |
from buildgrid._protos.google.longrunning import operations_pb2
|
| ... | ... | @@ -37,10 +38,14 @@ class Job: |
| 37 | 38 |
|
| 38 | 39 |
self.__execute_response = None
|
| 39 | 40 |
self.__operation_metadata = remote_execution_pb2.ExecuteOperationMetadata()
|
| 41 |
+ |
|
| 40 | 42 |
self.__queued_timestamp = timestamp_pb2.Timestamp()
|
| 41 | 43 |
self.__worker_start_timestamp = timestamp_pb2.Timestamp()
|
| 42 | 44 |
self.__worker_completed_timestamp = timestamp_pb2.Timestamp()
|
| 43 | 45 |
|
| 46 |
+ self.__operation_cancelled = False
|
|
| 47 |
+ self.__lease_cancelled = False
|
|
| 48 |
+ |
|
| 44 | 49 |
self.__operation_metadata.action_digest.CopyFrom(action_digest)
|
| 45 | 50 |
self.__operation_metadata.stage = OperationStage.UNKNOWN.value
|
| 46 | 51 |
|
| ... | ... | @@ -104,11 +109,13 @@ class Job: |
| 104 | 109 |
def register_client(self, queue):
|
| 105 | 110 |
"""Subscribes to the job's :class:`Operation` stage change events.
|
| 106 | 111 |
|
| 112 |
+ Queues this :object:`Job` instance.
|
|
| 113 |
+ |
|
| 107 | 114 |
Args:
|
| 108 | 115 |
queue (queue.Queue): the event queue to register.
|
| 109 | 116 |
"""
|
| 110 | 117 |
self._operation_update_queues.append(queue)
|
| 111 |
- queue.put(self._operation)
|
|
| 118 |
+ queue.put(self)
|
|
| 112 | 119 |
|
| 113 | 120 |
def unregister_client(self, queue):
|
| 114 | 121 |
"""Unsubscribes to the job's :class:`Operation` stage change events.
|
| ... | ... | @@ -131,7 +138,9 @@ class Job: |
| 131 | 138 |
Only one :class:`Lease` can be emitted for a given job. This method
|
| 132 | 139 |
should only be used once, any further calls are ignored.
|
| 133 | 140 |
"""
|
| 134 |
- if self._lease is not None:
|
|
| 141 |
+ if self.__operation_cancelled:
|
|
| 142 |
+ return None
|
|
| 143 |
+ elif self._lease is not None:
|
|
| 135 | 144 |
return None
|
| 136 | 145 |
|
| 137 | 146 |
self._lease = bots_pb2.Lease()
|
| ... | ... | @@ -189,6 +198,15 @@ class Job: |
| 189 | 198 |
self.__execute_response.cached_result = False
|
| 190 | 199 |
self.__execute_response.status.CopyFrom(status)
|
| 191 | 200 |
|
| 201 |
+ def cancel_lease(self):
|
|
| 202 |
+ """Triggers a job's :class:Lease cancellation.
|
|
| 203 |
+ |
|
| 204 |
+ This will not cancel the job's :class:Operation.
|
|
| 205 |
+ """
|
|
| 206 |
+ self.__lease_cancelled = True
|
|
| 207 |
+ if self._lease is not None:
|
|
| 208 |
+ self.update_lease_state(LeaseState.CANCELLED)
|
|
| 209 |
+ |
|
| 192 | 210 |
def update_operation_stage(self, stage):
|
| 193 | 211 |
"""Operates a stage transition for the job's :class:Operation.
|
| 194 | 212 |
|
| ... | ... | @@ -213,4 +231,28 @@ class Job: |
| 213 | 231 |
self._operation.metadata.Pack(self.__operation_metadata)
|
| 214 | 232 |
|
| 215 | 233 |
for queue in self._operation_update_queues:
|
| 216 |
- queue.put(self._operation)
|
|
| 234 |
+ queue.put(self)
|
|
| 235 |
+ |
|
| 236 |
+ def check_operation_status(self):
|
|
| 237 |
+ """Reports errors on unexpected job's :class:Operation state.
|
|
| 238 |
+ |
|
| 239 |
+ Raises:
|
|
| 240 |
+ CancelledError: if the job's :class:Operation was cancelled.
|
|
| 241 |
+ """
|
|
| 242 |
+ if self.__operation_cancelled:
|
|
| 243 |
+ raise CancelledError(self.__execute_response.status.message)
|
|
| 244 |
+ |
|
| 245 |
+ def cancel_operation(self):
|
|
| 246 |
+ """Triggers a job's :class:Operation cancellation.
|
|
| 247 |
+ |
|
| 248 |
+ This will also cancel any job's :class:Lease that may have been issued.
|
|
| 249 |
+ """
|
|
| 250 |
+ self.__operation_cancelled = True
|
|
| 251 |
+ if self._lease is not None:
|
|
| 252 |
+ self.cancel_lease()
|
|
| 253 |
+ |
|
| 254 |
+ self.__execute_response = remote_execution_pb2.ExecuteResponse()
|
|
| 255 |
+ self.__execute_response.status.code = code_pb2.CANCELLED
|
|
| 256 |
+ self.__execute_response.status.message = "Operation cancelled by client."
|
|
| 257 |
+ |
|
| 258 |
+ self.update_operation_stage(OperationStage.COMPLETED)
|
| ... | ... | @@ -65,6 +65,13 @@ class OperationsInstance: |
| 65 | 65 |
except KeyError:
|
| 66 | 66 |
raise InvalidArgumentError("Operation name does not exist: [{}]".format(name))
|
| 67 | 67 |
|
| 68 |
+ def cancel_operation(self, name):
|
|
| 69 |
+ try:
|
|
| 70 |
+ self._scheduler.cancel_job_operation(name)
|
|
| 71 |
+ |
|
| 72 |
+ except KeyError:
|
|
| 73 |
+ raise InvalidArgumentError("Operation name does not exist: [{}]".format(name))
|
|
| 74 |
+ |
|
| 68 | 75 |
def register_message_client(self, name, queue):
|
| 69 | 76 |
try:
|
| 70 | 77 |
self._scheduler.register_client(name, queue)
|
| ... | ... | @@ -80,12 +87,10 @@ class OperationsInstance: |
| 80 | 87 |
raise InvalidArgumentError("Operation name does not exist: [{}]".format(name))
|
| 81 | 88 |
|
| 82 | 89 |
def stream_operation_updates(self, message_queue, operation_name):
|
| 83 |
- operation = message_queue.get()
|
|
| 84 |
- while not operation.done:
|
|
| 85 |
- yield operation
|
|
| 86 |
- operation = message_queue.get()
|
|
| 87 |
- yield operation
|
|
| 90 |
+ job = message_queue.get()
|
|
| 91 |
+ while not job.operation.done:
|
|
| 92 |
+ yield job.operation
|
|
| 93 |
+ job = message_queue.get()
|
|
| 94 |
+ job.check_operation_status()
|
|
| 88 | 95 |
|
| 89 |
- def cancel_operation(self, name):
|
|
| 90 |
- # TODO: Cancel leases
|
|
| 91 |
- raise NotImplementedError("Cancelled operations not supported")
|
|
| 96 |
+ yield job.operation
|
| ... | ... | @@ -120,11 +120,6 @@ class OperationsService(operations_pb2_grpc.OperationsServicer): |
| 120 | 120 |
operation_name = self._parse_operation_name(name)
|
| 121 | 121 |
instance.cancel_operation(operation_name)
|
| 122 | 122 |
|
| 123 |
- except NotImplementedError as e:
|
|
| 124 |
- self.__logger.error(e)
|
|
| 125 |
- context.set_details(str(e))
|
|
| 126 |
- context.set_code(grpc.StatusCode.UNIMPLEMENTED)
|
|
| 127 |
- |
|
| 128 | 123 |
except InvalidArgumentError as e:
|
| 129 | 124 |
self.__logger.error(e)
|
| 130 | 125 |
context.set_details(str(e))
|
| ... | ... | @@ -97,7 +97,10 @@ class Scheduler: |
| 97 | 97 |
# For now, one lease at a time:
|
| 98 | 98 |
lease = job.create_lease()
|
| 99 | 99 |
|
| 100 |
- return [lease]
|
|
| 100 |
+ if lease:
|
|
| 101 |
+ return [lease]
|
|
| 102 |
+ |
|
| 103 |
+ return None
|
|
| 101 | 104 |
|
| 102 | 105 |
def update_job_lease_state(self, job_name, lease_state, lease_status=None, lease_result=None):
|
| 103 | 106 |
"""Requests a state transition for a job's current :class:Lease.
|
| ... | ... | @@ -136,3 +139,13 @@ class Scheduler: |
| 136 | 139 |
def get_job_operation(self, job_name):
|
| 137 | 140 |
"""Returns the operation associated to job."""
|
| 138 | 141 |
return self.jobs[job_name].operation
|
| 142 |
+ |
|
| 143 |
+ def cancel_job_operation(self, job_name):
|
|
| 144 |
+ """Cancels the underlying operation of a given job.
|
|
| 145 |
+ |
|
| 146 |
+ This will also cancel any job's lease that may have been issued.
|
|
| 147 |
+ |
|
| 148 |
+ Args:
|
|
| 149 |
+ job_name (str): name of the job holding the operation to cancel.
|
|
| 150 |
+ """
|
|
| 151 |
+ self.jobs[job_name].cancel_operation()
|
| ... | ... | @@ -24,6 +24,7 @@ import grpc |
| 24 | 24 |
from grpc._server import _Context
|
| 25 | 25 |
import pytest
|
| 26 | 26 |
|
| 27 |
+from buildgrid._enums import OperationStage
|
|
| 27 | 28 |
from buildgrid._exceptions import InvalidArgumentError
|
| 28 | 29 |
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
|
| 29 | 30 |
from buildgrid._protos.google.longrunning import operations_pb2
|
| ... | ... | @@ -236,12 +237,24 @@ def test_delete_operation_fail(instance, context): |
| 236 | 237 |
context.set_code.assert_called_once_with(grpc.StatusCode.INVALID_ARGUMENT)
|
| 237 | 238 |
|
| 238 | 239 |
|
| 239 |
-def test_cancel_operation(instance, context):
|
|
| 240 |
+def test_cancel_operation(instance, controller, execute_request, context):
|
|
| 241 |
+ response_execute = controller.execution_instance.execute(execute_request.action_digest,
|
|
| 242 |
+ execute_request.skip_cache_lookup)
|
|
| 243 |
+ |
|
| 240 | 244 |
request = operations_pb2.CancelOperationRequest()
|
| 241 |
- request.name = "{}/{}".format(instance_name, "runner")
|
|
| 245 |
+ request.name = "{}/{}".format(instance_name, response_execute.name)
|
|
| 246 |
+ |
|
| 242 | 247 |
instance.CancelOperation(request, context)
|
| 243 | 248 |
|
| 244 |
- context.set_code.assert_called_once_with(grpc.StatusCode.UNIMPLEMENTED)
|
|
| 249 |
+ request = operations_pb2.ListOperationsRequest(name=instance_name)
|
|
| 250 |
+ response = instance.ListOperations(request, context)
|
|
| 251 |
+ |
|
| 252 |
+ assert len(response.operations) is 1
|
|
| 253 |
+ |
|
| 254 |
+ for operation in response.operations:
|
|
| 255 |
+ operation_metadata = remote_execution_pb2.ExecuteOperationMetadata()
|
|
| 256 |
+ operation.metadata.Unpack(operation_metadata)
|
|
| 257 |
+ assert operation_metadata.stage == OperationStage.COMPLETED.value
|
|
| 245 | 258 |
|
| 246 | 259 |
|
| 247 | 260 |
def test_cancel_operation_blank(blank_instance, context):
|
| ... | ... | @@ -249,7 +262,7 @@ def test_cancel_operation_blank(blank_instance, context): |
| 249 | 262 |
request.name = "runner"
|
| 250 | 263 |
blank_instance.CancelOperation(request, context)
|
| 251 | 264 |
|
| 252 |
- context.set_code.assert_called_once_with(grpc.StatusCode.UNIMPLEMENTED)
|
|
| 265 |
+ context.set_code.assert_called_once_with(grpc.StatusCode.INVALID_ARGUMENT)
|
|
| 253 | 266 |
|
| 254 | 267 |
|
| 255 | 268 |
def test_cancel_operation_instance_fail(instance, context):
|
