finn pushed to branch finn/74-operation-cancelation at BuildGrid / buildgrid
Commits:
-
28bf01e2
by Finn at 2018-11-02T14:23:38Z
-
146d07e0
by Finn at 2018-11-02T14:33:12Z
-
d4735838
by Finn at 2018-11-02T14:33:16Z
5 changed files:
- buildgrid/_app/commands/cmd_operation.py
- buildgrid/server/execution/instance.py
- buildgrid/server/job.py
- buildgrid/server/operations/instance.py
- tests/integration/operations_service.py
Changes:
| ... | ... | @@ -155,6 +155,25 @@ def status(context, operation_name, json): |
| 155 | 155 |
click.echo(json_format.MessageToJson(operation))
|
| 156 | 156 |
|
| 157 | 157 |
|
| 158 |
+@cli.command('cancel', short_help="Cancel an operation.")
|
|
| 159 |
+@click.argument('operation-name', nargs=1, type=click.STRING, required=True)
|
|
| 160 |
+@pass_context
|
|
| 161 |
+def cancel(context, operation_name):
|
|
| 162 |
+ context.logger.info("Cancelling an operation...")
|
|
| 163 |
+ stub = operations_pb2_grpc.OperationsStub(context.channel)
|
|
| 164 |
+ |
|
| 165 |
+ request = operations_pb2.CancelOperationRequest(name=operation_name)
|
|
| 166 |
+ |
|
| 167 |
+ try:
|
|
| 168 |
+ stub.CancelOperation(request)
|
|
| 169 |
+ except grpc.RpcError as e:
|
|
| 170 |
+ status_code = e.code()
|
|
| 171 |
+ if status_code != grpc.StatusCode.CANCELLED:
|
|
| 172 |
+ raise
|
|
| 173 |
+ |
|
| 174 |
+ context.logger.info("Operation cancelled: [{}]".format(request))
|
|
| 175 |
+ |
|
| 176 |
+ |
|
| 158 | 177 |
@cli.command('list', short_help="List operations.")
|
| 159 | 178 |
@click.option('--json', is_flag=True, show_default=True,
|
| 160 | 179 |
help="Print operations list in JSON format.")
|
| ... | ... | @@ -71,8 +71,11 @@ class ExecutionInstance: |
| 71 | 71 |
raise InvalidArgumentError("Operation name does not exist: [{}]".format(name))
|
| 72 | 72 |
|
| 73 | 73 |
def stream_operation_updates(self, message_queue, operation_name):
|
| 74 |
- operation = message_queue.get()
|
|
| 75 |
- while not operation.done:
|
|
| 76 |
- yield operation
|
|
| 77 |
- operation = message_queue.get()
|
|
| 78 |
- yield operation
|
|
| 74 |
+ job = message_queue.get()
|
|
| 75 |
+ while not job.operation.done:
|
|
| 76 |
+ yield job.operation
|
|
| 77 |
+ job = message_queue.get()
|
|
| 78 |
+ |
|
| 79 |
+ job.check_operation_status()
|
|
| 80 |
+ |
|
| 81 |
+ yield job.operation
|
| ... | ... | @@ -113,7 +113,7 @@ class Job: |
| 113 | 113 |
queue (queue.Queue): the event queue to register.
|
| 114 | 114 |
"""
|
| 115 | 115 |
self._operation_update_queues.append(queue)
|
| 116 |
- queue.put(self._operation)
|
|
| 116 |
+ queue.put(self)
|
|
| 117 | 117 |
|
| 118 | 118 |
def unregister_client(self, queue):
|
| 119 | 119 |
"""Unsubscribes to the job's :class:`Operation` stage change events.
|
| ... | ... | @@ -229,7 +229,16 @@ class Job: |
| 229 | 229 |
self._operation.metadata.Pack(self.__operation_metadata)
|
| 230 | 230 |
|
| 231 | 231 |
for queue in self._operation_update_queues:
|
| 232 |
- queue.put(self._operation)
|
|
| 232 |
+ queue.put(self)
|
|
| 233 |
+ |
|
| 234 |
+ def check_operation_status(self):
|
|
| 235 |
+ """Reports errors on unexpected job's :class:Operation state.
|
|
| 236 |
+ |
|
| 237 |
+ Raises:
|
|
| 238 |
+ CancelledError: if the job's :class:Operation was cancelled.
|
|
| 239 |
+ """
|
|
| 240 |
+ if self.__operation_cancelled:
|
|
| 241 |
+ raise CancelledError(self.__execute_response.status.message)
|
|
| 233 | 242 |
|
| 234 | 243 |
def cancel_operation(self):
|
| 235 | 244 |
"""Triggers a job's :class:Operation cancellation.
|
| ... | ... | @@ -86,8 +86,11 @@ class OperationsInstance: |
| 86 | 86 |
raise InvalidArgumentError("Operation name does not exist: [{}]".format(name))
|
| 87 | 87 |
|
| 88 | 88 |
def stream_operation_updates(self, message_queue, operation_name):
|
| 89 |
- operation = message_queue.get()
|
|
| 90 |
- while not operation.done:
|
|
| 91 |
- yield operation
|
|
| 92 |
- operation = message_queue.get()
|
|
| 93 |
- yield operation
|
|
| 89 |
+ job = message_queue.get()
|
|
| 90 |
+ while not job.operation.done:
|
|
| 91 |
+ yield job.operation
|
|
| 92 |
+ job = message_queue.get()
|
|
| 93 |
+ |
|
| 94 |
+ job.check_operation_status()
|
|
| 95 |
+ |
|
| 96 |
+ yield job.operation
|
| ... | ... | @@ -24,6 +24,7 @@ import grpc |
| 24 | 24 |
from grpc._server import _Context
|
| 25 | 25 |
import pytest
|
| 26 | 26 |
|
| 27 |
+from buildgrid._enums import OperationStage
|
|
| 27 | 28 |
from buildgrid._exceptions import InvalidArgumentError
|
| 28 | 29 |
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
|
| 29 | 30 |
from buildgrid._protos.google.longrunning import operations_pb2
|
| ... | ... | @@ -236,12 +237,26 @@ def test_delete_operation_fail(instance, context): |
| 236 | 237 |
context.set_code.assert_called_once_with(grpc.StatusCode.INVALID_ARGUMENT)
|
| 237 | 238 |
|
| 238 | 239 |
|
| 239 |
-def test_cancel_operation(instance, context):
|
|
| 240 |
+def test_cancel_operation(instance, controller, execute_request, context):
|
|
| 241 |
+ response_execute = controller.execution_instance.execute(execute_request.action_digest,
|
|
| 242 |
+ execute_request.skip_cache_lookup)
|
|
| 243 |
+ |
|
| 240 | 244 |
request = operations_pb2.CancelOperationRequest()
|
| 241 |
- request.name = "{}/{}".format(instance_name, "runner")
|
|
| 245 |
+ request.name = "{}/{}".format(instance_name, response_execute.name)
|
|
| 246 |
+ |
|
| 242 | 247 |
instance.CancelOperation(request, context)
|
| 243 | 248 |
|
| 244 |
- context.set_code.assert_called_once_with(grpc.StatusCode.UNIMPLEMENTED)
|
|
| 249 |
+ context.set_code.assert_called_once_with(grpc.StatusCode.CANCELLED)
|
|
| 250 |
+ |
|
| 251 |
+ request = operations_pb2.ListOperationsRequest(name=instance_name)
|
|
| 252 |
+ response = instance.ListOperations(request, context)
|
|
| 253 |
+ |
|
| 254 |
+ assert len(response.operations) is 1
|
|
| 255 |
+ |
|
| 256 |
+ for operation in response.operations:
|
|
| 257 |
+ operation_metadata = remote_execution_pb2.ExecuteOperationMetadata()
|
|
| 258 |
+ operation.metadata.Unpack(operation_metadata)
|
|
| 259 |
+ assert operation_metadata.stage == OperationStage.COMPLETED.value
|
|
| 245 | 260 |
|
| 246 | 261 |
|
| 247 | 262 |
def test_cancel_operation_blank(blank_instance, context):
|
| ... | ... | @@ -249,7 +264,7 @@ def test_cancel_operation_blank(blank_instance, context): |
| 249 | 264 |
request.name = "runner"
|
| 250 | 265 |
blank_instance.CancelOperation(request, context)
|
| 251 | 266 |
|
| 252 |
- context.set_code.assert_called_once_with(grpc.StatusCode.UNIMPLEMENTED)
|
|
| 267 |
+ context.set_code.assert_called_once_with(grpc.StatusCode.INVALID_ARGUMENT)
|
|
| 253 | 268 |
|
| 254 | 269 |
|
| 255 | 270 |
def test_cancel_operation_instance_fail(instance, context):
|
