| ... | ... | @@ -19,15 +19,19 @@ | 
| 19 | 19 |  #        Jim MacArthur <jim macarthur codethink co uk>
 | 
| 20 | 20 |  
 | 
| 21 | 21 |  import os
 | 
|  | 22 | +import signal
 | 
| 22 | 23 |  from urllib.parse import urlparse
 | 
|  | 24 | +from functools import partial
 | 
| 23 | 25 |  
 | 
| 24 | 26 |  import grpc
 | 
| 25 | 27 |  
 | 
| 26 | 28 |  from . import Sandbox
 | 
| 27 | 29 |  from ..storage._filebaseddirectory import FileBasedDirectory
 | 
| 28 | 30 |  from ..storage._casbaseddirectory import CasBasedDirectory
 | 
|  | 31 | +from .. import _signals
 | 
| 29 | 32 |  from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
 | 
| 30 | 33 |  from .._protos.google.rpc import code_pb2
 | 
|  | 34 | +from .._protos.google.longrunning import operations_pb2, operations_pb2_grpc
 | 
| 31 | 35 |  
 | 
| 32 | 36 |  
 | 
| 33 | 37 |  class SandboxError(Exception):
 | 
| ... | ... | @@ -54,6 +58,7 @@ class SandboxRemote(Sandbox): | 
| 54 | 58 |                                 "Only plain HTTP is currenlty supported (no HTTPS).")
 | 
| 55 | 59 |  
 | 
| 56 | 60 |          self.server_url = '{}:{}'.format(url.hostname, url.port)
 | 
|  | 61 | +        self.operation_name = None
 | 
| 57 | 62 |  
 | 
| 58 | 63 |      def run_remote_command(self, command, input_root_digest, working_directory, environment):
 | 
| 59 | 64 |          # Sends an execution request to the remote execution server.
 | 
| ... | ... | @@ -104,11 +109,17 @@ class SandboxRemote(Sandbox): | 
| 104 | 109 |                      request = remote_execution_pb2.WaitExecutionRequest(name=running_operation.name)
 | 
| 105 | 110 |                      operation_iterator = stub.WaitExecution(request)
 | 
| 106 | 111 |  
 | 
| 107 |  | -                for operation in operation_iterator:
 | 
| 108 |  | -                    if operation.done:
 | 
| 109 |  | -                        return operation
 | 
| 110 |  | -                    else:
 | 
| 111 |  | -                        last_operation = operation
 | 
|  | 112 | +                with _signals.terminator(partial(self.cancel_operation, channel)):
 | 
|  | 113 | +                    with _signals.blocked([signal.SIGTERM], ignore=False):
 | 
|  | 114 | +                        operation = next(operation_iterator)
 | 
|  | 115 | +                        self.operation_name = operation.name
 | 
|  | 116 | +                        if operation.done:
 | 
|  | 117 | +                            return operation
 | 
|  | 118 | +                    for operation in operation_iterator:
 | 
|  | 119 | +                        if operation.done:
 | 
|  | 120 | +                            return operation
 | 
|  | 121 | +                        else:
 | 
|  | 122 | +                            last_operation = operation
 | 
| 112 | 123 |              except grpc.RpcError as e:
 | 
| 113 | 124 |                  status_code = e.code()
 | 
| 114 | 125 |                  if status_code == grpc.StatusCode.UNAVAILABLE:
 | 
| ... | ... | @@ -135,12 +146,22 @@ class SandboxRemote(Sandbox): | 
| 135 | 146 |                  return None
 | 
| 136 | 147 |              elif operation.done:
 | 
| 137 | 148 |                  return operation
 | 
| 138 |  | -
 | 
| 139 | 149 |              while operation is not None and not operation.done:
 | 
| 140 | 150 |                  operation = __run_remote_command(stub, running_operation=operation)
 | 
| 141 | 151 |  
 | 
| 142 | 152 |          return operation
 | 
| 143 | 153 |  
 | 
def cancel_operation(self, channel):
    """Ask the remote server to cancel the operation in ``self.operation_name``.

    Registered as a termination callback around the operation stream in
    ``run_remote_command``, so it may be invoked before any operation
    name has been received from the server.

    Args:
        channel: The gRPC channel used to build the Operations stub.

    Raises:
        SandboxError: If the CancelOperation call fails with any gRPC
            status other than UNIMPLEMENTED.
    """
    # operation_name stays None until the first response of the stream has
    # been read; without a name there is nothing to cancel, and formatting
    # None would send a request for an operation literally named "None".
    if self.operation_name is None:
        return

    stub = operations_pb2_grpc.OperationsStub(channel)
    request = operations_pb2.CancelOperationRequest(
        name=str(self.operation_name))

    try:
        stub.CancelOperation(request)
    except grpc.RpcError as e:
        # An UNIMPLEMENTED status is deliberately ignored (best-effort
        # cancellation); every other failure is reported to the caller.
        if e.code() != grpc.StatusCode.UNIMPLEMENTED:
            raise SandboxError("{} ({})".format(e.details(), e.code().name)) from e
 | 
|  | 164 | +
 | 
| 144 | 165 |      def process_job_output(self, output_directories, output_files):
 | 
| 145 | 166 |          # Reads the remote execution server response to an execution request.
 | 
| 146 | 167 |          #
 |