Martin Blanchard pushed to branch mablanch/23-new-logging at BuildGrid / buildgrid
Commits:
- 6ca668f8 by Martin Blanchard at 2018-11-12T09:12:58Z
- 3512e0bb by Martin Blanchard at 2018-11-12T09:13:08Z
- 4dcbbf62 by Martin Blanchard at 2018-11-12T10:52:34Z
- dc1c40ac by Martin Blanchard at 2018-11-12T10:52:34Z
- b17ba95b by Martin Blanchard at 2018-11-12T12:40:55Z
- f54fd18a by Martin Blanchard at 2018-11-12T12:40:57Z
- 3abc02d8 by Martin Blanchard at 2018-11-12T12:44:27Z
- 4c5279e3 by Martin Blanchard at 2018-11-12T12:44:28Z
- 671f7de5 by Martin Blanchard at 2018-11-12T12:44:28Z
- 0bb9e0be by Martin Blanchard at 2018-11-12T12:44:28Z
- 2953f72a by Martin Blanchard at 2018-11-12T12:44:28Z
- 620df68c by Martin Blanchard at 2018-11-12T13:13:21Z
- d157708c by Martin Blanchard at 2018-11-12T13:13:21Z
16 changed files:
- .pylintrc
- buildgrid/_app/cli.py
- buildgrid/_app/commands/cmd_server.py
- buildgrid/client/cas.py
- buildgrid/server/bots/instance.py
- buildgrid/server/bots/service.py
- buildgrid/server/execution/instance.py
- buildgrid/server/execution/service.py
- buildgrid/server/instance.py
- buildgrid/server/job.py
- buildgrid/server/operations/instance.py
- buildgrid/server/operations/service.py
- buildgrid/server/scheduler.py
- buildgrid/settings.py
- setup.py
- + tests/cas/data/hello/hello.sh
Changes:
| ... | ... | @@ -460,6 +460,7 @@ known-third-party=boto3, | 
| 460 | 460 |                    enchant,
 | 
| 461 | 461 |                    google,
 | 
| 462 | 462 |                    grpc,
 | 
| 463 | +                  janus,
 | |
| 463 | 464 |                    moto,
 | 
| 464 | 465 |                    yaml
 | 
| 465 | 466 |  | 
| ... | ... | @@ -523,4 +524,4 @@ valid-metaclass-classmethod-first-arg=mcs | 
| 523 | 524 |  | 
| 524 | 525 |  # Exceptions that will emit a warning when being caught. Defaults to
 | 
| 525 | 526 |  # "Exception"
 | 
| 526 | -overgeneral-exceptions=Exception | |
| 527 | +overgeneral-exceptions=Exception | |
| \ No newline at end of file | 
| ... | ... | @@ -152,14 +152,13 @@ def cli(context, verbose): | 
| 152 | 152 |      for log_filter in logger.filters[:]:
 | 
| 153 | 153 |          logger.removeFilter(log_filter)
 | 
| 154 | 154 |  | 
| 155 | -    logging.basicConfig(
 | |
| 156 | -        format='%(asctime)s:%(name)32.32s][%(levelname)5.5s]: %(message)s')
 | |
| 157 | - | |
| 158 | -    if verbose == 1:
 | |
| 159 | -        logger.setLevel(logging.WARNING)
 | |
| 160 | -    elif verbose == 2:
 | |
| 161 | -        logger.setLevel(logging.INFO)
 | |
| 162 | -    elif verbose >= 3:
 | |
| 163 | -        logger.setLevel(logging.DEBUG)
 | |
| 164 | -    else:
 | |
| 165 | -        logger.setLevel(logging.ERROR) | |
| 155 | +    if verbose > 0:
 | |
| 156 | +        logging.basicConfig(
 | |
| 157 | +            format='%(asctime)s:%(name)32.32s][%(levelname)5.5s]: %(message)s')
 | |
| 158 | + | |
| 159 | +        if verbose == 1:
 | |
| 160 | +            logger.setLevel(logging.WARNING)
 | |
| 161 | +        elif verbose == 2:
 | |
| 162 | +            logger.setLevel(logging.INFO)
 | |
| 163 | +        elif verbose >= 3:
 | |
| 164 | +            logger.setLevel(logging.DEBUG) | 
| ... | ... | @@ -20,7 +20,6 @@ Server command | 
| 20 | 20 |  Create a BuildGrid server.
 | 
| 21 | 21 |  """
 | 
| 22 | 22 |  | 
| 23 | -import asyncio
 | |
| 24 | 23 |  import logging
 | 
| 25 | 24 |  import sys
 | 
| 26 | 25 |  | 
| ... | ... | @@ -52,18 +51,14 @@ def start(context, config): | 
| 52 | 51 |          click.echo("ERROR: Could not parse config: {}.\n".format(str(e)), err=True)
 | 
| 53 | 52 |          sys.exit(-1)
 | 
| 54 | 53 |  | 
| 55 | -    loop = asyncio.get_event_loop()
 | |
| 56 | 54 |      try:
 | 
| 57 | 55 |          server.start()
 | 
| 58 | -        loop.run_forever()
 | |
| 59 | 56 |  | 
| 60 | 57 |      except KeyboardInterrupt:
 | 
| 61 | 58 |          pass
 | 
| 62 | 59 |  | 
| 63 | 60 |      finally:
 | 
| 64 | -        context.logger.info("Stopping server")
 | |
| 65 | 61 |          server.stop()
 | 
| 66 | -        loop.close()
 | |
| 67 | 62 |  | 
| 68 | 63 |  | 
| 69 | 64 |  def _create_server_from_config(config):
 | 
| ... | ... | @@ -171,7 +171,7 @@ class Downloader: | 
| 171 | 171 |  | 
| 172 | 172 |          return messages
 | 
| 173 | 173 |  | 
| 174 | -    def download_file(self, digest, file_path, queue=True):
 | |
| 174 | +    def download_file(self, digest, file_path, is_executable=False, queue=True):
 | |
| 175 | 175 |          """Retrieves a file from the remote CAS server.
 | 
| 176 | 176 |  | 
| 177 | 177 |          If queuing is allowed (`queue=True`), the download request **may** be
 | 
| ... | ... | @@ -181,6 +181,7 @@ class Downloader: | 
| 181 | 181 |          Args:
 | 
| 182 | 182 |              digest (:obj:`Digest`): the file's digest to fetch.
 | 
| 183 | 183 |              file_path (str): absolute or relative path to the local file to write.
 | 
| 184 | +            is_executable (bool): whether the file is executable or not.
 | |
| 184 | 185 |              queue (bool, optional): whether or not the download request may be
 | 
| 185 | 186 |                  queued and submitted as part of a batch upload request. Defaults
 | 
| 186 | 187 |                  to True.
 | 
| ... | ... | @@ -193,9 +194,9 @@ class Downloader: | 
| 193 | 194 |              file_path = os.path.abspath(file_path)
 | 
| 194 | 195 |  | 
| 195 | 196 |          if not queue or digest.size_bytes > FILE_SIZE_THRESHOLD:
 | 
| 196 | -            self._fetch_file(digest, file_path)
 | |
| 197 | +            self._fetch_file(digest, file_path, is_executable=is_executable)
 | |
| 197 | 198 |          else:
 | 
| 198 | -            self._queue_file(digest, file_path)
 | |
| 199 | +            self._queue_file(digest, file_path, is_executable=is_executable)
 | |
| 199 | 200 |  | 
| 200 | 201 |      def download_directory(self, digest, directory_path):
 | 
| 201 | 202 |          """Retrieves a :obj:`Directory` from the remote CAS server.
 | 
| ... | ... | @@ -311,7 +312,7 @@ class Downloader: | 
| 311 | 312 |  | 
| 312 | 313 |          return read_blobs
 | 
| 313 | 314 |  | 
| 314 | -    def _fetch_file(self, digest, file_path):
 | |
| 315 | +    def _fetch_file(self, digest, file_path, is_executable=False):
 | |
| 315 | 316 |          """Fetches a file using ByteStream.Read()"""
 | 
| 316 | 317 |          if self.instance_name:
 | 
| 317 | 318 |              resource_name = '/'.join([self.instance_name, 'blobs',
 | 
| ... | ... | @@ -332,7 +333,10 @@ class Downloader: | 
| 332 | 333 |  | 
| 333 | 334 |              assert byte_file.tell() == digest.size_bytes
 | 
| 334 | 335 |  | 
| 335 | -    def _queue_file(self, digest, file_path):
 | |
| 336 | +        if is_executable:
 | |
| 337 | +            os.chmod(file_path, 0o755)  # rwxr-xr-x / 755
 | |
| 338 | + | |
| 339 | +    def _queue_file(self, digest, file_path, is_executable=False):
 | |
| 336 | 340 |          """Queues a file for later batch download"""
 | 
| 337 | 341 |          if self.__file_request_size + digest.ByteSize() > MAX_REQUEST_SIZE:
 | 
| 338 | 342 |              self.flush()
 | 
| ... | ... | @@ -341,22 +345,25 @@ class Downloader: | 
| 341 | 345 |          elif self.__file_request_count >= MAX_REQUEST_COUNT:
 | 
| 342 | 346 |              self.flush()
 | 
| 343 | 347 |  | 
| 344 | -        self.__file_requests[digest.hash] = (digest, file_path)
 | |
| 348 | +        self.__file_requests[digest.hash] = (digest, file_path, is_executable)
 | |
| 345 | 349 |          self.__file_request_count += 1
 | 
| 346 | 350 |          self.__file_request_size += digest.ByteSize()
 | 
| 347 | 351 |          self.__file_response_size += digest.size_bytes
 | 
| 348 | 352 |  | 
| 349 | 353 |      def _fetch_file_batch(self, batch):
 | 
| 350 | 354 |          """Sends queued data using ContentAddressableStorage.BatchReadBlobs()"""
 | 
| 351 | -        batch_digests = [digest for digest, _ in batch.values()]
 | |
| 355 | +        batch_digests = [digest for digest, _, _ in batch.values()]
 | |
| 352 | 356 |          batch_blobs = self._fetch_blob_batch(batch_digests)
 | 
| 353 | 357 |  | 
| 354 | -        for (_, file_path), file_blob in zip(batch.values(), batch_blobs):
 | |
| 358 | +        for (_, file_path, is_executable), file_blob in zip(batch.values(), batch_blobs):
 | |
| 355 | 359 |              os.makedirs(os.path.dirname(file_path), exist_ok=True)
 | 
| 356 | 360 |  | 
| 357 | 361 |              with open(file_path, 'wb') as byte_file:
 | 
| 358 | 362 |                  byte_file.write(file_blob)
 | 
| 359 | 363 |  | 
| 364 | +            if is_executable:
 | |
| 365 | +                os.chmod(file_path, 0o755)  # rwxr-xr-x / 755
 | |
| 366 | + | |
| 360 | 367 |      def _fetch_directory(self, digest, directory_path):
 | 
| 361 | 368 |          """Fetches a file using ByteStream.GetTree()"""
 | 
| 362 | 369 |          # Better fail early if the local root path cannot be created:
 | 
| ... | ... | @@ -414,7 +421,7 @@ class Downloader: | 
| 414 | 421 |          for file_node in root_directory.files:
 | 
| 415 | 422 |              file_path = os.path.join(root_path, file_node.name)
 | 
| 416 | 423 |  | 
| 417 | -            self._queue_file(file_node.digest, file_path)
 | |
| 424 | +            self._queue_file(file_node.digest, file_path, is_executable=file_node.is_executable)
 | |
| 418 | 425 |  | 
| 419 | 426 |          for directory_node in root_directory.directories:
 | 
| 420 | 427 |              directory_path = os.path.join(root_path, directory_node.name)
 | 
| ... | ... | @@ -37,6 +37,10 @@ class BotsInterface: | 
| 37 | 37 |          self._bot_sessions = {}
 | 
| 38 | 38 |          self._scheduler = scheduler
 | 
| 39 | 39 |  | 
| 40 | +    @property
 | |
| 41 | +    def scheduler(self):
 | |
| 42 | +        return self._scheduler
 | |
| 43 | + | |
| 40 | 44 |      def register_instance_with_server(self, instance_name, server):
 | 
| 41 | 45 |          server.add_bots_interface(self, instance_name)
 | 
| 42 | 46 |  | 
| ... | ... | @@ -23,8 +23,9 @@ import logging | 
| 23 | 23 |  | 
| 24 | 24 |  import grpc
 | 
| 25 | 25 |  | 
| 26 | -from google.protobuf.empty_pb2 import Empty
 | |
| 26 | +from google.protobuf import empty_pb2, timestamp_pb2
 | |
| 27 | 27 |  | 
| 28 | +from buildgrid._enums import BotStatus
 | |
| 28 | 29 |  from buildgrid._exceptions import InvalidArgumentError, OutOfSyncError
 | 
| 29 | 30 |  from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2
 | 
| 30 | 31 |  from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2_grpc
 | 
| ... | ... | @@ -32,24 +33,82 @@ from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2_grp | 
| 32 | 33 |  | 
| 33 | 34 |  class BotsService(bots_pb2_grpc.BotsServicer):
 | 
| 34 | 35 |  | 
| 35 | -    def __init__(self, server):
 | |
| 36 | +    def __init__(self, server, monitor=True):
 | |
| 36 | 37 |          self.__logger = logging.getLogger(__name__)
 | 
| 37 | 38 |  | 
| 39 | +        self.__bots_by_status = {}
 | |
| 40 | +        self.__bots_by_instance = {}
 | |
| 41 | +        self.__bots = {}
 | |
| 42 | + | |
| 38 | 43 |          self._instances = {}
 | 
| 44 | +        self._is_monitored = True
 | |
| 39 | 45 |  | 
| 40 | 46 |          bots_pb2_grpc.add_BotsServicer_to_server(self, server)
 | 
| 41 | 47 |  | 
| 42 | -    def add_instance(self, name, instance):
 | |
| 43 | -        self._instances[name] = instance
 | |
| 48 | +        if self._is_monitored:
 | |
| 49 | +            self.__bots_by_status[BotStatus.OK] = set()
 | |
| 50 | +            self.__bots_by_status[BotStatus.UNHEALTHY] = set()
 | |
| 51 | +            self.__bots_by_status[BotStatus.HOST_REBOOTING] = set()
 | |
| 52 | +            self.__bots_by_status[BotStatus.BOT_TERMINATING] = set()
 | |
| 53 | + | |
| 54 | +    # --- Public API ---
 | |
| 55 | + | |
| 56 | +    def add_instance(self, instance_name, instance):
 | |
| 57 | +        """Registers a new servicer instance.
 | |
| 58 | + | |
| 59 | +        Args:
 | |
| 60 | +            instance_name (str): The new instance's name.
 | |
| 61 | +            instance (BotsInterface): The new instance itself.
 | |
| 62 | +        """
 | |
| 63 | +        self._instances[instance_name] = instance
 | |
| 64 | + | |
| 65 | +        if self._is_monitored:
 | |
| 66 | +            self.__bots_by_instance[instance_name] = 0
 | |
| 67 | + | |
| 68 | +    def get_scheduler(self, instance_name):
 | |
| 69 | +        """Retrieves a reference to the scheduler for an instance.
 | |
| 70 | + | |
| 71 | +        Args:
 | |
| 72 | +            instance_name (str): The name of the instance to query.
 | |
| 73 | + | |
| 74 | +        Returns:
 | |
| 75 | +            Scheduler: A reference to the scheduler for `instance_name`.
 | |
| 76 | + | |
| 77 | +        Raises:
 | |
| 78 | +            InvalidArgumentError: If no instance named `instance_name` exists.
 | |
| 79 | +        """
 | |
| 80 | +        instance = self._get_instance(instance_name)
 | |
| 81 | + | |
| 82 | +        return instance.scheduler
 | |
| 83 | + | |
| 84 | +    # --- Public API: Servicer ---
 | |
| 44 | 85 |  | 
| 45 | 86 |      def CreateBotSession(self, request, context):
 | 
| 87 | +        """Handles CreateBotSessionRequest messages.
 | |
| 88 | + | |
| 89 | +        Args:
 | |
| 90 | +            request (CreateBotSessionRequest): The incoming RPC request.
 | |
| 91 | +            context (grpc.ServicerContext): Context for the RPC call.
 | |
| 92 | +        """
 | |
| 46 | 93 |          self.__logger.debug("CreateBotSession request from [%s]", context.peer())
 | 
| 47 | 94 |  | 
| 95 | +        instance_name = request.parent
 | |
| 96 | +        bot_status = BotStatus(request.bot_session.status)
 | |
| 97 | +        bot_id = request.bot_session.bot_id
 | |
| 98 | + | |
| 48 | 99 |          try:
 | 
| 49 | -            parent = request.parent
 | |
| 50 | -            instance = self._get_instance(request.parent)
 | |
| 51 | -            return instance.create_bot_session(parent,
 | |
| 52 | -                                               request.bot_session)
 | |
| 100 | +            instance = self._get_instance(instance_name)
 | |
| 101 | +            bot_session = instance.create_bot_session(instance_name,
 | |
| 102 | +                                                      request.bot_session)
 | |
| 103 | +            now = timestamp_pb2.Timestamp()
 | |
| 104 | +            now.GetCurrentTime()
 | |
| 105 | + | |
| 106 | +            if self._is_monitored:
 | |
| 107 | +                self.__bots[bot_id] = now
 | |
| 108 | +                self.__bots_by_instance[instance_name] += 1
 | |
| 109 | +                self.__bots_by_status[bot_status].add(bot_id)
 | |
| 110 | + | |
| 111 | +            return bot_session
 | |
| 53 | 112 |  | 
| 54 | 113 |          except InvalidArgumentError as e:
 | 
| 55 | 114 |              self.__logger.error(e)
 | 
| ... | ... | @@ -59,17 +118,36 @@ class BotsService(bots_pb2_grpc.BotsServicer): | 
| 59 | 118 |          return bots_pb2.BotSession()
 | 
| 60 | 119 |  | 
| 61 | 120 |      def UpdateBotSession(self, request, context):
 | 
| 121 | +        """Handles UpdateBotSessionRequest messages.
 | |
| 122 | + | |
| 123 | +        Args:
 | |
| 124 | +            request (UpdateBotSessionRequest): The incoming RPC request.
 | |
| 125 | +            context (grpc.ServicerContext): Context for the RPC call.
 | |
| 126 | +        """
 | |
| 62 | 127 |          self.__logger.debug("UpdateBotSession request from [%s]", context.peer())
 | 
| 63 | 128 |  | 
| 129 | +        names = request.name.split("/")
 | |
| 130 | +        bot_status = BotStatus(request.bot_session.status)
 | |
| 131 | +        bot_id = request.bot_session.bot_id
 | |
| 132 | + | |
| 64 | 133 |          try:
 | 
| 65 | -            names = request.name.split("/")
 | |
| 66 | -            # Operation name should be in format:
 | |
| 67 | -            # {instance/name}/{uuid}
 | |
| 68 | -            instance_name = ''.join(names[0:-1])
 | |
| 134 | +            instance_name = '/'.join(names[:-1])
 | |
| 69 | 135 |  | 
| 70 | 136 |              instance = self._get_instance(instance_name)
 | 
| 71 | -            return instance.update_bot_session(request.name,
 | |
| 72 | -                                               request.bot_session)
 | |
| 137 | +            bot_session = instance.update_bot_session(request.name,
 | |
| 138 | +                                                      request.bot_session)
 | |
| 139 | + | |
| 140 | +            if self._is_monitored:
 | |
| 141 | +                self.__bots[bot_id].GetCurrentTime()
 | |
| 142 | +                if bot_id not in self.__bots_by_status[bot_status]:
 | |
| 143 | +                    self.__bots_by_status[BotStatus.OK].discard(bot_id)
 | |
| 144 | +                    self.__bots_by_status[BotStatus.UNHEALTHY].discard(bot_id)
 | |
| 145 | +                    self.__bots_by_status[BotStatus.HOST_REBOOTING].discard(bot_id)
 | |
| 146 | +                    self.__bots_by_status[BotStatus.BOT_TERMINATING].discard(bot_id)
 | |
| 147 | + | |
| 148 | +                    self.__bots_by_status[bot_status].add(bot_id)
 | |
| 149 | + | |
| 150 | +            return bot_session
 | |
| 73 | 151 |  | 
| 74 | 152 |          except InvalidArgumentError as e:
 | 
| 75 | 153 |              self.__logger.error(e)
 | 
| ... | ... | @@ -89,10 +167,40 @@ class BotsService(bots_pb2_grpc.BotsServicer): | 
| 89 | 167 |          return bots_pb2.BotSession()
 | 
| 90 | 168 |  | 
| 91 | 169 |      def PostBotEventTemp(self, request, context):
 | 
| 170 | +        """Handles PostBotEventTempRequest messages.
 | |
| 171 | + | |
| 172 | +        Args:
 | |
| 173 | +            request (PostBotEventTempRequest): The incoming RPC request.
 | |
| 174 | +            context (grpc.ServicerContext): Context for the RPC call.
 | |
| 175 | +        """
 | |
| 92 | 176 |          self.__logger.debug("PostBotEventTemp request from [%s]", context.peer())
 | 
| 93 | 177 |  | 
| 94 | 178 |          context.set_code(grpc.StatusCode.UNIMPLEMENTED)
 | 
| 95 | -        return Empty()
 | |
| 179 | + | |
| 180 | +        return empty_pb2.Empty()
 | |
| 181 | + | |
| 182 | +    # --- Public API: Monitoring ---
 | |
| 183 | + | |
| 184 | +    @property
 | |
| 185 | +    def is_monitored(self):
 | |
| 186 | +        return self._is_monitored
 | |
| 187 | + | |
| 188 | +    def query_n_bots(self):
 | |
| 189 | +        return len(self.__bots)
 | |
| 190 | + | |
| 191 | +    def query_n_bots_for_instance(self, instance_name):
 | |
| 192 | +        try:
 | |
| 193 | +            return self.__bots_by_instance[instance_name]
 | |
| 194 | +        except KeyError:
 | |
| 195 | +            return 0
 | |
| 196 | + | |
| 197 | +    def query_n_bots_for_status(self, bot_status):
 | |
| 198 | +        try:
 | |
| 199 | +            return len(self.__bots_by_status[bot_status])
 | |
| 200 | +        except KeyError:
 | |
| 201 | +            return 0
 | |
| 202 | + | |
| 203 | +    # --- Private API ---
 | |
| 96 | 204 |  | 
| 97 | 205 |      def _get_instance(self, name):
 | 
| 98 | 206 |          try:
 | 
| ... | ... | @@ -35,6 +35,10 @@ class ExecutionInstance: | 
| 35 | 35 |          self._storage = storage
 | 
| 36 | 36 |          self._scheduler = scheduler
 | 
| 37 | 37 |  | 
| 38 | +    @property
 | |
| 39 | +    def scheduler(self):
 | |
| 40 | +        return self._scheduler
 | |
| 41 | + | |
| 38 | 42 |      def register_instance_with_server(self, instance_name, server):
 | 
| 39 | 43 |          server.add_execution_instance(self, instance_name)
 | 
| 40 | 44 |  | 
| ... | ... | @@ -33,30 +33,79 @@ from buildgrid._protos.google.longrunning import operations_pb2 | 
| 33 | 33 |  | 
| 34 | 34 |  class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
 | 
| 35 | 35 |  | 
| 36 | -    def __init__(self, server):
 | |
| 36 | +    def __init__(self, server, monitor=True):
 | |
| 37 | 37 |          self.__logger = logging.getLogger(__name__)
 | 
| 38 | 38 |  | 
| 39 | +        self.__peers_by_instance = {}
 | |
| 40 | +        self.__peers = {}
 | |
| 41 | + | |
| 39 | 42 |          self._instances = {}
 | 
| 43 | +        self._is_monitored = True
 | |
| 44 | + | |
| 40 | 45 |          remote_execution_pb2_grpc.add_ExecutionServicer_to_server(self, server)
 | 
| 41 | 46 |  | 
| 42 | -    def add_instance(self, name, instance):
 | |
| 43 | -        self._instances[name] = instance
 | |
| 47 | +    # --- Public API ---
 | |
| 48 | + | |
| 49 | +    def add_instance(self, instance_name, instance):
 | |
| 50 | +        """Registers a new servicer instance.
 | |
| 51 | + | |
| 52 | +        Args:
 | |
| 53 | +            instance_name (str): The new instance's name.
 | |
| 54 | +            instance (ExecutionInstance): The new instance itself.
 | |
| 55 | +        """
 | |
| 56 | +        self._instances[instance_name] = instance
 | |
| 57 | + | |
| 58 | +        if self._is_monitored:
 | |
| 59 | +            self.__peers_by_instance[instance_name] = set()
 | |
| 60 | + | |
| 61 | +    def get_scheduler(self, instance_name):
 | |
| 62 | +        """Retrieves a reference to the scheduler for an instance.
 | |
| 63 | + | |
| 64 | +        Args:
 | |
| 65 | +            instance_name (str): The name of the instance to query.
 | |
| 66 | + | |
| 67 | +        Returns:
 | |
| 68 | +            Scheduler: A reference to the scheduler for `instance_name`.
 | |
| 69 | + | |
| 70 | +        Raises:
 | |
| 71 | +            InvalidArgumentError: If no instance named `instance_name` exists.
 | |
| 72 | +        """
 | |
| 73 | +        instance = self._get_instance(instance_name)
 | |
| 74 | + | |
| 75 | +        return instance.scheduler
 | |
| 76 | + | |
| 77 | +    # --- Public API: Servicer ---
 | |
| 44 | 78 |  | 
| 45 | 79 |      def Execute(self, request, context):
 | 
| 80 | +        """Handles ExecuteRequest messages.
 | |
| 81 | + | |
| 82 | +        Args:
 | |
| 83 | +            request (ExecuteRequest): The incoming RPC request.
 | |
| 84 | +            context (grpc.ServicerContext): Context for the RPC call.
 | |
| 85 | +        """
 | |
| 46 | 86 |          self.__logger.debug("Execute request from [%s]", context.peer())
 | 
| 47 | 87 |  | 
| 88 | +        instance_name = request.instance_name
 | |
| 89 | +        message_queue = queue.Queue()
 | |
| 90 | +        peer = context.peer()
 | |
| 91 | + | |
| 48 | 92 |          try:
 | 
| 49 | -            message_queue = queue.Queue()
 | |
| 50 | -            instance = self._get_instance(request.instance_name)
 | |
| 93 | +            instance = self._get_instance(instance_name)
 | |
| 51 | 94 |              operation = instance.execute(request.action_digest,
 | 
| 52 | 95 |                                           request.skip_cache_lookup,
 | 
| 53 | 96 |                                           message_queue)
 | 
| 54 | 97 |  | 
| 55 | -            context.add_callback(partial(instance.unregister_message_client,
 | |
| 56 | -                                         operation.name, message_queue))
 | |
| 98 | +            context.add_callback(partial(self._rpc_termination_callback,
 | |
| 99 | +                                         peer, instance_name, operation.name, message_queue))
 | |
| 57 | 100 |  | 
| 58 | -            instanced_op_name = "{}/{}".format(request.instance_name,
 | |
| 59 | -                                               operation.name)
 | |
| 101 | +            if self._is_monitored:
 | |
| 102 | +                if peer not in self.__peers:
 | |
| 103 | +                    self.__peers_by_instance[instance_name].add(peer)
 | |
| 104 | +                    self.__peers[peer] = 1
 | |
| 105 | +                else:
 | |
| 106 | +                    self.__peers[peer] += 1
 | |
| 107 | + | |
| 108 | +            instanced_op_name = "{}/{}".format(instance_name, operation.name)
 | |
| 60 | 109 |  | 
| 61 | 110 |              self.__logger.info("Operation name: [%s]", instanced_op_name)
 | 
| 62 | 111 |  | 
| ... | ... | @@ -80,23 +129,37 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer): | 
| 80 | 129 |              yield operations_pb2.Operation()
 | 
| 81 | 130 |  | 
| 82 | 131 |      def WaitExecution(self, request, context):
 | 
| 132 | +        """Handles WaitExecutionRequest messages.
 | |
| 133 | + | |
| 134 | +        Args:
 | |
| 135 | +            request (WaitExecutionRequest): The incoming RPC request.
 | |
| 136 | +            context (grpc.ServicerContext): Context for the RPC call.
 | |
| 137 | +        """
 | |
| 83 | 138 |          self.__logger.debug("WaitExecution request from [%s]", context.peer())
 | 
| 84 | 139 |  | 
| 85 | -        try:
 | |
| 86 | -            names = request.name.split("/")
 | |
| 140 | +        names = request.name.split('/')
 | |
| 141 | +        instance_name = '/'.join(names[:-1])
 | |
| 142 | +        operation_name = names[-1]
 | |
| 143 | +        message_queue = queue.Queue()
 | |
| 144 | +        peer = context.peer()
 | |
| 87 | 145 |  | 
| 88 | -            # Operation name should be in format:
 | |
| 89 | -            # {instance/name}/{operation_id}
 | |
| 90 | -            instance_name = ''.join(names[0:-1])
 | |
| 146 | +        try:
 | |
| 147 | +            if instance_name != request.instance_name:
 | |
| 148 | +                raise InvalidArgumentError("Invalid operation [{}] for instance [{}]"
 | |
| 149 | +                                            .format(request.name, instance_name))
 | |
| 91 | 150 |  | 
| 92 | -            message_queue = queue.Queue()
 | |
| 93 | -            operation_name = names[-1]
 | |
| 94 | 151 |              instance = self._get_instance(instance_name)
 | 
| 95 | 152 |  | 
| 96 | 153 |              instance.register_message_client(operation_name, message_queue)
 | 
| 154 | +            context.add_callback(partial(self._rpc_termination_callback,
 | |
| 155 | +                                         peer, instance_name, operation_name, message_queue))
 | |
| 97 | 156 |  | 
| 98 | -            context.add_callback(partial(instance.unregister_message_client,
 | |
| 99 | -                                         operation_name, message_queue))
 | |
| 157 | +            if self._is_monitored:
 | |
| 158 | +                if peer not in self.__peers:
 | |
| 159 | +                    self.__peers_by_instance[instance_name].add(peer)
 | |
| 160 | +                    self.__peers[peer] = 1
 | |
| 161 | +                else:
 | |
| 162 | +                    self.__peers[peer] += 1
 | |
| 100 | 163 |  | 
| 101 | 164 |              for operation in instance.stream_operation_updates(message_queue,
 | 
| 102 | 165 |                                                                 operation_name):
 | 
| ... | ... | @@ -111,6 +174,35 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer): | 
| 111 | 174 |              context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
 | 
| 112 | 175 |              yield operations_pb2.Operation()
 | 
| 113 | 176 |  | 
| 177 | +    # --- Public API: Monitoring ---
 | |
| 178 | + | |
| 179 | +    @property
 | |
| 180 | +    def is_monitored(self):
 | |
| 181 | +        return self._is_monitored
 | |
| 182 | + | |
| 183 | +    def query_n_clients(self):
 | |
| 184 | +        return len(self.__peers)
 | |
| 185 | + | |
| 186 | +    def query_n_clients_for_instance(self, instance_name):
 | |
| 187 | +        try:
 | |
| 188 | +            return len(self.__peers_by_instance[instance_name])
 | |
| 189 | +        except KeyError:
 | |
| 190 | +            return 0
 | |
| 191 | + | |
| 192 | +    # --- Private API ---
 | |
| 193 | + | |
| 194 | +    def _rpc_termination_callback(self, peer, instance_name, job_name, message_queue):
 | |
| 195 | +        instance = self._get_instance(instance_name)
 | |
| 196 | + | |
| 197 | +        instance.unregister_message_client(job_name, message_queue)
 | |
| 198 | + | |
| 199 | +        if self._is_monitored:
 | |
| 200 | +            if self.__peers[peer] > 1:
 | |
| 201 | +                self.__peers[peer] -= 1
 | |
| 202 | +            else:
 | |
| 203 | +                self.__peers_by_instance[instance_name].remove(peer)
 | |
| 204 | +                del self.__peers[peer]
 | |
| 205 | + | |
| 114 | 206 |      def _get_instance(self, name):
 | 
| 115 | 207 |          try:
 | 
| 116 | 208 |              return self._instances[name]
 | 
| ... | ... | @@ -13,18 +13,23 @@ | 
| 13 | 13 |  # limitations under the License.
 | 
| 14 | 14 |  | 
| 15 | 15 |  | 
| 16 | +import asyncio
 | |
| 16 | 17 |  from concurrent import futures
 | 
| 17 | 18 |  import logging
 | 
| 19 | +from logging.handlers import QueueHandler
 | |
| 18 | 20 |  import os
 | 
| 21 | +import time
 | |
| 19 | 22 |  | 
| 20 | 23 |  import grpc
 | 
| 24 | +import janus
 | |
| 21 | 25 |  | 
| 22 | -from .cas.service import ByteStreamService, ContentAddressableStorageService
 | |
| 23 | -from .actioncache.service import ActionCacheService
 | |
| 24 | -from .execution.service import ExecutionService
 | |
| 25 | -from .operations.service import OperationsService
 | |
| 26 | -from .bots.service import BotsService
 | |
| 27 | -from .referencestorage.service import ReferenceStorageService
 | |
| 26 | +from buildgrid.server.cas.service import ByteStreamService, ContentAddressableStorageService
 | |
| 27 | +from buildgrid.server.actioncache.service import ActionCacheService
 | |
| 28 | +from buildgrid.server.execution.service import ExecutionService
 | |
| 29 | +from buildgrid.server.operations.service import OperationsService
 | |
| 30 | +from buildgrid.server.bots.service import BotsService
 | |
| 31 | +from buildgrid.server.referencestorage.service import ReferenceStorageService
 | |
| 32 | +from buildgrid.settings import MONITORING_PERIOD
 | |
| 28 | 33 |  | 
| 29 | 34 |  | 
| 30 | 35 |  class BuildGridServer:
 | 
| ... | ... | @@ -46,9 +51,15 @@ class BuildGridServer: | 
| 46 | 51 |              # Use max_workers default from Python 3.5+
 | 
| 47 | 52 |              max_workers = (os.cpu_count() or 1) * 5
 | 
| 48 | 53 |  | 
| 49 | -        server = grpc.server(futures.ThreadPoolExecutor(max_workers))
 | |
| 54 | +        self.__grpc_executor = futures.ThreadPoolExecutor(max_workers)
 | |
| 55 | +        self.__grpc_server = grpc.server(self.__grpc_executor)
 | |
| 50 | 56 |  | 
| 51 | -        self._server = server
 | |
| 57 | +        self.__main_loop = asyncio.get_event_loop()
 | |
| 58 | +        self.__monitoring_task = None
 | |
| 59 | +        self.__logging_task = None
 | |
| 60 | + | |
| 61 | +        self.__logging_queue = janus.Queue(loop=self.__main_loop)
 | |
| 62 | +        self.__logging_handler = None
 | |
| 52 | 63 |  | 
| 53 | 64 |          self._execution_service = None
 | 
| 54 | 65 |          self._bots_service = None
 | 
| ... | ... | @@ -58,15 +69,39 @@ class BuildGridServer: | 
| 58 | 69 |          self._cas_service = None
 | 
| 59 | 70 |          self._bytestream_service = None
 | 
| 60 | 71 |  | 
| 72 | +        self._instances = set()
 | |
| 73 | + | |
| 74 | +    # --- Public API ---
 | |
| 75 | + | |
| 61 | 76 |      def start(self):
 | 
| 62 | -        """Starts the server.
 | |
| 77 | +        """Starts the BuildGrid server.
 | |
| 63 | 78 |          """
 | 
| 64 | -        self._server.start()
 | |
| 79 | +        self.__grpc_server.start()
 | |
| 80 | + | |
| 81 | +        self._setup_logging_handler()
 | |
| 82 | +        self.__logging_task = asyncio.ensure_future(
 | |
| 83 | +            self._logging_worker(), loop=self.__main_loop)
 | |
| 84 | + | |
| 85 | +        self.__monitoring_task = asyncio.ensure_future(
 | |
| 86 | +            self._monitoring_worker(period=MONITORING_PERIOD), loop=self.__main_loop)
 | |
| 87 | + | |
| 88 | +        self.__main_loop.run_forever()
 | |
| 65 | 89 |  | 
| 66 | 90 |      def stop(self, grace=0):
 | 
| 67 | -        """Stops the server.
 | |
| 91 | +        """Stops the BuildGrid server.
 | |
| 92 | + | |
| 93 | +        Args:
 | |
| 94 | +            grace (int, optional): A duration of time in seconds. Defaults to 0.
 | |
| 68 | 95 |          """
 | 
| 69 | -        self._server.stop(grace)
 | |
| 96 | +        if self.__monitoring_task is not None:
 | |
| 97 | +            self.__monitoring_task.cancel()
 | |
| 98 | +        if self.__logging_task is not None:
 | |
| 99 | +            self.__logging_task.cancel()
 | |
| 100 | + | |
| 101 | +        self.__grpc_server.stop(grace)
 | |
| 102 | + | |
| 103 | +        if grace > 0:
 | |
| 104 | +            time.sleep(grace)
 | |
| 70 | 105 |  | 
| 71 | 106 |      def add_port(self, address, credentials):
 | 
| 72 | 107 |          """Adds a port to the server.
 | 
| ... | ... | @@ -80,11 +115,11 @@ class BuildGridServer: | 
| 80 | 115 |          """
 | 
| 81 | 116 |          if credentials is not None:
 | 
| 82 | 117 |              self.__logger.info("Adding secure connection on: [%s]", address)
 | 
| 83 | -            self._server.add_secure_port(address, credentials)
 | |
| 118 | +            self.__grpc_server.add_secure_port(address, credentials)
 | |
| 84 | 119 |  | 
| 85 | 120 |          else:
 | 
| 86 | 121 |              self.__logger.info("Adding insecure connection on [%s]", address)
 | 
| 87 | -            self._server.add_insecure_port(address)
 | |
| 122 | +            self.__grpc_server.add_insecure_port(address)
 | |
| 88 | 123 |  | 
| 89 | 124 |      def add_execution_instance(self, instance, instance_name):
 | 
| 90 | 125 |          """Adds an :obj:`ExecutionInstance` to the service.
 | 
| ... | ... | @@ -96,10 +131,11 @@ class BuildGridServer: | 
| 96 | 131 |              instance_name (str): Instance name.
 | 
| 97 | 132 |          """
 | 
| 98 | 133 |          if self._execution_service is None:
 | 
| 99 | -            self._execution_service = ExecutionService(self._server)
 | |
| 100 | - | |
| 134 | +            self._execution_service = ExecutionService(self.__grpc_server)
 | |
| 101 | 135 |          self._execution_service.add_instance(instance_name, instance)
 | 
| 102 | 136 |  | 
| 137 | +        self._instances.add(instance_name)
 | |
| 138 | + | |
| 103 | 139 |      def add_bots_interface(self, instance, instance_name):
 | 
| 104 | 140 |          """Adds a :obj:`BotsInterface` to the service.
 | 
| 105 | 141 |  | 
| ... | ... | @@ -110,10 +146,11 @@ class BuildGridServer: | 
| 110 | 146 |              instance_name (str): Instance name.
 | 
| 111 | 147 |          """
 | 
| 112 | 148 |          if self._bots_service is None:
 | 
| 113 | -            self._bots_service = BotsService(self._server)
 | |
| 114 | - | |
| 149 | +            self._bots_service = BotsService(self.__grpc_server)
 | |
| 115 | 150 |          self._bots_service.add_instance(instance_name, instance)
 | 
| 116 | 151 |  | 
| 152 | +        self._instances.add(instance_name)
 | |
| 153 | + | |
| 117 | 154 |      def add_operations_instance(self, instance, instance_name):
 | 
| 118 | 155 |          """Adds an :obj:`OperationsInstance` to the service.
 | 
| 119 | 156 |  | 
| ... | ... | @@ -124,8 +161,7 @@ class BuildGridServer: | 
| 124 | 161 |              instance_name (str): Instance name.
 | 
| 125 | 162 |          """
 | 
| 126 | 163 |          if self._operations_service is None:
 | 
| 127 | -            self._operations_service = OperationsService(self._server)
 | |
| 128 | - | |
| 164 | +            self._operations_service = OperationsService(self.__grpc_server)
 | |
| 129 | 165 |          self._operations_service.add_instance(instance_name, instance)
 | 
| 130 | 166 |  | 
| 131 | 167 |      def add_reference_storage_instance(self, instance, instance_name):
 | 
| ... | ... | @@ -138,8 +174,7 @@ class BuildGridServer: | 
| 138 | 174 |              instance_name (str): Instance name.
 | 
| 139 | 175 |          """
 | 
| 140 | 176 |          if self._reference_storage_service is None:
 | 
| 141 | -            self._reference_storage_service = ReferenceStorageService(self._server)
 | |
| 142 | - | |
| 177 | +            self._reference_storage_service = ReferenceStorageService(self.__grpc_server)
 | |
| 143 | 178 |          self._reference_storage_service.add_instance(instance_name, instance)
 | 
| 144 | 179 |  | 
| 145 | 180 |      def add_action_cache_instance(self, instance, instance_name):
 | 
| ... | ... | @@ -152,8 +187,7 @@ class BuildGridServer: | 
| 152 | 187 |              instance_name (str): Instance name.
 | 
| 153 | 188 |          """
 | 
| 154 | 189 |          if self._action_cache_service is None:
 | 
| 155 | -            self._action_cache_service = ActionCacheService(self._server)
 | |
| 156 | - | |
| 190 | +            self._action_cache_service = ActionCacheService(self.__grpc_server)
 | |
| 157 | 191 |          self._action_cache_service.add_instance(instance_name, instance)
 | 
| 158 | 192 |  | 
| 159 | 193 |      def add_cas_instance(self, instance, instance_name):
 | 
| ... | ... | @@ -166,8 +200,7 @@ class BuildGridServer: | 
| 166 | 200 |              instance_name (str): Instance name.
 | 
| 167 | 201 |          """
 | 
| 168 | 202 |          if self._cas_service is None:
 | 
| 169 | -            self._cas_service = ContentAddressableStorageService(self._server)
 | |
| 170 | - | |
| 203 | +            self._cas_service = ContentAddressableStorageService(self.__grpc_server)
 | |
| 171 | 204 |          self._cas_service.add_instance(instance_name, instance)
 | 
| 172 | 205 |  | 
| 173 | 206 |      def add_bytestream_instance(self, instance, instance_name):
 | 
| ... | ... | @@ -180,6 +213,53 @@ class BuildGridServer: | 
| 180 | 213 |              instance_name (str): Instance name.
 | 
| 181 | 214 |          """
 | 
| 182 | 215 |          if self._bytestream_service is None:
 | 
| 183 | -            self._bytestream_service = ByteStreamService(self._server)
 | |
| 184 | - | |
| 216 | +            self._bytestream_service = ByteStreamService(self.__grpc_server)
 | |
| 185 | 217 |          self._bytestream_service.add_instance(instance_name, instance)
 | 
| 218 | + | |
| 219 | +    # --- Private API ---
 | |
| 220 | + | |
| 221 | +    def _setup_logging_handler(self):
 | |
| 222 | +        self.__logging_handler = QueueHandler(self.__logging_queue.sync_q)
 | |
| 223 | + | |
| 224 | +        root_logger = logging.getLogger()
 | |
| 225 | +        root_logger.addHandler(self.__logging_handler)
 | |
| 226 | + | |
| 227 | +    async def _logging_worker(self):
 | |
| 228 | +        while True:
 | |
| 229 | +            try:
 | |
| 230 | +                log_record = await self.__logging_queue.async_q.get()
 | |
| 231 | + | |
| 232 | +                print(log_record)
 | |
| 233 | + | |
| 234 | +            except asyncio.CancelledError:
 | |
| 235 | +                break
 | |
| 236 | + | |
| 237 | +        if len(asyncio.all_tasks(loop=self.__main_loop)) <= 1:
 | |
| 238 | +            self.__main_loop.stop()
 | |
| 239 | + | |
| 240 | +    async def _monitoring_worker(self, period=1):
 | |
| 241 | +        while True:
 | |
| 242 | +            try:
 | |
| 243 | +                n_clients = self._execution_service.query_n_clients()
 | |
| 244 | +                n_bots = self._bots_service.query_n_bots()
 | |
| 245 | + | |
| 246 | +                print('---')
 | |
| 247 | +                print('Totals: n_clients={}, n_bots={}'.format(n_clients, n_bots))
 | |
| 248 | +                print('Per instances:')
 | |
| 249 | +                for instance_name in self._instances:
 | |
| 250 | +                    n_clients = self._execution_service.query_n_clients_for_instance(instance_name)
 | |
| 251 | +                    n_bots = self._bots_service.query_n_bots_for_instance(instance_name)
 | |
| 252 | +                    am_queue_time = self._execution_service.get_scheduler(instance_name).query_am_queue_time()
 | |
| 253 | + | |
| 254 | +                    instance_name = instance_name or 'empty'
 | |
| 255 | + | |
| 256 | +                    print(' - {}: n_clients={}, n_bots={}, am_queue_time={}'
 | |
| 257 | +                          .format(instance_name, n_clients, n_bots, am_queue_time))
 | |
| 258 | + | |
| 259 | +                await asyncio.sleep(period)
 | |
| 260 | + | |
| 261 | +            except asyncio.CancelledError:
 | |
| 262 | +                break
 | |
| 263 | + | |
| 264 | +        if len(asyncio.all_tasks(loop=self.__main_loop)) <= 1:
 | |
| 265 | +            self.__main_loop.stop() | 
| ... | ... | @@ -13,10 +13,11 @@ | 
| 13 | 13 |  # limitations under the License.
 | 
| 14 | 14 |  | 
| 15 | 15 |  | 
| 16 | +from datetime import datetime
 | |
| 16 | 17 |  import logging
 | 
| 17 | 18 |  import uuid
 | 
| 18 | 19 |  | 
| 19 | -from google.protobuf import timestamp_pb2
 | |
| 20 | +from google.protobuf import duration_pb2, timestamp_pb2
 | |
| 20 | 21 |  | 
| 21 | 22 |  from buildgrid._enums import LeaseState, OperationStage
 | 
| 22 | 23 |  from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
 | 
| ... | ... | @@ -37,6 +38,7 @@ class Job: | 
| 37 | 38 |          self.__execute_response = None
 | 
| 38 | 39 |          self.__operation_metadata = remote_execution_pb2.ExecuteOperationMetadata()
 | 
| 39 | 40 |          self.__queued_timestamp = timestamp_pb2.Timestamp()
 | 
| 41 | +        self.__queued_time_duration = duration_pb2.Duration()
 | |
| 40 | 42 |          self.__worker_start_timestamp = timestamp_pb2.Timestamp()
 | 
| 41 | 43 |          self.__worker_completed_timestamp = timestamp_pb2.Timestamp()
 | 
| 42 | 44 |  | 
| ... | ... | @@ -50,6 +52,8 @@ class Job: | 
| 50 | 52 |          self._operation.done = False
 | 
| 51 | 53 |          self._n_tries = 0
 | 
| 52 | 54 |  | 
| 55 | +    # --- Public API ---
 | |
| 56 | + | |
| 53 | 57 |      @property
 | 
| 54 | 58 |      def name(self):
 | 
| 55 | 59 |          return self._name
 | 
| ... | ... | @@ -179,7 +183,7 @@ class Job: | 
| 179 | 183 |                  result.Unpack(action_result)
 | 
| 180 | 184 |  | 
| 181 | 185 |              action_metadata = action_result.execution_metadata
 | 
| 182 | -            action_metadata.queued_timestamp.CopyFrom(self.__worker_start_timestamp)
 | |
| 186 | +            action_metadata.queued_timestamp.CopyFrom(self.__queued_timestamp)
 | |
| 183 | 187 |              action_metadata.worker_start_timestamp.CopyFrom(self.__worker_start_timestamp)
 | 
| 184 | 188 |              action_metadata.worker_completed_timestamp.CopyFrom(self.__worker_completed_timestamp)
 | 
| 185 | 189 |  | 
| ... | ... | @@ -204,6 +208,10 @@ class Job: | 
| 204 | 208 |                  self.__queued_timestamp.GetCurrentTime()
 | 
| 205 | 209 |              self._n_tries += 1
 | 
| 206 | 210 |  | 
| 211 | +        elif self.__operation_metadata.stage == OperationStage.EXECUTING.value:
 | |
| 212 | +            queue_in, queue_out = self.__queued_timestamp.ToDatetime(), datetime.now()
 | |
| 213 | +            self.__queued_time_duration.FromTimedelta(queue_out - queue_in)
 | |
| 214 | + | |
| 207 | 215 |          elif self.__operation_metadata.stage == OperationStage.COMPLETED.value:
 | 
| 208 | 216 |              if self.__execute_response is not None:
 | 
| 209 | 217 |                  self._operation.response.Pack(self.__execute_response)
 | 
| ... | ... | @@ -213,3 +221,11 @@ class Job: | 
| 213 | 221 |  | 
| 214 | 222 |          for queue in self._operation_update_queues:
 | 
| 215 | 223 |              queue.put(self._operation)
 | 
| 224 | + | |
| 225 | +    # --- Public API: Monitoring ---
 | |
| 226 | + | |
| 227 | +    def query_queue_time(self):
 | |
| 228 | +        return self.__queued_time_duration.ToTimedelta()
 | |
| 229 | + | |
| 230 | +    def query_n_retries(self):
 | |
| 231 | +        return self._n_tries - 1 if self._n_tries > 0 else 0 | 
| ... | ... | @@ -32,6 +32,10 @@ class OperationsInstance: | 
| 32 | 32 |  | 
| 33 | 33 |          self._scheduler = scheduler
 | 
| 34 | 34 |  | 
| 35 | +    @property
 | |
| 36 | +    def scheduler(self):
 | |
| 37 | +        return self._scheduler
 | |
| 38 | + | |
| 35 | 39 |      def register_instance_with_server(self, instance_name, server):
 | 
| 36 | 40 |          server.add_operations_instance(self, instance_name)
 | 
| 37 | 41 |  | 
| ... | ... | @@ -38,8 +38,34 @@ class OperationsService(operations_pb2_grpc.OperationsServicer): | 
| 38 | 38 |  | 
| 39 | 39 |          operations_pb2_grpc.add_OperationsServicer_to_server(self, server)
 | 
| 40 | 40 |  | 
| 41 | -    def add_instance(self, name, instance):
 | |
| 42 | -        self._instances[name] = instance
 | |
| 41 | +    # --- Public API ---
 | |
| 42 | + | |
| 43 | +    def add_instance(self, instance_name, instance):
 | |
| 44 | +        """Registers a new servicer instance.
 | |
| 45 | + | |
| 46 | +        Args:
 | |
| 47 | +            instance_name (str): The new instance's name.
 | |
| 48 | +            instance (OperationsInstance): The new instance itself.
 | |
| 49 | +        """
 | |
| 50 | +        self._instances[instance_name] = instance
 | |
| 51 | + | |
| 52 | +    def get_scheduler(self, instance_name):
 | |
| 53 | +        """Retrieves a reference to the scheduler for an instance.
 | |
| 54 | + | |
| 55 | +        Args:
 | |
| 56 | +            instance_name (str): The name of the instance to query.
 | |
| 57 | + | |
| 58 | +        Returns:
 | |
| 59 | +            Scheduler: A reference to the scheduler for `instance_name`.
 | |
| 60 | + | |
| 61 | +        Raises:
 | |
| 62 | +            InvalidArgumentError: If no instance named `instance_name` exists.
 | |
| 63 | +        """
 | |
| 64 | +        instance = self._get_instance(instance_name)
 | |
| 65 | + | |
| 66 | +        return instance.scheduler
 | |
| 67 | + | |
| 68 | +    # --- Public API: Servicer ---
 | |
| 43 | 69 |  | 
| 44 | 70 |      def GetOperation(self, request, context):
 | 
| 45 | 71 |          self.__logger.debug("GetOperation request from [%s]", context.peer())
 | 
| ... | ... | @@ -132,6 +158,8 @@ class OperationsService(operations_pb2_grpc.OperationsServicer): | 
| 132 | 158 |  | 
| 133 | 159 |          return Empty()
 | 
| 134 | 160 |  | 
| 161 | +    # --- Private API ---
 | |
| 162 | + | |
| 135 | 163 |      def _parse_instance_name(self, name):
 | 
| 136 | 164 |          """ If the instance name is not blank, 'name' will have the form
 | 
| 137 | 165 |          {instance_name}/{operation_uuid}. Otherwise, it will just be
 | 
| ... | ... | @@ -22,22 +22,29 @@ Schedules jobs. | 
| 22 | 22 |  from collections import deque
 | 
| 23 | 23 |  import logging
 | 
| 24 | 24 |  | 
| 25 | +from buildgrid._enums import LeaseState, OperationStage
 | |
| 25 | 26 |  from buildgrid._exceptions import NotFoundError
 | 
| 26 | 27 |  | 
| 27 | -from .job import OperationStage, LeaseState
 | |
| 28 | - | |
| 29 | 28 |  | 
| 30 | 29 |  class Scheduler:
 | 
| 31 | 30 |  | 
| 32 | 31 |      MAX_N_TRIES = 5
 | 
| 33 | 32 |  | 
| 34 | -    def __init__(self, action_cache=None):
 | |
| 33 | +    def __init__(self, action_cache=None, monitor=True):
 | |
| 35 | 34 |          self.__logger = logging.getLogger(__name__)
 | 
| 36 | 35 |  | 
| 36 | +        self.__queue_times_by_priority = {}
 | |
| 37 | +        self.__queue_time_average = 0, 0.0
 | |
| 38 | +        self.__retries_by_error = {}
 | |
| 39 | +        self.__retries_count = 0
 | |
| 40 | + | |
| 37 | 41 |          self._action_cache = action_cache
 | 
| 42 | +        self._is_monitored = monitor
 | |
| 38 | 43 |          self.jobs = {}
 | 
| 39 | 44 |          self.queue = deque()
 | 
| 40 | 45 |  | 
| 46 | +    # --- Public API ---
 | |
| 47 | + | |
| 41 | 48 |      def register_client(self, job_name, queue):
 | 
| 42 | 49 |          self.jobs[job_name].register_client(queue)
 | 
| 43 | 50 |  | 
| ... | ... | @@ -66,18 +73,22 @@ class Scheduler: | 
| 66 | 73 |              operation_stage = OperationStage.QUEUED
 | 
| 67 | 74 |              self.queue.append(job)
 | 
| 68 | 75 |  | 
| 69 | -        job.update_operation_stage(operation_stage)
 | |
| 76 | +        self._update_job_operation_stage(job, operation_stage)
 | |
| 70 | 77 |  | 
| 71 | 78 |      def retry_job(self, job_name):
 | 
| 72 | -        if job_name in self.jobs:
 | |
| 73 | -            job = self.jobs[job_name]
 | |
| 74 | -            if job.n_tries >= self.MAX_N_TRIES:
 | |
| 75 | -                # TODO: Decide what to do with these jobs
 | |
| 76 | -                job.update_operation_stage(OperationStage.COMPLETED)
 | |
| 77 | -                # TODO: Mark these jobs as done
 | |
| 78 | -            else:
 | |
| 79 | -                job.update_operation_stage(OperationStage.QUEUED)
 | |
| 80 | -                self.queue.appendleft(job)
 | |
| 79 | +        job = self.jobs[job_name]
 | |
| 80 | + | |
| 81 | +        operation_stage = None
 | |
| 82 | +        if job.n_tries >= self.MAX_N_TRIES:
 | |
| 83 | +            # TODO: Decide what to do with these jobs
 | |
| 84 | +            operation_stage = OperationStage.COMPLETED
 | |
| 85 | +            # TODO: Mark these jobs as done
 | |
| 86 | + | |
| 87 | +        else:
 | |
| 88 | +            operation_stage = OperationStage.QUEUED
 | |
| 89 | +            self.queue.appendleft(job)
 | |
| 90 | + | |
| 91 | +        self._update_job_operation_stage(job, operation_stage)
 | |
| 81 | 92 |  | 
| 82 | 93 |      def list_jobs(self):
 | 
| 83 | 94 |          return self.jobs.values()
 | 
| ... | ... | @@ -112,13 +123,14 @@ class Scheduler: | 
| 112 | 123 |          """
 | 
| 113 | 124 |          job = self.jobs[job_name]
 | 
| 114 | 125 |  | 
| 126 | +        operation_stage = None
 | |
| 115 | 127 |          if lease_state == LeaseState.PENDING:
 | 
| 116 | 128 |              job.update_lease_state(LeaseState.PENDING)
 | 
| 117 | -            job.update_operation_stage(OperationStage.QUEUED)
 | |
| 129 | +            operation_stage = OperationStage.QUEUED
 | |
| 118 | 130 |  | 
| 119 | 131 |          elif lease_state == LeaseState.ACTIVE:
 | 
| 120 | 132 |              job.update_lease_state(LeaseState.ACTIVE)
 | 
| 121 | -            job.update_operation_stage(OperationStage.EXECUTING)
 | |
| 133 | +            operation_stage = OperationStage.EXECUTING
 | |
| 122 | 134 |  | 
| 123 | 135 |          elif lease_state == LeaseState.COMPLETED:
 | 
| 124 | 136 |              job.update_lease_state(LeaseState.COMPLETED,
 | 
| ... | ... | @@ -127,7 +139,9 @@ class Scheduler: | 
| 127 | 139 |              if self._action_cache is not None and not job.do_not_cache:
 | 
| 128 | 140 |                  self._action_cache.update_action_result(job.action_digest, job.action_result)
 | 
| 129 | 141 |  | 
| 130 | -            job.update_operation_stage(OperationStage.COMPLETED)
 | |
| 142 | +            operation_stage = OperationStage.COMPLETED
 | |
| 143 | + | |
| 144 | +        self._update_job_operation_stage(job, operation_stage)
 | |
| 131 | 145 |  | 
| 132 | 146 |      def get_job_lease(self, job_name):
 | 
| 133 | 147 |          """Returns the lease associated to job, if any have been emitted yet."""
 | 
| ... | ... | @@ -136,3 +150,59 @@ class Scheduler: | 
| 136 | 150 |      def get_job_operation(self, job_name):
 | 
| 137 | 151 |          """Returns the operation associated to job."""
 | 
| 138 | 152 |          return self.jobs[job_name].operation
 | 
| 153 | + | |
| 154 | +    # --- Public API: Monitoring ---
 | |
| 155 | + | |
| 156 | +    @property
 | |
| 157 | +    def is_monitored(self):
 | |
| 158 | +        return self._is_monitored
 | |
| 159 | + | |
| 160 | +    def query_n_jobs(self):
 | |
| 161 | +        return len(self.jobs)
 | |
| 162 | + | |
| 163 | +    def query_n_operations(self):
 | |
| 164 | +        return len(self.jobs)
 | |
| 165 | + | |
| 166 | +    def query_n_operations_by_stage(self):
 | |
| 167 | +        return len(self.jobs)
 | |
| 168 | + | |
| 169 | +    def query_n_leases(self):
 | |
| 170 | +        return len(self.jobs)
 | |
| 171 | + | |
| 172 | +    def query_n_leases_by_state(self):
 | |
| 173 | +        return len(self.jobs)
 | |
| 174 | + | |
| 175 | +    def query_n_retries(self):
 | |
| 176 | +        return self.__retries_count
 | |
| 177 | + | |
| 178 | +    def query_n_retries_for_error(self, error_type):
 | |
| 179 | +        try:
 | |
| 180 | +            return self.__retries_by_error[error_type]
 | |
| 181 | +        except KeyError:
 | |
| 182 | +            return 0
 | |
| 183 | + | |
| 184 | +    def query_am_queue_time(self):
 | |
| 185 | +        return self.__queue_time_average[1]
 | |
| 186 | + | |
| 187 | +    def query_am_queue_time_for_priority(self, priority_level):
 | |
| 188 | +        try:
 | |
| 189 | +            return self.__queue_times_by_priority[priority_level]
 | |
| 190 | +        except KeyError:
 | |
| 191 | +            return 0
 | |
| 192 | + | |
| 193 | +    # --- Private API ---
 | |
| 194 | + | |
| 195 | +    def _update_job_operation_stage(self, job, stage):
 | |
| 196 | +        job.update_operation_stage(stage)
 | |
| 197 | + | |
| 198 | +        if self._is_monitored and stage == OperationStage.COMPLETED:
 | |
| 199 | +            average_order, average_time = self.__queue_time_average
 | |
| 200 | +            queue_time = job.query_queue_time().total_seconds()
 | |
| 201 | + | |
| 202 | +            average_order += 1
 | |
| 203 | +            if average_order > 1:
 | |
| 204 | +                average_time = average_time + (queue_time - average_time) / average_order
 | |
| 205 | +            else:
 | |
| 206 | +                average_time = queue_time
 | |
| 207 | + | |
| 208 | +            self.__queue_time_average = average_order, average_time | 
| 1 | +# Copyright (C) 2018 Bloomberg LP
 | |
| 2 | +#
 | |
| 3 | +# Licensed under the Apache License, Version 2.0 (the "License");
 | |
| 4 | +# you may not use this file except in compliance with the License.
 | |
| 5 | +# You may obtain a copy of the License at
 | |
| 6 | +#
 | |
| 7 | +#  <http://www.apache.org/licenses/LICENSE-2.0>
 | |
| 8 | +#
 | |
| 9 | +# Unless required by applicable law or agreed to in writing, software
 | |
| 10 | +# distributed under the License is distributed on an "AS IS" BASIS,
 | |
| 11 | +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | |
| 12 | +# See the License for the specific language governing permissions and
 | |
| 13 | +# limitations under the License.
 | |
| 14 | + | |
| 15 | + | |
| 1 | 16 |  import hashlib
 | 
| 2 | 17 |  | 
| 3 | 18 |  | 
| 4 | -# The hash function that CAS uses
 | |
| 19 | +# Hash function used for computing digests:
 | |
| 5 | 20 |  HASH = hashlib.sha256
 | 
| 21 | + | |
| 22 | +# Length in bytes of a hash string returned by HASH:
 | |
| 6 | 23 |  HASH_LENGTH = HASH().digest_size * 2
 | 
| 24 | + | |
| 25 | +# Period, in seconds, for the monitoring cycle:
 | |
| 26 | +MONITORING_PERIOD = 5.0 | 
| ... | ... | @@ -112,13 +112,15 @@ setup( | 
| 112 | 112 |      license="Apache License, Version 2.0",
 | 
| 113 | 113 |      description="A remote execution service",
 | 
| 114 | 114 |      packages=find_packages(),
 | 
| 115 | +    python_requires='>= 3.5.3',  # janus requirement
 | |
| 115 | 116 |      install_requires=[
 | 
| 116 | -        'protobuf',
 | |
| 117 | -        'grpcio',
 | |
| 118 | -        'Click',
 | |
| 119 | -        'PyYAML',
 | |
| 120 | 117 |          'boto3 < 1.8.0',
 | 
| 121 | 118 |          'botocore < 1.11.0',
 | 
| 119 | +        'click',
 | |
| 120 | +        'grpcio',
 | |
| 121 | +        'janus',
 | |
| 122 | +        'protobuf',
 | |
| 123 | +        'pyyaml',
 | |
| 122 | 124 |      ],
 | 
| 123 | 125 |      entry_points={
 | 
| 124 | 126 |          'console_scripts': [
 | 
| 1 | +#!/bin/bash
 | |
| 2 | + | |
| 3 | +echo "Hello, World!" | 
