Martin Blanchard pushed to branch mablanch/75-requests-multiplexing at BuildGrid / buildgrid
Commits:
- a1ee2373 by Martin Blanchard at 2018-12-18T15:31:41Z
- 97e9d231 by Martin Blanchard at 2019-01-09T09:03:52Z
- dc19f204 by Martin Blanchard at 2019-01-09T09:03:52Z
- d7e793b0 by Martin Blanchard at 2019-01-09T09:03:52Z
- aed13219 by Martin Blanchard at 2019-01-09T09:03:52Z
- 80341fc6 by Martin Blanchard at 2019-01-10T12:15:40Z
- 6b9e3a72 by Martin Blanchard at 2019-01-10T12:15:40Z
- 9176de9b by Martin Blanchard at 2019-01-10T12:15:40Z
- 97734e28 by Martin Blanchard at 2019-01-10T12:15:40Z
- 0be7a01b by Martin Blanchard at 2019-01-10T12:15:40Z
- 51ec760b by Martin Blanchard at 2019-01-14T11:35:19Z
- 53803b85 by Martin Blanchard at 2019-01-14T11:35:20Z
- 3d335ef0 by Martin Blanchard at 2019-01-14T11:35:20Z
- 6410c3bf by Martin Blanchard at 2019-01-14T11:35:20Z
- c6786b53 by Martin Blanchard at 2019-01-14T11:35:20Z
- 8c4c3c05 by Martin Blanchard at 2019-01-14T11:35:21Z
- 1d0660ea by Martin Blanchard at 2019-01-14T11:35:21Z
- 7ee0ae35 by Martin Blanchard at 2019-01-14T11:35:21Z
- 61c22c5b by Martin Blanchard at 2019-01-14T11:35:21Z
- e3423229 by Martin Blanchard at 2019-01-14T11:45:45Z
- 8416e634 by Martin Blanchard at 2019-01-14T11:45:45Z
- 2c0cd362 by Martin Blanchard at 2019-01-14T11:45:45Z
- c59baac9 by Martin Blanchard at 2019-01-14T12:43:23Z
- e892f06b by Martin Blanchard at 2019-01-14T12:43:24Z
- 3261cea5 by Martin Blanchard at 2019-01-14T12:43:24Z
- 91aab9c8 by Martin Blanchard at 2019-01-14T12:43:24Z
- f3170de5 by Martin Blanchard at 2019-01-14T13:03:09Z
- 5cfea63c by Martin Blanchard at 2019-01-14T13:03:55Z
23 changed files:
- buildgrid/_app/bots/buildbox.py
- buildgrid/_app/bots/host.py
- buildgrid/bot/interface.py
- buildgrid/client/authentication.py
- buildgrid/server/_authentication.py
- buildgrid/server/actioncache/service.py
- buildgrid/server/bots/instance.py
- buildgrid/server/bots/service.py
- buildgrid/server/capabilities/service.py
- buildgrid/server/cas/service.py
- buildgrid/server/execution/instance.py
- buildgrid/server/execution/service.py
- buildgrid/server/instance.py
- buildgrid/server/job.py
- buildgrid/server/operations/instance.py
- buildgrid/server/operations/service.py
- buildgrid/server/referencestorage/service.py
- buildgrid/server/scheduler.py
- docs/source/conf.py
- docs/source/using_buildstream.rst
- tests/integration/bots_service.py
- tests/integration/execution_service.py
- tests/integration/operations_service.py
Changes:
buildgrid/_app/bots/buildbox.py:

@@ -21,7 +21,7 @@ import tempfile
 from buildgrid.client.cas import download, upload
 from buildgrid._exceptions import BotError
 from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
-from buildgrid.settings import HASH_LENGTH
+from buildgrid.settings import HASH_LENGTH, MAX_REQUEST_SIZE
 from buildgrid.utils import read_file, write_file


@@ -52,9 +52,10 @@ def work_buildbox(lease, context, event):
     else:
         working_directory = '/'

-    logger.debug("command hash: {}".format(action.command_digest.hash))
-    logger.debug("vdir hash: {}".format(action.input_root_digest.hash))
-    logger.debug("\n{}".format(' '.join(command.arguments)))
+    logger.debug("Command digest: [{}/{}]"
+                 .format(action.command_digest.hash, action.command_digest.size_bytes))
+    logger.debug("Input root digest: [{}/{}]"
+                 .format(action.input_root_digest.hash, action.input_root_digest.size_bytes))

     os.makedirs(os.path.join(local_cas_directory, 'tmp'), exist_ok=True)
     os.makedirs(context.fuse_dir, exist_ok=True)

@@ -87,9 +88,7 @@ def work_buildbox(lease, context, event):
     command_line.append(context.fuse_dir)
     command_line.extend(command.arguments)

-    logger.debug(' '.join(command_line))
-    logger.debug("Input root digest:\n{}".format(action.input_root_digest))
-    logger.info("Launching process")
+    logger.info("Starting execution: [{}...]".format(command.arguments[0]))

     command_line = subprocess.Popen(command_line,
                                     stdin=subprocess.PIPE,

@@ -97,22 +96,17 @@ def work_buildbox(lease, context, event):
                                     stderr=subprocess.PIPE)
     stdout, stderr = command_line.communicate()
     returncode = command_line.returncode
+
     action_result = remote_execution_pb2.ActionResult()
-    # TODO: Upload to CAS or output RAW
-    # For now, just pass raw
-    # https://gitlab.com/BuildGrid/buildgrid/issues/90
-    action_result.stdout_raw = stdout
-    action_result.stderr_raw = stderr
     action_result.exit_code = returncode

-    logger.debug("BuildBox stderr: [{}]".format(stderr))
-    logger.debug("BuildBox stdout: [{}]".format(stdout))
-    logger.debug("BuildBox exit code: [{}]".format(returncode))
+    logger.info("Execution finished with code: [{}]".format(returncode))

     output_digest = remote_execution_pb2.Digest()
     output_digest.ParseFromString(read_file(output_digest_file.name))

-    logger.debug("Output root digest: [{}]".format(output_digest))
+    logger.debug("Output root digest: [{}/{}]"
+                 .format(output_digest.hash, output_digest.size_bytes))

     if len(output_digest.hash) != HASH_LENGTH:
         raise BotError(stdout,

@@ -126,11 +120,25 @@ def work_buildbox(lease, context, event):
     with upload(context.cas_channel) as uploader:
         output_tree_digest = uploader.put_message(output_tree)

-        output_directory = remote_execution_pb2.OutputDirectory()
-        output_directory.tree_digest.CopyFrom(output_tree_digest)
-        output_directory.path = os.path.relpath(working_directory, start='/')
+        output_directory = remote_execution_pb2.OutputDirectory()
+        output_directory.tree_digest.CopyFrom(output_tree_digest)
+        output_directory.path = os.path.relpath(working_directory, start='/')
+
+        action_result.output_directories.extend([output_directory])
+
+        if action_result.ByteSize() + len(stdout) > MAX_REQUEST_SIZE:
+            stdout_digest = uploader.put_blob(stdout)
+            action_result.stdout_digest.CopyFrom(stdout_digest)
+
+        else:
+            action_result.stdout_raw = stdout
+
+        if action_result.ByteSize() + len(stderr) > MAX_REQUEST_SIZE:
+            stderr_digest = uploader.put_blob(stderr)
+            action_result.stderr_digest.CopyFrom(stderr_digest)

-    action_result.output_directories.extend([output_directory])
+        else:
+            action_result.stderr_raw = stderr

     lease.result.Pack(action_result)

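Both workers now apply the same rule to command outputs: inline stdout/stderr in the ActionResult when the message stays under the gRPC request ceiling, otherwise upload the blob to CAS and reference it by digest. A minimal sketch of that decision, reusing the put_blob() and MAX_REQUEST_SIZE names from this patch; the helper itself is illustrative, not part of the change:

    # Illustrative helper: inline small outputs, push large ones to CAS.
    def attach_output(action_result, blob, uploader, kind='stdout'):
        if action_result.ByteSize() + len(blob) > MAX_REQUEST_SIZE:
            # Too big to inline: store the blob in CAS, keep only its digest.
            digest = uploader.put_blob(blob)
            getattr(action_result, kind + '_digest').CopyFrom(digest)
        else:
            # Small enough: embed the raw bytes in the result message.
            setattr(action_result, kind + '_raw', blob)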
buildgrid/_app/bots/host.py:

@@ -20,6 +20,7 @@ import tempfile

 from buildgrid.client.cas import download, upload
 from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
+from buildgrid.settings import MAX_REQUEST_SIZE
 from buildgrid.utils import get_hostname, output_file_maker, output_directory_maker


@@ -27,6 +28,7 @@ def work_host_tools(lease, context, event):
     """Executes a lease for a build action, using host tools.
     """
     instance_name = context.parent
+
     logger = logging.getLogger(__name__)

     action_digest = remote_execution_pb2.Digest()

@@ -51,6 +53,11 @@ def work_host_tools(lease, context, event):

         downloader.download_directory(action.input_root_digest, temp_directory)

+        logger.debug("Command digest: [{}/{}]"
+                     .format(action.command_digest.hash, action.command_digest.size_bytes))
+        logger.debug("Input root digest: [{}/{}]"
+                     .format(action.input_root_digest.hash, action.input_root_digest.size_bytes))
+
         action_result.execution_metadata.input_fetch_completed_timestamp.GetCurrentTime()

     environment = os.environ.copy()

@@ -76,7 +83,7 @@ def work_host_tools(lease, context, event):
                                       os.path.dirname(output_path))
         os.makedirs(directory_path, exist_ok=True)

-    logger.debug(' '.join(command_line))
+    logger.info("Starting execution: [{}...]".format(command.arguments[0]))

     action_result.execution_metadata.execution_start_timestamp.GetCurrentTime()

@@ -92,16 +99,9 @@ def work_host_tools(lease, context, event):

     action_result.execution_metadata.execution_completed_timestamp.GetCurrentTime()

-    # TODO: Upload to CAS or output RAW
-    # For now, just pass raw
-    # https://gitlab.com/BuildGrid/buildgrid/issues/90
-    action_result.stdout_raw = stdout
-    action_result.stderr_raw = stderr
     action_result.exit_code = returncode

-    logger.debug("Command stderr: [{}]".format(stderr))
-    logger.debug("Command stdout: [{}]".format(stdout))
-    logger.debug("Command exit code: [{}]".format(returncode))
+    logger.info("Execution finished with code: [{}]".format(returncode))

     action_result.execution_metadata.output_upload_start_timestamp.GetCurrentTime()

@@ -119,6 +119,9 @@ def work_host_tools(lease, context, event):
                                           file_digest)
             output_files.append(output_file)

+            logger.debug("Output file digest: [{}/{}]"
+                         .format(file_digest.hash, file_digest.size_bytes))
+
         action_result.output_files.extend(output_files)

         for output_path in command.output_directories:

@@ -132,8 +135,25 @@ def work_host_tools(lease, context, event):
                                                     tree_digest)
             output_directories.append(output_directory)

+            logger.debug("Output tree digest: [{}/{}]"
+                         .format(tree_digest.hash, tree_digest.size_bytes))
+
         action_result.output_directories.extend(output_directories)

+        if action_result.ByteSize() + len(stdout) > MAX_REQUEST_SIZE:
+            stdout_digest = uploader.put_blob(stdout)
+            action_result.stdout_digest.CopyFrom(stdout_digest)
+
+        else:
+            action_result.stdout_raw = stdout
+
+        if action_result.ByteSize() + len(stderr) > MAX_REQUEST_SIZE:
+            stderr_digest = uploader.put_blob(stderr)
+            action_result.stderr_digest.CopyFrom(stderr_digest)
+
+        else:
+            action_result.stderr_raw = stderr
+
     action_result.execution_metadata.output_upload_completed_timestamp.GetCurrentTime()

     lease.result.Pack(action_result)
buildgrid/bot/interface.py:

@@ -57,5 +57,5 @@ class BotInterface:
         try:
             return call(request)
         except grpc.RpcError as e:
-            self.__logger.error(e.code())
+            self.__logger.error(e)
             return e.code()
buildgrid/client/authentication.py:

@@ -191,7 +191,7 @@ class AuthMetadataClientInterceptor(

 class _ClientCallDetails(
         namedtuple('_ClientCallDetails',
-                   ('method', 'timeout', 'credentials', 'metadata')),
+                   ('method', 'timeout', 'credentials', 'metadata',)),
         grpc.ClientCallDetails):
     pass

buildgrid/server/_authentication.py:

@@ -13,8 +13,10 @@
 # limitations under the License.


+from collections import namedtuple
 from datetime import datetime
 from enum import Enum
+import functools
 import logging

 import grpc

@@ -55,6 +57,11 @@ class AuthMetadataAlgorithm(Enum):
     JWT_RS512 = 'rs512'  # RSASSA-PKCS1-v1_5 signature algorithm using SHA-512 hash algorithm


+class AuthContext:
+
+    interceptor = None
+
+
 class _InvalidTokenError(Exception):
     pass


@@ -67,14 +74,66 @@ class _UnboundedTokenError(Exception):
     pass


+def authorize(auth_context):
+    """RPC method decorator for authorization validations.
+
+    This decorator is design to be used together with an :class:`AuthContext`
+    authorization context holder::
+
+        @authorize(AuthContext)
+        def Execute(self, request, context):
+
+    By default, any request is accepted. Authorization validation can be
+    activated by setting up a :class:`grpc.ServerInterceptor`::
+
+        AuthContext.interceptor = AuthMetadataServerInterceptor()
+
+    Args:
+        auth_context(AuthContext): Authorization context holder.
+    """
+    def __authorize_decorator(behavior):
+        """RPC authorization method decorator."""
+        _HandlerCallDetails = namedtuple(
+            '_HandlerCallDetails', ('invocation_metadata', 'method',))
+
+        @functools.wraps(behavior)
+        def __authorize_wrapper(self, request, context):
+            """RPC authorization method wrapper."""
+            if auth_context.interceptor is None:
+                return behavior(self, request, context)
+
+            authorized = False
+
+            def __continuator(handler_call_details):
+                nonlocal authorized
+                authorized = True
+
+            details = _HandlerCallDetails(context.invocation_metadata(),
+                                          behavior.__name__)
+
+            auth_context.interceptor.intercept_service(__continuator, details)
+
+            if authorized:
+                return behavior(self, request, context)
+
+            context.abort(grpc.StatusCode.UNAUTHENTICATED,
+                          "No valid authorization or authentication provided")
+
+            return None
+
+        return __authorize_wrapper
+
+    return __authorize_decorator
+
+
 class AuthMetadataServerInterceptor(grpc.ServerInterceptor):

     __auth_errors = {
-        'missing-bearer': 'Missing authentication header field',
-        'invalid-bearer': 'Invalid authentication header field',
-        'invalid-token': 'Invalid authentication token',
-        'expired-token': 'Expired authentication token',
-        'unbounded-token': 'Unbounded authentication token',
+        'missing-bearer': "Missing authentication header field",
+        'invalid-bearer': "Invalid authentication header field",
+        'invalid-token': "Invalid authentication token",
+        'expired-token': "Expired authentication token",
+        'unbounded-token': "Unbounded authentication token",
     }

     def __init__(self, method, secret=None, algorithm=AuthMetadataAlgorithm.UNSPECIFIED):
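The new AuthContext holder and authorize() decorator move authorization from a server-wide gRPC interceptor to a per-RPC check that reuses the interceptor's logic. A hedged usage sketch; the servicer and method below are illustrative, not from the patch:

    from buildgrid.server._authentication import AuthContext, authorize

    class ExampleServicer:

        @authorize(AuthContext)
        def DoWork(self, request, context):
            # Reached only if AuthContext.interceptor authorized the call,
            # or if no interceptor is configured at all.
            return ...

    # Enabling validation is a single assignment at server setup time:
    # AuthContext.interceptor = AuthMetadataServerInterceptor(
    #     method=..., secret=..., algorithm=...)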
buildgrid/server/actioncache/service.py:

@@ -27,6 +27,7 @@ import grpc
 from buildgrid._exceptions import InvalidArgumentError, NotFoundError
 from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
 from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2_grpc
+from buildgrid.server._authentication import AuthContext, authorize


 class ActionCacheService(remote_execution_pb2_grpc.ActionCacheServicer):

@@ -38,9 +39,14 @@ class ActionCacheService(remote_execution_pb2_grpc.ActionCacheServicer):

         remote_execution_pb2_grpc.add_ActionCacheServicer_to_server(self, server)

+    # --- Public API ---
+
     def add_instance(self, name, instance):
         self._instances[name] = instance

+    # --- Public API: Servicer ---
+
+    @authorize(AuthContext)
     def GetActionResult(self, request, context):
         self.__logger.debug("GetActionResult request from [%s]", context.peer())

@@ -59,6 +65,7 @@ class ActionCacheService(remote_execution_pb2_grpc.ActionCacheServicer):

         return remote_execution_pb2.ActionResult()

+    @authorize(AuthContext)
     def UpdateActionResult(self, request, context):
         self.__logger.debug("UpdateActionResult request from [%s]", context.peer())

@@ -78,6 +85,8 @@ class ActionCacheService(remote_execution_pb2_grpc.ActionCacheServicer):

         return remote_execution_pb2.ActionResult()

+    # --- Private API ---
+
     def _get_instance(self, instance_name):
         try:
             return self._instances[instance_name]
buildgrid/server/bots/instance.py:

@@ -83,7 +83,7 @@ class BotsInterface:
         self._check_bot_ids(bot_session.bot_id, name)
         self._check_assigned_leases(bot_session)

-        for lease in bot_session.leases:
+        for lease in list(bot_session.leases):
             checked_lease = self._check_lease_state(lease)
             if not checked_lease:
                 # TODO: Make sure we don't need this

@@ -91,7 +91,10 @@ class BotsInterface:
                     self._assigned_leases[name].remove(lease.id)
                 except KeyError:
                     pass
-                lease.Clear()
+
+                self._scheduler.delete_job_lease(lease.id)
+
+                bot_session.leases.remove(lease)

         self._request_leases(bot_session)
         return bot_session

@@ -117,13 +120,13 @@ class BotsInterface:

         try:
             if self._scheduler.get_job_lease_cancelled(lease.id):
-                lease.state.CopyFrom(LeaseState.CANCELLED.value)
+                lease.state = LeaseState.CANCELLED.value
                 return lease
         except KeyError:
             # Job does not exist, remove from bot.
             return None

-        self._scheduler.update_job_lease(lease)
+        self._scheduler.update_job_lease_state(lease.id, lease)

         if lease_state == LeaseState.COMPLETED:
             return None

@@ -161,7 +164,7 @@ class BotsInterface:
                 self.__logger.error("Assigned lease id=[%s],"
                                     " not found on bot with name=[%s] and id=[%s]."
                                     " Retrying job", lease_id, bot_session.name, bot_session.bot_id)
-                self._scheduler.retry_job(lease_id)
+                self._scheduler.retry_job_lease(lease_id)

     def _close_bot_session(self, name):
         """ Before removing the session, close any leases and

@@ -174,7 +177,7 @@ class BotsInterface:

         self.__logger.debug("Attempting to close [%s] with name: [%s]", bot_id, name)
         for lease_id in self._assigned_leases[name]:
-            self._scheduler.retry_job_lease(lease_id)
+            self._scheduler.retry_job_lease(lease_id)
         self._assigned_leases.pop(name)

         self.__logger.debug("Closing bot session: [%s]", name)
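The switch to `for lease in list(bot_session.leases)` matters because the loop body now removes finished leases from the session, and mutating a collection while iterating over it directly skips elements. A minimal illustration of the snapshot-then-remove pattern:

    # Stand-in for bot_session.leases (a protobuf repeated field behaves similarly).
    leases = ['a', 'b', 'c']
    for lease in list(leases):    # iterate over a snapshot...
        if lease != 'b':
            leases.remove(lease)  # ...so removing from the original is safe
    assert leases == ['b']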
buildgrid/server/bots/service.py:

@@ -29,6 +29,7 @@ from buildgrid._enums import BotStatus
 from buildgrid._exceptions import InvalidArgumentError, OutOfSyncError
 from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2
 from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2_grpc
+from buildgrid.server._authentication import AuthContext, authorize


 class BotsService(bots_pb2_grpc.BotsServicer):

@@ -86,6 +87,7 @@ class BotsService(bots_pb2_grpc.BotsServicer):

     # --- Public API: Servicer ---

+    @authorize(AuthContext)
     def CreateBotSession(self, request, context):
         """Handles CreateBotSessionRequest messages.

@@ -121,6 +123,7 @@ class BotsService(bots_pb2_grpc.BotsServicer):

         return bots_pb2.BotSession()

+    @authorize(AuthContext)
     def UpdateBotSession(self, request, context):
         """Handles UpdateBotSessionRequest messages.

@@ -175,6 +178,7 @@ class BotsService(bots_pb2_grpc.BotsServicer):

         return bots_pb2.BotSession()

+    @authorize(AuthContext)
     def PostBotEventTemp(self, request, context):
         """Handles PostBotEventTempRequest messages.

buildgrid/server/capabilities/service.py:

@@ -19,15 +19,20 @@ import grpc

 from buildgrid._exceptions import InvalidArgumentError
 from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
+from buildgrid.server._authentication import AuthContext, authorize


 class CapabilitiesService(remote_execution_pb2_grpc.CapabilitiesServicer):

     def __init__(self, server):
         self.__logger = logging.getLogger(__name__)
+
         self.__instances = {}
+
         remote_execution_pb2_grpc.add_CapabilitiesServicer_to_server(self, server)

+    # --- Public API ---
+
     def add_instance(self, name, instance):
         self.__instances[name] = instance

@@ -40,6 +45,9 @@ class CapabilitiesService(remote_execution_pb2_grpc.CapabilitiesServicer):
     def add_execution_instance(self, name, instance):
         self.__instances[name].add_execution_instance(instance)

+    # --- Public API: Servicer ---
+
+    @authorize(AuthContext)
     def GetCapabilities(self, request, context):
         try:
             instance = self._get_instance(request.instance_name)

@@ -52,6 +60,8 @@ class CapabilitiesService(remote_execution_pb2_grpc.CapabilitiesServicer):

         return remote_execution_pb2.ServerCapabilities()

+    # --- Private API ---
+
     def _get_instance(self, name):
         try:
             return self.__instances[name]
buildgrid/server/cas/service.py:

@@ -29,6 +29,7 @@ from buildgrid._exceptions import InvalidArgumentError, NotFoundError, OutOfRang
 from buildgrid._protos.google.bytestream import bytestream_pb2, bytestream_pb2_grpc
 from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
 from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2_grpc
+from buildgrid.server._authentication import AuthContext, authorize


 class ContentAddressableStorageService(remote_execution_pb2_grpc.ContentAddressableStorageServicer):

@@ -40,9 +41,14 @@ class ContentAddressableStorageService(remote_execution_pb2_grpc.ContentAddressa

         remote_execution_pb2_grpc.add_ContentAddressableStorageServicer_to_server(self, server)

+    # --- Public API ---
+
     def add_instance(self, name, instance):
         self._instances[name] = instance

+    # --- Public API: Servicer ---
+
+    @authorize(AuthContext)
     def FindMissingBlobs(self, request, context):
         self.__logger.debug("FindMissingBlobs request from [%s]", context.peer())

@@ -59,6 +65,7 @@ class ContentAddressableStorageService(remote_execution_pb2_grpc.ContentAddressa

         return remote_execution_pb2.FindMissingBlobsResponse()

+    @authorize(AuthContext)
     def BatchUpdateBlobs(self, request, context):
         self.__logger.debug("BatchUpdateBlobs request from [%s]", context.peer())

@@ -75,6 +82,7 @@ class ContentAddressableStorageService(remote_execution_pb2_grpc.ContentAddressa

         return remote_execution_pb2.BatchReadBlobsResponse()

+    @authorize(AuthContext)
     def BatchReadBlobs(self, request, context):
         self.__logger.debug("BatchReadBlobs request from [%s]", context.peer())

@@ -83,6 +91,7 @@ class ContentAddressableStorageService(remote_execution_pb2_grpc.ContentAddressa

         return remote_execution_pb2.BatchReadBlobsResponse()

+    @authorize(AuthContext)
     def GetTree(self, request, context):
         self.__logger.debug("GetTree request from [%s]", context.peer())

@@ -97,6 +106,8 @@ class ContentAddressableStorageService(remote_execution_pb2_grpc.ContentAddressa

         yield remote_execution_pb2.GetTreeResponse()

+    # --- Private API ---
+
     def _get_instance(self, instance_name):
         try:
             return self._instances[instance_name]

@@ -114,9 +125,14 @@ class ByteStreamService(bytestream_pb2_grpc.ByteStreamServicer):

         bytestream_pb2_grpc.add_ByteStreamServicer_to_server(self, server)

+    # --- Public API ---
+
     def add_instance(self, name, instance):
         self._instances[name] = instance

+    # --- Public API: Servicer ---
+
+    @authorize(AuthContext)
     def Read(self, request, context):
         self.__logger.debug("Read request from [%s]", context.peer())

@@ -163,6 +179,7 @@ class ByteStreamService(bytestream_pb2_grpc.ByteStreamServicer):
             context.set_code(grpc.StatusCode.OUT_OF_RANGE)
             yield bytestream_pb2.ReadResponse()

+    @authorize(AuthContext)
     def Write(self, requests, context):
         self.__logger.debug("Write request from [%s]", context.peer())

@@ -209,12 +226,15 @@ class ByteStreamService(bytestream_pb2_grpc.ByteStreamServicer):

         return bytestream_pb2.WriteResponse()

+    @authorize(AuthContext)
     def QueryWriteStatus(self, request, context):
         context.set_code(grpc.StatusCode.UNIMPLEMENTED)
         context.set_details('Method not implemented!')

         return bytestream_pb2.QueryWriteStatusResponse()

+    # --- Private API ---
+
     def _get_instance(self, instance_name):
         try:
             return self._instances[instance_name]
buildgrid/server/execution/instance.py:

@@ -21,11 +21,9 @@ An instance of the Remote Execution Service.

 import logging

-from buildgrid._exceptions import FailedPreconditionError, InvalidArgumentError
+from buildgrid._exceptions import FailedPreconditionError, InvalidArgumentError, NotFoundError
 from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Action
-
-from ..job import Job
-from ...utils import get_hash_type
+from buildgrid.utils import get_hash_type


 class ExecutionInstance:

@@ -46,44 +44,45 @@ class ExecutionInstance:
     def hash_type(self):
         return get_hash_type()

-    def execute(self, action_digest, skip_cache_lookup, message_queue=None):
+    def execute(self, action_digest, skip_cache_lookup):
         """ Sends a job for execution.
         Queues an action and creates an Operation instance to be associated with
         this action.
         """
-
         action = self._storage.get_message(action_digest, Action)

         if not action:
             raise FailedPreconditionError("Could not get action from storage.")

-        job = Job(action, action_digest)
-        if message_queue is not None:
-            job.register_client(message_queue)
+        return self._scheduler.queue_job_operation(action, action_digest, skip_cache_lookup)

-        self._scheduler.queue_job(job, skip_cache_lookup)
+    def register_operation_client(self, operation_name, peer, message_queue):
+        try:
+            return self._scheduler.register_job_operation_client(operation_name,
+                                                                 peer, message_queue)

-        return job.operation
+        except NotFoundError:
+            raise InvalidArgumentError("Operation name does not exist: [{}]"
+                                       .format(operation_name))

-    def register_message_client(self, name, queue):
+    def unregister_operation_client(self, operation_name, peer):
         try:
-            self._scheduler.register_client(name, queue)
+            self._scheduler.unregister_job_operation_client(operation_name, peer)

-        except KeyError:
-            raise InvalidArgumentError("Operation name does not exist: [{}]".format(name))
+        except NotFoundError:
+            raise InvalidArgumentError("Operation name does not exist: [{}]"
+                                       .format(operation_name))

-    def unregister_message_client(self, name, queue):
-        try:
-            self._scheduler.unregister_client(name, queue)
+    def stream_operation_updates(self, message_queue):
+        error, operation = message_queue.get()
+        if error is not None:
+            raise error

-        except KeyError:
-            raise InvalidArgumentError("Operation name does not exist: [{}]".format(name))
+        while not operation.done:
+            yield operation

-    def stream_operation_updates(self, message_queue, operation_name):
-        job = message_queue.get()
-        while not job.operation.done:
-            yield job.operation
-            job = message_queue.get()
-            job.check_operation_status()
+            error, operation = message_queue.get()
+            if error is not None:
+                raise error

-        yield job.operation
+        yield operation
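stream_operation_updates() now consumes (error, operation) pairs rather than whole Job objects, which lets cancellation travel in-band: a non-None first element is raised to the RPC handler. A self-contained sketch of that queue protocol; FakeOperation is a stand-in for operations_pb2.Operation:

    import queue

    class FakeOperation:
        def __init__(self, done):
            self.done = done

    mq = queue.Queue()
    # Producer side (the scheduler) enqueues (error, operation) pairs:
    mq.put((None, FakeOperation(done=False)))
    mq.put((None, FakeOperation(done=True)))

    def stream_operation_updates(message_queue):
        error, operation = message_queue.get()
        if error is not None:
            raise error
        while not operation.done:
            yield operation
            error, operation = message_queue.get()
            if error is not None:
                raise error
        yield operation

    assert [op.done for op in stream_operation_updates(mq)] == [False, True]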
buildgrid/server/execution/service.py:

@@ -29,6 +29,7 @@ import grpc
 from buildgrid._exceptions import FailedPreconditionError, InvalidArgumentError, CancelledError
 from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2_grpc
 from buildgrid._protos.google.longrunning import operations_pb2
+from buildgrid.server._authentication import AuthContext, authorize


 class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):

@@ -81,6 +82,7 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):

     # --- Public API: Servicer ---

+    @authorize(AuthContext)
     def Execute(self, request, context):
         """Handles ExecuteRequest messages.

@@ -96,12 +98,15 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):

         try:
             instance = self._get_instance(instance_name)
-            operation = instance.execute(request.action_digest,
-                                         request.skip_cache_lookup,
-                                         message_queue)
+
+            job_name = instance.execute(request.action_digest,
+                                        request.skip_cache_lookup)
+
+            operation_name = instance.register_operation_client(job_name,
+                                                                peer, message_queue)

             context.add_callback(partial(self._rpc_termination_callback,
-                                         peer, instance_name, operation.name, message_queue))
+                                         peer, instance_name, operation_name))

             if self._is_instrumented:
                 if peer not in self.__peers:

@@ -110,16 +115,13 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
                 else:
                     self.__peers[peer] += 1

-            instanced_op_name = "{}/{}".format(instance_name, operation.name)
+            operation_full_name = "{}/{}".format(instance_name, operation_name)

-            self.__logger.info("Operation name: [%s]", instanced_op_name)
+            self.__logger.info("Operation name: [%s]", operation_full_name)

-            for operation in instance.stream_operation_updates(message_queue,
-                                                               operation.name):
-                op = operations_pb2.Operation()
-                op.CopyFrom(operation)
-                op.name = instanced_op_name
-                yield op
+            for operation in instance.stream_operation_updates(message_queue):
+                operation.name = operation_full_name
+                yield operation

         except InvalidArgumentError as e:
             self.__logger.error(e)

@@ -139,6 +141,7 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
             context.set_code(grpc.StatusCode.CANCELLED)
             yield operations_pb2.Operation()

+    @authorize(AuthContext)
     def WaitExecution(self, request, context):
         """Handles WaitExecutionRequest messages.

@@ -157,9 +160,11 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
         try:
             instance = self._get_instance(instance_name)

-            instance.register_message_client(operation_name, message_queue)
+            operation_name = instance.register_operation_client(operation_name,
+                                                                peer, message_queue)
+
             context.add_callback(partial(self._rpc_termination_callback,
-                                         peer, instance_name, operation_name, message_queue))
+                                         peer, instance_name, operation_name))

             if self._is_instrumented:
                 if peer not in self.__peers:

@@ -168,12 +173,11 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
                 else:
                     self.__peers[peer] += 1

-            for operation in instance.stream_operation_updates(message_queue,
-                                                               operation_name):
-                op = operations_pb2.Operation()
-                op.CopyFrom(operation)
-                op.name = request.name
-                yield op
+            operation_full_name = "{}/{}".format(instance_name, operation_name)
+
+            for operation in instance.stream_operation_updates(message_queue):
+                operation.name = operation_full_name
+                yield operation

         except InvalidArgumentError as e:
             self.__logger.error(e)

@@ -208,10 +212,10 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):

     # --- Private API ---

-    def _rpc_termination_callback(self, peer, instance_name, job_name, message_queue):
+    def _rpc_termination_callback(self, peer, instance_name, operation_name):
         instance = self._get_instance(instance_name)

-        instance.unregister_message_client(job_name, message_queue)
+        instance.unregister_operation_client(operation_name, peer)

         if self._is_instrumented:
             if self.__peers[peer] > 1:
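Execute() is thus split into two steps: queue the action (returning a job name that identical requests can share) and register the calling peer as a watcher (returning a per-client operation name). A hedged sketch of the resulting control flow, with error handling and instrumentation omitted:

    # Sketch of the new Execute() flow; method names follow this patch.
    def execute_flow(instance, instance_name, request, peer, message_queue):
        job_name = instance.execute(request.action_digest, request.skip_cache_lookup)
        operation_name = instance.register_operation_client(job_name, peer, message_queue)
        operation_full_name = "{}/{}".format(instance_name, operation_name)
        for operation in instance.stream_operation_updates(message_queue):
            operation.name = operation_full_name  # expose the instance-qualified name
            yield operation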
buildgrid/server/instance.py:

@@ -29,7 +29,8 @@ import janus
 from buildgrid._enums import BotStatus, LogRecordLevel, MetricRecordDomain, MetricRecordType
 from buildgrid._protos.buildgrid.v2 import monitoring_pb2
 from buildgrid.server.actioncache.service import ActionCacheService
-from buildgrid.server._authentication import AuthMetadataMethod, AuthMetadataAlgorithm, AuthMetadataServerInterceptor
+from buildgrid.server._authentication import AuthMetadataMethod, AuthMetadataAlgorithm
+from buildgrid.server._authentication import AuthContext, AuthMetadataServerInterceptor
 from buildgrid.server.bots.service import BotsService
 from buildgrid.server.capabilities.instance import CapabilitiesInstance
 from buildgrid.server.capabilities.service import CapabilitiesService

@@ -78,16 +79,15 @@ class BuildGridServer:
             max_workers = (os.cpu_count() or 1) * 5

         self.__grpc_auth_interceptor = None
+
         if auth_method != AuthMetadataMethod.NONE:
             self.__grpc_auth_interceptor = AuthMetadataServerInterceptor(
                 method=auth_method, secret=auth_secret, algorithm=auth_algorithm)
-        self.__grpc_executor = futures.ThreadPoolExecutor(max_workers)

-        if self.__grpc_auth_interceptor is not None:
-            self.__grpc_server = grpc.server(
-                self.__grpc_executor, interceptors=(self.__grpc_auth_interceptor,))
-        else:
-            self.__grpc_server = grpc.server(self.__grpc_executor)
+            AuthContext.interceptor = self.__grpc_auth_interceptor
+
+        self.__grpc_executor = futures.ThreadPoolExecutor(max_workers)
+        self.__grpc_server = grpc.server(self.__grpc_executor)

         self.__main_loop = asyncio.get_event_loop()

... | ... | @@ -20,7 +20,7 @@ import uuid |
20 | 20 |
from google.protobuf import duration_pb2, timestamp_pb2
|
21 | 21 |
|
22 | 22 |
from buildgrid._enums import LeaseState, OperationStage
|
23 |
-from buildgrid._exceptions import CancelledError
|
|
23 |
+from buildgrid._exceptions import CancelledError, NotFoundError
|
|
24 | 24 |
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
|
25 | 25 |
from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2
|
26 | 26 |
from buildgrid._protos.google.longrunning import operations_pb2
|
... | ... | @@ -29,35 +29,46 @@ from buildgrid._protos.google.rpc import code_pb2 |
29 | 29 |
|
30 | 30 |
class Job:
|
31 | 31 |
|
32 |
- def __init__(self, action, action_digest):
|
|
32 |
+ def __init__(self, action, action_digest, priority=0):
|
|
33 | 33 |
self.__logger = logging.getLogger(__name__)
|
34 | 34 |
|
35 | 35 |
self._name = str(uuid.uuid4())
|
36 |
+ self._priority = priority
|
|
36 | 37 |
self._action = remote_execution_pb2.Action()
|
37 |
- self._operation = operations_pb2.Operation()
|
|
38 | 38 |
self._lease = None
|
39 | 39 |
|
40 | 40 |
self.__execute_response = None
|
41 | 41 |
self.__operation_metadata = remote_execution_pb2.ExecuteOperationMetadata()
|
42 |
+ self.__operations_by_name = {}
|
|
43 |
+ self.__operations_by_peer = {}
|
|
42 | 44 |
|
43 | 45 |
self.__queued_timestamp = timestamp_pb2.Timestamp()
|
44 | 46 |
self.__queued_time_duration = duration_pb2.Duration()
|
45 | 47 |
self.__worker_start_timestamp = timestamp_pb2.Timestamp()
|
46 | 48 |
self.__worker_completed_timestamp = timestamp_pb2.Timestamp()
|
47 | 49 |
|
48 |
- self.__operation_cancelled = False
|
|
50 |
+ self.__operations_message_queues = {}
|
|
51 |
+ self.__operations_cancelled = set()
|
|
49 | 52 |
self.__lease_cancelled = False
|
53 |
+ self.__job_cancelled = False
|
|
50 | 54 |
|
51 | 55 |
self.__operation_metadata.action_digest.CopyFrom(action_digest)
|
52 | 56 |
self.__operation_metadata.stage = OperationStage.UNKNOWN.value
|
53 | 57 |
|
54 | 58 |
self._action.CopyFrom(action)
|
55 | 59 |
self._do_not_cache = self._action.do_not_cache
|
56 |
- self._operation_update_queues = []
|
|
57 |
- self._operation.name = self._name
|
|
58 |
- self._operation.done = False
|
|
59 | 60 |
self._n_tries = 0
|
60 | 61 |
|
62 |
+ self._done = False
|
|
63 |
+ |
|
64 |
+ def __eq__(self, other):
|
|
65 |
+ if isinstance(other, Job):
|
|
66 |
+ return self.name == other.name
|
|
67 |
+ return False
|
|
68 |
+ |
|
69 |
+ def __ne__(self, other):
|
|
70 |
+ return not self.__eq__(other)
|
|
71 |
+ |
|
61 | 72 |
# --- Public API ---
|
62 | 73 |
|
63 | 74 |
@property
|
... | ... | @@ -65,12 +76,18 @@ class Job: |
65 | 76 |
return self._name
|
66 | 77 |
|
67 | 78 |
@property
|
68 |
- def do_not_cache(self):
|
|
69 |
- return self._do_not_cache
|
|
79 |
+ def priority(self):
|
|
80 |
+ return self._priority
|
|
70 | 81 |
|
71 | 82 |
@property
|
72 |
- def action(self):
|
|
73 |
- return self._action
|
|
83 |
+ def done(self):
|
|
84 |
+ return self._done
|
|
85 |
+ |
|
86 |
+ # --- Public API: REAPI ---
|
|
87 |
+ |
|
88 |
+ @property
|
|
89 |
+ def do_not_cache(self):
|
|
90 |
+ return self._do_not_cache
|
|
74 | 91 |
|
75 | 92 |
@property
|
76 | 93 |
def action_digest(self):
|
... | ... | @@ -84,19 +101,176 @@ class Job: |
84 | 101 |
return None
|
85 | 102 |
|
86 | 103 |
@property
|
87 |
- def holds_cached_action_result(self):
|
|
104 |
+ def holds_cached_result(self):
|
|
88 | 105 |
if self.__execute_response is not None:
|
89 | 106 |
return self.__execute_response.cached_result
|
90 | 107 |
else:
|
91 | 108 |
return False
|
92 | 109 |
|
93 |
- @property
|
|
94 |
- def operation(self):
|
|
95 |
- return self._operation
|
|
110 |
+ def set_cached_result(self, action_result):
|
|
111 |
+ """Allows specifying an action result form the action cache for the job.
|
|
112 |
+ |
|
113 |
+ Note:
|
|
114 |
+ This won't trigger any :class:`Operation` stage transition.
|
|
115 |
+ |
|
116 |
+ Args:
|
|
117 |
+ action_result (ActionResult): The result from cache.
|
|
118 |
+ """
|
|
119 |
+ self.__execute_response = remote_execution_pb2.ExecuteResponse()
|
|
120 |
+ self.__execute_response.result.CopyFrom(action_result)
|
|
121 |
+ self.__execute_response.cached_result = True
|
|
96 | 122 |
|
97 | 123 |
@property
|
98 |
- def operation_stage(self):
|
|
99 |
- return OperationStage(self.__operation_metadata.state)
|
|
124 |
+ def n_clients(self):
|
|
125 |
+ return len(self.__operations_message_queues)
|
|
126 |
+ |
|
127 |
+ def register_operation_client(self, peer, message_queue):
|
|
128 |
+ """Subscribes to the job's :class:`Operation` stage changes.
|
|
129 |
+ |
|
130 |
+ Args:
|
|
131 |
+ peer (str): a unique string identifying the client.
|
|
132 |
+ message_queue (queue.Queue): the event queue to register.
|
|
133 |
+ |
|
134 |
+ Returns:
|
|
135 |
+ str: The name of the subscribed :class:`Operation`.
|
|
136 |
+ """
|
|
137 |
+ if peer in self.__operations_by_peer:
|
|
138 |
+ operation = self.__operations_by_peer[peer]
|
|
139 |
+ else:
|
|
140 |
+ operation = self.create_operation(peer)
|
|
141 |
+ |
|
142 |
+ self.__operations_message_queues[peer] = message_queue
|
|
143 |
+ |
|
144 |
+ self._send_operations_updates(peers=[peer])
|
|
145 |
+ |
|
146 |
+ return operation.name
|
|
147 |
+ |
|
148 |
+ def unregister_operation_client(self, peer):
|
|
149 |
+ """Unsubscribes to the job's :class:`Operation` stage change.
|
|
150 |
+ |
|
151 |
+ Args:
|
|
152 |
+ peer (str): a unique string identifying the client.
|
|
153 |
+ """
|
|
154 |
+ if peer in self.__operations_message_queues:
|
|
155 |
+ del self.__operations_message_queues[peer]
|
|
156 |
+ |
|
157 |
+ # Drop the operation if nobody is watching it anymore:
|
|
158 |
+ if peer in self.__operations_by_peer:
|
|
159 |
+ operation = self.__operations_by_peer[peer]
|
|
160 |
+ |
|
161 |
+ if operation not in self.__operations_by_peer.values():
|
|
162 |
+ del self.__operations_by_name[operation.name]
|
|
163 |
+ |
|
164 |
+ del self.__operations_by_peer[peer]
|
|
165 |
+ |
|
166 |
+ def create_operation(self, peer):
|
|
167 |
+ """Generates a new :class:`Operation` for `peer`.
|
|
168 |
+ |
|
169 |
+ Args:
|
|
170 |
+ peer (str): a unique string identifying the client.
|
|
171 |
+ """
|
|
172 |
+ if peer in self.__operations_by_peer:
|
|
173 |
+ return self.__operations_by_peer[peer]
|
|
174 |
+ |
|
175 |
+ new_operation = operations_pb2.Operation()
|
|
176 |
+ # Copy state from first existing and non cancelled operation:
|
|
177 |
+ for operation in self.__operations_by_name.values():
|
|
178 |
+ if operation.name not in self.__operations_cancelled:
|
|
179 |
+ new_operation.CopyFrom(operation)
|
|
180 |
+ break
|
|
181 |
+ |
|
182 |
+ new_operation.name = str(uuid.uuid4())
|
|
183 |
+ |
|
184 |
+ self.__operations_by_name[new_operation.name] = new_operation
|
|
185 |
+ self.__operations_by_peer[peer] = new_operation
|
|
186 |
+ |
|
187 |
+ return new_operation
|
|
188 |
+ |
|
189 |
+ def list_operations(self):
|
|
190 |
+ """Lists the :class:`Operation` related to a job.
|
|
191 |
+ |
|
192 |
+ Returns:
|
|
193 |
+ list: A list of :class:`Operation` names.
|
|
194 |
+ """
|
|
195 |
+ return list(self.__operations_by_name.keys())
|
|
196 |
+ |
|
197 |
+ def get_operation(self, operation_name):
|
|
198 |
+ """Returns a copy of the the job's :class:`Operation`.
|
|
199 |
+ |
|
200 |
+ Args:
|
|
201 |
+ operation_name (str): the operation's name.
|
|
202 |
+ |
|
203 |
+ Raises:
|
|
204 |
+ NotFoundError: If no operation with `operation_name` exists.
|
|
205 |
+ """
|
|
206 |
+ try:
|
|
207 |
+ operation = self.__operations_by_name[operation_name]
|
|
208 |
+ |
|
209 |
+ except KeyError:
|
|
210 |
+ raise NotFoundError("Operation name does not exist: [{}]"
|
|
211 |
+ .format(operation_name))
|
|
212 |
+ |
|
213 |
+ return self._copy_operation(operation)
|
|
214 |
+ |
|
215 |
+ def update_operation_stage(self, stage):
|
|
216 |
+ """Operates a stage transition for the job's :class:`Operation`.
|
|
217 |
+ |
|
218 |
+ Args:
|
|
219 |
+ stage (OperationStage): the operation stage to transition to.
|
|
220 |
+ """
|
|
221 |
+ if stage.value == self.__operation_metadata.stage:
|
|
222 |
+ return
|
|
223 |
+ |
|
224 |
+ self.__operation_metadata.stage = stage.value
|
|
225 |
+ |
|
226 |
+ if self.__operation_metadata.stage == OperationStage.QUEUED.value:
|
|
227 |
+ if self.__queued_timestamp.ByteSize() == 0:
|
|
228 |
+ self.__queued_timestamp.GetCurrentTime()
|
|
229 |
+ self._n_tries += 1
|
|
230 |
+ |
|
231 |
+ elif self.__operation_metadata.stage == OperationStage.EXECUTING.value:
|
|
232 |
+ queue_in, queue_out = self.__queued_timestamp.ToDatetime(), datetime.now()
|
|
233 |
+ self.__queued_time_duration.FromTimedelta(queue_out - queue_in)
|
|
234 |
+ |
|
235 |
+ elif self.__operation_metadata.stage == OperationStage.COMPLETED.value:
|
|
236 |
+ self._done = True
|
|
237 |
+ |
|
238 |
+ self._send_operations_updates()
|
|
239 |
+ |
|
240 |
+ def cancel_operation(self, peer):
|
|
241 |
+ """Triggers a job's :class:`Operation` cancellation.
|
|
242 |
+ |
|
243 |
+ This may cancel any job's :class:`Lease` that may have been issued.
|
|
244 |
+ |
|
245 |
+ Args:
|
|
246 |
+ peer (str): a unique string identifying the client.
|
|
247 |
+ """
|
|
248 |
+ |
|
249 |
+ operation_names, peers = set(), set()
|
|
250 |
+ if peer in self.__operations_by_peer:
|
|
251 |
+ operation_names.add(self.__operations_by_peer[peer].name)
|
|
252 |
+ peers.add(peer)
|
|
253 |
+ |
|
254 |
+ else:
|
|
255 |
+ operation_names.update(self.__operations_by_name.keys())
|
|
256 |
+ peers.update(self.__operations_by_peer.keys())
|
|
257 |
+ |
|
258 |
+ operations_cancelled = operation_names - self.__operations_cancelled
|
|
259 |
+ if not operations_cancelled:
|
|
260 |
+ return
|
|
261 |
+ |
|
262 |
+ self.__operations_cancelled.update(operations_cancelled)
|
|
263 |
+ |
|
264 |
+ operation_names = set(self.__operations_by_name.keys())
|
|
265 |
+ # Job is cancelled if all the operation are:
|
|
266 |
+ self.__job_cancelled = bool(operation_names - self.__operations_cancelled)
|
|
267 |
+ |
|
268 |
+ if self.__job_cancelled and self._lease is not None:
|
|
269 |
+ self.cancel_lease()
|
|
270 |
+ |
|
271 |
+ self._send_operations_updates(peers=peers, notify_cancelled=True)
|
|
272 |
+ |
|
273 |
+ # --- Public API: RWAPI ---
|
|
100 | 274 |
|
101 | 275 |
@property
|
102 | 276 |
def lease(self):
|
... | ... | @@ -117,45 +291,15 @@ class Job: |
117 | 291 |
def n_tries(self):
|
118 | 292 |
return self._n_tries
|
119 | 293 |
|
120 |
- @property
|
|
121 |
- def n_clients(self):
|
|
122 |
- return len(self._operation_update_queues)
|
|
123 |
- |
|
124 |
- def register_client(self, queue):
|
|
125 |
- """Subscribes to the job's :class:`Operation` stage change events.
|
|
126 |
- |
|
127 |
- Queues this :object:`Job` instance.
|
|
128 |
- |
|
129 |
- Args:
|
|
130 |
- queue (queue.Queue): the event queue to register.
|
|
131 |
- """
|
|
132 |
- self._operation_update_queues.append(queue)
|
|
133 |
- queue.put(self)
|
|
134 |
- |
|
135 |
- def unregister_client(self, queue):
|
|
136 |
- """Unsubscribes to the job's :class:`Operation` stage change events.
|
|
137 |
- |
|
138 |
- Args:
|
|
139 |
- queue (queue.Queue): the event queue to unregister.
|
|
140 |
- """
|
|
141 |
- self._operation_update_queues.remove(queue)
|
|
142 |
- |
|
143 |
- def set_cached_result(self, action_result):
|
|
144 |
- """Allows specifying an action result form the action cache for the job.
|
|
145 |
- """
|
|
146 |
- self.__execute_response = remote_execution_pb2.ExecuteResponse()
|
|
147 |
- self.__execute_response.result.CopyFrom(action_result)
|
|
148 |
- self.__execute_response.cached_result = True
|
|
149 |
- |
|
150 | 294 |
def create_lease(self):
|
151 | 295 |
"""Emits a new :class:`Lease` for the job.
|
152 | 296 |
|
153 | 297 |
Only one :class:`Lease` can be emitted for a given job. This method
|
154 |
- should only be used once, any furhter calls are ignored.
|
|
298 |
+ should only be used once, any further calls are ignored.
|
|
155 | 299 |
"""
|
156 |
- if self.__operation_cancelled:
|
|
157 |
- return None
|
|
158 |
- elif self._lease is not None:
|
|
300 |
+ if self._lease is not None:
|
|
301 |
+ return self._lease
|
|
302 |
+ elif self.__job_cancelled:
|
|
159 | 303 |
return None
|
160 | 304 |
|
161 | 305 |
self._lease = bots_pb2.Lease()
|
... | ... | @@ -166,14 +310,14 @@ class Job: |
166 | 310 |
return self._lease
|
167 | 311 |
|
168 | 312 |
def update_lease_state(self, state, status=None, result=None):
|
169 |
- """Operates a state transition for the job's current :class:Lease.
|
|
313 |
+ """Operates a state transition for the job's current :class:`Lease`.
|
|
170 | 314 |
|
171 | 315 |
Args:
|
172 | 316 |
state (LeaseState): the lease state to transition to.
|
173 |
- status (google.rpc.Status): the lease execution status, only
|
|
174 |
- required if `state` is `COMPLETED`.
|
|
175 |
- result (google.protobuf.Any): the lease execution result, only
|
|
176 |
- required if `state` is `COMPLETED`.
|
|
317 |
+ status (google.rpc.Status, optional): the lease execution status,
|
|
318 |
+ only required if `state` is `COMPLETED`.
|
|
319 |
+ result (google.protobuf.Any, optional): the lease execution result,
|
|
320 |
+ only required if `state` is `COMPLETED`.
|
|
177 | 321 |
"""
|
178 | 322 |
if state.value == self._lease.state:
|
179 | 323 |
return
|
... | ... | @@ -214,67 +358,25 @@ class Job: |
214 | 358 |
self.__execute_response.status.CopyFrom(status)
|
215 | 359 |
|
216 | 360 |
def cancel_lease(self):
|
217 |
- """Triggers a job's :class:Lease cancellation.
|
|
361 |
+ """Triggers a job's :class:`Lease` cancellation.
|
|
218 | 362 |
|
219 |
- This will not cancel the job's :class:Operation.
|
|
363 |
+ Note:
|
|
364 |
+ This will not cancel the job's :class:`Operation`.
|
|
220 | 365 |
"""
|
221 | 366 |
self.__lease_cancelled = True
|
222 | 367 |
if self._lease is not None:
|
223 | 368 |
self.update_lease_state(LeaseState.CANCELLED)
|
224 | 369 |
|
225 |
- def update_operation_stage(self, stage):
|
|
226 |
- """Operates a stage transition for the job's :class:Operation.
|
|
370 |
+ def delete_lease(self):
|
|
371 |
+ """Discard the job's :class:`Lease`.
|
|
227 | 372 |
|
228 |
- Args:
|
|
229 |
- stage (OperationStage): the operation stage to transition to.
|
|
373 |
+ Note:
|
|
374 |
+ This will not cancel the job's :class:`Operation`.
|
|
230 | 375 |
"""
|
231 |
- if stage.value == self.__operation_metadata.stage:
|
|
232 |
- return
|
|
376 |
+ self.__worker_start_timestamp.Clear()
|
|
377 |
+ self.__worker_completed_timestamp.Clear()
|
|
233 | 378 |
|
234 |
- self.__operation_metadata.stage = stage.value
|
|
235 |
- |
|
236 |
- if self.__operation_metadata.stage == OperationStage.QUEUED.value:
|
|
237 |
- if self.__queued_timestamp.ByteSize() == 0:
|
|
238 |
- self.__queued_timestamp.GetCurrentTime()
|
|
239 |
- self._n_tries += 1
|
|
240 |
- |
|
241 |
- elif self.__operation_metadata.stage == OperationStage.EXECUTING.value:
|
|
242 |
- queue_in, queue_out = self.__queued_timestamp.ToDatetime(), datetime.now()
|
|
243 |
- self.__queued_time_duration.FromTimedelta(queue_out - queue_in)
|
|
244 |
- |
|
245 |
- elif self.__operation_metadata.stage == OperationStage.COMPLETED.value:
|
|
246 |
- if self.__execute_response is not None:
|
|
247 |
- self._operation.response.Pack(self.__execute_response)
|
|
248 |
- self._operation.done = True
|
|
249 |
- |
|
250 |
- self._operation.metadata.Pack(self.__operation_metadata)
|
|
251 |
- |
|
252 |
- for queue in self._operation_update_queues:
|
|
253 |
- queue.put(self)
|
|
254 |
- |
|
255 |
- def check_operation_status(self):
|
|
256 |
- """Reports errors on unexpected job's :class:Operation state.
|
|
257 |
- |
|
258 |
- Raises:
|
|
259 |
- CancelledError: if the job's :class:Operation was cancelled.
|
|
260 |
- """
|
|
261 |
- if self.__operation_cancelled:
|
|
262 |
- raise CancelledError(self.__execute_response.status.message)
|
|
263 |
- |
|
264 |
- def cancel_operation(self):
|
|
265 |
- """Triggers a job's :class:Operation cancellation.
|
|
266 |
- |
|
267 |
- This will also cancel any job's :class:Lease that may have been issued.
|
|
268 |
- """
|
|
269 |
- self.__operation_cancelled = True
|
|
270 |
- if self._lease is not None:
|
|
271 |
- self.cancel_lease()
|
|
272 |
- |
|
273 |
- self.__execute_response = remote_execution_pb2.ExecuteResponse()
|
|
274 |
- self.__execute_response.status.code = code_pb2.CANCELLED
|
|
275 |
- self.__execute_response.status.message = "Operation cancelled by client."
|
|
276 |
- |
|
277 |
- self.update_operation_stage(OperationStage.COMPLETED)
|
|
379 |
+ self._lease = None
|
|
278 | 380 |
|
279 | 381 |
# --- Public API: Monitoring ---
|
280 | 382 |
|
... | ... | @@ -283,3 +385,57 @@ class Job: |
283 | 385 |
|
284 | 386 |
def query_n_retries(self):
|
285 | 387 |
return self._n_tries - 1 if self._n_tries > 0 else 0
|
388 |
+ |
|
389 |
+ # --- Private API ---
|
|
390 |
+ |
|
391 |
+ def _copy_operation(self, operation):
|
|
392 |
+ """Simply duplicates a given :class:`Lease` object."""
|
|
393 |
+ new_operation = operations_pb2.Operation()
|
|
394 |
+ new_operation.CopyFrom(operation)
|
|
395 |
+ |
|
396 |
+ return new_operation
|
|
397 |
+ |
|
398 |
+ def _send_operations_updates(self, peers=None, notify_cancelled=False):
|
|
399 |
+ """Sends :class:`Operation` stage change messages to watchers."""
|
|
400 |
+ for operation in self.__operations_by_name.values():
|
|
401 |
+ if operation.name not in self.__operations_cancelled:
|
|
402 |
+ operation_metadata = self.__operation_metadata
|
|
403 |
+ execute_response = self.__execute_response
|
|
404 |
+ |
|
405 |
+ operation_done = self._done
|
|
406 |
+ |
|
407 |
+ else:
|
|
408 |
+ operation_metadata = remote_execution_pb2.ExecuteOperationMetadata()
|
|
409 |
+ operation_metadata.CopyFrom(self.__operation_metadata)
|
|
410 |
+ operation_metadata.stage = OperationStage.COMPLETED.value
|
|
411 |
+ |
|
412 |
+ execute_response = remote_execution_pb2.ExecuteResponse()
|
|
413 |
+ if self.__execute_response is not None:
|
|
414 |
+ execute_response.CopyFrom(self.__execute_response)
|
|
415 |
+ execute_response.status.code = code_pb2.CANCELLED
|
|
416 |
+ execute_response.status.message = "Operation cancelled by client."
|
|
417 |
+ |
|
418 |
+ operation_done = True
|
|
419 |
+ |
|
420 |
+ if execute_response is not None:
|
|
421 |
+ operation.response.Pack(execute_response)
|
|
422 |
+ operation.metadata.Pack(operation_metadata)
|
|
423 |
+ operation.done = operation_done
|
|
424 |
+ |
|
425 |
+ for peer, message_queue in self.__operations_message_queues.items():
|
|
426 |
+ if peer not in self.__operations_by_peer:
|
|
427 |
+ continue
|
|
428 |
+ elif peers and peer not in peers:
|
|
429 |
+ continue
|
|
430 |
+ |
|
431 |
+ operation = self.__operations_by_peer[peer]
|
|
432 |
+ # Messages are pairs of (Exception, Operation,):
|
|
433 |
+ if not notify_cancelled and operation.name in self.__operations_cancelled:
|
|
434 |
+ continue
|
|
435 |
+ elif operation.name not in self.__operations_cancelled:
|
|
436 |
+ message = (None, self._copy_operation(operation),)
|
|
437 |
+ else:
|
|
438 |
+ message = (CancelledError("Operation has been cancelled"),
|
|
439 |
+ self._copy_operation(operation),)
|
|
440 |
+ |
|
441 |
+ message_queue.put(message)
|
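The messages that `_send_operations_updates()` puts on each registered queue are `(error, operation)` pairs. A minimal consumer sketch, assuming a standard `queue.Queue` was registered for the peer (the loop and variable names are illustrative, not part of this change):

import queue

message_queue = queue.Queue()
# ... message_queue gets registered for a peer via the scheduler ...
while True:
    error, operation = message_queue.get()
    if error is not None:
        raise error  # e.g. CancelledError("Operation has been cancelled")
    if operation.done:
        break  # final update received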
... | ... | @@ -21,7 +21,7 @@ An instance of the LongRunningOperations Service. |
21 | 21 |
|
22 | 22 |
import logging
|
23 | 23 |
|
24 |
-from buildgrid._exceptions import InvalidArgumentError
|
|
24 |
+from buildgrid._exceptions import InvalidArgumentError, NotFoundError
|
|
25 | 25 |
from buildgrid._protos.google.longrunning import operations_pb2
|
26 | 26 |
|
27 | 27 |
|
... | ... | @@ -39,62 +39,43 @@ class OperationsInstance: |
39 | 39 |
def register_instance_with_server(self, instance_name, server):
|
40 | 40 |
server.add_operations_instance(self, instance_name)
|
41 | 41 |
|
42 |
- def get_operation(self, name):
|
|
43 |
- job = self._scheduler.jobs.get(name)
|
|
42 |
+ def get_operation(self, job_name):
|
|
43 |
+ try:
|
|
44 |
+ operation = self._scheduler.get_job_operation(job_name)
|
|
44 | 45 |
|
45 |
- if job is None:
|
|
46 |
- raise InvalidArgumentError("Operation name does not exist: [{}]".format(name))
|
|
46 |
+ except NotFoundError:
|
|
47 |
+ raise InvalidArgumentError("Operation name does not exist: [{}]".format(job_name))
|
|
47 | 48 |
|
48 |
- else:
|
|
49 |
- return job.operation
|
|
49 |
+ return operation
|
|
50 | 50 |
|
51 | 51 |
def list_operations(self, list_filter, page_size, page_token):
|
52 | 52 |
# TODO: Pages
|
53 | 53 |
# Spec says number of pages and length of a page are optional
|
54 | 54 |
response = operations_pb2.ListOperationsResponse()
|
55 |
+ |
|
56 |
+ operation_names = [operation_name for job_name in
|
|
57 |
+ self._scheduler.list_current_jobs() for operation_name in
|
|
58 |
+ self._scheduler.list_job_operations(job_name)]
|
|
59 |
+ |
|
55 | 60 |
operations = []
|
56 |
- for job in self._scheduler.list_jobs():
|
|
57 |
- op = operations_pb2.Operation()
|
|
58 |
- op.CopyFrom(job.operation)
|
|
59 |
- operations.append(op)
|
|
61 |
+ for operation_name in operation_names:
|
|
62 |
+ operation = self._scheduler.get_job_operation(operation_name)
|
|
63 |
+ operations.append(operation)
|
|
60 | 64 |
|
61 | 65 |
response.operations.extend(operations)
|
62 | 66 |
|
63 | 67 |
return response
|
64 | 68 |
|
65 |
- def delete_operation(self, name):
|
|
66 |
- try:
|
|
67 |
- self._scheduler.jobs.pop(name)
|
|
68 |
- |
|
69 |
- except KeyError:
|
|
70 |
- raise InvalidArgumentError("Operation name does not exist: [{}]".format(name))
|
|
71 |
- |
|
72 |
- def cancel_operation(self, name):
|
|
69 |
+ def delete_operation(self, job_name):
|
|
73 | 70 |
try:
|
74 |
- self._scheduler.cancel_job_operation(name)
|
|
71 |
+ self._scheduler.delete_job_operation(job_name)
|
|
75 | 72 |
|
76 |
- except KeyError:
|
|
77 |
- raise InvalidArgumentError("Operation name does not exist: [{}]".format(name))
|
|
73 |
+ except NotFoundError:
|
|
74 |
+ raise InvalidArgumentError("Operation name does not exist: [{}]".format(job_name))
|
|
78 | 75 |
|
79 |
- def register_message_client(self, name, queue):
|
|
76 |
+ def cancel_operation(self, job_name):
|
|
80 | 77 |
try:
|
81 |
- self._scheduler.register_client(name, queue)
|
|
82 |
- |
|
83 |
- except KeyError:
|
|
84 |
- raise InvalidArgumentError("Operation name does not exist: [{}]".format(name))
|
|
85 |
- |
|
86 |
- def unregister_message_client(self, name, queue):
|
|
87 |
- try:
|
|
88 |
- self._scheduler.unregister_client(name, queue)
|
|
89 |
- |
|
90 |
- except KeyError:
|
|
91 |
- raise InvalidArgumentError("Operation name does not exist: [{}]".format(name))
|
|
92 |
- |
|
93 |
- def stream_operation_updates(self, message_queue, operation_name):
|
|
94 |
- job = message_queue.get()
|
|
95 |
- while not job.operation.done:
|
|
96 |
- yield job.operation
|
|
97 |
- job = message_queue.get()
|
|
98 |
- job.check_operation_status()
|
|
78 |
+ self._scheduler.cancel_job_operation(job_name)
|
|
99 | 79 |
|
100 |
- yield job.operation
|
|
80 |
+ except NotFoundError:
|
|
81 |
+ raise InvalidArgumentError("Operation name does not exist: [{}]".format(job_name))
|
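With this change the instance layer consistently maps the scheduler's internal `NotFoundError` to the service-level `InvalidArgumentError`, so callers only handle one exception type. A hypothetical caller (the operation name is assumed for illustration):

from buildgrid._exceptions import InvalidArgumentError

try:
    operation = operations_instance.get_operation('unknown-operation')
except InvalidArgumentError as e:
    # "Operation name does not exist: [unknown-operation]"
    print(e)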
... | ... | @@ -27,6 +27,7 @@ from google.protobuf.empty_pb2 import Empty |
27 | 27 |
|
28 | 28 |
from buildgrid._exceptions import InvalidArgumentError
|
29 | 29 |
from buildgrid._protos.google.longrunning import operations_pb2_grpc, operations_pb2
|
30 |
+from buildgrid.server._authentication import AuthContext, authorize
|
|
30 | 31 |
|
31 | 32 |
|
32 | 33 |
class OperationsService(operations_pb2_grpc.OperationsServicer):
|
... | ... | @@ -51,6 +52,7 @@ class OperationsService(operations_pb2_grpc.OperationsServicer): |
51 | 52 |
|
52 | 53 |
# --- Public API: Servicer ---
|
53 | 54 |
|
55 |
+ @authorize(AuthContext)
|
|
54 | 56 |
def GetOperation(self, request, context):
|
55 | 57 |
self.__logger.debug("GetOperation request from [%s]", context.peer())
|
56 | 58 |
|
... | ... | @@ -74,6 +76,7 @@ class OperationsService(operations_pb2_grpc.OperationsServicer): |
74 | 76 |
|
75 | 77 |
return operations_pb2.Operation()
|
76 | 78 |
|
79 |
+ @authorize(AuthContext)
|
|
77 | 80 |
def ListOperations(self, request, context):
|
78 | 81 |
self.__logger.debug("ListOperations request from [%s]", context.peer())
|
79 | 82 |
|
... | ... | @@ -99,6 +102,7 @@ class OperationsService(operations_pb2_grpc.OperationsServicer): |
99 | 102 |
|
100 | 103 |
return operations_pb2.ListOperationsResponse()
|
101 | 104 |
|
105 |
+ @authorize(AuthContext)
|
|
102 | 106 |
def DeleteOperation(self, request, context):
|
103 | 107 |
self.__logger.debug("DeleteOperation request from [%s]", context.peer())
|
104 | 108 |
|
... | ... | @@ -118,6 +122,7 @@ class OperationsService(operations_pb2_grpc.OperationsServicer): |
118 | 122 |
|
119 | 123 |
return Empty()
|
120 | 124 |
|
125 |
+ @authorize(AuthContext)
|
|
121 | 126 |
def CancelOperation(self, request, context):
|
122 | 127 |
self.__logger.debug("CancelOperation request from [%s]", context.peer())
|
123 | 128 |
|
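Each servicer method is now gated by the new `authorize` decorator. A minimal sketch of the general pattern only, not the actual `buildgrid.server._authentication` implementation (the `is_authorized()` check is an assumption for illustration):

import functools

import grpc

def authorize(auth_context):
    def decorator(behavior):
        @functools.wraps(behavior)
        def wrapper(self, request, context):
            # Hypothetical check; the real AuthContext interface may differ:
            if not auth_context.is_authorized(context):
                context.abort(grpc.StatusCode.UNAUTHENTICATED, 'Unauthorized')
            return behavior(self, request, context)
        return wrapper
    return decorator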
... | ... | @@ -20,6 +20,7 @@ import grpc |
20 | 20 |
from buildgrid._exceptions import InvalidArgumentError, NotFoundError
|
21 | 21 |
from buildgrid._protos.buildstream.v2 import buildstream_pb2
|
22 | 22 |
from buildgrid._protos.buildstream.v2 import buildstream_pb2_grpc
|
23 |
+from buildgrid.server._authentication import AuthContext, authorize
|
|
23 | 24 |
|
24 | 25 |
|
25 | 26 |
class ReferenceStorageService(buildstream_pb2_grpc.ReferenceStorageServicer):
|
... | ... | @@ -31,9 +32,14 @@ class ReferenceStorageService(buildstream_pb2_grpc.ReferenceStorageServicer): |
31 | 32 |
|
32 | 33 |
buildstream_pb2_grpc.add_ReferenceStorageServicer_to_server(self, server)
|
33 | 34 |
|
35 |
+ # --- Public API ---
|
|
36 |
+ |
|
34 | 37 |
def add_instance(self, name, instance):
|
35 | 38 |
self._instances[name] = instance
|
36 | 39 |
|
40 |
+ # --- Public API: Servicer ---
|
|
41 |
+ |
|
42 |
+ @authorize(AuthContext)
|
|
37 | 43 |
def GetReference(self, request, context):
|
38 | 44 |
self.__logger.debug("GetReference request from [%s]", context.peer())
|
39 | 45 |
|
... | ... | @@ -55,6 +61,7 @@ class ReferenceStorageService(buildstream_pb2_grpc.ReferenceStorageServicer): |
55 | 61 |
|
56 | 62 |
return buildstream_pb2.GetReferenceResponse()
|
57 | 63 |
|
64 |
+ @authorize(AuthContext)
|
|
58 | 65 |
def UpdateReference(self, request, context):
|
59 | 66 |
self.__logger.debug("UpdateReference request from [%s]", context.peer())
|
60 | 67 |
|
... | ... | @@ -75,6 +82,7 @@ class ReferenceStorageService(buildstream_pb2_grpc.ReferenceStorageServicer): |
75 | 82 |
|
76 | 83 |
return buildstream_pb2.UpdateReferenceResponse()
|
77 | 84 |
|
85 |
+ @authorize(AuthContext)
|
|
78 | 86 |
def Status(self, request, context):
|
79 | 87 |
self.__logger.debug("Status request from [%s]", context.peer())
|
80 | 88 |
|
... | ... | @@ -90,6 +98,8 @@ class ReferenceStorageService(buildstream_pb2_grpc.ReferenceStorageServicer): |
90 | 98 |
|
91 | 99 |
return buildstream_pb2.StatusResponse()
|
92 | 100 |
|
101 |
+ # --- Private API ---
|
|
102 |
+ |
|
93 | 103 |
def _get_instance(self, instance_name):
|
94 | 104 |
try:
|
95 | 105 |
return self._instances[instance_name]
|
... | ... | @@ -25,6 +25,7 @@ import logging |
25 | 25 |
|
26 | 26 |
from buildgrid._enums import LeaseState, OperationStage
|
27 | 27 |
from buildgrid._exceptions import NotFoundError
|
28 |
+from buildgrid.server.job import Job
|
|
28 | 29 |
|
29 | 30 |
|
30 | 31 |
class Scheduler:
|
... | ... | @@ -42,8 +43,12 @@ class Scheduler: |
42 | 43 |
self.__retries_count = 0
|
43 | 44 |
|
44 | 45 |
self._action_cache = action_cache
|
45 |
- self.jobs = {}
|
|
46 |
- self.queue = deque()
|
|
46 |
+ |
|
47 |
+ self.__jobs_by_action = {}
|
|
48 |
+ self.__jobs_by_operation = {}
|
|
49 |
+ self.__jobs_by_name = {}
|
|
50 |
+ |
|
51 |
+ self.__queue = deque()
|
|
47 | 52 |
|
48 | 53 |
self._is_instrumented = monitor
|
49 | 54 |
|
... | ... | @@ -52,39 +57,132 @@ class Scheduler: |
52 | 57 |
|
53 | 58 |
# --- Public API ---
|
54 | 59 |
|
55 |
- def register_client(self, job_name, queue):
|
|
56 |
- job = self.jobs[job_name]
|
|
60 |
+ def list_current_jobs(self):
|
|
61 |
+ """Returns a list of the :class:`Job` names currently managed."""
|
|
62 |
+ return self.__jobs_by_name.keys()
|
|
57 | 63 |
|
58 |
- job.register_client(queue)
|
|
64 |
+ def list_job_operations(self, job_name):
|
|
65 |
+ """Returns a list of :class:`Operation` names for a :class:`Job`."""
|
|
66 |
+ if job_name in self.__jobs_by_name:
|
|
67 |
+ return self.__jobs_by_name[job_name].list_operations()
|
|
68 |
+ else:
|
|
69 |
+ return []
|
|
59 | 70 |
|
60 |
- def unregister_client(self, job_name, queue):
|
|
61 |
- job = self.jobs[job_name]
|
|
71 |
+ # --- Public API: REAPI ---
|
|
62 | 72 |
|
63 |
- job.unregister_client(queue)
|
|
73 |
+ def register_job_operation_client(self, operation_name, peer, message_queue):
|
|
74 |
+ """Subscribes to one of the job's :class:`Operation` stage changes.
|
|
64 | 75 |
|
65 |
- if not job.n_clients and job.operation.done:
|
|
66 |
- del self.jobs[job_name]
|
|
76 |
+ Args:
|
|
77 |
+ operation_name (str): name of the operation to subscribe to.
|
|
78 |
+ peer (str): a unique string identifying the client.
|
|
79 |
+ message_queue (queue.Queue): the event queue to register.
|
|
67 | 80 |
|
68 |
- if self._is_instrumented:
|
|
69 |
- self.__operations_by_stage[OperationStage.CACHE_CHECK].discard(job_name)
|
|
70 |
- self.__operations_by_stage[OperationStage.QUEUED].discard(job_name)
|
|
71 |
- self.__operations_by_stage[OperationStage.EXECUTING].discard(job_name)
|
|
72 |
- self.__operations_by_stage[OperationStage.COMPLETED].discard(job_name)
|
|
81 |
+ Returns:
|
|
82 |
+ str: The name of the subscribed :class:`Operation`.
|
|
83 |
+ |
|
84 |
+ Raises:
|
|
85 |
+ NotFoundError: If no operation with `operation_name` exists.
|
|
86 |
+ """
|
|
87 |
+ if operation_name in self.__jobs_by_operation:
|
|
88 |
+ job = self.__jobs_by_operation[operation_name]
|
|
89 |
+ |
|
90 |
+ elif operation_name in self.__jobs_by_name:
|
|
91 |
+ job = self.__jobs_by_name[operation_name]
|
|
92 |
+ |
|
93 |
+ else:
|
|
94 |
+ raise NotFoundError("Operation name does not exist: [{}]"
|
|
95 |
+ .format(operation_name))
|
|
96 |
+ |
|
97 |
+ operation_name = job.register_operation_client(peer, message_queue)
|
|
98 |
+ |
|
99 |
+ self.__jobs_by_operation[operation_name] = job
|
|
100 |
+ |
|
101 |
+ return operation_name
|
|
102 |
+ |
|
103 |
+ def unregister_job_operation_client(self, operation_name, peer):
|
|
104 |
+ """Unsubscribes to one of the job's :class:`Operation` stage change.
|
|
105 |
+ |
|
106 |
+ Args:
|
|
107 |
+ operation_name (str): name of the operation to unsubscribe from.
|
|
108 |
+ peer (str): a unique string identifying the client.
|
|
109 |
+ |
|
110 |
+ Raises:
|
|
111 |
+ NotFoundError: If no operation with `operation_name` exists.
|
|
112 |
+ """
|
|
113 |
+ if operation_name in self.__jobs_by_operation:
|
|
114 |
+ job = self.__jobs_by_operation[operation_name]
|
|
115 |
+ |
|
116 |
+ elif operation_name in self.__jobs_by_name:
|
|
117 |
+ job = self.__jobs_by_name[operation_name]
|
|
118 |
+ |
|
119 |
+ else:
|
|
120 |
+ raise NotFoundError("Operation name does not exist: [{}]"
|
|
121 |
+ .format(operation_name))
|
|
122 |
+ |
|
123 |
+ if operation_name in self.__jobs_by_operation:
|
|
124 |
+ del self.__jobs_by_operation[operation_name]
|
|
125 |
+ |
|
126 |
+ job.unregister_operation_client(peer)
|
|
127 |
+ |
|
128 |
+ if not job.n_clients and job.done and not job.lease:
|
|
129 |
+ self._delete_job(job.name)
|
|
130 |
+ |
|
131 |
+ def queue_job_operation(self, action, action_digest, priority=0, skip_cache_lookup=False):
|
|
132 |
+ """Inserts a newly created job into the execution queue.
|
|
133 |
+ |
|
134 |
+ Warning:
|
|
135 |
+ Priority is handled like a POSIX ``nice`` value: a higher value
|
|
136 |
+ means a lower priority, 0 being the default priority.
|
|
137 |
+ |
|
138 |
+ Args:
|
|
139 |
+ action (Action): the given action to queue for execution.
|
|
140 |
+ action_digest (Digest): the digest of the given action.
|
|
141 |
+ priority (int): the execution job's priority.
|
|
142 |
+ skip_cache_lookup (bool): whether or not to look for pre-computed
|
|
143 |
+ result for the given action.
|
|
144 |
+ |
|
145 |
+ Returns:
|
|
146 |
+ str: the newly created operation's name.
|
|
147 |
+ """
|
|
148 |
+ def __queue_job(jobs_queue, new_job):
|
|
149 |
+ index = 0
|
|
150 |
+ for queued_job in reversed(jobs_queue):
|
|
151 |
+ if new_job.priority < queued_job.priority:
|
|
152 |
+ index += 1
|
|
153 |
+ else:
|
|
154 |
+ break
|
|
155 |
+ |
|
156 |
+ index = len(jobs_queue) - index
|
|
157 |
+ |
|
158 |
+ jobs_queue.insert(index, new_job)
|
|
159 |
+ |
|
160 |
+ if action_digest.hash in self.__jobs_by_action:
|
|
161 |
+ job = self.__jobs_by_action[action_digest.hash]
|
|
73 | 162 |
|
74 |
- self.__leases_by_state[LeaseState.PENDING].discard(job_name)
|
|
75 |
- self.__leases_by_state[LeaseState.ACTIVE].discard(job_name)
|
|
76 |
- self.__leases_by_state[LeaseState.COMPLETED].discard(job_name)
|
|
163 |
+ # Reschedule if priority is now greater:
|
|
164 |
+ if priority < job.priority:
|
|
165 |
+ job.priority = priority
|
|
77 | 166 |
|
78 |
- def queue_job(self, job, skip_cache_lookup=False):
|
|
79 |
- self.jobs[job.name] = job
|
|
167 |
+ if job in self.__queue:
|
|
168 |
+ self.__queue.remove(job)
|
|
169 |
+ __queue_job(self.__queue, job)
|
|
170 |
+ |
|
171 |
+ return job.name
|
|
172 |
+ |
|
173 |
+ job = Job(action, action_digest, priority=priority)
|
|
174 |
+ |
|
175 |
+ self.__jobs_by_action[job.action_digest.hash] = job
|
|
176 |
+ self.__jobs_by_name[job.name] = job
|
|
80 | 177 |
|
81 | 178 |
operation_stage = None
|
82 | 179 |
if self._action_cache is not None and not skip_cache_lookup:
|
83 | 180 |
try:
|
84 | 181 |
action_result = self._action_cache.get_action_result(job.action_digest)
|
182 |
+ |
|
85 | 183 |
except NotFoundError:
|
86 | 184 |
operation_stage = OperationStage.QUEUED
|
87 |
- self.queue.append(job)
|
|
185 |
+ __queue_job(self.__queue, job)
|
|
88 | 186 |
|
89 | 187 |
else:
|
90 | 188 |
job.set_cached_result(action_result)
|
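The nested `__queue_job()` helper keeps the deque sorted by ascending `nice`-style priority while preserving FIFO order among jobs of equal priority: scanning from the right, it counts the queued jobs with a strictly higher nice value (i.e. lower priority) and inserts the new job just before them. A standalone sketch of the same insertion logic (the `_StubJob` class is illustrative only):

from collections import deque

def queue_by_priority(jobs_queue, new_job):
    """Inserts new_job, keeping ascending priority, FIFO within a priority."""
    index = 0
    for queued_job in reversed(jobs_queue):
        if new_job.priority < queued_job.priority:
            index += 1
        else:
            break
    jobs_queue.insert(len(jobs_queue) - index, new_job)

class _StubJob:
    def __init__(self, name, priority):
        self.name, self.priority = name, priority

q = deque()
for j in (_StubJob('a', 0), _StubJob('b', 5), _StubJob('c', 0)):
    queue_by_priority(q, j)
assert [j.name for j in q] == ['a', 'c', 'b']  # FIFO among equal priorities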
... | ... | @@ -95,28 +193,68 @@ class Scheduler: |
95 | 193 |
|
96 | 194 |
else:
|
97 | 195 |
operation_stage = OperationStage.QUEUED
|
98 |
- self.queue.append(job)
|
|
196 |
+ __queue_job(self.__queue, job)
|
|
99 | 197 |
|
100 | 198 |
self._update_job_operation_stage(job.name, operation_stage)
|
101 | 199 |
|
102 |
- def retry_job(self, job_name):
|
|
103 |
- job = self.jobs[job_name]
|
|
200 |
+ return job.name
|
|
104 | 201 |
|
105 |
- operation_stage = None
|
|
106 |
- if job.n_tries >= self.MAX_N_TRIES:
|
|
107 |
- # TODO: Decide what to do with these jobs
|
|
108 |
- operation_stage = OperationStage.COMPLETED
|
|
109 |
- # TODO: Mark these jobs as done
|
|
202 |
+ def get_job_operation(self, operation_name):
|
|
203 |
+ """Retrieves a job's :class:`Operation` by name.
|
|
110 | 204 |
|
111 |
- else:
|
|
112 |
- operation_stage = OperationStage.QUEUED
|
|
113 |
- job.update_lease_state(LeaseState.PENDING)
|
|
114 |
- self.queue.append(job)
|
|
205 |
+ Args:
|
|
206 |
+ operation_name (str): name of the operation to query.
|
|
115 | 207 |
|
116 |
- self._update_job_operation_stage(job_name, operation_stage)
|
|
208 |
+ Raises:
|
|
209 |
+ NotFoundError: If no operation with `operation_name` exists.
|
|
210 |
+ """
|
|
211 |
+ try:
|
|
212 |
+ job = self.__jobs_by_operation[operation_name]
|
|
213 |
+ |
|
214 |
+ except KeyError:
|
|
215 |
+ raise NotFoundError("Operation name does not exist: [{}]"
|
|
216 |
+ .format(operation_name))
|
|
217 |
+ |
|
218 |
+ return job.get_operation(operation_name)
|
|
219 |
+ |
|
220 |
+ def cancel_job_operation(self, operation_name):
|
|
221 |
+ """"Cancels a job's :class:`Operation` by name.
|
|
222 |
+ |
|
223 |
+ Args:
|
|
224 |
+ operation_name (str): name of the operation to cancel.
|
|
225 |
+ |
|
226 |
+ Raises:
|
|
227 |
+ NotFoundError: If no operation with `operation_name` exists.
|
|
228 |
+ """
|
|
229 |
+ try:
|
|
230 |
+ job = self.__jobs_by_operation[operation_name]
|
|
231 |
+ |
|
232 |
+ except KeyError:
|
|
233 |
+ raise NotFoundError("Operation name does not exist: [{}]"
|
|
234 |
+ .format(operation_name))
|
|
235 |
+ |
|
236 |
+ job.cancel_operation(operation_name)
|
|
237 |
+ |
|
238 |
+ def delete_job_operation(self, operation_name):
|
|
239 |
+ """"Removes a job.
|
|
117 | 240 |
|
118 |
- def list_jobs(self):
|
|
119 |
- return self.jobs.values()
|
|
241 |
+ Args:
|
|
242 |
+ operation_name (str): name of the operation to delete.
|
|
243 |
+ |
|
244 |
+ Raises:
|
|
245 |
+ NotFoundError: If no operation with `operation_name` exists.
|
|
246 |
+ """
|
|
247 |
+ try:
|
|
248 |
+ job = self.__jobs_by_operation[operation_name]
|
|
249 |
+ |
|
250 |
+ except KeyError:
|
|
251 |
+ raise NotFoundError("Operation name does not exist: [{}]"
|
|
252 |
+ .format(operation_name))
|
|
253 |
+ |
|
254 |
+ if not job.n_clients and job.done and not job.lease:
|
|
255 |
+ self._delete_job(job.name)
|
|
256 |
+ |
|
257 |
+ # --- Public API: RWAPI ---
|
|
120 | 258 |
|
121 | 259 |
def request_job_leases(self, worker_capabilities):
|
122 | 260 |
"""Generates a list of the highest priority leases to be run.
|
... | ... | @@ -126,10 +264,10 @@ class Scheduler: |
126 | 264 |
worker properties, configuration and state at the time of the
|
127 | 265 |
request.
|
128 | 266 |
"""
|
129 |
- if not self.queue:
|
|
267 |
+ if not self.__queue:
|
|
130 | 268 |
return []
|
131 | 269 |
|
132 |
- job = self.queue.popleft()
|
|
270 |
+ job = self.__queue.popleft()
|
|
133 | 271 |
|
134 | 272 |
lease = job.lease
|
135 | 273 |
|
... | ... | @@ -142,18 +280,25 @@ class Scheduler: |
142 | 280 |
|
143 | 281 |
return None
|
144 | 282 |
|
145 |
- def update_job_lease(self, lease):
|
|
283 |
+ def update_job_lease_state(self, job_name, lease):
|
|
146 | 284 |
"""Requests a state transition for a job's current :class:Lease.
|
147 | 285 |
|
286 |
+ Note:
|
|
287 |
+ This may trigger a job's :class:`Operation` stage transition.
|
|
288 |
+ |
|
148 | 289 |
Args:
|
149 | 290 |
job_name (str): name of the job to query.
|
150 |
- lease_state (LeaseState): the lease state to transition to.
|
|
151 |
- lease_status (google.rpc.Status): the lease execution status, only
|
|
152 |
- required if `lease_state` is `COMPLETED`.
|
|
153 |
- lease_result (google.protobuf.Any): the lease execution result, only
|
|
154 |
- required if `lease_state` is `COMPLETED`.
|
|
291 |
+ lease (Lease): the lease holding the new state.
|
|
292 |
+ |
|
293 |
+ Raises:
|
|
294 |
+ NotFoundError: If no job with `job_name` exists.
|
|
155 | 295 |
"""
|
156 |
- job = self.jobs[lease.id]
|
|
296 |
+ try:
|
|
297 |
+ job = self.__jobs_by_name[job_name]
|
|
298 |
+ |
|
299 |
+ except KeyError:
|
|
300 |
+ raise NotFoundError("Job name does not exist: [{}]".format(job_name))
|
|
301 |
+ |
|
157 | 302 |
lease_state = LeaseState(lease.state)
|
158 | 303 |
|
159 | 304 |
operation_stage = None
|
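A hypothetical call site in the bots service, assuming as elsewhere in this branch that the lease `id` carries the owning job's name:

from buildgrid._exceptions import NotFoundError

# 'scheduler' and 'lease' come from the surrounding bots-service context:
try:
    scheduler.update_job_lease_state(lease.id, lease)
except NotFoundError:
    pass  # the job has been purged in the meantime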
... | ... | @@ -189,29 +334,92 @@ class Scheduler: |
189 | 334 |
self.__leases_by_state[LeaseState.ACTIVE].discard(lease.id)
|
190 | 335 |
self.__leases_by_state[LeaseState.COMPLETED].add(lease.id)
|
191 | 336 |
|
192 |
- self._update_job_operation_stage(lease.id, operation_stage)
|
|
337 |
+ self._update_job_operation_stage(job_name, operation_stage)
|
|
338 |
+ |
|
339 |
+ def retry_job_lease(self, job_name):
|
|
340 |
+ """Re-queues a job on lease execution failure.
|
|
341 |
+ |
|
342 |
+ Note:
|
|
343 |
+ This may trigger a job's :class:`Operation` stage transition.
|
|
344 |
+ |
|
345 |
+ Args:
|
|
346 |
+ job_name (str): name of the job to retry.
|
|
347 |
+ |
|
348 |
+ Raises:
|
|
349 |
+ NotFoundError: If no job with `job_name` exists.
|
|
350 |
+ """
|
|
351 |
+ try:
|
|
352 |
+ job = self.__jobs_by_name[job_name]
|
|
353 |
+ |
|
354 |
+ except KeyError:
|
|
355 |
+ raise NotFoundError("Job name does not exist: [{}]".format(job_name))
|
|
356 |
+ |
|
357 |
+ operation_stage = None
|
|
358 |
+ if job.n_tries >= self.MAX_N_TRIES:
|
|
359 |
+ # TODO: Decide what to do with these jobs
|
|
360 |
+ operation_stage = OperationStage.COMPLETED
|
|
361 |
+ # TODO: Mark these jobs as done
|
|
362 |
+ |
|
363 |
+ else:
|
|
364 |
+ operation_stage = OperationStage.QUEUED
|
|
365 |
+ job.update_lease_state(LeaseState.PENDING)
|
|
366 |
+ self.__queue.append(job)
|
|
367 |
+ |
|
368 |
+ self._update_job_operation_stage(job_name, operation_stage)
|
|
193 | 369 |
|
194 | 370 |
def get_job_lease(self, job_name):
|
195 |
- """Returns the lease associated to job, if any have been emitted yet."""
|
|
196 |
- return self.jobs[job_name].lease
|
|
371 |
+ """Returns the lease associated to job, if any have been emitted yet.
|
|
197 | 372 |
|
198 |
- def get_job_lease_cancelled(self, job_name):
|
|
199 |
- """Returns true if the lease is cancelled"""
|
|
200 |
- return self.jobs[job_name].lease_cancelled
|
|
373 |
+ Args:
|
|
374 |
+ job_name (str): name of the job to query.
|
|
375 |
+ |
|
376 |
+ Raises:
|
|
377 |
+ NotFoundError: If no job with `job_name` exists.
|
|
378 |
+ """
|
|
379 |
+ try:
|
|
380 |
+ job = self.__jobs_by_name[job_name]
|
|
381 |
+ |
|
382 |
+ except KeyError:
|
|
383 |
+ raise NotFoundError("Job name does not exist: [{}]".format(job_name))
|
|
384 |
+ |
|
385 |
+ return job.lease
|
|
386 |
+ |
|
387 |
+ def delete_job_lease(self, job_name):
|
|
388 |
+ """Discards the lease associated with a job.
|
|
389 |
+ |
|
390 |
+ Args:
|
|
391 |
+ job_name (str): name of the job whose lease to discard.
|
|
392 |
+ |
|
393 |
+ Raises:
|
|
394 |
+ NotFoundError: If no job with `job_name` exists.
|
|
395 |
+ """
|
|
396 |
+ try:
|
|
397 |
+ job = self.__jobs_by_name[job_name]
|
|
201 | 398 |
|
202 |
- def get_job_operation(self, job_name):
|
|
203 |
- """Returns the operation associated to job."""
|
|
204 |
- return self.jobs[job_name].operation
|
|
399 |
+ except KeyError:
|
|
400 |
+ raise NotFoundError("Job name does not exist: [{}]".format(job_name))
|
|
205 | 401 |
|
206 |
- def cancel_job_operation(self, job_name):
|
|
207 |
- """"Cancels the underlying operation of a given job.
|
|
402 |
+ job.delete_lease()
|
|
208 | 403 |
|
209 |
- This will also cancel any job's lease that may have been issued.
|
|
404 |
+ if not job.n_clients and job.done:
|
|
405 |
+ self._delete_job(job.name)
|
|
406 |
+ |
|
407 |
+ def get_job_lease_cancelled(self, job_name):
|
|
408 |
+ """Returns true if the lease is cancelled.
|
|
210 | 409 |
|
211 | 410 |
Args:
|
212 |
- job_name (str): name of the job holding the operation to cancel.
|
|
411 |
+ job_name (str): name of the job to query.
|
|
412 |
+ |
|
413 |
+ Raises:
|
|
414 |
+ NotFoundError: If no job with `job_name` exists.
|
|
213 | 415 |
"""
|
214 |
- self.jobs[job_name].cancel_operation()
|
|
416 |
+ try:
|
|
417 |
+ job = self.__jobs_by_name[job_name]
|
|
418 |
+ |
|
419 |
+ except KeyError:
|
|
420 |
+ raise NotFoundError("Job name does not exist: [{}]".format(job_name))
|
|
421 |
+ |
|
422 |
+ return job.lease_cancelled
|
|
215 | 423 |
|
216 | 424 |
# --- Public API: Monitoring ---
|
217 | 425 |
|
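Taken together, the RWAPI methods now cover the whole lease lifecycle. A hypothetical worker-session flow using only the calls introduced or renamed in this branch (error handling elided; a failed execution would instead go through `retry_job_lease()`):

leases = scheduler.request_job_leases(worker_capabilities={})
if leases:
    lease = leases[0]  # lease.id carries the job name
    # ... the worker executes the lease, its state advances ...
    scheduler.update_job_lease_state(lease.id, lease)
    if scheduler.get_job_lease_cancelled(lease.id):
        scheduler.delete_job_lease(lease.id)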
... | ... | @@ -261,11 +469,11 @@ class Scheduler: |
261 | 469 |
self.__build_metadata_queues.append(message_queue)
|
262 | 470 |
|
263 | 471 |
def query_n_jobs(self):
|
264 |
- return len(self.jobs)
|
|
472 |
+ return len(self.__jobs_by_name)
|
|
265 | 473 |
|
266 | 474 |
def query_n_operations(self):
|
267 | 475 |
# For now n_operations == n_jobs:
|
268 |
- return len(self.jobs)
|
|
476 |
+ return len(self.__jobs_by_operation)
|
|
269 | 477 |
|
270 | 478 |
def query_n_operations_by_stage(self, operation_stage):
|
271 | 479 |
try:
|
... | ... | @@ -276,7 +484,7 @@ class Scheduler: |
276 | 484 |
return 0
|
277 | 485 |
|
278 | 486 |
def query_n_leases(self):
|
279 |
- return len(self.jobs)
|
|
487 |
+ return len(self.__jobs_by_name)
|
|
280 | 488 |
|
281 | 489 |
def query_n_leases_by_state(self, lease_state):
|
282 | 490 |
try:
|
... | ... | @@ -296,6 +504,23 @@ class Scheduler: |
296 | 504 |
|
297 | 505 |
# --- Private API ---
|
298 | 506 |
|
507 |
+ def _delete_job(self, job_name):
|
|
508 |
+ """Drops an entry from the internal list of jobs."""
|
|
509 |
+ job = self.__jobs_by_name[job_name]
|
|
510 |
+ |
|
511 |
+ del self.__jobs_by_action[job.action_digest.hash]
|
|
512 |
+ del self.__jobs_by_name[job.name]
|
|
513 |
+ |
|
514 |
+ if self._is_instrumented:
|
|
515 |
+ self.__operations_by_stage[OperationStage.CACHE_CHECK].discard(job.name)
|
|
516 |
+ self.__operations_by_stage[OperationStage.QUEUED].discard(job.name)
|
|
517 |
+ self.__operations_by_stage[OperationStage.EXECUTING].discard(job.name)
|
|
518 |
+ self.__operations_by_stage[OperationStage.COMPLETED].discard(job.name)
|
|
519 |
+ |
|
520 |
+ self.__leases_by_state[LeaseState.PENDING].discard(job.name)
|
|
521 |
+ self.__leases_by_state[LeaseState.ACTIVE].discard(job.name)
|
|
522 |
+ self.__leases_by_state[LeaseState.COMPLETED].discard(job.name)
|
|
523 |
+ |
|
299 | 524 |
def _update_job_operation_stage(self, job_name, operation_stage):
|
300 | 525 |
"""Requests a stage transition for the job's :class:Operations.
|
301 | 526 |
|
... | ... | @@ -303,7 +528,7 @@ class Scheduler: |
303 | 528 |
job_name (str): name of the job to query.
|
304 | 529 |
operation_stage (OperationStage): the stage to transition to.
|
305 | 530 |
"""
|
306 |
- job = self.jobs[job_name]
|
|
531 |
+ job = self.__jobs_by_name[job_name]
|
|
307 | 532 |
|
308 | 533 |
if operation_stage == OperationStage.CACHE_CHECK:
|
309 | 534 |
job.update_operation_stage(OperationStage.CACHE_CHECK)
|
... | ... | @@ -352,7 +577,7 @@ class Scheduler: |
352 | 577 |
|
353 | 578 |
self.__queue_time_average = average_order, average_time
|
354 | 579 |
|
355 |
- if not job.holds_cached_action_result:
|
|
580 |
+ if not job.holds_cached_result:
|
|
356 | 581 |
execution_metadata = job.action_result.execution_metadata
|
357 | 582 |
context_metadata = {'job-is': job.name}
|
358 | 583 |
|
... | ... | @@ -182,3 +182,11 @@ texinfo_documents = [ |
182 | 182 |
author, 'BuildGrid', 'One line description of project.',
|
183 | 183 |
'Miscellaneous'),
|
184 | 184 |
]
|
185 |
+ |
|
186 |
+# -- Options for the autodoc extension ----------------------------------------
|
|
187 |
+ |
|
188 |
+# This value selects if automatically documented members are sorted
|
|
189 |
+# alphabetically (value 'alphabetical'), by member type (value 'groupwise') or
|
|
190 |
+# by source order (value 'bysource'). The default is alphabetical.
|
|
191 |
+autodoc_member_order = 'bysource'
|
|
192 |
+ |
... | ... | @@ -4,9 +4,11 @@ |
4 | 4 |
BuildStream client
|
5 | 5 |
==================
|
6 | 6 |
|
7 |
-`BuildStream`_ is a free software tool for building and integrating software
|
|
7 |
+`BuildStream`_ is a free software tool for building and integrating software
|
|
8 | 8 |
stacks. It supports remote build execution using the remote execution API
|
9 |
-(REAPI) v2.
|
|
9 |
+(REAPI) v2. The project's documentation has a detailed section about its
|
|
10 |
+`remote execution subsystem architecture`_ that you are strongly recommended to
|
|
11 |
+read first.
|
|
10 | 12 |
|
11 | 13 |
.. note::
|
12 | 14 |
|
... | ... | @@ -15,6 +17,7 @@ stacks. It supports remote build execution using the remote execution API |
15 | 17 |
remote execution.
|
16 | 18 |
|
17 | 19 |
.. _BuildStream: https://buildstream.build
|
20 |
+.. _remote execution subsystem architecture: https://buildstream.gitlab.io/buildstream/arch_remote_execution.html
|
|
18 | 21 |
.. _install it from sources: https://buildstream.build/source_install.html
|
19 | 22 |
|
20 | 23 |
|
... | ... | @@ -43,23 +46,23 @@ Project configuration |
43 | 46 |
In order to activate remote build execution at project-level, the project's
|
44 | 47 |
``project.conf`` file must declare two specific configuration nodes:
|
45 | 48 |
|
46 |
-- ``artifacts`` for `remote CAS endpoint details`_.
|
|
49 |
+- ``artifacts`` for `remote cache endpoint details`_.
|
|
47 | 50 |
- ``remote-execution`` for `remote execution endpoint details`_.
|
48 | 51 |
|
49 | 52 |
.. important::
|
50 | 53 |
|
51 | 54 |
BuildStream does not support multi-instance remote execution servers and will
|
52 | 55 |
always submit remote execution requests, omitting the instance name parameter.
|
53 |
- Thus, you must declare an unnamed `""` instance in your server configuration
|
|
56 |
+ Thus, you must declare an unnamed `''` instance in your server configuration
|
|
54 | 57 |
to work around this.
|
55 | 58 |
|
56 | 59 |
.. important::
|
57 | 60 |
|
58 |
- If you are using BuildGrid's artifact server, the server instance **must**
|
|
59 |
- accept pushes from your client for remote execution to be possible.
|
|
61 |
+ If you are using BuildStream's artifact server, the server instance pointed to
|
|
62 |
+ by the ``storage-service`` key **must** accept pushes from your client for
|
|
63 |
+ remote execution to be possible.
|
|
60 | 64 |
|
61 |
- |
|
62 |
-.. _remote CAS endpoint details: https://buildstream.gitlab.io/buildstream/install_artifacts.html#user-configuration
|
|
65 |
+.. _remote cache endpoint details: https://buildstream.gitlab.io/buildstream/format_project.html#artifact-server
|
|
63 | 66 |
.. _remote execution endpoint details: https://buildstream.gitlab.io/buildstream/format_project.html#remote-execution
|
64 | 67 |
|
65 | 68 |
|
... | ... | @@ -167,7 +170,15 @@ append at the end of the ``project.conf`` file from the root directory: |
167 | 170 |
push: true
|
168 | 171 |
|
169 | 172 |
remote-execution:
|
170 |
- url: http://localhost:50051
|
|
173 |
+ execution-service:
|
|
174 |
+ url: http://localhost:50051
|
|
175 |
+ storage-service:
|
|
176 |
+ url: http://localhost:50051
|
|
177 |
+ client-key: ''
|
|
178 |
+ client-cert: ''
|
|
179 |
+ server-cert: ''
|
|
180 |
+ action-cache-service:
|
|
181 |
+ url: http://localhost:50051
|
|
171 | 182 |
|
172 | 183 |
This activates BuildGrid's remote execution mode and points to the unnamed
|
173 | 184 |
remote execution server instance at ``localhost:50051``.
|
... | ... | @@ -25,7 +25,6 @@ import pytest |
25 | 25 |
|
26 | 26 |
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
|
27 | 27 |
from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2
|
28 |
-from buildgrid.server import job
|
|
29 | 28 |
from buildgrid.server.controller import ExecutionController
|
30 | 29 |
from buildgrid.server.job import LeaseState
|
31 | 30 |
from buildgrid.server.bots import service
|
... | ... | @@ -159,7 +158,8 @@ def test_post_bot_event_temp(context, instance): |
159 | 158 |
def _inject_work(scheduler, action=None, action_digest=None):
|
160 | 159 |
if not action:
|
161 | 160 |
action = remote_execution_pb2.Action()
|
161 |
+ |
|
162 | 162 |
if not action_digest:
|
163 | 163 |
action_digest = remote_execution_pb2.Digest()
|
164 |
- j = job.Job(action, action_digest)
|
|
165 |
- scheduler.queue_job(j, True)
|
|
164 |
+ |
|
165 |
+ scheduler.queue_job_operation(action, action_digest, skip_cache_lookup=True)
|
... | ... | @@ -20,11 +20,11 @@ |
20 | 20 |
import uuid
|
21 | 21 |
from unittest import mock
|
22 | 22 |
|
23 |
-from google.protobuf import any_pb2
|
|
24 | 23 |
import grpc
|
25 | 24 |
from grpc._server import _Context
|
26 | 25 |
import pytest
|
27 | 26 |
|
27 |
+from buildgrid._enums import OperationStage
|
|
28 | 28 |
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
|
29 | 29 |
from buildgrid._protos.google.longrunning import operations_pb2
|
30 | 30 |
|
... | ... | @@ -82,7 +82,7 @@ def test_execute(skip_cache_lookup, instance, context): |
82 | 82 |
assert isinstance(result, operations_pb2.Operation)
|
83 | 83 |
metadata = remote_execution_pb2.ExecuteOperationMetadata()
|
84 | 84 |
result.metadata.Unpack(metadata)
|
85 |
- assert metadata.stage == job.OperationStage.QUEUED.value
|
|
85 |
+ assert metadata.stage == OperationStage.QUEUED.value
|
|
86 | 86 |
operation_uuid = result.name.split('/')[-1]
|
87 | 87 |
assert uuid.UUID(operation_uuid, version=4)
|
88 | 88 |
assert result.done is False
|
... | ... | @@ -106,18 +106,14 @@ def test_no_action_digest_in_storage(instance, context): |
106 | 106 |
|
107 | 107 |
|
108 | 108 |
def test_wait_execution(instance, controller, context):
|
109 |
- j = job.Job(action, action_digest)
|
|
110 |
- j._operation.done = True
|
|
109 |
+ job_name = controller.execution_instance._scheduler.queue_job_operation(action,
|
|
110 |
+ action_digest,
|
|
111 |
+ skip_cache_lookup=True)
|
|
111 | 112 |
|
112 |
- request = remote_execution_pb2.WaitExecutionRequest(name=j.name)
|
|
113 |
+ controller.execution_instance._scheduler._update_job_operation_stage(job_name,
|
|
114 |
+ OperationStage.COMPLETED)
|
|
113 | 115 |
|
114 |
- controller.execution_instance._scheduler.jobs[j.name] = j
|
|
115 |
- |
|
116 |
- action_result_any = any_pb2.Any()
|
|
117 |
- action_result = remote_execution_pb2.ActionResult()
|
|
118 |
- action_result_any.Pack(action_result)
|
|
119 |
- |
|
120 |
- j.update_operation_stage(job.OperationStage.COMPLETED)
|
|
116 |
+ request = remote_execution_pb2.WaitExecutionRequest(name=job_name)
|
|
121 | 117 |
|
122 | 118 |
response = instance.WaitExecution(request, context)
|
123 | 119 |
|
... | ... | @@ -127,7 +123,6 @@ def test_wait_execution(instance, controller, context): |
127 | 123 |
metadata = remote_execution_pb2.ExecuteOperationMetadata()
|
128 | 124 |
result.metadata.Unpack(metadata)
|
129 | 125 |
assert metadata.stage == job.OperationStage.COMPLETED.value
|
130 |
- assert uuid.UUID(result.name, version=4)
|
|
131 | 126 |
assert result.done is True
|
132 | 127 |
|
133 | 128 |
|
... | ... | @@ -17,6 +17,7 @@ |
17 | 17 |
|
18 | 18 |
# pylint: disable=redefined-outer-name
|
19 | 19 |
|
20 |
+import queue
|
|
20 | 21 |
from unittest import mock
|
21 | 22 |
|
22 | 23 |
from google.protobuf import any_pb2
|
... | ... | @@ -86,8 +87,13 @@ def blank_instance(controller): |
86 | 87 |
|
87 | 88 |
# Queue an execution, get operation corresponding to that request
|
88 | 89 |
def test_get_operation(instance, controller, execute_request, context):
|
89 |
- response_execute = controller.execution_instance.execute(execute_request.action_digest,
|
|
90 |
- execute_request.skip_cache_lookup)
|
|
90 |
+ job_name = controller.execution_instance.execute(execute_request.action_digest,
|
|
91 |
+ execute_request.skip_cache_lookup)
|
|
92 |
+ |
|
93 |
+ message_queue = queue.Queue()
|
|
94 |
+ operation_name = controller.execution_instance.register_operation_client(job_name,
|
|
95 |
+ context.peer(),
|
|
96 |
+ message_queue)
|
|
91 | 97 |
|
92 | 98 |
request = operations_pb2.GetOperationRequest()
|
93 | 99 |
|
... | ... | @@ -95,25 +101,28 @@ def test_get_operation(instance, controller, execute_request, context): |
95 | 101 |
# we're manually creating the instance here, it doesn't get a name.
|
96 | 102 |
# Therefore we need to manually add the instance name to the operation
|
97 | 103 |
# name in the GetOperation request.
|
98 |
- request.name = "{}/{}".format(instance_name, response_execute.name)
|
|
104 |
+ request.name = "{}/{}".format(instance_name, operation_name)
|
|
99 | 105 |
|
100 | 106 |
response = instance.GetOperation(request, context)
|
101 |
- assert response.name == "{}/{}".format(instance_name, response_execute.name)
|
|
102 |
- assert response.done == response_execute.done
|
|
107 |
+ assert response.name == "{}/{}".format(instance_name, operation_name)
|
|
103 | 108 |
|
104 | 109 |
|
105 | 110 |
# Queue an execution, get operation corresponding to that request
|
106 | 111 |
def test_get_operation_blank(blank_instance, controller, execute_request, context):
|
107 |
- response_execute = controller.execution_instance.execute(execute_request.action_digest,
|
|
108 |
- execute_request.skip_cache_lookup)
|
|
112 |
+ job_name = controller.execution_instance.execute(execute_request.action_digest,
|
|
113 |
+ execute_request.skip_cache_lookup)
|
|
114 |
+ |
|
115 |
+ message_queue = queue.Queue()
|
|
116 |
+ operation_name = controller.execution_instance.register_operation_client(job_name,
|
|
117 |
+ context.peer(),
|
|
118 |
+ message_queue)
|
|
109 | 119 |
|
110 | 120 |
request = operations_pb2.GetOperationRequest()
|
111 | 121 |
|
112 |
- request.name = response_execute.name
|
|
122 |
+ request.name = operation_name
|
|
113 | 123 |
|
114 | 124 |
response = blank_instance.GetOperation(request, context)
|
115 |
- assert response.name == response_execute.name
|
|
116 |
- assert response.done == response_execute.done
|
|
125 |
+ assert response.name == operation_name
|
|
117 | 126 |
|
118 | 127 |
|
119 | 128 |
def test_get_operation_fail(instance, context):
|
... | ... | @@ -133,25 +142,35 @@ def test_get_operation_instance_fail(instance, context): |
133 | 142 |
|
134 | 143 |
|
135 | 144 |
def test_list_operations(instance, controller, execute_request, context):
|
136 |
- response_execute = controller.execution_instance.execute(execute_request.action_digest,
|
|
137 |
- execute_request.skip_cache_lookup)
|
|
145 |
+ job_name = controller.execution_instance.execute(execute_request.action_digest,
|
|
146 |
+ execute_request.skip_cache_lookup)
|
|
147 |
+ |
|
148 |
+ message_queue = queue.Queue()
|
|
149 |
+ operation_name = controller.execution_instance.register_operation_client(job_name,
|
|
150 |
+ context.peer(),
|
|
151 |
+ message_queue)
|
|
138 | 152 |
|
139 | 153 |
request = operations_pb2.ListOperationsRequest(name=instance_name)
|
140 | 154 |
response = instance.ListOperations(request, context)
|
141 | 155 |
|
142 | 156 |
names = response.operations[0].name.split('/')
|
143 | 157 |
assert names[0] == instance_name
|
144 |
- assert names[1] == response_execute.name
|
|
158 |
+ assert names[1] == operation_name
|
|
145 | 159 |
|
146 | 160 |
|
147 | 161 |
def test_list_operations_blank(blank_instance, controller, execute_request, context):
|
148 |
- response_execute = controller.execution_instance.execute(execute_request.action_digest,
|
|
149 |
- execute_request.skip_cache_lookup)
|
|
162 |
+ job_name = controller.execution_instance.execute(execute_request.action_digest,
|
|
163 |
+ execute_request.skip_cache_lookup)
|
|
164 |
+ |
|
165 |
+ message_queue = queue.Queue()
|
|
166 |
+ operation_name = controller.execution_instance.register_operation_client(job_name,
|
|
167 |
+ context.peer(),
|
|
168 |
+ message_queue)
|
|
150 | 169 |
|
151 | 170 |
request = operations_pb2.ListOperationsRequest(name='')
|
152 | 171 |
response = blank_instance.ListOperations(request, context)
|
153 | 172 |
|
154 |
- assert response.operations[0].name.split('/')[-1] == response_execute.name
|
|
173 |
+ assert response.operations[0].name.split('/')[-1] == operation_name
|
|
155 | 174 |
|
156 | 175 |
|
157 | 176 |
def test_list_operations_instance_fail(instance, controller, execute_request, context):
|
... | ... | @@ -174,14 +193,19 @@ def test_list_operations_empty(instance, context): |
174 | 193 |
|
175 | 194 |
# Send execution off, delete, try to find operation should fail
|
176 | 195 |
def test_delete_operation(instance, controller, execute_request, context):
|
177 |
- response_execute = controller.execution_instance.execute(execute_request.action_digest,
|
|
178 |
- execute_request.skip_cache_lookup)
|
|
196 |
+ job_name = controller.execution_instance.execute(execute_request.action_digest,
|
|
197 |
+ execute_request.skip_cache_lookup)
|
|
198 |
+ |
|
199 |
+ message_queue = queue.Queue()
|
|
200 |
+ operation_name = controller.execution_instance.register_operation_client(job_name,
|
|
201 |
+ context.peer(),
|
|
202 |
+ message_queue)
|
|
179 | 203 |
|
180 | 204 |
request = operations_pb2.DeleteOperationRequest()
|
181 |
- request.name = response_execute.name
|
|
205 |
+ request.name = operation_name
|
|
182 | 206 |
instance.DeleteOperation(request, context)
|
183 | 207 |
|
184 |
- request_name = "{}/{}".format(instance_name, response_execute.name)
|
|
208 |
+ request_name = "{}/{}".format(instance_name, operation_name)
|
|
185 | 209 |
|
186 | 210 |
with pytest.raises(InvalidArgumentError):
|
187 | 211 |
controller.operations_instance.get_operation(request_name)
|
... | ... | @@ -189,17 +213,11 @@ def test_delete_operation(instance, controller, execute_request, context): |
189 | 213 |
|
190 | 214 |
# Send execution off, delete, try to find operation should fail
|
191 | 215 |
def test_delete_operation_blank(blank_instance, controller, execute_request, context):
|
192 |
- response_execute = controller.execution_instance.execute(execute_request.action_digest,
|
|
193 |
- execute_request.skip_cache_lookup)
|
|
194 |
- |
|
195 | 216 |
request = operations_pb2.DeleteOperationRequest()
|
196 |
- request.name = response_execute.name
|
|
217 |
+ request.name = "runner"
|
|
197 | 218 |
blank_instance.DeleteOperation(request, context)
|
198 | 219 |
|
199 |
- request_name = response_execute.name
|
|
200 |
- |
|
201 |
- with pytest.raises(InvalidArgumentError):
|
|
202 |
- controller.operations_instance.get_operation(request_name)
|
|
220 |
+ context.set_code.assert_called_once_with(grpc.StatusCode.INVALID_ARGUMENT)
|
|
203 | 221 |
|
204 | 222 |
|
205 | 223 |
def test_delete_operation_fail(instance, context):
|
... | ... | @@ -211,11 +229,16 @@ def test_delete_operation_fail(instance, context): |
211 | 229 |
|
212 | 230 |
|
213 | 231 |
def test_cancel_operation(instance, controller, execute_request, context):
|
214 |
- response_execute = controller.execution_instance.execute(execute_request.action_digest,
|
|
215 |
- execute_request.skip_cache_lookup)
|
|
232 |
+ job_name = controller.execution_instance.execute(execute_request.action_digest,
|
|
233 |
+ execute_request.skip_cache_lookup)
|
|
234 |
+ |
|
235 |
+ message_queue = queue.Queue()
|
|
236 |
+ operation_name = controller.execution_instance.register_operation_client(job_name,
|
|
237 |
+ context.peer(),
|
|
238 |
+ message_queue)
|
|
216 | 239 |
|
217 | 240 |
request = operations_pb2.CancelOperationRequest()
|
218 |
- request.name = "{}/{}".format(instance_name, response_execute.name)
|
|
241 |
+ request.name = "{}/{}".format(instance_name, operation_name)
|
|
219 | 242 |
|
220 | 243 |
instance.CancelOperation(request, context)
|
221 | 244 |
|
... | ... | @@ -238,7 +261,7 @@ def test_cancel_operation_blank(blank_instance, context): |
238 | 261 |
context.set_code.assert_called_once_with(grpc.StatusCode.INVALID_ARGUMENT)
|
239 | 262 |
|
240 | 263 |
|
241 |
-def test_cancel_operation_instance_fail(instance, context):
|
|
264 |
+def test_cancel_operation_fail(instance, context):
|
|
242 | 265 |
request = operations_pb2.CancelOperationRequest()
|
243 | 266 |
instance.CancelOperation(request, context)
|
244 | 267 |
|