Martin Blanchard pushed to branch mablanch/144-jwt-authentication at BuildGrid / buildgrid
Commits:
- 
859c0fa8
by Finn at 2018-11-27T15:25:25Z
- 
e1091b04
by Finn at 2018-11-27T15:25:25Z
- 
1dc5d2d2
by Finn at 2018-11-27T15:25:25Z
- 
35c901bd
by Finn at 2018-11-27T16:23:49Z
- 
5cb38e2a
by Finn at 2018-11-27T16:23:52Z
- 
32ad653d
by Finn at 2018-11-27T16:23:52Z
- 
e15a9c91
by Finn at 2018-11-27T16:23:52Z
- 
bd5587ea
by Finn at 2018-11-27T16:23:52Z
- 
d94fa258
by Finn at 2018-11-27T16:23:52Z
- 
6f90a553
by Finn at 2018-11-27T16:23:52Z
- 
db65c5ec
by Raoul Hidalgo Charman at 2018-11-28T12:23:41Z
- 
52a45470
by Martin Blanchard at 2018-11-28T14:52:03Z
- 
d0444a61
by Martin Blanchard at 2018-11-28T14:52:03Z
13 changed files:
- + buildgrid/_app/commands/cmd_capabilities.py
- + buildgrid/client/capabilities.py
- + buildgrid/server/_authentication.py
- + buildgrid/server/capabilities/__init__.py
- + buildgrid/server/capabilities/instance.py
- + buildgrid/server/capabilities/service.py
- buildgrid/server/cas/instance.py
- buildgrid/server/execution/instance.py
- buildgrid/server/instance.py
- buildgrid/utils.py
- tests/cas/test_storage.py
- + tests/integration/capabilities_service.py
- + tests/utils/capabilities.py
Changes:
| 1 | +# Copyright (C) 2018 Bloomberg LP
 | |
| 2 | +#
 | |
| 3 | +# Licensed under the Apache License, Version 2.0 (the "License");
 | |
| 4 | +# you may not use this file except in compliance with the License.
 | |
| 5 | +# You may obtain a copy of the License at
 | |
| 6 | +#
 | |
| 7 | +#  <http://www.apache.org/licenses/LICENSE-2.0>
 | |
| 8 | +#
 | |
| 9 | +# Unless required by applicable law or agreed to in writing, software
 | |
| 10 | +# distributed under the License is distributed on an "AS IS" BASIS,
 | |
| 11 | +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | |
| 12 | +# See the License for the specific language governing permissions and
 | |
| 13 | +# limitations under the License.
 | |
| 14 | + | |
| 15 | + | |
| 16 | +import sys
 | |
| 17 | +from urllib.parse import urlparse
 | |
| 18 | + | |
| 19 | +import click
 | |
| 20 | +import grpc
 | |
| 21 | + | |
| 22 | +from buildgrid.client.capabilities import CapabilitiesInterface
 | |
| 23 | + | |
| 24 | +from ..cli import pass_context
 | |
| 25 | + | |
| 26 | + | |
| 27 | +@click.command(name='capabilities', short_help="Capabilities service.")
 | |
| 28 | +@click.option('--remote', type=click.STRING, default='http://localhost:50051', show_default=True,
 | |
| 29 | +              help="Remote execution server's URL (port defaults to 50051 if no specified).")
 | |
| 30 | +@click.option('--client-key', type=click.Path(exists=True, dir_okay=False), default=None,
 | |
| 31 | +              help="Private client key for TLS (PEM-encoded)")
 | |
| 32 | +@click.option('--client-cert', type=click.Path(exists=True, dir_okay=False), default=None,
 | |
| 33 | +              help="Public client certificate for TLS (PEM-encoded)")
 | |
| 34 | +@click.option('--server-cert', type=click.Path(exists=True, dir_okay=False), default=None,
 | |
| 35 | +              help="Public server certificate for TLS (PEM-encoded)")
 | |
| 36 | +@click.option('--instance-name', type=click.STRING, default='main', show_default=True,
 | |
| 37 | +              help="Targeted farm instance name.")
 | |
| 38 | +@pass_context
 | |
| 39 | +def cli(context, remote, instance_name, client_key, client_cert, server_cert):
 | |
| 40 | +    click.echo("Getting capabilities...")
 | |
| 41 | +    url = urlparse(remote)
 | |
| 42 | + | |
| 43 | +    remote = '{}:{}'.format(url.hostname, url.port or 50051)
 | |
| 44 | +    instance_name = instance_name
 | |
| 45 | + | |
| 46 | +    if url.scheme == 'http':
 | |
| 47 | +        channel = grpc.insecure_channel(remote)
 | |
| 48 | +    else:
 | |
| 49 | +        credentials = context.load_client_credentials(client_key, client_cert, server_cert)
 | |
| 50 | +        if not credentials:
 | |
| 51 | +            click.echo("ERROR: no TLS keys were specified and no defaults could be found.", err=True)
 | |
| 52 | +            sys.exit(-1)
 | |
| 53 | + | |
| 54 | +        channel = grpc.secure_channel(remote, credentials)
 | |
| 55 | + | |
| 56 | +    interface = CapabilitiesInterface(channel)
 | |
| 57 | +    response = interface.get_capabilities(instance_name)
 | |
| 58 | +    click.echo(response) | 
| 1 | +# Copyright (C) 2018 Bloomberg LP
 | |
| 2 | +#
 | |
| 3 | +# Licensed under the Apache License, Version 2.0 (the "License");
 | |
| 4 | +# you may not use this file except in compliance with the License.
 | |
| 5 | +# You may obtain a copy of the License at
 | |
| 6 | +#
 | |
| 7 | +#  <http://www.apache.org/licenses/LICENSE-2.0>
 | |
| 8 | +#
 | |
| 9 | +# Unless required by applicable law or agreed to in writing, software
 | |
| 10 | +# distributed under the License is distributed on an "AS IS" BASIS,
 | |
| 11 | +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | |
| 12 | +# See the License for the specific language governing permissions and
 | |
| 13 | +# limitations under the License.
 | |
| 14 | + | |
| 15 | + | |
| 16 | +import logging
 | |
| 17 | +import grpc
 | |
| 18 | + | |
| 19 | +from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
 | |
| 20 | + | |
| 21 | + | |
| 22 | +class CapabilitiesInterface:
 | |
| 23 | +    """Interface for calls the the Capabilities Service."""
 | |
| 24 | + | |
| 25 | +    def __init__(self, channel):
 | |
| 26 | +        """Initialises an instance of the capabilities service.
 | |
| 27 | + | |
| 28 | +        Args:
 | |
| 29 | +            channel (grpc.Channel): A gRPC channel to the CAS endpoint.
 | |
| 30 | +        """
 | |
| 31 | +        self.__logger = logging.getLogger(__name__)
 | |
| 32 | +        self.__stub = remote_execution_pb2_grpc.CapabilitiesStub(channel)
 | |
| 33 | + | |
| 34 | +    def get_capabilities(self, instance_name):
 | |
| 35 | +        """Returns the capabilities or the server to the user.
 | |
| 36 | + | |
| 37 | +        Args:
 | |
| 38 | +            instance_name (str): The name of the instance."""
 | |
| 39 | + | |
| 40 | +        request = remote_execution_pb2.GetCapabilitiesRequest(instance_name=instance_name)
 | |
| 41 | +        try:
 | |
| 42 | +            return self.__stub.GetCapabilities(request)
 | |
| 43 | + | |
| 44 | +        except grpc.RpcError as e:
 | |
| 45 | +            self.__logger.error(e)
 | |
| 46 | +            raise | 
| 1 | +# Copyright (C) 2018 Bloomberg LP
 | |
| 2 | +#
 | |
| 3 | +# Licensed under the Apache License, Version 2.0 (the "License");
 | |
| 4 | +# you may not use this file except in compliance with the License.
 | |
| 5 | +# You may obtain a copy of the License at
 | |
| 6 | +#
 | |
| 7 | +#  <http://www.apache.org/licenses/LICENSE-2.0>
 | |
| 8 | +#
 | |
| 9 | +# Unless required by applicable law or agreed to in writing, software
 | |
| 10 | +# distributed under the License is distributed on an "AS IS" BASIS,
 | |
| 11 | +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | |
| 12 | +# See the License for the specific language governing permissions and
 | |
| 13 | +# limitations under the License.
 | |
| 14 | + | |
| 15 | + | |
| 16 | +from datetime import datetime
 | |
| 17 | +from enum import Enum
 | |
| 18 | + | |
| 19 | +import grpc
 | |
| 20 | +import jwt
 | |
| 21 | + | |
| 22 | +from buildgrid._exceptions import InvalidArgumentError
 | |
| 23 | + | |
| 24 | + | |
| 25 | +class JwtAlgorithm(Enum):
 | |
| 26 | +    # HMAC algorithms:
 | |
| 27 | +    HS256 = 'HS256'
 | |
| 28 | +    HS384 = 'HS384'
 | |
| 29 | +    HS512 = 'HS512'
 | |
| 30 | + | |
| 31 | +    # RSASSA-PKCS algorithms:
 | |
| 32 | +    RS256 = 'RS256'
 | |
| 33 | +    RS384 = 'RS384'
 | |
| 34 | +    RS512 = 'RS512'
 | |
| 35 | + | |
| 36 | +    # RSASSA-PSS algorithms:
 | |
| 37 | +    PS256 = 'PS256'
 | |
| 38 | +    PS384 = 'PS384'
 | |
| 39 | +    PS512 = 'PS512'
 | |
| 40 | + | |
| 41 | +    # ECDSA algorithms:
 | |
| 42 | +    ES256 = 'ES256'
 | |
| 43 | +    ES384 = 'ES384'
 | |
| 44 | +    ES521 = 'ES521'
 | |
| 45 | +    ES512 = 'ES512'
 | |
| 46 | + | |
| 47 | + | |
| 48 | +class JwtAuthMetadataInterceptor(grpc.ServerInterceptor):
 | |
| 49 | + | |
| 50 | +    __auth_errors = {
 | |
| 51 | +        'missing-bearer': 'Missing authentication header field.',
 | |
| 52 | +        'invalid-bearer': 'Invalid authentication header field.',
 | |
| 53 | +        'invalid-token': 'Invalid authentication token.',
 | |
| 54 | +        'expired-token': 'Expired authentication token.',
 | |
| 55 | +        'unbounded-token': 'Unbounded authentication token.',
 | |
| 56 | +    }
 | |
| 57 | + | |
| 58 | +    def __init__(self, secret, algorithm):
 | |
| 59 | +        """Initialises a new :class:`JwtAuthMetadataInterceptor`.
 | |
| 60 | + | |
| 61 | +        Args:
 | |
| 62 | +            secret (str): Symetric secret key or asymetric public key.
 | |
| 63 | +            algorithm (JwtAlgorithm): Algorithm used to encode `secret`.
 | |
| 64 | + | |
| 65 | +        Raises:
 | |
| 66 | +            InvalidArgumentError: If `algorithm` is not supported.
 | |
| 67 | +        """
 | |
| 68 | +        self.__bearer_cache = {}
 | |
| 69 | +        self.__terminators = {}
 | |
| 70 | +        self.__secret = secret
 | |
| 71 | + | |
| 72 | +        self._algorithm = algorithm.value
 | |
| 73 | + | |
| 74 | +        try:
 | |
| 75 | +            jwt.register_algorithm(self._algorithm, None)
 | |
| 76 | + | |
| 77 | +        except TypeError:
 | |
| 78 | +            raise InvalidArgumentError('Algorithm not supported for JWT decoding: [{}]'
 | |
| 79 | +                                       .format(self._algorithm))
 | |
| 80 | + | |
| 81 | +        except ValueError:
 | |
| 82 | +            pass
 | |
| 83 | + | |
| 84 | +        for code, message in self.__auth_errors.items():
 | |
| 85 | +            self.__terminators[code] = _unary_unary_rpc_terminator(message)
 | |
| 86 | + | |
| 87 | +    @property
 | |
| 88 | +    def algorithm(self):
 | |
| 89 | +        return JwtAlgorithm(self._algorithm)
 | |
| 90 | + | |
| 91 | +    def intercept_service(self, continuation, handler_call_details):
 | |
| 92 | +        try:
 | |
| 93 | +            # Reject requests not carrying a token:
 | |
| 94 | +            bearer = dict(handler_call_details.invocation_metadata)['Authorization']
 | |
| 95 | + | |
| 96 | +        except KeyError:
 | |
| 97 | +            return self.__terminators['missing-bearer']  # Rejected
 | |
| 98 | + | |
| 99 | +        # Reject requests with malformated bearer:
 | |
| 100 | +        if not bearer.startswith('Bearer '):
 | |
| 101 | +            return self.__terminators['invalid-bearer']  # Rejected
 | |
| 102 | + | |
| 103 | +        try:
 | |
| 104 | +            # Hit the cache for already validated token:
 | |
| 105 | +            expiration_time = self.__bearer_cache[bearer]
 | |
| 106 | + | |
| 107 | +            # Accept request if cached token hasn't expired yet:
 | |
| 108 | +            if expiration_time < datetime.utcnow():
 | |
| 109 | +                return continuation(handler_call_details)  # Accepted
 | |
| 110 | + | |
| 111 | +        except KeyError:
 | |
| 112 | +            pass
 | |
| 113 | + | |
| 114 | +        try:
 | |
| 115 | +            # Decode and validate the new token:
 | |
| 116 | +            payload = jwt.decode(bearer[7:], self.__secret, algorithm=self._algorithm)
 | |
| 117 | + | |
| 118 | +        except jwt.exceptions.ExpiredSignatureError:
 | |
| 119 | +            return self.__terminators['expired-token']  # Rejected
 | |
| 120 | + | |
| 121 | +        except jwt.exceptions.InvalidTokenError:
 | |
| 122 | +            return self.__terminators['invalid-token']  # Rejected
 | |
| 123 | + | |
| 124 | +        # Do not accept token without an expiration time:
 | |
| 125 | +        if 'exp' not in payload or isinstance(payload['exp'], int):
 | |
| 126 | +            return self.__terminators['unbounded-token']  # Rejected
 | |
| 127 | + | |
| 128 | +        # Cache the validated token and store expiration time:
 | |
| 129 | +        self.__bearer_cache[bearer] = datetime.fromtimestamp(payload['exp'])
 | |
| 130 | + | |
| 131 | +        return continuation(handler_call_details)  # Accepted
 | |
| 132 | + | |
| 133 | + | |
| 134 | +def _unary_unary_rpc_terminator(details):
 | |
| 135 | + | |
| 136 | +    def terminate(ignored_request, context):
 | |
| 137 | +        context.abort(grpc.StatusCode.UNAUTHENTICATED, details)
 | |
| 138 | + | |
| 139 | +    return grpc.unary_unary_rpc_method_handler(terminate) | 
| 1 | +# Copyright (C) 2018 Bloomberg LP
 | |
| 2 | +#
 | |
| 3 | +# Licensed under the Apache License, Version 2.0 (the "License");
 | |
| 4 | +# you may not use this file except in compliance with the License.
 | |
| 5 | +# You may obtain a copy of the License at
 | |
| 6 | +#
 | |
| 7 | +#  <http://www.apache.org/licenses/LICENSE-2.0>
 | |
| 8 | +#
 | |
| 9 | +# Unless required by applicable law or agreed to in writing, software
 | |
| 10 | +# distributed under the License is distributed on an "AS IS" BASIS,
 | |
| 11 | +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | |
| 12 | +# See the License for the specific language governing permissions and
 | |
| 13 | +# limitations under the License.
 | |
| 14 | + | |
| 15 | + | |
| 16 | +import logging
 | |
| 17 | + | |
| 18 | +from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
 | |
| 19 | + | |
| 20 | + | |
| 21 | +class CapabilitiesInstance:
 | |
| 22 | + | |
| 23 | +    def __init__(self, cas_instance=None, action_cache_instance=None, execution_instance=None):
 | |
| 24 | +        self.__logger = logging.getLogger(__name__)
 | |
| 25 | +        self.__cas_instance = cas_instance
 | |
| 26 | +        self.__action_cache_instance = action_cache_instance
 | |
| 27 | +        self.__execution_instance = execution_instance
 | |
| 28 | + | |
| 29 | +    def register_instance_with_server(self, instance_name, server):
 | |
| 30 | +        server.add_capabilities_instance(self, instance_name)
 | |
| 31 | + | |
| 32 | +    def add_cas_instance(self, cas_instance):
 | |
| 33 | +        self.__cas_instance = cas_instance
 | |
| 34 | + | |
| 35 | +    def add_action_cache_instance(self, action_cache_instance):
 | |
| 36 | +        self.__action_cache_instance = action_cache_instance
 | |
| 37 | + | |
| 38 | +    def add_execution_instance(self, execution_instance):
 | |
| 39 | +        self.__execution_instance = execution_instance
 | |
| 40 | + | |
| 41 | +    def get_capabilities(self):
 | |
| 42 | +        server_capabilities = remote_execution_pb2.ServerCapabilities()
 | |
| 43 | +        server_capabilities.cache_capabilities.CopyFrom(self._get_cache_capabilities())
 | |
| 44 | +        server_capabilities.execution_capabilities.CopyFrom(self._get_capabilities_execution())
 | |
| 45 | +        # TODO
 | |
| 46 | +        # When API is stable, fill out SemVer values
 | |
| 47 | +        # server_capabilities.deprecated_api_version =
 | |
| 48 | +        # server_capabilities.low_api_version =
 | |
| 49 | +        # server_capabilities.low_api_version =
 | |
| 50 | +        # server_capabilities.hig_api_version =
 | |
| 51 | +        return server_capabilities
 | |
| 52 | + | |
| 53 | +    def _get_cache_capabilities(self):
 | |
| 54 | +        capabilities = remote_execution_pb2.CacheCapabilities()
 | |
| 55 | +        action_cache_update_capabilities = remote_execution_pb2.ActionCacheUpdateCapabilities()
 | |
| 56 | + | |
| 57 | +        if self.__cas_instance:
 | |
| 58 | +            capabilities.digest_function.extend([self.__cas_instance.hash_type()])
 | |
| 59 | +            capabilities.max_batch_total_size_bytes = self.__cas_instance.max_batch_total_size_bytes()
 | |
| 60 | +            capabilities.symlink_absolute_path_strategy = self.__cas_instance.symlink_absolute_path_strategy()
 | |
| 61 | +            # TODO: execution priority #102
 | |
| 62 | +            # capabilities.cache_priority_capabilities =
 | |
| 63 | + | |
| 64 | +        if self.__action_cache_instance:
 | |
| 65 | +            action_cache_update_capabilities.update_enabled = self.__action_cache_instance.allow_updates
 | |
| 66 | + | |
| 67 | +        capabilities.action_cache_update_capabilities.CopyFrom(action_cache_update_capabilities)
 | |
| 68 | +        return capabilities
 | |
| 69 | + | |
| 70 | +    def _get_capabilities_execution(self):
 | |
| 71 | +        capabilities = remote_execution_pb2.ExecutionCapabilities()
 | |
| 72 | +        if self.__execution_instance:
 | |
| 73 | +            capabilities.exec_enabled = True
 | |
| 74 | +            capabilities.digest_function = self.__execution_instance.hash_type()
 | |
| 75 | +            # TODO: execution priority #102
 | |
| 76 | +            # capabilities.execution_priority =
 | |
| 77 | + | |
| 78 | +        else:
 | |
| 79 | +            capabilities.exec_enabled = False
 | |
| 80 | + | |
| 81 | +        return capabilities | 
| 1 | +# Copyright (C) 2018 Bloomberg LP
 | |
| 2 | +#
 | |
| 3 | +# Licensed under the Apache License, Version 2.0 (the "License");
 | |
| 4 | +# you may not use this file except in compliance with the License.
 | |
| 5 | +# You may obtain a copy of the License at
 | |
| 6 | +#
 | |
| 7 | +#  <http://www.apache.org/licenses/LICENSE-2.0>
 | |
| 8 | +#
 | |
| 9 | +# Unless required by applicable law or agreed to in writing, software
 | |
| 10 | +# distributed under the License is distributed on an "AS IS" BASIS,
 | |
| 11 | +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | |
| 12 | +# See the License for the specific language governing permissions and
 | |
| 13 | +# limitations under the License.
 | |
| 14 | + | |
| 15 | + | |
| 16 | +import logging
 | |
| 17 | + | |
| 18 | +import grpc
 | |
| 19 | + | |
| 20 | +from buildgrid._exceptions import InvalidArgumentError
 | |
| 21 | +from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
 | |
| 22 | + | |
| 23 | + | |
| 24 | +class CapabilitiesService(remote_execution_pb2_grpc.CapabilitiesServicer):
 | |
| 25 | + | |
| 26 | +    def __init__(self, server):
 | |
| 27 | +        self.__logger = logging.getLogger(__name__)
 | |
| 28 | +        self.__instances = {}
 | |
| 29 | +        remote_execution_pb2_grpc.add_CapabilitiesServicer_to_server(self, server)
 | |
| 30 | + | |
| 31 | +    def add_instance(self, name, instance):
 | |
| 32 | +        self.__instances[name] = instance
 | |
| 33 | + | |
| 34 | +    def add_cas_instance(self, name, instance):
 | |
| 35 | +        self.__instances[name].add_cas_instance(instance)
 | |
| 36 | + | |
| 37 | +    def add_action_cache_instance(self, name, instance):
 | |
| 38 | +        self.__instances[name].add_action_cache_instance(instance)
 | |
| 39 | + | |
| 40 | +    def add_execution_instance(self, name, instance):
 | |
| 41 | +        self.__instances[name].add_execution_instance(instance)
 | |
| 42 | + | |
| 43 | +    def GetCapabilities(self, request, context):
 | |
| 44 | +        try:
 | |
| 45 | +            instance = self._get_instance(request.instance_name)
 | |
| 46 | +            return instance.get_capabilities()
 | |
| 47 | + | |
| 48 | +        except InvalidArgumentError as e:
 | |
| 49 | +            self.__logger.error(e)
 | |
| 50 | +            context.set_details(str(e))
 | |
| 51 | +            context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
 | |
| 52 | + | |
| 53 | +        return remote_execution_pb2.ServerCapabilities()
 | |
| 54 | + | |
| 55 | +    def _get_instance(self, name):
 | |
| 56 | +        try:
 | |
| 57 | +            return self.__instances[name]
 | |
| 58 | + | |
| 59 | +        except KeyError:
 | |
| 60 | +            raise InvalidArgumentError("Instance doesn't exist on server: [{}]".format(name)) | 
| ... | ... | @@ -25,6 +25,7 @@ from buildgrid._exceptions import InvalidArgumentError, NotFoundError, OutOfRang | 
| 25 | 25 |  from buildgrid._protos.google.bytestream import bytestream_pb2
 | 
| 26 | 26 |  from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2 as re_pb2
 | 
| 27 | 27 |  from buildgrid.settings import HASH, HASH_LENGTH
 | 
| 28 | +from buildgrid.utils import get_hash_type
 | |
| 28 | 29 |  | 
| 29 | 30 |  | 
| 30 | 31 |  class ContentAddressableStorageInstance:
 | 
| ... | ... | @@ -37,6 +38,19 @@ class ContentAddressableStorageInstance: | 
| 37 | 38 |      def register_instance_with_server(self, instance_name, server):
 | 
| 38 | 39 |          server.add_cas_instance(self, instance_name)
 | 
| 39 | 40 |  | 
| 41 | +    def hash_type(self):
 | |
| 42 | +        return get_hash_type()
 | |
| 43 | + | |
| 44 | +    def max_batch_total_size_bytes(self):
 | |
| 45 | +        # TODO: link with max size
 | |
| 46 | +        # Should be added from settings in MR !119
 | |
| 47 | +        return 2000000
 | |
| 48 | + | |
| 49 | +    def symlink_absolute_path_strategy(self):
 | |
| 50 | +        # Currently this strategy is hardcoded into BuildGrid
 | |
| 51 | +        # With no setting to reference
 | |
| 52 | +        return re_pb2.CacheCapabilities().DISALLOWED
 | |
| 53 | + | |
| 40 | 54 |      def find_missing_blobs(self, blob_digests):
 | 
| 41 | 55 |          storage = self._storage
 | 
| 42 | 56 |          return re_pb2.FindMissingBlobsResponse(
 | 
| ... | ... | @@ -25,6 +25,7 @@ from buildgrid._exceptions import FailedPreconditionError, InvalidArgumentError | 
| 25 | 25 |  from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Action
 | 
| 26 | 26 |  | 
| 27 | 27 |  from ..job import Job
 | 
| 28 | +from ...utils import get_hash_type
 | |
| 28 | 29 |  | 
| 29 | 30 |  | 
| 30 | 31 |  class ExecutionInstance:
 | 
| ... | ... | @@ -38,6 +39,9 @@ class ExecutionInstance: | 
| 38 | 39 |      def register_instance_with_server(self, instance_name, server):
 | 
| 39 | 40 |          server.add_execution_instance(self, instance_name)
 | 
| 40 | 41 |  | 
| 42 | +    def hash_type(self):
 | |
| 43 | +        return get_hash_type()
 | |
| 44 | + | |
| 41 | 45 |      def execute(self, action_digest, skip_cache_lookup, message_queue=None):
 | 
| 42 | 46 |          """ Sends a job for execution.
 | 
| 43 | 47 |          Queues an action and creates an Operation instance to be associated with
 | 
| ... | ... | @@ -22,12 +22,15 @@ import signal | 
| 22 | 22 |  import grpc
 | 
| 23 | 23 |  | 
| 24 | 24 |  from buildgrid.server.actioncache.service import ActionCacheService
 | 
| 25 | +from buildgrid.server._authentication import JwtAlgorithm, JwtAuthMetadataInterceptor
 | |
| 25 | 26 |  from buildgrid.server.bots.service import BotsService
 | 
| 26 | 27 |  from buildgrid.server.cas.service import ByteStreamService, ContentAddressableStorageService
 | 
| 27 | 28 |  from buildgrid.server.execution.service import ExecutionService
 | 
| 28 | 29 |  from buildgrid.server._monitoring import MonitoringBus, MonitoringOutputType, MonitoringOutputFormat
 | 
| 29 | 30 |  from buildgrid.server.operations.service import OperationsService
 | 
| 30 | 31 |  from buildgrid.server.referencestorage.service import ReferenceStorageService
 | 
| 32 | +from buildgrid.server.capabilities.instance import CapabilitiesInstance
 | |
| 33 | +from buildgrid.server.capabilities.service import CapabilitiesService
 | |
| 31 | 34 |  | 
| 32 | 35 |  | 
| 33 | 36 |  class BuildGridServer:
 | 
| ... | ... | @@ -50,11 +53,16 @@ class BuildGridServer: | 
| 50 | 53 |              max_workers = (os.cpu_count() or 1) * 5
 | 
| 51 | 54 |  | 
| 52 | 55 |          self.__grpc_executor = futures.ThreadPoolExecutor(max_workers)
 | 
| 53 | -        self.__grpc_server = grpc.server(self.__grpc_executor)
 | |
| 56 | +        self.__grpc_auth_interceptor = JwtAuthMetadataInterceptor('your-256-bit-secret', JwtAlgorithm.HS256)
 | |
| 57 | +        self.__grpc_server = grpc.server(
 | |
| 58 | +            self.__grpc_executor, interceptors=(self.__grpc_auth_interceptor,))
 | |
| 54 | 59 |  | 
| 55 | 60 |          self.__main_loop = asyncio.get_event_loop()
 | 
| 56 | 61 |          self.__monitoring_bus = None
 | 
| 57 | 62 |  | 
| 63 | +        # We always want a capabilities service
 | |
| 64 | +        self._capabilities_service = CapabilitiesService(self.__grpc_server)
 | |
| 65 | + | |
| 58 | 66 |          self._execution_service = None
 | 
| 59 | 67 |          self._bots_service = None
 | 
| 60 | 68 |          self._operations_service = None
 | 
| ... | ... | @@ -128,6 +136,7 @@ class BuildGridServer: | 
| 128 | 136 |              self._execution_service = ExecutionService(self.__grpc_server)
 | 
| 129 | 137 |  | 
| 130 | 138 |          self._execution_service.add_instance(instance_name, instance)
 | 
| 139 | +        self._add_capabilities_instance(instance_name, execution_instance=instance)
 | |
| 131 | 140 |  | 
| 132 | 141 |      def add_bots_interface(self, instance, instance_name):
 | 
| 133 | 142 |          """Adds a :obj:`BotsInterface` to the service.
 | 
| ... | ... | @@ -184,9 +193,10 @@ class BuildGridServer: | 
| 184 | 193 |              self._action_cache_service = ActionCacheService(self.__grpc_server)
 | 
| 185 | 194 |  | 
| 186 | 195 |          self._action_cache_service.add_instance(instance_name, instance)
 | 
| 196 | +        self._add_capabilities_instance(instance_name, action_cache_instance=instance)
 | |
| 187 | 197 |  | 
| 188 | 198 |      def add_cas_instance(self, instance, instance_name):
 | 
| 189 | -        """Stores a :obj:`ContentAddressableStorageInstance` to the service.
 | |
| 199 | +        """Adds a :obj:`ContentAddressableStorageInstance` to the service.
 | |
| 190 | 200 |  | 
| 191 | 201 |          If no service exists, it creates one.
 | 
| 192 | 202 |  | 
| ... | ... | @@ -198,9 +208,10 @@ class BuildGridServer: | 
| 198 | 208 |              self._cas_service = ContentAddressableStorageService(self.__grpc_server)
 | 
| 199 | 209 |  | 
| 200 | 210 |          self._cas_service.add_instance(instance_name, instance)
 | 
| 211 | +        self._add_capabilities_instance(instance_name, cas_instance=instance)
 | |
| 201 | 212 |  | 
| 202 | 213 |      def add_bytestream_instance(self, instance, instance_name):
 | 
| 203 | -        """Stores a :obj:`ByteStreamInstance` to the service.
 | |
| 214 | +        """Adds a :obj:`ByteStreamInstance` to the service.
 | |
| 204 | 215 |  | 
| 205 | 216 |          If no service exists, it creates one.
 | 
| 206 | 217 |  | 
| ... | ... | @@ -213,6 +224,31 @@ class BuildGridServer: | 
| 213 | 224 |  | 
| 214 | 225 |          self._bytestream_service.add_instance(instance_name, instance)
 | 
| 215 | 226 |  | 
| 227 | +    def _add_capabilities_instance(self, instance_name,
 | |
| 228 | +                                   cas_instance=None,
 | |
| 229 | +                                   action_cache_instance=None,
 | |
| 230 | +                                   execution_instance=None):
 | |
| 231 | +        """Adds a :obj:`CapabilitiesInstance` to the service.
 | |
| 232 | + | |
| 233 | +        Args:
 | |
| 234 | +            instance (:obj:`CapabilitiesInstance`): Instance to add.
 | |
| 235 | +            instance_name (str): Instance name.
 | |
| 236 | +        """
 | |
| 237 | + | |
| 238 | +        try:
 | |
| 239 | +            if cas_instance:
 | |
| 240 | +                self._capabilities_service.add_cas_instance(instance_name, cas_instance)
 | |
| 241 | +            if action_cache_instance:
 | |
| 242 | +                self._capabilities_service.add_action_cache_instance(instance_name, action_cache_instance)
 | |
| 243 | +            if execution_instance:
 | |
| 244 | +                self._capabilities_service.add_execution_instance(instance_name, execution_instance)
 | |
| 245 | + | |
| 246 | +        except KeyError:
 | |
| 247 | +            capabilities_instance = CapabilitiesInstance(cas_instance,
 | |
| 248 | +                                                         action_cache_instance,
 | |
| 249 | +                                                         execution_instance)
 | |
| 250 | +            self._capabilities_service.add_instance(instance_name, capabilities_instance)
 | |
| 251 | + | |
| 216 | 252 |      # --- Public API: Monitoring ---
 | 
| 217 | 253 |  | 
| 218 | 254 |      @property
 | 
| ... | ... | @@ -30,6 +30,14 @@ def get_hostname(): | 
| 30 | 30 |      return socket.gethostname()
 | 
| 31 | 31 |  | 
| 32 | 32 |  | 
| 33 | +def get_hash_type():
 | |
| 34 | +    """Returns the hash type."""
 | |
| 35 | +    hash_name = HASH().name
 | |
| 36 | +    if hash_name == "sha256":
 | |
| 37 | +        return remote_execution_pb2.SHA256
 | |
| 38 | +    return remote_execution_pb2.UNKNOWN
 | |
| 39 | + | |
| 40 | + | |
| 33 | 41 |  def create_digest(bytes_to_digest):
 | 
| 34 | 42 |      """Computes the :obj:`Digest` of a piece of data.
 | 
| 35 | 43 |  | 
| ... | ... | @@ -21,8 +21,8 @@ import tempfile | 
| 21 | 21 |  | 
| 22 | 22 |  import boto3
 | 
| 23 | 23 |  import grpc
 | 
| 24 | -import pytest
 | |
| 25 | 24 |  from moto import mock_s3
 | 
| 25 | +import pytest
 | |
| 26 | 26 |  | 
| 27 | 27 |  from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
 | 
| 28 | 28 |  from buildgrid.server.cas.storage.remote import RemoteStorage
 | 
| 1 | +# Copyright (C) 2018 Bloomberg LP
 | |
| 2 | +#
 | |
| 3 | +# Licensed under the Apache License, Version 2.0 (the "License");
 | |
| 4 | +# you may not use this file except in compliance with the License.
 | |
| 5 | +# You may obtain a copy of the License at
 | |
| 6 | +#
 | |
| 7 | +#  <http://www.apache.org/licenses/LICENSE-2.0>
 | |
| 8 | +#
 | |
| 9 | +# Unless required by applicable law or agreed to in writing, software
 | |
| 10 | +# distributed under the License is distributed on an "AS IS" BASIS,
 | |
| 11 | +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | |
| 12 | +# See the License for the specific language governing permissions and
 | |
| 13 | +# limitations under the License.
 | |
| 14 | + | |
| 15 | +# pylint: disable=redefined-outer-name
 | |
| 16 | + | |
| 17 | + | |
| 18 | +import grpc
 | |
| 19 | +import pytest
 | |
| 20 | + | |
| 21 | +from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
 | |
| 22 | +from buildgrid.client.capabilities import CapabilitiesInterface
 | |
| 23 | +from buildgrid.server.controller import ExecutionController
 | |
| 24 | +from buildgrid.server.actioncache.storage import ActionCache
 | |
| 25 | +from buildgrid.server.cas.instance import ContentAddressableStorageInstance
 | |
| 26 | +from buildgrid.server.cas.storage.lru_memory_cache import LRUMemoryCache
 | |
| 27 | + | |
| 28 | +from ..utils.utils import run_in_subprocess
 | |
| 29 | +from ..utils.capabilities import serve_capabilities_service
 | |
| 30 | + | |
| 31 | + | |
| 32 | +INSTANCES = ['', 'instance']
 | |
| 33 | + | |
| 34 | + | |
| 35 | +# Use subprocess to avoid creation of gRPC threads in main process
 | |
| 36 | +# See https://github.com/grpc/grpc/blob/master/doc/fork_support.md
 | |
| 37 | +# Multiprocessing uses pickle which protobufs don't work with
 | |
| 38 | +# Workaround wrapper to send messages as strings
 | |
| 39 | +class ServerInterface:
 | |
| 40 | + | |
| 41 | +    def __init__(self, remote):
 | |
| 42 | +        self.__remote = remote
 | |
| 43 | + | |
| 44 | +    def get_capabilities(self, instance_name):
 | |
| 45 | + | |
| 46 | +        def __get_capabilities(queue, remote, instance_name):
 | |
| 47 | +            interface = CapabilitiesInterface(grpc.insecure_channel(remote))
 | |
| 48 | + | |
| 49 | +            result = interface.get_capabilities(instance_name)
 | |
| 50 | +            queue.put(result.SerializeToString())
 | |
| 51 | + | |
| 52 | +        result = run_in_subprocess(__get_capabilities,
 | |
| 53 | +                                   self.__remote, instance_name)
 | |
| 54 | + | |
| 55 | +        capabilities = remote_execution_pb2.ServerCapabilities()
 | |
| 56 | +        capabilities.ParseFromString(result)
 | |
| 57 | +        return capabilities
 | |
| 58 | + | |
| 59 | + | |
| 60 | +@pytest.mark.parametrize('instance', INSTANCES)
 | |
| 61 | +def test_execution_not_available_capabilities(instance):
 | |
| 62 | +    with serve_capabilities_service([instance]) as server:
 | |
| 63 | +        server_interface = ServerInterface(server.remote)
 | |
| 64 | +        response = server_interface.get_capabilities(instance)
 | |
| 65 | + | |
| 66 | +        assert not response.execution_capabilities.exec_enabled
 | |
| 67 | + | |
| 68 | + | |
| 69 | +@pytest.mark.parametrize('instance', INSTANCES)
 | |
| 70 | +def test_execution_available_capabilities(instance):
 | |
| 71 | +    controller = ExecutionController()
 | |
| 72 | + | |
| 73 | +    with serve_capabilities_service([instance],
 | |
| 74 | +                                    execution_instance=controller.execution_instance) as server:
 | |
| 75 | +        server_interface = ServerInterface(server.remote)
 | |
| 76 | +        response = server_interface.get_capabilities(instance)
 | |
| 77 | + | |
| 78 | +        assert response.execution_capabilities.exec_enabled
 | |
| 79 | +        assert response.execution_capabilities.digest_function
 | |
| 80 | + | |
| 81 | + | |
| 82 | +@pytest.mark.parametrize('instance', INSTANCES)
 | |
| 83 | +def test_action_cache_allow_updates_capabilities(instance):
 | |
| 84 | +    storage = LRUMemoryCache(limit=256)
 | |
| 85 | +    action_cache = ActionCache(storage, max_cached_refs=256, allow_updates=True)
 | |
| 86 | + | |
| 87 | +    with serve_capabilities_service([instance],
 | |
| 88 | +                                    action_cache_instance=action_cache) as server:
 | |
| 89 | +        server_interface = ServerInterface(server.remote)
 | |
| 90 | +        response = server_interface.get_capabilities(instance)
 | |
| 91 | + | |
| 92 | +        assert response.cache_capabilities.action_cache_update_capabilities.update_enabled
 | |
| 93 | + | |
| 94 | + | |
| 95 | +@pytest.mark.parametrize('instance', INSTANCES)
 | |
| 96 | +def test_action_cache_not_allow_updates_capabilities(instance):
 | |
| 97 | +    storage = LRUMemoryCache(limit=256)
 | |
| 98 | +    action_cache = ActionCache(storage, max_cached_refs=256, allow_updates=False)
 | |
| 99 | + | |
| 100 | +    with serve_capabilities_service([instance],
 | |
| 101 | +                                    action_cache_instance=action_cache) as server:
 | |
| 102 | +        server_interface = ServerInterface(server.remote)
 | |
| 103 | +        response = server_interface.get_capabilities(instance)
 | |
| 104 | + | |
| 105 | +        assert not response.cache_capabilities.action_cache_update_capabilities.update_enabled
 | |
| 106 | + | |
| 107 | + | |
| 108 | +@pytest.mark.parametrize('instance', INSTANCES)
 | |
| 109 | +def test_cas_capabilities(instance):
 | |
| 110 | +    cas = ContentAddressableStorageInstance(None)
 | |
| 111 | + | |
| 112 | +    with serve_capabilities_service([instance],
 | |
| 113 | +                                    cas_instance=cas) as server:
 | |
| 114 | +        server_interface = ServerInterface(server.remote)
 | |
| 115 | +        response = server_interface.get_capabilities(instance)
 | |
| 116 | + | |
| 117 | +        assert len(response.cache_capabilities.digest_function) == 1
 | |
| 118 | +        assert response.cache_capabilities.digest_function[0]
 | |
| 119 | +        assert response.cache_capabilities.symlink_absolute_path_strategy
 | |
| 120 | +        assert response.cache_capabilities.max_batch_total_size_bytes | 
| 1 | +# Copyright (C) 2018 Bloomberg LP
 | |
| 2 | +#
 | |
| 3 | +# Licensed under the Apache License, Version 2.0 (the "License");
 | |
| 4 | +# you may not use this file except in compliance with the License.
 | |
| 5 | +# You may obtain a copy of the License at
 | |
| 6 | +#
 | |
| 7 | +#  <http://www.apache.org/licenses/LICENSE-2.0>
 | |
| 8 | +#
 | |
| 9 | +# Unless required by applicable law or agreed to in writing, software
 | |
| 10 | +# distributed under the License is distributed on an "AS IS" BASIS,
 | |
| 11 | +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | |
| 12 | +# See the License for the specific language governing permissions and
 | |
| 13 | +# limitations under the License.
 | |
| 14 | + | |
| 15 | + | |
| 16 | +from concurrent import futures
 | |
| 17 | +from contextlib import contextmanager
 | |
| 18 | +import multiprocessing
 | |
| 19 | +import os
 | |
| 20 | +import signal
 | |
| 21 | + | |
| 22 | +import grpc
 | |
| 23 | +import pytest_cov
 | |
| 24 | + | |
| 25 | +from buildgrid.server.capabilities.service import CapabilitiesService
 | |
| 26 | +from buildgrid.server.capabilities.instance import CapabilitiesInstance
 | |
| 27 | + | |
| 28 | + | |
| 29 | +@contextmanager
 | |
| 30 | +def serve_capabilities_service(instances,
 | |
| 31 | +                               cas_instance=None,
 | |
| 32 | +                               action_cache_instance=None,
 | |
| 33 | +                               execution_instance=None):
 | |
| 34 | +    server = Server(instances,
 | |
| 35 | +                    cas_instance,
 | |
| 36 | +                    action_cache_instance,
 | |
| 37 | +                    execution_instance)
 | |
| 38 | +    try:
 | |
| 39 | +        yield server
 | |
| 40 | +    finally:
 | |
| 41 | +        server.quit()
 | |
| 42 | + | |
| 43 | + | |
| 44 | +class Server:
 | |
| 45 | + | |
| 46 | +    def __init__(self, instances,
 | |
| 47 | +                 cas_instance=None,
 | |
| 48 | +                 action_cache_instance=None,
 | |
| 49 | +                 execution_instance=None):
 | |
| 50 | +        self.instances = instances
 | |
| 51 | + | |
| 52 | +        self.__queue = multiprocessing.Queue()
 | |
| 53 | +        self.__process = multiprocessing.Process(
 | |
| 54 | +            target=Server.serve,
 | |
| 55 | +            args=(self.__queue, self.instances, cas_instance, action_cache_instance, execution_instance))
 | |
| 56 | +        self.__process.start()
 | |
| 57 | + | |
| 58 | +        self.port = self.__queue.get(timeout=1)
 | |
| 59 | +        self.remote = 'localhost:{}'.format(self.port)
 | |
| 60 | + | |
| 61 | +    @staticmethod
 | |
| 62 | +    def serve(queue, instances, cas_instance, action_cache_instance, execution_instance):
 | |
| 63 | +        pytest_cov.embed.cleanup_on_sigterm()
 | |
| 64 | + | |
| 65 | +        # Use max_workers default from Python 3.5+
 | |
| 66 | +        max_workers = (os.cpu_count() or 1) * 5
 | |
| 67 | +        server = grpc.server(futures.ThreadPoolExecutor(max_workers))
 | |
| 68 | +        port = server.add_insecure_port('localhost:0')
 | |
| 69 | + | |
| 70 | +        capabilities_service = CapabilitiesService(server)
 | |
| 71 | +        for name in instances:
 | |
| 72 | +            capabilities_instance = CapabilitiesInstance(cas_instance, action_cache_instance, execution_instance)
 | |
| 73 | +            capabilities_service.add_instance(name, capabilities_instance)
 | |
| 74 | + | |
| 75 | +        server.start()
 | |
| 76 | +        queue.put(port)
 | |
| 77 | +        signal.pause()
 | |
| 78 | + | |
| 79 | +    def quit(self):
 | |
| 80 | +        if self.__process:
 | |
| 81 | +            self.__process.terminate()
 | |
| 82 | +            self.__process.join() | 
