finn pushed to branch finn/74-operation-cancelation at BuildGrid / buildgrid
Commits:
- 
28bf01e2
by Finn at 2018-11-02T14:23:38Z
 - 
146d07e0
by Finn at 2018-11-02T14:33:12Z
 - 
d4735838
by Finn at 2018-11-02T14:33:16Z
 
5 changed files:
- buildgrid/_app/commands/cmd_operation.py
 - buildgrid/server/execution/instance.py
 - buildgrid/server/job.py
 - buildgrid/server/operations/instance.py
 - tests/integration/operations_service.py
 
Changes:
| ... | ... | @@ -155,6 +155,25 @@ def status(context, operation_name, json): | 
| 155 | 155 | 
         click.echo(json_format.MessageToJson(operation))
 | 
| 156 | 156 | 
 | 
| 157 | 157 | 
 | 
| 158 | 
+@cli.command('cancel', short_help="Cancel an operation.")
 | 
|
| 159 | 
+@click.argument('operation-name', nargs=1, type=click.STRING, required=True)
 | 
|
| 160 | 
+@pass_context
 | 
|
| 161 | 
+def cancel(context, operation_name):
 | 
|
| 162 | 
+    context.logger.info("Cancelling an operation...")
 | 
|
| 163 | 
+    stub = operations_pb2_grpc.OperationsStub(context.channel)
 | 
|
| 164 | 
+  | 
|
| 165 | 
+    request = operations_pb2.CancelOperationRequest(name=operation_name)
 | 
|
| 166 | 
+  | 
|
| 167 | 
+    try:
 | 
|
| 168 | 
+        stub.CancelOperation(request)
 | 
|
| 169 | 
+    except grpc.RpcError as e:
 | 
|
| 170 | 
+        status_code = e.code()
 | 
|
| 171 | 
+        if status_code != grpc.StatusCode.CANCELLED:
 | 
|
| 172 | 
+            raise
 | 
|
| 173 | 
+  | 
|
| 174 | 
+    context.logger.info("Operation cancelled: [{}]".format(request))
 | 
|
| 175 | 
+  | 
|
| 176 | 
+  | 
|
| 158 | 177 | 
 @cli.command('list', short_help="List operations.")
 | 
| 159 | 178 | 
 @click.option('--json', is_flag=True, show_default=True,
 | 
| 160 | 179 | 
               help="Print operations list in JSON format.")
 | 
| ... | ... | @@ -71,8 +71,11 @@ class ExecutionInstance: | 
| 71 | 71 | 
             raise InvalidArgumentError("Operation name does not exist: [{}]".format(name))
 | 
| 72 | 72 | 
 | 
| 73 | 73 | 
     def stream_operation_updates(self, message_queue, operation_name):
 | 
| 74 | 
-        operation = message_queue.get()
 | 
|
| 75 | 
-        while not operation.done:
 | 
|
| 76 | 
-            yield operation
 | 
|
| 77 | 
-            operation = message_queue.get()
 | 
|
| 78 | 
-        yield operation
 | 
|
| 74 | 
+        job = message_queue.get()
 | 
|
| 75 | 
+        while not job.operation.done:
 | 
|
| 76 | 
+            yield job.operation
 | 
|
| 77 | 
+            job = message_queue.get()
 | 
|
| 78 | 
+  | 
|
| 79 | 
+        job.check_operation_status()
 | 
|
| 80 | 
+  | 
|
| 81 | 
+        yield job.operation
 | 
| ... | ... | @@ -113,7 +113,7 @@ class Job: | 
| 113 | 113 | 
             queue (queue.Queue): the event queue to register.
 | 
| 114 | 114 | 
         """
 | 
| 115 | 115 | 
         self._operation_update_queues.append(queue)
 | 
| 116 | 
-        queue.put(self._operation)
 | 
|
| 116 | 
+        queue.put(self)
 | 
|
| 117 | 117 | 
 | 
| 118 | 118 | 
     def unregister_client(self, queue):
 | 
| 119 | 119 | 
         """Unsubscribes to the job's :class:`Operation` stage change events.
 | 
| ... | ... | @@ -229,7 +229,16 @@ class Job: | 
| 229 | 229 | 
         self._operation.metadata.Pack(self.__operation_metadata)
 | 
| 230 | 230 | 
 | 
| 231 | 231 | 
         for queue in self._operation_update_queues:
 | 
| 232 | 
-            queue.put(self._operation)
 | 
|
| 232 | 
+            queue.put(self)
 | 
|
| 233 | 
+  | 
|
| 234 | 
+    def check_operation_status(self):
 | 
|
| 235 | 
+        """Reports errors on unexpected job's :class:Operation state.
 | 
|
| 236 | 
+  | 
|
| 237 | 
+        Raises:
 | 
|
| 238 | 
+            CancelledError: if the job's :class:Operation was cancelled.
 | 
|
| 239 | 
+        """
 | 
|
| 240 | 
+        if self.__operation_cancelled:
 | 
|
| 241 | 
+            raise CancelledError(self.__execute_response.status.message)
 | 
|
| 233 | 242 | 
 | 
| 234 | 243 | 
     def cancel_operation(self):
 | 
| 235 | 244 | 
         """Triggers a job's :class:Operation cancellation.
 | 
| ... | ... | @@ -86,8 +86,11 @@ class OperationsInstance: | 
| 86 | 86 | 
             raise InvalidArgumentError("Operation name does not exist: [{}]".format(name))
 | 
| 87 | 87 | 
 | 
| 88 | 88 | 
     def stream_operation_updates(self, message_queue, operation_name):
 | 
| 89 | 
-        operation = message_queue.get()
 | 
|
| 90 | 
-        while not operation.done:
 | 
|
| 91 | 
-            yield operation
 | 
|
| 92 | 
-            operation = message_queue.get()
 | 
|
| 93 | 
-        yield operation
 | 
|
| 89 | 
+        job = message_queue.get()
 | 
|
| 90 | 
+        while not job.operation.done:
 | 
|
| 91 | 
+            yield job.operation
 | 
|
| 92 | 
+            job = message_queue.get()
 | 
|
| 93 | 
+  | 
|
| 94 | 
+        job.check_operation_status()
 | 
|
| 95 | 
+  | 
|
| 96 | 
+        yield job.operation
 | 
| ... | ... | @@ -24,6 +24,7 @@ import grpc | 
| 24 | 24 | 
 from grpc._server import _Context
 | 
| 25 | 25 | 
 import pytest
 | 
| 26 | 26 | 
 | 
| 27 | 
+from buildgrid._enums import OperationStage
 | 
|
| 27 | 28 | 
 from buildgrid._exceptions import InvalidArgumentError
 | 
| 28 | 29 | 
 from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
 | 
| 29 | 30 | 
 from buildgrid._protos.google.longrunning import operations_pb2
 | 
| ... | ... | @@ -236,12 +237,26 @@ def test_delete_operation_fail(instance, context): | 
| 236 | 237 | 
     context.set_code.assert_called_once_with(grpc.StatusCode.INVALID_ARGUMENT)
 | 
| 237 | 238 | 
 | 
| 238 | 239 | 
 | 
| 239 | 
-def test_cancel_operation(instance, context):
 | 
|
| 240 | 
+def test_cancel_operation(instance, controller, execute_request, context):
 | 
|
| 241 | 
+    response_execute = controller.execution_instance.execute(execute_request.action_digest,
 | 
|
| 242 | 
+                                                             execute_request.skip_cache_lookup)
 | 
|
| 243 | 
+  | 
|
| 240 | 244 | 
     request = operations_pb2.CancelOperationRequest()
 | 
| 241 | 
-    request.name = "{}/{}".format(instance_name, "runner")
 | 
|
| 245 | 
+    request.name = "{}/{}".format(instance_name, response_execute.name)
 | 
|
| 246 | 
+  | 
|
| 242 | 247 | 
     instance.CancelOperation(request, context)
 | 
| 243 | 248 | 
 | 
| 244 | 
-    context.set_code.assert_called_once_with(grpc.StatusCode.UNIMPLEMENTED)
 | 
|
| 249 | 
+    context.set_code.assert_called_once_with(grpc.StatusCode.CANCELLED)
 | 
|
| 250 | 
+  | 
|
| 251 | 
+    request = operations_pb2.ListOperationsRequest(name=instance_name)
 | 
|
| 252 | 
+    response = instance.ListOperations(request, context)
 | 
|
| 253 | 
+  | 
|
| 254 | 
+    assert len(response.operations) is 1
 | 
|
| 255 | 
+  | 
|
| 256 | 
+    for operation in response.operations:
 | 
|
| 257 | 
+        operation_metadata = remote_execution_pb2.ExecuteOperationMetadata()
 | 
|
| 258 | 
+        operation.metadata.Unpack(operation_metadata)
 | 
|
| 259 | 
+        assert operation_metadata.stage == OperationStage.COMPLETED.value
 | 
|
| 245 | 260 | 
 | 
| 246 | 261 | 
 | 
| 247 | 262 | 
 def test_cancel_operation_blank(blank_instance, context):
 | 
| ... | ... | @@ -249,7 +264,7 @@ def test_cancel_operation_blank(blank_instance, context): | 
| 249 | 264 | 
     request.name = "runner"
 | 
| 250 | 265 | 
     blank_instance.CancelOperation(request, context)
 | 
| 251 | 266 | 
 | 
| 252 | 
-    context.set_code.assert_called_once_with(grpc.StatusCode.UNIMPLEMENTED)
 | 
|
| 267 | 
+    context.set_code.assert_called_once_with(grpc.StatusCode.INVALID_ARGUMENT)
 | 
|
| 253 | 268 | 
 | 
| 254 | 269 | 
 | 
| 255 | 270 | 
 def test_cancel_operation_instance_fail(instance, context):
 | 
