[Notes] [Git][BuildGrid/buildgrid][finn/separate-services] 5 commits: Separated out CAS instance from service



Title: GitLab

finn pushed to branch finn/separate-services at BuildGrid / buildgrid

Commits:

25 changed files:

Changes:

  • .gitlab-ci.yml
    ... ... @@ -30,7 +30,7 @@ before_script:
    30 30
     .run-dummy-job-template: &dummy-job
    
    31 31
       stage: test
    
    32 32
       script:
    
    33
    -    - ${BGD} server start --allow-insecure &
    
    33
    +    - ${BGD} server start buildgrid/_app/settings/default.yml &
    
    34 34
         - sleep 1 # Allow server to boot
    
    35 35
         - ${BGD} bot dummy &
    
    36 36
         - ${BGD} execute request-dummy --wait-for-completion
    

  • buildgrid/_app/cli.py
    ... ... @@ -170,11 +170,8 @@ class BuildGridCLI(click.MultiCommand):
    170 170
             return commands
    
    171 171
     
    
    172 172
         def get_command(self, context, name):
    
    173
    -        try:
    
    174
    -            mod = __import__(name='buildgrid._app.commands.cmd_{}'.format(name),
    
    175
    -                             fromlist=['cli'])
    
    176
    -        except ImportError:
    
    177
    -            return None
    
    173
    +        mod = __import__(name='buildgrid._app.commands.cmd_{}'.format(name),
    
    174
    +                         fromlist=['cli'])
    
    178 175
             return mod.cli
    
    179 176
     
    
    180 177
     
    

  • buildgrid/_app/commands/cmd_server.py
    ... ... @@ -22,20 +22,17 @@ Create a BuildGrid server.
    22 22
     
    
    23 23
     import asyncio
    
    24 24
     import logging
    
    25
    -import sys
    
    26 25
     
    
    27 26
     import click
    
    28 27
     
    
    29
    -from buildgrid.server import buildgrid_server
    
    30
    -from buildgrid.server.cas.storage.disk import DiskStorage
    
    31
    -from buildgrid.server.cas.storage.lru_memory_cache import LRUMemoryCache
    
    32
    -from buildgrid.server.cas.storage.s3 import S3Storage
    
    33
    -from buildgrid.server.cas.storage.with_cache import WithCacheStorage
    
    28
    +from buildgrid.server.controller import ExecutionController
    
    34 29
     from buildgrid.server.actioncache.storage import ActionCache
    
    30
    +from buildgrid.server.cas.instance import ByteStreamInstance, ContentAddressableStorageInstance
    
    31
    +from buildgrid.server.referencestorage.storage import ReferenceCache
    
    35 32
     
    
    36 33
     from ..cli import pass_context
    
    37
    -
    
    38
    -_SIZE_PREFIXES = {'k': 2 ** 10, 'm': 2 ** 20, 'g': 2 ** 30, 't': 2 ** 40}
    
    34
    +from ..settings import parser
    
    35
    +from ..server import BuildGridServer
    
    39 36
     
    
    40 37
     
    
    41 38
     @click.group(name='server', short_help="Start a local server instance.")
    
    ... ... @@ -45,71 +42,31 @@ def cli(context):
    45 42
     
    
    46 43
     
    
    47 44
     @cli.command('start', short_help="Setup a new server instance.")
    
    48
    -@click.argument('instances', nargs=-1, type=click.STRING)
    
    49
    -@click.option('--port', type=click.INT, default='50051', show_default=True,
    
    50
    -              help="The port number to be listened.")
    
    51
    -@click.option('--server-key', type=click.Path(exists=True, dir_okay=False), default=None,
    
    52
    -              help="Private server key for TLS (PEM-encoded)")
    
    53
    -@click.option('--server-cert', type=click.Path(exists=True, dir_okay=False), default=None,
    
    54
    -              help="Public server certificate for TLS (PEM-encoded)")
    
    55
    -@click.option('--client-certs', type=click.Path(exists=True, dir_okay=False), default=None,
    
    56
    -              help="Public client certificates for TLS (PEM-encoded, one single file)")
    
    57
    -@click.option('--allow-insecure', type=click.BOOL, is_flag=True,
    
    58
    -              help="Whether or not to allow unencrypted connections.")
    
    59
    -@click.option('--allow-update-action-result/--forbid-update-action-result',
    
    60
    -              'allow_uar', default=True, show_default=True,
    
    61
    -              help="Whether or not to allow clients to manually edit the action cache.")
    
    62
    -@click.option('--max-cached-actions', type=click.INT, default=50, show_default=True,
    
    63
    -              help="Maximum number of actions to keep in the ActionCache.")
    
    64
    -@click.option('--cas', type=click.Choice(('lru', 's3', 'disk', 'with-cache')),
    
    65
    -              help="The CAS storage type to use.")
    
    66
    -@click.option('--cas-cache', type=click.Choice(('lru', 's3', 'disk')),
    
    67
    -              help="For --cas=with-cache, the CAS storage to use as the cache.")
    
    68
    -@click.option('--cas-fallback', type=click.Choice(('lru', 's3', 'disk')),
    
    69
    -              help="For --cas=with-cache, the CAS storage to use as the fallback.")
    
    70
    -@click.option('--cas-lru-size', type=click.STRING,
    
    71
    -              help="For --cas=lru, the LRU cache's memory limit.")
    
    72
    -@click.option('--cas-s3-bucket', type=click.STRING,
    
    73
    -              help="For --cas=s3, the bucket name.")
    
    74
    -@click.option('--cas-s3-endpoint', type=click.STRING,
    
    75
    -              help="For --cas=s3, the endpoint URI.")
    
    76
    -@click.option('--cas-disk-directory', type=click.Path(file_okay=False, dir_okay=True, writable=True),
    
    77
    -              help="For --cas=disk, the folder to store CAS blobs in.")
    
    45
    +@click.argument('CONFIG', type=click.Path(file_okay=True, dir_okay=False, writable=False))
    
    78 46
     @pass_context
    
    79
    -def start(context, port, allow_insecure, server_key, server_cert, client_certs,
    
    80
    -          instances, max_cached_actions, allow_uar, cas, **cas_args):
    
    81
    -    """Setups a new server instance."""
    
    82
    -    credentials = None
    
    83
    -    if not allow_insecure:
    
    84
    -        credentials = context.load_server_credentials(server_key, server_cert, client_certs)
    
    85
    -    if not credentials and not allow_insecure:
    
    86
    -        click.echo("ERROR: no TLS keys were specified and no defaults could be found.\n" +
    
    87
    -                   "Use --allow-insecure in order to deactivate TLS encryption.\n", err=True)
    
    88
    -        sys.exit(-1)
    
    89
    -
    
    90
    -    context.credentials = credentials
    
    91
    -    context.port = port
    
    92
    -
    
    93
    -    context.logger.info("BuildGrid server booting up")
    
    94
    -    context.logger.info("Starting on port {}".format(port))
    
    95
    -
    
    96
    -    cas_storage = _make_cas_storage(context, cas, cas_args)
    
    97
    -
    
    98
    -    if cas_storage is None:
    
    99
    -        context.logger.info("Running without CAS - action cache will be unavailable")
    
    100
    -        action_cache = None
    
    101
    -
    
    102
    -    else:
    
    103
    -        action_cache = ActionCache(cas_storage, max_cached_actions, allow_uar)
    
    104
    -
    
    105
    -    if instances is None:
    
    106
    -        instances = ['main']
    
    107
    -
    
    108
    -    server = buildgrid_server.BuildGridServer(port=context.port,
    
    109
    -                                              credentials=context.credentials,
    
    110
    -                                              instances=instances,
    
    111
    -                                              cas_storage=cas_storage,
    
    112
    -                                              action_cache=action_cache)
    
    47
    +def start(context, config):
    
    48
    +    with open(config) as f:
    
    49
    +        settings = parser.get_parser().safe_load(f)
    
    50
    +
    
    51
    +    server_settings = settings['server']
    
    52
    +
    
    53
    +    instances = settings['instances']
    
    54
    +
    
    55
    +    execution_controllers = _instance_maker(instances, ExecutionController)
    
    56
    +    reference_caches = _instance_maker(instances, ReferenceCache)
    
    57
    +    action_caches = _instance_maker(instances, ActionCache)
    
    58
    +    cas = _instance_maker(instances, ContentAddressableStorageInstance)
    
    59
    +    bytestreams = _instance_maker(instances, ByteStreamInstance)
    
    60
    +
    
    61
    +    port = server_settings['port']
    
    62
    +    server = BuildGridServer(port=port,
    
    63
    +                             execution_controller=execution_controllers,
    
    64
    +                             reference_storage_instances=reference_caches,
    
    65
    +                             action_cache_instances=action_caches,
    
    66
    +                             cas_instances=cas,
    
    67
    +                             bytestream_instances=bytestreams)
    
    68
    +
    
    69
    +    context.logger.info("Starting server on port {}".format(port))
    
    113 70
         loop = asyncio.get_event_loop()
    
    114 71
         try:
    
    115 72
             server.start()
    
    ... ... @@ -119,57 +76,20 @@ def start(context, port, allow_insecure, server_key, server_cert, client_certs,
    119 76
             pass
    
    120 77
     
    
    121 78
         finally:
    
    79
    +        context.logger.info("Stopping server")
    
    122 80
             server.stop()
    
    123 81
             loop.close()
    
    124 82
     
    
    125 83
     
    
    126
    -def _make_cas_storage(context, cas_type, cas_args):
    
    127
    -    """Returns the storage provider corresponding to the given `cas_type`,
    
    128
    -    or None if the provider cannot be created.
    
    129
    -    """
    
    130
    -    if cas_type == "lru":
    
    131
    -        if cas_args["cas_lru_size"] is None:
    
    132
    -            context.logger.error("--cas-lru-size is required for LRU CAS")
    
    133
    -            return None
    
    134
    -        try:
    
    135
    -            size = _parse_size(cas_args["cas_lru_size"])
    
    136
    -        except ValueError:
    
    137
    -            context.logger.error('Invalid LRU size "{0}"'.format(cas_args["cas_lru_size"]))
    
    138
    -            return None
    
    139
    -        return LRUMemoryCache(size)
    
    140
    -    elif cas_type == "s3":
    
    141
    -        if cas_args["cas_s3_bucket"] is None:
    
    142
    -            context.logger.error("--cas-s3-bucket is required for S3 CAS")
    
    143
    -            return None
    
    144
    -        if cas_args["cas_s3_endpoint"] is not None:
    
    145
    -            return S3Storage(cas_args["cas_s3_bucket"],
    
    146
    -                             endpoint_url=cas_args["cas_s3_endpoint"])
    
    147
    -        return S3Storage(cas_args["cas_s3_bucket"])
    
    148
    -    elif cas_type == "disk":
    
    149
    -        if cas_args["cas_disk_directory"] is None:
    
    150
    -            context.logger.error("--cas-disk-directory is required for disk CAS")
    
    151
    -            return None
    
    152
    -        return DiskStorage(cas_args["cas_disk_directory"])
    
    153
    -    elif cas_type == "with-cache":
    
    154
    -        cache = _make_cas_storage(context, cas_args["cas_cache"], cas_args)
    
    155
    -        fallback = _make_cas_storage(context, cas_args["cas_fallback"], cas_args)
    
    156
    -        if cache is None:
    
    157
    -            context.logger.error("Missing cache provider for --cas=with-cache")
    
    158
    -            return None
    
    159
    -        elif fallback is None:
    
    160
    -            context.logger.error("Missing fallback provider for --cas=with-cache")
    
    161
    -            return None
    
    162
    -        return WithCacheStorage(cache, fallback)
    
    163
    -    elif cas_type is None:
    
    164
    -        return None
    
    165
    -    return None
    
    166
    -
    
    167
    -
    
    168
    -def _parse_size(size):
    
    169
    -    """Convert a string containing a size in bytes (e.g. '2GB') to a number."""
    
    170
    -    size = size.lower()
    
    171
    -    if size[-1] == 'b':
    
    172
    -        size = size[:-1]
    
    173
    -    if size[-1] in _SIZE_PREFIXES:
    
    174
    -        return int(size[:-1]) * _SIZE_PREFIXES[size[-1]]
    
    175
    -    return int(size)
    84
    +# Turn away now if you want to keep your eyes
    
    85
    +def _instance_maker(instances, service_type):
    
    86
    +    # TODO get this mapped in parser
    
    87
    +    made = {}
    
    88
    +
    
    89
    +    for instance in instances:
    
    90
    +        services = instance['services']
    
    91
    +        instance_name = instance['name']
    
    92
    +        for service in services:
    
    93
    +            if isinstance(service, service_type):
    
    94
    +                made[instance_name] = service
    
    95
    +    return made

  • buildgrid/_app/server.py
    1
    +# Copyright (C) 2018 Bloomberg LP
    
    2
    +#
    
    3
    +# Licensed under the Apache License, Version 2.0 (the "License");
    
    4
    +# you may not use this file except in compliance with the License.
    
    5
    +# You may obtain a copy of the License at
    
    6
    +#
    
    7
    +#  <http://www.apache.org/licenses/LICENSE-2.0>
    
    8
    +#
    
    9
    +# Unless required by applicable law or agreed to in writing, software
    
    10
    +# distributed under the License is distributed on an "AS IS" BASIS,
    
    11
    +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    
    12
    +# See the License for the specific language governing permissions and
    
    13
    +# limitations under the License.
    
    14
    +
    
    15
    +
    
    16
    +"""
    
    17
    +BuildGridServer
    
    18
    +==============
    
    19
    +
    
    20
    +Creates a BuildGrid server, binding all the requisite service instances together.
    
    21
    +"""
    
    22
    +
    
    23
    +import logging
    
    24
    +from concurrent import futures
    
    25
    +
    
    26
    +import grpc
    
    27
    +
    
    28
    +from buildgrid.server.cas.service import ByteStreamService, ContentAddressableStorageService
    
    29
    +from buildgrid.server.actioncache.service import ActionCacheService
    
    30
    +from buildgrid.server.execution.service import ExecutionService
    
    31
    +from buildgrid.server.operations.service import OperationsService
    
    32
    +from buildgrid.server.bots.service import BotsService
    
    33
    +from buildgrid.server.referencestorage.service import ReferenceStorageService
    
    34
    +
    
    35
    +
    
    36
    +class BuildGridServer:
    
    37
    +
    
    38
    +    def __init__(self, port=50051, max_workers=10, credentials=None,
    
    39
    +                 execution_controller=None, reference_storage_instances=None,
    
    40
    +                 action_cache_instances=None, cas_instances=None, bytestream_instances=None):
    
    41
    +
    
    42
    +        self.logger = logging.getLogger(__name__)
    
    43
    +        address = '[::]:{0}'.format(port)
    
    44
    +
    
    45
    +        server = grpc.server(futures.ThreadPoolExecutor(max_workers))
    
    46
    +
    
    47
    +        if credentials is not None:
    
    48
    +            self.logger.info("Secure connection")
    
    49
    +            server.add_secure_port(address, credentials)
    
    50
    +
    
    51
    +        else:
    
    52
    +            self.logger.info("Insecure connection")
    
    53
    +            server.add_insecure_port(address)
    
    54
    +
    
    55
    +        if execution_controller:
    
    56
    +            self.logger.debug("Adding execution controllers {}".format(
    
    57
    +                execution_controller.keys()))
    
    58
    +            ExecutionService(server, execution_controller)
    
    59
    +            BotsService(server, execution_controller)
    
    60
    +            OperationsService(server, execution_controller)
    
    61
    +
    
    62
    +        if reference_storage_instances:
    
    63
    +            self.logger.debug("Adding reference storages {}".format(
    
    64
    +                reference_storage_instances.keys()))
    
    65
    +            ReferenceStorageService(server, reference_storage_instances)
    
    66
    +
    
    67
    +        if action_cache_instances:
    
    68
    +            self.logger.debug("Adding action cache instances {}".format(
    
    69
    +                action_cache_instances.keys()))
    
    70
    +            ActionCacheService(server, action_cache_instances)
    
    71
    +
    
    72
    +        if cas_instances:
    
    73
    +            self.logger.debug("Adding cas instances {}".format(
    
    74
    +                cas_instances.keys()))
    
    75
    +            ContentAddressableStorageService(server, cas_instances)
    
    76
    +
    
    77
    +        if bytestream_instances:
    
    78
    +            self.logger.debug("Adding bytestream instances {}".format(
    
    79
    +                bytestream_instances.keys()))
    
    80
    +            ByteStreamService(server, bytestream_instances)
    
    81
    +
    
    82
    +        self._server = server
    
    83
    +
    
    84
    +    def start(self):
    
    85
    +        self._server.start()
    
    86
    +
    
    87
    +    def stop(self):
    
    88
    +        self._server.stop(grace=0)

  • buildgrid/_app/settings/default.yml
    1
    +server:
    
    2
    +  port: 50051
    
    3
    +  tls-server-key: null
    
    4
    +  tls-server-cert: null
    
    5
    +  tls-client-certs: null
    
    6
    +  insecure-mode: true
    
    7
    +
    
    8
    +description: |
    
    9
    +  A single default instance
    
    10
    +
    
    11
    +instances:
    
    12
    +  - name: main
    
    13
    +    description: |
    
    14
    +      The main server
    
    15
    +
    
    16
    +    storages:
    
    17
    +        - !disk-storage &main-storage
    
    18
    +          path: ~/cas/
    
    19
    +
    
    20
    +    services:
    
    21
    +      - !action-cache &main-action
    
    22
    +        storage: *main-storage
    
    23
    +        max_cached_refs: 256
    
    24
    +        allow_updates: true
    
    25
    +
    
    26
    +      - !execution
    
    27
    +        storage: *main-storage
    
    28
    +        action_cache: *main-action
    
    29
    +
    
    30
    +      - !cas
    
    31
    +        storage: *main-storage
    
    32
    +
    
    33
    +      - !bytestream
    
    34
    +        storage: *main-storage

  • buildgrid/_app/settings/parser.py
    1
    +# Copyright (C) 2018 Bloomberg LP
    
    2
    +#
    
    3
    +# Licensed under the Apache License, Version 2.0 (the "License");
    
    4
    +# you may not use this file except in compliance with the License.
    
    5
    +# You may obtain a copy of the License at
    
    6
    +#
    
    7
    +#  <http://www.apache.org/licenses/LICENSE-2.0>
    
    8
    +#
    
    9
    +# Unless required by applicable law or agreed to in writing, software
    
    10
    +# distributed under the License is distributed on an "AS IS" BASIS,
    
    11
    +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    
    12
    +# See the License for the specific language governing permissions and
    
    13
    +# limitations under the License.
    
    14
    +
    
    15
    +
    
    16
    +import yaml
    
    17
    +
    
    18
    +from buildgrid.server.controller import ExecutionController
    
    19
    +from buildgrid.server.actioncache.storage import ActionCache
    
    20
    +from buildgrid.server.cas.instance import ByteStreamInstance, ContentAddressableStorageInstance
    
    21
    +from buildgrid.server.cas.storage.disk import DiskStorage
    
    22
    +from buildgrid.server.cas.storage.lru_memory_cache import LRUMemoryCache
    
    23
    +
    
    24
    +
    
    25
    +class YamlFactory(yaml.YAMLObject):
    
    26
    +    @classmethod
    
    27
    +    def from_yaml(cls, loader, node):
    
    28
    +        values = loader.construct_mapping(node, deep=True)
    
    29
    +        return cls(**values)
    
    30
    +
    
    31
    +
    
    32
    +class Disk(YamlFactory):
    
    33
    +
    
    34
    +    yaml_tag = u'!disk-storage'
    
    35
    +
    
    36
    +    def __new__(cls, path):
    
    37
    +        return DiskStorage(path)
    
    38
    +
    
    39
    +
    
    40
    +class LRU(YamlFactory):
    
    41
    +
    
    42
    +    yaml_tag = u'!lru-storage'
    
    43
    +
    
    44
    +    def __new__(cls, size):
    
    45
    +        return LRUMemoryCache(_parse_size(size))
    
    46
    +
    
    47
    +
    
    48
    +class Execution(YamlFactory):
    
    49
    +
    
    50
    +    yaml_tag = u'!execution'
    
    51
    +
    
    52
    +    def __new__(cls, storage, action_cache=None):
    
    53
    +        return ExecutionController(action_cache, storage)
    
    54
    +
    
    55
    +
    
    56
    +class Action(YamlFactory):
    
    57
    +
    
    58
    +    yaml_tag = u'!action-cache'
    
    59
    +
    
    60
    +    def __new__(cls, storage, max_cached_refs=0, allow_updates=True):
    
    61
    +        return ActionCache(storage, max_cached_refs, allow_updates)
    
    62
    +
    
    63
    +
    
    64
    +class CAS(YamlFactory):
    
    65
    +
    
    66
    +    yaml_tag = u'!cas'
    
    67
    +
    
    68
    +    def __new__(cls, storage):
    
    69
    +        return ContentAddressableStorageInstance(storage)
    
    70
    +
    
    71
    +
    
    72
    +class ByteStream(YamlFactory):
    
    73
    +
    
    74
    +    yaml_tag = u'!bytestream'
    
    75
    +
    
    76
    +    def __new__(cls, storage):
    
    77
    +        return ByteStreamInstance(storage)
    
    78
    +
    
    79
    +
    
    80
    +def _parse_size(size):
    
    81
    +    """Convert a string containing a size in bytes (e.g. '2GB') to a number."""
    
    82
    +    _size_prefixes = {'k': 2 ** 10, 'm': 2 ** 20, 'g': 2 ** 30, 't': 2 ** 40}
    
    83
    +    size = size.lower()
    
    84
    +
    
    85
    +    if size[-1] == 'b':
    
    86
    +        size = size[:-1]
    
    87
    +    if size[-1] in _size_prefixes:
    
    88
    +        return int(size[:-1]) * _size_prefixes[size[-1]]
    
    89
    +    return int(size)
    
    90
    +
    
    91
    +
    
    92
    +def get_parser():
    
    93
    +
    
    94
    +    yaml.SafeLoader.add_constructor(Execution.yaml_tag, Execution.from_yaml)
    
    95
    +    yaml.SafeLoader.add_constructor(Execution.yaml_tag, Execution.from_yaml)
    
    96
    +    yaml.SafeLoader.add_constructor(Action.yaml_tag, Action.from_yaml)
    
    97
    +    yaml.SafeLoader.add_constructor(Disk.yaml_tag, Disk.from_yaml)
    
    98
    +    yaml.SafeLoader.add_constructor(LRU.yaml_tag, LRU.from_yaml)
    
    99
    +    yaml.SafeLoader.add_constructor(CAS.yaml_tag, CAS.from_yaml)
    
    100
    +    yaml.SafeLoader.add_constructor(ByteStream.yaml_tag, ByteStream.from_yaml)
    
    101
    +
    
    102
    +    return yaml

  • buildgrid/server/actioncache/service.py
    ... ... @@ -27,18 +27,27 @@ import grpc
    27 27
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
    
    28 28
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2_grpc
    
    29 29
     
    
    30
    -from .._exceptions import NotFoundError
    
    30
    +from .._exceptions import InvalidArgumentError, NotFoundError
    
    31 31
     
    
    32 32
     
    
    33 33
     class ActionCacheService(remote_execution_pb2_grpc.ActionCacheServicer):
    
    34 34
     
    
    35
    -    def __init__(self, action_cache):
    
    36
    -        self._action_cache = action_cache
    
    35
    +    def __init__(self, server, instances):
    
    36
    +        self._instances = instances
    
    37
    +
    
    37 38
             self.logger = logging.getLogger(__name__)
    
    38 39
     
    
    40
    +        remote_execution_pb2_grpc.add_ActionCacheServicer_to_server(self, server)
    
    41
    +
    
    39 42
         def GetActionResult(self, request, context):
    
    40 43
             try:
    
    41
    -            return self._action_cache.get_action_result(request.action_digest)
    
    44
    +            instance = self._get_instance(request.instance_name)
    
    45
    +            return instance.get_action_result(request.action_digest)
    
    46
    +
    
    47
    +        except InvalidArgumentError as e:
    
    48
    +            self.logger.error(e)
    
    49
    +            context.set_details(str(e))
    
    50
    +            context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
    
    42 51
     
    
    43 52
             except NotFoundError as e:
    
    44 53
                 self.logger.error(e)
    
    ... ... @@ -48,11 +57,24 @@ class ActionCacheService(remote_execution_pb2_grpc.ActionCacheServicer):
    48 57
     
    
    49 58
         def UpdateActionResult(self, request, context):
    
    50 59
             try:
    
    51
    -            self._action_cache.update_action_result(request.action_digest, request.action_result)
    
    60
    +            instance = self._get_instance(request.instance_name)
    
    61
    +            instance.update_action_result(request.action_digest, request.action_result)
    
    52 62
                 return request.action_result
    
    53 63
     
    
    64
    +        except InvalidArgumentError as e:
    
    65
    +            self.logger.error(e)
    
    66
    +            context.set_details(str(e))
    
    67
    +            context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
    
    68
    +
    
    54 69
             except NotImplementedError as e:
    
    55 70
                 self.logger.error(e)
    
    56 71
                 context.set_code(grpc.StatusCode.UNIMPLEMENTED)
    
    57 72
     
    
    58 73
             return remote_execution_pb2.ActionResult()
    
    74
    +
    
    75
    +    def _get_instance(self, instance_name):
    
    76
    +        try:
    
    77
    +            return self._instances[instance_name]
    
    78
    +
    
    79
    +        except KeyError:
    
    80
    +            raise InvalidArgumentError("Invalid instance name: {}".format(instance_name))

  • buildgrid/server/bots/service.py
    ... ... @@ -33,10 +33,12 @@ from .._exceptions import InvalidArgumentError, OutofSyncError
    33 33
     
    
    34 34
     class BotsService(bots_pb2_grpc.BotsServicer):
    
    35 35
     
    
    36
    -    def __init__(self, instances):
    
    36
    +    def __init__(self, server, instances):
    
    37 37
             self._instances = instances
    
    38 38
             self.logger = logging.getLogger(__name__)
    
    39 39
     
    
    40
    +        bots_pb2_grpc.add_BotsServicer_to_server(self, server)
    
    41
    +
    
    40 42
         def CreateBotSession(self, request, context):
    
    41 43
             try:
    
    42 44
                 parent = request.parent
    

  • buildgrid/server/buildgrid_server.py deleted
    1
    -# Copyright (C) 2018 Bloomberg LP
    
    2
    -#
    
    3
    -# Licensed under the Apache License, Version 2.0 (the "License");
    
    4
    -# you may not use this file except in compliance with the License.
    
    5
    -# You may obtain a copy of the License at
    
    6
    -#
    
    7
    -#  <http://www.apache.org/licenses/LICENSE-2.0>
    
    8
    -#
    
    9
    -# Unless required by applicable law or agreed to in writing, software
    
    10
    -# distributed under the License is distributed on an "AS IS" BASIS,
    
    11
    -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    
    12
    -# See the License for the specific language governing permissions and
    
    13
    -# limitations under the License.
    
    14
    -
    
    15
    -
    
    16
    -"""
    
    17
    -BuildGridServer
    
    18
    -==============
    
    19
    -
    
    20
    -Creates the user a local server BuildGrid server.
    
    21
    -"""
    
    22
    -
    
    23
    -from concurrent import futures
    
    24
    -
    
    25
    -import grpc
    
    26
    -
    
    27
    -from buildgrid._protos.google.bytestream import bytestream_pb2_grpc
    
    28
    -from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2_grpc
    
    29
    -from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2_grpc
    
    30
    -from buildgrid._protos.google.longrunning import operations_pb2_grpc
    
    31
    -
    
    32
    -from .instance import BuildGridInstance
    
    33
    -from .cas.service import ByteStreamService, ContentAddressableStorageService
    
    34
    -from .actioncache.service import ActionCacheService
    
    35
    -from .execution.service import ExecutionService
    
    36
    -from .operations.service import OperationsService
    
    37
    -from .bots.service import BotsService
    
    38
    -
    
    39
    -
    
    40
    -class BuildGridServer:
    
    41
    -
    
    42
    -    def __init__(self, port=50051, credentials=None, instances=None,
    
    43
    -                 max_workers=10, action_cache=None, cas_storage=None):
    
    44
    -        address = '[::]:{0}'.format(port)
    
    45
    -
    
    46
    -        self._server = grpc.server(futures.ThreadPoolExecutor(max_workers))
    
    47
    -
    
    48
    -        if credentials is not None:
    
    49
    -            self._server.add_secure_port(address, credentials)
    
    50
    -        else:
    
    51
    -            self._server.add_insecure_port(address)
    
    52
    -
    
    53
    -        if cas_storage is not None:
    
    54
    -            cas_service = ContentAddressableStorageService(cas_storage)
    
    55
    -            remote_execution_pb2_grpc.add_ContentAddressableStorageServicer_to_server(cas_service,
    
    56
    -                                                                                      self._server)
    
    57
    -            bytestream_pb2_grpc.add_ByteStreamServicer_to_server(ByteStreamService(cas_storage),
    
    58
    -                                                                 self._server)
    
    59
    -        if action_cache is not None:
    
    60
    -            action_cache_service = ActionCacheService(action_cache)
    
    61
    -            remote_execution_pb2_grpc.add_ActionCacheServicer_to_server(action_cache_service,
    
    62
    -                                                                        self._server)
    
    63
    -
    
    64
    -        buildgrid_instances = {}
    
    65
    -        if not instances:
    
    66
    -            buildgrid_instances["main"] = BuildGridInstance(action_cache, cas_storage)
    
    67
    -        else:
    
    68
    -            for name in instances:
    
    69
    -                buildgrid_instances[name] = BuildGridInstance(action_cache, cas_storage)
    
    70
    -
    
    71
    -        bots_pb2_grpc.add_BotsServicer_to_server(BotsService(buildgrid_instances),
    
    72
    -                                                 self._server)
    
    73
    -        remote_execution_pb2_grpc.add_ExecutionServicer_to_server(ExecutionService(buildgrid_instances),
    
    74
    -                                                                  self._server)
    
    75
    -        operations_pb2_grpc.add_OperationsServicer_to_server(OperationsService(buildgrid_instances),
    
    76
    -                                                             self._server)
    
    77
    -
    
    78
    -    def start(self):
    
    79
    -        self._server.start()
    
    80
    -
    
    81
    -    def stop(self):
    
    82
    -        self._server.stop(0)

  • buildgrid/server/cas/instance.py
    1
    +# Copyright (C) 2018 Bloomberg LP
    
    2
    +#
    
    3
    +# Licensed under the Apache License, Version 2.0 (the "License");
    
    4
    +# you may not use this file except in compliance with the License.
    
    5
    +# You may obtain a copy of the License at
    
    6
    +#
    
    7
    +#  <http://www.apache.org/licenses/LICENSE-2.0>
    
    8
    +#
    
    9
    +# Unless required by applicable law or agreed to in writing, software
    
    10
    +# distributed under the License is distributed on an "AS IS" BASIS,
    
    11
    +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    
    12
    +# See the License for the specific language governing permissions and
    
    13
    +# limitations under the License.
    
    14
    +
    
    15
    +
    
    16
    +"""
    
    17
    +Storage Instances
    
    18
    +=========
    
    19
    +Instances of CAS and ByteStream
    
    20
    +"""
    
    21
    +
    
    22
    +from buildgrid._protos.google.bytestream import bytestream_pb2
    
    23
    +from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2 as re_pb2
    
    24
    +
    
    25
    +from .._exceptions import InvalidArgumentError, NotFoundError, OutOfRangeError
    
    26
    +from ...settings import HASH
    
    27
    +
    
    28
    +
    
    29
    +class ContentAddressableStorageInstance:
    
    30
    +
    
    31
    +    def __init__(self, storage):
    
    32
    +        self._storage = storage
    
    33
    +
    
    34
    +    def find_missing_blobs(self, blob_digests):
    
    35
    +        storage = self._storage
    
    36
    +        return re_pb2.FindMissingBlobsResponse(
    
    37
    +            missing_blob_digests=storage.missing_blobs(blob_digests))
    
    38
    +
    
    39
    +    def batch_update_blobs(self, requests):
    
    40
    +        storage = self._storage
    
    41
    +        store = []
    
    42
    +        for request_proto in requests:
    
    43
    +            store.append((request_proto.digest, request_proto.data))
    
    44
    +
    
    45
    +        response = re_pb2.BatchUpdateBlobsResponse()
    
    46
    +        statuses = storage.bulk_update_blobs(store)
    
    47
    +
    
    48
    +        for (digest, _), status in zip(store, statuses):
    
    49
    +            response_proto = response.responses.add()
    
    50
    +            response_proto.digest.CopyFrom(digest)
    
    51
    +            response_proto.status.CopyFrom(status)
    
    52
    +
    
    53
    +        return response
    
    54
    +
    
    55
    +
    
    56
    +class ByteStreamInstance:
    
    57
    +
    
    58
    +    BLOCK_SIZE = 1 * 1024 * 1024  # 1 MB block size
    
    59
    +
    
    60
    +    def __init__(self, storage):
    
    61
    +        self._storage = storage
    
    62
    +
    
    63
    +    def read(self, path, read_offset, read_limit):
    
    64
    +        storage = self._storage
    
    65
    +
    
    66
    +        if path[0] == "blobs"
    
    67
    +            path = [""] + path
    
    68
    +
    
    69
    +        # Parse/verify resource name.
    
    70
    +        # Read resource names look like "[instance/]blobs/abc123hash/99".
    
    71
    +        digest = re_pb2.Digest(hash=path[2], size_bytes=int(path[3]))
    
    72
    +
    
    73
    +        # Check the given read offset and limit.
    
    74
    +        if read_offset < 0 or read_offset > digest.size_bytes:
    
    75
    +            raise OutOfRangeError("Read offset out of range")
    
    76
    +
    
    77
    +        elif read_limit == 0:
    
    78
    +            bytes_remaining = digest.size_bytes - read_offset
    
    79
    +
    
    80
    +        elif read_limit > 0:
    
    81
    +            bytes_remaining = read_limit
    
    82
    +
    
    83
    +        else:
    
    84
    +            raise InvalidArgumentError("Negative read_limit is invalid")
    
    85
    +
    
    86
    +        # Read the blob from storage and send its contents to the client.
    
    87
    +        result = storage.get_blob(digest)
    
    88
    +        if result is None:
    
    89
    +            raise NotFoundError("Blob not found")
    
    90
    +
    
    91
    +        elif result.seekable():
    
    92
    +            result.seek(read_offset)
    
    93
    +
    
    94
    +        else:
    
    95
    +            result.read(read_offset)
    
    96
    +
    
    97
    +        while bytes_remaining > 0:
    
    98
    +            yield bytestream_pb2.ReadResponse(
    
    99
    +                data=result.read(min(self.BLOCK_SIZE, bytes_remaining)))
    
    100
    +            bytes_remaining -= self.BLOCK_SIZE
    
    101
    +
    
    102
    +    def write(self, requests):
    
    103
    +        storage = self._storage
    
    104
    +
    
    105
    +        first_request = next(requests)
    
    106
    +        path = first_request.resource_name.split("/")
    
    107
    +
    
    108
    +        if path[0] == "uploads":
    
    109
    +            path = [""] + path
    
    110
    +
    
    111
    +        if len(path) < 6 or path[1] != "uploads" or path[3] != "blobs" or not path[5].isdigit():
    
    112
    +            raise InvalidArgumentError("Invalid resource name")
    
    113
    +
    
    114
    +        digest = re_pb2.Digest(hash=path[4], size_bytes=int(path[5]))
    
    115
    +        write_session = storage.begin_write(digest)
    
    116
    +
    
    117
    +        # Start the write session and write the first request's data.
    
    118
    +        write_session.write(first_request.data)
    
    119
    +        hash_ = HASH(first_request.data)
    
    120
    +        bytes_written = len(first_request.data)
    
    121
    +        finished = first_request.finish_write
    
    122
    +
    
    123
    +        # Handle subsequent write requests.
    
    124
    +        while not finished:
    
    125
    +
    
    126
    +            for request in requests:
    
    127
    +                if finished:
    
    128
    +                    raise InvalidArgumentError("Write request sent after write finished")
    
    129
    +
    
    130
    +                elif request.write_offset != bytes_written:
    
    131
    +                    raise InvalidArgumentError("Invalid write offset")
    
    132
    +
    
    133
    +                elif request.resource_name and request.resource_name != first_request.resource_name:
    
    134
    +                    raise InvalidArgumentError("Resource name changed mid-write")
    
    135
    +
    
    136
    +                finished = request.finish_write
    
    137
    +                bytes_written += len(request.data)
    
    138
    +                if bytes_written > digest.size_bytes:
    
    139
    +                    raise InvalidArgumentError("Wrote too much data to blob")
    
    140
    +
    
    141
    +                write_session.write(request.data)
    
    142
    +                hash_.update(request.data)
    
    143
    +
    
    144
    +        # Check that the data matches the provided digest.
    
    145
    +        if bytes_written != digest.size_bytes or not finished:
    
    146
    +            raise NotImplementedError("Cannot close stream before finishing write")
    
    147
    +
    
    148
    +        elif hash_.hexdigest() != digest.hash:
    
    149
    +            raise InvalidArgumentError("Data does not match hash")
    
    150
    +
    
    151
    +        storage.commit_write(digest, write_session)
    
    152
    +        return bytestream_pb2.WriteResponse(committed_size=bytes_written)

  • buildgrid/server/cas/service.py
    ... ... @@ -21,131 +21,146 @@ Implements the Content Addressable Storage API and ByteStream API.
    21 21
     """
    
    22 22
     
    
    23 23
     
    
    24
    +from itertools import tee
    
    25
    +import logging
    
    26
    +
    
    24 27
     import grpc
    
    25 28
     
    
    26 29
     from buildgrid._protos.google.bytestream import bytestream_pb2, bytestream_pb2_grpc
    
    27
    -from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2 as re_pb2
    
    28
    -from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2_grpc as re_pb2_grpc
    
    30
    +from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
    
    31
    +from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2_grpc
    
    32
    +
    
    33
    +from .._exceptions import InvalidArgumentError, NotFoundError, OutOfRangeError
    
    29 34
     
    
    30
    -from ...settings import HASH
    
    31 35
     
    
    36
    +class ContentAddressableStorageService(remote_execution_pb2_grpc.ContentAddressableStorageServicer):
    
    32 37
     
    
    33
    -class ContentAddressableStorageService(re_pb2_grpc.ContentAddressableStorageServicer):
    
    38
    +    def __init__(self, server, instances):
    
    39
    +        self.logger = logging.getLogger(__name__)
    
    40
    +        self._instances = instances
    
    34 41
     
    
    35
    -    def __init__(self, storage):
    
    36
    -        self._storage = storage
    
    42
    +        remote_execution_pb2_grpc.add_ContentAddressableStorageServicer_to_server(self, server)
    
    37 43
     
    
    38 44
         def FindMissingBlobs(self, request, context):
    
    39
    -        # Only one instance for now.
    
    40
    -        storage = self._storage
    
    41
    -        return re_pb2.FindMissingBlobsResponse(
    
    42
    -            missing_blob_digests=storage.missing_blobs(request.blob_digests))
    
    45
    +        try:
    
    46
    +            instance = self._get_instance(request.instance_name)
    
    47
    +            return instance.find_missing_blobs(request.blob_digests)
    
    48
    +
    
    49
    +        except InvalidArgumentError as e:
    
    50
    +            self.logger.error(e)
    
    51
    +            context.set_details(str(e))
    
    52
    +            context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
    
    53
    +
    
    54
    +        return remote_execution_pb2.FindMissingBlobsResponse()
    
    43 55
     
    
    44 56
         def BatchUpdateBlobs(self, request, context):
    
    45
    -        # Only one instance for now.
    
    46
    -        storage = self._storage
    
    47
    -        requests = []
    
    48
    -        for request_proto in request.requests:
    
    49
    -            requests.append((request_proto.digest, request_proto.data))
    
    50
    -        response = re_pb2.BatchUpdateBlobsResponse()
    
    51
    -        for (digest, _), status in zip(requests, storage.bulk_update_blobs(requests)):
    
    52
    -            response_proto = response.responses.add()
    
    53
    -            response_proto.digest.CopyFrom(digest)
    
    54
    -            response_proto.status.CopyFrom(status)
    
    55
    -        return response
    
    57
    +        try:
    
    58
    +            instance = self._get_instance(request.instance_name)
    
    59
    +            return instance.batch_update_blobs(request.requests)
    
    60
    +
    
    61
    +        except InvalidArgumentError as e:
    
    62
    +            self.logger.error(e)
    
    63
    +            context.set_details(str(e))
    
    64
    +            context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
    
    65
    +
    
    66
    +        return remote_execution_pb2.BatchReadBlobsResponse()
    
    67
    +
    
    68
    +    def _get_instance(self, instance_name):
    
    69
    +        try:
    
    70
    +            return self._instances[instance_name]
    
    71
    +
    
    72
    +        except KeyError:
    
    73
    +            raise InvalidArgumentError("Invalid instance name: {}".format(instance_name))
    
    56 74
     
    
    57 75
     
    
    58 76
     class ByteStreamService(bytestream_pb2_grpc.ByteStreamServicer):
    
    59 77
     
    
    60
    -    BLOCK_SIZE = 1 * 1024 * 1024  # 1 MB block size
    
    78
    +    def __init__(self, server, instances):
    
    79
    +        self.logger = logging.getLogger(__name__)
    
    80
    +        self._instances = instances
    
    61 81
     
    
    62
    -    def __init__(self, storage):
    
    63
    -        self._storage = storage
    
    82
    +        bytestream_pb2_grpc.add_ByteStreamServicer_to_server(self, server)
    
    64 83
     
    
    65 84
         def Read(self, request, context):
    
    66
    -        # Only one instance for now.
    
    67
    -        storage = self._storage
    
    68
    -
    
    69
    -        # Parse/verify resource name.
    
    70
    -        # Read resource names look like "[instance/]blobs/abc123hash/99".
    
    71
    -        path = request.resource_name.split("/")
    
    72
    -        if len(path) == 3:
    
    73
    -            path = [""] + path
    
    74
    -        if len(path) != 4 or path[1] != "blobs" or not path[3].isdigit():
    
    75
    -            context.abort(grpc.StatusCode.NOT_FOUND, "Invalid resource name")
    
    76
    -        # instance_name = path[0]
    
    77
    -        digest = re_pb2.Digest(hash=path[2], size_bytes=int(path[3]))
    
    78
    -
    
    79
    -        # Check the given read offset and limit.
    
    80
    -        if request.read_offset < 0 or request.read_offset > digest.size_bytes:
    
    81
    -            context.abort(grpc.StatusCode.OUT_OF_RANGE, "Read offset out of range")
    
    82
    -        elif request.read_limit == 0:
    
    83
    -            bytes_remaining = digest.size_bytes - request.read_offset
    
    84
    -        elif request.read_limit > 0:
    
    85
    -            bytes_remaining = request.read_limit
    
    86
    -        else:
    
    87
    -            context.abort(grpc.StatusCode.INVALID_ARGUMENT, "Negative read_limit is invalid")
    
    88
    -
    
    89
    -        # Read the blob from storage and send its contents to the client.
    
    90
    -        result = storage.get_blob(digest)
    
    91
    -        if result is None:
    
    92
    -            context.abort(grpc.StatusCode.NOT_FOUND, "Blob not found")
    
    93
    -        elif result.seekable():
    
    94
    -            result.seek(request.read_offset)
    
    95
    -        else:
    
    96
    -            result.read(request.read_offset)
    
    97
    -        while bytes_remaining > 0:
    
    98
    -            yield bytestream_pb2.ReadResponse(
    
    99
    -                data=result.read(min(self.BLOCK_SIZE, bytes_remaining)))
    
    100
    -            bytes_remaining -= self.BLOCK_SIZE
    
    101
    -
    
    102
    -    def Write(self, request_iterator, context):
    
    103
    -        # Only one instance for now.
    
    104
    -        storage = self._storage
    
    105
    -
    
    106
    -        requests = iter(request_iterator)
    
    107
    -        first_request = next(requests)
    
    108
    -        if first_request.write_offset != 0:
    
    109
    -            context.abort(grpc.StatusCode.UNIMPLEMENTED, "Nonzero write offset is unsupported")
    
    110
    -
    
    111
    -        # Parse/verify resource name.
    
    112
    -        # Write resource names look like "[instance/]uploads/SOME-GUID/blobs/abc123hash/99".
    
    113
    -        path = first_request.resource_name.split("/")
    
    114
    -        if path[0] == "uploads":
    
    115
    -            path = [""] + path
    
    116
    -        if len(path) < 6 or path[1] != "uploads" or path[3] != "blobs" or not path[5].isdigit():
    
    117
    -            context.abort(grpc.StatusCode.INVALID_ARGUMENT, "Invalid resource name")
    
    118
    -        # instance_name = path[0]
    
    119
    -        digest = re_pb2.Digest(hash=path[4], size_bytes=int(path[5]))
    
    120
    -
    
    121
    -        # Start the write session and write the first request's data.
    
    122
    -        write_session = storage.begin_write(digest)
    
    123
    -        write_session.write(first_request.data)
    
    124
    -        hash_ = HASH(first_request.data)
    
    125
    -        bytes_written = len(first_request.data)
    
    126
    -        done = first_request.finish_write
    
    127
    -
    
    128
    -        # Handle subsequent write requests.
    
    129
    -        for request in requests:
    
    130
    -            if done:
    
    131
    -                context.abort(grpc.StatusCode.INVALID_ARGUMENT,
    
    132
    -                              "Write request sent after write finished")
    
    133
    -            elif request.write_offset != bytes_written:
    
    134
    -                context.abort(grpc.StatusCode.INVALID_ARGUMENT, "Invalid write offset")
    
    135
    -            elif request.resource_name and request.resource_name != first_request.resource_name:
    
    136
    -                context.abort(grpc.StatusCode.INVALID_ARGUMENT, "Resource name changed mid-write")
    
    137
    -            done = request.finish_write
    
    138
    -            bytes_written += len(request.data)
    
    139
    -            if bytes_written > digest.size_bytes:
    
    140
    -                context.abort(grpc.StatusCode.INVALID_ARGUMENT, "Wrote too much data to blob")
    
    141
    -            write_session.write(request.data)
    
    142
    -            hash_.update(request.data)
    
    143
    -
    
    144
    -        # Check that the data matches the provided digest.
    
    145
    -        if bytes_written != digest.size_bytes or not done:
    
    146
    -            context.abort(grpc.StatusCode.UNIMPLEMENTED,
    
    147
    -                          "Cannot close stream before finishing write")
    
    148
    -        elif hash_.hexdigest() != digest.hash:
    
    149
    -            context.abort(grpc.StatusCode.INVALID_ARGUMENT, "Data does not match hash")
    
    150
    -        storage.commit_write(digest, write_session)
    
    151
    -        return bytestream_pb2.WriteResponse(committed_size=bytes_written)
    85
    +        try:
    
    86
    +            path = request.resource_name.split("/")
    
    87
    +            instance_name = path[0]
    
    88
    +
    
    89
    +            # TODO: Decide on default instance name
    
    90
    +            if path[0] == "blobs":
    
    91
    +                if len(path) < 3 or not path[2].isdigit():
    
    92
    +                    raise InvalidArgumentError("Invalid resource name: {}".format(context.resource_name))
    
    93
    +                instance_name = ""
    
    94
    +
    
    95
    +            elif path[1] == "blobs":
    
    96
    +                if len(path) < 4 or not path[3].isdigit():
    
    97
    +                    raise InvalidArgumentError("Invalid resource name: {}".format(context.resource_name))
    
    98
    +
    
    99
    +            instance = self._get_instance(instance_name)
    
    100
    +            yield from instance.read(path,
    
    101
    +                                     request.read_offset,
    
    102
    +                                     request.read_limit)
    
    103
    +
    
    104
    +        except InvalidArgumentError as e:
    
    105
    +            self.logger.error(e)
    
    106
    +            context.set_details(str(e))
    
    107
    +            context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
    
    108
    +            yield bytestream_pb2.ReadResponse()
    
    109
    +
    
    110
    +        except NotFoundError as e:
    
    111
    +            self.logger.error(e)
    
    112
    +            context.set_details(str(e))
    
    113
    +            context.set_code(grpc.StatusCode.NOT_FOUND)
    
    114
    +            yield bytestream_pb2.ReadResponse()
    
    115
    +
    
    116
    +        except OutOfRangeError as e:
    
    117
    +            self.logger.error(e)
    
    118
    +            context.set_details(str(e))
    
    119
    +            context.set_code(grpc.StatusCode.OUT_OF_RANGE)
    
    120
    +            yield bytestream_pb2.ReadResponse()
    
    121
    +
    
    122
    +    def Write(self, requests, context):
    
    123
    +        try:
    
    124
    +            requests, request_probe = tee(requests, 2)
    
    125
    +            first_request = next(request_probe)
    
    126
    +
    
    127
    +            path = first_request.resource_name.split("/")
    
    128
    +
    
    129
    +            instance_name = path[0]
    
    130
    +
    
    131
    +            # TODO: Sort out no instance name
    
    132
    +            if path[0] == "uploads":
    
    133
    +                if len(path) < 5 or path[2] != "blobs" or not path[4].isdigit():
    
    134
    +                    raise InvalidArgumentError("Invalid resource name: {}".format(context.resource_name))
    
    135
    +                instance_name = ""
    
    136
    +
    
    137
    +            elif path[1] == "uploads":
    
    138
    +                if len(path) < 6 or path[3] != "blobs" or not path[5].isdigit():
    
    139
    +                    raise InvalidArgumentError("Invalid resource name: {}".format(context.resource_name))
    
    140
    +
    
    141
    +            instance = self._get_instance(instance_name)
    
    142
    +            return instance.write(requests)
    
    143
    +
    
    144
    +        except NotImplementedError as e:
    
    145
    +            self.logger.error(e)
    
    146
    +            context.set_details(str(e))
    
    147
    +            context.set_code(grpc.StatusCode.UNIMPLEMENTED)
    
    148
    +
    
    149
    +        except InvalidArgumentError as e:
    
    150
    +            self.logger.error(e)
    
    151
    +            context.set_details(str(e))
    
    152
    +            context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
    
    153
    +
    
    154
    +        except NotFoundError as e:
    
    155
    +            self.logger.error(e)
    
    156
    +            context.set_details(str(e))
    
    157
    +            context.set_code(grpc.StatusCode.NOT_FOUND)
    
    158
    +
    
    159
    +        return bytestream_pb2.WriteResponse()
    
    160
    +
    
    161
    +    def _get_instance(self, instance_name):
    
    162
    +        try:
    
    163
    +            return self._instances[instance_name]
    
    164
    +
    
    165
    +        except KeyError:
    
    166
    +            raise InvalidArgumentError("Invalid instance name: {}".format(instance_name))

  • buildgrid/server/instance.pybuildgrid/server/controller.py
    ... ... @@ -14,31 +14,36 @@
    14 14
     
    
    15 15
     
    
    16 16
     """
    
    17
    -BuildGrid Instance
    
    17
    +Execution Controller
    
    18 18
     ==================
    
    19 19
     
    
    20
    -An instance of the BuildGrid server.
    
    20
    +An instance of the Execution controller.
    
    21 21
     
    
    22
    -Contains scheduler, execution instance and an interface to the bots.
    
    22
    +All this stuff you need to make the execution service work.
    
    23
    +
    
    24
    +Contains scheduler, execution instance, an interface to the bots
    
    25
    +and an operations instance.
    
    23 26
     """
    
    24 27
     
    
    25 28
     
    
    26 29
     import logging
    
    27 30
     
    
    28
    -from .execution.instance import ExecutionInstance
    
    29 31
     from .scheduler import Scheduler
    
    30 32
     from .bots.instance import BotsInterface
    
    33
    +from .execution.instance import ExecutionInstance
    
    34
    +from .operations.instance import OperationsInstance
    
    31 35
     
    
    32 36
     
    
    33
    -class BuildGridInstance(ExecutionInstance, BotsInterface):
    
    37
    +class ExecutionController(ExecutionInstance, BotsInterface, OperationsInstance):
    
    34 38
     
    
    35
    -    def __init__(self, action_cache=None, cas_storage=None):
    
    39
    +    def __init__(self, action_cache=None, storage=None):
    
    36 40
             scheduler = Scheduler(action_cache)
    
    37 41
     
    
    38 42
             self.logger = logging.getLogger(__name__)
    
    39 43
     
    
    40
    -        ExecutionInstance.__init__(self, scheduler, cas_storage)
    
    44
    +        ExecutionInstance.__init__(self, scheduler, storage)
    
    41 45
             BotsInterface.__init__(self, scheduler)
    
    46
    +        OperationsInstance.__init__(self, scheduler)
    
    42 47
     
    
    43 48
         def stream_operation_updates(self, message_queue, operation_name):
    
    44 49
             operation = message_queue.get()
    

  • buildgrid/server/execution/service.py
    ... ... @@ -35,10 +35,12 @@ from .._exceptions import InvalidArgumentError
    35 35
     
    
    36 36
     class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
    
    37 37
     
    
    38
    -    def __init__(self, instances):
    
    38
    +    def __init__(self, server, instances):
    
    39 39
             self.logger = logging.getLogger(__name__)
    
    40 40
             self._instances = instances
    
    41 41
     
    
    42
    +        remote_execution_pb2_grpc.add_ExecutionServicer_to_server(self, server)
    
    43
    +
    
    42 44
         def Execute(self, request, context):
    
    43 45
             try:
    
    44 46
                 message_queue = queue.Queue()
    

  • buildgrid/server/operations/service.py
    ... ... @@ -32,10 +32,12 @@ from .._exceptions import InvalidArgumentError
    32 32
     
    
    33 33
     class OperationsService(operations_pb2_grpc.OperationsServicer):
    
    34 34
     
    
    35
    -    def __init__(self, instances):
    
    35
    +    def __init__(self, server, instances):
    
    36 36
             self._instances = instances
    
    37 37
             self.logger = logging.getLogger(__name__)
    
    38 38
     
    
    39
    +        operations_pb2_grpc.add_OperationsServicer_to_server(self, server)
    
    40
    +
    
    39 41
         def GetOperation(self, request, context):
    
    40 42
             try:
    
    41 43
                 name = request.name
    

  • buildgrid/server/referencestorage/service.py
    ... ... @@ -20,34 +20,70 @@ import grpc
    20 20
     from buildgrid._protos.buildstream.v2 import buildstream_pb2
    
    21 21
     from buildgrid._protos.buildstream.v2 import buildstream_pb2_grpc
    
    22 22
     
    
    23
    -from .._exceptions import NotFoundError
    
    23
    +from .._exceptions import InvalidArgumentError, NotFoundError
    
    24 24
     
    
    25 25
     
    
    26 26
     class ReferenceStorageService(buildstream_pb2_grpc.ReferenceStorageServicer):
    
    27 27
     
    
    28
    -    def __init__(self, reference_cache):
    
    29
    -        self._reference_cache = reference_cache
    
    28
    +    def __init__(self, server, instances):
    
    30 29
             self.logger = logging.getLogger(__name__)
    
    31 30
     
    
    31
    +        self._instances = instances
    
    32
    +
    
    33
    +        buildstream_pb2_grpc.add_ReferenceStorageServicer_to_server(self, server)
    
    34
    +
    
    32 35
         def GetReference(self, request, context):
    
    33 36
             try:
    
    37
    +            instance = self._get_instance(request.instance_name)
    
    38
    +            digest = instance.get_digest_reference(request.key)
    
    34 39
                 response = buildstream_pb2.GetReferenceResponse()
    
    35
    -            response.digest.CopyFrom(self._reference_cache.get_digest_reference(request.key))
    
    40
    +            response.digest.CopyFrom(digest)
    
    36 41
                 return response
    
    37 42
     
    
    43
    +        except InvalidArgumentError as e:
    
    44
    +            self.logger.error(e)
    
    45
    +            context.set_details(str(e))
    
    46
    +            context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
    
    47
    +
    
    38 48
             except NotFoundError:
    
    39 49
                 context.set_code(grpc.StatusCode.NOT_FOUND)
    
    40 50
     
    
    51
    +        return buildstream_pb2.GetReferenceResponse()
    
    52
    +
    
    41 53
         def UpdateReference(self, request, context):
    
    42 54
             try:
    
    55
    +            instance = self._get_instance(request.instance_name)
    
    56
    +            digest = request.digest
    
    57
    +
    
    43 58
                 for key in request.keys:
    
    44
    -                self._reference_cache.update_reference(key, request.digest)
    
    59
    +                instance.update_reference(key, digest)
    
    45 60
     
    
    46
    -            return buildstream_pb2.UpdateReferenceResponse()
    
    61
    +        except InvalidArgumentError as e:
    
    62
    +            self.logger.error(e)
    
    63
    +            context.set_details(str(e))
    
    64
    +            context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
    
    47 65
     
    
    48 66
             except NotImplementedError:
    
    49 67
                 context.set_code(grpc.StatusCode.UNIMPLEMENTED)
    
    50 68
     
    
    69
    +        return buildstream_pb2.UpdateReferenceResponse()
    
    70
    +
    
    51 71
         def Status(self, request, context):
    
    52
    -        allow_updates = self._reference_cache.allow_updates
    
    53
    -        return buildstream_pb2.StatusResponse(allow_updates=allow_updates)
    72
    +        try:
    
    73
    +            instance = self._get_instance(request.instance_name)
    
    74
    +            allow_updates = instance.allow_updates
    
    75
    +            return buildstream_pb2.StatusResponse(allow_updates=allow_updates)
    
    76
    +
    
    77
    +        except InvalidArgumentError as e:
    
    78
    +            self.logger.error(e)
    
    79
    +            context.set_details(str(e))
    
    80
    +            context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
    
    81
    +
    
    82
    +        return buildstream_pb2.StatusResponse()
    
    83
    +
    
    84
    +    def _get_instance(self, instance_name):
    
    85
    +        try:
    
    86
    +            return self._instances[instance_name]
    
    87
    +
    
    88
    +        except KeyError:
    
    89
    +            raise InvalidArgumentError("Invalid instance name: {}".format(instance_name))

  • docs/source/using_dummy_build.rst
    1
    -
    
    2 1
     .. _dummy-build:
    
    3 2
     
    
    4 3
     Dummy build
    
    ... ... @@ -8,7 +7,7 @@ In one terminal, start a server:
    8 7
     
    
    9 8
     .. code-block:: sh
    
    10 9
     
    
    11
    -   bgd server start --allow-insecure
    
    10
    +   bgd server start buildgrid/_app/settings/default.yml
    
    12 11
     
    
    13 12
     In another terminal, send a request for work:
    
    14 13
     
    

  • docs/source/using_simple_build.rst
    1
    -
    
    2 1
     .. _simple-build:
    
    3 2
     
    
    4 3
     Simple build
    
    ... ... @@ -27,7 +26,7 @@ Now start a BuildGrid server, passing it a directory it can write a CAS to:
    27 26
     
    
    28 27
     .. code-block:: sh
    
    29 28
     
    
    30
    -   bgd server start --allow-insecure --cas disk --cas-cache disk --cas-disk-directory /path/to/empty/directory
    
    29
    +   bgd server start buildgrid/_app/settings/default.yml
    
    31 30
     
    
    32 31
     Start the following bot session:
    
    33 32
     
    

  • setup.py
    ... ... @@ -114,6 +114,7 @@ setup(
    114 114
             'protobuf',
    
    115 115
             'grpcio',
    
    116 116
             'Click',
    
    117
    +        'pyaml',
    
    117 118
             'boto3 < 1.8.0',
    
    118 119
             'botocore < 1.11.0',
    
    119 120
             'xdg',
    

  • tests/cas/test_services.py
    ... ... @@ -18,17 +18,25 @@
    18 18
     # pylint: disable=redefined-outer-name
    
    19 19
     
    
    20 20
     import io
    
    21
    +from unittest import mock
    
    21 22
     
    
    23
    +import grpc
    
    24
    +from grpc._server import _Context
    
    22 25
     import pytest
    
    23 26
     
    
    24 27
     from buildgrid._protos.google.bytestream import bytestream_pb2
    
    25 28
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2 as re_pb2
    
    26 29
     from buildgrid.server.cas.storage.storage_abc import StorageABC
    
    27
    -from buildgrid.server.cas.service import ByteStreamService
    
    28
    -from buildgrid.server.cas.service import ContentAddressableStorageService
    
    30
    +from buildgrid.server.cas.instance import ByteStreamInstance, ContentAddressableStorageInstance
    
    31
    +from buildgrid.server.cas import service
    
    32
    +from buildgrid.server.cas.service import ByteStreamService, ContentAddressableStorageService
    
    29 33
     from buildgrid.settings import HASH
    
    30 34
     
    
    31 35
     
    
    36
    +context = mock.create_autospec(_Context)
    
    37
    +server = mock.create_autospec(grpc.server)
    
    38
    +
    
    39
    +
    
    32 40
     class SimpleStorage(StorageABC):
    
    33 41
         """Storage provider wrapper around a dictionary.
    
    34 42
     
    
    ... ... @@ -61,28 +69,18 @@ class SimpleStorage(StorageABC):
    61 69
             self.data[(digest.hash, digest.size_bytes)] = data
    
    62 70
     
    
    63 71
     
    
    64
    -class MockObject:
    
    65
    -    def __init__(self):
    
    66
    -        self.abort = None
    
    67
    -
    
    68
    -
    
    69
    -class MockException(Exception):
    
    70
    -    pass
    
    71
    -
    
    72
    -
    
    73
    -def raise_mock_exception(*args, **kwargs):
    
    74
    -    raise MockException()
    
    75
    -
    
    76
    -
    
    77 72
     test_strings = [b"", b"hij"]
    
    78 73
     instances = ["", "test_inst"]
    
    79 74
     
    
    80 75
     
    
    81 76
     @pytest.mark.parametrize("data_to_read", test_strings)
    
    82 77
     @pytest.mark.parametrize("instance", instances)
    
    83
    -def test_bytestream_read(data_to_read, instance):
    
    78
    +@mock.patch.object(service, 'bytestream_pb2_grpc', autospec=True)
    
    79
    +def test_bytestream_read(mocked, data_to_read, instance):
    
    84 80
         storage = SimpleStorage([b"abc", b"defg", data_to_read])
    
    85
    -    servicer = ByteStreamService(storage)
    
    81
    +
    
    82
    +    bs_instance = ByteStreamInstance(storage)
    
    83
    +    servicer = ByteStreamService(server, {instance: bs_instance})
    
    86 84
     
    
    87 85
         request = bytestream_pb2.ReadRequest()
    
    88 86
         if instance != "":
    
    ... ... @@ -96,11 +94,13 @@ def test_bytestream_read(data_to_read, instance):
    96 94
     
    
    97 95
     
    
    98 96
     @pytest.mark.parametrize("instance", instances)
    
    99
    -def test_bytestream_read_many(instance):
    
    97
    +@mock.patch.object(service, 'bytestream_pb2_grpc', autospec=True)
    
    98
    +def test_bytestream_read_many(mocked, instance):
    
    100 99
         data_to_read = b"testing" * 10000
    
    101 100
     
    
    102 101
         storage = SimpleStorage([b"abc", b"defg", data_to_read])
    
    103
    -    servicer = ByteStreamService(storage)
    
    102
    +    bs_instance = ByteStreamInstance(storage)
    
    103
    +    servicer = ByteStreamService(server, {instance: bs_instance})
    
    104 104
     
    
    105 105
         request = bytestream_pb2.ReadRequest()
    
    106 106
         if instance != "":
    
    ... ... @@ -115,9 +115,11 @@ def test_bytestream_read_many(instance):
    115 115
     
    
    116 116
     @pytest.mark.parametrize("instance", instances)
    
    117 117
     @pytest.mark.parametrize("extra_data", ["", "/", "/extra/data"])
    
    118
    -def test_bytestream_write(instance, extra_data):
    
    118
    +@mock.patch.object(service, 'bytestream_pb2_grpc', autospec=True)
    
    119
    +def test_bytestream_write(mocked, instance, extra_data):
    
    119 120
         storage = SimpleStorage()
    
    120
    -    servicer = ByteStreamService(storage)
    
    121
    +    bs_instance = ByteStreamInstance(storage)
    
    122
    +    servicer = ByteStreamService(server, {instance: bs_instance})
    
    121 123
     
    
    122 124
         resource_name = ""
    
    123 125
         if instance != "":
    
    ... ... @@ -137,9 +139,11 @@ def test_bytestream_write(instance, extra_data):
    137 139
         assert storage.data[(hash_, 6)] == b'abcdef'
    
    138 140
     
    
    139 141
     
    
    140
    -def test_bytestream_write_rejects_wrong_hash():
    
    142
    +@mock.patch.object(service, 'bytestream_pb2_grpc', autospec=True)
    
    143
    +def test_bytestream_write_rejects_wrong_hash(mocked):
    
    141 144
         storage = SimpleStorage()
    
    142
    -    servicer = ByteStreamService(storage)
    
    145
    +    bs_instance = ByteStreamInstance(storage)
    
    146
    +    servicer = ByteStreamService(server, {"": bs_instance})
    
    143 147
     
    
    144 148
         data = b'some data'
    
    145 149
         wrong_hash = HASH(b'incorrect').hexdigest()
    
    ... ... @@ -148,18 +152,18 @@ def test_bytestream_write_rejects_wrong_hash():
    148 152
             bytestream_pb2.WriteRequest(resource_name=resource_name, data=data, finish_write=True)
    
    149 153
         ]
    
    150 154
     
    
    151
    -    context = MockObject()
    
    152
    -    context.abort = raise_mock_exception
    
    153
    -    with pytest.raises(MockException):
    
    154
    -        servicer.Write(requests, context)
    
    155
    +    servicer.Write(requests, context)
    
    156
    +    context.set_code.assert_called_once_with(grpc.StatusCode.INVALID_ARGUMENT)
    
    155 157
     
    
    156 158
         assert len(storage.data) is 0
    
    157 159
     
    
    158 160
     
    
    159 161
     @pytest.mark.parametrize("instance", instances)
    
    160
    -def test_cas_find_missing_blobs(instance):
    
    162
    +@mock.patch.object(service, 'remote_execution_pb2_grpc', autospec=True)
    
    163
    +def test_cas_find_missing_blobs(mocked, instance):
    
    161 164
         storage = SimpleStorage([b'abc', b'def'])
    
    162
    -    servicer = ContentAddressableStorageService(storage)
    
    165
    +    cas_instance = ContentAddressableStorageInstance(storage)
    
    166
    +    servicer = ContentAddressableStorageService(server, {instance: cas_instance})
    
    163 167
         digests = [
    
    164 168
             re_pb2.Digest(hash=HASH(b'def').hexdigest(), size_bytes=3),
    
    165 169
             re_pb2.Digest(hash=HASH(b'ghij').hexdigest(), size_bytes=4)
    
    ... ... @@ -171,9 +175,12 @@ def test_cas_find_missing_blobs(instance):
    171 175
     
    
    172 176
     
    
    173 177
     @pytest.mark.parametrize("instance", instances)
    
    174
    -def test_cas_batch_update_blobs(instance):
    
    178
    +@mock.patch.object(service, 'remote_execution_pb2_grpc', autospec=True)
    
    179
    +def test_cas_batch_update_blobs(mocked, instance):
    
    175 180
         storage = SimpleStorage()
    
    176
    -    servicer = ContentAddressableStorageService(storage)
    
    181
    +    cas_instance = ContentAddressableStorageInstance(storage)
    
    182
    +    servicer = ContentAddressableStorageService(server, {instance: cas_instance})
    
    183
    +
    
    177 184
         update_requests = [
    
    178 185
             re_pb2.BatchUpdateBlobsRequest.Request(
    
    179 186
                 digest=re_pb2.Digest(hash=HASH(b'abc').hexdigest(), size_bytes=3), data=b'abc'),
    
    ... ... @@ -181,16 +188,21 @@ def test_cas_batch_update_blobs(instance):
    181 188
                 digest=re_pb2.Digest(hash="invalid digest!", size_bytes=1000),
    
    182 189
                 data=b'wrong data')
    
    183 190
         ]
    
    191
    +
    
    184 192
         request = re_pb2.BatchUpdateBlobsRequest(instance_name=instance, requests=update_requests)
    
    185 193
         response = servicer.BatchUpdateBlobs(request, None)
    
    186 194
         assert len(response.responses) == 2
    
    195
    +
    
    187 196
         for blob_response in response.responses:
    
    188 197
             if blob_response.digest == update_requests[0].digest:
    
    189 198
                 assert blob_response.status.code == 0
    
    199
    +
    
    190 200
             elif blob_response.digest == update_requests[1].digest:
    
    191 201
                 assert blob_response.status.code != 0
    
    202
    +
    
    192 203
             else:
    
    193 204
                 raise Exception("Unexpected blob response")
    
    205
    +
    
    194 206
         assert len(storage.data) == 1
    
    195 207
         assert (update_requests[0].digest.hash, 3) in storage.data
    
    196 208
         assert storage.data[(update_requests[0].digest.hash, 3)] == b'abc'

  • tests/cas/test_storage.py
    ... ... @@ -19,18 +19,28 @@
    19 19
     
    
    20 20
     import tempfile
    
    21 21
     
    
    22
    +from unittest import mock
    
    23
    +
    
    22 24
     import boto3
    
    25
    +import grpc
    
    26
    +from grpc._server import _Context
    
    23 27
     import pytest
    
    24
    -
    
    25 28
     from moto import mock_s3
    
    26 29
     
    
    27 30
     from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Digest
    
    31
    +from buildgrid.server.cas import service
    
    32
    +from buildgrid.server.cas.instance import ByteStreamInstance, ContentAddressableStorageInstance
    
    33
    +from buildgrid.server.cas.storage import remote
    
    28 34
     from buildgrid.server.cas.storage.lru_memory_cache import LRUMemoryCache
    
    29 35
     from buildgrid.server.cas.storage.disk import DiskStorage
    
    30 36
     from buildgrid.server.cas.storage.s3 import S3Storage
    
    31 37
     from buildgrid.server.cas.storage.with_cache import WithCacheStorage
    
    32 38
     from buildgrid.settings import HASH
    
    33 39
     
    
    40
    +
    
    41
    +context = mock.create_autospec(_Context)
    
    42
    +server = mock.create_autospec(grpc.server)
    
    43
    +
    
    34 44
     abc = b"abc"
    
    35 45
     abc_digest = Digest(hash=HASH(abc).hexdigest(), size_bytes=3)
    
    36 46
     defg = b"defg"
    
    ... ... @@ -45,10 +55,64 @@ def write(storage, digest, blob):
    45 55
         storage.commit_write(digest, session)
    
    46 56
     
    
    47 57
     
    
    58
    +class MockCASStorage(ByteStreamInstance, ContentAddressableStorageInstance):
    
    59
    +
    
    60
    +    def __init__(self):
    
    61
    +        storage = LRUMemoryCache(256)
    
    62
    +        super().__init__(storage)
    
    63
    +
    
    64
    +
    
    65
    +# Mock a CAS server with LRUStorage to return "calls" made to it
    
    66
    +class MockStubServer:
    
    67
    +
    
    68
    +    def __init__(self):
    
    69
    +        instances = {"": MockCASStorage(), "dna": MockCASStorage()}
    
    70
    +        self._requests = []
    
    71
    +        with mock.patch.object(service, 'bytestream_pb2_grpc'):
    
    72
    +            self._bs_service = service.ByteStreamService(server, instances)
    
    73
    +        with mock.patch.object(service, 'remote_execution_pb2_grpc'):
    
    74
    +            self._cas_service = service.ContentAddressableStorageService(server, instances)
    
    75
    +
    
    76
    +    def Read(self, request):
    
    77
    +        yield from self._bs_service.Read(request, context)
    
    78
    +
    
    79
    +    def Write(self, request):
    
    80
    +        self._requests.append(request)
    
    81
    +        if request.finish_write:
    
    82
    +            response = self._bs_service.Write(self._requests, context)
    
    83
    +            self._requests = []
    
    84
    +            return response
    
    85
    +
    
    86
    +        return None
    
    87
    +
    
    88
    +    def FindMissingBlobs(self, request):
    
    89
    +        return self._cas_service.FindMissingBlobs(request, context)
    
    90
    +
    
    91
    +    def BatchUpdateBlobs(self, request):
    
    92
    +        return self._cas_service.BatchUpdateBlobs(request, context)
    
    93
    +
    
    94
    +
    
    95
    +# Instances of MockCASStorage
    
    96
    +@pytest.fixture(params=["", "dna"])
    
    97
    +def instance(params):
    
    98
    +    return {params, MockCASStorage()}
    
    99
    +
    
    100
    +
    
    101
    +@pytest.fixture()
    
    102
    +@mock.patch.object(remote, 'bytestream_pb2_grpc')
    
    103
    +@mock.patch.object(remote, 'remote_execution_pb2_grpc')
    
    104
    +def remote_storage(mock_bs_grpc, mock_re_pb2_grpc):
    
    105
    +    mock_server = MockStubServer()
    
    106
    +    storage = remote.RemoteStorage(instance)
    
    107
    +    storage._stub_bs = mock_server
    
    108
    +    storage._stub_cas = mock_server
    
    109
    +    yield storage
    
    110
    +
    
    111
    +
    
    48 112
     # General tests for all storage providers
    
    49 113
     
    
    50 114
     
    
    51
    -@pytest.fixture(params=["lru", "disk", "s3", "lru_disk", "disk_s3"])
    
    115
    +@pytest.fixture(params=["lru", "disk", "s3", "lru_disk", "disk_s3", "remote"])
    
    52 116
     def any_storage(request):
    
    53 117
         if request.param == "lru":
    
    54 118
             yield LRUMemoryCache(256)
    
    ... ... @@ -70,6 +134,14 @@ def any_storage(request):
    70 134
                 with mock_s3():
    
    71 135
                     boto3.resource('s3').create_bucket(Bucket="testing")
    
    72 136
                     yield WithCacheStorage(DiskStorage(path), S3Storage("testing"))
    
    137
    +    elif request.param == "remote":
    
    138
    +        with mock.patch.object(remote, 'bytestream_pb2_grpc'):
    
    139
    +            with mock.patch.object(remote, 'remote_execution_pb2_grpc'):
    
    140
    +                mock_server = MockStubServer()
    
    141
    +                storage = remote.RemoteStorage(instance)
    
    142
    +                storage._stub_bs = mock_server
    
    143
    +                storage._stub_cas = mock_server
    
    144
    +                yield storage
    
    73 145
     
    
    74 146
     
    
    75 147
     def test_initially_empty(any_storage):
    

  • tests/integration/action_cache_service.py
    ... ... @@ -26,10 +26,14 @@ import pytest
    26 26
     
    
    27 27
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
    
    28 28
     from buildgrid.server.cas.storage import lru_memory_cache
    
    29
    +from buildgrid.server.actioncache import service
    
    29 30
     from buildgrid.server.actioncache.storage import ActionCache
    
    30 31
     from buildgrid.server.actioncache.service import ActionCacheService
    
    31 32
     
    
    32 33
     
    
    34
    +server = mock.create_autospec(grpc.server)
    
    35
    +
    
    36
    +
    
    33 37
     # Can mock this
    
    34 38
     @pytest.fixture
    
    35 39
     def context():
    
    ... ... @@ -42,36 +46,41 @@ def cas():
    42 46
     
    
    43 47
     
    
    44 48
     @pytest.fixture
    
    45
    -def cache(cas):
    
    46
    -    yield ActionCache(cas, 50)
    
    49
    +def cache_instances(cas):
    
    50
    +    yield {"": ActionCache(cas, 50)}
    
    47 51
     
    
    48 52
     
    
    49
    -def test_simple_action_result(cache, context):
    
    50
    -    service = ActionCacheService(cache)
    
    53
    +def test_simple_action_result(cache_instances, context):
    
    54
    +    with mock.patch.object(service, 'remote_execution_pb2_grpc'):
    
    55
    +        ac_service = ActionCacheService(server, cache_instances)
    
    56
    +
    
    57
    +    print(cache_instances)
    
    51 58
         action_digest = remote_execution_pb2.Digest(hash='sample', size_bytes=4)
    
    52 59
     
    
    53 60
         # Check that before adding the ActionResult, attempting to fetch it fails
    
    54
    -    request = remote_execution_pb2.GetActionResultRequest(action_digest=action_digest)
    
    55
    -    service.GetActionResult(request, context)
    
    61
    +    request = remote_execution_pb2.GetActionResultRequest(instance_name="",
    
    62
    +                                                          action_digest=action_digest)
    
    63
    +    ac_service.GetActionResult(request, context)
    
    56 64
         context.set_code.assert_called_once_with(grpc.StatusCode.NOT_FOUND)
    
    57 65
     
    
    58 66
         # Add an ActionResult to the cache
    
    59 67
         action_result = remote_execution_pb2.ActionResult(stdout_raw=b'example output')
    
    60 68
         request = remote_execution_pb2.UpdateActionResultRequest(action_digest=action_digest,
    
    61 69
                                                                  action_result=action_result)
    
    62
    -    service.UpdateActionResult(request, context)
    
    70
    +    ac_service.UpdateActionResult(request, context)
    
    63 71
     
    
    64 72
         # Check that fetching it now works
    
    65 73
         request = remote_execution_pb2.GetActionResultRequest(action_digest=action_digest)
    
    66
    -    fetched_result = service.GetActionResult(request, context)
    
    74
    +    fetched_result = ac_service.GetActionResult(request, context)
    
    67 75
         assert fetched_result.stdout_raw == action_result.stdout_raw
    
    68 76
     
    
    69 77
     
    
    70
    -def test_disabled_update_action_result(cache, context):
    
    78
    +def test_disabled_update_action_result(context):
    
    71 79
         disabled_push = ActionCache(cas, 50, False)
    
    72
    -    service = ActionCacheService(disabled_push)
    
    80
    +    with mock.patch.object(service, 'remote_execution_pb2_grpc'):
    
    81
    +        ac_service = ActionCacheService(server, {"": disabled_push})
    
    73 82
     
    
    74
    -    request = remote_execution_pb2.UpdateActionResultRequest()
    
    75
    -    service.UpdateActionResult(request, context)
    
    83
    +    request = remote_execution_pb2.UpdateActionResultRequest(instance_name='')
    
    84
    +    ac_service.UpdateActionResult(request, context)
    
    76 85
     
    
    77 86
         context.set_code.assert_called_once_with(grpc.StatusCode.UNIMPLEMENTED)

  • tests/integration/bots_service.py
    ... ... @@ -27,12 +27,15 @@ import pytest
    27 27
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
    
    28 28
     from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2
    
    29 29
     from buildgrid.server import job
    
    30
    -from buildgrid.server.instance import BuildGridInstance
    
    30
    +from buildgrid.server.controller import ExecutionController
    
    31 31
     from buildgrid.server.job import LeaseState
    
    32
    -from buildgrid.server.bots.instance import BotsInterface
    
    32
    +from buildgrid.server.bots import service
    
    33 33
     from buildgrid.server.bots.service import BotsService
    
    34 34
     
    
    35 35
     
    
    36
    +server = mock.create_autospec(grpc.server)
    
    37
    +
    
    38
    +
    
    36 39
     # GRPC context
    
    37 40
     @pytest.fixture
    
    38 41
     def context():
    
    ... ... @@ -55,19 +58,15 @@ def bot_session():
    55 58
     
    
    56 59
     @pytest.fixture
    
    57 60
     def buildgrid():
    
    58
    -    yield BuildGridInstance()
    
    59
    -
    
    60
    -
    
    61
    -@pytest.fixture
    
    62
    -def bots(schedule):
    
    63
    -    yield BotsInterface(schedule)
    
    61
    +    yield ExecutionController()
    
    64 62
     
    
    65 63
     
    
    66 64
     # Instance to test
    
    67 65
     @pytest.fixture
    
    68 66
     def instance(buildgrid):
    
    69 67
         instances = {"": buildgrid}
    
    70
    -    yield BotsService(instances)
    
    68
    +    with mock.patch.object(service, 'bots_pb2_grpc'):
    
    69
    +        yield BotsService(server, instances)
    
    71 70
     
    
    72 71
     
    
    73 72
     def test_create_bot_session(bot_session, context, instance):
    

  • tests/integration/execution_service.py
    ... ... @@ -29,12 +29,16 @@ from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_p
    29 29
     from buildgrid._protos.google.longrunning import operations_pb2
    
    30 30
     
    
    31 31
     from buildgrid.server import job
    
    32
    -from buildgrid.server.instance import BuildGridInstance
    
    32
    +from buildgrid.server.controller import ExecutionController
    
    33 33
     from buildgrid.server.cas.storage import lru_memory_cache
    
    34 34
     from buildgrid.server.actioncache.storage import ActionCache
    
    35
    +from buildgrid.server.execution import service
    
    35 36
     from buildgrid.server.execution.service import ExecutionService
    
    36 37
     
    
    37 38
     
    
    39
    +server = mock.create_autospec(grpc.server)
    
    40
    +
    
    41
    +
    
    38 42
     @pytest.fixture
    
    39 43
     def context():
    
    40 44
         cxt = mock.MagicMock(spec=_Context)
    
    ... ... @@ -46,17 +50,17 @@ def buildgrid(request):
    46 50
         if request.param == "action-cache":
    
    47 51
             storage = lru_memory_cache.LRUMemoryCache(1024 * 1024)
    
    48 52
             cache = ActionCache(storage, 50)
    
    49
    -
    
    50
    -        return BuildGridInstance(action_cache=cache,
    
    51
    -                                 cas_storage=storage)
    
    52
    -    return BuildGridInstance()
    
    53
    +        yield ExecutionController(cache, storage)
    
    54
    +    else:
    
    55
    +        yield ExecutionController()
    
    53 56
     
    
    54 57
     
    
    55 58
     # Instance to test
    
    56 59
     @pytest.fixture
    
    57 60
     def instance(buildgrid):
    
    58 61
         instances = {"": buildgrid}
    
    59
    -    yield ExecutionService(instances)
    
    62
    +    with mock.patch.object(service, 'remote_execution_pb2_grpc'):
    
    63
    +        yield ExecutionService(server, instances)
    
    60 64
     
    
    61 65
     
    
    62 66
     @pytest.mark.parametrize("skip_cache_lookup", [True, False])
    

  • tests/integration/operations_service.py
    ... ... @@ -28,12 +28,14 @@ from google.protobuf import any_pb2
    28 28
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
    
    29 29
     from buildgrid._protos.google.longrunning import operations_pb2
    
    30 30
     
    
    31
    -from buildgrid.server.instance import BuildGridInstance
    
    31
    +from buildgrid.server.controller import ExecutionController
    
    32 32
     from buildgrid.server._exceptions import InvalidArgumentError
    
    33 33
     
    
    34
    +from buildgrid.server.operations import service
    
    34 35
     from buildgrid.server.operations.service import OperationsService
    
    35 36
     
    
    36 37
     
    
    38
    +server = mock.create_autospec(grpc.server)
    
    37 39
     instance_name = "blade"
    
    38 40
     
    
    39 41
     
    
    ... ... @@ -56,14 +58,15 @@ def execute_request():
    56 58
     
    
    57 59
     @pytest.fixture
    
    58 60
     def buildgrid():
    
    59
    -    yield BuildGridInstance()
    
    61
    +    yield ExecutionController()
    
    60 62
     
    
    61 63
     
    
    62 64
     # Instance to test
    
    63 65
     @pytest.fixture
    
    64 66
     def instance(buildgrid):
    
    65 67
         instances = {instance_name: buildgrid}
    
    66
    -    yield OperationsService(instances)
    
    68
    +    with mock.patch.object(service, 'operations_pb2_grpc'):
    
    69
    +        yield OperationsService(server, instances)
    
    67 70
     
    
    68 71
     
    
    69 72
     # Queue an execution, get operation corresponding to that request
    

  • tests/integration/reference_storage_service.py
    ... ... @@ -25,10 +25,15 @@ from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_p
    25 25
     from buildgrid._protos.buildstream.v2 import buildstream_pb2
    
    26 26
     
    
    27 27
     from buildgrid.server.cas.storage import lru_memory_cache
    
    28
    +from buildgrid.server.referencestorage import service
    
    28 29
     from buildgrid.server.referencestorage.service import ReferenceStorageService
    
    29 30
     from buildgrid.server.referencestorage.storage import ReferenceCache
    
    30 31
     
    
    31 32
     
    
    33
    +server = mock.create_autospec(grpc.server)
    
    34
    +instance_name = ''
    
    35
    +
    
    36
    +
    
    32 37
     # Can mock this
    
    33 38
     @pytest.fixture
    
    34 39
     def context():
    
    ... ... @@ -45,41 +50,49 @@ def cache(cas):
    45 50
         yield ReferenceCache(cas, 50)
    
    46 51
     
    
    47 52
     
    
    48
    -def test_simple_result(cache, context):
    
    53
    +@pytest.fixture
    
    54
    +def instance(cache):
    
    55
    +    instances = {instance_name: cache}
    
    56
    +    with mock.patch.object(service, 'buildstream_pb2_grpc'):
    
    57
    +        yield ReferenceStorageService(server, instances)
    
    58
    +
    
    59
    +
    
    60
    +def test_simple_result(instance, context):
    
    49 61
         keys = ["rick", "roy", "rach"]
    
    50
    -    service = ReferenceStorageService(cache)
    
    51 62
     
    
    52 63
         # Check that before adding the ReferenceResult, attempting to fetch it fails
    
    53 64
         request = buildstream_pb2.GetReferenceRequest(key=keys[0])
    
    54
    -    service.GetReference(request, context)
    
    65
    +    instance.GetReference(request, context)
    
    55 66
         context.set_code.assert_called_once_with(grpc.StatusCode.NOT_FOUND)
    
    56 67
     
    
    57 68
         # Add an ReferenceResult to the cache
    
    58 69
         reference_result = remote_execution_pb2.Digest(hash='deckard')
    
    59 70
         request = buildstream_pb2.UpdateReferenceRequest(keys=keys,
    
    60 71
                                                          digest=reference_result)
    
    61
    -    service.UpdateReference(request, context)
    
    72
    +    instance.UpdateReference(request, context)
    
    62 73
     
    
    63 74
         # Check that fetching it now works
    
    64 75
         for key in keys:
    
    65 76
             request = buildstream_pb2.GetReferenceRequest(key=key)
    
    66
    -        fetched_result = service.GetReference(request, context)
    
    77
    +        fetched_result = instance.GetReference(request, context)
    
    67 78
             assert fetched_result.digest == reference_result
    
    68 79
     
    
    69 80
     
    
    70
    -def test_disabled_update_result(cache, context):
    
    81
    +def test_disabled_update_result(context):
    
    71 82
         disabled_push = ReferenceCache(cas, 50, False)
    
    72 83
         keys = ["rick", "roy", "rach"]
    
    73
    -    service = ReferenceStorageService(disabled_push)
    
    84
    +
    
    85
    +    with mock.patch.object(service, 'buildstream_pb2_grpc'):
    
    86
    +        instance = ReferenceStorageService(server, {'': disabled_push})
    
    74 87
     
    
    75 88
         # Add an ReferenceResult to the cache
    
    76 89
         reference_result = remote_execution_pb2.Digest(hash='deckard')
    
    77 90
         request = buildstream_pb2.UpdateReferenceRequest(keys=keys,
    
    78 91
                                                          digest=reference_result)
    
    79
    -    service.UpdateReference(request, context)
    
    92
    +    instance.UpdateReference(request, context)
    
    80 93
     
    
    81 94
         request = buildstream_pb2.UpdateReferenceRequest()
    
    82
    -    service.UpdateReference(request, context)
    
    95
    +    instance.UpdateReference(request, context)
    
    83 96
     
    
    84 97
         context.set_code.assert_called_once_with(grpc.StatusCode.UNIMPLEMENTED)
    
    85 98
     
    
    ... ... @@ -87,9 +100,10 @@ def test_disabled_update_result(cache, context):
    87 100
     @pytest.mark.parametrize("allow_updates", [True, False])
    
    88 101
     def test_status(allow_updates, context):
    
    89 102
         cache = ReferenceCache(cas, 5, allow_updates)
    
    90
    -    service = ReferenceStorageService(cache)
    
    103
    +    with mock.patch.object(service, 'buildstream_pb2_grpc'):
    
    104
    +        instance = ReferenceStorageService(server, {'': cache})
    
    91 105
     
    
    92 106
         request = buildstream_pb2.StatusRequest()
    
    93
    -    response = service.Status(request, context)
    
    107
    +    response = instance.Status(request, context)
    
    94 108
     
    
    95 109
         assert response.allow_updates == allow_updates



  • [Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]