finn pushed to branch finn/80-remote-parser at BuildGrid / buildgrid
Commits:
- 
a7f2ea44
by finnball at 2018-09-12T15:51:38Z
- 
142d7ef2
by finnball at 2018-09-12T15:51:42Z
- 
2e426480
by finnball at 2018-09-12T15:51:42Z
- 
8564f37c
by finnball at 2018-09-12T15:51:42Z
- 
cbda7bda
by finnball at 2018-09-12T15:51:42Z
9 changed files:
- buildgrid/_app/commands/cmd_server.py
- + buildgrid/_app/settings/__init__.py
- + buildgrid/_app/settings/cas.yml
- buildgrid/_app/settings/default.yml
- buildgrid/_app/settings/parser.py
- + buildgrid/_app/settings/remote-storage.yml
- buildgrid/server/cas/service.py
- buildgrid/server/cas/storage/remote.py
- tests/cas/test_storage.py
Changes:
| ... | ... | @@ -49,41 +49,49 @@ def start(context, config): | 
| 49 | 49 |      with open(config) as f:
 | 
| 50 | 50 |          settings = parser.get_parser().safe_load(f)
 | 
| 51 | 51 |  | 
| 52 | -    server_settings = settings['server']
 | |
| 53 | -    insecure_mode = server_settings['insecure-mode']
 | |
| 54 | - | |
| 55 | -    credentials = None
 | |
| 56 | -    if not insecure_mode:
 | |
| 57 | -        server_key = server_settings['tls-server-key']
 | |
| 58 | -        server_cert = server_settings['tls-server-cert']
 | |
| 59 | -        client_certs = server_settings['tls-client-certs']
 | |
| 60 | -        credentials = context.load_server_credentials(server_key, server_cert, client_certs)
 | |
| 61 | - | |
| 62 | -        if not credentials:
 | |
| 63 | -            click.echo("ERROR: no TLS keys were specified and no defaults could be found.\n" +
 | |
| 64 | -                       "Set `insecure-mode: false` in order to deactivate TLS encryption.\n", err=True)
 | |
| 65 | -            sys.exit(-1)
 | |
| 66 | - | |
| 67 | -    instances = settings['instances']
 | |
| 68 | - | |
| 69 | -    execution_controllers = _instance_maker(instances, ExecutionController)
 | |
| 70 | - | |
| 71 | -    execution_instances = {}
 | |
| 72 | -    bots_interfaces = {}
 | |
| 73 | -    operations_instances = {}
 | |
| 74 | - | |
| 75 | -    # TODO: map properly in parser
 | |
| 76 | -    for k, v in execution_controllers.items():
 | |
| 77 | -        execution_instances[k] = v.execution_instance
 | |
| 78 | -        bots_interfaces[k] = v.bots_interface
 | |
| 79 | -        operations_instances[k] = v.operations_instance
 | |
| 80 | - | |
| 81 | -    reference_caches = _instance_maker(instances, ReferenceCache)
 | |
| 82 | -    action_caches = _instance_maker(instances, ActionCache)
 | |
| 83 | -    cas = _instance_maker(instances, ContentAddressableStorageInstance)
 | |
| 84 | -    bytestreams = _instance_maker(instances, ByteStreamInstance)
 | |
| 52 | +    try:
 | |
| 53 | +        server_settings = settings['server']
 | |
| 54 | +        insecure_mode = server_settings['insecure-mode']
 | |
| 55 | + | |
| 56 | +        credentials = None
 | |
| 57 | +        if not insecure_mode:
 | |
| 58 | +            credential_settings = server_settings['credentials']
 | |
| 59 | +            server_key = credential_settings['tls-server-key']
 | |
| 60 | +            server_cert = credential_settings['tls-server-cert']
 | |
| 61 | +            client_certs = credential_settings['tls-client-certs']
 | |
| 62 | +            credentials = context.load_server_credentials(server_key, server_cert, client_certs)
 | |
| 63 | + | |
| 64 | +            if not credentials:
 | |
| 65 | +                click.echo("ERROR: no TLS keys were specified and no defaults could be found.\n" +
 | |
| 66 | +                           "Set `insecure-mode: false` in order to deactivate TLS encryption.\n", err=True)
 | |
| 67 | +                sys.exit(-1)
 | |
| 68 | + | |
| 69 | +        port = server_settings['port']
 | |
| 70 | +        instances = settings['instances']
 | |
| 71 | + | |
| 72 | +        execution_controllers = _instance_maker(instances, ExecutionController)
 | |
| 73 | + | |
| 74 | +        execution_instances = {}
 | |
| 75 | +        bots_interfaces = {}
 | |
| 76 | +        operations_instances = {}
 | |
| 77 | + | |
| 78 | +        # TODO: map properly in parser
 | |
| 79 | +        # Issue 82
 | |
| 80 | +        for k, v in execution_controllers.items():
 | |
| 81 | +            execution_instances[k] = v.execution_instance
 | |
| 82 | +            bots_interfaces[k] = v.bots_interface
 | |
| 83 | +            operations_instances[k] = v.operations_instance
 | |
| 84 | + | |
| 85 | +        reference_caches = _instance_maker(instances, ReferenceCache)
 | |
| 86 | +        action_caches = _instance_maker(instances, ActionCache)
 | |
| 87 | +        cas = _instance_maker(instances, ContentAddressableStorageInstance)
 | |
| 88 | +        bytestreams = _instance_maker(instances, ByteStreamInstance)
 | |
| 89 | + | |
| 90 | +    except KeyError as e:
 | |
| 91 | +        click.echo("ERROR: Could not parse config: {}.\n".format(str(e)), err=True)
 | |
| 92 | +        sys.exit(-1)
 | |
| 93 | +>>>>>>> 682ac9f... Catch key errors when attempting to parse.
 | |
| 85 | 94 |  | 
| 86 | -    port = server_settings['port']
 | |
| 87 | 95 |      server = BuildGridServer(port=port,
 | 
| 88 | 96 |                               credentials=credentials,
 | 
| 89 | 97 |                               execution_instances=execution_instances,
 | 
| 1 | +server:
 | |
| 2 | +  port: 50052
 | |
| 3 | +  insecure-mode: true
 | |
| 4 | +  credentials:
 | |
| 5 | +    tls-server-key: null
 | |
| 6 | +    tls-server-cert: null
 | |
| 7 | +    tls-client-certs: null
 | |
| 8 | + | |
| 9 | +description: |
 | |
| 10 | +  Just a CAS.
 | |
| 11 | + | |
| 12 | +instances:
 | |
| 13 | +  - name: main
 | |
| 14 | +    description: |
 | |
| 15 | +      The main server
 | |
| 16 | + | |
| 17 | +    storages:
 | |
| 18 | +        - !disk-storage &main-storage
 | |
| 19 | +          path: ~/cas/
 | |
| 20 | + | |
| 21 | +    services:
 | |
| 22 | +      - !cas
 | |
| 23 | +        storage: *main-storage
 | |
| 24 | + | |
| 25 | +      - !bytestream
 | |
| 26 | +        storage: *main-storage | 
| 1 | 1 |  server:
 | 
| 2 | 2 |    port: 50051
 | 
| 3 | -  tls-server-key: null
 | |
| 4 | -  tls-server-cert: null
 | |
| 5 | -  tls-client-certs: null
 | |
| 6 | 3 |    insecure-mode: true
 | 
| 4 | +  credentials:
 | |
| 5 | +    tls-server-key: null
 | |
| 6 | +    tls-server-cert: null
 | |
| 7 | +    tls-client-certs: null
 | |
| 7 | 8 |  | 
| 8 | 9 |  description: |
 | 
| 9 | 10 |    A single default instance
 | 
| ... | ... | @@ -14,7 +14,11 @@ | 
| 14 | 14 |  | 
| 15 | 15 |  | 
| 16 | 16 |  import os
 | 
| 17 | +import sys
 | |
| 18 | +from urllib.parse import urlparse
 | |
| 17 | 19 |  | 
| 20 | +import click
 | |
| 21 | +import grpc
 | |
| 18 | 22 |  import yaml
 | 
| 19 | 23 |  | 
| 20 | 24 |  from buildgrid.server.controller import ExecutionController
 | 
| ... | ... | @@ -22,9 +26,12 @@ from buildgrid.server.actioncache.storage import ActionCache | 
| 22 | 26 |  from buildgrid.server.cas.instance import ByteStreamInstance, ContentAddressableStorageInstance
 | 
| 23 | 27 |  from buildgrid.server.cas.storage.disk import DiskStorage
 | 
| 24 | 28 |  from buildgrid.server.cas.storage.lru_memory_cache import LRUMemoryCache
 | 
| 29 | +from buildgrid.server.cas.storage.remote import RemoteStorage
 | |
| 25 | 30 |  from buildgrid.server.cas.storage.s3 import S3Storage
 | 
| 26 | 31 |  from buildgrid.server.cas.storage.with_cache import WithCacheStorage
 | 
| 27 | 32 |  | 
| 33 | +from ..cli import Context
 | |
| 34 | + | |
| 28 | 35 |  | 
| 29 | 36 |  class YamlFactory(yaml.YAMLObject):
 | 
| 30 | 37 |      @classmethod
 | 
| ... | ... | @@ -58,6 +65,47 @@ class S3(YamlFactory): | 
| 58 | 65 |          return S3Storage(bucket, endpoint_url=endpoint)
 | 
| 59 | 66 |  | 
| 60 | 67 |  | 
| 68 | +class Remote(YamlFactory):
 | |
| 69 | + | |
| 70 | +    yaml_tag = u'!remote-storage'
 | |
| 71 | + | |
| 72 | +    def __new__(cls, url, instance_name, credentials=None):
 | |
| 73 | +        # TODO: Context could be passed into the parser.
 | |
| 74 | +        # Also find way to get instance_name from parent
 | |
| 75 | +        # Issue 82
 | |
| 76 | +        context = Context()
 | |
| 77 | + | |
| 78 | +        url = urlparse(url)
 | |
| 79 | +        remote = '{}:{}'.format(url.hostname, url.port or 50051)
 | |
| 80 | + | |
| 81 | +        channel = None
 | |
| 82 | +        if url.scheme == 'http':
 | |
| 83 | +            channel = grpc.insecure_channel(remote)
 | |
| 84 | + | |
| 85 | +        else:
 | |
| 86 | +            if not credentials:
 | |
| 87 | +                click.echo("ERROR: no TLS keys were specified and no defaults could be found.\n" +
 | |
| 88 | +                           "Set remote url scheme to `http` in order to deactivate" +
 | |
| 89 | +                           "TLS encryption.\n", err=True)
 | |
| 90 | +                sys.exit(-1)
 | |
| 91 | + | |
| 92 | +            client_key = credentials['tls-client-key']
 | |
| 93 | +            client_cert = credentials['tls-client-cert']
 | |
| 94 | +            server_cert = credentials['tls-server-cert']
 | |
| 95 | +            credentials = context.load_client_credentials(client_key,
 | |
| 96 | +                                                          client_cert,
 | |
| 97 | +                                                          server_cert)
 | |
| 98 | +            if not credentials:
 | |
| 99 | +                click.echo("ERROR: no TLS keys were specified and no defaults could be found.\n" +
 | |
| 100 | +                           "Set remote url scheme to `http` in order to deactivate" +
 | |
| 101 | +                           "TLS encryption.\n", err=True)
 | |
| 102 | +                sys.exit(-1)
 | |
| 103 | + | |
| 104 | +            channel = grpc.secure_channel(remote, credentials)
 | |
| 105 | + | |
| 106 | +        return RemoteStorage(channel, instance_name)
 | |
| 107 | + | |
| 108 | + | |
| 61 | 109 |  class WithCache(YamlFactory):
 | 
| 62 | 110 |  | 
| 63 | 111 |      yaml_tag = u'!with-cache-storage'
 | 
| ... | ... | @@ -118,6 +166,7 @@ def get_parser(): | 
| 118 | 166 |      yaml.SafeLoader.add_constructor(Disk.yaml_tag, Disk.from_yaml)
 | 
| 119 | 167 |      yaml.SafeLoader.add_constructor(LRU.yaml_tag, LRU.from_yaml)
 | 
| 120 | 168 |      yaml.SafeLoader.add_constructor(S3.yaml_tag, S3.from_yaml)
 | 
| 169 | +    yaml.SafeLoader.add_constructor(Remote.yaml_tag, Remote.from_yaml)
 | |
| 121 | 170 |      yaml.SafeLoader.add_constructor(WithCache.yaml_tag, WithCache.from_yaml)
 | 
| 122 | 171 |      yaml.SafeLoader.add_constructor(CAS.yaml_tag, CAS.from_yaml)
 | 
| 123 | 172 |      yaml.SafeLoader.add_constructor(ByteStream.yaml_tag, ByteStream.from_yaml)
 | 
| 1 | +server:
 | |
| 2 | +  port: 50051
 | |
| 3 | +  insecure-mode: true
 | |
| 4 | +  credentials:
 | |
| 5 | +    tls-server-key: null
 | |
| 6 | +    tls-server-cert: null
 | |
| 7 | +    tls-client-certs: null
 | |
| 8 | + | |
| 9 | + | |
| 10 | +description: |
 | |
| 11 | +  A single default instance with remote storage.
 | |
| 12 | + | |
| 13 | +instances:
 | |
| 14 | +  - name: main
 | |
| 15 | +    description: |
 | |
| 16 | +      The main server
 | |
| 17 | + | |
| 18 | +    storages:
 | |
| 19 | +        - !remote-storage &main-storage
 | |
| 20 | +          url: "http://localhost:50052"
 | |
| 21 | +          instance_name: main
 | |
| 22 | +          credentials:
 | |
| 23 | +            tls-client-key: null
 | |
| 24 | +            tls-client-cert: null
 | |
| 25 | +            tls-server-cert: null
 | |
| 26 | + | |
| 27 | +    services:
 | |
| 28 | +      - !action-cache &main-action
 | |
| 29 | +        storage: *main-storage
 | |
| 30 | +        max_cached_refs: 256
 | |
| 31 | +        allow_updates: true
 | |
| 32 | + | |
| 33 | +      - !execution
 | |
| 34 | +        storage: *main-storage
 | |
| 35 | +        action_cache: *main-action
 | |
| 36 | + | |
| 37 | +      - !cas
 | |
| 38 | +        storage: *main-storage
 | |
| 39 | + | |
| 40 | +      - !bytestream
 | |
| 41 | +        storage: *main-storage | 
| ... | ... | @@ -89,15 +89,15 @@ class ByteStreamService(bytestream_pb2_grpc.ByteStreamServicer): | 
| 89 | 89 |              # TODO: Decide on default instance name
 | 
| 90 | 90 |              if path[0] == "blobs":
 | 
| 91 | 91 |                  if len(path) < 3 or not path[2].isdigit():
 | 
| 92 | -                    raise InvalidArgumentError("Invalid resource name: {}".format(context.resource_name))
 | |
| 92 | +                    raise InvalidArgumentError("Invalid resource name: {}".format(request.resource_name))
 | |
| 93 | 93 |                  instance_name = ""
 | 
| 94 | 94 |  | 
| 95 | 95 |              elif path[1] == "blobs":
 | 
| 96 | 96 |                  if len(path) < 4 or not path[3].isdigit():
 | 
| 97 | -                    raise InvalidArgumentError("Invalid resource name: {}".format(context.resource_name))
 | |
| 97 | +                    raise InvalidArgumentError("Invalid resource name: {}".format(request.resource_name))
 | |
| 98 | 98 |  | 
| 99 | 99 |              else:
 | 
| 100 | -                raise InvalidArgumentError("Invalid resource name: {}".format(context.resource_name))
 | |
| 100 | +                raise InvalidArgumentError("Invalid resource name: {}".format(request.resource_name))
 | |
| 101 | 101 |  | 
| 102 | 102 |              instance = self._get_instance(instance_name)
 | 
| 103 | 103 |              yield from instance.read(path,
 | 
| ... | ... | @@ -134,15 +134,15 @@ class ByteStreamService(bytestream_pb2_grpc.ByteStreamServicer): | 
| 134 | 134 |              # TODO: Sort out no instance name
 | 
| 135 | 135 |              if path[0] == "uploads":
 | 
| 136 | 136 |                  if len(path) < 5 or path[2] != "blobs" or not path[4].isdigit():
 | 
| 137 | -                    raise InvalidArgumentError("Invalid resource name: {}".format(context.resource_name))
 | |
| 137 | +                    raise InvalidArgumentError("Invalid resource name: {}".format(first_request.resource_name))
 | |
| 138 | 138 |                  instance_name = ""
 | 
| 139 | 139 |  | 
| 140 | 140 |              elif path[1] == "uploads":
 | 
| 141 | 141 |                  if len(path) < 6 or path[3] != "blobs" or not path[5].isdigit():
 | 
| 142 | -                    raise InvalidArgumentError("Invalid resource name: {}".format(context.resource_name))
 | |
| 142 | +                    raise InvalidArgumentError("Invalid resource name: {}".format(first_request.resource_name))
 | |
| 143 | 143 |  | 
| 144 | 144 |              else:
 | 
| 145 | -                raise InvalidArgumentError("Invalid resource name: {}".format(context.resource_name))
 | |
| 145 | +                raise InvalidArgumentError("Invalid resource name: {}".format(first_request.resource_name))
 | |
| 146 | 146 |  | 
| 147 | 147 |              instance = self._get_instance(instance_name)
 | 
| 148 | 148 |              return instance.write(requests)
 | 
| ... | ... | @@ -23,6 +23,8 @@ Forwards storage requests to a remote storage. | 
| 23 | 23 |  import io
 | 
| 24 | 24 |  import logging
 | 
| 25 | 25 |  | 
| 26 | +import grpc
 | |
| 27 | + | |
| 26 | 28 |  from buildgrid.utils import gen_fetch_blob, gen_write_request_blob
 | 
| 27 | 29 |  from buildgrid._protos.google.bytestream import bytestream_pb2_grpc
 | 
| 28 | 30 |  from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
 | 
| ... | ... | @@ -32,7 +34,7 @@ from .storage_abc import StorageABC | 
| 32 | 34 |  | 
| 33 | 35 |  class RemoteStorage(StorageABC):
 | 
| 34 | 36 |  | 
| 35 | -    def __init__(self, channel, instance_name=""):
 | |
| 37 | +    def __init__(self, channel, instance_name):
 | |
| 36 | 38 |          self.logger = logging.getLogger(__name__)
 | 
| 37 | 39 |          self._instance_name = instance_name
 | 
| 38 | 40 |          self._stub_bs = bytestream_pb2_grpc.ByteStreamStub(channel)
 | 
| ... | ... | @@ -44,18 +46,29 @@ class RemoteStorage(StorageABC): | 
| 44 | 46 |          return False
 | 
| 45 | 47 |  | 
| 46 | 48 |      def get_blob(self, digest):
 | 
| 47 | -        fetched_data = io.BytesIO()
 | |
| 48 | -        length = 0
 | |
| 49 | -        for data in gen_fetch_blob(self._stub_bs, digest, self._instance_name):
 | |
| 50 | -            length += fetched_data.write(data)
 | |
| 51 | - | |
| 52 | -        if length:
 | |
| 53 | -            assert digest.size_bytes == length
 | |
| 54 | -            fetched_data.seek(0)
 | |
| 55 | -            return fetched_data
 | |
| 56 | - | |
| 57 | -        else:
 | |
| 58 | -            return None
 | |
| 49 | +        try:
 | |
| 50 | +            fetched_data = io.BytesIO()
 | |
| 51 | +            length = 0
 | |
| 52 | + | |
| 53 | +            for data in gen_fetch_blob(self._stub_bs, digest, self._instance_name):
 | |
| 54 | +                length += fetched_data.write(data)
 | |
| 55 | + | |
| 56 | +            if length:
 | |
| 57 | +                assert digest.size_bytes == length
 | |
| 58 | +                fetched_data.seek(0)
 | |
| 59 | +                return fetched_data
 | |
| 60 | + | |
| 61 | +            else:
 | |
| 62 | +                return None
 | |
| 63 | + | |
| 64 | +        except grpc.RpcError as e:
 | |
| 65 | +            if e.code() == grpc.StatusCode.NOT_FOUND:
 | |
| 66 | +                pass
 | |
| 67 | +            else:
 | |
| 68 | +                self.logger.error(e.details())
 | |
| 69 | +                raise
 | |
| 70 | + | |
| 71 | +        return None
 | |
| 59 | 72 |  | 
| 60 | 73 |      def begin_write(self, digest):
 | 
| 61 | 74 |          return io.BytesIO(digest.SerializeToString())
 | 
| ... | ... | @@ -98,17 +98,6 @@ def instance(params): | 
| 98 | 98 |      return {params, MockCASStorage()}
 | 
| 99 | 99 |  | 
| 100 | 100 |  | 
| 101 | -@pytest.fixture()
 | |
| 102 | -@mock.patch.object(remote, 'bytestream_pb2_grpc')
 | |
| 103 | -@mock.patch.object(remote, 'remote_execution_pb2_grpc')
 | |
| 104 | -def remote_storage(mock_bs_grpc, mock_re_pb2_grpc):
 | |
| 105 | -    mock_server = MockStubServer()
 | |
| 106 | -    storage = remote.RemoteStorage(instance)
 | |
| 107 | -    storage._stub_bs = mock_server
 | |
| 108 | -    storage._stub_cas = mock_server
 | |
| 109 | -    yield storage
 | |
| 110 | - | |
| 111 | - | |
| 112 | 101 |  # General tests for all storage providers
 | 
| 113 | 102 |  | 
| 114 | 103 |  | 
| ... | ... | @@ -138,7 +127,7 @@ def any_storage(request): | 
| 138 | 127 |          with mock.patch.object(remote, 'bytestream_pb2_grpc'):
 | 
| 139 | 128 |              with mock.patch.object(remote, 'remote_execution_pb2_grpc'):
 | 
| 140 | 129 |                  mock_server = MockStubServer()
 | 
| 141 | -                storage = remote.RemoteStorage(instance)
 | |
| 130 | +                storage = remote.RemoteStorage(instance, "")
 | |
| 142 | 131 |                  storage._stub_bs = mock_server
 | 
| 143 | 132 |                  storage._stub_cas = mock_server
 | 
| 144 | 133 |                  yield storage
 | 
