| ... | ... | @@ -20,15 +20,18 @@ | 
| 20 | 20 |  
 | 
| 21 | 21 |  import os
 | 
| 22 | 22 |  from urllib.parse import urlparse
 | 
|  | 23 | +from functools import partial
 | 
| 23 | 24 |  
 | 
| 24 | 25 |  import grpc
 | 
| 25 | 26 |  
 | 
| 26 | 27 |  from . import Sandbox
 | 
| 27 | 28 |  from ..storage._filebaseddirectory import FileBasedDirectory
 | 
| 28 | 29 |  from ..storage._casbaseddirectory import CasBasedDirectory
 | 
|  | 30 | +from .. import _signals
 | 
| 29 | 31 |  from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
 | 
| 30 | 32 |  from .._protos.google.rpc import code_pb2
 | 
| 31 | 33 |  from .._exceptions import SandboxError
 | 
|  | 34 | +from .._protos.google.longrunning import operations_pb2, operations_pb2_grpc
 | 
| 32 | 35 |  
 | 
| 33 | 36 |  
 | 
| 34 | 37 |  # SandboxRemote()
 | 
| ... | ... | @@ -51,6 +54,7 @@ class SandboxRemote(Sandbox): | 
| 51 | 54 |                                 "Only plain HTTP is currenlty supported (no HTTPS).")
 | 
| 52 | 55 |  
 | 
| 53 | 56 |          self.server_url = '{}:{}'.format(url.hostname, url.port)
 | 
|  | 57 | +        self.operation_name = None
 | 
| 54 | 58 |  
 | 
| 55 | 59 |      def run_remote_command(self, command, input_root_digest, working_directory, environment):
 | 
| 56 | 60 |          # Sends an execution request to the remote execution server.
 | 
| ... | ... | @@ -101,11 +105,18 @@ class SandboxRemote(Sandbox): | 
| 101 | 105 |                      request = remote_execution_pb2.WaitExecutionRequest(name=running_operation.name)
 | 
| 102 | 106 |                      operation_iterator = stub.WaitExecution(request)
 | 
| 103 | 107 |  
 | 
|  | 108 | +                operation = next(operation_iterator)
 | 
|  | 109 | +                self.operation_name = operation.name
 | 
|  | 110 | +                if operation.done:
 | 
|  | 111 | +                    return operation
 | 
|  | 112 | +                else:
 | 
|  | 113 | +                    last_operation = operation
 | 
| 104 | 114 |                  for operation in operation_iterator:
 | 
| 105 | 115 |                      if operation.done:
 | 
| 106 | 116 |                          return operation
 | 
| 107 | 117 |                      else:
 | 
| 108 | 118 |                          last_operation = operation
 | 
|  | 119 | +
 | 
| 109 | 120 |              except grpc.RpcError as e:
 | 
| 110 | 121 |                  status_code = e.code()
 | 
| 111 | 122 |                  if status_code == grpc.StatusCode.UNAVAILABLE:
 | 
| ... | ... | @@ -125,19 +136,38 @@ class SandboxRemote(Sandbox): | 
| 125 | 136 |  
 | 
| 126 | 137 |              return last_operation
 | 
| 127 | 138 |  
 | 
|  | 139 | +        # Set up signal handler to trigger cancel_operation on SIGTERM
 | 
| 128 | 140 |          operation = None
 | 
| 129 |  | -        with self._get_context().timed_activity("Waiting for the remote build to complete"):
 | 
|  | 141 | +        with self._get_context().timed_activity("Waiting for the remote build to complete"), \
 | 
|  | 142 | +            _signals.terminator(partial(self.cancel_operation, channel)):
 | 
| 130 | 143 |              operation = __run_remote_command(stub, execute_request=request)
 | 
| 131 | 144 |              if operation is None:
 | 
| 132 | 145 |                  return None
 | 
| 133 | 146 |              elif operation.done:
 | 
| 134 | 147 |                  return operation
 | 
| 135 |  | -
 | 
| 136 | 148 |              while operation is not None and not operation.done:
 | 
| 137 | 149 |                  operation = __run_remote_command(stub, running_operation=operation)
 | 
| 138 | 150 |  
 | 
| 139 | 151 |          return operation
 | 
| 140 | 152 |  
 | 
|  | 153 | +    def cancel_operation(self, channel):
 | 
|  | 154 | +        # If we don't have the name can't send request.
 | 
|  | 155 | +        if self.operation_name is None:
 | 
|  | 156 | +            return
 | 
|  | 157 | +
 | 
|  | 158 | +        stub = operations_pb2_grpc.OperationsStub(channel)
 | 
|  | 159 | +        request = operations_pb2.CancelOperationRequest(
 | 
|  | 160 | +            name=str(self.operation_name))
 | 
|  | 161 | +
 | 
|  | 162 | +        try:
 | 
|  | 163 | +            stub.CancelOperation(request)
 | 
|  | 164 | +        except grpc.RpcError as e:
 | 
|  | 165 | +            if (e.code() == grpc.StatusCode.UNIMPLEMENTED
 | 
|  | 166 | +                or e.code() == grpc.StatusCode.INVALID_ARGUMENT):
 | 
|  | 167 | +                pass
 | 
|  | 168 | +            else:
 | 
|  | 169 | +                raise SandboxError("{} ({})".format(e.details(), e.code().name))
 | 
|  | 170 | +
 | 
| 141 | 171 |      def process_job_output(self, output_directories, output_files):
 | 
| 142 | 172 |          # Reads the remote execution server response to an execution request.
 | 
| 143 | 173 |          #
 |