... |
... |
@@ -19,15 +19,19 @@ |
19
|
19
|
# Jim MacArthur <jim macarthur codethink co uk>
|
20
|
20
|
|
21
|
21
|
import os
|
|
22
|
+import signal
|
22
|
23
|
from urllib.parse import urlparse
|
|
24
|
+from functools import partial
|
23
|
25
|
|
24
|
26
|
import grpc
|
25
|
27
|
|
26
|
28
|
from . import Sandbox
|
27
|
29
|
from ..storage._filebaseddirectory import FileBasedDirectory
|
28
|
30
|
from ..storage._casbaseddirectory import CasBasedDirectory
|
|
31
|
+from .. import _signals
|
29
|
32
|
from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
|
30
|
33
|
from .._protos.google.rpc import code_pb2
|
|
34
|
+from .._protos.google.longrunning import operations_pb2, operations_pb2_grpc
|
31
|
35
|
|
32
|
36
|
|
33
|
37
|
class SandboxError(Exception):
|
... |
... |
@@ -54,6 +58,7 @@ class SandboxRemote(Sandbox): |
54
|
58
|
"Only plain HTTP is currenlty supported (no HTTPS).")
|
55
|
59
|
|
56
|
60
|
self.server_url = '{}:{}'.format(url.hostname, url.port)
|
|
61
|
+ self.operation_name = None
|
57
|
62
|
|
58
|
63
|
def run_remote_command(self, command, input_root_digest, working_directory, environment):
|
59
|
64
|
# Sends an execution request to the remote execution server.
|
... |
... |
@@ -104,11 +109,21 @@ class SandboxRemote(Sandbox): |
104
|
109
|
request = remote_execution_pb2.WaitExecutionRequest(name=running_operation.name)
|
105
|
110
|
operation_iterator = stub.WaitExecution(request)
|
106
|
111
|
|
107
|
|
- for operation in operation_iterator:
|
108
|
|
- if operation.done:
|
109
|
|
- return operation
|
110
|
|
- else:
|
111
|
|
- last_operation = operation
|
|
112
|
+ # Set up SIGTERM handler, but block for first operation to get name
|
|
113
|
+ with _signals.terminator(partial(self.cancel_operation, channel)):
|
|
114
|
+ with _signals.blocked([signal.SIGTERM], ignore=False):
|
|
115
|
+ operation = next(operation_iterator)
|
|
116
|
+ self.operation_name = operation.name
|
|
117
|
+ if operation.done:
|
|
118
|
+ return operation
|
|
119
|
+ else:
|
|
120
|
+ last_operation = operation
|
|
121
|
+ for operation in operation_iterator:
|
|
122
|
+ if operation.done:
|
|
123
|
+ return operation
|
|
124
|
+ else:
|
|
125
|
+ last_operation = operation
|
|
126
|
+
|
112
|
127
|
except grpc.RpcError as e:
|
113
|
128
|
status_code = e.code()
|
114
|
129
|
if status_code == grpc.StatusCode.UNAVAILABLE:
|
... |
... |
@@ -135,12 +150,22 @@ class SandboxRemote(Sandbox): |
135
|
150
|
return None
|
136
|
151
|
elif operation.done:
|
137
|
152
|
return operation
|
138
|
|
-
|
139
|
153
|
while operation is not None and not operation.done:
|
140
|
154
|
operation = __run_remote_command(stub, running_operation=operation)
|
141
|
155
|
|
142
|
156
|
return operation
|
143
|
157
|
|
|
158
|
+ def cancel_operation(self, channel):
|
|
159
|
+ stub = operations_pb2_grpc.OperationsStub(channel)
|
|
160
|
+ request = operations_pb2.CancelOperationRequest(
|
|
161
|
+ name=str(self.operation_name))
|
|
162
|
+
|
|
163
|
+ try:
|
|
164
|
+ stub.CancelOperation(request)
|
|
165
|
+ except grpc.RpcError as e:
|
|
166
|
+ if e.code() != grpc.StatusCode.UNIMPLEMENTED:
|
|
167
|
+ raise SandboxError("{} ({})".format(e.details(), e.code().name))
|
|
168
|
+
|
144
|
169
|
def process_job_output(self, output_directories, output_files):
|
145
|
170
|
# Reads the remote execution server response to an execution request.
|
146
|
171
|
#
|