Martin Blanchard pushed to branch mablanch/23-new-logging at BuildGrid / buildgrid
Commits:
- b2df08ec by Martin Blanchard at 2018-11-15T13:06:36Z
- 24f70ff3 by Martin Blanchard at 2018-11-15T13:06:57Z
- 49782b64 by Martin Blanchard at 2018-11-15T13:06:57Z
- a43c5625 by Martin Blanchard at 2018-11-15T13:33:02Z
- 527d993b by Martin Blanchard at 2018-11-15T13:33:04Z
- a7852bbf by Martin Blanchard at 2018-11-15T13:33:04Z
- 89168bc7 by Martin Blanchard at 2018-11-15T13:33:04Z
- 4a509095 by Martin Blanchard at 2018-11-15T13:33:04Z
- a99dcb19 by Martin Blanchard at 2018-11-15T15:03:03Z
- 8c7db057 by Martin Blanchard at 2018-11-15T15:03:20Z
- a09a386d by Martin Blanchard at 2018-11-15T15:03:20Z
- 97023a78 by Martin Blanchard at 2018-11-15T15:03:20Z
- b9a9a841 by Martin Blanchard at 2018-11-15T15:03:20Z
- 8da1bf57 by Martin Blanchard at 2018-11-15T15:03:20Z
 
13 changed files:
- .pylintrc
- buildgrid/_app/cli.py
- buildgrid/server/bots/instance.py
- buildgrid/server/bots/service.py
- buildgrid/server/execution/instance.py
- buildgrid/server/execution/service.py
- buildgrid/server/instance.py
- buildgrid/server/job.py
- buildgrid/server/operations/instance.py
- buildgrid/server/operations/service.py
- buildgrid/server/scheduler.py
- buildgrid/settings.py
- setup.py
 
Changes:
| ... | ... | @@ -460,6 +460,7 @@ known-third-party=boto3, | 
| 460 | 460 | 
                   enchant,
 | 
| 461 | 461 | 
                   google,
 | 
| 462 | 462 | 
                   grpc,
 | 
| 463 | 
+                  janus,
 | 
|
| 463 | 464 | 
                   moto,
 | 
| 464 | 465 | 
                   yaml
 | 
| 465 | 466 | 
 | 
| ... | ... | @@ -523,4 +524,4 @@ valid-metaclass-classmethod-first-arg=mcs | 
| 523 | 524 | 
 | 
| 524 | 525 | 
 # Exceptions that will emit a warning when being caught. Defaults to
 | 
| 525 | 526 | 
 # "Exception"
 | 
| 526 | 
-overgeneral-exceptions=Exception
 | 
|
| 527 | 
+overgeneral-exceptions=Exception
 | 
|
| \ No newline at end of file | 
| ... | ... | @@ -27,6 +27,7 @@ import os | 
| 27 | 27 | 
 import click
 | 
| 28 | 28 | 
 import grpc
 | 
| 29 | 29 | 
 | 
| 30 | 
+from buildgrid.settings import LOG_RECORD_FORMAT
 | 
|
| 30 | 31 | 
 from buildgrid.utils import read_file
 | 
| 31 | 32 | 
 | 
| 32 | 33 | 
 CONTEXT_SETTINGS = dict(auto_envvar_prefix='BUILDGRID')
 | 
| ... | ... | @@ -138,11 +139,38 @@ class BuildGridCLI(click.MultiCommand): | 
| 138 | 139 | 
         return mod.cli
 | 
| 139 | 140 | 
 | 
| 140 | 141 | 
 | 
| 142 | 
+class DebugFilter(logging.Filter):
 | 
|
| 143 | 
+  | 
|
| 144 | 
+    def __init__(self, debug_domains, name=''):
 | 
|
| 145 | 
+        super().__init__(name=name)
 | 
|
| 146 | 
+        self.__domains_tree = {}
 | 
|
| 147 | 
+  | 
|
| 148 | 
+        for domain in debug_domains.split(','):
 | 
|
| 149 | 
+            domains_tree = self.__domains_tree
 | 
|
| 150 | 
+            for label in domain.split('.'):
 | 
|
| 151 | 
+                if label not in domains_tree:
 | 
|
| 152 | 
+                    domains_tree[label] = {}
 | 
|
| 153 | 
+                domains_tree = domains_tree[label]
 | 
|
| 154 | 
+  | 
|
| 155 | 
+    def filter(self, record):
 | 
|
| 156 | 
+        domains_tree = self.__domains_tree
 | 
|
| 157 | 
+        for label in record.name.split('.'):
 | 
|
| 158 | 
+            if '*' in domains_tree:
 | 
|
| 159 | 
+                return True
 | 
|
| 160 | 
+            if label not in domains_tree:
 | 
|
| 161 | 
+                return False
 | 
|
| 162 | 
+            domains_tree = domains_tree[label]
 | 
|
| 163 | 
+  | 
|
| 164 | 
+        return True
 | 
|
| 165 | 
+  | 
|
| 166 | 
+  | 
|
| 141 | 167 | 
 @click.command(cls=BuildGridCLI, context_settings=CONTEXT_SETTINGS)
 | 
| 168 | 
+@click.option('--no-print', is_flag=True, show_default=True,
 | 
|
| 169 | 
+              help="Do not print log records to stdout/stderr.")
 | 
|
| 142 | 170 | 
 @click.option('-v', '--verbose', count=True,
 | 
| 143 | 171 | 
               help='Increase log verbosity level.')
 | 
| 144 | 172 | 
 @pass_context
 | 
| 145 | 
-def cli(context, verbose):
 | 
|
| 173 | 
+def cli(context, no_print, verbose):
 | 
|
| 146 | 174 | 
     """BuildGrid App"""
 | 
| 147 | 175 | 
     logger = logging.getLogger()
 | 
| 148 | 176 | 
 | 
| ... | ... | @@ -152,8 +180,20 @@ def cli(context, verbose): | 
| 152 | 180 | 
     for log_filter in logger.filters[:]:
 | 
| 153 | 181 | 
         logger.removeFilter(log_filter)
 | 
| 154 | 182 | 
 | 
| 155 | 
-    logging.basicConfig(
 | 
|
| 156 | 
-        format='%(asctime)s:%(name)32.32s][%(levelname)5.5s]: %(message)s')
 | 
|
| 183 | 
+    # Do not register a stream handler if no-print is requested:
 | 
|
| 184 | 
+    if not no_print:
 | 
|
| 185 | 
+        log_handler = logging.StreamHandler()
 | 
|
| 186 | 
+  | 
|
| 187 | 
+        # Filter debug messages using BGD_MESSAGE_DEBUG value:
 | 
|
| 188 | 
+        debug_domains = os.environ.get('BGD_MESSAGE_DEBUG', None)
 | 
|
| 189 | 
+        if debug_domains:
 | 
|
| 190 | 
+            log_handler.addFilter(DebugFilter(debug_domains))
 | 
|
| 191 | 
+  | 
|
| 192 | 
+    else:
 | 
|
| 193 | 
+        log_handler = logging.NullHandler()
 | 
|
| 194 | 
+  | 
|
| 195 | 
+    logging.basicConfig(format=LOG_RECORD_FORMAT,
 | 
|
| 196 | 
+                        handlers=[log_handler])
 | 
|
| 157 | 197 | 
 | 
| 158 | 198 | 
     if verbose == 1:
 | 
| 159 | 199 | 
         logger.setLevel(logging.WARNING)
 | 
| ... | ... | @@ -161,5 +201,3 @@ def cli(context, verbose): | 
| 161 | 201 | 
         logger.setLevel(logging.INFO)
 | 
| 162 | 202 | 
     elif verbose >= 3:
 | 
| 163 | 203 | 
         logger.setLevel(logging.DEBUG)
 | 
| 164 | 
-    else:
 | 
|
| 165 | 
-        logger.setLevel(logging.ERROR)
 | 
| ... | ... | @@ -37,6 +37,10 @@ class BotsInterface: | 
| 37 | 37 | 
         self._bot_sessions = {}
 | 
| 38 | 38 | 
         self._scheduler = scheduler
 | 
| 39 | 39 | 
 | 
| 40 | 
+    @property
 | 
|
| 41 | 
+    def scheduler(self):
 | 
|
| 42 | 
+        return self._scheduler
 | 
|
| 43 | 
+  | 
|
| 40 | 44 | 
     def register_instance_with_server(self, instance_name, server):
 | 
| 41 | 45 | 
         server.add_bots_interface(self, instance_name)
 | 
| 42 | 46 | 
 | 
| ... | ... | @@ -23,8 +23,9 @@ import logging | 
| 23 | 23 | 
 | 
| 24 | 24 | 
 import grpc
 | 
| 25 | 25 | 
 | 
| 26 | 
-from google.protobuf.empty_pb2 import Empty
 | 
|
| 26 | 
+from google.protobuf import empty_pb2, timestamp_pb2
 | 
|
| 27 | 27 | 
 | 
| 28 | 
+from buildgrid._enums import BotStatus
 | 
|
| 28 | 29 | 
 from buildgrid._exceptions import InvalidArgumentError, OutOfSyncError
 | 
| 29 | 30 | 
 from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2
 | 
| 30 | 31 | 
 from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2_grpc
 | 
| ... | ... | @@ -32,24 +33,87 @@ from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2_grp | 
| 32 | 33 | 
 | 
| 33 | 34 | 
 class BotsService(bots_pb2_grpc.BotsServicer):
 | 
| 34 | 35 | 
 | 
| 35 | 
-    def __init__(self, server):
 | 
|
| 36 | 
+    def __init__(self, server, monitor=True):
 | 
|
| 36 | 37 | 
         self.__logger = logging.getLogger(__name__)
 | 
| 37 | 38 | 
 | 
| 39 | 
+        self.__bots_by_status = None
 | 
|
| 40 | 
+        self.__bots_by_instance = None
 | 
|
| 41 | 
+        self.__bots = None
 | 
|
| 42 | 
+  | 
|
| 38 | 43 | 
         self._instances = {}
 | 
| 39 | 44 | 
 | 
| 40 | 45 | 
         bots_pb2_grpc.add_BotsServicer_to_server(self, server)
 | 
| 41 | 46 | 
 | 
| 42 | 
-    def add_instance(self, name, instance):
 | 
|
| 43 | 
-        self._instances[name] = instance
 | 
|
| 47 | 
+        self._is_instrumented = monitor
 | 
|
| 48 | 
+  | 
|
| 49 | 
+        if self._is_instrumented:
 | 
|
| 50 | 
+            self.__bots_by_status = {}
 | 
|
| 51 | 
+            self.__bots_by_instance = {}
 | 
|
| 52 | 
+            self.__bots = {}
 | 
|
| 53 | 
+  | 
|
| 54 | 
+            self.__bots_by_status[BotStatus.OK] = set()
 | 
|
| 55 | 
+            self.__bots_by_status[BotStatus.UNHEALTHY] = set()
 | 
|
| 56 | 
+            self.__bots_by_status[BotStatus.HOST_REBOOTING] = set()
 | 
|
| 57 | 
+            self.__bots_by_status[BotStatus.BOT_TERMINATING] = set()
 | 
|
| 58 | 
+  | 
|
| 59 | 
+    # --- Public API ---
 | 
|
| 60 | 
+  | 
|
| 61 | 
+    def add_instance(self, instance_name, instance):
 | 
|
| 62 | 
+        """Registers a new servicer instance.
 | 
|
| 63 | 
+  | 
|
| 64 | 
+        Args:
 | 
|
| 65 | 
+            instance_name (str): The new instance's name.
 | 
|
| 66 | 
+            instance (BotsInterface): The new instance itself.
 | 
|
| 67 | 
+        """
 | 
|
| 68 | 
+        self._instances[instance_name] = instance
 | 
|
| 69 | 
+  | 
|
| 70 | 
+        if self._is_instrumented:
 | 
|
| 71 | 
+            self.__bots_by_instance[instance_name] = 0
 | 
|
| 72 | 
+  | 
|
| 73 | 
+    def get_scheduler(self, instance_name):
 | 
|
| 74 | 
+        """Retrieves a reference to the scheduler for an instance.
 | 
|
| 75 | 
+  | 
|
| 76 | 
+        Args:
 | 
|
| 77 | 
+            instance_name (str): The name of the instance to query.
 | 
|
| 78 | 
+  | 
|
| 79 | 
+        Returns:
 | 
|
| 80 | 
+            Scheduler: A reference to the scheduler for `instance_name`.
 | 
|
| 81 | 
+  | 
|
| 82 | 
+        Raises:
 | 
|
| 83 | 
+            InvalidArgumentError: If no instance named `instance_name` exists.
 | 
|
| 84 | 
+        """
 | 
|
| 85 | 
+        instance = self._get_instance(instance_name)
 | 
|
| 86 | 
+  | 
|
| 87 | 
+        return instance.scheduler
 | 
|
| 88 | 
+  | 
|
| 89 | 
+    # --- Public API: Servicer ---
 | 
|
| 44 | 90 | 
 | 
| 45 | 91 | 
     def CreateBotSession(self, request, context):
 | 
| 92 | 
+        """Handles CreateBotSessionRequest messages.
 | 
|
| 93 | 
+  | 
|
| 94 | 
+        Args:
 | 
|
| 95 | 
+            request (CreateBotSessionRequest): The incoming RPC request.
 | 
|
| 96 | 
+            context (grpc.ServicerContext): Context for the RPC call.
 | 
|
| 97 | 
+        """
 | 
|
| 46 | 98 | 
         self.__logger.debug("CreateBotSession request from [%s]", context.peer())
 | 
| 47 | 99 | 
 | 
| 100 | 
+        instance_name = request.parent
 | 
|
| 101 | 
+        bot_status = BotStatus(request.bot_session.status)
 | 
|
| 102 | 
+        bot_id = request.bot_session.bot_id
 | 
|
| 103 | 
+  | 
|
| 48 | 104 | 
         try:
 | 
| 49 | 
-            parent = request.parent
 | 
|
| 50 | 
-            instance = self._get_instance(request.parent)
 | 
|
| 51 | 
-            return instance.create_bot_session(parent,
 | 
|
| 52 | 
-                                               request.bot_session)
 | 
|
| 105 | 
+            instance = self._get_instance(instance_name)
 | 
|
| 106 | 
+            bot_session = instance.create_bot_session(instance_name,
 | 
|
| 107 | 
+                                                      request.bot_session)
 | 
|
| 108 | 
+            now = timestamp_pb2.Timestamp()
 | 
|
| 109 | 
+            now.GetCurrentTime()
 | 
|
| 110 | 
+  | 
|
| 111 | 
+            if self._is_instrumented:
 | 
|
| 112 | 
+                self.__bots[bot_id] = now
 | 
|
| 113 | 
+                self.__bots_by_instance[instance_name] += 1
 | 
|
| 114 | 
+                self.__bots_by_status[bot_status].add(bot_id)
 | 
|
| 115 | 
+  | 
|
| 116 | 
+            return bot_session
 | 
|
| 53 | 117 | 
 | 
| 54 | 118 | 
         except InvalidArgumentError as e:
 | 
| 55 | 119 | 
             self.__logger.error(e)
 | 
| ... | ... | @@ -59,17 +123,36 @@ class BotsService(bots_pb2_grpc.BotsServicer): | 
| 59 | 123 | 
         return bots_pb2.BotSession()
 | 
| 60 | 124 | 
 | 
| 61 | 125 | 
     def UpdateBotSession(self, request, context):
 | 
| 126 | 
+        """Handles UpdateBotSessionRequest messages.
 | 
|
| 127 | 
+  | 
|
| 128 | 
+        Args:
 | 
|
| 129 | 
+            request (UpdateBotSessionRequest): The incoming RPC request.
 | 
|
| 130 | 
+            context (grpc.ServicerContext): Context for the RPC call.
 | 
|
| 131 | 
+        """
 | 
|
| 62 | 132 | 
         self.__logger.debug("UpdateBotSession request from [%s]", context.peer())
 | 
| 63 | 133 | 
 | 
| 134 | 
+        names = request.name.split("/")
 | 
|
| 135 | 
+        bot_status = BotStatus(request.bot_session.status)
 | 
|
| 136 | 
+        bot_id = request.bot_session.bot_id
 | 
|
| 137 | 
+  | 
|
| 64 | 138 | 
         try:
 | 
| 65 | 
-            names = request.name.split("/")
 | 
|
| 66 | 
-            # Operation name should be in format:
 | 
|
| 67 | 
-            # {instance/name}/{uuid}
 | 
|
| 68 | 
-            instance_name = ''.join(names[0:-1])
 | 
|
| 139 | 
+            instance_name = '/'.join(names[:-1])
 | 
|
| 69 | 140 | 
 | 
| 70 | 141 | 
             instance = self._get_instance(instance_name)
 | 
| 71 | 
-            return instance.update_bot_session(request.name,
 | 
|
| 72 | 
-                                               request.bot_session)
 | 
|
| 142 | 
+            bot_session = instance.update_bot_session(request.name,
 | 
|
| 143 | 
+                                                      request.bot_session)
 | 
|
| 144 | 
+  | 
|
| 145 | 
+            if self._is_instrumented:
 | 
|
| 146 | 
+                self.__bots[bot_id].GetCurrentTime()
 | 
|
| 147 | 
+                if bot_id not in self.__bots_by_status[bot_status]:
 | 
|
| 148 | 
+                    self.__bots_by_status[BotStatus.OK].discard(bot_id)
 | 
|
| 149 | 
+                    self.__bots_by_status[BotStatus.UNHEALTHY].discard(bot_id)
 | 
|
| 150 | 
+                    self.__bots_by_status[BotStatus.HOST_REBOOTING].discard(bot_id)
 | 
|
| 151 | 
+                    self.__bots_by_status[BotStatus.BOT_TERMINATING].discard(bot_id)
 | 
|
| 152 | 
+  | 
|
| 153 | 
+                    self.__bots_by_status[bot_status].add(bot_id)
 | 
|
| 154 | 
+  | 
|
| 155 | 
+            return bot_session
 | 
|
| 73 | 156 | 
 | 
| 74 | 157 | 
         except InvalidArgumentError as e:
 | 
| 75 | 158 | 
             self.__logger.error(e)
 | 
| ... | ... | @@ -89,10 +172,46 @@ class BotsService(bots_pb2_grpc.BotsServicer): | 
| 89 | 172 | 
         return bots_pb2.BotSession()
 | 
| 90 | 173 | 
 | 
| 91 | 174 | 
     def PostBotEventTemp(self, request, context):
 | 
| 175 | 
+        """Handles PostBotEventTempRequest messages.
 | 
|
| 176 | 
+  | 
|
| 177 | 
+        Args:
 | 
|
| 178 | 
+            request (PostBotEventTempRequest): The incoming RPC request.
 | 
|
| 179 | 
+            context (grpc.ServicerContext): Context for the RPC call.
 | 
|
| 180 | 
+        """
 | 
|
| 92 | 181 | 
         self.__logger.debug("PostBotEventTemp request from [%s]", context.peer())
 | 
| 93 | 182 | 
 | 
| 94 | 183 | 
         context.set_code(grpc.StatusCode.UNIMPLEMENTED)
 | 
| 95 | 
-        return Empty()
 | 
|
| 184 | 
+  | 
|
| 185 | 
+        return empty_pb2.Empty()
 | 
|
| 186 | 
+  | 
|
| 187 | 
+    # --- Public API: Monitoring ---
 | 
|
| 188 | 
+  | 
|
| 189 | 
+    @property
 | 
|
| 190 | 
+    def is_instrumented(self):
 | 
|
| 191 | 
+        return self._is_instrumented
 | 
|
| 192 | 
+  | 
|
| 193 | 
+    def query_n_bots(self):
 | 
|
| 194 | 
+        if self.__bots is not None:
 | 
|
| 195 | 
+            return len(self.__bots)
 | 
|
| 196 | 
+        return 0
 | 
|
| 197 | 
+  | 
|
| 198 | 
+    def query_n_bots_for_instance(self, instance_name):
 | 
|
| 199 | 
+        try:
 | 
|
| 200 | 
+            if self.__bots_by_instance is not None:
 | 
|
| 201 | 
+                return self.__bots_by_instance[instance_name]
 | 
|
| 202 | 
+        except KeyError:
 | 
|
| 203 | 
+            pass
 | 
|
| 204 | 
+        return 0
 | 
|
| 205 | 
+  | 
|
| 206 | 
+    def query_n_bots_for_status(self, bot_status):
 | 
|
| 207 | 
+        try:
 | 
|
| 208 | 
+            if self.__bots_by_status is not None:
 | 
|
| 209 | 
+                return len(self.__bots_by_status[bot_status])
 | 
|
| 210 | 
+        except KeyError:
 | 
|
| 211 | 
+            pass
 | 
|
| 212 | 
+        return 0
 | 
|
| 213 | 
+  | 
|
| 214 | 
+    # --- Private API ---
 | 
|
| 96 | 215 | 
 | 
| 97 | 216 | 
     def _get_instance(self, name):
 | 
| 98 | 217 | 
         try:
 | 
| ... | ... | @@ -35,6 +35,10 @@ class ExecutionInstance: | 
| 35 | 35 | 
         self._storage = storage
 | 
| 36 | 36 | 
         self._scheduler = scheduler
 | 
| 37 | 37 | 
 | 
| 38 | 
+    @property
 | 
|
| 39 | 
+    def scheduler(self):
 | 
|
| 40 | 
+        return self._scheduler
 | 
|
| 41 | 
+  | 
|
| 38 | 42 | 
     def register_instance_with_server(self, instance_name, server):
 | 
| 39 | 43 | 
         server.add_execution_instance(self, instance_name)
 | 
| 40 | 44 | 
 | 
| ... | ... | @@ -33,30 +33,84 @@ from buildgrid._protos.google.longrunning import operations_pb2 | 
| 33 | 33 | 
 | 
| 34 | 34 | 
 class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
 | 
| 35 | 35 | 
 | 
| 36 | 
-    def __init__(self, server):
 | 
|
| 36 | 
+    def __init__(self, server, monitor=True):
 | 
|
| 37 | 37 | 
         self.__logger = logging.getLogger(__name__)
 | 
| 38 | 38 | 
 | 
| 39 | 
+        self.__peers_by_instance = None
 | 
|
| 40 | 
+        self.__peers = None
 | 
|
| 41 | 
+  | 
|
| 39 | 42 | 
         self._instances = {}
 | 
| 43 | 
+  | 
|
| 40 | 44 | 
         remote_execution_pb2_grpc.add_ExecutionServicer_to_server(self, server)
 | 
| 41 | 45 | 
 | 
| 42 | 
-    def add_instance(self, name, instance):
 | 
|
| 43 | 
-        self._instances[name] = instance
 | 
|
| 46 | 
+        self._is_instrumented = monitor
 | 
|
| 47 | 
+  | 
|
| 48 | 
+        if self._is_instrumented:
 | 
|
| 49 | 
+            self.__peers_by_instance = {}
 | 
|
| 50 | 
+            self.__peers = {}
 | 
|
| 51 | 
+  | 
|
| 52 | 
+    # --- Public API ---
 | 
|
| 53 | 
+  | 
|
| 54 | 
+    def add_instance(self, instance_name, instance):
 | 
|
| 55 | 
+        """Registers a new servicer instance.
 | 
|
| 56 | 
+  | 
|
| 57 | 
+        Args:
 | 
|
| 58 | 
+            instance_name (str): The new instance's name.
 | 
|
| 59 | 
+            instance (ExecutionInstance): The new instance itself.
 | 
|
| 60 | 
+        """
 | 
|
| 61 | 
+        self._instances[instance_name] = instance
 | 
|
| 62 | 
+  | 
|
| 63 | 
+        if self._is_instrumented:
 | 
|
| 64 | 
+            self.__peers_by_instance[instance_name] = set()
 | 
|
| 65 | 
+  | 
|
| 66 | 
+    def get_scheduler(self, instance_name):
 | 
|
| 67 | 
+        """Retrieves a reference to the scheduler for an instance.
 | 
|
| 68 | 
+  | 
|
| 69 | 
+        Args:
 | 
|
| 70 | 
+            instance_name (str): The name of the instance to query.
 | 
|
| 71 | 
+  | 
|
| 72 | 
+        Returns:
 | 
|
| 73 | 
+            Scheduler: A reference to the scheduler for `instance_name`.
 | 
|
| 74 | 
+  | 
|
| 75 | 
+        Raises:
 | 
|
| 76 | 
+            InvalidArgumentError: If no instance named `instance_name` exists.
 | 
|
| 77 | 
+        """
 | 
|
| 78 | 
+        instance = self._get_instance(instance_name)
 | 
|
| 79 | 
+  | 
|
| 80 | 
+        return instance.scheduler
 | 
|
| 81 | 
+  | 
|
| 82 | 
+    # --- Public API: Servicer ---
 | 
|
| 44 | 83 | 
 | 
| 45 | 84 | 
     def Execute(self, request, context):
 | 
| 85 | 
+        """Handles ExecuteRequest messages.
 | 
|
| 86 | 
+  | 
|
| 87 | 
+        Args:
 | 
|
| 88 | 
+            request (ExecuteRequest): The incoming RPC request.
 | 
|
| 89 | 
+            context (grpc.ServicerContext): Context for the RPC call.
 | 
|
| 90 | 
+        """
 | 
|
| 46 | 91 | 
         self.__logger.debug("Execute request from [%s]", context.peer())
 | 
| 47 | 92 | 
 | 
| 93 | 
+        instance_name = request.instance_name
 | 
|
| 94 | 
+        message_queue = queue.Queue()
 | 
|
| 95 | 
+        peer = context.peer()
 | 
|
| 96 | 
+  | 
|
| 48 | 97 | 
         try:
 | 
| 49 | 
-            message_queue = queue.Queue()
 | 
|
| 50 | 
-            instance = self._get_instance(request.instance_name)
 | 
|
| 98 | 
+            instance = self._get_instance(instance_name)
 | 
|
| 51 | 99 | 
             operation = instance.execute(request.action_digest,
 | 
| 52 | 100 | 
                                          request.skip_cache_lookup,
 | 
| 53 | 101 | 
                                          message_queue)
 | 
| 54 | 102 | 
 | 
| 55 | 
-            context.add_callback(partial(instance.unregister_message_client,
 | 
|
| 56 | 
-                                         operation.name, message_queue))
 | 
|
| 103 | 
+            context.add_callback(partial(self._rpc_termination_callback,
 | 
|
| 104 | 
+                                         peer, instance_name, operation.name, message_queue))
 | 
|
| 105 | 
+  | 
|
| 106 | 
+            if self._is_instrumented:
 | 
|
| 107 | 
+                if peer not in self.__peers:
 | 
|
| 108 | 
+                    self.__peers_by_instance[instance_name].add(peer)
 | 
|
| 109 | 
+                    self.__peers[peer] = 1
 | 
|
| 110 | 
+                else:
 | 
|
| 111 | 
+                    self.__peers[peer] += 1
 | 
|
| 57 | 112 | 
 | 
| 58 | 
-            instanced_op_name = "{}/{}".format(request.instance_name,
 | 
|
| 59 | 
-                                               operation.name)
 | 
|
| 113 | 
+            instanced_op_name = "{}/{}".format(instance_name, operation.name)
 | 
|
| 60 | 114 | 
 | 
| 61 | 115 | 
             self.__logger.info("Operation name: [%s]", instanced_op_name)
 | 
| 62 | 116 | 
 | 
| ... | ... | @@ -80,23 +134,37 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer): | 
| 80 | 134 | 
             yield operations_pb2.Operation()
 | 
| 81 | 135 | 
 | 
| 82 | 136 | 
     def WaitExecution(self, request, context):
 | 
| 137 | 
+        """Handles WaitExecutionRequest messages.
 | 
|
| 138 | 
+  | 
|
| 139 | 
+        Args:
 | 
|
| 140 | 
+            request (WaitExecutionRequest): The incoming RPC request.
 | 
|
| 141 | 
+            context (grpc.ServicerContext): Context for the RPC call.
 | 
|
| 142 | 
+        """
 | 
|
| 83 | 143 | 
         self.__logger.debug("WaitExecution request from [%s]", context.peer())
 | 
| 84 | 144 | 
 | 
| 85 | 
-        try:
 | 
|
| 86 | 
-            names = request.name.split("/")
 | 
|
| 145 | 
+        names = request.name.split('/')
 | 
|
| 146 | 
+        instance_name = '/'.join(names[:-1])
 | 
|
| 147 | 
+        operation_name = names[-1]
 | 
|
| 148 | 
+        message_queue = queue.Queue()
 | 
|
| 149 | 
+        peer = context.peer()
 | 
|
| 87 | 150 | 
 | 
| 88 | 
-            # Operation name should be in format:
 | 
|
| 89 | 
-            # {instance/name}/{operation_id}
 | 
|
| 90 | 
-            instance_name = ''.join(names[0:-1])
 | 
|
| 151 | 
+        try:
 | 
|
| 152 | 
+            if instance_name != request.instance_name:
 | 
|
| 153 | 
+                raise InvalidArgumentError("Invalid operation [{}] for instance [{}]"
 | 
|
| 154 | 
+                                            .format(request.name, instance_name))
 | 
|
| 91 | 155 | 
 | 
| 92 | 
-            message_queue = queue.Queue()
 | 
|
| 93 | 
-            operation_name = names[-1]
 | 
|
| 94 | 156 | 
             instance = self._get_instance(instance_name)
 | 
| 95 | 157 | 
 | 
| 96 | 158 | 
             instance.register_message_client(operation_name, message_queue)
 | 
| 159 | 
+            context.add_callback(partial(self._rpc_termination_callback,
 | 
|
| 160 | 
+                                         peer, instance_name, operation_name, message_queue))
 | 
|
| 97 | 161 | 
 | 
| 98 | 
-            context.add_callback(partial(instance.unregister_message_client,
 | 
|
| 99 | 
-                                         operation_name, message_queue))
 | 
|
| 162 | 
+            if self._is_instrumented:
 | 
|
| 163 | 
+                if peer not in self.__peers:
 | 
|
| 164 | 
+                    self.__peers_by_instance[instance_name].add(peer)
 | 
|
| 165 | 
+                    self.__peers[peer] = 1
 | 
|
| 166 | 
+                else:
 | 
|
| 167 | 
+                    self.__peers[peer] += 1
 | 
|
| 100 | 168 | 
 | 
| 101 | 169 | 
             for operation in instance.stream_operation_updates(message_queue,
 | 
| 102 | 170 | 
                                                                operation_name):
 | 
| ... | ... | @@ -111,9 +179,42 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer): | 
| 111 | 179 | 
             context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
 | 
| 112 | 180 | 
             yield operations_pb2.Operation()
 | 
| 113 | 181 | 
 | 
| 182 | 
+    # --- Private API ---
 | 
|
| 183 | 
+  | 
|
| 184 | 
+    def _rpc_termination_callback(self, peer, instance_name, job_name, message_queue):
 | 
|
| 185 | 
+        instance = self._get_instance(instance_name)
 | 
|
| 186 | 
+  | 
|
| 187 | 
+        instance.unregister_message_client(job_name, message_queue)
 | 
|
| 188 | 
+  | 
|
| 189 | 
+        if self._is_instrumented:
 | 
|
| 190 | 
+            if self.__peers[peer] > 1:
 | 
|
| 191 | 
+                self.__peers[peer] -= 1
 | 
|
| 192 | 
+            else:
 | 
|
| 193 | 
+                self.__peers_by_instance[instance_name].remove(peer)
 | 
|
| 194 | 
+                del self.__peers[peer]
 | 
|
| 195 | 
+  | 
|
| 114 | 196 | 
     def _get_instance(self, name):
 | 
| 115 | 197 | 
         try:
 | 
| 116 | 198 | 
             return self._instances[name]
 | 
| 117 | 199 | 
 | 
| 118 | 200 | 
         except KeyError:
 | 
| 119 | 201 | 
             raise InvalidArgumentError("Instance doesn't exist on server: [{}]".format(name))
 | 
| 202 | 
+  | 
|
| 203 | 
+    # --- Public API: Monitoring ---
 | 
|
| 204 | 
+  | 
|
| 205 | 
+    @property
 | 
|
| 206 | 
+    def is_instrumented(self):
 | 
|
| 207 | 
+        return self._is_instrumented
 | 
|
| 208 | 
+  | 
|
| 209 | 
+    def query_n_clients(self):
 | 
|
| 210 | 
+        if self.__peers is not None:
 | 
|
| 211 | 
+            return len(self.__peers)
 | 
|
| 212 | 
+        return 0
 | 
|
| 213 | 
+  | 
|
| 214 | 
+    def query_n_clients_for_instance(self, instance_name):
 | 
|
| 215 | 
+        try:
 | 
|
| 216 | 
+            if self.__peers_by_instance is not None:
 | 
|
| 217 | 
+                return len(self.__peers_by_instance[instance_name])
 | 
|
| 218 | 
+        except KeyError:
 | 
|
| 219 | 
+            pass
 | 
|
| 220 | 
+        return 0
 | 
| ... | ... | @@ -15,18 +15,25 @@ | 
| 15 | 15 | 
 | 
| 16 | 16 | 
 import asyncio
 | 
| 17 | 17 | 
 from concurrent import futures
 | 
| 18 | 
+from datetime import datetime, timedelta
 | 
|
| 18 | 19 | 
 import logging
 | 
| 20 | 
+from logging.handlers import QueueHandler
 | 
|
| 19 | 21 | 
 import os
 | 
| 20 | 22 | 
 import time
 | 
| 21 | 23 | 
 | 
| 22 | 24 | 
 import grpc
 | 
| 25 | 
+import janus
 | 
|
| 23 | 26 | 
 | 
| 27 | 
+from buildgrid._enums import LogRecordLevel, MetricRecordDomain, MetricRecordType
 | 
|
| 28 | 
+from buildgrid._protos.buildgrid.v2 import monitoring_pb2
 | 
|
| 24 | 29 | 
 from buildgrid.server.actioncache.service import ActionCacheService
 | 
| 25 | 30 | 
 from buildgrid.server.bots.service import BotsService
 | 
| 26 | 31 | 
 from buildgrid.server.cas.service import ByteStreamService, ContentAddressableStorageService
 | 
| 27 | 32 | 
 from buildgrid.server.execution.service import ExecutionService
 | 
| 33 | 
+from buildgrid.server._monitoring import MonitoringBus
 | 
|
| 28 | 34 | 
 from buildgrid.server.operations.service import OperationsService
 | 
| 29 | 35 | 
 from buildgrid.server.referencestorage.service import ReferenceStorageService
 | 
| 36 | 
+from buildgrid.settings import MONITORING_PERIOD
 | 
|
| 30 | 37 | 
 | 
| 31 | 38 | 
 | 
| 32 | 39 | 
 class BuildGridServer:
 | 
| ... | ... | @@ -36,7 +43,7 @@ class BuildGridServer: | 
| 36 | 43 | 
     requisite services.
 | 
| 37 | 44 | 
     """
 | 
| 38 | 45 | 
 | 
| 39 | 
-    def __init__(self, max_workers=None):
 | 
|
| 46 | 
+    def __init__(self, max_workers=None, monitor=True):
 | 
|
| 40 | 47 | 
         """Initializes a new :class:`BuildGridServer` instance.
 | 
| 41 | 48 | 
 | 
| 42 | 49 | 
         Args:
 | 
| ... | ... | @@ -53,6 +60,13 @@ class BuildGridServer: | 
| 53 | 60 | 
 | 
| 54 | 61 | 
         self.__main_loop = asyncio.get_event_loop()
 | 
| 55 | 62 | 
 | 
| 63 | 
+        self.__monitoring_bus = None
 | 
|
| 64 | 
+        self.__logging_queue = None
 | 
|
| 65 | 
+        self.__logging_handler = None
 | 
|
| 66 | 
+  | 
|
| 67 | 
+        self.__state_monitoring_task = None
 | 
|
| 68 | 
+        self.__logging_task = None
 | 
|
| 69 | 
+  | 
|
| 56 | 70 | 
         self._execution_service = None
 | 
| 57 | 71 | 
         self._bots_service = None
 | 
| 58 | 72 | 
         self._operations_service = None
 | 
| ... | ... | @@ -61,10 +75,35 @@ class BuildGridServer: | 
| 61 | 75 | 
         self._cas_service = None
 | 
| 62 | 76 | 
         self._bytestream_service = None
 | 
| 63 | 77 | 
 | 
| 78 | 
+        self._schedulers = {}
 | 
|
| 79 | 
+        self._instances = set()
 | 
|
| 80 | 
+  | 
|
| 81 | 
+        self._is_instrumented = monitor
 | 
|
| 82 | 
+  | 
|
| 83 | 
+        if self._is_instrumented:
 | 
|
| 84 | 
+            self.__monitoring_bus = MonitoringBus(self.__main_loop)
 | 
|
| 85 | 
+            self.__logging_queue = janus.Queue(loop=self.__main_loop)
 | 
|
| 86 | 
+            self.__logging_handler = QueueHandler(self.__logging_queue.sync_q)
 | 
|
| 87 | 
+  | 
|
| 88 | 
+            logging.getLogger().addHandler(self.__logging_handler)
 | 
|
| 89 | 
+  | 
|
| 90 | 
+    # --- Public API ---
 | 
|
| 91 | 
+  | 
|
| 64 | 92 | 
     def start(self):
 | 
| 65 | 93 | 
         """Starts the BuildGrid server.
 | 
| 66 | 94 | 
         """
 | 
| 67 | 95 | 
         self.__grpc_server.start()
 | 
| 96 | 
+  | 
|
| 97 | 
+        if self._is_instrumented:
 | 
|
| 98 | 
+            self.__monitoring_bus.start()
 | 
|
| 99 | 
+  | 
|
| 100 | 
+            self.__state_monitoring_task = asyncio.ensure_future(
 | 
|
| 101 | 
+                self._state_monitoring_worker(period=MONITORING_PERIOD),
 | 
|
| 102 | 
+                loop=self.__main_loop)
 | 
|
| 103 | 
+  | 
|
| 104 | 
+            self.__logging_task = asyncio.ensure_future(
 | 
|
| 105 | 
+                self._logging_worker(), loop=self.__main_loop)
 | 
|
| 106 | 
+  | 
|
| 68 | 107 | 
         self.__main_loop.run_forever()
 | 
| 69 | 108 | 
 | 
| 70 | 109 | 
     def stop(self, grace=0):
 | 
| ... | ... | @@ -73,7 +112,14 @@ class BuildGridServer: | 
| 73 | 112 | 
         Args:
 | 
| 74 | 113 | 
             grace (int, optional): A duration of time in seconds. Defaults to 0.
 | 
| 75 | 114 | 
         """
 | 
| 76 | 
-        self.__main_loop.close()
 | 
|
| 115 | 
+        if self._is_instrumented:
 | 
|
| 116 | 
+            if self.__state_monitoring_task is not None:
 | 
|
| 117 | 
+                self.__state_monitoring_task.cancel()
 | 
|
| 118 | 
+  | 
|
| 119 | 
+            if self.__logging_task is not None:
 | 
|
| 120 | 
+                self.__logging_task.cancel()
 | 
|
| 121 | 
+  | 
|
| 122 | 
+            self.__monitoring_bus.stop()
 | 
|
| 77 | 123 | 
 | 
| 78 | 124 | 
         self.__grpc_server.stop(grace)
 | 
| 79 | 125 | 
 | 
| ... | ... | @@ -109,9 +155,11 @@ class BuildGridServer: | 
| 109 | 155 | 
         """
 | 
| 110 | 156 | 
         if self._execution_service is None:
 | 
| 111 | 157 | 
             self._execution_service = ExecutionService(self.__grpc_server)
 | 
| 112 | 
-  | 
|
| 113 | 158 | 
         self._execution_service.add_instance(instance_name, instance)
 | 
| 114 | 159 | 
 | 
| 160 | 
+        self._schedulers[instance_name] = instance.scheduler
 | 
|
| 161 | 
+        self._instances.add(instance_name)
 | 
|
| 162 | 
+  | 
|
| 115 | 163 | 
     def add_bots_interface(self, instance, instance_name):
 | 
| 116 | 164 | 
         """Adds a :obj:`BotsInterface` to the service.
 | 
| 117 | 165 | 
 | 
| ... | ... | @@ -123,9 +171,10 @@ class BuildGridServer: | 
| 123 | 171 | 
         """
 | 
| 124 | 172 | 
         if self._bots_service is None:
 | 
| 125 | 173 | 
             self._bots_service = BotsService(self.__grpc_server)
 | 
| 126 | 
-  | 
|
| 127 | 174 | 
         self._bots_service.add_instance(instance_name, instance)
 | 
| 128 | 175 | 
 | 
| 176 | 
+        self._instances.add(instance_name)
 | 
|
| 177 | 
+  | 
|
| 129 | 178 | 
     def add_operations_instance(self, instance, instance_name):
 | 
| 130 | 179 | 
         """Adds an :obj:`OperationsInstance` to the service.
 | 
| 131 | 180 | 
 | 
| ... | ... | @@ -137,7 +186,6 @@ class BuildGridServer: | 
| 137 | 186 | 
         """
 | 
| 138 | 187 | 
         if self._operations_service is None:
 | 
| 139 | 188 | 
             self._operations_service = OperationsService(self.__grpc_server)
 | 
| 140 | 
-  | 
|
| 141 | 189 | 
         self._operations_service.add_instance(instance_name, instance)
 | 
| 142 | 190 | 
 | 
| 143 | 191 | 
     def add_reference_storage_instance(self, instance, instance_name):
 | 
| ... | ... | @@ -151,7 +199,6 @@ class BuildGridServer: | 
| 151 | 199 | 
         """
 | 
| 152 | 200 | 
         if self._reference_storage_service is None:
 | 
| 153 | 201 | 
             self._reference_storage_service = ReferenceStorageService(self.__grpc_server)
 | 
| 154 | 
-  | 
|
| 155 | 202 | 
         self._reference_storage_service.add_instance(instance_name, instance)
 | 
| 156 | 203 | 
 | 
| 157 | 204 | 
     def add_action_cache_instance(self, instance, instance_name):
 | 
| ... | ... | @@ -165,7 +212,6 @@ class BuildGridServer: | 
| 165 | 212 | 
         """
 | 
| 166 | 213 | 
         if self._action_cache_service is None:
 | 
| 167 | 214 | 
             self._action_cache_service = ActionCacheService(self.__grpc_server)
 | 
| 168 | 
-  | 
|
| 169 | 215 | 
         self._action_cache_service.add_instance(instance_name, instance)
 | 
| 170 | 216 | 
 | 
| 171 | 217 | 
     def add_cas_instance(self, instance, instance_name):
 | 
| ... | ... | @@ -179,7 +225,6 @@ class BuildGridServer: | 
| 179 | 225 | 
         """
 | 
| 180 | 226 | 
         if self._cas_service is None:
 | 
| 181 | 227 | 
             self._cas_service = ContentAddressableStorageService(self.__grpc_server)
 | 
| 182 | 
-  | 
|
| 183 | 228 | 
         self._cas_service.add_instance(instance_name, instance)
 | 
| 184 | 229 | 
 | 
| 185 | 230 | 
     def add_bytestream_instance(self, instance, instance_name):
 | 
| ... | ... | @@ -193,5 +238,191 @@ class BuildGridServer: | 
| 193 | 238 | 
         """
 | 
| 194 | 239 | 
         if self._bytestream_service is None:
 | 
| 195 | 240 | 
             self._bytestream_service = ByteStreamService(self.__grpc_server)
 | 
| 196 | 
-  | 
|
| 197 | 241 | 
         self._bytestream_service.add_instance(instance_name, instance)
 | 
| 242 | 
+  | 
|
| 243 | 
+    # --- Public API: Monitoring ---
 | 
|
| 244 | 
+  | 
|
| 245 | 
    @property
    def is_instrumented(self):
        """Whether this server was set up with monitoring (``_is_instrumented``)."""
        return self._is_instrumented
 | 
|
| 248 | 
+  | 
|
| 249 | 
+    # --- Private API ---
 | 
|
| 250 | 
+  | 
|
| 251 | 
+    async def _logging_worker(self):
 | 
|
| 252 | 
+        """Publishes log records to the monitoring bus."""
 | 
|
| 253 | 
+        async def __logging_worker():
 | 
|
| 254 | 
+            log_record = await self.__logging_queue.async_q.get()
 | 
|
| 255 | 
+  | 
|
| 256 | 
+            # Emit a log record:
 | 
|
| 257 | 
+            record = monitoring_pb2.LogRecord()
 | 
|
| 258 | 
+            creation_time = datetime.fromtimestamp(log_record.created)
 | 
|
| 259 | 
+  | 
|
| 260 | 
+            record.creation_timestamp.FromDatetime(creation_time),
 | 
|
| 261 | 
+            record.domain = log_record.name
 | 
|
| 262 | 
+            record.level = int(log_record.levelno / 10)
 | 
|
| 263 | 
+            record.message = log_record.message
 | 
|
| 264 | 
+            # logging.LogRecord.extra must be a str to str dict:
 | 
|
| 265 | 
+            if 'extra' in log_record.__dict__ and log_record.extra:
 | 
|
| 266 | 
+                record.extra.update(log_record.extra)
 | 
|
| 267 | 
+  | 
|
| 268 | 
+            await self.__monitoring_bus.send_record(record)
 | 
|
| 269 | 
+  | 
|
| 270 | 
+        try:
 | 
|
| 271 | 
+            while True:
 | 
|
| 272 | 
+                await __logging_worker()
 | 
|
| 273 | 
+  | 
|
| 274 | 
+        except asyncio.CancelledError:
 | 
|
| 275 | 
+            pass
 | 
|
| 276 | 
+        except BaseException as e:
 | 
|
| 277 | 
+             print(f'__logging_worker: {e}')
 | 
|
| 278 | 
+  | 
|
| 279 | 
+    async def _state_monitoring_worker(self, period=1.0):
 | 
|
| 280 | 
+        """Periodically publishes state metrics to the monitoring bus."""
 | 
|
| 281 | 
+        async def __state_monitoring_worker():
 | 
|
| 282 | 
+            # Emit total clients count record:
 | 
|
| 283 | 
+            _, record = self._query_n_clients()
 | 
|
| 284 | 
+            await self.__monitoring_bus.send_record(record)
 | 
|
| 285 | 
+  | 
|
| 286 | 
+            # Emit total bots count record:
 | 
|
| 287 | 
+            _, record = self._query_n_bots()
 | 
|
| 288 | 
+            await self.__monitoring_bus.send_record(record)
 | 
|
| 289 | 
+  | 
|
| 290 | 
+            queue_times = []
 | 
|
| 291 | 
+            # Emits records by instance:
 | 
|
| 292 | 
+            for instance_name in self._instances:
 | 
|
| 293 | 
+                # Emit instance clients count record:
 | 
|
| 294 | 
+                _, record = self._query_n_clients_for_instance(instance_name)
 | 
|
| 295 | 
+                await self.__monitoring_bus.send_record(record)
 | 
|
| 296 | 
+  | 
|
| 297 | 
+                # Emit instance bots count record:
 | 
|
| 298 | 
+                _, record = self._query_n_bots_for_instance(instance_name)
 | 
|
| 299 | 
+                await self.__monitoring_bus.send_record(record)
 | 
|
| 300 | 
+  | 
|
| 301 | 
+                # Emit instance average queue time record:
 | 
|
| 302 | 
+                queue_time, record = self._query_am_queue_time_for_instance(instance_name)
 | 
|
| 303 | 
+                await self.__monitoring_bus.send_record(record)
 | 
|
| 304 | 
+                if queue_time:
 | 
|
| 305 | 
+                    queue_times.append(queue_time)
 | 
|
| 306 | 
+  | 
|
| 307 | 
+            # Emit overall average queue time record:
 | 
|
| 308 | 
+            if len(queue_times) > 0:
 | 
|
| 309 | 
+                am_queue_time = sum(queue_times, timedelta()) / len(queue_times)
 | 
|
| 310 | 
+            else:
 | 
|
| 311 | 
+                am_queue_time = timedelta()
 | 
|
| 312 | 
+            record = self._forge_timer_metric_record(
 | 
|
| 313 | 
+                MetricRecordDomain.STATE,
 | 
|
| 314 | 
+                'average-queue-time',
 | 
|
| 315 | 
+                am_queue_time)
 | 
|
| 316 | 
+  | 
|
| 317 | 
+            await self.__monitoring_bus.send_record(record)
 | 
|
| 318 | 
+  | 
|
| 319 | 
+            print('---')
 | 
|
| 320 | 
+            n_clients = self._execution_service.query_n_clients()
 | 
|
| 321 | 
+            n_bots = self._bots_service.query_n_bots()
 | 
|
| 322 | 
+            print('Totals: n_clients={}, n_bots={}, am_queue_time={}'
 | 
|
| 323 | 
+                  .format(n_clients, n_bots, am_queue_time))
 | 
|
| 324 | 
+            print('Per instances:')
 | 
|
| 325 | 
+            for instance_name in self._instances:
 | 
|
| 326 | 
+                n_clients = self._execution_service.query_n_clients_for_instance(instance_name)
 | 
|
| 327 | 
+                n_bots = self._bots_service.query_n_bots_for_instance(instance_name)
 | 
|
| 328 | 
+                am_queue_time = self._execution_service.get_scheduler(instance_name).query_am_queue_time()
 | 
|
| 329 | 
+                instance_name = instance_name or 'void'
 | 
|
| 330 | 
+                print(' - {}: n_clients={}, n_bots={}, am_queue_time={}'
 | 
|
| 331 | 
+                      .format(instance_name, n_clients, n_bots, am_queue_time))
 | 
|
| 332 | 
+            print('---')
 | 
|
| 333 | 
+  | 
|
| 334 | 
+        try:
 | 
|
| 335 | 
+            while True:
 | 
|
| 336 | 
+                start = time.time()
 | 
|
| 337 | 
+                await __state_monitoring_worker()
 | 
|
| 338 | 
+  | 
|
| 339 | 
+                end = time.time()
 | 
|
| 340 | 
+                await asyncio.sleep(period - (end - start))
 | 
|
| 341 | 
+  | 
|
| 342 | 
+        except asyncio.CancelledError:
 | 
|
| 343 | 
+            pass
 | 
|
| 344 | 
+        except BaseException as e:
 | 
|
| 345 | 
+             print(f'__state_monitoring_worker: {e}')
 | 
|
| 346 | 
+  | 
|
| 347 | 
+    def _forge_counter_metric_record(self, domain, name, count, extra=None):
 | 
|
| 348 | 
+        counter_record = monitoring_pb2.MetricRecord()
 | 
|
| 349 | 
+  | 
|
| 350 | 
+        counter_record.creation_timestamp.GetCurrentTime()
 | 
|
| 351 | 
+        counter_record.domain = domain.value
 | 
|
| 352 | 
+        counter_record.type = MetricRecordType.COUNTER.value
 | 
|
| 353 | 
+        counter_record.name = name
 | 
|
| 354 | 
+        counter_record.count = count
 | 
|
| 355 | 
+        if extra is not None:
 | 
|
| 356 | 
+            counter_record.extra.update(extra)
 | 
|
| 357 | 
+  | 
|
| 358 | 
+        return record
 | 
|
| 359 | 
+  | 
|
| 360 | 
+    def _forge_timer_metric_record(self, domain, name, duration, extra=None):
 | 
|
| 361 | 
+        timer_record = monitoring_pb2.MetricRecord()
 | 
|
| 362 | 
+  | 
|
| 363 | 
+        timer_record.creation_timestamp.GetCurrentTime()
 | 
|
| 364 | 
+        timer_record.domain = domain.value
 | 
|
| 365 | 
+        timer_record.type = MetricRecordType.TIMER.value
 | 
|
| 366 | 
+        timer_record.name = name
 | 
|
| 367 | 
+        timer_record.duration.FromTimedelta(duration)
 | 
|
| 368 | 
+        if extra is not None:
 | 
|
| 369 | 
+            timer_record.extra.update(extra)
 | 
|
| 370 | 
+  | 
|
| 371 | 
+        return timer_record
 | 
|
| 372 | 
+  | 
|
| 373 | 
+    def _forge_gauge_metric_record(self, domain, name, value, extra=None):
 | 
|
| 374 | 
+        gauge_record = monitoring_pb2.MetricRecord()
 | 
|
| 375 | 
+  | 
|
| 376 | 
+        gauge_record.creation_timestamp.GetCurrentTime()
 | 
|
| 377 | 
+        gauge_record.domain = domain.value
 | 
|
| 378 | 
+        gauge_record.type = MetricRecordType.GAUGE.value
 | 
|
| 379 | 
+        gauge_record.name = name
 | 
|
| 380 | 
+        gauge_record.value = value
 | 
|
| 381 | 
+        if extra is not None:
 | 
|
| 382 | 
+            gauge_record.extra.update(extra)
 | 
|
| 383 | 
+  | 
|
| 384 | 
+        return gauge_record
 | 
|
| 385 | 
+  | 
|
| 386 | 
+    # --- Private API: Monitoring ---
 | 
|
| 387 | 
+  | 
|
| 388 | 
+    def _query_n_clients(self):
 | 
|
| 389 | 
+        """Queries the number of clients connected."""
 | 
|
| 390 | 
+        n_clients = self._execution_service.query_n_clients()
 | 
|
| 391 | 
+        gauge_record = self._forge_gauge_metric_record(
 | 
|
| 392 | 
+            MetricRecordDomain.STATE, 'clients-count', n_clients)
 | 
|
| 393 | 
+  | 
|
| 394 | 
+        return n_clients, gauge_record
 | 
|
| 395 | 
+    def _query_n_clients_for_instance(self, instance_name):
 | 
|
| 396 | 
+        """Queries the number of clients connected for a given instance"""
 | 
|
| 397 | 
+        n_clients = self._execution_service.query_n_clients_for_instance(instance_name)
 | 
|
| 398 | 
+        gauge_record = self._forge_gauge_metric_record(
 | 
|
| 399 | 
+            MetricRecordDomain.STATE, 'clients-count', n_clients,
 | 
|
| 400 | 
+            extra={ 'instance-name': instance_name or 'void' })
 | 
|
| 401 | 
+  | 
|
| 402 | 
+        return n_clients, gauge_record
 | 
|
| 403 | 
+  | 
|
| 404 | 
+    def _query_n_bots(self):
 | 
|
| 405 | 
+        """Queries the number of bots connected."""
 | 
|
| 406 | 
+        n_bots = self._bots_service.query_n_bots()
 | 
|
| 407 | 
+        gauge_record = self._forge_gauge_metric_record(
 | 
|
| 408 | 
+            MetricRecordDomain.STATE, 'bots-count', n_bots)
 | 
|
| 409 | 
+  | 
|
| 410 | 
+        return n_bots, gauge_record
 | 
|
| 411 | 
+  | 
|
| 412 | 
+    def _query_n_bots_for_instance(self, instance_name):
 | 
|
| 413 | 
+        """Queries the number of bots connected for a given instance."""
 | 
|
| 414 | 
+        n_bots = self._bots_service.query_n_bots_for_instance(instance_name)
 | 
|
| 415 | 
+        gauge_record = self._forge_gauge_metric_record(
 | 
|
| 416 | 
+            MetricRecordDomain.STATE, 'bots-count', n_bots,
 | 
|
| 417 | 
+            extra={ 'instance-name': instance_name or 'void' })
 | 
|
| 418 | 
+  | 
|
| 419 | 
+        return n_bots, gauge_record
 | 
|
| 420 | 
+  | 
|
| 421 | 
+    def _query_am_queue_time_for_instance(self, instance_name):
 | 
|
| 422 | 
+        """Queries the average job's queue time for a given instance."""
 | 
|
| 423 | 
+        am_queue_time = self._schedulers[instance_name].query_am_queue_time()
 | 
|
| 424 | 
+        timer_record = self._forge_timer_metric_record(
 | 
|
| 425 | 
+            MetricRecordDomain.STATE, 'average-queue-time', am_queue_time,
 | 
|
| 426 | 
+            extra={ 'instance-name': instance_name or 'void' })
 | 
|
| 427 | 
+  | 
|
| 428 | 
+        return am_queue_time, timer_record
 | 
| ... | ... | @@ -13,10 +13,11 @@ | 
| 13 | 13 | 
 # limitations under the License.
 | 
| 14 | 14 | 
 | 
| 15 | 15 | 
 | 
| 16 | 
+from datetime import datetime
 | 
|
| 16 | 17 | 
 import logging
 | 
| 17 | 18 | 
 import uuid
 | 
| 18 | 19 | 
 | 
| 19 | 
-from google.protobuf import timestamp_pb2
 | 
|
| 20 | 
+from google.protobuf import duration_pb2, timestamp_pb2
 | 
|
| 20 | 21 | 
 | 
| 21 | 22 | 
 from buildgrid._enums import LeaseState, OperationStage
 | 
| 22 | 23 | 
 from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
 | 
| ... | ... | @@ -37,6 +38,7 @@ class Job: | 
| 37 | 38 | 
         self.__execute_response = None
 | 
| 38 | 39 | 
         self.__operation_metadata = remote_execution_pb2.ExecuteOperationMetadata()
 | 
| 39 | 40 | 
         self.__queued_timestamp = timestamp_pb2.Timestamp()
 | 
| 41 | 
+        self.__queued_time_duration = duration_pb2.Duration()
 | 
|
| 40 | 42 | 
         self.__worker_start_timestamp = timestamp_pb2.Timestamp()
 | 
| 41 | 43 | 
         self.__worker_completed_timestamp = timestamp_pb2.Timestamp()
 | 
| 42 | 44 | 
 | 
| ... | ... | @@ -50,6 +52,8 @@ class Job: | 
| 50 | 52 | 
         self._operation.done = False
 | 
| 51 | 53 | 
         self._n_tries = 0
 | 
| 52 | 54 | 
 | 
| 55 | 
+    # --- Public API ---
 | 
|
| 56 | 
+  | 
|
| 53 | 57 | 
     @property
 | 
| 54 | 58 | 
     def name(self):
 | 
| 55 | 59 | 
         return self._name
 | 
| ... | ... | @@ -179,7 +183,7 @@ class Job: | 
| 179 | 183 | 
                 result.Unpack(action_result)
 | 
| 180 | 184 | 
 | 
| 181 | 185 | 
             action_metadata = action_result.execution_metadata
 | 
| 182 | 
-            action_metadata.queued_timestamp.CopyFrom(self.__worker_start_timestamp)
 | 
|
| 186 | 
+            action_metadata.queued_timestamp.CopyFrom(self.__queued_timestamp)
 | 
|
| 183 | 187 | 
             action_metadata.worker_start_timestamp.CopyFrom(self.__worker_start_timestamp)
 | 
| 184 | 188 | 
             action_metadata.worker_completed_timestamp.CopyFrom(self.__worker_completed_timestamp)
 | 
| 185 | 189 | 
 | 
| ... | ... | @@ -204,6 +208,10 @@ class Job: | 
| 204 | 208 | 
                 self.__queued_timestamp.GetCurrentTime()
 | 
| 205 | 209 | 
             self._n_tries += 1
 | 
| 206 | 210 | 
 | 
| 211 | 
+        elif self.__operation_metadata.stage == OperationStage.EXECUTING.value:
 | 
|
| 212 | 
+            queue_in, queue_out = self.__queued_timestamp.ToDatetime(), datetime.now()
 | 
|
| 213 | 
+            self.__queued_time_duration.FromTimedelta(queue_out - queue_in)
 | 
|
| 214 | 
+  | 
|
| 207 | 215 | 
         elif self.__operation_metadata.stage == OperationStage.COMPLETED.value:
 | 
| 208 | 216 | 
             if self.__execute_response is not None:
 | 
| 209 | 217 | 
                 self._operation.response.Pack(self.__execute_response)
 | 
| ... | ... | @@ -213,3 +221,11 @@ class Job: | 
| 213 | 221 | 
 | 
| 214 | 222 | 
         for queue in self._operation_update_queues:
 | 
| 215 | 223 | 
             queue.put(self._operation)
 | 
| 224 | 
+  | 
|
| 225 | 
+    # --- Public API: Monitoring ---
 | 
|
| 226 | 
+  | 
|
| 227 | 
    def query_queue_time(self):
        """Returns the job's recorded queue-time duration as a timedelta."""
        return self.__queued_time_duration.ToTimedelta()
 | 
|
| 229 | 
+  | 
|
| 230 | 
+    def query_n_retries(self):
 | 
|
| 231 | 
+        return self._n_tries - 1 if self._n_tries > 0 else 0
 | 
| ... | ... | @@ -32,6 +32,10 @@ class OperationsInstance: | 
| 32 | 32 | 
 | 
| 33 | 33 | 
         self._scheduler = scheduler
 | 
| 34 | 34 | 
 | 
| 35 | 
    @property
    def scheduler(self):
        """The scheduler this instance was given at construction."""
        return self._scheduler
 | 
|
| 38 | 
+  | 
|
| 35 | 39 | 
     def register_instance_with_server(self, instance_name, server):
 | 
| 36 | 40 | 
         server.add_operations_instance(self, instance_name)
 | 
| 37 | 41 | 
 | 
| ... | ... | @@ -38,8 +38,18 @@ class OperationsService(operations_pb2_grpc.OperationsServicer): | 
| 38 | 38 | 
 | 
| 39 | 39 | 
         operations_pb2_grpc.add_OperationsServicer_to_server(self, server)
 | 
| 40 | 40 | 
 | 
| 41 | 
-    def add_instance(self, name, instance):
 | 
|
| 42 | 
-        self._instances[name] = instance
 | 
|
| 41 | 
+    # --- Public API ---
 | 
|
| 42 | 
+  | 
|
| 43 | 
+    def add_instance(self, instance_name, instance):
 | 
|
| 44 | 
+        """Registers a new servicer instance.
 | 
|
| 45 | 
+  | 
|
| 46 | 
+        Args:
 | 
|
| 47 | 
+            instance_name (str): The new instance's name.
 | 
|
| 48 | 
+            instance (OperationsInstance): The new instance itself.
 | 
|
| 49 | 
+        """
 | 
|
| 50 | 
+        self._instances[instance_name] = instance
 | 
|
| 51 | 
+  | 
|
| 52 | 
+    # --- Public API: Servicer ---
 | 
|
| 43 | 53 | 
 | 
| 44 | 54 | 
     def GetOperation(self, request, context):
 | 
| 45 | 55 | 
         self.__logger.debug("GetOperation request from [%s]", context.peer())
 | 
| ... | ... | @@ -132,6 +142,8 @@ class OperationsService(operations_pb2_grpc.OperationsServicer): | 
| 132 | 142 | 
 | 
| 133 | 143 | 
         return Empty()
 | 
| 134 | 144 | 
 | 
| 145 | 
+    # --- Private API ---
 | 
|
| 146 | 
+  | 
|
| 135 | 147 | 
     def _parse_instance_name(self, name):
 | 
| 136 | 148 | 
         """ If the instance name is not blank, 'name' will have the form
 | 
| 137 | 149 | 
         {instance_name}/{operation_uuid}. Otherwise, it will just be
 | 
| ... | ... | @@ -20,24 +20,38 @@ Schedules jobs. | 
| 20 | 20 | 
 """
 | 
| 21 | 21 | 
 | 
| 22 | 22 | 
 from collections import deque
 | 
| 23 | 
+from datetime import timedelta
 | 
|
| 23 | 24 | 
 import logging
 | 
| 24 | 25 | 
 | 
| 26 | 
+from buildgrid._enums import LeaseState, OperationStage
 | 
|
| 25 | 27 | 
 from buildgrid._exceptions import NotFoundError
 | 
| 26 | 28 | 
 | 
| 27 | 
-from .job import OperationStage, LeaseState
 | 
|
| 28 | 
-  | 
|
| 29 | 29 | 
 | 
| 30 | 30 | 
 class Scheduler:
 | 
| 31 | 31 | 
 | 
| 32 | 32 | 
     MAX_N_TRIES = 5
 | 
| 33 | 33 | 
 | 
| 34 | 
-    def __init__(self, action_cache=None):
 | 
|
| 34 | 
+    def __init__(self, action_cache=None, monitor=True):
 | 
|
| 35 | 35 | 
         self.__logger = logging.getLogger(__name__)
 | 
| 36 | 36 | 
 | 
| 37 | 
+        self.__queue_times_by_priority = None
 | 
|
| 38 | 
+        self.__queue_time_average = None
 | 
|
| 39 | 
+        self.__retries_by_error = None
 | 
|
| 40 | 
+        self.__retries_count = 0
 | 
|
| 41 | 
+  | 
|
| 37 | 42 | 
         self._action_cache = action_cache
 | 
| 38 | 43 | 
         self.jobs = {}
 | 
| 39 | 44 | 
         self.queue = deque()
 | 
| 40 | 45 | 
 | 
| 46 | 
+        self._is_instrumented = monitor
 | 
|
| 47 | 
+  | 
|
| 48 | 
+        if self._is_instrumented:
 | 
|
| 49 | 
+            self.__queue_time_average = 0, timedelta()
 | 
|
| 50 | 
+            self.__queue_times_by_priority = {}
 | 
|
| 51 | 
+            self.__retries_by_error = {}
 | 
|
| 52 | 
+  | 
|
| 53 | 
+    # --- Public API ---
 | 
|
| 54 | 
+  | 
|
| 41 | 55 | 
     def register_client(self, job_name, queue):
 | 
| 42 | 56 | 
         self.jobs[job_name].register_client(queue)
 | 
| 43 | 57 | 
 | 
| ... | ... | @@ -66,18 +80,22 @@ class Scheduler: | 
| 66 | 80 | 
             operation_stage = OperationStage.QUEUED
 | 
| 67 | 81 | 
             self.queue.append(job)
 | 
| 68 | 82 | 
 | 
| 69 | 
-        job.update_operation_stage(operation_stage)
 | 
|
| 83 | 
+        self._update_job_operation_stage(job.name, operation_stage)
 | 
|
| 70 | 84 | 
 | 
| 71 | 85 | 
     def retry_job(self, job_name):
 | 
| 72 | 
-        if job_name in self.jobs:
 | 
|
| 73 | 
-            job = self.jobs[job_name]
 | 
|
| 74 | 
-            if job.n_tries >= self.MAX_N_TRIES:
 | 
|
| 75 | 
-                # TODO: Decide what to do with these jobs
 | 
|
| 76 | 
-                job.update_operation_stage(OperationStage.COMPLETED)
 | 
|
| 77 | 
-                # TODO: Mark these jobs as done
 | 
|
| 78 | 
-            else:
 | 
|
| 79 | 
-                job.update_operation_stage(OperationStage.QUEUED)
 | 
|
| 80 | 
-                self.queue.appendleft(job)
 | 
|
| 86 | 
+        job = self.jobs[job_name]
 | 
|
| 87 | 
+  | 
|
| 88 | 
+        operation_stage = None
 | 
|
| 89 | 
+        if job.n_tries >= self.MAX_N_TRIES:
 | 
|
| 90 | 
+            # TODO: Decide what to do with these jobs
 | 
|
| 91 | 
+            operation_stage = OperationStage.COMPLETED
 | 
|
| 92 | 
+            # TODO: Mark these jobs as done
 | 
|
| 93 | 
+  | 
|
| 94 | 
+        else:
 | 
|
| 95 | 
+            operation_stage = OperationStage.QUEUED
 | 
|
| 96 | 
+            self.queue.appendleft(job)
 | 
|
| 97 | 
+  | 
|
| 98 | 
+        self._update_job_operation_stage(job_name, operation_stage)
 | 
|
| 81 | 99 | 
 | 
| 82 | 100 | 
     def list_jobs(self):
 | 
| 83 | 101 | 
         return self.jobs.values()
 | 
| ... | ... | @@ -112,13 +130,14 @@ class Scheduler: | 
| 112 | 130 | 
         """
 | 
| 113 | 131 | 
         job = self.jobs[job_name]
 | 
| 114 | 132 | 
 | 
| 133 | 
+        operation_stage = None
 | 
|
| 115 | 134 | 
         if lease_state == LeaseState.PENDING:
 | 
| 116 | 135 | 
             job.update_lease_state(LeaseState.PENDING)
 | 
| 117 | 
-            job.update_operation_stage(OperationStage.QUEUED)
 | 
|
| 136 | 
+            operation_stage = OperationStage.QUEUED
 | 
|
| 118 | 137 | 
 | 
| 119 | 138 | 
         elif lease_state == LeaseState.ACTIVE:
 | 
| 120 | 139 | 
             job.update_lease_state(LeaseState.ACTIVE)
 | 
| 121 | 
-            job.update_operation_stage(OperationStage.EXECUTING)
 | 
|
| 140 | 
+            operation_stage = OperationStage.EXECUTING
 | 
|
| 122 | 141 | 
 | 
| 123 | 142 | 
         elif lease_state == LeaseState.COMPLETED:
 | 
| 124 | 143 | 
             job.update_lease_state(LeaseState.COMPLETED,
 | 
| ... | ... | @@ -127,7 +146,9 @@ class Scheduler: | 
| 127 | 146 | 
             if self._action_cache is not None and not job.do_not_cache:
 | 
| 128 | 147 | 
                 self._action_cache.update_action_result(job.action_digest, job.action_result)
 | 
| 129 | 148 | 
 | 
| 130 | 
-            job.update_operation_stage(OperationStage.COMPLETED)
 | 
|
| 149 | 
+            operation_stage = OperationStage.COMPLETED
 | 
|
| 150 | 
+  | 
|
| 151 | 
+        self._update_job_operation_stage(job_name, operation_stage)
 | 
|
| 131 | 152 | 
 | 
| 132 | 153 | 
     def get_job_lease(self, job_name):
 | 
| 133 | 154 | 
         """Returns the lease associated to job, if any have been emitted yet."""
 | 
| ... | ... | @@ -136,3 +157,83 @@ class Scheduler: | 
| 136 | 157 | 
     def get_job_operation(self, job_name):
 | 
| 137 | 158 | 
         """Returns the operation associated to job."""
 | 
| 138 | 159 | 
         return self.jobs[job_name].operation
 | 
| 160 | 
+  | 
|
| 161 | 
+    # --- Public API: Monitoring ---
 | 
|
| 162 | 
+  | 
|
| 163 | 
    @property
    def is_instrumented(self):
        """Whether this scheduler was created with monitoring enabled."""
        return self._is_instrumented
 | 
|
| 166 | 
+  | 
|
| 167 | 
    def query_n_jobs(self):
        """Returns the number of jobs currently held in ``self.jobs``."""
        return len(self.jobs)
 | 
|
| 169 | 
+  | 
|
| 170 | 
    def query_n_operations(self):
        """Returns the number of entries in ``self.jobs``."""
        # NOTE(review): presumably one operation per job - confirm.
        return len(self.jobs)
 | 
|
| 172 | 
+  | 
|
| 173 | 
    def query_n_operations_by_stage(self):
        """Returns the number of entries in ``self.jobs``."""
        # NOTE(review): no per-stage breakdown is computed despite the name;
        # looks like a placeholder - confirm.
        return len(self.jobs)
 | 
|
| 175 | 
+  | 
|
| 176 | 
    def query_n_leases(self):
        """Returns the number of entries in ``self.jobs``."""
        # NOTE(review): counts jobs, whether or not a lease has been emitted
        # for them; looks like a placeholder - confirm.
        return len(self.jobs)
 | 
|
| 178 | 
+  | 
|
| 179 | 
    def query_n_leases_by_state(self):
        """Returns the number of entries in ``self.jobs``."""
        # NOTE(review): no per-state breakdown is computed despite the name;
        # looks like a placeholder - confirm.
        return len(self.jobs)
 | 
|
| 181 | 
+  | 
|
| 182 | 
    def query_n_retries(self):
        """Returns the scheduler's retries counter (initialised to 0)."""
        return self.__retries_count
 | 
|
| 184 | 
+  | 
|
| 185 | 
+    def query_n_retries_for_error(self, error_type):
 | 
|
| 186 | 
+        try:
 | 
|
| 187 | 
+            if self.__retries_by_error is not None:
 | 
|
| 188 | 
+                return self.__retries_by_error[error_type]
 | 
|
| 189 | 
+        except KeyError:
 | 
|
| 190 | 
+            pass
 | 
|
| 191 | 
+        return 0
 | 
|
| 192 | 
+  | 
|
| 193 | 
+    def query_am_queue_time(self):
 | 
|
| 194 | 
+        if self.__queue_time_average is not None:
 | 
|
| 195 | 
+            return self.__queue_time_average[1]
 | 
|
| 196 | 
+        return 0
 | 
|
| 197 | 
+  | 
|
| 198 | 
+    def query_am_queue_time_for_priority(self, priority_level):
 | 
|
| 199 | 
+        try:
 | 
|
| 200 | 
+            if self.__queue_times_by_priority is not None:
 | 
|
| 201 | 
+                return self.__queue_times_by_priority[priority_level]
 | 
|
| 202 | 
+        except KeyError:
 | 
|
| 203 | 
+            pass
 | 
|
| 204 | 
+        return 0
 | 
|
| 205 | 
+  | 
|
| 206 | 
+    # --- Private API ---
 | 
|
| 207 | 
+  | 
|
| 208 | 
+    def _update_job_operation_stage(self, job_name, operation_stage):
 | 
|
| 209 | 
+        """Requests a stage transition for the job's :class:Operations.
 | 
|
| 210 | 
+  | 
|
| 211 | 
+        Args:
 | 
|
| 212 | 
+            job_name (str): name of the job to query.
 | 
|
| 213 | 
+            operation_stage (OperationStage): the stage to transition to.
 | 
|
| 214 | 
+        """
 | 
|
| 215 | 
+        job = self.jobs[job_name]
 | 
|
| 216 | 
+  | 
|
| 217 | 
+        if operation_stage == OperationStage.CACHE_CHECK:
 | 
|
| 218 | 
+            job.update_operation_stage(OperationStage.CACHE_CHECK)
 | 
|
| 219 | 
+  | 
|
| 220 | 
+        elif operation_stage == OperationStage.QUEUED:
 | 
|
| 221 | 
+            job.update_operation_stage(OperationStage.QUEUED)
 | 
|
| 222 | 
+  | 
|
| 223 | 
+        elif operation_stage == OperationStage.EXECUTING:
 | 
|
| 224 | 
+            job.update_operation_stage(OperationStage.EXECUTING)
 | 
|
| 225 | 
+  | 
|
| 226 | 
+        elif operation_stage == OperationStage.COMPLETED:
 | 
|
| 227 | 
+            job.update_operation_stage(OperationStage.COMPLETED)
 | 
|
| 228 | 
+  | 
|
| 229 | 
+            if self._is_instrumented:
 | 
|
| 230 | 
+                average_order, average_time = self.__queue_time_average
 | 
|
| 231 | 
+  | 
|
| 232 | 
+                average_order += 1
 | 
|
| 233 | 
+                if average_order <= 1:
 | 
|
| 234 | 
+                    average_time = job.query_queue_time()
 | 
|
| 235 | 
+                else:
 | 
|
| 236 | 
+                    queue_time = job.query_queue_time()
 | 
|
| 237 | 
+                    average_time = average_time + ((queue_time - average_time) / average_order)
 | 
|
| 238 | 
+  | 
|
| 239 | 
+                self.__queue_time_average = average_order, average_time
 | 
| 1 | 
+# Copyright (C) 2018 Bloomberg LP
 | 
|
| 2 | 
+#
 | 
|
| 3 | 
+# Licensed under the Apache License, Version 2.0 (the "License");
 | 
|
| 4 | 
+# you may not use this file except in compliance with the License.
 | 
|
| 5 | 
+# You may obtain a copy of the License at
 | 
|
| 6 | 
+#
 | 
|
| 7 | 
+#  <http://www.apache.org/licenses/LICENSE-2.0>
 | 
|
| 8 | 
+#
 | 
|
| 9 | 
+# Unless required by applicable law or agreed to in writing, software
 | 
|
| 10 | 
+# distributed under the License is distributed on an "AS IS" BASIS,
 | 
|
| 11 | 
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
|
| 12 | 
+# See the License for the specific language governing permissions and
 | 
|
| 13 | 
+# limitations under the License.
 | 
|
| 14 | 
+  | 
|
| 15 | 
+  | 
|
| 1 | 16 | 
 import hashlib
 | 
| 2 | 17 | 
 | 
| 3 | 18 | 
 | 
| 4 | 
-# The hash function that CAS uses
 | 
|
| 19 | 
+# Hash function used for computing digests:
 | 
|
| 5 | 20 | 
 HASH = hashlib.sha256
 | 
| 21 | 
+  | 
|
| 22 | 
+# Length, in characters, of a hexadecimal hash string produced with HASH:
 | 
|
| 6 | 23 | 
 HASH_LENGTH = HASH().digest_size * 2
 | 
| 24 | 
+  | 
|
| 25 | 
+# Period, in seconds, for the monitoring cycle:
 | 
|
| 26 | 
+MONITORING_PERIOD = 5.0
 | 
|
| 27 | 
+  | 
|
| 28 | 
+# String format for log records:
 | 
|
| 29 | 
+LOG_RECORD_FORMAT = '%(asctime)s:%(name)36.36s][%(levelname)5.5s]: %(message)s'
 | 
|
| 30 | 
+# The different log record attributes are documented here:
 | 
|
| 31 | 
+# https://docs.python.org/3/library/logging.html#logrecord-attributes
 | 
| ... | ... | @@ -112,13 +112,15 @@ setup( | 
| 112 | 112 | 
     license="Apache License, Version 2.0",
 | 
| 113 | 113 | 
     description="A remote execution service",
 | 
| 114 | 114 | 
     packages=find_packages(),
 | 
| 115 | 
+    python_requires='>= 3.5.3',  # janus requirement
 | 
|
| 115 | 116 | 
     install_requires=[
 | 
| 116 | 
-        'protobuf',
 | 
|
| 117 | 
-        'grpcio',
 | 
|
| 118 | 
-        'Click',
 | 
|
| 119 | 
-        'PyYAML',
 | 
|
| 120 | 117 | 
         'boto3 < 1.8.0',
 | 
| 121 | 118 | 
         'botocore < 1.11.0',
 | 
| 119 | 
+        'click',
 | 
|
| 120 | 
+        'grpcio',
 | 
|
| 121 | 
+        'janus',
 | 
|
| 122 | 
+        'protobuf',
 | 
|
| 123 | 
+        'pyyaml',
 | 
|
| 122 | 124 | 
     ],
 | 
| 123 | 125 | 
     entry_points={
 | 
| 124 | 126 | 
         'console_scripts': [
 | 
