[Notes] [Git][BuildGrid/buildgrid][mablanch/139-emit-build-metrics] 30 commits: buildgrid/utils.py: New `get_hash_type` function.



Title: GitLab

Martin Blanchard pushed to branch mablanch/139-emit-build-metrics at BuildGrid / buildgrid

Commits:

29 changed files:

Changes:

  • .gitlab-ci.yml
    ... ... @@ -2,7 +2,7 @@
    2 2
     image: python:3.5-stretch
    
    3 3
     
    
    4 4
     variables:
    
    5
    -  BGD: bgd --verbose
    
    5
    +  BGD: bgd
    
    6 6
     
    
    7 7
     stages:
    
    8 8
       - test
    

  • .pylintrc
    ... ... @@ -185,6 +185,7 @@ ignore-on-opaque-inference=yes
    185 185
     # for classes with dynamically set attributes). This supports the use of
    
    186 186
     # qualified names.
    
    187 187
     ignored-classes=google.protobuf.any_pb2.Any,
    
    188
    +                google.protobuf.duration_pb2.Duration,
    
    188 189
                     google.protobuf.timestamp_pb2.Timestamp
    
    189 190
     
    
    190 191
     # List of module names for which member attributes should not be checked
    
    ... ... @@ -460,6 +461,7 @@ known-third-party=boto3,
    460 461
                       enchant,
    
    461 462
                       google,
    
    462 463
                       grpc,
    
    464
    +                  janus,
    
    463 465
                       moto,
    
    464 466
                       yaml
    
    465 467
     
    

  • buildgrid/_app/cli.py
    ... ... @@ -23,10 +23,12 @@ will be attempted to be imported.
    23 23
     
    
    24 24
     import logging
    
    25 25
     import os
    
    26
    +import sys
    
    26 27
     
    
    27 28
     import click
    
    28 29
     import grpc
    
    29 30
     
    
    31
    +from buildgrid.settings import LOG_RECORD_FORMAT
    
    30 32
     from buildgrid.utils import read_file
    
    31 33
     
    
    32 34
     CONTEXT_SETTINGS = dict(auto_envvar_prefix='BUILDGRID')
    
    ... ... @@ -138,28 +140,71 @@ class BuildGridCLI(click.MultiCommand):
    138 140
             return mod.cli
    
    139 141
     
    
    140 142
     
    
    143
    +class DebugFilter(logging.Filter):
    
    144
    +
    
    145
    +    def __init__(self, debug_domains, name=''):
    
    146
    +        super().__init__(name=name)
    
    147
    +        self.__domains_tree = {}
    
    148
    +
    
    149
    +        for domain in debug_domains.split(':'):
    
    150
    +            domains_tree = self.__domains_tree
    
    151
    +            for label in domain.split('.'):
    
    152
    +                if all(key not in domains_tree for key in [label, '*']):
    
    153
    +                    domains_tree[label] = {}
    
    154
    +                domains_tree = domains_tree[label]
    
    155
    +
    
    156
    +    def filter(self, record):
    
    157
    +        domains_tree, last_match = self.__domains_tree, None
    
    158
    +        for label in record.name.split('.'):
    
    159
    +            if all(key not in domains_tree for key in [label, '*']):
    
    160
    +                return False
    
    161
    +            last_match = label if label in domains_tree else '*'
    
    162
    +            domains_tree = domains_tree[last_match]
    
    163
    +        if domains_tree and '*' not in domains_tree:
    
    164
    +            return False
    
    165
    +        return True
    
    166
    +
    
    167
    +
    
    168
    +def setup_logging(verbosity=0, debug_mode=False):
    
    169
    +    """Deals with loggers verbosity"""
    
    170
    +    asyncio_logger = logging.getLogger('asyncio')
    
    171
    +    root_logger = logging.getLogger()
    
    172
    +
    
    173
    +    log_handler = logging.StreamHandler(stream=sys.stdout)
    
    174
    +    for log_filter in root_logger.filters:
    
    175
    +        log_handler.addFilter(log_filter)
    
    176
    +
    
    177
    +    logging.basicConfig(format=LOG_RECORD_FORMAT, handlers=[log_handler])
    
    178
    +
    
    179
    +    if verbosity == 1:
    
    180
    +        root_logger.setLevel(logging.WARNING)
    
    181
    +    elif verbosity == 2:
    
    182
    +        root_logger.setLevel(logging.INFO)
    
    183
    +    elif verbosity >= 3:
    
    184
    +        root_logger.setLevel(logging.DEBUG)
    
    185
    +    else:
    
    186
    +        root_logger.setLevel(logging.ERROR)
    
    187
    +
    
    188
    +    if not debug_mode:
    
    189
    +        asyncio_logger.setLevel(logging.CRITICAL)
    
    190
    +    else:
    
    191
    +        asyncio_logger.setLevel(logging.DEBUG)
    
    192
    +        root_logger.setLevel(logging.DEBUG)
    
    193
    +
    
    194
    +
    
    141 195
     @click.command(cls=BuildGridCLI, context_settings=CONTEXT_SETTINGS)
    
    142
    -@click.option('-v', '--verbose', count=True,
    
    143
    -              help='Increase log verbosity level.')
    
    144 196
     @pass_context
    
    145
    -def cli(context, verbose):
    
    197
    +def cli(context):
    
    146 198
         """BuildGrid App"""
    
    147
    -    logger = logging.getLogger()
    
    199
    +    root_logger = logging.getLogger()
    
    148 200
     
    
    149 201
         # Clean-up root logger for any pre-configuration:
    
    150
    -    for log_handler in logger.handlers[:]:
    
    151
    -        logger.removeHandler(log_handler)
    
    152
    -    for log_filter in logger.filters[:]:
    
    153
    -        logger.removeFilter(log_filter)
    
    154
    -
    
    155
    -    logging.basicConfig(
    
    156
    -        format='%(asctime)s:%(name)32.32s][%(levelname)5.5s]: %(message)s')
    
    157
    -
    
    158
    -    if verbose == 1:
    
    159
    -        logger.setLevel(logging.WARNING)
    
    160
    -    elif verbose == 2:
    
    161
    -        logger.setLevel(logging.INFO)
    
    162
    -    elif verbose >= 3:
    
    163
    -        logger.setLevel(logging.DEBUG)
    
    164
    -    else:
    
    165
    -        logger.setLevel(logging.ERROR)
    202
    +    for log_handler in root_logger.handlers[:]:
    
    203
    +        root_logger.removeHandler(log_handler)
    
    204
    +    for log_filter in root_logger.filters[:]:
    
    205
    +        root_logger.removeFilter(log_filter)
    
    206
    +
    
    207
    +    # Filter debug messages using BGD_MESSAGE_DEBUG value:
    
    208
    +    debug_domains = os.environ.get('BGD_MESSAGE_DEBUG', None)
    
    209
    +    if debug_domains:
    
    210
    +        root_logger.addFilter(DebugFilter(debug_domains))

  • buildgrid/_app/commands/cmd_bot.py
    ... ... @@ -34,7 +34,7 @@ from buildgrid.bot.hardware.worker import Worker
    34 34
     
    
    35 35
     
    
    36 36
     from ..bots import buildbox, dummy, host
    
    37
    -from ..cli import pass_context
    
    37
    +from ..cli import pass_context, setup_logging
    
    38 38
     
    
    39 39
     
    
    40 40
     @click.group(name='bot', short_help="Create and register bot clients.")
    
    ... ... @@ -58,9 +58,12 @@ from ..cli import pass_context
    58 58
                   help="Time period for bot updates to the server in seconds.")
    
    59 59
     @click.option('--parent', type=click.STRING, default='main', show_default=True,
    
    60 60
                   help="Targeted farm resource.")
    
    61
    +@click.option('-v', '--verbose', count=True,
    
    62
    +              help='Increase log verbosity level.')
    
    61 63
     @pass_context
    
    62 64
     def cli(context, parent, update_period, remote, client_key, client_cert, server_cert,
    
    63
    -        remote_cas, cas_client_key, cas_client_cert, cas_server_cert):
    
    65
    +        remote_cas, cas_client_key, cas_client_cert, cas_server_cert, verbose):
    
    66
    +    setup_logging(verbosity=verbose)
    
    64 67
         # Setup the remote execution server channel:
    
    65 68
         url = urlparse(remote)
    
    66 69
     
    
    ... ... @@ -122,9 +125,8 @@ def cli(context, parent, update_period, remote, client_key, client_cert, server_
    122 125
             context.cas_client_cert = context.client_cert
    
    123 126
             context.cas_server_cert = context.server_cert
    
    124 127
     
    
    125
    -    click.echo("Starting for remote=[{}]".format(context.remote))
    
    126
    -
    
    127 128
         bot_interface = interface.BotInterface(context.channel)
    
    129
    +
    
    128 130
         worker = Worker()
    
    129 131
         worker.add_device(Device())
    
    130 132
         hardware_interface = HardwareInterface(worker)
    

  • buildgrid/_app/commands/cmd_capabilities.py
    1
    +# Copyright (C) 2018 Bloomberg LP
    
    2
    +#
    
    3
    +# Licensed under the Apache License, Version 2.0 (the "License");
    
    4
    +# you may not use this file except in compliance with the License.
    
    5
    +# You may obtain a copy of the License at
    
    6
    +#
    
    7
    +#  <http://www.apache.org/licenses/LICENSE-2.0>
    
    8
    +#
    
    9
    +# Unless required by applicable law or agreed to in writing, software
    
    10
    +# distributed under the License is distributed on an "AS IS" BASIS,
    
    11
    +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    
    12
    +# See the License for the specific language governing permissions and
    
    13
    +# limitations under the License.
    
    14
    +
    
    15
    +
    
    16
    +import sys
    
    17
    +from urllib.parse import urlparse
    
    18
    +
    
    19
    +import click
    
    20
    +import grpc
    
    21
    +
    
    22
    +from buildgrid.client.capabilities import CapabilitiesInterface
    
    23
    +
    
    24
    +from ..cli import pass_context
    
    25
    +
    
    26
    +
    
    27
    +@click.command(name='capabilities', short_help="Capabilities service.")
    
    28
    +@click.option('--remote', type=click.STRING, default='http://localhost:50051', show_default=True,
    
    29
     +              help="Remote execution server's URL (port defaults to 50051 if not specified).")
    
    30
    +@click.option('--client-key', type=click.Path(exists=True, dir_okay=False), default=None,
    
    31
    +              help="Private client key for TLS (PEM-encoded)")
    
    32
    +@click.option('--client-cert', type=click.Path(exists=True, dir_okay=False), default=None,
    
    33
    +              help="Public client certificate for TLS (PEM-encoded)")
    
    34
    +@click.option('--server-cert', type=click.Path(exists=True, dir_okay=False), default=None,
    
    35
    +              help="Public server certificate for TLS (PEM-encoded)")
    
    36
    +@click.option('--instance-name', type=click.STRING, default='main', show_default=True,
    
    37
    +              help="Targeted farm instance name.")
    
    38
    +@pass_context
    
    39
    +def cli(context, remote, instance_name, client_key, client_cert, server_cert):
    
    40
    +    click.echo("Getting capabilities...")
    
    41
    +    url = urlparse(remote)
    
    42
    +
    
    43
    +    remote = '{}:{}'.format(url.hostname, url.port or 50051)
    
    44
    +    instance_name = instance_name
    
    45
    +
    
    46
    +    if url.scheme == 'http':
    
    47
    +        channel = grpc.insecure_channel(remote)
    
    48
    +    else:
    
    49
    +        credentials = context.load_client_credentials(client_key, client_cert, server_cert)
    
    50
    +        if not credentials:
    
    51
    +            click.echo("ERROR: no TLS keys were specified and no defaults could be found.", err=True)
    
    52
    +            sys.exit(-1)
    
    53
    +
    
    54
    +        channel = grpc.secure_channel(remote, credentials)
    
    55
    +
    
    56
    +    interface = CapabilitiesInterface(channel)
    
    57
    +    response = interface.get_capabilities(instance_name)
    
    58
    +    click.echo(response)

  • buildgrid/_app/commands/cmd_server.py
    ... ... @@ -26,7 +26,7 @@ import click
    26 26
     
    
    27 27
     from buildgrid.server.instance import BuildGridServer
    
    28 28
     
    
    29
    -from ..cli import pass_context
    
    29
    +from ..cli import pass_context, setup_logging
    
    30 30
     from ..settings import parser
    
    31 31
     
    
    32 32
     
    
    ... ... @@ -37,9 +37,14 @@ def cli(context):
    37 37
     
    
    38 38
     
    
    39 39
     @cli.command('start', short_help="Setup a new server instance.")
    
    40
    -@click.argument('CONFIG', type=click.Path(file_okay=True, dir_okay=False, writable=False))
    
    40
    +@click.argument('CONFIG',
    
    41
    +                type=click.Path(file_okay=True, dir_okay=False, writable=False))
    
    42
    +@click.option('-v', '--verbose', count=True,
    
    43
    +              help='Increase log verbosity level.')
    
    41 44
     @pass_context
    
    42
    -def start(context, config):
    
    45
    +def start(context, config, verbose):
    
    46
    +    setup_logging(verbosity=verbose)
    
    47
    +
    
    43 48
         with open(config) as f:
    
    44 49
             settings = parser.get_parser().safe_load(f)
    
    45 50
     
    

  • buildgrid/client/capabilities.py
    1
    +# Copyright (C) 2018 Bloomberg LP
    
    2
    +#
    
    3
    +# Licensed under the Apache License, Version 2.0 (the "License");
    
    4
    +# you may not use this file except in compliance with the License.
    
    5
    +# You may obtain a copy of the License at
    
    6
    +#
    
    7
    +#  <http://www.apache.org/licenses/LICENSE-2.0>
    
    8
    +#
    
    9
    +# Unless required by applicable law or agreed to in writing, software
    
    10
    +# distributed under the License is distributed on an "AS IS" BASIS,
    
    11
    +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    
    12
    +# See the License for the specific language governing permissions and
    
    13
    +# limitations under the License.
    
    14
    +
    
    15
    +
    
    16
    +import logging
    
    17
    +import grpc
    
    18
    +
    
    19
    +from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
    
    20
    +
    
    21
    +
    
    22
    +class CapabilitiesInterface:
    
    23
     +    """Interface for calls to the Capabilities Service."""
    
    24
    +
    
    25
    +    def __init__(self, channel):
    
    26
    +        """Initialises an instance of the capabilities service.
    
    27
    +
    
    28
    +        Args:
    
    29
    +            channel (grpc.Channel): A gRPC channel to the CAS endpoint.
    
    30
    +        """
    
    31
    +        self.__logger = logging.getLogger(__name__)
    
    32
    +        self.__stub = remote_execution_pb2_grpc.CapabilitiesStub(channel)
    
    33
    +
    
    34
    +    def get_capabilities(self, instance_name):
    
    35
     +        """Returns the capabilities of the server to the user.
    
    36
    +
    
    37
    +        Args:
    
    38
    +            instance_name (str): The name of the instance."""
    
    39
    +
    
    40
    +        request = remote_execution_pb2.GetCapabilitiesRequest(instance_name=instance_name)
    
    41
    +        try:
    
    42
    +            return self.__stub.GetCapabilities(request)
    
    43
    +
    
    44
    +        except grpc.RpcError as e:
    
    45
    +            self.__logger.error(e)
    
    46
    +            raise

  • buildgrid/client/cas.py
    ... ... @@ -23,19 +23,13 @@ from buildgrid._exceptions import NotFoundError
    23 23
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
    
    24 24
     from buildgrid._protos.google.bytestream import bytestream_pb2, bytestream_pb2_grpc
    
    25 25
     from buildgrid._protos.google.rpc import code_pb2
    
    26
    -from buildgrid.settings import HASH
    
    26
    +from buildgrid.settings import HASH, MAX_REQUEST_SIZE, MAX_REQUEST_COUNT
    
    27 27
     from buildgrid.utils import merkle_tree_maker
    
    28 28
     
    
    29 29
     
    
    30 30
     # Maximum size for a queueable file:
    
    31 31
     FILE_SIZE_THRESHOLD = 1 * 1024 * 1024
    
    32 32
     
    
    33
    -# Maximum size for a single gRPC request:
    
    34
    -MAX_REQUEST_SIZE = 2 * 1024 * 1024
    
    35
    -
    
    36
    -# Maximum number of elements per gRPC request:
    
    37
    -MAX_REQUEST_COUNT = 500
    
    38
    -
    
    39 33
     
    
    40 34
     class _CallCache:
    
    41 35
         """Per remote grpc.StatusCode.UNIMPLEMENTED call cache."""
    
    ... ... @@ -390,11 +384,10 @@ class Downloader:
    390 384
                     assert digest.hash in directories
    
    391 385
     
    
    392 386
                     directory = directories[digest.hash]
    
    393
    -                self._write_directory(digest.hash, directory_path,
    
    387
    +                self._write_directory(directory, directory_path,
    
    394 388
                                           directories=directories, root_barrier=directory_path)
    
    395 389
     
    
    396 390
                     directory_fetched = True
    
    397
    -
    
    398 391
                 except grpc.RpcError as e:
    
    399 392
                     status_code = e.code()
    
    400 393
                     if status_code == grpc.StatusCode.UNIMPLEMENTED:
    

  • buildgrid/server/_monitoring.py
    ... ... @@ -156,9 +156,11 @@ class MonitoringBus:
    156 156
                     output_writers.append(output_file)
    
    157 157
     
    
    158 158
                     while True:
    
    159
    -                    if await __streaming_worker(iter(output_file)):
    
    159
    +                    if await __streaming_worker([output_file]):
    
    160 160
                             self.__sequence_number += 1
    
    161 161
     
    
    162
    +                        output_file.flush()
    
    163
    +
    
    162 164
                 else:
    
    163 165
                     output_writers.append(sys.stdout.buffer)
    
    164 166
     
    

  • buildgrid/server/bots/instance.py
    ... ... @@ -37,6 +37,10 @@ class BotsInterface:
    37 37
             self._assigned_leases = {}
    
    38 38
             self._scheduler = scheduler
    
    39 39
     
    
    40
    +    @property
    
    41
    +    def scheduler(self):
    
    42
    +        return self._scheduler
    
    43
    +
    
    40 44
         def register_instance_with_server(self, instance_name, server):
    
    41 45
             server.add_bots_interface(self, instance_name)
    
    42 46
     
    

  • buildgrid/server/bots/service.py
    ... ... @@ -23,8 +23,9 @@ import logging
    23 23
     
    
    24 24
     import grpc
    
    25 25
     
    
    26
    -from google.protobuf.empty_pb2 import Empty
    
    26
    +from google.protobuf import empty_pb2, timestamp_pb2
    
    27 27
     
    
    28
    +from buildgrid._enums import BotStatus
    
    28 29
     from buildgrid._exceptions import InvalidArgumentError, OutOfSyncError
    
    29 30
     from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2
    
    30 31
     from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2_grpc
    
    ... ... @@ -32,24 +33,86 @@ from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2_grp
    32 33
     
    
    33 34
     class BotsService(bots_pb2_grpc.BotsServicer):
    
    34 35
     
    
    35
    -    def __init__(self, server):
    
    36
    +    def __init__(self, server, monitor=False):
    
    36 37
             self.__logger = logging.getLogger(__name__)
    
    37 38
     
    
    39
    +        self.__bots_by_status = None
    
    40
    +        self.__bots_by_instance = None
    
    41
    +        self.__bots = None
    
    42
    +
    
    38 43
             self._instances = {}
    
    39 44
     
    
    40 45
             bots_pb2_grpc.add_BotsServicer_to_server(self, server)
    
    41 46
     
    
    42
    -    def add_instance(self, name, instance):
    
    43
    -        self._instances[name] = instance
    
    47
    +        self._is_instrumented = monitor
    
    48
    +
    
    49
    +        if self._is_instrumented:
    
    50
    +            self.__bots_by_status = {}
    
    51
    +            self.__bots_by_instance = {}
    
    52
    +            self.__bots = {}
    
    53
    +
    
    54
    +            self.__bots_by_status[BotStatus.OK] = set()
    
    55
    +            self.__bots_by_status[BotStatus.UNHEALTHY] = set()
    
    56
    +
    
    57
    +    # --- Public API ---
    
    58
    +
    
    59
    +    def add_instance(self, instance_name, instance):
    
    60
    +        """Registers a new servicer instance.
    
    61
    +
    
    62
    +        Args:
    
    63
    +            instance_name (str): The new instance's name.
    
    64
    +            instance (BotsInterface): The new instance itself.
    
    65
    +        """
    
    66
    +        self._instances[instance_name] = instance
    
    67
    +
    
    68
    +        if self._is_instrumented:
    
    69
    +            self.__bots_by_instance[instance_name] = set()
    
    70
    +
    
    71
    +    def get_scheduler(self, instance_name):
    
    72
    +        """Retrieves a reference to the scheduler for an instance.
    
    73
    +
    
    74
    +        Args:
    
    75
    +            instance_name (str): The name of the instance to query.
    
    76
    +
    
    77
    +        Returns:
    
    78
    +            Scheduler: A reference to the scheduler for `instance_name`.
    
    79
    +
    
    80
    +        Raises:
    
    81
    +            InvalidArgumentError: If no instance named `instance_name` exists.
    
    82
    +        """
    
    83
    +        instance = self._get_instance(instance_name)
    
    84
    +
    
    85
    +        return instance.scheduler
    
    86
    +
    
    87
    +    # --- Public API: Servicer ---
    
    44 88
     
    
    45 89
         def CreateBotSession(self, request, context):
    
    90
    +        """Handles CreateBotSessionRequest messages.
    
    91
    +
    
    92
    +        Args:
    
    93
    +            request (CreateBotSessionRequest): The incoming RPC request.
    
    94
    +            context (grpc.ServicerContext): Context for the RPC call.
    
    95
    +        """
    
    46 96
             self.__logger.debug("CreateBotSession request from [%s]", context.peer())
    
    47 97
     
    
    98
    +        instance_name = request.parent
    
    99
    +        bot_status = BotStatus(request.bot_session.status)
    
    100
    +        bot_id = request.bot_session.bot_id
    
    101
    +
    
    48 102
             try:
    
    49
    -            parent = request.parent
    
    50
    -            instance = self._get_instance(request.parent)
    
    51
    -            return instance.create_bot_session(parent,
    
    52
    -                                               request.bot_session)
    
    103
    +            instance = self._get_instance(instance_name)
    
    104
    +            bot_session = instance.create_bot_session(instance_name,
    
    105
    +                                                      request.bot_session)
    
    106
    +            now = timestamp_pb2.Timestamp()
    
    107
    +            now.GetCurrentTime()
    
    108
    +
    
    109
    +            if self._is_instrumented:
    
    110
    +                self.__bots[bot_id] = now
    
    111
    +                self.__bots_by_instance[instance_name].add(bot_id)
    
    112
    +                if bot_status in self.__bots_by_status:
    
    113
    +                    self.__bots_by_status[bot_status].add(bot_id)
    
    114
    +
    
    115
    +            return bot_session
    
    53 116
     
    
    54 117
             except InvalidArgumentError as e:
    
    55 118
                 self.__logger.error(e)
    
    ... ... @@ -59,17 +122,41 @@ class BotsService(bots_pb2_grpc.BotsServicer):
    59 122
             return bots_pb2.BotSession()
    
    60 123
     
    
    61 124
         def UpdateBotSession(self, request, context):
    
    125
    +        """Handles UpdateBotSessionRequest messages.
    
    126
    +
    
    127
    +        Args:
    
    128
    +            request (UpdateBotSessionRequest): The incoming RPC request.
    
    129
    +            context (grpc.ServicerContext): Context for the RPC call.
    
    130
    +        """
    
    62 131
             self.__logger.debug("UpdateBotSession request from [%s]", context.peer())
    
    63 132
     
    
    133
    +        names = request.name.split("/")
    
    134
    +        bot_status = BotStatus(request.bot_session.status)
    
    135
    +        bot_id = request.bot_session.bot_id
    
    136
    +
    
    64 137
             try:
    
    65
    -            names = request.name.split("/")
    
    66
    -            # Operation name should be in format:
    
    67
    -            # {instance/name}/{uuid}
    
    68
    -            instance_name = ''.join(names[0:-1])
    
    138
    +            instance_name = '/'.join(names[:-1])
    
    69 139
     
    
    70 140
                 instance = self._get_instance(instance_name)
    
    71
    -            return instance.update_bot_session(request.name,
    
    72
    -                                               request.bot_session)
    
    141
    +            bot_session = instance.update_bot_session(request.name,
    
    142
    +                                                      request.bot_session)
    
    143
    +
    
    144
    +            if self._is_instrumented:
    
    145
    +                self.__bots[bot_id].GetCurrentTime()
    
    146
    +                if bot_id not in self.__bots_by_status[bot_status]:
    
    147
    +                    if bot_status == BotStatus.OK:
    
    148
    +                        self.__bots_by_status[BotStatus.OK].add(bot_id)
    
    149
    +                        self.__bots_by_status[BotStatus.UNHEALTHY].discard(bot_id)
    
    150
    +
    
    151
    +                    elif bot_status == BotStatus.UNHEALTHY:
    
    152
    +                        self.__bots_by_status[BotStatus.OK].discard(bot_id)
    
    153
    +                        self.__bots_by_status[BotStatus.UNHEALTHY].add(bot_id)
    
    154
    +
    
    155
    +                    else:
    
    156
    +                        self.__bots_by_instance[instance_name].remove(bot_id)
    
    157
    +                        del self.__bots[bot_id]
    
    158
    +
    
    159
    +            return bot_session
    
    73 160
     
    
    74 161
             except InvalidArgumentError as e:
    
    75 162
                 self.__logger.error(e)
    
    ... ... @@ -89,10 +176,47 @@ class BotsService(bots_pb2_grpc.BotsServicer):
    89 176
             return bots_pb2.BotSession()
    
    90 177
     
    
    91 178
         def PostBotEventTemp(self, request, context):
    
    179
    +        """Handles PostBotEventTempRequest messages.
    
    180
    +
    
    181
    +        Args:
    
    182
    +            request (PostBotEventTempRequest): The incoming RPC request.
    
    183
    +            context (grpc.ServicerContext): Context for the RPC call.
    
    184
    +        """
    
    92 185
             self.__logger.debug("PostBotEventTemp request from [%s]", context.peer())
    
    93 186
     
    
    94 187
             context.set_code(grpc.StatusCode.UNIMPLEMENTED)
    
    95
    -        return Empty()
    
    188
    +
    
    189
    +        return empty_pb2.Empty()
    
    190
    +
    
    191
    +    # --- Public API: Monitoring ---
    
    192
    +
    
    193
    +    @property
    
    194
    +    def is_instrumented(self):
    
    195
    +        return self._is_instrumented
    
    196
    +
    
    197
    +    def query_n_bots(self):
    
    198
    +        if self.__bots is not None:
    
    199
    +            return len(self.__bots)
    
    200
    +
    
    201
    +        return 0
    
    202
    +
    
    203
    +    def query_n_bots_for_instance(self, instance_name):
    
    204
    +        try:
    
    205
    +            if self.__bots_by_instance is not None:
    
    206
    +                return len(self.__bots_by_instance[instance_name])
    
    207
    +        except KeyError:
    
    208
    +            pass
    
    209
    +        return 0
    
    210
    +
    
    211
    +    def query_n_bots_for_status(self, bot_status):
    
    212
    +        try:
    
    213
    +            if self.__bots_by_status is not None:
    
    214
    +                return len(self.__bots_by_status[bot_status])
    
    215
    +        except KeyError:
    
    216
    +            pass
    
    217
    +        return 0
    
    218
    +
    
    219
    +    # --- Private API ---
    
    96 220
     
    
    97 221
         def _get_instance(self, name):
    
    98 222
             try:
    

  • buildgrid/server/capabilities/__init__.py

  • buildgrid/server/capabilities/instance.py
    1
    +# Copyright (C) 2018 Bloomberg LP
    
    2
    +#
    
    3
    +# Licensed under the Apache License, Version 2.0 (the "License");
    
    4
    +# you may not use this file except in compliance with the License.
    
    5
    +# You may obtain a copy of the License at
    
    6
    +#
    
    7
    +#  <http://www.apache.org/licenses/LICENSE-2.0>
    
    8
    +#
    
    9
    +# Unless required by applicable law or agreed to in writing, software
    
    10
    +# distributed under the License is distributed on an "AS IS" BASIS,
    
    11
    +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    
    12
    +# See the License for the specific language governing permissions and
    
    13
    +# limitations under the License.
    
    14
    +
    
    15
    +
    
    16
    +import logging
    
    17
    +
    
    18
    +from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
    
    19
    +
    
    20
    +
    
    21
    +class CapabilitiesInstance:
    
    22
    +
    
    23
    +    def __init__(self, cas_instance=None, action_cache_instance=None, execution_instance=None):
    
    24
    +        self.__logger = logging.getLogger(__name__)
    
    25
    +        self.__cas_instance = cas_instance
    
    26
    +        self.__action_cache_instance = action_cache_instance
    
    27
    +        self.__execution_instance = execution_instance
    
    28
    +
    
    29
    +    def register_instance_with_server(self, instance_name, server):
    
    30
    +        server.add_capabilities_instance(self, instance_name)
    
    31
    +
    
    32
    +    def add_cas_instance(self, cas_instance):
    
    33
    +        self.__cas_instance = cas_instance
    
    34
    +
    
    35
    +    def add_action_cache_instance(self, action_cache_instance):
    
    36
    +        self.__action_cache_instance = action_cache_instance
    
    37
    +
    
    38
    +    def add_execution_instance(self, execution_instance):
    
    39
    +        self.__execution_instance = execution_instance
    
    40
    +
    
    41
    +    def get_capabilities(self):
    
    42
    +        server_capabilities = remote_execution_pb2.ServerCapabilities()
    
    43
    +        server_capabilities.cache_capabilities.CopyFrom(self._get_cache_capabilities())
    
    44
    +        server_capabilities.execution_capabilities.CopyFrom(self._get_capabilities_execution())
    
    45
    +        # TODO
    
    46
    +        # When API is stable, fill out SemVer values
    
    47
    +        # server_capabilities.deprecated_api_version =
    
    48
    +        # server_capabilities.low_api_version =
    
    49
    +        # server_capabilities.low_api_version =
    
    50
    +        # server_capabilities.hig_api_version =
    
    51
    +        return server_capabilities
    
    52
    +
    
    53
    +    def _get_cache_capabilities(self):
    
    54
    +        capabilities = remote_execution_pb2.CacheCapabilities()
    
    55
    +        action_cache_update_capabilities = remote_execution_pb2.ActionCacheUpdateCapabilities()
    
    56
    +
    
    57
    +        if self.__cas_instance:
    
    58
    +            capabilities.digest_function.extend([self.__cas_instance.hash_type()])
    
    59
    +            capabilities.max_batch_total_size_bytes = self.__cas_instance.max_batch_total_size_bytes()
    
    60
    +            capabilities.symlink_absolute_path_strategy = self.__cas_instance.symlink_absolute_path_strategy()
    
    61
    +            # TODO: execution priority #102
    
    62
    +            # capabilities.cache_priority_capabilities =
    
    63
    +
    
    64
    +        if self.__action_cache_instance:
    
    65
    +            action_cache_update_capabilities.update_enabled = self.__action_cache_instance.allow_updates
    
    66
    +
    
    67
    +        capabilities.action_cache_update_capabilities.CopyFrom(action_cache_update_capabilities)
    
    68
    +        return capabilities
    
    69
    +
    
    70
    +    def _get_capabilities_execution(self):
    
    71
    +        capabilities = remote_execution_pb2.ExecutionCapabilities()
    
    72
    +        if self.__execution_instance:
    
    73
    +            capabilities.exec_enabled = True
    
    74
    +            capabilities.digest_function = self.__execution_instance.hash_type()
    
    75
    +            # TODO: execution priority #102
    
    76
    +            # capabilities.execution_priority =
    
    77
    +
    
    78
    +        else:
    
    79
    +            capabilities.exec_enabled = False
    
    80
    +
    
    81
    +        return capabilities

  • buildgrid/server/capabilities/service.py
    1
    +# Copyright (C) 2018 Bloomberg LP
    
    2
    +#
    
    3
    +# Licensed under the Apache License, Version 2.0 (the "License");
    
    4
    +# you may not use this file except in compliance with the License.
    
    5
    +# You may obtain a copy of the License at
    
    6
    +#
    
    7
    +#  <http://www.apache.org/licenses/LICENSE-2.0>
    
    8
    +#
    
    9
    +# Unless required by applicable law or agreed to in writing, software
    
    10
    +# distributed under the License is distributed on an "AS IS" BASIS,
    
    11
    +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    
    12
    +# See the License for the specific language governing permissions and
    
    13
    +# limitations under the License.
    
    14
    +
    
    15
    +
    
    16
    +import logging
    
    17
    +
    
    18
    +import grpc
    
    19
    +
    
    20
    +from buildgrid._exceptions import InvalidArgumentError
    
    21
    +from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
    
    22
    +
    
    23
    +
    
    24
    +class CapabilitiesService(remote_execution_pb2_grpc.CapabilitiesServicer):
    
    25
    +
    
    26
    +    def __init__(self, server):
    
    27
    +        self.__logger = logging.getLogger(__name__)
    
    28
    +        self.__instances = {}
    
    29
    +        remote_execution_pb2_grpc.add_CapabilitiesServicer_to_server(self, server)
    
    30
    +
    
    31
    +    def add_instance(self, name, instance):
    
    32
    +        self.__instances[name] = instance
    
    33
    +
    
    34
    +    def add_cas_instance(self, name, instance):
    
    35
    +        self.__instances[name].add_cas_instance(instance)
    
    36
    +
    
    37
    +    def add_action_cache_instance(self, name, instance):
    
    38
    +        self.__instances[name].add_action_cache_instance(instance)
    
    39
    +
    
    40
    +    def add_execution_instance(self, name, instance):
    
    41
    +        self.__instances[name].add_execution_instance(instance)
    
    42
    +
    
    43
    +    def GetCapabilities(self, request, context):
    
    44
    +        try:
    
    45
    +            instance = self._get_instance(request.instance_name)
    
    46
    +            return instance.get_capabilities()
    
    47
    +
    
    48
    +        except InvalidArgumentError as e:
    
    49
    +            self.__logger.error(e)
    
    50
    +            context.set_details(str(e))
    
    51
    +            context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
    
    52
    +
    
    53
    +        return remote_execution_pb2.ServerCapabilities()
    
    54
    +
    
    55
    +    def _get_instance(self, name):
    
    56
    +        try:
    
    57
    +            return self.__instances[name]
    
    58
    +
    
    59
    +        except KeyError:
    
    60
    +            raise InvalidArgumentError("Instance doesn't exist on server: [{}]".format(name))

  • buildgrid/server/cas/instance.py
    ... ... @@ -24,7 +24,8 @@ import logging
    24 24
     from buildgrid._exceptions import InvalidArgumentError, NotFoundError, OutOfRangeError
    
    25 25
     from buildgrid._protos.google.bytestream import bytestream_pb2
    
    26 26
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2 as re_pb2
    
    27
    -from buildgrid.settings import HASH, HASH_LENGTH
    
    27
    +from buildgrid.settings import HASH, HASH_LENGTH, MAX_REQUEST_SIZE, MAX_REQUEST_COUNT
    
    28
    +from buildgrid.utils import get_hash_type
    
    28 29
     
    
    29 30
     
    
    30 31
     class ContentAddressableStorageInstance:
    
    ... ... @@ -37,6 +38,17 @@ class ContentAddressableStorageInstance:
    37 38
         def register_instance_with_server(self, instance_name, server):
    
    38 39
             server.add_cas_instance(self, instance_name)
    
    39 40
     
    
    41
    def hash_type(self):
        """Returns the digest function this CAS advertises (from `buildgrid.utils.get_hash_type`)."""
        return get_hash_type()
    
    43
    +
    
    44
    def max_batch_total_size_bytes(self):
        """Returns the maximum total byte size accepted for batch requests.

        Mirrors the server-wide `MAX_REQUEST_SIZE` setting.
        """
        return MAX_REQUEST_SIZE
    
    46
    +
    
    47
    def symlink_absolute_path_strategy(self):
        """Returns the symlink handling strategy advertised by this CAS.

        The strategy is currently hardcoded into BuildGrid, with no
        configuration setting to reference.
        """
        # Read the nested enum value from the message class directly instead
        # of instantiating a throwaway CacheCapabilities message:
        return re_pb2.CacheCapabilities.DISALLOWED
    
    51
    +
    
    40 52
         def find_missing_blobs(self, blob_digests):
    
    41 53
             storage = self._storage
    
    42 54
             return re_pb2.FindMissingBlobsResponse(
    
    ... ... @@ -58,6 +70,41 @@ class ContentAddressableStorageInstance:
    58 70
     
    
    59 71
             return response
    
    60 72
     
    
    73
    def get_tree(self, request):
        """Streams the directory tree rooted at `request.root_digest`.

        Returns a generator of `GetTreeResponse` messages, flushing a
        partial response whenever the per-page directory count or the
        request byte-size budget is exhausted.
        """
        storage = self._storage

        response = re_pb2.GetTreeResponse()
        page_size = request.page_size

        # A page_size of 0 means "use the server default":
        if not request.page_size:
            request.page_size = MAX_REQUEST_COUNT

        root_digest = request.root_digest
        page_size = request.page_size

        def __get_tree(node_digest):
            nonlocal response, page_size, request

            # Page budget exhausted: flush the current page and start a new one:
            if not page_size:
                page_size = request.page_size
                yield response
                response = re_pb2.GetTreeResponse()

            # Byte-size budget exhausted: flush as well:
            if response.ByteSize() >= (MAX_REQUEST_SIZE):
                yield response
                response = re_pb2.GetTreeResponse()

            directory_from_digest = storage.get_message(node_digest, re_pb2.Directory)
            page_size -= 1
            response.directories.extend([directory_from_digest])

            # Depth-first recursion into child directories:
            for directory in directory_from_digest.directories:
                yield from __get_tree(directory.digest)

            # NOTE(review): this trailing yield runs once per recursion level,
            # so the same partially-filled response object may be emitted
            # multiple times — confirm whether duplicate pages are intended.
            yield response

        return __get_tree(root_digest)
    
    107
    +
    
    61 108
     
    
    62 109
     class ByteStreamInstance:
    
    63 110
     
    

  • buildgrid/server/cas/service.py
    ... ... @@ -86,10 +86,16 @@ class ContentAddressableStorageService(remote_execution_pb2_grpc.ContentAddressa
    86 86
    def GetTree(self, request, context):
        """Handles GetTreeRequest messages.

        Streams `GetTreeResponse` messages for the requested instance; on
        an unknown instance name, reports INVALID_ARGUMENT and yields a
        single empty response.
        """
        self.__logger.debug("GetTree request from [%s]", context.peer())

        try:
            instance = self._get_instance(request.instance_name)
            # Errors raised while iterating the instance's generator are
            # also caught below, since the yields happen inside the try:
            yield from instance.get_tree(request)

        except InvalidArgumentError as e:
            self.__logger.error(e)
            context.set_details(str(e))
            context.set_code(grpc.StatusCode.INVALID_ARGUMENT)

            yield remote_execution_pb2.GetTreeResponse()
    
    93 99
     
    
    94 100
         def _get_instance(self, instance_name):
    
    95 101
             try:
    

  • buildgrid/server/execution/instance.py
    ... ... @@ -25,6 +25,7 @@ from buildgrid._exceptions import FailedPreconditionError, InvalidArgumentError
    25 25
     from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Action
    
    26 26
     
    
    27 27
     from ..job import Job
    
    28
    +from ...utils import get_hash_type
    
    28 29
     
    
    29 30
     
    
    30 31
     class ExecutionInstance:
    
    ... ... @@ -35,9 +36,16 @@ class ExecutionInstance:
    35 36
             self._storage = storage
    
    36 37
             self._scheduler = scheduler
    
    37 38
     
    
    39
    @property
    def scheduler(self):
        """Scheduler: The scheduler backing this execution instance."""
        return self._scheduler
    
    42
    +
    
    38 43
         def register_instance_with_server(self, instance_name, server):
    
    39 44
             server.add_execution_instance(self, instance_name)
    
    40 45
     
    
    46
    def hash_type(self):
        """Returns the digest function advertised by this execution instance."""
        return get_hash_type()
    
    48
    +
    
    41 49
         def execute(self, action_digest, skip_cache_lookup, message_queue=None):
    
    42 50
             """ Sends a job for execution.
    
    43 51
             Queues an action and creates an Operation instance to be associated with
    

  • buildgrid/server/execution/service.py
    ... ... @@ -33,30 +33,84 @@ from buildgrid._protos.google.longrunning import operations_pb2
    33 33
     
    
    34 34
     class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
    
    35 35
     
    
    36
    def __init__(self, server, monitor=False):
        """Instantiates the execution servicer and attaches it to a server.

        Args:
            server: The gRPC server to register this servicer with.
            monitor (bool): Whether to collect per-peer client metrics.
        """
        self.__logger = logging.getLogger(__name__)

        # Peer tracking state; only populated when monitoring is enabled:
        self.__peers_by_instance = None
        self.__peers = None

        self._instances = {}

        remote_execution_pb2_grpc.add_ExecutionServicer_to_server(self, server)

        self._is_instrumented = monitor

        if self._is_instrumented:
            self.__peers_by_instance = {}
            self.__peers = {}
    51
    +
    
    52
    +    # --- Public API ---
    
    53
    +
    
    54
    def add_instance(self, instance_name, instance):
        """Registers a new servicer instance.

        Args:
            instance_name (str): The new instance's name.
            instance (ExecutionInstance): The new instance itself.
        """
        self._instances[instance_name] = instance

        # Start tracking this instance's client peers when monitoring is on:
        if self._is_instrumented:
            self.__peers_by_instance[instance_name] = set()
    
    65
    +
    
    66
    +    def get_scheduler(self, instance_name):
    
    67
    +        """Retrieves a reference to the scheduler for an instance.
    
    68
    +
    
    69
    +        Args:
    
    70
    +            instance_name (str): The name of the instance to query.
    
    71
    +
    
    72
    +        Returns:
    
    73
    +            Scheduler: A reference to the scheduler for `instance_name`.
    
    74
    +
    
    75
    +        Raises:
    
    76
    +            InvalidArgumentError: If no instance named `instance_name` exists.
    
    77
    +        """
    
    78
    +        instance = self._get_instance(instance_name)
    
    79
    +
    
    80
    +        return instance.scheduler
    
    81
    +
    
    82
    +    # --- Public API: Servicer ---
    
    44 83
     
    
    45 84
         def Execute(self, request, context):
    
    85
    +        """Handles ExecuteRequest messages.
    
    86
    +
    
    87
    +        Args:
    
    88
    +            request (ExecuteRequest): The incoming RPC request.
    
    89
    +            context (grpc.ServicerContext): Context for the RPC call.
    
    90
    +        """
    
    46 91
             self.__logger.debug("Execute request from [%s]", context.peer())
    
    47 92
     
    
    93
    +        instance_name = request.instance_name
    
    94
    +        message_queue = queue.Queue()
    
    95
    +        peer = context.peer()
    
    96
    +
    
    48 97
             try:
    
    49
    -            message_queue = queue.Queue()
    
    50
    -            instance = self._get_instance(request.instance_name)
    
    98
    +            instance = self._get_instance(instance_name)
    
    51 99
                 operation = instance.execute(request.action_digest,
    
    52 100
                                              request.skip_cache_lookup,
    
    53 101
                                              message_queue)
    
    54 102
     
    
    55
    -            context.add_callback(partial(instance.unregister_message_client,
    
    56
    -                                         operation.name, message_queue))
    
    103
    +            context.add_callback(partial(self._rpc_termination_callback,
    
    104
    +                                         peer, instance_name, operation.name, message_queue))
    
    57 105
     
    
    58
    -            instanced_op_name = "{}/{}".format(request.instance_name,
    
    59
    -                                               operation.name)
    
    106
    +            if self._is_instrumented:
    
    107
    +                if peer not in self.__peers:
    
    108
    +                    self.__peers_by_instance[instance_name].add(peer)
    
    109
    +                    self.__peers[peer] = 1
    
    110
    +                else:
    
    111
    +                    self.__peers[peer] += 1
    
    112
    +
    
    113
    +            instanced_op_name = "{}/{}".format(instance_name, operation.name)
    
    60 114
     
    
    61 115
                 self.__logger.info("Operation name: [%s]", instanced_op_name)
    
    62 116
     
    
    ... ... @@ -86,23 +140,33 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
    86 140
                 yield operations_pb2.Operation()
    
    87 141
     
    
    88 142
         def WaitExecution(self, request, context):
    
    89
    -        self.__logger.debug("WaitExecution request from [%s]", context.peer())
    
    143
    +        """Handles WaitExecutionRequest messages.
    
    90 144
     
    
    91
    -        try:
    
    92
    -            names = request.name.split("/")
    
    145
    +        Args:
    
    146
    +            request (WaitExecutionRequest): The incoming RPC request.
    
    147
    +            context (grpc.ServicerContext): Context for the RPC call.
    
    148
    +        """
    
    149
    +        self.__logger.debug("WaitExecution request from [%s]", context.peer())
    
    93 150
     
    
    94
    -            # Operation name should be in format:
    
    95
    -            # {instance/name}/{operation_id}
    
    96
    -            instance_name = ''.join(names[0:-1])
    
    151
    +        names = request.name.split('/')
    
    152
    +        instance_name = '/'.join(names[:-1])
    
    153
    +        operation_name = names[-1]
    
    154
    +        message_queue = queue.Queue()
    
    155
    +        peer = context.peer()
    
    97 156
     
    
    98
    -            message_queue = queue.Queue()
    
    99
    -            operation_name = names[-1]
    
    157
    +        try:
    
    100 158
                 instance = self._get_instance(instance_name)
    
    101 159
     
    
    102 160
                 instance.register_message_client(operation_name, message_queue)
    
    161
    +            context.add_callback(partial(self._rpc_termination_callback,
    
    162
    +                                         peer, instance_name, operation_name, message_queue))
    
    103 163
     
    
    104
    -            context.add_callback(partial(instance.unregister_message_client,
    
    105
    -                                         operation_name, message_queue))
    
    164
    +            if self._is_instrumented:
    
    165
    +                if peer not in self.__peers:
    
    166
    +                    self.__peers_by_instance[instance_name].add(peer)
    
    167
    +                    self.__peers[peer] = 1
    
    168
    +                else:
    
    169
    +                    self.__peers[peer] += 1
    
    106 170
     
    
    107 171
                 for operation in instance.stream_operation_updates(message_queue,
    
    108 172
                                                                    operation_name):
    
    ... ... @@ -123,6 +187,39 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
    123 187
                 context.set_code(grpc.StatusCode.CANCELLED)
    
    124 188
                 yield operations_pb2.Operation()
    
    125 189
     
    
    190
    +    # --- Public API: Monitoring ---
    
    191
    +
    
    192
    @property
    def is_instrumented(self):
        """bool: Whether client-peer monitoring is enabled for this service."""
        return self._is_instrumented
    
    195
    +
    
    196
    def query_n_clients(self):
        """Returns the total number of execution clients currently tracked.

        Returns 0 when monitoring is disabled (no peer map allocated).
        """
        if self.__peers is None:
            return 0
        return len(self.__peers)
    
    200
    +
    
    201
    def query_n_clients_for_instance(self, instance_name):
        """Returns the number of clients tracked for one instance.

        Returns 0 when monitoring is disabled or the instance is unknown.
        """
        if self.__peers_by_instance is not None:
            instance_peers = self.__peers_by_instance.get(instance_name)
            if instance_peers is not None:
                return len(instance_peers)
        return 0
    
    208
    +
    
    209
    +    # --- Private API ---
    
    210
    +
    
    211
    +    def _rpc_termination_callback(self, peer, instance_name, job_name, message_queue):
    
    212
    +        instance = self._get_instance(instance_name)
    
    213
    +
    
    214
    +        instance.unregister_message_client(job_name, message_queue)
    
    215
    +
    
    216
    +        if self._is_instrumented:
    
    217
    +            if self.__peers[peer] > 1:
    
    218
    +                self.__peers[peer] -= 1
    
    219
    +            else:
    
    220
    +                self.__peers_by_instance[instance_name].remove(peer)
    
    221
    +                del self.__peers[peer]
    
    222
    +
    
    126 223
         def _get_instance(self, name):
    
    127 224
             try:
    
    128 225
                 return self._instances[name]
    

  • buildgrid/server/instance.py
    ... ... @@ -15,19 +15,29 @@
    15 15
     
    
    16 16
     import asyncio
    
    17 17
     from concurrent import futures
    
    18
    +from datetime import datetime, timedelta
    
    18 19
     import logging
    
    20
    +import logging.handlers
    
    19 21
     import os
    
    20 22
     import signal
    
    23
    +import sys
    
    24
    +import time
    
    21 25
     
    
    22 26
     import grpc
    
    27
    +import janus
    
    23 28
     
    
    29
    +from buildgrid._enums import BotStatus, LogRecordLevel, MetricRecordDomain, MetricRecordType
    
    30
    +from buildgrid._protos.buildgrid.v2 import monitoring_pb2
    
    24 31
     from buildgrid.server.actioncache.service import ActionCacheService
    
    25 32
     from buildgrid.server.bots.service import BotsService
    
    33
    +from buildgrid.server.capabilities.instance import CapabilitiesInstance
    
    34
    +from buildgrid.server.capabilities.service import CapabilitiesService
    
    26 35
     from buildgrid.server.cas.service import ByteStreamService, ContentAddressableStorageService
    
    27 36
     from buildgrid.server.execution.service import ExecutionService
    
    28 37
     from buildgrid.server._monitoring import MonitoringBus, MonitoringOutputType, MonitoringOutputFormat
    
    29 38
     from buildgrid.server.operations.service import OperationsService
    
    30 39
     from buildgrid.server.referencestorage.service import ReferenceStorageService
    
    40
    +from buildgrid.settings import LOG_RECORD_FORMAT, MONITORING_PERIOD
    
    31 41
     
    
    32 42
     
    
    33 43
     class BuildGridServer:
    
    ... ... @@ -53,8 +63,23 @@ class BuildGridServer:
    53 63
             self.__grpc_server = grpc.server(self.__grpc_executor)
    
    54 64
     
    
    55 65
             self.__main_loop = asyncio.get_event_loop()
    
    66
    +
    
    56 67
             self.__monitoring_bus = None
    
    57 68
     
    
    69
    +        self.__logging_queue = janus.Queue(loop=self.__main_loop)
    
    70
    +        self.__logging_handler = logging.handlers.QueueHandler(self.__logging_queue.sync_q)
    
    71
    +        self.__logging_formatter = logging.Formatter(fmt=LOG_RECORD_FORMAT)
    
    72
    +        self.__print_log_records = True
    
    73
    +
    
    74
    +        self.__build_metadata_queues = None
    
    75
    +
    
    76
    +        self.__state_monitoring_task = None
    
    77
    +        self.__build_monitoring_tasks = None
    
    78
    +        self.__logging_task = None
    
    79
    +
    
    80
    +        # We always want a capabilities service
    
    81
    +        self._capabilities_service = CapabilitiesService(self.__grpc_server)
    
    82
    +
    
    58 83
             self._execution_service = None
    
    59 84
             self._bots_service = None
    
    60 85
             self._operations_service = None
    
    ... ... @@ -63,6 +88,9 @@ class BuildGridServer:
    63 88
             self._cas_service = None
    
    64 89
             self._bytestream_service = None
    
    65 90
     
    
    91
    +        self._schedulers = {}
    
    92
    +        self._instances = set()
    
    93
    +
    
    66 94
             self._is_instrumented = monitor
    
    67 95
     
    
    68 96
             if self._is_instrumented:
    
    ... ... @@ -70,6 +98,19 @@ class BuildGridServer:
    70 98
                     self.__main_loop, endpoint_type=MonitoringOutputType.STDOUT,
    
    71 99
                     serialisation_format=MonitoringOutputFormat.JSON)
    
    72 100
     
    
    101
    +            self.__build_monitoring_tasks = []
    
    102
    +
    
    103
    +        # Setup the main logging handler:
    
    104
    +        root_logger = logging.getLogger()
    
    105
    +
    
    106
    +        for log_filter in root_logger.filters[:]:
    
    107
    +            self.__logging_handler.addFilter(log_filter)
    
    108
    +            root_logger.removeFilter(log_filter)
    
    109
    +
    
    110
    +        for log_handler in root_logger.handlers[:]:
    
    111
    +            root_logger.removeHandler(log_handler)
    
    112
    +        root_logger.addHandler(self.__logging_handler)
    
    113
    +
    
    73 114
         # --- Public API ---
    
    74 115
     
    
    75 116
         def start(self):
    
    ... ... @@ -79,6 +120,25 @@ class BuildGridServer:
    79 120
             if self._is_instrumented:
    
    80 121
                 self.__monitoring_bus.start()
    
    81 122
     
    
    123
    +            self.__state_monitoring_task = asyncio.ensure_future(
    
    124
    +                self._state_monitoring_worker(period=MONITORING_PERIOD),
    
    125
    +                loop=self.__main_loop)
    
    126
    +
    
    127
    +            self.__build_monitoring_tasks.clear()
    
    128
    +            for instance_name, scheduler in self._schedulers.items():
    
    129
    +                if not scheduler.is_instrumented:
    
    130
    +                    continue
    
    131
    +
    
    132
    +                message_queue = janus.Queue(loop=self.__main_loop)
    
    133
    +                scheduler.register_build_metadata_watcher(message_queue.sync_q)
    
    134
    +
    
    135
    +                self.__build_monitoring_tasks.append(asyncio.ensure_future(
    
    136
    +                    self._build_monitoring_worker(instance_name, message_queue),
    
    137
    +                    loop=self.__main_loop))
    
    138
    +
    
    139
    +        self.__logging_task = asyncio.ensure_future(
    
    140
    +            self._logging_worker(), loop=self.__main_loop)
    
    141
    +
    
    82 142
             self.__main_loop.add_signal_handler(signal.SIGTERM, self.stop)
    
    83 143
     
    
    84 144
             self.__main_loop.run_forever()
    
    ... ... @@ -86,8 +146,18 @@ class BuildGridServer:
    86 146
         def stop(self):
    
    87 147
             """Stops the BuildGrid server."""
    
    88 148
             if self._is_instrumented:
    
    149
    +            if self.__state_monitoring_task is not None:
    
    150
    +                self.__state_monitoring_task.cancel()
    
    151
    +
    
    152
    +            for build_monitoring_task in self.__build_monitoring_tasks:
    
    153
    +                build_monitoring_task.cancel()
    
    154
    +            self.__build_monitoring_tasks.clear()
    
    155
    +
    
    89 156
                 self.__monitoring_bus.stop()
    
    90 157
     
    
    158
    +        if self.__logging_task is not None:
    
    159
    +            self.__logging_task.cancel()
    
    160
    +
    
    91 161
             self.__main_loop.stop()
    
    92 162
     
    
    93 163
             self.__grpc_server.stop(None)
    
    ... ... @@ -125,9 +195,14 @@ class BuildGridServer:
    125 195
                 instance_name (str): Instance name.
    
    126 196
             """
    
    127 197
             if self._execution_service is None:
    
    128
    -            self._execution_service = ExecutionService(self.__grpc_server)
    
    198
    +            self._execution_service = ExecutionService(
    
    199
    +                self.__grpc_server, monitor=self._is_instrumented)
    
    129 200
     
    
    130 201
             self._execution_service.add_instance(instance_name, instance)
    
    202
    +        self._add_capabilities_instance(instance_name, execution_instance=instance)
    
    203
    +
    
    204
    +        self._schedulers[instance_name] = instance.scheduler
    
    205
    +        self._instances.add(instance_name)
    
    131 206
     
    
    132 207
         def add_bots_interface(self, instance, instance_name):
    
    133 208
             """Adds a :obj:`BotsInterface` to the service.
    
    ... ... @@ -139,10 +214,13 @@ class BuildGridServer:
    139 214
                 instance_name (str): Instance name.
    
    140 215
             """
    
    141 216
             if self._bots_service is None:
    
    142
    -            self._bots_service = BotsService(self.__grpc_server)
    
    217
    +            self._bots_service = BotsService(
    
    218
    +                self.__grpc_server, monitor=self._is_instrumented)
    
    143 219
     
    
    144 220
             self._bots_service.add_instance(instance_name, instance)
    
    145 221
     
    
    222
    +        self._instances.add(instance_name)
    
    223
    +
    
    146 224
         def add_operations_instance(self, instance, instance_name):
    
    147 225
             """Adds an :obj:`OperationsInstance` to the service.
    
    148 226
     
    
    ... ... @@ -184,9 +262,10 @@ class BuildGridServer:
    184 262
                 self._action_cache_service = ActionCacheService(self.__grpc_server)
    
    185 263
     
    
    186 264
             self._action_cache_service.add_instance(instance_name, instance)
    
    265
    +        self._add_capabilities_instance(instance_name, action_cache_instance=instance)
    
    187 266
     
    
    188 267
         def add_cas_instance(self, instance, instance_name):
    
    189
    -        """Stores a :obj:`ContentAddressableStorageInstance` to the service.
    
    268
    +        """Adds a :obj:`ContentAddressableStorageInstance` to the service.
    
    190 269
     
    
    191 270
             If no service exists, it creates one.
    
    192 271
     
    
    ... ... @@ -198,9 +277,10 @@ class BuildGridServer:
    198 277
                 self._cas_service = ContentAddressableStorageService(self.__grpc_server)
    
    199 278
     
    
    200 279
             self._cas_service.add_instance(instance_name, instance)
    
    280
    +        self._add_capabilities_instance(instance_name, cas_instance=instance)
    
    201 281
     
    
    202 282
         def add_bytestream_instance(self, instance, instance_name):
    
    203
    -        """Stores a :obj:`ByteStreamInstance` to the service.
    
    283
    +        """Adds a :obj:`ByteStreamInstance` to the service.
    
    204 284
     
    
    205 285
             If no service exists, it creates one.
    
    206 286
     
    
    ... ... @@ -218,3 +298,279 @@ class BuildGridServer:
    218 298
         @property
    
    219 299
         def is_instrumented(self):
    
    220 300
             return self._is_instrumented
    
    301
    +
    
    302
    +    # --- Private API ---
    
    303
    +
    
    304
    +    def _add_capabilities_instance(self, instance_name,
    
    305
    +                                   cas_instance=None,
    
    306
    +                                   action_cache_instance=None,
    
    307
    +                                   execution_instance=None):
    
    308
    +        """Adds a :obj:`CapabilitiesInstance` to the service.
    
    309
    +
    
    310
    +        Args:
    
    311
    +            instance_name (str): Instance name.

    312
    +            cas_instance, action_cache_instance, execution_instance: Optional instances to register.
    
    313
    +        """
    
    314
    +
    
    315
    +        try:
    
    316
    +            if cas_instance:
    
    317
    +                self._capabilities_service.add_cas_instance(instance_name, cas_instance)
    
    318
    +            if action_cache_instance:
    
    319
    +                self._capabilities_service.add_action_cache_instance(instance_name, action_cache_instance)
    
    320
    +            if execution_instance:
    
    321
    +                self._capabilities_service.add_execution_instance(instance_name, execution_instance)
    
    322
    +
    
    323
    +        except KeyError:
    
    324
    +            capabilities_instance = CapabilitiesInstance(cas_instance,
    
    325
    +                                                         action_cache_instance,
    
    326
    +                                                         execution_instance)
    
    327
    +            self._capabilities_service.add_instance(instance_name, capabilities_instance)
    
    328
    +
    
    329
    +    async def _logging_worker(self):
    
    330
    +        """Publishes log records to the monitoring bus."""
    
    331
    +        async def __logging_worker():
    
    332
    +            log_record = await self.__logging_queue.async_q.get()
    
    333
    +
    
    334
    +            # Print log records to stdout, if required:
    
    335
    +            if self.__print_log_records:
    
    336
    +                record = self.__logging_formatter.format(log_record)
    
    337
    +
    
    338
    +                # TODO: Investigate if async write would be worth here.
    
    339
    +                sys.stdout.write('{}\n'.format(record))
    
    340
    +                sys.stdout.flush()
    
    341
    +
    
    342
    +            # Emit a log record if server is instrumented:
    
    343
    +            if self._is_instrumented:
    
    344
    +                log_record_level = LogRecordLevel(int(log_record.levelno / 10))
    
    345
    +                log_record_creation_time = datetime.fromtimestamp(log_record.created)
    
    346
    +                # logging.LogRecord.extra must be a str to str dict:
    
    347
    +                if 'extra' in log_record.__dict__ and log_record.extra:
    
    348
    +                    log_record_metadata = log_record.extra
    
    349
    +                else:
    
    350
    +                    log_record_metadata = None
    
    351
    +                record = self._forge_log_record(
    
    352
    +                    log_record.name, log_record_level, log_record.message,
    
    353
    +                    log_record_creation_time, metadata=log_record_metadata)
    
    354
    +
    
    355
    +                await self.__monitoring_bus.send_record(record)
    
    356
    +
    
    357
    +        try:
    
    358
    +            while True:
    
    359
    +                await __logging_worker()
    
    360
    +
    
    361
    +        except asyncio.CancelledError:
    
    362
    +            pass
    
    363
    +
    
    364
    +    def _forge_log_record(self, domain, level, message, creation_time, metadata=None):
    
    365
    +        log_record = monitoring_pb2.LogRecord()
    
    366
    +
    
    367
    +        log_record.creation_timestamp.FromDatetime(creation_time)
    
    368
    +        log_record.domain = domain
    
    369
    +        log_record.level = level.value
    
    370
    +        log_record.message = message
    
    371
    +        if metadata is not None:
    
    372
    +            log_record.metadata.update(metadata)
    
    373
    +
    
    374
    +        return log_record
    
    375
    +
    
    376
    +    async def _build_monitoring_worker(self, instance_name, message_queue):
    
    377
    +        """Publishes builds metadata to the monitoring bus."""
    
    378
    +        async def __build_monitoring_worker():
    
    379
    +            metadata = await message_queue.async_q.get()
    
    380
    +
    
    381
    +            # Emit build inputs fetching time record:
    
    382
    +            fetch_start = metadata.input_fetch_start_timestamp.ToDatetime()
    
    383
    +            fetch_completed = metadata.input_fetch_completed_timestamp.ToDatetime()
    
    384
    +            input_fetch_time = fetch_completed - fetch_start
    
    385
    +            timer_record = self._forge_timer_metric_record(
    
    386
    +                MetricRecordDomain.BUILD, 'inputs-fetching-time', input_fetch_time,
    
    387
    +                metadata={'instance-name': instance_name or 'void'})
    
    388
    +
    
    389
    +            await self.__monitoring_bus.send_record(timer_record)
    
    390
    +
    
    391
    +            # Emit build execution time record:
    
    392
    +            execution_start = metadata.execution_start_timestamp.ToDatetime()
    
    393
    +            execution_completed = metadata.execution_completed_timestamp.ToDatetime()
    
    394
    +            execution_time = execution_completed - execution_start
    
    395
    +            timer_record = self._forge_timer_metric_record(
    
    396
    +                MetricRecordDomain.BUILD, 'execution-time', execution_time,
    
    397
    +                metadata={'instance-name': instance_name or 'void'})
    
    398
    +
    
    399
    +            await self.__monitoring_bus.send_record(timer_record)
    
    400
    +
    
    401
    +            # Emit build outputs uploading time record:
    
    402
    +            upload_start = metadata.output_upload_start_timestamp.ToDatetime()
    
    403
    +            upload_completed = metadata.output_upload_completed_timestamp.ToDatetime()
    
    404
    +            output_upload_time = upload_completed - upload_start
    
    405
    +            timer_record = self._forge_timer_metric_record(
    
    406
    +                MetricRecordDomain.BUILD, 'outputs-uploading-time', output_upload_time,
    
    407
    +                metadata={'instance-name': instance_name or 'void'})
    
    408
    +
    
    409
    +            await self.__monitoring_bus.send_record(timer_record)
    
    410
    +
    
    411
    +            # Emit total build handling time record:
    
    412
    +            queued = metadata.queued_timestamp.ToDatetime()
    
    413
    +            worker_completed = metadata.worker_completed_timestamp.ToDatetime()
    
    414
    +            total_handling_time = worker_completed - queued
    
    415
    +            timer_record = self._forge_timer_metric_record(
    
    416
    +                MetricRecordDomain.BUILD, 'total-handling-time', total_handling_time,
    
    417
    +                metadata={'instance-name': instance_name or 'void'})
    
    418
    +
    
    419
    +            await self.__monitoring_bus.send_record(timer_record)
    
    420
    +
    
    421
    +        try:
    
    422
    +            while True:
    
    423
    +                await __build_monitoring_worker()
    
    424
    +
    
    425
    +        except asyncio.CancelledError:
    
    426
    +            pass
    
    427
    +
    
    428
    +    async def _state_monitoring_worker(self, period=1.0):
    
    429
    +        """Periodically publishes state metrics to the monitoring bus."""
    
    430
    +        async def __state_monitoring_worker():
    
    431
    +            # Emit total clients count record:
    
    432
    +            _, record = self._query_n_clients()
    
    433
    +            await self.__monitoring_bus.send_record(record)
    
    434
    +
    
    435
    +            # Emit total bots count record:
    
    436
    +            _, record = self._query_n_bots()
    
    437
    +            await self.__monitoring_bus.send_record(record)
    
    438
    +
    
    439
    +            queue_times = []
    
    440
    +            # Emit records by instance:
    
    441
    +            for instance_name in self._instances:
    
    442
    +                # Emit instance clients count record:
    
    443
    +                _, record = self._query_n_clients_for_instance(instance_name)
    
    444
    +                await self.__monitoring_bus.send_record(record)
    
    445
    +
    
    446
    +                # Emit instance bots count record:
    
    447
    +                _, record = self._query_n_bots_for_instance(instance_name)
    
    448
    +                await self.__monitoring_bus.send_record(record)
    
    449
    +
    
    450
    +                # Emit instance average queue time record:
    
    451
    +                queue_time, record = self._query_am_queue_time_for_instance(instance_name)
    
    452
    +                await self.__monitoring_bus.send_record(record)
    
    453
    +                if queue_time:
    
    454
    +                    queue_times.append(queue_time)
    
    455
    +
    
    456
    +            # Emits records by bot status:
    
    457
    +            for bot_status in [BotStatus.OK, BotStatus.UNHEALTHY]:
    
    458
    +                # Emit status bots count record:
    
    459
    +                _, record = self._query_n_bots_for_status(bot_status)
    
    460
    +                await self.__monitoring_bus.send_record(record)
    
    461
    +
    
    462
    +            # Emit overall average queue time record:
    
    463
    +            if queue_times:
    
    464
    +                am_queue_time = sum(queue_times, timedelta()) / len(queue_times)
    
    465
    +            else:
    
    466
    +                am_queue_time = timedelta()
    
    467
    +            record = self._forge_timer_metric_record(
    
    468
    +                MetricRecordDomain.STATE,
    
    469
    +                'average-queue-time',
    
    470
    +                am_queue_time)
    
    471
    +
    
    472
    +            await self.__monitoring_bus.send_record(record)
    
    473
    +
    
    474
    +        try:
    
    475
    +            while True:
    
    476
    +                start = time.time()
    
    477
    +                await __state_monitoring_worker()
    
    478
    +
    
    479
    +                end = time.time()
    
    480
    +                await asyncio.sleep(period - (end - start))
    
    481
    +
    
    482
    +        except asyncio.CancelledError:
    
    483
    +            pass
    
    484
    +
    
    485
    +    def _forge_counter_metric_record(self, domain, name, count, metadata=None):
    
    486
    +        counter_record = monitoring_pb2.MetricRecord()
    
    487
    +
    
    488
    +        counter_record.creation_timestamp.GetCurrentTime()
    
    489
    +        counter_record.domain = domain.value
    
    490
    +        counter_record.type = MetricRecordType.COUNTER.value
    
    491
    +        counter_record.name = name
    
    492
    +        counter_record.count = count
    
    493
    +        if metadata is not None:
    
    494
    +            counter_record.metadata.update(metadata)
    
    495
    +
    
    496
    +        return counter_record
    
    497
    +
    
    498
    +    def _forge_timer_metric_record(self, domain, name, duration, metadata=None):
    
    499
    +        timer_record = monitoring_pb2.MetricRecord()
    
    500
    +
    
    501
    +        timer_record.creation_timestamp.GetCurrentTime()
    
    502
    +        timer_record.domain = domain.value
    
    503
    +        timer_record.type = MetricRecordType.TIMER.value
    
    504
    +        timer_record.name = name
    
    505
    +        timer_record.duration.FromTimedelta(duration)
    
    506
    +        if metadata is not None:
    
    507
    +            timer_record.metadata.update(metadata)
    
    508
    +
    
    509
    +        return timer_record
    
    510
    +
    
    511
    +    def _forge_gauge_metric_record(self, domain, name, value, metadata=None):
    
    512
    +        gauge_record = monitoring_pb2.MetricRecord()
    
    513
    +
    
    514
    +        gauge_record.creation_timestamp.GetCurrentTime()
    
    515
    +        gauge_record.domain = domain.value
    
    516
    +        gauge_record.type = MetricRecordType.GAUGE.value
    
    517
    +        gauge_record.name = name
    
    518
    +        gauge_record.value = value
    
    519
    +        if metadata is not None:
    
    520
    +            gauge_record.metadata.update(metadata)
    
    521
    +
    
    522
    +        return gauge_record
    
    523
    +
    
    524
    +    # --- Private API: Monitoring ---
    
    525
    +
    
    526
    +    def _query_n_clients(self):
    
    527
    +        """Queries the number of clients connected."""
    
    528
    +        n_clients = self._execution_service.query_n_clients()
    
    529
    +        gauge_record = self._forge_gauge_metric_record(
    
    530
    +            MetricRecordDomain.STATE, 'clients-count', n_clients)
    
    531
    +
    
    532
    +        return n_clients, gauge_record
    
    533
    +
    
    534
    +    def _query_n_clients_for_instance(self, instance_name):
    
    535
    +        """Queries the number of clients connected for a given instance"""
    
    536
    +        n_clients = self._execution_service.query_n_clients_for_instance(instance_name)
    
    537
    +        gauge_record = self._forge_gauge_metric_record(
    
    538
    +            MetricRecordDomain.STATE, 'clients-count', n_clients,
    
    539
    +            metadata={'instance-name': instance_name or 'void'})
    
    540
    +
    
    541
    +        return n_clients, gauge_record
    
    542
    +
    
    543
    +    def _query_n_bots(self):
    
    544
    +        """Queries the number of bots connected."""
    
    545
    +        n_bots = self._bots_service.query_n_bots()
    
    546
    +        gauge_record = self._forge_gauge_metric_record(
    
    547
    +            MetricRecordDomain.STATE, 'bots-count', n_bots)
    
    548
    +
    
    549
    +        return n_bots, gauge_record
    
    550
    +
    
    551
    +    def _query_n_bots_for_instance(self, instance_name):
    
    552
    +        """Queries the number of bots connected for a given instance."""
    
    553
    +        n_bots = self._bots_service.query_n_bots_for_instance(instance_name)
    
    554
    +        gauge_record = self._forge_gauge_metric_record(
    
    555
    +            MetricRecordDomain.STATE, 'bots-count', n_bots,
    
    556
    +            metadata={'instance-name': instance_name or 'void'})
    
    557
    +
    
    558
    +        return n_bots, gauge_record
    
    559
    +
    
    560
    +    def _query_n_bots_for_status(self, bot_status):
    
    561
    +        """Queries the number of bots connected for a given health status."""
    
    562
    +        n_bots = self._bots_service.query_n_bots_for_status(bot_status)
    
    563
    +        gauge_record = self._forge_gauge_metric_record(
    
    564
    +            MetricRecordDomain.STATE, 'bots-count', n_bots,
    
    565
    +            metadata={'bot-status': bot_status.name})
    
    566
    +
    
    567
    +        return n_bots, gauge_record
    
    568
    +
    
    569
    +    def _query_am_queue_time_for_instance(self, instance_name):
    
    570
    +        """Queries the average job's queue time for a given instance."""
    
    571
    +        am_queue_time = self._schedulers[instance_name].query_am_queue_time()
    
    572
    +        timer_record = self._forge_timer_metric_record(
    
    573
    +            MetricRecordDomain.STATE, 'average-queue-time', am_queue_time,
    
    574
    +            metadata={'instance-name': instance_name or 'void'})
    
    575
    +
    
    576
    +        return am_queue_time, timer_record

  • buildgrid/server/job.py
    ... ... @@ -13,10 +13,11 @@
    13 13
     # limitations under the License.
    
    14 14
     
    
    15 15
     
    
    16
    +from datetime import datetime
    
    16 17
     import logging
    
    17 18
     import uuid
    
    18 19
     
    
    19
    -from google.protobuf import timestamp_pb2
    
    20
    +from google.protobuf import duration_pb2, timestamp_pb2
    
    20 21
     
    
    21 22
     from buildgrid._enums import LeaseState, OperationStage
    
    22 23
     from buildgrid._exceptions import CancelledError
    
    ... ... @@ -40,6 +41,7 @@ class Job:
    40 41
             self.__operation_metadata = remote_execution_pb2.ExecuteOperationMetadata()
    
    41 42
     
    
    42 43
             self.__queued_timestamp = timestamp_pb2.Timestamp()
    
    44
    +        self.__queued_time_duration = duration_pb2.Duration()
    
    43 45
             self.__worker_start_timestamp = timestamp_pb2.Timestamp()
    
    44 46
             self.__worker_completed_timestamp = timestamp_pb2.Timestamp()
    
    45 47
     
    
    ... ... @@ -56,6 +58,8 @@ class Job:
    56 58
             self._operation.done = False
    
    57 59
             self._n_tries = 0
    
    58 60
     
    
    61
    +    # --- Public API ---
    
    62
    +
    
    59 63
         @property
    
    60 64
         def name(self):
    
    61 65
             return self._name
    
    ... ... @@ -79,6 +83,13 @@ class Job:
    79 83
             else:
    
    80 84
                 return None
    
    81 85
     
    
    86
    +    @property
    
    87
    +    def holds_cached_action_result(self):
    
    88
    +        if self.__execute_response is not None:
    
    89
    +            return self.__execute_response.cached_result
    
    90
    +        else:
    
    91
    +            return False
    
    92
    +
    
    82 93
         @property
    
    83 94
         def operation(self):
    
    84 95
             return self._operation
    
    ... ... @@ -193,7 +204,7 @@ class Job:
    193 204
                     result.Unpack(action_result)
    
    194 205
     
    
    195 206
                 action_metadata = action_result.execution_metadata
    
    196
    -            action_metadata.queued_timestamp.CopyFrom(self.__worker_start_timestamp)
    
    207
    +            action_metadata.queued_timestamp.CopyFrom(self.__queued_timestamp)
    
    197 208
                 action_metadata.worker_start_timestamp.CopyFrom(self.__worker_start_timestamp)
    
    198 209
                 action_metadata.worker_completed_timestamp.CopyFrom(self.__worker_completed_timestamp)
    
    199 210
     
    
    ... ... @@ -227,6 +238,10 @@ class Job:
    227 238
                     self.__queued_timestamp.GetCurrentTime()
    
    228 239
                 self._n_tries += 1
    
    229 240
     
    
    241
    +        elif self.__operation_metadata.stage == OperationStage.EXECUTING.value:
    
    242
    +            queue_in, queue_out = self.__queued_timestamp.ToDatetime(), datetime.now()
    
    243
    +            self.__queued_time_duration.FromTimedelta(queue_out - queue_in)
    
    244
    +
    
    230 245
             elif self.__operation_metadata.stage == OperationStage.COMPLETED.value:
    
    231 246
                 if self.__execute_response is not None:
    
    232 247
                     self._operation.response.Pack(self.__execute_response)
    
    ... ... @@ -260,3 +275,11 @@ class Job:
    260 275
             self.__execute_response.status.message = "Operation cancelled by client."
    
    261 276
     
    
    262 277
             self.update_operation_stage(OperationStage.COMPLETED)
    
    278
    +
    
    279
    +    # --- Public API: Monitoring ---
    
    280
    +
    
    281
    +    def query_queue_time(self):
    
    282
    +        return self.__queued_time_duration.ToTimedelta()
    
    283
    +
    
    284
    +    def query_n_retries(self):
    
    285
    +        return self._n_tries - 1 if self._n_tries > 0 else 0

  • buildgrid/server/operations/instance.py
    ... ... @@ -32,6 +32,10 @@ class OperationsInstance:
    32 32
     
    
    33 33
             self._scheduler = scheduler
    
    34 34
     
    
    35
    +    @property
    
    36
    +    def scheduler(self):
    
    37
    +        return self._scheduler
    
    38
    +
    
    35 39
         def register_instance_with_server(self, instance_name, server):
    
    36 40
             server.add_operations_instance(self, instance_name)
    
    37 41
     
    

  • buildgrid/server/operations/service.py
    ... ... @@ -38,8 +38,18 @@ class OperationsService(operations_pb2_grpc.OperationsServicer):
    38 38
     
    
    39 39
             operations_pb2_grpc.add_OperationsServicer_to_server(self, server)
    
    40 40
     
    
    41
    -    def add_instance(self, name, instance):
    
    42
    -        self._instances[name] = instance
    
    41
    +    # --- Public API ---
    
    42
    +
    
    43
    +    def add_instance(self, instance_name, instance):
    
    44
    +        """Registers a new servicer instance.
    
    45
    +
    
    46
    +        Args:
    
    47
    +            instance_name (str): The new instance's name.
    
    48
    +            instance (OperationsInstance): The new instance itself.
    
    49
    +        """
    
    50
    +        self._instances[instance_name] = instance
    
    51
    +
    
    52
    +    # --- Public API: Servicer ---
    
    43 53
     
    
    44 54
         def GetOperation(self, request, context):
    
    45 55
             self.__logger.debug("GetOperation request from [%s]", context.peer())
    
    ... ... @@ -127,6 +137,8 @@ class OperationsService(operations_pb2_grpc.OperationsServicer):
    127 137
     
    
    128 138
             return Empty()
    
    129 139
     
    
    140
    +    # --- Private API ---
    
    141
    +
    
    130 142
         def _parse_instance_name(self, name):
    
    131 143
             """ If the instance name is not blank, 'name' will have the form
    
    132 144
             {instance_name}/{operation_uuid}. Otherwise, it will just be
    

  • buildgrid/server/scheduler.py
    ... ... @@ -20,33 +20,74 @@ Schedules jobs.
    20 20
     """
    
    21 21
     
    
    22 22
     from collections import deque
    
    23
    +from datetime import timedelta
    
    23 24
     import logging
    
    24 25
     
    
    26
    +from buildgrid._enums import LeaseState, OperationStage
    
    25 27
     from buildgrid._exceptions import NotFoundError
    
    26 28
     
    
    27
    -from .job import OperationStage, LeaseState
    
    28
    -
    
    29 29
     
    
    30 30
     class Scheduler:
    
    31 31
     
    
    32 32
         MAX_N_TRIES = 5
    
    33 33
     
    
    34
    -    def __init__(self, action_cache=None):
    
    34
    +    def __init__(self, action_cache=None, monitor=False):
    
    35 35
             self.__logger = logging.getLogger(__name__)
    
    36 36
     
    
    37
    +        self.__build_metadata_queues = None
    
    38
    +
    
    39
    +        self.__operations_by_stage = None
    
    40
    +        self.__leases_by_state = None
    
    41
    +        self.__queue_time_average = None
    
    42
    +        self.__retries_count = 0
    
    43
    +
    
    37 44
             self._action_cache = action_cache
    
    38 45
             self.jobs = {}
    
    39 46
             self.queue = deque()
    
    40 47
     
    
    48
    +        self._is_instrumented = monitor
    
    49
    +
    
    50
    +        if self._is_instrumented:
    
    51
    +            self.__build_metadata_queues = []
    
    52
    +
    
    53
    +            self.__operations_by_stage = {}
    
    54
    +            self.__leases_by_state = {}
    
    55
    +            self.__queue_time_average = 0, timedelta()
    
    56
    +
    
    57
    +            self.__operations_by_stage[OperationStage.CACHE_CHECK] = set()
    
    58
    +            self.__operations_by_stage[OperationStage.QUEUED] = set()
    
    59
    +            self.__operations_by_stage[OperationStage.EXECUTING] = set()
    
    60
    +            self.__operations_by_stage[OperationStage.COMPLETED] = set()
    
    61
    +
    
    62
    +            self.__leases_by_state[LeaseState.PENDING] = set()
    
    63
    +            self.__leases_by_state[LeaseState.ACTIVE] = set()
    
    64
    +            self.__leases_by_state[LeaseState.COMPLETED] = set()
    
    65
    +
    
    66
    +    # --- Public API ---
    
    67
    +
    
    41 68
         def register_client(self, job_name, queue):
    
    42
    -        self.jobs[job_name].register_client(queue)
    
    69
    +        job = self.jobs[job_name]
    
    70
    +
    
    71
    +        job.register_client(queue)
    
    43 72
     
    
    44 73
         def unregister_client(self, job_name, queue):
    
    45
    -        self.jobs[job_name].unregister_client(queue)
    
    74
    +        job = self.jobs[job_name]
    
    75
    +
    
    76
    +        job.unregister_client(queue)
    
    46 77
     
    
    47
    -        if not self.jobs[job_name].n_clients and self.jobs[job_name].operation.done:
    
    78
    +        if not job.n_clients and job.operation.done:
    
    48 79
                 del self.jobs[job_name]
    
    49 80
     
    
    81
    +            if self._is_instrumented:
    
    82
    +                self.__operations_by_stage[OperationStage.CACHE_CHECK].discard(job_name)
    
    83
    +                self.__operations_by_stage[OperationStage.QUEUED].discard(job_name)
    
    84
    +                self.__operations_by_stage[OperationStage.EXECUTING].discard(job_name)
    
    85
    +                self.__operations_by_stage[OperationStage.COMPLETED].discard(job_name)
    
    86
    +
    
    87
    +                self.__leases_by_state[LeaseState.PENDING].discard(job_name)
    
    88
    +                self.__leases_by_state[LeaseState.ACTIVE].discard(job_name)
    
    89
    +                self.__leases_by_state[LeaseState.COMPLETED].discard(job_name)
    
    90
    +
    
    50 91
         def queue_job(self, job, skip_cache_lookup=False):
    
    51 92
             self.jobs[job.name] = job
    
    52 93
     
    
    ... ... @@ -62,23 +103,30 @@ class Scheduler:
    62 103
                     job.set_cached_result(action_result)
    
    63 104
                     operation_stage = OperationStage.COMPLETED
    
    64 105
     
    
    106
    +                if self._is_instrumented:
    
    107
    +                    self.__retries_count += 1
    
    108
    +
    
    65 109
             else:
    
    66 110
                 operation_stage = OperationStage.QUEUED
    
    67 111
                 self.queue.append(job)
    
    68 112
     
    
    69
    -        job.update_operation_stage(operation_stage)
    
    113
    +        self._update_job_operation_stage(job.name, operation_stage)
    
    70 114
     
    
    71 115
         def retry_job(self, job_name):
    
    72
    -        if job_name in self.jobs:
    
    73
    -            job = self.jobs[job_name]
    
    74
    -            if job.n_tries >= self.MAX_N_TRIES:
    
    75
    -                # TODO: Decide what to do with these jobs
    
    76
    -                job.update_operation_stage(OperationStage.COMPLETED)
    
    77
    -                # TODO: Mark these jobs as done
    
    78
    -            else:
    
    79
    -                job.update_operation_stage(OperationStage.QUEUED)
    
    80
    -                job.update_lease_state(LeaseState.PENDING)
    
    81
    -                self.queue.append(job)
    
    116
    +        job = self.jobs[job_name]
    
    117
    +
    
    118
    +        operation_stage = None
    
    119
    +        if job.n_tries >= self.MAX_N_TRIES:
    
    120
    +            # TODO: Decide what to do with these jobs
    
    121
    +            operation_stage = OperationStage.COMPLETED
    
    122
    +            # TODO: Mark these jobs as done
    
    123
    +
    
    124
    +        else:
    
    125
    +            operation_stage = OperationStage.QUEUED
    
    126
    +            job.update_lease_state(LeaseState.PENDING)
    
    127
    +            self.queue.append(job)
    
    128
    +
    
    129
    +        self._update_job_operation_stage(job_name, operation_stage)
    
    82 130
     
    
    83 131
         def list_jobs(self):
    
    84 132
             return self.jobs.values()
    
    ... ... @@ -118,17 +166,27 @@ class Scheduler:
    118 166
                 lease_result (google.protobuf.Any): the lease execution result, only
    
    119 167
                     required if `lease_state` is `COMPLETED`.
    
    120 168
             """
    
    121
    -
    
    122 169
             job = self.jobs[lease.id]
    
    123 170
             lease_state = LeaseState(lease.state)
    
    124 171
     
    
    172
    +        operation_stage = None
    
    125 173
             if lease_state == LeaseState.PENDING:
    
    126 174
                 job.update_lease_state(LeaseState.PENDING)
    
    127
    -            job.update_operation_stage(OperationStage.QUEUED)
    
    175
    +            operation_stage = OperationStage.QUEUED
    
    176
    +
    
    177
    +            if self._is_instrumented:
    
    178
    +                self.__leases_by_state[LeaseState.PENDING].add(lease.id)
    
    179
    +                self.__leases_by_state[LeaseState.ACTIVE].discard(lease.id)
    
    180
    +                self.__leases_by_state[LeaseState.COMPLETED].discard(lease.id)
    
    128 181
     
    
    129 182
             elif lease_state == LeaseState.ACTIVE:
    
    130 183
                 job.update_lease_state(LeaseState.ACTIVE)
    
    131
    -            job.update_operation_stage(OperationStage.EXECUTING)
    
    184
    +            operation_stage = OperationStage.EXECUTING
    
    185
    +
    
    186
    +            if self._is_instrumented:
    
    187
    +                self.__leases_by_state[LeaseState.PENDING].discard(lease.id)
    
    188
    +                self.__leases_by_state[LeaseState.ACTIVE].add(lease.id)
    
    189
    +                self.__leases_by_state[LeaseState.COMPLETED].discard(lease.id)
    
    132 190
     
    
    133 191
             elif lease_state == LeaseState.COMPLETED:
    
    134 192
                 job.update_lease_state(LeaseState.COMPLETED,
    
    ... ... @@ -137,7 +195,14 @@ class Scheduler:
    137 195
                 if self._action_cache is not None and not job.do_not_cache:
    
    138 196
                     self._action_cache.update_action_result(job.action_digest, job.action_result)
    
    139 197
     
    
    140
    -            job.update_operation_stage(OperationStage.COMPLETED)
    
    198
    +            operation_stage = OperationStage.COMPLETED
    
    199
    +
    
    200
    +            if self._is_instrumented:
    
    201
    +                self.__leases_by_state[LeaseState.PENDING].discard(lease.id)
    
    202
    +                self.__leases_by_state[LeaseState.ACTIVE].discard(lease.id)
    
    203
    +                self.__leases_by_state[LeaseState.COMPLETED].add(lease.id)
    
    204
    +
    
    205
    +        self._update_job_operation_stage(lease.id, operation_stage)
    
    141 206
     
    
    142 207
         def get_job_lease(self, job_name):
    
    143 208
             """Returns the lease associated to job, if any have been emitted yet."""
    
    ... ... @@ -160,3 +225,109 @@ class Scheduler:
    160 225
                 job_name (str): name of the job holding the operation to cancel.
    
    161 226
             """
    
    162 227
             self.jobs[job_name].cancel_operation()
    
    228
    +
    
    229
    +    # --- Public API: Monitoring ---
    
    230
    +
    
    231
    +    @property
    
    232
    +    def is_instrumented(self):
    
    233
    +        return self._is_instrumented
    
    234
    +
    
    235
    +    def register_build_metadata_watcher(self, message_queue):
    
    236
    +        if self.__build_metadata_queues is not None:
    
    237
    +            self.__build_metadata_queues.append(message_queue)
    
    238
    +
    
    239
    +    def query_n_jobs(self):
    
    240
    +        return len(self.jobs)
    
    241
    +
    
    242
    +    def query_n_operations(self):
    
    243
    +        # For now n_operations == n_jobs:
    
    244
    +        return len(self.jobs)
    
    245
    +
    
    246
    +    def query_n_operations_by_stage(self, operation_stage):
    
    247
    +        try:
    
    248
    +            if self.__operations_by_stage is not None:
    
    249
    +                return len(self.__operations_by_stage[operation_stage])
    
    250
    +        except KeyError:
    
    251
    +            pass
    
    252
    +        return 0
    
    253
    +
    
    254
    +    def query_n_leases(self):
    
    255
    +        return len(self.jobs)
    
    256
    +
    
    257
    +    def query_n_leases_by_state(self, lease_state):
    
    258
    +        try:
    
    259
    +            if self.__leases_by_state is not None:
    
    260
    +                return len(self.__leases_by_state[lease_state])
    
    261
    +        except KeyError:
    
    262
    +            pass
    
    263
    +        return 0
    
    264
    +
    
    265
    +    def query_n_retries(self):
    
    266
    +        return self.__retries_count
    
    267
    +
    
    268
    +    def query_am_queue_time(self):
    
    269
    +        if self.__queue_time_average is not None:
    
    270
    +            return self.__queue_time_average[1]
    
    271
    +        return timedelta()
    
    272
    +
    
    273
    +    # --- Private API ---
    
    274
    +
    
    275
    +    def _update_job_operation_stage(self, job_name, operation_stage):
    
    276
    +        """Requests a stage transition for the job's :class:Operations.
    
    277
    +
    
    278
    +        Args:
    
    279
    +            job_name (str): name of the job to query.
    
    280
    +            operation_stage (OperationStage): the stage to transition to.
    
    281
    +        """
    
    282
    +        job = self.jobs[job_name]
    
    283
    +
    
    284
    +        if operation_stage == OperationStage.CACHE_CHECK:
    
    285
    +            job.update_operation_stage(OperationStage.CACHE_CHECK)
    
    286
    +
    
    287
    +            if self._is_instrumented:
    
    288
    +                self.__operations_by_stage[OperationStage.CACHE_CHECK].add(job_name)
    
    289
    +                self.__operations_by_stage[OperationStage.QUEUED].discard(job_name)
    
    290
    +                self.__operations_by_stage[OperationStage.EXECUTING].discard(job_name)
    
    291
    +                self.__operations_by_stage[OperationStage.COMPLETED].discard(job_name)
    
    292
    +
    
    293
    +        elif operation_stage == OperationStage.QUEUED:
    
    294
    +            job.update_operation_stage(OperationStage.QUEUED)
    
    295
    +
    
    296
    +            if self._is_instrumented:
    
    297
    +                self.__operations_by_stage[OperationStage.CACHE_CHECK].discard(job_name)
    
    298
    +                self.__operations_by_stage[OperationStage.QUEUED].add(job_name)
    
    299
    +                self.__operations_by_stage[OperationStage.EXECUTING].discard(job_name)
    
    300
    +                self.__operations_by_stage[OperationStage.COMPLETED].discard(job_name)
    
    301
    +
    
    302
    +        elif operation_stage == OperationStage.EXECUTING:
    
    303
    +            job.update_operation_stage(OperationStage.EXECUTING)
    
    304
    +
    
    305
    +            if self._is_instrumented:
    
    306
    +                self.__operations_by_stage[OperationStage.CACHE_CHECK].discard(job_name)
    
    307
    +                self.__operations_by_stage[OperationStage.QUEUED].discard(job_name)
    
    308
    +                self.__operations_by_stage[OperationStage.EXECUTING].add(job_name)
    
    309
    +                self.__operations_by_stage[OperationStage.COMPLETED].discard(job_name)
    
    310
    +
    
    311
    +        elif operation_stage == OperationStage.COMPLETED:
    
    312
    +            job.update_operation_stage(OperationStage.COMPLETED)
    
    313
    +
    
    314
    +            if self._is_instrumented:
    
    315
    +                self.__operations_by_stage[OperationStage.CACHE_CHECK].discard(job_name)
    
    316
    +                self.__operations_by_stage[OperationStage.QUEUED].discard(job_name)
    
    317
    +                self.__operations_by_stage[OperationStage.EXECUTING].discard(job_name)
    
    318
    +                self.__operations_by_stage[OperationStage.COMPLETED].add(job_name)
    
    319
    +
    
    320
    +                average_order, average_time = self.__queue_time_average
    
    321
    +
    
    322
    +                average_order += 1
    
    323
    +                if average_order <= 1:
    
    324
    +                    average_time = job.query_queue_time()
    
    325
    +                else:
    
    326
    +                    queue_time = job.query_queue_time()
    
    327
    +                    average_time = average_time + ((queue_time - average_time) / average_order)
    
    328
    +
    
    329
    +                self.__queue_time_average = average_order, average_time
    
    330
    +
    
    331
    +                if not job.holds_cached_action_result:
    
    332
    +                    for message_queue in self.__build_metadata_queues:
    
    333
    +                        message_queue.put(job.action_result.execution_metadata)

  • buildgrid/settings.py
    1
    +# Copyright (C) 2018 Bloomberg LP
    
    2
    +#
    
    3
    +# Licensed under the Apache License, Version 2.0 (the "License");
    
    4
    +# you may not use this file except in compliance with the License.
    
    5
    +# You may obtain a copy of the License at
    
    6
    +#
    
    7
    +#  <http://www.apache.org/licenses/LICENSE-2.0>
    
    8
    +#
    
    9
    +# Unless required by applicable law or agreed to in writing, software
    
    10
    +# distributed under the License is distributed on an "AS IS" BASIS,
    
    11
    +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    
    12
    +# See the License for the specific language governing permissions and
    
    13
    +# limitations under the License.
    
    14
    +
    
    15
    +
    
    1 16
     import hashlib
    
    2 17
     
    
    3 18
     
    
    4
    -# The hash function that CAS uses
    
    19
    +# Hash function used for computing digests:
    
    5 20
     HASH = hashlib.sha256
    
    21
    +
    
    22
    +# Length in bytes of a hash string returned by HASH:
    
    6 23
     HASH_LENGTH = HASH().digest_size * 2
    
    24
    +
    
    25
    +# Period, in seconds, for the monitoring cycle:
    
    26
    +MONITORING_PERIOD = 5.0
    
    27
    +
    
    28
    +# Maximum size for a single gRPC request:
    
    29
    +MAX_REQUEST_SIZE = 2 * 1024 * 1024
    
    30
    +
    
    31
    +# Maximum number of elements per gRPC request:
    
    32
    +MAX_REQUEST_COUNT = 500
    
    33
    +
    
    34
    +# String format for log records:
    
    35
    +LOG_RECORD_FORMAT = '%(asctime)s:[%(name)36.36s][%(levelname)5.5s]: %(message)s'
    
    36
    +# The different log record attributes are documented here:
    
    37
    +# https://docs.python.org/3/library/logging.html#logrecord-attributes

  • buildgrid/utils.py
    ... ... @@ -30,6 +30,14 @@ def get_hostname():
    30 30
         return socket.gethostname()
    
    31 31
     
    
    32 32
     
    
    33
    +def get_hash_type():
    
    34
    +    """Returns the hash type."""
    
    35
    +    hash_name = HASH().name
    
    36
    +    if hash_name == "sha256":
    
    37
    +        return remote_execution_pb2.SHA256
    
    38
    +    return remote_execution_pb2.UNKNOWN
    
    39
    +
    
    40
    +
    
    33 41
     def create_digest(bytes_to_digest):
    
    34 42
         """Computes the :obj:`Digest` of a piece of data.
    
    35 43
     
    

  • setup.py
    ... ... @@ -112,13 +112,15 @@ setup(
    112 112
         license="Apache License, Version 2.0",
    
    113 113
         description="A remote execution service",
    
    114 114
         packages=find_packages(),
    
    115
    +    python_requires='>= 3.5.3',  # janus requirement
    
    115 116
         install_requires=[
    
    116
    -        'protobuf',
    
    117
    -        'grpcio',
    
    118
    -        'Click',
    
    119
    -        'PyYAML',
    
    120 117
             'boto3 < 1.8.0',
    
    121 118
             'botocore < 1.11.0',
    
    119
    +        'click',
    
    120
    +        'grpcio',
    
    121
    +        'janus',
    
    122
    +        'protobuf',
    
    123
    +        'pyyaml',
    
    122 124
         ],
    
    123 125
         entry_points={
    
    124 126
             'console_scripts': [
    

  • tests/cas/test_storage.py
    ... ... @@ -21,8 +21,8 @@ import tempfile
    21 21
     
    
    22 22
     import boto3
    
    23 23
     import grpc
    
    24
    -import pytest
    
    25 24
     from moto import mock_s3
    
    25
    +import pytest
    
    26 26
     
    
    27 27
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
    
    28 28
     from buildgrid.server.cas.storage.remote import RemoteStorage
    

  • tests/integration/capabilities_service.py
    1
    +# Copyright (C) 2018 Bloomberg LP
    
    2
    +#
    
    3
    +# Licensed under the Apache License, Version 2.0 (the "License");
    
    4
    +# you may not use this file except in compliance with the License.
    
    5
    +# You may obtain a copy of the License at
    
    6
    +#
    
    7
    +#  <http://www.apache.org/licenses/LICENSE-2.0>
    
    8
    +#
    
    9
    +# Unless required by applicable law or agreed to in writing, software
    
    10
    +# distributed under the License is distributed on an "AS IS" BASIS,
    
    11
    +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    
    12
    +# See the License for the specific language governing permissions and
    
    13
    +# limitations under the License.
    
    14
    +
    
    15
    +# pylint: disable=redefined-outer-name
    
    16
    +
    
    17
    +
    
    18
    +import grpc
    
    19
    +import pytest
    
    20
    +
    
    21
    +from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
    
    22
    +from buildgrid.client.capabilities import CapabilitiesInterface
    
    23
    +from buildgrid.server.controller import ExecutionController
    
    24
    +from buildgrid.server.actioncache.storage import ActionCache
    
    25
    +from buildgrid.server.cas.instance import ContentAddressableStorageInstance
    
    26
    +from buildgrid.server.cas.storage.lru_memory_cache import LRUMemoryCache
    
    27
    +
    
    28
    +from ..utils.utils import run_in_subprocess
    
    29
    +from ..utils.capabilities import serve_capabilities_service
    
    30
    +
    
    31
    +
    
    32
    +INSTANCES = ['', 'instance']
    
    33
    +
    
    34
    +
    
    35
    +# Use subprocess to avoid creation of gRPC threads in main process
    
    36
    +# See https://github.com/grpc/grpc/blob/master/doc/fork_support.md
    
    37
    +# Multiprocessing uses pickle which protobufs don't work with
    
    38
    +# Workaround wrapper to send messages as strings
    
    39
    +class ServerInterface:
    
    40
    +
    
    41
    +    def __init__(self, remote):
    
    42
    +        self.__remote = remote
    
    43
    +
    
    44
    +    def get_capabilities(self, instance_name):
    
    45
    +
    
    46
    +        def __get_capabilities(queue, remote, instance_name):
    
    47
    +            interface = CapabilitiesInterface(grpc.insecure_channel(remote))
    
    48
    +
    
    49
    +            result = interface.get_capabilities(instance_name)
    
    50
    +            queue.put(result.SerializeToString())
    
    51
    +
    
    52
    +        result = run_in_subprocess(__get_capabilities,
    
    53
    +                                   self.__remote, instance_name)
    
    54
    +
    
    55
    +        capabilities = remote_execution_pb2.ServerCapabilities()
    
    56
    +        capabilities.ParseFromString(result)
    
    57
    +        return capabilities
    
    58
    +
    
    59
    +
    
    60
    +@pytest.mark.parametrize('instance', INSTANCES)
    
    61
    +def test_execution_not_available_capabilities(instance):
    
    62
    +    with serve_capabilities_service([instance]) as server:
    
    63
    +        server_interface = ServerInterface(server.remote)
    
    64
    +        response = server_interface.get_capabilities(instance)
    
    65
    +
    
    66
    +        assert not response.execution_capabilities.exec_enabled
    
    67
    +
    
    68
    +
    
    69
    +@pytest.mark.parametrize('instance', INSTANCES)
    
    70
    +def test_execution_available_capabilities(instance):
    
    71
    +    controller = ExecutionController()
    
    72
    +
    
    73
    +    with serve_capabilities_service([instance],
    
    74
    +                                    execution_instance=controller.execution_instance) as server:
    
    75
    +        server_interface = ServerInterface(server.remote)
    
    76
    +        response = server_interface.get_capabilities(instance)
    
    77
    +
    
    78
    +        assert response.execution_capabilities.exec_enabled
    
    79
    +        assert response.execution_capabilities.digest_function
    
    80
    +
    
    81
    +
    
    82
    +@pytest.mark.parametrize('instance', INSTANCES)
    
    83
    +def test_action_cache_allow_updates_capabilities(instance):
    
    84
    +    storage = LRUMemoryCache(limit=256)
    
    85
    +    action_cache = ActionCache(storage, max_cached_refs=256, allow_updates=True)
    
    86
    +
    
    87
    +    with serve_capabilities_service([instance],
    
    88
    +                                    action_cache_instance=action_cache) as server:
    
    89
    +        server_interface = ServerInterface(server.remote)
    
    90
    +        response = server_interface.get_capabilities(instance)
    
    91
    +
    
    92
    +        assert response.cache_capabilities.action_cache_update_capabilities.update_enabled
    
    93
    +
    
    94
    +
    
    95
    +@pytest.mark.parametrize('instance', INSTANCES)
    
    96
    +def test_action_cache_not_allow_updates_capabilities(instance):
    
    97
    +    storage = LRUMemoryCache(limit=256)
    
    98
    +    action_cache = ActionCache(storage, max_cached_refs=256, allow_updates=False)
    
    99
    +
    
    100
    +    with serve_capabilities_service([instance],
    
    101
    +                                    action_cache_instance=action_cache) as server:
    
    102
    +        server_interface = ServerInterface(server.remote)
    
    103
    +        response = server_interface.get_capabilities(instance)
    
    104
    +
    
    105
    +        assert not response.cache_capabilities.action_cache_update_capabilities.update_enabled
    
    106
    +
    
    107
    +
    
    108
    +@pytest.mark.parametrize('instance', INSTANCES)
    
    109
    +def test_cas_capabilities(instance):
    
    110
    +    cas = ContentAddressableStorageInstance(None)
    
    111
    +
    
    112
    +    with serve_capabilities_service([instance],
    
    113
    +                                    cas_instance=cas) as server:
    
    114
    +        server_interface = ServerInterface(server.remote)
    
    115
    +        response = server_interface.get_capabilities(instance)
    
    116
    +
    
    117
    +        assert len(response.cache_capabilities.digest_function) == 1
    
    118
    +        assert response.cache_capabilities.digest_function[0]
    
    119
    +        assert response.cache_capabilities.symlink_absolute_path_strategy
    
    120
    +        assert response.cache_capabilities.max_batch_total_size_bytes

  • tests/utils/capabilities.py
    1
    +# Copyright (C) 2018 Bloomberg LP
    
    2
    +#
    
    3
    +# Licensed under the Apache License, Version 2.0 (the "License");
    
    4
    +# you may not use this file except in compliance with the License.
    
    5
    +# You may obtain a copy of the License at
    
    6
    +#
    
    7
    +#  <http://www.apache.org/licenses/LICENSE-2.0>
    
    8
    +#
    
    9
    +# Unless required by applicable law or agreed to in writing, software
    
    10
    +# distributed under the License is distributed on an "AS IS" BASIS,
    
    11
    +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    
    12
    +# See the License for the specific language governing permissions and
    
    13
    +# limitations under the License.
    
    14
    +
    
    15
    +
    
    16
    +from concurrent import futures
    
    17
    +from contextlib import contextmanager
    
    18
    +import multiprocessing
    
    19
    +import os
    
    20
    +import signal
    
    21
    +
    
    22
    +import grpc
    
    23
    +import pytest_cov
    
    24
    +
    
    25
    +from buildgrid.server.capabilities.service import CapabilitiesService
    
    26
    +from buildgrid.server.capabilities.instance import CapabilitiesInstance
    
    27
    +
    
    28
    +
    
    29
    +@contextmanager
    
    30
    +def serve_capabilities_service(instances,
    
    31
    +                               cas_instance=None,
    
    32
    +                               action_cache_instance=None,
    
    33
    +                               execution_instance=None):
    
    34
    +    server = Server(instances,
    
    35
    +                    cas_instance,
    
    36
    +                    action_cache_instance,
    
    37
    +                    execution_instance)
    
    38
    +    try:
    
    39
    +        yield server
    
    40
    +    finally:
    
    41
    +        server.quit()
    
    42
    +
    
    43
    +
    
    44
    +class Server:
    
    45
    +
    
    46
    +    def __init__(self, instances,
    
    47
    +                 cas_instance=None,
    
    48
    +                 action_cache_instance=None,
    
    49
    +                 execution_instance=None):
    
    50
    +        self.instances = instances
    
    51
    +
    
    52
    +        self.__queue = multiprocessing.Queue()
    
    53
    +        self.__process = multiprocessing.Process(
    
    54
    +            target=Server.serve,
    
    55
    +            args=(self.__queue, self.instances, cas_instance, action_cache_instance, execution_instance))
    
    56
    +        self.__process.start()
    
    57
    +
    
    58
    +        self.port = self.__queue.get(timeout=1)
    
    59
    +        self.remote = 'localhost:{}'.format(self.port)
    
    60
    +
    
    61
    +    @staticmethod
    
    62
    +    def serve(queue, instances, cas_instance, action_cache_instance, execution_instance):
    
    63
    +        pytest_cov.embed.cleanup_on_sigterm()
    
    64
    +
    
    65
    +        # Use max_workers default from Python 3.5+
    
    66
    +        max_workers = (os.cpu_count() or 1) * 5
    
    67
    +        server = grpc.server(futures.ThreadPoolExecutor(max_workers))
    
    68
    +        port = server.add_insecure_port('localhost:0')
    
    69
    +
    
    70
    +        capabilities_service = CapabilitiesService(server)
    
    71
    +        for name in instances:
    
    72
    +            capabilities_instance = CapabilitiesInstance(cas_instance, action_cache_instance, execution_instance)
    
    73
    +            capabilities_service.add_instance(name, capabilities_instance)
    
    74
    +
    
    75
    +        server.start()
    
    76
    +        queue.put(port)
    
    77
    +        signal.pause()
    
    78
    +
    
    79
    +    def quit(self):
    
    80
    +        if self.__process:
    
    81
    +            self.__process.terminate()
    
    82
    +            self.__process.join()



  • [Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]