... |
... |
@@ -26,6 +26,7 @@ from functools import partial |
26
|
26
|
|
27
|
27
|
import grpc
|
28
|
28
|
|
|
29
|
+from buildstream._message import Message, MessageType
|
29
|
30
|
from buildstream import utils
|
30
|
31
|
from . import Sandbox, SandboxCommandError
|
31
|
32
|
from .sandbox import _SandboxBatch
|
... |
... |
@@ -40,7 +41,7 @@ from .._protos.google.longrunning import operations_pb2, operations_pb2_grpc |
40
|
41
|
from .._artifactcache.cascache import CASRemote, CASRemoteSpec
|
41
|
42
|
|
42
|
43
|
|
43
|
|
-class RemoteExecutionSpec(namedtuple('RemoteExecutionSpec', 'exec_service storage_service')):
|
|
44
|
# RemoteExecutionSpec()
#
# Immutable record describing the services used for remote execution.
#
# Fields:
#    exec_service: configuration node of the execution service
#    storage_service: configuration node of the storage (CAS) service
#    action_service: configuration node of the action cache service, or
#                    None when no action cache is configured
#
# `action_service` defaults to None so that code which still builds a spec
# from only the two original fields keeps working.
#
class RemoteExecutionSpec(namedtuple('RemoteExecutionSpec', 'exec_service storage_service action_service')):

    def __new__(cls, exec_service, storage_service, action_service=None):
        return super().__new__(cls, exec_service, storage_service, action_service)
|
45
|
46
|
|
46
|
47
|
|
... |
... |
@@ -60,6 +61,10 @@ class SandboxRemote(Sandbox): |
60
|
61
|
|
61
|
62
|
self.storage_url = config.storage_service['url']
|
62
|
63
|
self.exec_url = config.exec_service['url']
|
|
64
|
+ if config.action_service == None:
|
|
65
|
+ self.action_url = None
|
|
66
|
+ else:
|
|
67
|
+ self.action_url = config.action_service['url']
|
63
|
68
|
|
64
|
69
|
self.storage_remote_spec = CASRemoteSpec(self.storage_url, push=True,
|
65
|
70
|
server_cert=config.storage_service['server-cert'],
|
... |
... |
@@ -67,6 +72,9 @@ class SandboxRemote(Sandbox): |
67
|
72
|
client_cert=config.storage_service['client-cert'])
|
68
|
73
|
self.operation_name = None
|
69
|
74
|
|
|
75
|
def info(self, msg):
    # Reports `msg` to the user as an INFO-type message, delivered
    # through the message handler of this sandbox's context.
    emit = self._get_context().message
    emit(Message(None, MessageType.INFO, msg))
|
|
77
|
+
|
70
|
78
|
@staticmethod
|
71
|
79
|
def specs_from_config_node(config_node, basedir):
|
72
|
80
|
|
... |
... |
@@ -89,12 +97,17 @@ class SandboxRemote(Sandbox): |
89
|
97
|
|
90
|
98
|
tls_keys = ['client-key', 'client-cert', 'server-cert']
|
91
|
99
|
|
92
|
|
- _yaml.node_validate(remote_config, ['execution-service', 'storage-service', 'url'])
|
|
100
|
+ _yaml.node_validate(remote_config, ['execution-service', 'storage-service', 'url', 'action-service'])
|
93
|
101
|
remote_exec_service_config = require_node(remote_config, 'execution-service')
|
94
|
102
|
remote_exec_storage_config = require_node(remote_config, 'storage-service')
|
|
103
|
+ remote_exec_action_config = remote_config.get('action-service')
|
95
|
104
|
|
96
|
105
|
_yaml.node_validate(remote_exec_service_config, ['url'])
|
97
|
106
|
_yaml.node_validate(remote_exec_storage_config, ['url'] + tls_keys)
|
|
107
|
+ if remote_exec_action_config:
|
|
108
|
+ _yaml.node_validate(remote_exec_action_config, ['url'])
|
|
109
|
+ else:
|
|
110
|
+ remote_config['action-service'] = None
|
98
|
111
|
|
99
|
112
|
if 'url' in remote_config:
|
100
|
113
|
if 'execution-service' not in remote_config:
|
... |
... |
@@ -115,7 +128,9 @@ class SandboxRemote(Sandbox): |
115
|
128
|
"remote-execution configuration. Your config is missing '{}'."
|
116
|
129
|
.format(str(provenance), tls_keys, key))
|
117
|
130
|
|
118
|
|
- spec = RemoteExecutionSpec(remote_config['execution-service'], remote_config['storage-service'])
|
|
131
|
+ spec = RemoteExecutionSpec(remote_config['execution-service'],
|
|
132
|
+ remote_config['storage-service'],
|
|
133
|
+ remote_config['action-service'])
|
119
|
134
|
return spec
|
120
|
135
|
|
121
|
136
|
def run_remote_command(self, channel, action_digest):
|
... |
... |
@@ -277,7 +292,7 @@ class SandboxRemote(Sandbox): |
277
|
292
|
"and '{}' was supplied.".format(url.scheme))
|
278
|
293
|
|
279
|
294
|
# check action cache download and download if there
|
280
|
|
- action_result = self._check_action_cache(channel, action_digest)
|
|
295
|
+ action_result = self._check_action_cache(action_digest)
|
281
|
296
|
|
282
|
297
|
if not action_result:
|
283
|
298
|
casremote = CASRemote(self.storage_remote_spec)
|
... |
... |
@@ -318,21 +333,28 @@ class SandboxRemote(Sandbox): |
318
|
333
|
|
319
|
334
|
return 0
|
320
|
335
|
|
321
|
|
- def _check_action_cache(self, channel, action_digest):
|
|
336
|
+ def _check_action_cache(self, action_digest):
|
322
|
337
|
# Checks the action cache to see if this artifact has already been built
|
323
|
338
|
#
|
324
|
339
|
# Should return either the action response or None if not found, raise
|
325
|
340
|
# Sandboxerror if other grpc error was raised
|
|
341
|
+ if not self.action_url:
|
|
342
|
+ return None
|
|
343
|
+ url = urlparse(self.action_url)
|
|
344
|
+ channel = grpc.insecure_channel('{}:{}'.format(url.hostname, url.port))
|
326
|
345
|
request = remote_execution_pb2.GetActionResultRequest(action_digest=action_digest)
|
327
|
346
|
stub = remote_execution_pb2_grpc.ActionCacheStub(channel)
|
328
|
347
|
try:
|
329
|
|
- return stub.GetActionResult(request)
|
|
348
|
+ result = stub.GetActionResult(request)
|
330
|
349
|
except grpc.RpcError as e:
|
331
|
350
|
if e.code() != grpc.StatusCode.NOT_FOUND:
|
332
|
351
|
raise SandboxError("Failed to query action cache: {} ({})"
|
333
|
352
|
.format(e.code(), e.details()))
|
334
|
353
|
else:
|
335
|
354
|
return None
|
|
355
|
+ else:
|
|
356
|
+ self.info("Action result found in action cache")
|
|
357
|
+ return result
|
336
|
358
|
|
337
|
359
|
def _create_command(self, command, working_directory, environment):
|
338
|
360
|
# Creates a command proto
|