Martin Blanchard pushed to branch mablanch/138-exit-on-startup-failure at BuildGrid / buildgrid
Commits:
- 2975041d by RichKen at 2019-01-15T15:09:22Z
- 7fec32ac by Santiago Gil at 2019-01-18T09:53:32Z
- cc92a38f by Santiago Gil at 2019-01-18T12:52:57Z
- b990291b by Santiago Gil at 2019-01-18T12:52:57Z
- 00226325 by Martin Blanchard at 2019-01-21T16:34:35Z
- e3397c2d by Martin Blanchard at 2019-01-21T16:34:49Z
- ad4eb44f by Martin Blanchard at 2019-01-21T16:34:52Z
- 15d44d3b by Martin Blanchard at 2019-01-24T08:57:04Z
- 4a600cfc by Martin Blanchard at 2019-01-24T08:57:04Z
- 89f2b765 by Martin Blanchard at 2019-01-24T08:57:04Z
- 02692ded by Martin Blanchard at 2019-01-24T08:57:04Z
10 changed files:
- buildgrid/_app/commands/cmd_cas.py
- buildgrid/_app/commands/cmd_server.py
- buildgrid/_app/settings/parser.py
- buildgrid/_exceptions.py
- buildgrid/client/cas.py
- buildgrid/server/_authentication.py
- buildgrid/server/cas/instance.py
- buildgrid/server/cas/service.py
- buildgrid/server/instance.py
- tests/auth/test_interceptor.py
Changes:
--- a/buildgrid/_app/commands/cmd_cas.py
+++ b/buildgrid/_app/commands/cmd_cas.py
@@ -147,33 +147,41 @@ def _create_digest(digest_string):
     return digest
 
 
-@cli.command('download-file', short_help="Download a file from the CAS server.")
-@click.argument('digest-string', nargs=1, type=click.STRING, required=True)
-@click.argument('file-path', nargs=1, type=click.Path(exists=False), required=True)
+@cli.command('download-file', short_help="Download one or more files from the CAS server. "
+             "(Specified as a space-separated list of DIGEST FILE_PATH)")
+@click.argument('digest-path-list', nargs=-1, type=str, required=True)  # 'digest path' pairs
 @click.option('--verify', is_flag=True, show_default=True,
               help="Check downloaded file's integrity.")
 @pass_context
-def download_file(context, digest_string, file_path, verify):
-    if os.path.exists(file_path):
-        click.echo("Error: Invalid value for " +
-                   "path=[{}] already exists.".format(file_path), err=True)
-        return
-
-    digest = _create_digest(digest_string)
+def download_file(context, digest_path_list, verify):
+    # Downloading files:
+    downloaded_files = {}
     with download(context.channel, instance=context.instance_name) as downloader:
-        downloader.download_file(digest, file_path)
-
-    if verify:
-        file_digest = create_digest(read_file(file_path))
-        if file_digest != digest:
-            click.echo("Error: Failed to verify path=[{}]".format(file_path), err=True)
-            return
-
-    if os.path.isfile(file_path):
-        click.echo("Success: Pulled path=[{}] from digest=[{}/{}]"
-                   .format(file_path, digest.hash, digest.size_bytes))
-    else:
-        click.echo('Error: Failed pulling "{}"'.format(file_path), err=True)
+        for (digest_string, file_path) in zip(digest_path_list[0::2],
+                                              digest_path_list[1::2]):
+            if os.path.exists(file_path):
+                click.echo("Error: Invalid value for " +
+                           "path=[{}] already exists.".format(file_path), err=True)
+                continue
+
+            digest = _create_digest(digest_string)
+
+            downloader.download_file(digest, file_path)
+            downloaded_files[file_path] = digest
+
+    # Verifying:
+    for (file_path, digest) in downloaded_files.items():
+        if verify:
+            file_digest = create_digest(read_file(file_path))
+            if file_digest != digest:
+                click.echo("Error: Failed to verify path=[{}]".format(file_path), err=True)
+                continue
+
+        if os.path.isfile(file_path):
+            click.echo("Success: Pulled path=[{}] from digest=[{}/{}]"
+                       .format(file_path, digest.hash, digest.size_bytes))
+        else:
+            click.echo('Error: Failed pulling "{}"'.format(file_path), err=True)
 
 
 @cli.command('download-dir', short_help="Download a directory from the CAS server.")
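Note: the rewritten command pairs its flat argument list into (digest, path)
tuples via extended slicing. A minimal standalone sketch of that idiom, with
placeholder values (not part of the commit):

    # Pair a flat argument tuple into (digest, path) couples, as the new
    # download-file command does:
    args = ('e3b0c442...', '/tmp/a.txt', 'd2a84f4b...', '/tmp/b.txt')
    for digest_string, file_path in zip(args[0::2], args[1::2]):
        print(digest_string, '->', file_path)
    # Caveat: zip() silently drops a trailing, unpaired digest.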
--- a/buildgrid/_app/commands/cmd_server.py
+++ b/buildgrid/_app/commands/cmd_server.py
@@ -24,6 +24,7 @@ import sys
 
 import click
 
+from buildgrid._exceptions import PermissionDeniedError
 from buildgrid.server._authentication import AuthMetadataMethod, AuthMetadataAlgorithm
 from buildgrid.server.instance import BuildGridServer
 from buildgrid.server._monitoring import MonitoringOutputType, MonitoringOutputFormat
@@ -120,8 +121,13 @@ def _create_server_from_config(configuration):
 
     server = BuildGridServer(**kargs)
 
-    for channel in network:
-        server.add_port(channel.address, channel.credentials)
+    try:
+        for channel in network:
+            server.add_port(channel.address, channel.credentials)
+
+    except PermissionDeniedError as e:
+        click.echo("Error: {}.".format(e), err=True)
+        sys.exit(-1)
 
     for instance in instances:
         instance_name = instance['name']
--- a/buildgrid/_app/settings/parser.py
+++ b/buildgrid/_app/settings/parser.py
@@ -82,9 +82,9 @@ class Channel(YamlFactory):
             client_certs = credentials['tls-client-certs']
             self.credentials = context.load_server_credentials(server_key, server_cert, client_certs)
 
-            if not credentials:
-                click.echo("ERROR: no TLS keys were specified and no defaults could be found.\n" +
-                           "Set `insecure-mode: false` in order to deactivate TLS encryption.\n", err=True)
+            if not self.credentials:
+                click.echo("ERROR: load_server_credentials could not find certificates.\n" +
+                           "Please check whether the specified certificate paths exist.\n", err=True)
                 sys.exit(-1)
 
 
--- a/buildgrid/_exceptions.py
+++ b/buildgrid/_exceptions.py
@@ -89,3 +89,9 @@ class FailedPreconditionError(BgdError):
     able to fix the errors and retry."""
     def __init__(self, message, detail=None, reason=None):
         super().__init__(message, detail=detail, domain=ErrorDomain.SERVER, reason=reason)
+
+
+class PermissionDeniedError(BgdError):
+    """The caller does not have permission to execute the specified operation."""
+    def __init__(self, message, detail=None, reason=None):
+        super().__init__(message, detail=detail, domain=ErrorDomain.SERVER, reason=reason)
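Note: a minimal sketch of how the new exception type is meant to be consumed,
mirroring the try/except added to cmd_server.py above (the helper name here is
hypothetical):

    import sys

    from buildgrid._exceptions import PermissionDeniedError

    def _bind_all_or_exit(server, channels):
        # add_port() now raises PermissionDeniedError on bind failure
        # (see buildgrid/server/instance.py below):
        try:
            for channel in channels:
                server.add_port(channel.address, channel.credentials)
        except PermissionDeniedError as e:
            sys.stderr.write("Error: {}.\n".format(e))
            sys.exit(-1)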
--- a/buildgrid/client/cas.py
+++ b/buildgrid/client/cas.py
@@ -391,7 +391,7 @@ class Downloader:
         except grpc.RpcError as e:
             status_code = e.code()
             if status_code == grpc.StatusCode.UNIMPLEMENTED:
-                _CallCache.mark_unimplemented(self.channel, 'BatchUpdateBlobs')
+                _CallCache.mark_unimplemented(self.channel, 'GetTree')
 
             elif status_code == grpc.StatusCode.NOT_FOUND:
                 raise NotFoundError("Requested directory does not exist on the remote.")
--- a/buildgrid/server/_authentication.py
+++ b/buildgrid/server/_authentication.py
@@ -200,9 +200,19 @@ class AuthMetadataServerInterceptor(grpc.ServerInterceptor):
             expiration_time = self.__bearer_cache[bearer]
 
             # Accept request if cached token hasn't expired yet:
-            if expiration_time < datetime.utcnow():
+            if expiration_time >= datetime.utcnow():
                 return continuation(handler_call_details)  # Accepted
 
+            else:
+                del self.__bearer_cache[bearer]
+
+            # Cached token has expired, reject the request:
+            self.__logger.error("Rejecting '{}' request: {}"
+                                .format(handler_call_details.method.split('/')[-1],
+                                        self.__auth_errors['expired-token']))
+            # TODO: Use grpc.Status.details to inform the client of the expiry?
+            return self.__terminators['expired-token']
+
         except KeyError:
             pass
 
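Note: the original comparison was inverted: only *expired* cache entries were
accepted. A standalone sketch of the corrected check (names are illustrative,
not the interceptor's real state):

    from datetime import datetime

    bearer_cache = {}  # bearer token -> expiration datetime

    def cached_token_is_valid(bearer):
        try:
            expiration_time = bearer_cache[bearer]
        except KeyError:
            return False  # unknown token: run full validation instead
        if expiration_time >= datetime.utcnow():
            return True   # cached and still fresh: accept
        del bearer_cache[bearer]  # evict expired entries
        return False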
--- a/buildgrid/server/cas/instance.py
+++ b/buildgrid/server/cas/instance.py
@@ -24,6 +24,7 @@ import logging
 from buildgrid._exceptions import InvalidArgumentError, NotFoundError, OutOfRangeError
 from buildgrid._protos.google.bytestream import bytestream_pb2
 from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2 as re_pb2
+from buildgrid._protos.google.rpc import code_pb2, status_pb2
 from buildgrid.settings import HASH, HASH_LENGTH, MAX_REQUEST_SIZE, MAX_REQUEST_COUNT
 from buildgrid.utils import get_hash_type
 
@@ -70,6 +71,35 @@ class ContentAddressableStorageInstance:
 
         return response
 
+    def batch_read_blobs(self, digests):
+        storage = self._storage
+
+        response = re_pb2.BatchReadBlobsResponse()
+
+        requested_bytes = sum((digest.size_bytes for digest in digests))
+        max_batch_size = self.max_batch_total_size_bytes()
+
+        if requested_bytes > max_batch_size:
+            raise InvalidArgumentError('Combined total size of blobs exceeds '
+                                       'server limit. '
+                                       '({} > {} [byte])'.format(requested_bytes,
+                                                                 max_batch_size))
+
+        for digest in digests:
+            response_proto = response.responses.add()
+            response_proto.digest.CopyFrom(digest)
+
+            blob = storage.get_blob(digest)
+            if blob:
+                response_proto.data = blob.read()
+                status_code = code_pb2.OK
+            else:
+                status_code = code_pb2.NOT_FOUND
+
+            response_proto.status.CopyFrom(status_pb2.Status(code=status_code))
+
+        return response
+
     def get_tree(self, request):
         storage = self._storage
 
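Note: a hedged sketch of what a matching client-side request looks like, using
the standard REAPI protos (the digest below is the SHA-256 of the empty blob;
the instance name is a placeholder):

    from buildgrid._protos.build.bazel.remote.execution.v2 import (
        remote_execution_pb2)

    request = remote_execution_pb2.BatchReadBlobsRequest(instance_name='main')
    digest = request.digests.add()
    digest.hash = ('e3b0c44298fc1c149afbf4c8996fb924'
                   '27ae41e4649b934ca495991b7852b855')
    digest.size_bytes = 0
    # Each response.responses[i].status.code comes back as code_pb2.OK or
    # code_pb2.NOT_FOUND with the implementation above.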
--- a/buildgrid/server/cas/service.py
+++ b/buildgrid/server/cas/service.py
@@ -86,8 +86,15 @@ class ContentAddressableStorageService(remote_execution_pb2_grpc.ContentAddressableStorageServicer):
     def BatchReadBlobs(self, request, context):
         self.__logger.debug("BatchReadBlobs request from [%s]", context.peer())
 
-        context.set_code(grpc.StatusCode.UNIMPLEMENTED)
-        context.set_details('Method not implemented!')
+        try:
+            instance = self._get_instance(request.instance_name)
+            response = instance.batch_read_blobs(request.digests)
+            return response
+
+        except InvalidArgumentError as e:
+            self.__logger.error(e)
+            context.set_details(str(e))
+            context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
 
         return remote_execution_pb2.BatchReadBlobsResponse()
 
--- a/buildgrid/server/instance.py
+++ b/buildgrid/server/instance.py
@@ -27,6 +27,7 @@ import grpc
 import janus
 
 from buildgrid._enums import BotStatus, LogRecordLevel, MetricRecordDomain, MetricRecordType
+from buildgrid._exceptions import PermissionDeniedError
 from buildgrid._protos.buildgrid.v2 import monitoring_pb2
 from buildgrid.server.actioncache.service import ActionCacheService
 from buildgrid.server._authentication import AuthMetadataMethod, AuthMetadataAlgorithm
@@ -87,7 +88,8 @@ class BuildGridServer:
         AuthContext.interceptor = self.__grpc_auth_interceptor
 
         self.__grpc_executor = futures.ThreadPoolExecutor(max_workers)
-        self.__grpc_server = grpc.server(self.__grpc_executor)
+        self.__grpc_server = grpc.server(self.__grpc_executor,
+                                         options=(('grpc.so_reuseport', 0),))
 
         self.__main_loop = asyncio.get_event_loop()
 
@@ -205,6 +207,9 @@
 
         Returns:
             int: Number of the bound port.
+
+        Raises:
+            PermissionDeniedError: If socket binding fails.
         """
         if credentials is not None:
             self.__logger.info("Adding secure connection on: [%s]", address)
@@ -214,6 +219,9 @@
             self.__logger.info("Adding insecure connection on [%s]", address)
             port_number = self.__grpc_server.add_insecure_port(address)
 
+        if not port_number:
+            raise PermissionDeniedError("Unable to configure socket")
+
         return port_number
 
     def add_execution_instance(self, instance, instance_name):
--- a/tests/auth/test_interceptor.py
+++ b/tests/auth/test_interceptor.py
@@ -16,8 +16,10 @@
 
 
 from collections import namedtuple
+from datetime import datetime
 from unittest import mock
 import os
+import time
 
 import grpc
 from grpc._server import _Context
@@ -167,3 +169,60 @@ def test_jwt_authorization(token, secret, algorithm, validity):
     else:
         context.abort.assert_called_once_with(grpc.StatusCode.UNAUTHENTICATED, mock.ANY)
         context.set_code.assert_not_called()
+
+    # Token should have been cached now, let's test authorization again:
+    context = mock.create_autospec(_Context, spec_set=True)
+
+    handler = interceptor.intercept_service(continuator, call_details)
+    handler.unary_unary(None, context)
+
+    if validity:
+        context.set_code.assert_called_once_with(grpc.StatusCode.OK)
+        context.abort.assert_not_called()
+
+    else:
+        context.abort.assert_called_once_with(grpc.StatusCode.UNAUTHENTICATED, mock.ANY)
+        context.set_code.assert_not_called()
+
+
+@pytest.mark.skipif(not HAVE_JWT, reason="No pyjwt")
+def test_jwt_authorization_expiry():
+    secret, algorithm = 'your-256-bit-secret', AuthMetadataAlgorithm.JWT_HS256
+    now = int(datetime.utcnow().timestamp())
+    payload = {'sub': 'BuildGrid Expiry Test', 'iat': now, 'exp': now + 2}
+    token = jwt.encode(payload, secret, algorithm=algorithm.value.upper()).decode()
+
+    interceptor = AuthMetadataServerInterceptor(
+        method=AuthMetadataMethod.JWT, secret=secret, algorithm=algorithm)
+
+    # First, test generated token validation:
+    continuator = _unary_unary_rpc_terminator
+    call_details = _mock_call_details(token)
+    context = mock.create_autospec(_Context, spec_set=True)
+
+    handler = interceptor.intercept_service(continuator, call_details)
+    handler.unary_unary(None, context)
+
+    context.set_code.assert_called_once_with(grpc.StatusCode.OK)
+    context.abort.assert_not_called()
+
+    # Second, ensure cached token validation:
+    context = mock.create_autospec(_Context, spec_set=True)
+
+    handler = interceptor.intercept_service(continuator, call_details)
+    handler.unary_unary(None, context)
+
+    context.set_code.assert_called_once_with(grpc.StatusCode.OK)
+    context.abort.assert_not_called()
+
+    # Then wait for the token to expire:
+    time.sleep(3)
+
+    # Finally, test for cached-token invalidation:
+    context = mock.create_autospec(_Context, spec_set=True)
+
+    handler = interceptor.intercept_service(continuator, call_details)
+    handler.unary_unary(None, context)
+
+    context.abort.assert_called_once_with(grpc.StatusCode.UNAUTHENTICATED, mock.ANY)
+    context.set_code.assert_not_called()