Martin Blanchard pushed to branch mablanch/132-gather-state-metrics at BuildGrid / buildgrid
Commits:
- 07963525 by Raoul Hidalgo Charman at 2018-11-08T13:57:26Z
- aac2f938 by Raoul Hidalgo Charman at 2018-11-08T13:57:26Z
- 76f2e31f by Martin Blanchard at 2018-11-08T13:57:26Z
- dcd82219 by Martin Blanchard at 2018-11-08T14:50:25Z
- dd572340 by Martin Blanchard at 2018-11-08T16:54:57Z
- 1446a9cd by Martin Blanchard at 2018-11-09T11:26:17Z
- f0b591cc by Martin Blanchard at 2018-11-09T11:26:18Z
- 3ef64559 by Martin Blanchard at 2018-11-09T11:26:18Z
- fedcd0ff by Martin Blanchard at 2018-11-09T11:26:18Z
- aa207582 by Martin Blanchard at 2018-11-09T11:26:18Z
- 26eb97ac by Martin Blanchard at 2018-11-09T11:26:18Z
- 740d4b70 by Martin Blanchard at 2018-11-09T11:26:18Z
29 changed files:
- − buildgrid/_app/_logging.py
- buildgrid/_app/cli.py
- buildgrid/_app/commands/cmd_server.py
- buildgrid/bot/bot.py
- buildgrid/bot/bot_interface.py
- buildgrid/bot/bot_session.py
- buildgrid/server/actioncache/service.py
- buildgrid/server/bots/instance.py
- buildgrid/server/bots/service.py
- buildgrid/server/cas/instance.py
- buildgrid/server/cas/service.py
- buildgrid/server/cas/storage/disk.py
- buildgrid/server/cas/storage/lru_memory_cache.py
- buildgrid/server/cas/storage/remote.py
- buildgrid/server/cas/storage/s3.py
- buildgrid/server/cas/storage/with_cache.py
- buildgrid/server/controller.py
- buildgrid/server/execution/instance.py
- buildgrid/server/execution/service.py
- buildgrid/server/instance.py
- buildgrid/server/job.py
- buildgrid/server/operations/instance.py
- buildgrid/server/operations/service.py
- buildgrid/server/referencestorage/service.py
- buildgrid/server/referencestorage/storage.py
- buildgrid/server/scheduler.py
- buildgrid/settings.py
- setup.py
- tests/cas/test_services.py
Changes:
| 1 | -# Copyright (C) 2018 Bloomberg LP
 | |
| 2 | -#
 | |
| 3 | -# Licensed under the Apache License, Version 2.0 (the "License");
 | |
| 4 | -# you may not use this file except in compliance with the License.
 | |
| 5 | -# You may obtain a copy of the License at
 | |
| 6 | -#
 | |
| 7 | -#  <http://www.apache.org/licenses/LICENSE-2.0>
 | |
| 8 | -#
 | |
| 9 | -# Unless required by applicable law or agreed to in writing, software
 | |
| 10 | -# distributed under the License is distributed on an "AS IS" BASIS,
 | |
| 11 | -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | |
| 12 | -# See the License for the specific language governing permissions and
 | |
| 13 | -# limitations under the License.
 | |
| 14 | - | |
| 15 | - | |
| 16 | -import logging
 | |
| 17 | - | |
| 18 | - | |
| 19 | -def bgd_logger():
 | |
| 20 | -    formatter = logging.Formatter(
 | |
| 21 | -        fmt='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
 | |
| 22 | -    )
 | |
| 23 | - | |
| 24 | -    logger = logging.getLogger()
 | |
| 25 | -    logger.setLevel(logging.INFO)
 | |
| 26 | - | |
| 27 | -    handler = logging.StreamHandler()
 | |
| 28 | -    handler.setFormatter(formatter)
 | |
| 29 | - | |
| 30 | -    logger.addHandler(handler)
 | |
| 31 | - | |
| 32 | -    return logger | 
| ... | ... | @@ -21,16 +21,14 @@ Any files in the commands/ folder with the name cmd_*.py | 
| 21 | 21 |  will be attempted to be imported.
 | 
| 22 | 22 |  """
 | 
| 23 | 23 |  | 
| 24 | -import os
 | |
| 25 | 24 |  import logging
 | 
| 25 | +import os
 | |
| 26 | 26 |  | 
| 27 | 27 |  import click
 | 
| 28 | 28 |  import grpc
 | 
| 29 | 29 |  | 
| 30 | 30 |  from buildgrid.utils import read_file
 | 
| 31 | 31 |  | 
| 32 | -from . import _logging
 | |
| 33 | - | |
| 34 | 32 |  CONTEXT_SETTINGS = dict(auto_envvar_prefix='BUILDGRID')
 | 
| 35 | 33 |  | 
| 36 | 34 |  | 
| ... | ... | @@ -141,12 +139,27 @@ class BuildGridCLI(click.MultiCommand): | 
| 141 | 139 |  | 
| 142 | 140 |  | 
| 143 | 141 |  @click.command(cls=BuildGridCLI, context_settings=CONTEXT_SETTINGS)
 | 
| 144 | -@click.option('-v', '--verbose', is_flag=True,
 | |
| 145 | -              help='Enables verbose mode.')
 | |
| 142 | +@click.option('-v', '--verbose', count=True,
 | |
| 143 | +              help='Increase log verbosity level.')
 | |
| 146 | 144 |  @pass_context
 | 
| 147 | 145 |  def cli(context, verbose):
 | 
| 148 | 146 |      """BuildGrid App"""
 | 
| 149 | -    logger = _logging.bgd_logger()
 | |
| 150 | -    context.verbose = verbose
 | |
| 151 | -    if verbose:
 | |
| 147 | +    logger = logging.getLogger()
 | |
| 148 | + | |
| 149 | +    # Clean-up root logger for any pre-configuration:
 | |
| 150 | +    for log_handler in logger.handlers[:]:
 | |
| 151 | +        logger.removeHandler(log_handler)
 | |
| 152 | +    for log_filter in logger.filters[:]:
 | |
| 153 | +        logger.removeFilter(log_filter)
 | |
| 154 | + | |
| 155 | +    logging.basicConfig(
 | |
| 156 | +        format='%(asctime)s:%(name)32.32s][%(levelname)5.5s]: %(message)s')
 | |
| 157 | + | |
| 158 | +    if verbose == 1:
 | |
| 159 | +        logger.setLevel(logging.WARNING)
 | |
| 160 | +    elif verbose == 2:
 | |
| 161 | +        logger.setLevel(logging.INFO)
 | |
| 162 | +    elif verbose >= 3:
 | |
| 152 | 163 |          logger.setLevel(logging.DEBUG)
 | 
| 164 | +    else:
 | |
| 165 | +        logger.setLevel(logging.ERROR) | 
| ... | ... | @@ -20,7 +20,6 @@ Server command | 
| 20 | 20 |  Create a BuildGrid server.
 | 
| 21 | 21 |  """
 | 
| 22 | 22 |  | 
| 23 | -import asyncio
 | |
| 24 | 23 |  import logging
 | 
| 25 | 24 |  import sys
 | 
| 26 | 25 |  | 
| ... | ... | @@ -52,18 +51,14 @@ def start(context, config): | 
| 52 | 51 |          click.echo("ERROR: Could not parse config: {}.\n".format(str(e)), err=True)
 | 
| 53 | 52 |          sys.exit(-1)
 | 
| 54 | 53 |  | 
| 55 | -    loop = asyncio.get_event_loop()
 | |
| 56 | 54 |      try:
 | 
| 57 | 55 |          server.start()
 | 
| 58 | -        loop.run_forever()
 | |
| 59 | 56 |  | 
| 60 | 57 |      except KeyboardInterrupt:
 | 
| 61 | 58 |          pass
 | 
| 62 | 59 |  | 
| 63 | 60 |      finally:
 | 
| 64 | -        context.logger.info("Stopping server")
 | |
| 65 | 61 |          server.stop()
 | 
| 66 | -        loop.close()
 | |
| 67 | 62 |  | 
| 68 | 63 |  | 
| 69 | 64 |  def _create_server_from_config(config):
 | 
| ... | ... | @@ -30,7 +30,7 @@ class Bot: | 
| 30 | 30 |      """
 | 
| 31 | 31 |  | 
| 32 | 32 |      def __init__(self, bot_session, update_period=1):
 | 
| 33 | -        self.logger = logging.getLogger(__name__)
 | |
| 33 | +        self.__logger = logging.getLogger(__name__)
 | |
| 34 | 34 |  | 
| 35 | 35 |          self._bot_session = bot_session
 | 
| 36 | 36 |          self._update_period = update_period
 | 
| ... | ... | @@ -31,8 +31,8 @@ class BotInterface: | 
| 31 | 31 |      """
 | 
| 32 | 32 |  | 
| 33 | 33 |      def __init__(self, channel):
 | 
| 34 | -        self.logger = logging.getLogger(__name__)
 | |
| 35 | -        self.logger.info(channel)
 | |
| 34 | +        self.__logger = logging.getLogger(__name__)
 | |
| 35 | + | |
| 36 | 36 |          self._stub = bots_pb2_grpc.BotsStub(channel)
 | 
| 37 | 37 |  | 
| 38 | 38 |      def create_bot_session(self, parent, bot_session):
 | 
| ... | ... | @@ -43,8 +43,7 @@ class BotSession: | 
| 43 | 43 |          If a bot attempts to update an invalid session, it must be rejected and
 | 
| 44 | 44 |          may be put in quarantine.
 | 
| 45 | 45 |          """
 | 
| 46 | - | |
| 47 | -        self.logger = logging.getLogger(__name__)
 | |
| 46 | +        self.__logger = logging.getLogger(__name__)
 | |
| 48 | 47 |  | 
| 49 | 48 |          self._bot_id = '{}.{}'.format(parent, platform.node())
 | 
| 50 | 49 |          self._context = None
 | 
| ... | ... | @@ -64,20 +63,20 @@ class BotSession: | 
| 64 | 63 |          self._worker = worker
 | 
| 65 | 64 |  | 
| 66 | 65 |      def create_bot_session(self, work, context=None):
 | 
| 67 | -        self.logger.debug("Creating bot session")
 | |
| 66 | +        self.__logger.debug("Creating bot session")
 | |
| 68 | 67 |          self._work = work
 | 
| 69 | 68 |          self._context = context
 | 
| 70 | 69 |  | 
| 71 | 70 |          session = self._interface.create_bot_session(self._parent, self.get_pb2())
 | 
| 72 | 71 |          self._name = session.name
 | 
| 73 | 72 |  | 
| 74 | -        self.logger.info("Created bot session with name: [{}]".format(self._name))
 | |
| 73 | +        self.__logger.info("Created bot session with name: [%s]", self._name)
 | |
| 75 | 74 |  | 
| 76 | 75 |          for lease in session.leases:
 | 
| 77 | 76 |              self._update_lease_from_server(lease)
 | 
| 78 | 77 |  | 
| 79 | 78 |      def update_bot_session(self):
 | 
| 80 | -        self.logger.debug("Updating bot session: [{}]".format(self._bot_id))
 | |
| 79 | +        self.__logger.debug("Updating bot session: [%s]", self._bot_id)
 | |
| 81 | 80 |          session = self._interface.update_bot_session(self.get_pb2())
 | 
| 82 | 81 |          for k, v in list(self._leases.items()):
 | 
| 83 | 82 |              if v.state == LeaseState.COMPLETED.value:
 | 
| ... | ... | @@ -113,25 +112,25 @@ class BotSession: | 
| 113 | 112 |              asyncio.ensure_future(self.create_work(lease))
 | 
| 114 | 113 |  | 
| 115 | 114 |      async def create_work(self, lease):
 | 
| 116 | -        self.logger.debug("Work created: [{}]".format(lease.id))
 | |
| 115 | +        self.__logger.debug("Work created: [%s]", lease.id)
 | |
| 117 | 116 |          loop = asyncio.get_event_loop()
 | 
| 118 | 117 |  | 
| 119 | 118 |          try:
 | 
| 120 | 119 |              lease = await loop.run_in_executor(None, self._work, self._context, lease)
 | 
| 121 | 120 |  | 
| 122 | 121 |          except grpc.RpcError as e:
 | 
| 123 | -            self.logger.error("RPC error thrown: [{}]".format(e))
 | |
| 122 | +            self.__logger.error(e)
 | |
| 124 | 123 |              lease.status.CopyFrom(e.code())
 | 
| 125 | 124 |  | 
| 126 | 125 |          except BotError as e:
 | 
| 127 | -            self.logger.error("Internal bot error thrown: [{}]".format(e))
 | |
| 126 | +            self.__logger.error(e)
 | |
| 128 | 127 |              lease.status.code = code_pb2.INTERNAL
 | 
| 129 | 128 |  | 
| 130 | 129 |          except Exception as e:
 | 
| 131 | -            self.logger.error("Exception thrown: [{}]".format(e))
 | |
| 130 | +            self.__logger.error(e)
 | |
| 132 | 131 |              lease.status.code = code_pb2.INTERNAL
 | 
| 133 | 132 |  | 
| 134 | -        self.logger.debug("Work complete: [{}]".format(lease.id))
 | |
| 133 | +        self.__logger.debug("Work complete: [%s]", lease.id)
 | |
| 135 | 134 |          self.lease_completed(lease)
 | 
| 136 | 135 |  | 
| 137 | 136 |  | 
| ... | ... | @@ -32,7 +32,7 @@ from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_p | 
| 32 | 32 |  class ActionCacheService(remote_execution_pb2_grpc.ActionCacheServicer):
 | 
| 33 | 33 |  | 
| 34 | 34 |      def __init__(self, server):
 | 
| 35 | -        self.logger = logging.getLogger(__name__)
 | |
| 35 | +        self.__logger = logging.getLogger(__name__)
 | |
| 36 | 36 |  | 
| 37 | 37 |          self._instances = {}
 | 
| 38 | 38 |  | 
| ... | ... | @@ -42,34 +42,38 @@ class ActionCacheService(remote_execution_pb2_grpc.ActionCacheServicer): | 
| 42 | 42 |          self._instances[name] = instance
 | 
| 43 | 43 |  | 
| 44 | 44 |      def GetActionResult(self, request, context):
 | 
| 45 | +        self.__logger.debug("GetActionResult request from [%s]", context.peer())
 | |
| 46 | + | |
| 45 | 47 |          try:
 | 
| 46 | 48 |              instance = self._get_instance(request.instance_name)
 | 
| 47 | 49 |              return instance.get_action_result(request.action_digest)
 | 
| 48 | 50 |  | 
| 49 | 51 |          except InvalidArgumentError as e:
 | 
| 50 | -            self.logger.error(e)
 | |
| 52 | +            self.__logger.error(e)
 | |
| 51 | 53 |              context.set_details(str(e))
 | 
| 52 | 54 |              context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
 | 
| 53 | 55 |  | 
| 54 | 56 |          except NotFoundError as e:
 | 
| 55 | -            self.logger.debug(e)
 | |
| 57 | +            self.__logger.debug(e)
 | |
| 56 | 58 |              context.set_code(grpc.StatusCode.NOT_FOUND)
 | 
| 57 | 59 |  | 
| 58 | 60 |          return remote_execution_pb2.ActionResult()
 | 
| 59 | 61 |  | 
| 60 | 62 |      def UpdateActionResult(self, request, context):
 | 
| 63 | +        self.__logger.debug("UpdateActionResult request from [%s]", context.peer())
 | |
| 64 | + | |
| 61 | 65 |          try:
 | 
| 62 | 66 |              instance = self._get_instance(request.instance_name)
 | 
| 63 | 67 |              instance.update_action_result(request.action_digest, request.action_result)
 | 
| 64 | 68 |              return request.action_result
 | 
| 65 | 69 |  | 
| 66 | 70 |          except InvalidArgumentError as e:
 | 
| 67 | -            self.logger.error(e)
 | |
| 71 | +            self.__logger.error(e)
 | |
| 68 | 72 |              context.set_details(str(e))
 | 
| 69 | 73 |              context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
 | 
| 70 | 74 |  | 
| 71 | 75 |          except NotImplementedError as e:
 | 
| 72 | -            self.logger.error(e)
 | |
| 76 | +            self.__logger.error(e)
 | |
| 73 | 77 |              context.set_code(grpc.StatusCode.UNIMPLEMENTED)
 | 
| 74 | 78 |  | 
| 75 | 79 |          return remote_execution_pb2.ActionResult()
 | 
| ... | ... | @@ -31,7 +31,7 @@ from ..job import LeaseState | 
| 31 | 31 |  class BotsInterface:
 | 
| 32 | 32 |  | 
| 33 | 33 |      def __init__(self, scheduler):
 | 
| 34 | -        self.logger = logging.getLogger(__name__)
 | |
| 34 | +        self.__logger = logging.getLogger(__name__)
 | |
| 35 | 35 |  | 
| 36 | 36 |          self._bot_ids = {}
 | 
| 37 | 37 |          self._bot_sessions = {}
 | 
| ... | ... | @@ -64,7 +64,7 @@ class BotsInterface: | 
| 64 | 64 |  | 
| 65 | 65 |          self._bot_ids[name] = bot_id
 | 
| 66 | 66 |          self._bot_sessions[name] = bot_session
 | 
| 67 | -        self.logger.info("Created bot session name=[{}] with bot_id=[{}]".format(name, bot_id))
 | |
| 67 | +        self.__logger.info("Created bot session name=[%s] with bot_id=[%s]", name, bot_id)
 | |
| 68 | 68 |  | 
| 69 | 69 |          # TODO: Send worker capabilities to the scheduler!
 | 
| 70 | 70 |          leases = self._scheduler.request_job_leases({})
 | 
| ... | ... | @@ -77,7 +77,7 @@ class BotsInterface: | 
| 77 | 77 |          """ Client updates the server. Any changes in state to the Lease should be
 | 
| 78 | 78 |          registered server side. Assigns available leases with work.
 | 
| 79 | 79 |          """
 | 
| 80 | -        self.logger.debug("Updating bot session name={}".format(name))
 | |
| 80 | +        self.__logger.debug("Updating bot session name=[%s]", name)
 | |
| 81 | 81 |          self._check_bot_ids(bot_session.bot_id, name)
 | 
| 82 | 82 |  | 
| 83 | 83 |          leases = filter(None, [self.check_states(lease) for lease in bot_session.leases])
 | 
| ... | ... | @@ -173,12 +173,12 @@ class BotsInterface: | 
| 173 | 173 |          if bot_id is None:
 | 
| 174 | 174 |              raise InvalidArgumentError("Bot id does not exist: [{}]".format(name))
 | 
| 175 | 175 |  | 
| 176 | -        self.logger.debug("Attempting to close [{}] with name: [{}]".format(bot_id, name))
 | |
| 176 | +        self.__logger.debug("Attempting to close [%s] with name: [%s]", bot_id, name)
 | |
| 177 | 177 |          for lease in self._bot_sessions[name].leases:
 | 
| 178 | 178 |              if lease.state != LeaseState.COMPLETED.value:
 | 
| 179 | 179 |                  # TODO: Be wary here, may need to handle rejected leases in future
 | 
| 180 | 180 |                  self._scheduler.retry_job(lease.id)
 | 
| 181 | 181 |  | 
| 182 | -        self.logger.debug("Closing bot session: [{}]".format(name))
 | |
| 182 | +        self.__logger.debug("Closing bot session: [%s]", name)
 | |
| 183 | 183 |          self._bot_ids.pop(name)
 | 
| 184 | -        self.logger.info("Closed bot [{}] with name: [{}]".format(bot_id, name)) | |
| 184 | +        self.__logger.info("Closed bot [%s] with name: [%s]", bot_id, name) | 
| ... | ... | @@ -23,8 +23,9 @@ import logging | 
| 23 | 23 |  | 
| 24 | 24 |  import grpc
 | 
| 25 | 25 |  | 
| 26 | -from google.protobuf.empty_pb2 import Empty
 | |
| 26 | +from google.protobuf import empty_pb2, timestamp_pb2
 | |
| 27 | 27 |  | 
| 28 | +from buildgrid._enums import BotStatus
 | |
| 28 | 29 |  from buildgrid._exceptions import InvalidArgumentError, OutOfSyncError
 | 
| 29 | 30 |  from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2
 | 
| 30 | 31 |  from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2_grpc
 | 
| ... | ... | @@ -32,61 +33,152 @@ from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2_grp | 
| 32 | 33 |  | 
| 33 | 34 |  class BotsService(bots_pb2_grpc.BotsServicer):
 | 
| 34 | 35 |  | 
| 35 | -    def __init__(self, server):
 | |
| 36 | -        self.logger = logging.getLogger(__name__)
 | |
| 36 | +    def __init__(self, server, monitor=True):
 | |
| 37 | +        self.__logger = logging.getLogger(__name__)
 | |
| 38 | + | |
| 39 | +        self.__bots_by_status = {}
 | |
| 40 | +        self.__bots_by_instance = {}
 | |
| 41 | +        self.__bots = {}
 | |
| 37 | 42 |  | 
| 38 | 43 |          self._instances = {}
 | 
| 44 | +        self._is_monitored = True
 | |
| 39 | 45 |  | 
| 40 | 46 |          bots_pb2_grpc.add_BotsServicer_to_server(self, server)
 | 
| 41 | 47 |  | 
| 42 | -    def add_instance(self, name, instance):
 | |
| 43 | -        self._instances[name] = instance
 | |
| 48 | +        if self._is_monitored:
 | |
| 49 | +            self.__bots_by_status[BotStatus.OK] = set()
 | |
| 50 | +            self.__bots_by_status[BotStatus.UNHEALTHY] = set()
 | |
| 51 | +            self.__bots_by_status[BotStatus.HOST_REBOOTING] = set()
 | |
| 52 | +            self.__bots_by_status[BotStatus.BOT_TERMINATING] = set()
 | |
| 53 | + | |
| 54 | +    # --- Public API ---
 | |
| 55 | + | |
| 56 | +    def add_instance(self, instance_name, instance):
 | |
| 57 | +        self._instances[instance_name] = instance
 | |
| 58 | + | |
| 59 | +        if self._is_monitored:
 | |
| 60 | +            self.__bots_by_instance[instance_name] = 0
 | |
| 61 | + | |
| 62 | +    # --- Public API: Servicer ---
 | |
| 44 | 63 |  | 
| 45 | 64 |      def CreateBotSession(self, request, context):
 | 
| 65 | +        """Handles CreateBotSessionRequest messages.
 | |
| 66 | + | |
| 67 | +        Args:
 | |
| 68 | +            request (CreateBotSessionRequest): The incoming RPC request.
 | |
| 69 | +            context (grpc.ServicerContext): Context for the RPC call.
 | |
| 70 | +        """
 | |
| 71 | +        self.__logger.debug("CreateBotSession request from [%s]", context.peer())
 | |
| 72 | + | |
| 73 | +        instance_name = request.parent
 | |
| 74 | +        bot_status = BotStatus(request.bot_session.status)
 | |
| 75 | +        bot_id = request.bot_session.bot_id
 | |
| 76 | + | |
| 46 | 77 |          try:
 | 
| 47 | -            parent = request.parent
 | |
| 48 | -            instance = self._get_instance(request.parent)
 | |
| 49 | -            return instance.create_bot_session(parent,
 | |
| 50 | -                                               request.bot_session)
 | |
| 78 | +            instance = self._get_instance(instance_name)
 | |
| 79 | +            bot_session = instance.create_bot_session(instance_name,
 | |
| 80 | +                                                      request.bot_session)
 | |
| 81 | +            now = timestamp_pb2.Timestamp()
 | |
| 82 | +            now.GetCurrentTime()
 | |
| 83 | + | |
| 84 | +            if self._is_monitored:
 | |
| 85 | +                self.__bots[bot_id] = now
 | |
| 86 | +                self.__bots_by_instance[instance_name] += 1
 | |
| 87 | +                self.__bots_by_status[bot_status].add(bot_id)
 | |
| 88 | + | |
| 89 | +            return bot_session
 | |
| 51 | 90 |  | 
| 52 | 91 |          except InvalidArgumentError as e:
 | 
| 53 | -            self.logger.error(e)
 | |
| 92 | +            self.__logger.error(e)
 | |
| 54 | 93 |              context.set_details(str(e))
 | 
| 55 | 94 |              context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
 | 
| 56 | 95 |  | 
| 57 | 96 |          return bots_pb2.BotSession()
 | 
| 58 | 97 |  | 
| 59 | 98 |      def UpdateBotSession(self, request, context):
 | 
| 99 | +        """Handles UpdateBotSessionRequest messages.
 | |
| 100 | + | |
| 101 | +        Args:
 | |
| 102 | +            request (UpdateBotSessionRequest): The incoming RPC request.
 | |
| 103 | +            context (grpc.ServicerContext): Context for the RPC call.
 | |
| 104 | +        """
 | |
| 105 | +        self.__logger.debug("UpdateBotSession request from [%s]", context.peer())
 | |
| 106 | + | |
| 107 | +        names = request.name.split("/")
 | |
| 108 | +        bot_status = BotStatus(request.bot_session.status)
 | |
| 109 | +        bot_id = request.bot_session.bot_id
 | |
| 110 | + | |
| 60 | 111 |          try:
 | 
| 61 | -            names = request.name.split("/")
 | |
| 62 | -            # Operation name should be in format:
 | |
| 63 | -            # {instance/name}/{uuid}
 | |
| 64 | -            instance_name = ''.join(names[0:-1])
 | |
| 112 | +            instance_name = '/'.join(names[:-1])
 | |
| 65 | 113 |  | 
| 66 | 114 |              instance = self._get_instance(instance_name)
 | 
| 67 | -            return instance.update_bot_session(request.name,
 | |
| 68 | -                                               request.bot_session)
 | |
| 115 | +            bot_session = instance.update_bot_session(request.name,
 | |
| 116 | +                                                      request.bot_session)
 | |
| 117 | + | |
| 118 | +            if self._is_monitored:
 | |
| 119 | +                self.__bots[bot_id].GetCurrentTime()
 | |
| 120 | +                if bot_id not in self.__bots_by_status[bot_status]:
 | |
| 121 | +                    self.__bots_by_status[BotStatus.OK].discard(bot_id)
 | |
| 122 | +                    self.__bots_by_status[BotStatus.UNHEALTHY].discard(bot_id)
 | |
| 123 | +                    self.__bots_by_status[BotStatus.HOST_REBOOTING].discard(bot_id)
 | |
| 124 | +                    self.__bots_by_status[BotStatus.BOT_TERMINATING].discard(bot_id)
 | |
| 125 | + | |
| 126 | +                    self.__bots_by_status[bot_status].add(bot_id)
 | |
| 127 | + | |
| 128 | +            return bot_session
 | |
| 69 | 129 |  | 
| 70 | 130 |          except InvalidArgumentError as e:
 | 
| 71 | -            self.logger.error(e)
 | |
| 131 | +            self.__logger.error(e)
 | |
| 72 | 132 |              context.set_details(str(e))
 | 
| 73 | 133 |              context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
 | 
| 74 | 134 |  | 
| 75 | 135 |          except OutOfSyncError as e:
 | 
| 76 | -            self.logger.error(e)
 | |
| 136 | +            self.__logger.error(e)
 | |
| 77 | 137 |              context.set_details(str(e))
 | 
| 78 | 138 |              context.set_code(grpc.StatusCode.DATA_LOSS)
 | 
| 79 | 139 |  | 
| 80 | 140 |          except NotImplementedError as e:
 | 
| 81 | -            self.logger.error(e)
 | |
| 141 | +            self.__logger.error(e)
 | |
| 82 | 142 |              context.set_details(str(e))
 | 
| 83 | 143 |              context.set_code(grpc.StatusCode.UNIMPLEMENTED)
 | 
| 84 | 144 |  | 
| 85 | 145 |          return bots_pb2.BotSession()
 | 
| 86 | 146 |  | 
| 87 | 147 |      def PostBotEventTemp(self, request, context):
 | 
| 148 | +        """Handles PostBotEventTempRequest messages.
 | |
| 149 | + | |
| 150 | +        Args:
 | |
| 151 | +            request (PostBotEventTempRequest): The incoming RPC request.
 | |
| 152 | +            context (grpc.ServicerContext): Context for the RPC call.
 | |
| 153 | +        """
 | |
| 154 | +        self.__logger.debug("PostBotEventTemp request from [%s]", context.peer())
 | |
| 155 | + | |
| 88 | 156 |          context.set_code(grpc.StatusCode.UNIMPLEMENTED)
 | 
| 89 | -        return Empty()
 | |
| 157 | + | |
| 158 | +        return empty_pb2.Empty()
 | |
| 159 | + | |
| 160 | +    # --- Public API: Monitoring ---
 | |
| 161 | + | |
| 162 | +    @property
 | |
| 163 | +    def is_monitored(self):
 | |
| 164 | +        return self._is_monitored
 | |
| 165 | + | |
| 166 | +    def query_n_bots(self):
 | |
| 167 | +        return len(self.__bots)
 | |
| 168 | + | |
| 169 | +    def query_n_bots_for_instance(self, instance_name):
 | |
| 170 | +        try:
 | |
| 171 | +            return self.__bots_by_instance[instance_name]
 | |
| 172 | +        except KeyError:
 | |
| 173 | +            return 0
 | |
| 174 | + | |
| 175 | +    def query_n_bots_for_status(self, bot_status):
 | |
| 176 | +        try:
 | |
| 177 | +            return len(self.__bots_by_status[bot_status])
 | |
| 178 | +        except KeyError:
 | |
| 179 | +            return 0
 | |
| 180 | + | |
| 181 | +    # --- Private API ---
 | |
| 90 | 182 |  | 
| 91 | 183 |      def _get_instance(self, name):
 | 
| 92 | 184 |          try:
 | 
| ... | ... | @@ -19,6 +19,8 @@ Storage Instances | 
| 19 | 19 |  Instances of CAS and ByteStream
 | 
| 20 | 20 |  """
 | 
| 21 | 21 |  | 
| 22 | +import logging
 | |
| 23 | + | |
| 22 | 24 |  from buildgrid._exceptions import InvalidArgumentError, NotFoundError, OutOfRangeError
 | 
| 23 | 25 |  from buildgrid._protos.google.bytestream import bytestream_pb2
 | 
| 24 | 26 |  from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2 as re_pb2
 | 
| ... | ... | @@ -28,6 +30,8 @@ from buildgrid.settings import HASH | 
| 28 | 30 |  class ContentAddressableStorageInstance:
 | 
| 29 | 31 |  | 
| 30 | 32 |      def __init__(self, storage):
 | 
| 33 | +        self.__logger = logging.getLogger(__name__)
 | |
| 34 | + | |
| 31 | 35 |          self._storage = storage
 | 
| 32 | 36 |  | 
| 33 | 37 |      def register_instance_with_server(self, instance_name, server):
 | 
| ... | ... | @@ -60,6 +64,8 @@ class ByteStreamInstance: | 
| 60 | 64 |      BLOCK_SIZE = 1 * 1024 * 1024  # 1 MB block size
 | 
| 61 | 65 |  | 
| 62 | 66 |      def __init__(self, storage):
 | 
| 67 | +        self.__logger = logging.getLogger(__name__)
 | |
| 68 | + | |
| 63 | 69 |          self._storage = storage
 | 
| 64 | 70 |  | 
| 65 | 71 |      def register_instance_with_server(self, instance_name, server):
 | 
| ... | ... | @@ -35,7 +35,7 @@ from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_p | 
| 35 | 35 |  class ContentAddressableStorageService(remote_execution_pb2_grpc.ContentAddressableStorageServicer):
 | 
| 36 | 36 |  | 
| 37 | 37 |      def __init__(self, server):
 | 
| 38 | -        self.logger = logging.getLogger(__name__)
 | |
| 38 | +        self.__logger = logging.getLogger(__name__)
 | |
| 39 | 39 |  | 
| 40 | 40 |          self._instances = {}
 | 
| 41 | 41 |  | 
| ... | ... | @@ -45,42 +45,48 @@ class ContentAddressableStorageService(remote_execution_pb2_grpc.ContentAddressa | 
| 45 | 45 |          self._instances[name] = instance
 | 
| 46 | 46 |  | 
| 47 | 47 |      def FindMissingBlobs(self, request, context):
 | 
| 48 | +        self.__logger.debug("FindMissingBlobs request from [%s]", context.peer())
 | |
| 49 | + | |
| 48 | 50 |          try:
 | 
| 49 | -            self.logger.debug("FindMissingBlobs request: [{}]".format(request))
 | |
| 50 | 51 |              instance = self._get_instance(request.instance_name)
 | 
| 51 | 52 |              response = instance.find_missing_blobs(request.blob_digests)
 | 
| 52 | -            self.logger.debug("FindMissingBlobs response: [{}]".format(response))
 | |
| 53 | + | |
| 53 | 54 |              return response
 | 
| 54 | 55 |  | 
| 55 | 56 |          except InvalidArgumentError as e:
 | 
| 56 | -            self.logger.error(e)
 | |
| 57 | +            self.__logger.error(e)
 | |
| 57 | 58 |              context.set_details(str(e))
 | 
| 58 | 59 |              context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
 | 
| 59 | 60 |  | 
| 60 | 61 |          return remote_execution_pb2.FindMissingBlobsResponse()
 | 
| 61 | 62 |  | 
| 62 | 63 |      def BatchUpdateBlobs(self, request, context):
 | 
| 64 | +        self.__logger.debug("BatchUpdateBlobs request from [%s]", context.peer())
 | |
| 65 | + | |
| 63 | 66 |          try:
 | 
| 64 | -            self.logger.debug("BatchUpdateBlobs request: [{}]".format(request))
 | |
| 65 | 67 |              instance = self._get_instance(request.instance_name)
 | 
| 66 | 68 |              response = instance.batch_update_blobs(request.requests)
 | 
| 67 | -            self.logger.debug("FindMissingBlobs response: [{}]".format(response))
 | |
| 69 | + | |
| 68 | 70 |              return response
 | 
| 69 | 71 |  | 
| 70 | 72 |          except InvalidArgumentError as e:
 | 
| 71 | -            self.logger.error(e)
 | |
| 73 | +            self.__logger.error(e)
 | |
| 72 | 74 |              context.set_details(str(e))
 | 
| 73 | 75 |              context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
 | 
| 74 | 76 |  | 
| 75 | 77 |          return remote_execution_pb2.BatchReadBlobsResponse()
 | 
| 76 | 78 |  | 
| 77 | 79 |      def BatchReadBlobs(self, request, context):
 | 
| 80 | +        self.__logger.debug("BatchReadBlobs request from [%s]", context.peer())
 | |
| 81 | + | |
| 78 | 82 |          context.set_code(grpc.StatusCode.UNIMPLEMENTED)
 | 
| 79 | 83 |          context.set_details('Method not implemented!')
 | 
| 80 | 84 |  | 
| 81 | 85 |          return remote_execution_pb2.BatchReadBlobsResponse()
 | 
| 82 | 86 |  | 
| 83 | 87 |      def GetTree(self, request, context):
 | 
| 88 | +        self.__logger.debug("GetTree request from [%s]", context.peer())
 | |
| 89 | + | |
| 84 | 90 |          context.set_code(grpc.StatusCode.UNIMPLEMENTED)
 | 
| 85 | 91 |          context.set_details('Method not implemented!')
 | 
| 86 | 92 |  | 
| ... | ... | @@ -97,7 +103,7 @@ class ContentAddressableStorageService(remote_execution_pb2_grpc.ContentAddressa | 
| 97 | 103 |  class ByteStreamService(bytestream_pb2_grpc.ByteStreamServicer):
 | 
| 98 | 104 |  | 
| 99 | 105 |      def __init__(self, server):
 | 
| 100 | -        self.logger = logging.getLogger(__name__)
 | |
| 106 | +        self.__logger = logging.getLogger(__name__)
 | |
| 101 | 107 |  | 
| 102 | 108 |          self._instances = {}
 | 
| 103 | 109 |  | 
| ... | ... | @@ -107,8 +113,9 @@ class ByteStreamService(bytestream_pb2_grpc.ByteStreamServicer): | 
| 107 | 113 |          self._instances[name] = instance
 | 
| 108 | 114 |  | 
| 109 | 115 |      def Read(self, request, context):
 | 
| 116 | +        self.__logger.debug("Read request from [%s]", context.peer())
 | |
| 117 | + | |
| 110 | 118 |          try:
 | 
| 111 | -            self.logger.debug("Read request: [{}]".format(request))
 | |
| 112 | 119 |              path = request.resource_name.split("/")
 | 
| 113 | 120 |              instance_name = path[0]
 | 
| 114 | 121 |  | 
| ... | ... | @@ -131,30 +138,29 @@ class ByteStreamService(bytestream_pb2_grpc.ByteStreamServicer): | 
| 131 | 138 |                                       request.read_limit)
 | 
| 132 | 139 |  | 
| 133 | 140 |          except InvalidArgumentError as e:
 | 
| 134 | -            self.logger.error(e)
 | |
| 141 | +            self.__logger.error(e)
 | |
| 135 | 142 |              context.set_details(str(e))
 | 
| 136 | 143 |              context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
 | 
| 137 | 144 |              yield bytestream_pb2.ReadResponse()
 | 
| 138 | 145 |  | 
| 139 | 146 |          except NotFoundError as e:
 | 
| 140 | -            self.logger.error(e)
 | |
| 147 | +            self.__logger.error(e)
 | |
| 141 | 148 |              context.set_details(str(e))
 | 
| 142 | 149 |              context.set_code(grpc.StatusCode.NOT_FOUND)
 | 
| 143 | 150 |              yield bytestream_pb2.ReadResponse()
 | 
| 144 | 151 |  | 
| 145 | 152 |          except OutOfRangeError as e:
 | 
| 146 | -            self.logger.error(e)
 | |
| 153 | +            self.__logger.error(e)
 | |
| 147 | 154 |              context.set_details(str(e))
 | 
| 148 | 155 |              context.set_code(grpc.StatusCode.OUT_OF_RANGE)
 | 
| 149 | 156 |              yield bytestream_pb2.ReadResponse()
 | 
| 150 | 157 |  | 
| 151 | -            self.logger.debug("Read finished.")
 | |
| 152 | - | |
| 153 | 158 |      def Write(self, requests, context):
 | 
| 159 | +        self.__logger.debug("Write request from [%s]", context.peer())
 | |
| 160 | + | |
| 154 | 161 |          try:
 | 
| 155 | 162 |              requests, request_probe = tee(requests, 2)
 | 
| 156 | 163 |              first_request = next(request_probe)
 | 
| 157 | -            self.logger.debug("First write request: [{}]".format(first_request))
 | |
| 158 | 164 |  | 
| 159 | 165 |              path = first_request.resource_name.split("/")
 | 
| 160 | 166 |  | 
| ... | ... | @@ -175,21 +181,21 @@ class ByteStreamService(bytestream_pb2_grpc.ByteStreamServicer): | 
| 175 | 181 |  | 
| 176 | 182 |              instance = self._get_instance(instance_name)
 | 
| 177 | 183 |              response = instance.write(requests)
 | 
| 178 | -            self.logger.debug("Write response: [{}]".format(response))
 | |
| 184 | + | |
| 179 | 185 |              return response
 | 
| 180 | 186 |  | 
| 181 | 187 |          except NotImplementedError as e:
 | 
| 182 | -            self.logger.error(e)
 | |
| 188 | +            self.__logger.error(e)
 | |
| 183 | 189 |              context.set_details(str(e))
 | 
| 184 | 190 |              context.set_code(grpc.StatusCode.UNIMPLEMENTED)
 | 
| 185 | 191 |  | 
| 186 | 192 |          except InvalidArgumentError as e:
 | 
| 187 | -            self.logger.error(e)
 | |
| 193 | +            self.__logger.error(e)
 | |
| 188 | 194 |              context.set_details(str(e))
 | 
| 189 | 195 |              context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
 | 
| 190 | 196 |  | 
| 191 | 197 |          except NotFoundError as e:
 | 
| 192 | -            self.logger.error(e)
 | |
| 198 | +            self.__logger.error(e)
 | |
| 193 | 199 |              context.set_details(str(e))
 | 
| 194 | 200 |              context.set_code(grpc.StatusCode.NOT_FOUND)
 | 
| 195 | 201 |  | 
| ... | ... | @@ -20,6 +20,7 @@ DiskStorage | 
| 20 | 20 |  A CAS storage provider that stores files as blobs on disk.
 | 
| 21 | 21 |  """
 | 
| 22 | 22 |  | 
| 23 | +import logging
 | |
| 23 | 24 |  import os
 | 
| 24 | 25 |  import tempfile
 | 
| 25 | 26 |  | 
| ... | ... | @@ -29,6 +30,8 @@ from .storage_abc import StorageABC | 
| 29 | 30 |  class DiskStorage(StorageABC):
 | 
| 30 | 31 |  | 
| 31 | 32 |      def __init__(self, path):
 | 
| 33 | +        self.__logger = logging.getLogger(__name__)
 | |
| 34 | + | |
| 32 | 35 |          if not os.path.isabs(path):
 | 
| 33 | 36 |              self.__root_path = os.path.abspath(path)
 | 
| 34 | 37 |          else:
 | 
| ... | ... | @@ -43,6 +43,8 @@ class _NullBytesIO(io.BufferedIOBase): | 
| 43 | 43 |  class LRUMemoryCache(StorageABC):
 | 
| 44 | 44 |  | 
| 45 | 45 |      def __init__(self, limit):
 | 
| 46 | +        self.__logger = logging.getLogger(__name__)
 | |
| 47 | + | |
| 46 | 48 |          self._limit = limit
 | 
| 47 | 49 |          self._storage = collections.OrderedDict()
 | 
| 48 | 50 |          self._bytes_stored = 0
 | 
| ... | ... | @@ -35,7 +35,7 @@ from .storage_abc import StorageABC | 
| 35 | 35 |  class RemoteStorage(StorageABC):
 | 
| 36 | 36 |  | 
| 37 | 37 |      def __init__(self, channel, instance_name):
 | 
| 38 | -        self.logger = logging.getLogger(__name__)
 | |
| 38 | +        self.__logger = logging.getLogger(__name__)
 | |
| 39 | 39 |  | 
| 40 | 40 |          self.instance_name = instance_name
 | 
| 41 | 41 |          self.channel = channel
 | 
| ... | ... | @@ -21,6 +21,7 @@ A storage provider that stores data in an Amazon S3 bucket. | 
| 21 | 21 |  """
 | 
| 22 | 22 |  | 
| 23 | 23 |  import io
 | 
| 24 | +import logging
 | |
| 24 | 25 |  | 
| 25 | 26 |  import boto3
 | 
| 26 | 27 |  from botocore.exceptions import ClientError
 | 
| ... | ... | @@ -31,6 +32,8 @@ from .storage_abc import StorageABC | 
| 31 | 32 |  class S3Storage(StorageABC):
 | 
| 32 | 33 |  | 
| 33 | 34 |      def __init__(self, bucket, **kwargs):
 | 
| 35 | +        self.__logger = logging.getLogger(__name__)
 | |
| 36 | + | |
| 34 | 37 |          self._bucket = bucket
 | 
| 35 | 38 |          self._s3 = boto3.resource('s3', **kwargs)
 | 
| 36 | 39 |  | 
| ... | ... | @@ -26,6 +26,7 @@ the fallback. | 
| 26 | 26 |  """
 | 
| 27 | 27 |  | 
| 28 | 28 |  import io
 | 
| 29 | +import logging
 | |
| 29 | 30 |  | 
| 30 | 31 |  from .storage_abc import StorageABC
 | 
| 31 | 32 |  | 
| ... | ... | @@ -118,6 +119,8 @@ class _CachingTee(io.RawIOBase): | 
| 118 | 119 |  class WithCacheStorage(StorageABC):
 | 
| 119 | 120 |  | 
| 120 | 121 |      def __init__(self, cache, fallback):
 | 
| 122 | +        self.__logger = logging.getLogger(__name__)
 | |
| 123 | + | |
| 121 | 124 |          self._cache = cache
 | 
| 122 | 125 |          self._fallback = fallback
 | 
| 123 | 126 |  | 
| ... | ... | @@ -37,9 +37,9 @@ from .operations.instance import OperationsInstance | 
| 37 | 37 |  class ExecutionController:
 | 
| 38 | 38 |  | 
| 39 | 39 |      def __init__(self, action_cache=None, storage=None):
 | 
| 40 | -        scheduler = Scheduler(action_cache)
 | |
| 40 | +        self.__logger = logging.getLogger(__name__)
 | |
| 41 | 41 |  | 
| 42 | -        self.logger = logging.getLogger(__name__)
 | |
| 42 | +        scheduler = Scheduler(action_cache)
 | |
| 43 | 43 |  | 
| 44 | 44 |          self._execution_instance = ExecutionInstance(scheduler, storage)
 | 
| 45 | 45 |          self._bots_interface = BotsInterface(scheduler)
 | 
| ... | ... | @@ -30,7 +30,8 @@ from ..job import Job | 
| 30 | 30 |  class ExecutionInstance:
 | 
| 31 | 31 |  | 
| 32 | 32 |      def __init__(self, scheduler, storage):
 | 
| 33 | -        self.logger = logging.getLogger(__name__)
 | |
| 33 | +        self.__logger = logging.getLogger(__name__)
 | |
| 34 | + | |
| 34 | 35 |          self._storage = storage
 | 
| 35 | 36 |          self._scheduler = scheduler
 | 
| 36 | 37 |  | 
| ... | ... | @@ -33,29 +33,60 @@ from buildgrid._protos.google.longrunning import operations_pb2 | 
| 33 | 33 |  | 
| 34 | 34 |  class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
 | 
| 35 | 35 |  | 
| 36 | -    def __init__(self, server):
 | |
| 37 | -        self.logger = logging.getLogger(__name__)
 | |
| 36 | +    def __init__(self, server, monitor=True):
 | |
| 37 | +        self.__logger = logging.getLogger(__name__)
 | |
| 38 | + | |
| 39 | +        self.__peers_by_instance = {}
 | |
| 40 | +        self.__peers = {}
 | |
| 41 | + | |
| 38 | 42 |          self._instances = {}
 | 
| 43 | +        self._is_monitored = True
 | |
| 44 | + | |
| 39 | 45 |          remote_execution_pb2_grpc.add_ExecutionServicer_to_server(self, server)
 | 
| 40 | 46 |  | 
| 41 | -    def add_instance(self, name, instance):
 | |
| 42 | -        self._instances[name] = instance
 | |
| 47 | +    # --- Public API ---
 | |
| 48 | + | |
| 49 | +    def add_instance(self, instance_name, instance):
 | |
| 50 | +        self._instances[instance_name] = instance
 | |
| 51 | + | |
| 52 | +        if self._is_monitored:
 | |
| 53 | +            self.__peers_by_instance[instance_name] = set()
 | |
| 54 | + | |
| 55 | +    # --- Public API: Servicer ---
 | |
| 43 | 56 |  | 
| 44 | 57 |      def Execute(self, request, context):
 | 
| 58 | +        """Handles ExecuteRequest messages.
 | |
| 59 | + | |
| 60 | +        Args:
 | |
| 61 | +            request (ExecuteRequest): The incoming RPC request.
 | |
| 62 | +            context (grpc.ServicerContext): Context for the RPC call.
 | |
| 63 | +        """
 | |
| 64 | +        self.__logger.debug("Execute request from [%s]", context.peer())
 | |
| 65 | + | |
| 66 | +        instance_name = request.instance_name
 | |
| 67 | +        message_queue = queue.Queue()
 | |
| 68 | +        peer = context.peer()
 | |
| 69 | + | |
| 45 | 70 |          try:
 | 
| 46 | -            message_queue = queue.Queue()
 | |
| 47 | -            instance = self._get_instance(request.instance_name)
 | |
| 71 | +            instance = self._get_instance(instance_name)
 | |
| 48 | 72 |              operation = instance.execute(request.action_digest,
 | 
| 49 | 73 |                                           request.skip_cache_lookup,
 | 
| 50 | 74 |                                           message_queue)
 | 
| 51 | 75 |  | 
| 52 | -            context.add_callback(partial(instance.unregister_message_client,
 | |
| 53 | -                                         operation.name, message_queue))
 | |
| 76 | +            context.add_callback(partial(self._rpc_termination_callback,
 | |
| 77 | +                                         peer, instance_name, operation.name, message_queue))
 | |
| 54 | 78 |  | 
| 55 | -            instanced_op_name = "{}/{}".format(request.instance_name,
 | |
| 56 | -                                               operation.name)
 | |
| 79 | +            if self._is_monitored:
 | |
| 80 | +                if peer in self.__peers:
 | |
| 81 | +                    self.__peers[peer] += 1
 | |
| 82 | +                else:
 | |
| 83 | +                    self.__peers[peer] = 1
 | |
| 57 | 84 |  | 
| 58 | -            self.logger.info("Operation name: [{}]".format(instanced_op_name))
 | |
| 85 | +                self.__peers_by_instance[instance_name].add(peer)
 | |
| 86 | + | |
| 87 | +            instanced_op_name = "{}/{}".format(instance_name, operation.name)
 | |
| 88 | + | |
| 89 | +            self.__logger.info("Operation name: [%s]", instanced_op_name)
 | |
| 59 | 90 |  | 
| 60 | 91 |              for operation in instance.stream_operation_updates(message_queue,
 | 
| 61 | 92 |                                                                 operation.name):
 | 
| ... | ... | @@ -65,33 +96,48 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer): | 
| 65 | 96 |                  yield op
 | 
| 66 | 97 |  | 
| 67 | 98 |          except InvalidArgumentError as e:
 | 
| 68 | -            self.logger.error(e)
 | |
| 99 | +            self.__logger.error(e)
 | |
| 69 | 100 |              context.set_details(str(e))
 | 
| 70 | 101 |              context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
 | 
| 71 | 102 |              yield operations_pb2.Operation()
 | 
| 72 | 103 |  | 
| 73 | 104 |          except FailedPreconditionError as e:
 | 
| 74 | -            self.logger.error(e)
 | |
| 105 | +            self.__logger.error(e)
 | |
| 75 | 106 |              context.set_details(str(e))
 | 
| 76 | 107 |              context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
 | 
| 77 | 108 |              yield operations_pb2.Operation()
 | 
| 78 | 109 |  | 
| 79 | 110 |      def WaitExecution(self, request, context):
 | 
| 80 | -        try:
 | |
| 81 | -            names = request.name.split("/")
 | |
| 111 | +        """Handles WaitExecutionRequest messages.
 | |
| 82 | 112 |  | 
| 83 | -            # Operation name should be in format:
 | |
| 84 | -            # {instance/name}/{operation_id}
 | |
| 85 | -            instance_name = ''.join(names[0:-1])
 | |
| 113 | +        Args:
 | |
| 114 | +            request (WaitExecutionRequest): The incoming RPC request.
 | |
| 115 | +            context (grpc.ServicerContext): Context for the RPC call.
 | |
| 116 | +        """
 | |
| 117 | +        self.__logger.debug("WaitExecution request from [%s]", context.peer())
 | |
| 118 | + | |
| 119 | +        names = request.name.split('/')
 | |
| 120 | +        instance_name = '/'.join(names[:-1])
 | |
| 121 | +        operation_name = names[-1]
 | |
| 122 | +        message_queue = queue.Queue()
 | |
| 123 | +        peer = context.peer()
 | |
| 124 | + | |
| 125 | +        try:
 | |
| 126 | +            if instance_name != request.instance_name:
 | |
| 127 | +                raise InvalidArgumentError("Invalid operation [{}] for instance [{}]"
 | |
| 128 | +                                            .format(request.name, instance_name))
 | |
| 86 | 129 |  | 
| 87 | -            message_queue = queue.Queue()
 | |
| 88 | -            operation_name = names[-1]
 | |
| 89 | 130 |              instance = self._get_instance(instance_name)
 | 
| 90 | 131 |  | 
| 91 | 132 |              instance.register_message_client(operation_name, message_queue)
 | 
| 133 | +            context.add_callback(partial(self._rpc_termination_callback,
 | |
| 134 | +                                         peer, instance_name, operation_name, message_queue))
 | |
| 92 | 135 |  | 
| 93 | -            context.add_callback(partial(instance.unregister_message_client,
 | |
| 94 | -                                         operation_name, message_queue))
 | |
| 136 | +            if self._is_monitored:
 | |
| 137 | +                if peer in self.__peers:
 | |
| 138 | +                    self.__peers[peer] += 1
 | |
| 139 | +                else:
 | |
| 140 | +                    self.__peers[peer] = 1
 | |
| 95 | 141 |  | 
| 96 | 142 |              for operation in instance.stream_operation_updates(message_queue,
 | 
| 97 | 143 |                                                                 operation_name):
 | 
| ... | ... | @@ -101,11 +147,39 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer): | 
| 101 | 147 |                  yield op
 | 
| 102 | 148 |  | 
| 103 | 149 |          except InvalidArgumentError as e:
 | 
| 104 | -            self.logger.error(e)
 | |
| 150 | +            self.__logger.error(e)
 | |
| 105 | 151 |              context.set_details(str(e))
 | 
| 106 | 152 |              context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
 | 
| 107 | 153 |              yield operations_pb2.Operation()
 | 
| 108 | 154 |  | 
| 155 | +    # --- Public API: Monitoring ---
 | |
| 156 | + | |
| 157 | +    @property
 | |
| 158 | +    def is_monitored(self):
 | |
| 159 | +        return self._is_monitored
 | |
| 160 | + | |
| 161 | +    def query_n_clients(self):
 | |
| 162 | +        return len(self.__peers)
 | |
| 163 | + | |
| 164 | +    def query_n_clients_for_instance(self, instance_name):
 | |
| 165 | +        try:
 | |
| 166 | +            return len(self.__peers_by_instance[instance_name])
 | |
| 167 | +        except KeyError:
 | |
| 168 | +            return 0
 | |
| 169 | + | |
| 170 | +    # --- Private API ---
 | |
| 171 | + | |
| 172 | +    def _rpc_termination_callback(self, peer, instance_name, job_name, message_queue):
 | |
| 173 | +        instance = self._get_instance(instance_name)
 | |
| 174 | + | |
| 175 | +        instance.unregister_message_client(job_name, message_queue)
 | |
| 176 | + | |
| 177 | +        if self._is_monitored:
 | |
| 178 | +            if self.__peers[peer] > 1:
 | |
| 179 | +                self.__peers[peer] -= 1
 | |
| 180 | +            else:
 | |
| 181 | +                del self.__peers[peer]
 | |
| 182 | + | |
| 109 | 183 |      def _get_instance(self, name):
 | 
| 110 | 184 |          try:
 | 
| 111 | 185 |              return self._instances[name]
 | 
| ... | ... | @@ -13,18 +13,21 @@ | 
| 13 | 13 |  # limitations under the License.
 | 
| 14 | 14 |  | 
| 15 | 15 |  | 
| 16 | +import asyncio
 | |
| 17 | +from concurrent import futures
 | |
| 16 | 18 |  import logging
 | 
| 17 | 19 |  import os
 | 
| 18 | -from concurrent import futures
 | |
| 20 | +import time
 | |
| 19 | 21 |  | 
| 20 | 22 |  import grpc
 | 
| 21 | 23 |  | 
| 22 | -from .cas.service import ByteStreamService, ContentAddressableStorageService
 | |
| 23 | -from .actioncache.service import ActionCacheService
 | |
| 24 | -from .execution.service import ExecutionService
 | |
| 25 | -from .operations.service import OperationsService
 | |
| 26 | -from .bots.service import BotsService
 | |
| 27 | -from .referencestorage.service import ReferenceStorageService
 | |
| 24 | +from buildgrid.server.cas.service import ByteStreamService, ContentAddressableStorageService
 | |
| 25 | +from buildgrid.server.actioncache.service import ActionCacheService
 | |
| 26 | +from buildgrid.server.execution.service import ExecutionService
 | |
| 27 | +from buildgrid.server.operations.service import OperationsService
 | |
| 28 | +from buildgrid.server.bots.service import BotsService
 | |
| 29 | +from buildgrid.server.referencestorage.service import ReferenceStorageService
 | |
| 30 | +from buildgrid.settings import MONITORING_PERIOD
 | |
| 28 | 31 |  | 
| 29 | 32 |  | 
| 30 | 33 |  class BuildGridServer:
 | 
| ... | ... | @@ -40,16 +43,17 @@ class BuildGridServer: | 
| 40 | 43 |          Args:
 | 
| 41 | 44 |              max_workers (int, optional): A pool of max worker threads.
 | 
| 42 | 45 |          """
 | 
| 43 | - | |
| 44 | -        self.logger = logging.getLogger(__name__)
 | |
| 46 | +        self.__logger = logging.getLogger(__name__)
 | |
| 45 | 47 |  | 
| 46 | 48 |          if max_workers is None:
 | 
| 47 | 49 |              # Use max_workers default from Python 3.5+
 | 
| 48 | 50 |              max_workers = (os.cpu_count() or 1) * 5
 | 
| 49 | 51 |  | 
| 50 | -        server = grpc.server(futures.ThreadPoolExecutor(max_workers))
 | |
| 52 | +        self.__grpc_executor = futures.ThreadPoolExecutor(max_workers)
 | |
| 53 | +        self.__grpc_server = grpc.server(self.__grpc_executor)
 | |
| 51 | 54 |  | 
| 52 | -        self._server = server
 | |
| 55 | +        self.__main_loop = asyncio.get_event_loop()
 | |
| 56 | +        self.__monitoring_task = None
 | |
| 53 | 57 |  | 
| 54 | 58 |          self._execution_service = None
 | 
| 55 | 59 |          self._bots_service = None
 | 
| ... | ... | @@ -59,15 +63,33 @@ class BuildGridServer: | 
| 59 | 63 |          self._cas_service = None
 | 
| 60 | 64 |          self._bytestream_service = None
 | 
| 61 | 65 |  | 
| 66 | +        self.__execution_instances = []
 | |
| 67 | +        self.__bots_instances = []
 | |
| 68 | + | |
| 69 | +    # --- Public API ---
 | |
| 70 | + | |
| 62 | 71 |      def start(self):
 | 
| 63 | -        """Starts the server.
 | |
| 72 | +        """Starts the BuildGrid server.
 | |
| 64 | 73 |          """
 | 
| 65 | -        self._server.start()
 | |
| 74 | +        self.__grpc_server.start()
 | |
| 75 | + | |
| 76 | +        self.__monitoring_task = asyncio.ensure_future(
 | |
| 77 | +            self._monitoring_worker(period=MONITORING_PERIOD), loop=self.__main_loop)
 | |
| 78 | +        self.__main_loop.run_forever()
 | |
| 66 | 79 |  | 
| 67 | 80 |      def stop(self, grace=0):
 | 
| 68 | -        """Stops the server.
 | |
| 81 | +        """Stops the BuildGrid server.
 | |
| 82 | + | |
| 83 | +        Args:
 | |
| 84 | +            grace (int, optional): A duration of time in seconds. Defaults to 0.
 | |
| 69 | 85 |          """
 | 
| 70 | -        self._server.stop(grace)
 | |
| 86 | +        if self.__monitoring_task is not None:
 | |
| 87 | +            self.__monitoring_task.cancel()
 | |
| 88 | + | |
| 89 | +        self.__grpc_server.stop(grace)
 | |
| 90 | + | |
| 91 | +        if grace > 0:
 | |
| 92 | +            time.sleep(grace)
 | |
| 71 | 93 |  | 
| 72 | 94 |      def add_port(self, address, credentials):
 | 
| 73 | 95 |          """Adds a port to the server.
 | 
| ... | ... | @@ -80,12 +102,12 @@ class BuildGridServer: | 
| 80 | 102 |              credentials (:obj:`grpc.ChannelCredentials`): Credentials object.
 | 
| 81 | 103 |          """
 | 
| 82 | 104 |          if credentials is not None:
 | 
| 83 | -            self.logger.info("Adding secure connection on: [{}]".format(address))
 | |
| 84 | -            self._server.add_secure_port(address, credentials)
 | |
| 105 | +            self.__logger.info("Adding secure connection on: [%s]", address)
 | |
| 106 | +            self.__grpc_server.add_secure_port(address, credentials)
 | |
| 85 | 107 |  | 
| 86 | 108 |          else:
 | 
| 87 | -            self.logger.info("Adding insecure connection on [{}]".format(address))
 | |
| 88 | -            self._server.add_insecure_port(address)
 | |
| 109 | +            self.__logger.info("Adding insecure connection on [%s]", address)
 | |
| 110 | +            self.__grpc_server.add_insecure_port(address)
 | |
| 89 | 111 |  | 
| 90 | 112 |      def add_execution_instance(self, instance, instance_name):
 | 
| 91 | 113 |          """Adds an :obj:`ExecutionInstance` to the service.
 | 
| ... | ... | @@ -97,10 +119,11 @@ class BuildGridServer: | 
| 97 | 119 |              instance_name (str): Instance name.
 | 
| 98 | 120 |          """
 | 
| 99 | 121 |          if self._execution_service is None:
 | 
| 100 | -            self._execution_service = ExecutionService(self._server)
 | |
| 101 | - | |
| 122 | +            self._execution_service = ExecutionService(self.__grpc_server)
 | |
| 102 | 123 |          self._execution_service.add_instance(instance_name, instance)
 | 
| 103 | 124 |  | 
| 125 | +        self.__execution_instances.append(instance_name)
 | |
| 126 | + | |
| 104 | 127 |      def add_bots_interface(self, instance, instance_name):
 | 
| 105 | 128 |          """Adds a :obj:`BotsInterface` to the service.
 | 
| 106 | 129 |  | 
| ... | ... | @@ -111,10 +134,11 @@ class BuildGridServer: | 
| 111 | 134 |              instance_name (str): Instance name.
 | 
| 112 | 135 |          """
 | 
| 113 | 136 |          if self._bots_service is None:
 | 
| 114 | -            self._bots_service = BotsService(self._server)
 | |
| 115 | - | |
| 137 | +            self._bots_service = BotsService(self.__grpc_server)
 | |
| 116 | 138 |          self._bots_service.add_instance(instance_name, instance)
 | 
| 117 | 139 |  | 
| 140 | +        self.__bots_instances.append(instance_name)
 | |
| 141 | + | |
| 118 | 142 |      def add_operations_instance(self, instance, instance_name):
 | 
| 119 | 143 |          """Adds an :obj:`OperationsInstance` to the service.
 | 
| 120 | 144 |  | 
| ... | ... | @@ -125,8 +149,7 @@ class BuildGridServer: | 
| 125 | 149 |              instance_name (str): Instance name.
 | 
| 126 | 150 |          """
 | 
| 127 | 151 |          if self._operations_service is None:
 | 
| 128 | -            self._operations_service = OperationsService(self._server)
 | |
| 129 | - | |
| 152 | +            self._operations_service = OperationsService(self.__grpc_server)
 | |
| 130 | 153 |          self._operations_service.add_instance(instance_name, instance)
 | 
| 131 | 154 |  | 
| 132 | 155 |      def add_reference_storage_instance(self, instance, instance_name):
 | 
| ... | ... | @@ -139,8 +162,7 @@ class BuildGridServer: | 
| 139 | 162 |              instance_name (str): Instance name.
 | 
| 140 | 163 |          """
 | 
| 141 | 164 |          if self._reference_storage_service is None:
 | 
| 142 | -            self._reference_storage_service = ReferenceStorageService(self._server)
 | |
| 143 | - | |
| 165 | +            self._reference_storage_service = ReferenceStorageService(self.__grpc_server)
 | |
| 144 | 166 |          self._reference_storage_service.add_instance(instance_name, instance)
 | 
| 145 | 167 |  | 
| 146 | 168 |      def add_action_cache_instance(self, instance, instance_name):
 | 
| ... | ... | @@ -153,8 +175,7 @@ class BuildGridServer: | 
| 153 | 175 |              instance_name (str): Instance name.
 | 
| 154 | 176 |          """
 | 
| 155 | 177 |          if self._action_cache_service is None:
 | 
| 156 | -            self._action_cache_service = ActionCacheService(self._server)
 | |
| 157 | - | |
| 178 | +            self._action_cache_service = ActionCacheService(self.__grpc_server)
 | |
| 158 | 179 |          self._action_cache_service.add_instance(instance_name, instance)
 | 
| 159 | 180 |  | 
| 160 | 181 |      def add_cas_instance(self, instance, instance_name):
 | 
| ... | ... | @@ -167,8 +188,7 @@ class BuildGridServer: | 
| 167 | 188 |              instance_name (str): Instance name.
 | 
| 168 | 189 |          """
 | 
| 169 | 190 |          if self._cas_service is None:
 | 
| 170 | -            self._cas_service = ContentAddressableStorageService(self._server)
 | |
| 171 | - | |
| 191 | +            self._cas_service = ContentAddressableStorageService(self.__grpc_server)
 | |
| 172 | 192 |          self._cas_service.add_instance(instance_name, instance)
 | 
| 173 | 193 |  | 
| 174 | 194 |      def add_bytestream_instance(self, instance, instance_name):
 | 
| ... | ... | @@ -181,6 +201,23 @@ class BuildGridServer: | 
| 181 | 201 |              instance_name (str): Instance name.
 | 
| 182 | 202 |          """
 | 
| 183 | 203 |          if self._bytestream_service is None:
 | 
| 184 | -            self._bytestream_service = ByteStreamService(self._server)
 | |
| 185 | - | |
| 204 | +            self._bytestream_service = ByteStreamService(self.__grpc_server)
 | |
| 186 | 205 |          self._bytestream_service.add_instance(instance_name, instance)
 | 
| 206 | + | |
| 207 | +    # --- Private API ---
 | |
| 208 | + | |
| 209 | +    async def _monitoring_worker(self, period=1):
 | |
| 210 | +        while True:
 | |
| 211 | +            try:
 | |
| 212 | +                n_clients = self._execution_service.query_n_clients()
 | |
| 213 | +                n_bots = self._bots_service.query_n_bots()
 | |
| 214 | + | |
| 215 | +                print('---')
 | |
| 216 | +                print('n_clients={}'.format(n_clients))
 | |
| 217 | +                print('n_bots={}'.format(n_bots))
 | |
| 218 | + | |
| 219 | +                await asyncio.sleep(period)
 | |
| 220 | +            except asyncio.CancelledError:
 | |
| 221 | +                break
 | |
| 222 | + | |
| 223 | +        self.__main_loop.stop() | 
| ... | ... | @@ -27,7 +27,7 @@ from buildgrid._protos.google.longrunning import operations_pb2 | 
| 27 | 27 |  class Job:
 | 
| 28 | 28 |  | 
| 29 | 29 |      def __init__(self, action, action_digest):
 | 
| 30 | -        self.logger = logging.getLogger(__name__)
 | |
| 30 | +        self.__logger = logging.getLogger(__name__)
 | |
| 31 | 31 |  | 
| 32 | 32 |          self._name = str(uuid.uuid4())
 | 
| 33 | 33 |          self._action = remote_execution_pb2.Action()
 | 
| ... | ... | @@ -28,7 +28,8 @@ from buildgrid._protos.google.longrunning import operations_pb2 | 
| 28 | 28 |  class OperationsInstance:
 | 
| 29 | 29 |  | 
| 30 | 30 |      def __init__(self, scheduler):
 | 
| 31 | -        self.logger = logging.getLogger(__name__)
 | |
| 31 | +        self.__logger = logging.getLogger(__name__)
 | |
| 32 | + | |
| 32 | 33 |          self._scheduler = scheduler
 | 
| 33 | 34 |  | 
| 34 | 35 |      def register_instance_with_server(self, instance_name, server):
 | 
| ... | ... | @@ -32,7 +32,7 @@ from buildgrid._protos.google.longrunning import operations_pb2_grpc, operations | 
| 32 | 32 |  class OperationsService(operations_pb2_grpc.OperationsServicer):
 | 
| 33 | 33 |  | 
| 34 | 34 |      def __init__(self, server):
 | 
| 35 | -        self.logger = logging.getLogger(__name__)
 | |
| 35 | +        self.__logger = logging.getLogger(__name__)
 | |
| 36 | 36 |  | 
| 37 | 37 |          self._instances = {}
 | 
| 38 | 38 |  | 
| ... | ... | @@ -42,6 +42,8 @@ class OperationsService(operations_pb2_grpc.OperationsServicer): | 
| 42 | 42 |          self._instances[name] = instance
 | 
| 43 | 43 |  | 
| 44 | 44 |      def GetOperation(self, request, context):
 | 
| 45 | +        self.__logger.debug("GetOperation request from [%s]", context.peer())
 | |
| 46 | + | |
| 45 | 47 |          try:
 | 
| 46 | 48 |              name = request.name
 | 
| 47 | 49 |  | 
| ... | ... | @@ -56,13 +58,15 @@ class OperationsService(operations_pb2_grpc.OperationsServicer): | 
| 56 | 58 |              return op
 | 
| 57 | 59 |  | 
| 58 | 60 |          except InvalidArgumentError as e:
 | 
| 59 | -            self.logger.error(e)
 | |
| 61 | +            self.__logger.error(e)
 | |
| 60 | 62 |              context.set_details(str(e))
 | 
| 61 | 63 |              context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
 | 
| 62 | 64 |  | 
| 63 | 65 |          return operations_pb2.Operation()
 | 
| 64 | 66 |  | 
| 65 | 67 |      def ListOperations(self, request, context):
 | 
| 68 | +        self.__logger.debug("ListOperations request from [%s]", context.peer())
 | |
| 69 | + | |
| 66 | 70 |          try:
 | 
| 67 | 71 |              # The request name should be the collection name
 | 
| 68 | 72 |              # In our case, this is just the instance_name
 | 
| ... | ... | @@ -79,13 +83,15 @@ class OperationsService(operations_pb2_grpc.OperationsServicer): | 
| 79 | 83 |              return result
 | 
| 80 | 84 |  | 
| 81 | 85 |          except InvalidArgumentError as e:
 | 
| 82 | -            self.logger.error(e)
 | |
| 86 | +            self.__logger.error(e)
 | |
| 83 | 87 |              context.set_details(str(e))
 | 
| 84 | 88 |              context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
 | 
| 85 | 89 |  | 
| 86 | 90 |          return operations_pb2.ListOperationsResponse()
 | 
| 87 | 91 |  | 
| 88 | 92 |      def DeleteOperation(self, request, context):
 | 
| 93 | +        self.__logger.debug("DeleteOperation request from [%s]", context.peer())
 | |
| 94 | + | |
| 89 | 95 |          try:
 | 
| 90 | 96 |              name = request.name
 | 
| 91 | 97 |  | 
| ... | ... | @@ -96,13 +102,15 @@ class OperationsService(operations_pb2_grpc.OperationsServicer): | 
| 96 | 102 |              instance.delete_operation(operation_name)
 | 
| 97 | 103 |  | 
| 98 | 104 |          except InvalidArgumentError as e:
 | 
| 99 | -            self.logger.error(e)
 | |
| 105 | +            self.__logger.error(e)
 | |
| 100 | 106 |              context.set_details(str(e))
 | 
| 101 | 107 |              context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
 | 
| 102 | 108 |  | 
| 103 | 109 |          return Empty()
 | 
| 104 | 110 |  | 
| 105 | 111 |      def CancelOperation(self, request, context):
 | 
| 112 | +        self.__logger.debug("CancelOperation request from [%s]", context.peer())
 | |
| 113 | + | |
| 106 | 114 |          try:
 | 
| 107 | 115 |              name = request.name
 | 
| 108 | 116 |  | 
| ... | ... | @@ -113,12 +121,12 @@ class OperationsService(operations_pb2_grpc.OperationsServicer): | 
| 113 | 121 |              instance.cancel_operation(operation_name)
 | 
| 114 | 122 |  | 
| 115 | 123 |          except NotImplementedError as e:
 | 
| 116 | -            self.logger.error(e)
 | |
| 124 | +            self.__logger.error(e)
 | |
| 117 | 125 |              context.set_details(str(e))
 | 
| 118 | 126 |              context.set_code(grpc.StatusCode.UNIMPLEMENTED)
 | 
| 119 | 127 |  | 
| 120 | 128 |          except InvalidArgumentError as e:
 | 
| 121 | -            self.logger.error(e)
 | |
| 129 | +            self.__logger.error(e)
 | |
| 122 | 130 |              context.set_details(str(e))
 | 
| 123 | 131 |              context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
 | 
| 124 | 132 |  | 
| ... | ... | @@ -25,7 +25,7 @@ from buildgrid._protos.buildstream.v2 import buildstream_pb2_grpc | 
| 25 | 25 |  class ReferenceStorageService(buildstream_pb2_grpc.ReferenceStorageServicer):
 | 
| 26 | 26 |  | 
| 27 | 27 |      def __init__(self, server):
 | 
| 28 | -        self.logger = logging.getLogger(__name__)
 | |
| 28 | +        self.__logger = logging.getLogger(__name__)
 | |
| 29 | 29 |  | 
| 30 | 30 |          self._instances = {}
 | 
| 31 | 31 |  | 
| ... | ... | @@ -35,6 +35,8 @@ class ReferenceStorageService(buildstream_pb2_grpc.ReferenceStorageServicer): | 
| 35 | 35 |          self._instances[name] = instance
 | 
| 36 | 36 |  | 
| 37 | 37 |      def GetReference(self, request, context):
 | 
| 38 | +        self.__logger.debug("GetReference request from [%s]", context.peer())
 | |
| 39 | + | |
| 38 | 40 |          try:
 | 
| 39 | 41 |              instance = self._get_instance(request.instance_name)
 | 
| 40 | 42 |              digest = instance.get_digest_reference(request.key)
 | 
| ... | ... | @@ -43,17 +45,19 @@ class ReferenceStorageService(buildstream_pb2_grpc.ReferenceStorageServicer): | 
| 43 | 45 |              return response
 | 
| 44 | 46 |  | 
| 45 | 47 |          except InvalidArgumentError as e:
 | 
| 46 | -            self.logger.error(e)
 | |
| 48 | +            self.__logger.error(e)
 | |
| 47 | 49 |              context.set_details(str(e))
 | 
| 48 | 50 |              context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
 | 
| 49 | 51 |  | 
| 50 | 52 |          except NotFoundError as e:
 | 
| 51 | -            self.logger.debug(e)
 | |
| 53 | +            self.__logger.debug(e)
 | |
| 52 | 54 |              context.set_code(grpc.StatusCode.NOT_FOUND)
 | 
| 53 | 55 |  | 
| 54 | 56 |          return buildstream_pb2.GetReferenceResponse()
 | 
| 55 | 57 |  | 
| 56 | 58 |      def UpdateReference(self, request, context):
 | 
| 59 | +        self.__logger.debug("UpdateReference request from [%s]", context.peer())
 | |
| 60 | + | |
| 57 | 61 |          try:
 | 
| 58 | 62 |              instance = self._get_instance(request.instance_name)
 | 
| 59 | 63 |              digest = request.digest
 | 
| ... | ... | @@ -62,7 +66,7 @@ class ReferenceStorageService(buildstream_pb2_grpc.ReferenceStorageServicer): | 
| 62 | 66 |                  instance.update_reference(key, digest)
 | 
| 63 | 67 |  | 
| 64 | 68 |          except InvalidArgumentError as e:
 | 
| 65 | -            self.logger.error(e)
 | |
| 69 | +            self.__logger.error(e)
 | |
| 66 | 70 |              context.set_details(str(e))
 | 
| 67 | 71 |              context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
 | 
| 68 | 72 |  | 
| ... | ... | @@ -72,13 +76,15 @@ class ReferenceStorageService(buildstream_pb2_grpc.ReferenceStorageServicer): | 
| 72 | 76 |          return buildstream_pb2.UpdateReferenceResponse()
 | 
| 73 | 77 |  | 
| 74 | 78 |      def Status(self, request, context):
 | 
| 79 | +        self.__logger.debug("Status request from [%s]", context.peer())
 | |
| 80 | + | |
| 75 | 81 |          try:
 | 
| 76 | 82 |              instance = self._get_instance(request.instance_name)
 | 
| 77 | 83 |              allow_updates = instance.allow_updates
 | 
| 78 | 84 |              return buildstream_pb2.StatusResponse(allow_updates=allow_updates)
 | 
| 79 | 85 |  | 
| 80 | 86 |          except InvalidArgumentError as e:
 | 
| 81 | -            self.logger.error(e)
 | |
| 87 | +            self.__logger.error(e)
 | |
| 82 | 88 |              context.set_details(str(e))
 | 
| 83 | 89 |              context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
 | 
| 84 | 90 |  | 
| ... | ... | @@ -23,6 +23,7 @@ For a given key, it | 
| 23 | 23 |  """
 | 
| 24 | 24 |  | 
| 25 | 25 |  import collections
 | 
| 26 | +import logging
 | |
| 26 | 27 |  | 
| 27 | 28 |  from buildgrid._exceptions import NotFoundError
 | 
| 28 | 29 |  from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
 | 
| ... | ... | @@ -38,6 +39,8 @@ class ReferenceCache: | 
| 38 | 39 |              max_cached_refs (int): maximum number of entries to be stored.
 | 
| 39 | 40 |              allow_updates (bool): allow the client to write to storage
 | 
| 40 | 41 |          """
 | 
| 42 | +        self.__logger = logging.getLogger(__name__)
 | |
| 43 | + | |
| 41 | 44 |          self._allow_updates = allow_updates
 | 
| 42 | 45 |          self._storage = storage
 | 
| 43 | 46 |          self._max_cached_refs = max_cached_refs
 | 
| ... | ... | @@ -20,21 +20,33 @@ Schedules jobs. | 
| 20 | 20 |  """
 | 
| 21 | 21 |  | 
| 22 | 22 |  from collections import deque
 | 
| 23 | +import logging
 | |
| 23 | 24 |  | 
| 24 | -from buildgrid._exceptions import NotFoundError
 | |
| 25 | +from google.protobuf import duration_pb2
 | |
| 25 | 26 |  | 
| 26 | -from .job import OperationStage, LeaseState
 | |
| 27 | +from buildgrid._enums import LeaseState, OperationStage
 | |
| 28 | +from buildgrid._exceptions import NotFoundError
 | |
| 27 | 29 |  | 
| 28 | 30 |  | 
| 29 | 31 |  class Scheduler:
 | 
| 30 | 32 |  | 
| 31 | 33 |      MAX_N_TRIES = 5
 | 
| 32 | 34 |  | 
| 33 | -    def __init__(self, action_cache=None):
 | |
| 35 | +    def __init__(self, action_cache=None, monitor=True):
 | |
| 36 | +        self.__logger = logging.getLogger(__name__)
 | |
| 37 | + | |
| 38 | +        self.__queue_times_by_priority = {}
 | |
| 39 | +        self.__queue_time = duration_pb2.Duration()
 | |
| 40 | +        self.__retries_by_error = {}
 | |
| 41 | +        self.__retries_count = 0
 | |
| 42 | + | |
| 34 | 43 |          self._action_cache = action_cache
 | 
| 44 | +        self._is_monitored = True
 | |
| 35 | 45 |          self.jobs = {}
 | 
| 36 | 46 |          self.queue = deque()
 | 
| 37 | 47 |  | 
| 48 | +    # --- Public API ---
 | |
| 49 | + | |
| 38 | 50 |      def register_client(self, job_name, queue):
 | 
| 39 | 51 |          self.jobs[job_name].register_client(queue)
 | 
| 40 | 52 |  | 
| ... | ... | @@ -133,3 +145,42 @@ class Scheduler: | 
| 133 | 145 |      def get_job_operation(self, job_name):
 | 
| 134 | 146 |          """Returns the operation associated to job."""
 | 
| 135 | 147 |          return self.jobs[job_name].operation
 | 
| 148 | + | |
| 149 | +    # --- Public API: Monitoring ---
 | |
| 150 | + | |
| 151 | +    @property
 | |
| 152 | +    def is_monitored(self):
 | |
| 153 | +        return self._is_monitored
 | |
| 154 | + | |
| 155 | +    def query_n_jobs(self):
 | |
| 156 | +        return len(self.jobs)
 | |
| 157 | + | |
| 158 | +    def query_n_operations(self):
 | |
| 159 | +        return len(self.jobs)
 | |
| 160 | + | |
| 161 | +    def query_n_operations_by_stage(self):
 | |
| 162 | +        return len(self.jobs)
 | |
| 163 | + | |
| 164 | +    def query_n_leases(self):
 | |
| 165 | +        return len(self.jobs)
 | |
| 166 | + | |
| 167 | +    def query_n_leases_by_state(self):
 | |
| 168 | +        return len(self.jobs)
 | |
| 169 | + | |
| 170 | +    def query_n_retries(self):
 | |
| 171 | +        return self.__retries_count
 | |
| 172 | + | |
| 173 | +    def query_n_retries_for_error(self, error_type):
 | |
| 174 | +        try:
 | |
| 175 | +            return self.__retries_by_error[error_type]
 | |
| 176 | +        except KeyError:
 | |
| 177 | +            return 0
 | |
| 178 | + | |
| 179 | +    def query_am_queue_time(self):
 | |
| 180 | +        return self.__average_queue_time
 | |
| 181 | + | |
| 182 | +    def query_am_queue_time_for_priority(self, priority_level):
 | |
| 183 | +        try:
 | |
| 184 | +            return self.__queue_times_by_priority[priority_level]
 | |
| 185 | +        except KeyError:
 | |
| 186 | +            return 0 | 
| 1 | +# Copyright (C) 2018 Bloomberg LP
 | |
| 2 | +#
 | |
| 3 | +# Licensed under the Apache License, Version 2.0 (the "License");
 | |
| 4 | +# you may not use this file except in compliance with the License.
 | |
| 5 | +# You may obtain a copy of the License at
 | |
| 6 | +#
 | |
| 7 | +#  <http://www.apache.org/licenses/LICENSE-2.0>
 | |
| 8 | +#
 | |
| 9 | +# Unless required by applicable law or agreed to in writing, software
 | |
| 10 | +# distributed under the License is distributed on an "AS IS" BASIS,
 | |
| 11 | +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | |
| 12 | +# See the License for the specific language governing permissions and
 | |
| 13 | +# limitations under the License.
 | |
| 14 | + | |
| 15 | + | |
| 1 | 16 |  import hashlib
 | 
| 2 | 17 |  | 
| 3 | 18 |  | 
| 4 | -# The hash function that CAS uses
 | |
| 19 | +# Hash function used for computing digests:
 | |
| 5 | 20 |  HASH = hashlib.sha256
 | 
| 21 | + | |
| 22 | +# Length in bytes of a hash string returned by HASH:
 | |
| 6 | 23 |  HASH_LENGTH = HASH().digest_size * 2
 | 
| 24 | + | |
| 25 | +# Period, in seconds, for the monitoring cycle:
 | |
| 26 | +MONITORING_PERIOD = 5.0 | 
| ... | ... | @@ -112,13 +112,15 @@ setup( | 
| 112 | 112 |      license="Apache License, Version 2.0",
 | 
| 113 | 113 |      description="A remote execution service",
 | 
| 114 | 114 |      packages=find_packages(),
 | 
| 115 | +    python_requires='>= 3.5.3',  # janus requirement
 | |
| 115 | 116 |      install_requires=[
 | 
| 116 | -        'protobuf',
 | |
| 117 | -        'grpcio',
 | |
| 118 | -        'Click',
 | |
| 119 | -        'PyYAML',
 | |
| 120 | 117 |          'boto3 < 1.8.0',
 | 
| 121 | 118 |          'botocore < 1.11.0',
 | 
| 119 | +        'click',
 | |
| 120 | +        'grpcio',
 | |
| 121 | +        'janus',
 | |
| 122 | +        'protobuf',
 | |
| 123 | +        'pyyaml',
 | |
| 122 | 124 |      ],
 | 
| 123 | 125 |      entry_points={
 | 
| 124 | 126 |          'console_scripts': [
 | 
| ... | ... | @@ -89,7 +89,7 @@ def test_bytestream_read(mocked, data_to_read, instance): | 
| 89 | 89 |      request.resource_name += "blobs/{}/{}".format(HASH(data_to_read).hexdigest(), len(data_to_read))
 | 
| 90 | 90 |  | 
| 91 | 91 |      data = b""
 | 
| 92 | -    for response in servicer.Read(request, None):
 | |
| 92 | +    for response in servicer.Read(request, context):
 | |
| 93 | 93 |          data += response.data
 | 
| 94 | 94 |      assert data == data_to_read
 | 
| 95 | 95 |  | 
| ... | ... | @@ -111,7 +111,7 @@ def test_bytestream_read_many(mocked, instance): | 
| 111 | 111 |      request.resource_name += "blobs/{}/{}".format(HASH(data_to_read).hexdigest(), len(data_to_read))
 | 
| 112 | 112 |  | 
| 113 | 113 |      data = b""
 | 
| 114 | -    for response in servicer.Read(request, None):
 | |
| 114 | +    for response in servicer.Read(request, context):
 | |
| 115 | 115 |          data += response.data
 | 
| 116 | 116 |      assert data == data_to_read
 | 
| 117 | 117 |  | 
| ... | ... | @@ -137,7 +137,7 @@ def test_bytestream_write(mocked, instance, extra_data): | 
| 137 | 137 |          bytestream_pb2.WriteRequest(data=b'def', write_offset=3, finish_write=True)
 | 
| 138 | 138 |      ]
 | 
| 139 | 139 |  | 
| 140 | -    response = servicer.Write(requests, None)
 | |
| 140 | +    response = servicer.Write(requests, context)
 | |
| 141 | 141 |      assert response.committed_size == 6
 | 
| 142 | 142 |      assert len(storage.data) == 1
 | 
| 143 | 143 |      assert (hash_, 6) in storage.data
 | 
| ... | ... | @@ -178,7 +178,7 @@ def test_cas_find_missing_blobs(mocked, instance): | 
| 178 | 178 |          re_pb2.Digest(hash=HASH(b'ghij').hexdigest(), size_bytes=4)
 | 
| 179 | 179 |      ]
 | 
| 180 | 180 |      request = re_pb2.FindMissingBlobsRequest(instance_name=instance, blob_digests=digests)
 | 
| 181 | -    response = servicer.FindMissingBlobs(request, None)
 | |
| 181 | +    response = servicer.FindMissingBlobs(request, context)
 | |
| 182 | 182 |      assert len(response.missing_blob_digests) == 1
 | 
| 183 | 183 |      assert response.missing_blob_digests[0] == digests[1]
 | 
| 184 | 184 |  | 
| ... | ... | @@ -201,7 +201,7 @@ def test_cas_batch_update_blobs(mocked, instance): | 
| 201 | 201 |      ]
 | 
| 202 | 202 |  | 
| 203 | 203 |      request = re_pb2.BatchUpdateBlobsRequest(instance_name=instance, requests=update_requests)
 | 
| 204 | -    response = servicer.BatchUpdateBlobs(request, None)
 | |
| 204 | +    response = servicer.BatchUpdateBlobs(request, context)
 | |
| 205 | 205 |      assert len(response.responses) == 2
 | 
| 206 | 206 |  | 
| 207 | 207 |      for blob_response in response.responses:
 | 
