Martin Blanchard pushed to branch master at BuildGrid / buildgrid
Commits:
- db53ffbc by Martin Blanchard at 2018-11-29T08:59:48Z
- 5ecfb7f8 by Martin Blanchard at 2018-11-29T08:59:48Z
- df5b6a80 by Martin Blanchard at 2018-11-29T08:59:48Z
- 5e608d6b by Martin Blanchard at 2018-11-29T08:59:48Z
- c167a1d0 by Martin Blanchard at 2018-11-29T08:59:48Z
- 8fc6d17d by Martin Blanchard at 2018-11-29T08:59:48Z
- 397f385b by Martin Blanchard at 2018-11-29T08:59:48Z
- dbbcdb50 by Martin Blanchard at 2018-11-29T08:59:48Z
- 50f3f63b by Martin Blanchard at 2018-11-29T08:59:48Z
12 changed files:
- .pylintrc
- buildgrid/server/bots/instance.py
- buildgrid/server/bots/service.py
- buildgrid/server/execution/instance.py
- buildgrid/server/execution/service.py
- buildgrid/server/instance.py
- buildgrid/server/job.py
- buildgrid/server/operations/instance.py
- buildgrid/server/operations/service.py
- buildgrid/server/scheduler.py
- buildgrid/settings.py
- setup.py
Changes:
| ... | ... | @@ -185,6 +185,7 @@ ignore-on-opaque-inference=yes | 
| 185 | 185 |  # for classes with dynamically set attributes). This supports the use of
 | 
| 186 | 186 |  # qualified names.
 | 
| 187 | 187 |  ignored-classes=google.protobuf.any_pb2.Any,
 | 
| 188 | +                google.protobuf.duration_pb2.Duration,
 | |
| 188 | 189 |                  google.protobuf.timestamp_pb2.Timestamp
 | 
| 189 | 190 |  | 
| 190 | 191 |  # List of module names for which member attributes should not be checked
 | 
| ... | ... | @@ -460,6 +461,7 @@ known-third-party=boto3, | 
| 460 | 461 |                    enchant,
 | 
| 461 | 462 |                    google,
 | 
| 462 | 463 |                    grpc,
 | 
| 464 | +                  janus,
 | |
| 463 | 465 |                    moto,
 | 
| 464 | 466 |                    yaml
 | 
| 465 | 467 |  | 
| ... | ... | @@ -37,6 +37,10 @@ class BotsInterface: | 
| 37 | 37 |          self._assigned_leases = {}
 | 
| 38 | 38 |          self._scheduler = scheduler
 | 
| 39 | 39 |  | 
| 40 | +    @property
 | |
| 41 | +    def scheduler(self):
 | |
| 42 | +        return self._scheduler
 | |
| 43 | + | |
| 40 | 44 |      def register_instance_with_server(self, instance_name, server):
 | 
| 41 | 45 |          server.add_bots_interface(self, instance_name)
 | 
| 42 | 46 |  | 
| ... | ... | @@ -23,8 +23,9 @@ import logging | 
| 23 | 23 |  | 
| 24 | 24 |  import grpc
 | 
| 25 | 25 |  | 
| 26 | -from google.protobuf.empty_pb2 import Empty
 | |
| 26 | +from google.protobuf import empty_pb2, timestamp_pb2
 | |
| 27 | 27 |  | 
| 28 | +from buildgrid._enums import BotStatus
 | |
| 28 | 29 |  from buildgrid._exceptions import InvalidArgumentError, OutOfSyncError
 | 
| 29 | 30 |  from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2
 | 
| 30 | 31 |  from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2_grpc
 | 
| ... | ... | @@ -32,24 +33,86 @@ from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2_grp | 
| 32 | 33 |  | 
| 33 | 34 |  class BotsService(bots_pb2_grpc.BotsServicer):
 | 
| 34 | 35 |  | 
| 35 | -    def __init__(self, server):
 | |
| 36 | +    def __init__(self, server, monitor=False):
 | |
| 36 | 37 |          self.__logger = logging.getLogger(__name__)
 | 
| 37 | 38 |  | 
| 39 | +        self.__bots_by_status = None
 | |
| 40 | +        self.__bots_by_instance = None
 | |
| 41 | +        self.__bots = None
 | |
| 42 | + | |
| 38 | 43 |          self._instances = {}
 | 
| 39 | 44 |  | 
| 40 | 45 |          bots_pb2_grpc.add_BotsServicer_to_server(self, server)
 | 
| 41 | 46 |  | 
| 42 | -    def add_instance(self, name, instance):
 | |
| 43 | -        self._instances[name] = instance
 | |
| 47 | +        self._is_instrumented = monitor
 | |
| 48 | + | |
| 49 | +        if self._is_instrumented:
 | |
| 50 | +            self.__bots_by_status = {}
 | |
| 51 | +            self.__bots_by_instance = {}
 | |
| 52 | +            self.__bots = {}
 | |
| 53 | + | |
| 54 | +            self.__bots_by_status[BotStatus.OK] = set()
 | |
| 55 | +            self.__bots_by_status[BotStatus.UNHEALTHY] = set()
 | |
| 56 | + | |
| 57 | +    # --- Public API ---
 | |
| 58 | + | |
| 59 | +    def add_instance(self, instance_name, instance):
 | |
| 60 | +        """Registers a new servicer instance.
 | |
| 61 | + | |
| 62 | +        Args:
 | |
| 63 | +            instance_name (str): The new instance's name.
 | |
| 64 | +            instance (BotsInterface): The new instance itself.
 | |
| 65 | +        """
 | |
| 66 | +        self._instances[instance_name] = instance
 | |
| 67 | + | |
| 68 | +        if self._is_instrumented:
 | |
| 69 | +            self.__bots_by_instance[instance_name] = set()
 | |
| 70 | + | |
| 71 | +    def get_scheduler(self, instance_name):
 | |
| 72 | +        """Retrieves a reference to the scheduler for an instance.
 | |
| 73 | + | |
| 74 | +        Args:
 | |
| 75 | +            instance_name (str): The name of the instance to query.
 | |
| 76 | + | |
| 77 | +        Returns:
 | |
| 78 | +            Scheduler: A reference to the scheduler for `instance_name`.
 | |
| 79 | + | |
| 80 | +        Raises:
 | |
| 81 | +            InvalidArgumentError: If no instance named `instance_name` exists.
 | |
| 82 | +        """
 | |
| 83 | +        instance = self._get_instance(instance_name)
 | |
| 84 | + | |
| 85 | +        return instance.scheduler
 | |
| 86 | + | |
| 87 | +    # --- Public API: Servicer ---
 | |
| 44 | 88 |  | 
| 45 | 89 |      def CreateBotSession(self, request, context):
 | 
| 90 | +        """Handles CreateBotSessionRequest messages.
 | |
| 91 | + | |
| 92 | +        Args:
 | |
| 93 | +            request (CreateBotSessionRequest): The incoming RPC request.
 | |
| 94 | +            context (grpc.ServicerContext): Context for the RPC call.
 | |
| 95 | +        """
 | |
| 46 | 96 |          self.__logger.debug("CreateBotSession request from [%s]", context.peer())
 | 
| 47 | 97 |  | 
| 98 | +        instance_name = request.parent
 | |
| 99 | +        bot_status = BotStatus(request.bot_session.status)
 | |
| 100 | +        bot_id = request.bot_session.bot_id
 | |
| 101 | + | |
| 48 | 102 |          try:
 | 
| 49 | -            parent = request.parent
 | |
| 50 | -            instance = self._get_instance(request.parent)
 | |
| 51 | -            return instance.create_bot_session(parent,
 | |
| 52 | -                                               request.bot_session)
 | |
| 103 | +            instance = self._get_instance(instance_name)
 | |
| 104 | +            bot_session = instance.create_bot_session(instance_name,
 | |
| 105 | +                                                      request.bot_session)
 | |
| 106 | +            now = timestamp_pb2.Timestamp()
 | |
| 107 | +            now.GetCurrentTime()
 | |
| 108 | + | |
| 109 | +            if self._is_instrumented:
 | |
| 110 | +                self.__bots[bot_id] = now
 | |
| 111 | +                self.__bots_by_instance[instance_name].add(bot_id)
 | |
| 112 | +                if bot_status in self.__bots_by_status:
 | |
| 113 | +                    self.__bots_by_status[bot_status].add(bot_id)
 | |
| 114 | + | |
| 115 | +            return bot_session
 | |
| 53 | 116 |  | 
| 54 | 117 |          except InvalidArgumentError as e:
 | 
| 55 | 118 |              self.__logger.error(e)
 | 
| ... | ... | @@ -59,17 +122,41 @@ class BotsService(bots_pb2_grpc.BotsServicer): | 
| 59 | 122 |          return bots_pb2.BotSession()
 | 
| 60 | 123 |  | 
| 61 | 124 |      def UpdateBotSession(self, request, context):
 | 
| 125 | +        """Handles UpdateBotSessionRequest messages.
 | |
| 126 | + | |
| 127 | +        Args:
 | |
| 128 | +            request (UpdateBotSessionRequest): The incoming RPC request.
 | |
| 129 | +            context (grpc.ServicerContext): Context for the RPC call.
 | |
| 130 | +        """
 | |
| 62 | 131 |          self.__logger.debug("UpdateBotSession request from [%s]", context.peer())
 | 
| 63 | 132 |  | 
| 133 | +        names = request.name.split("/")
 | |
| 134 | +        bot_status = BotStatus(request.bot_session.status)
 | |
| 135 | +        bot_id = request.bot_session.bot_id
 | |
| 136 | + | |
| 64 | 137 |          try:
 | 
| 65 | -            names = request.name.split("/")
 | |
| 66 | -            # Operation name should be in format:
 | |
| 67 | -            # {instance/name}/{uuid}
 | |
| 68 | -            instance_name = ''.join(names[0:-1])
 | |
| 138 | +            instance_name = '/'.join(names[:-1])
 | |
| 69 | 139 |  | 
| 70 | 140 |              instance = self._get_instance(instance_name)
 | 
| 71 | -            return instance.update_bot_session(request.name,
 | |
| 72 | -                                               request.bot_session)
 | |
| 141 | +            bot_session = instance.update_bot_session(request.name,
 | |
| 142 | +                                                      request.bot_session)
 | |
| 143 | + | |
| 144 | +            if self._is_instrumented:
 | |
| 145 | +                self.__bots[bot_id].GetCurrentTime()
 | |
| 146 | +                if bot_id not in self.__bots_by_status[bot_status]:
 | |
| 147 | +                    if bot_status == BotStatus.OK:
 | |
| 148 | +                        self.__bots_by_status[BotStatus.OK].add(bot_id)
 | |
| 149 | +                        self.__bots_by_status[BotStatus.UNHEALTHY].discard(bot_id)
 | |
| 150 | + | |
| 151 | +                    elif bot_status == BotStatus.UNHEALTHY:
 | |
| 152 | +                        self.__bots_by_status[BotStatus.OK].discard(bot_id)
 | |
| 153 | +                        self.__bots_by_status[BotStatus.UNHEALTHY].add(bot_id)
 | |
| 154 | + | |
| 155 | +                    else:
 | |
| 156 | +                        self.__bots_by_instance[instance_name].remove(bot_id)
 | |
| 157 | +                        del self.__bots[bot_id]
 | |
| 158 | + | |
| 159 | +            return bot_session
 | |
| 73 | 160 |  | 
| 74 | 161 |          except InvalidArgumentError as e:
 | 
| 75 | 162 |              self.__logger.error(e)
 | 
| ... | ... | @@ -89,10 +176,47 @@ class BotsService(bots_pb2_grpc.BotsServicer): | 
| 89 | 176 |          return bots_pb2.BotSession()
 | 
| 90 | 177 |  | 
| 91 | 178 |      def PostBotEventTemp(self, request, context):
 | 
| 179 | +        """Handles PostBotEventTempRequest messages.
 | |
| 180 | + | |
| 181 | +        Args:
 | |
| 182 | +            request (PostBotEventTempRequest): The incoming RPC request.
 | |
| 183 | +            context (grpc.ServicerContext): Context for the RPC call.
 | |
| 184 | +        """
 | |
| 92 | 185 |          self.__logger.debug("PostBotEventTemp request from [%s]", context.peer())
 | 
| 93 | 186 |  | 
| 94 | 187 |          context.set_code(grpc.StatusCode.UNIMPLEMENTED)
 | 
| 95 | -        return Empty()
 | |
| 188 | + | |
| 189 | +        return empty_pb2.Empty()
 | |
| 190 | + | |
| 191 | +    # --- Public API: Monitoring ---
 | |
| 192 | + | |
| 193 | +    @property
 | |
| 194 | +    def is_instrumented(self):
 | |
| 195 | +        return self._is_instrumented
 | |
| 196 | + | |
| 197 | +    def query_n_bots(self):
 | |
| 198 | +        if self.__bots is not None:
 | |
| 199 | +            return len(self.__bots)
 | |
| 200 | + | |
| 201 | +        return 0
 | |
| 202 | + | |
| 203 | +    def query_n_bots_for_instance(self, instance_name):
 | |
| 204 | +        try:
 | |
| 205 | +            if self.__bots_by_instance is not None:
 | |
| 206 | +                return len(self.__bots_by_instance[instance_name])
 | |
| 207 | +        except KeyError:
 | |
| 208 | +            pass
 | |
| 209 | +        return 0
 | |
| 210 | + | |
| 211 | +    def query_n_bots_for_status(self, bot_status):
 | |
| 212 | +        try:
 | |
| 213 | +            if self.__bots_by_status is not None:
 | |
| 214 | +                return len(self.__bots_by_status[bot_status])
 | |
| 215 | +        except KeyError:
 | |
| 216 | +            pass
 | |
| 217 | +        return 0
 | |
| 218 | + | |
| 219 | +    # --- Private API ---
 | |
| 96 | 220 |  | 
| 97 | 221 |      def _get_instance(self, name):
 | 
| 98 | 222 |          try:
 | 
| ... | ... | @@ -36,6 +36,10 @@ class ExecutionInstance: | 
| 36 | 36 |          self._storage = storage
 | 
| 37 | 37 |          self._scheduler = scheduler
 | 
| 38 | 38 |  | 
| 39 | +    @property
 | |
| 40 | +    def scheduler(self):
 | |
| 41 | +        return self._scheduler
 | |
| 42 | + | |
| 39 | 43 |      def register_instance_with_server(self, instance_name, server):
 | 
| 40 | 44 |          server.add_execution_instance(self, instance_name)
 | 
| 41 | 45 |  | 
| ... | ... | @@ -33,30 +33,84 @@ from buildgrid._protos.google.longrunning import operations_pb2 | 
| 33 | 33 |  | 
| 34 | 34 |  class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
 | 
| 35 | 35 |  | 
| 36 | -    def __init__(self, server):
 | |
| 36 | +    def __init__(self, server, monitor=False):
 | |
| 37 | 37 |          self.__logger = logging.getLogger(__name__)
 | 
| 38 | 38 |  | 
| 39 | +        self.__peers_by_instance = None
 | |
| 40 | +        self.__peers = None
 | |
| 41 | + | |
| 39 | 42 |          self._instances = {}
 | 
| 43 | + | |
| 40 | 44 |          remote_execution_pb2_grpc.add_ExecutionServicer_to_server(self, server)
 | 
| 41 | 45 |  | 
| 42 | -    def add_instance(self, name, instance):
 | |
| 43 | -        self._instances[name] = instance
 | |
| 46 | +        self._is_instrumented = monitor
 | |
| 47 | + | |
| 48 | +        if self._is_instrumented:
 | |
| 49 | +            self.__peers_by_instance = {}
 | |
| 50 | +            self.__peers = {}
 | |
| 51 | + | |
| 52 | +    # --- Public API ---
 | |
| 53 | + | |
| 54 | +    def add_instance(self, instance_name, instance):
 | |
| 55 | +        """Registers a new servicer instance.
 | |
| 56 | + | |
| 57 | +        Args:
 | |
| 58 | +            instance_name (str): The new instance's name.
 | |
| 59 | +            instance (ExecutionInstance): The new instance itself.
 | |
| 60 | +        """
 | |
| 61 | +        self._instances[instance_name] = instance
 | |
| 62 | + | |
| 63 | +        if self._is_instrumented:
 | |
| 64 | +            self.__peers_by_instance[instance_name] = set()
 | |
| 65 | + | |
| 66 | +    def get_scheduler(self, instance_name):
 | |
| 67 | +        """Retrieves a reference to the scheduler for an instance.
 | |
| 68 | + | |
| 69 | +        Args:
 | |
| 70 | +            instance_name (str): The name of the instance to query.
 | |
| 71 | + | |
| 72 | +        Returns:
 | |
| 73 | +            Scheduler: A reference to the scheduler for `instance_name`.
 | |
| 74 | + | |
| 75 | +        Raises:
 | |
| 76 | +            InvalidArgumentError: If no instance named `instance_name` exists.
 | |
| 77 | +        """
 | |
| 78 | +        instance = self._get_instance(instance_name)
 | |
| 79 | + | |
| 80 | +        return instance.scheduler
 | |
| 81 | + | |
| 82 | +    # --- Public API: Servicer ---
 | |
| 44 | 83 |  | 
| 45 | 84 |      def Execute(self, request, context):
 | 
| 85 | +        """Handles ExecuteRequest messages.
 | |
| 86 | + | |
| 87 | +        Args:
 | |
| 88 | +            request (ExecuteRequest): The incoming RPC request.
 | |
| 89 | +            context (grpc.ServicerContext): Context for the RPC call.
 | |
| 90 | +        """
 | |
| 46 | 91 |          self.__logger.debug("Execute request from [%s]", context.peer())
 | 
| 47 | 92 |  | 
| 93 | +        instance_name = request.instance_name
 | |
| 94 | +        message_queue = queue.Queue()
 | |
| 95 | +        peer = context.peer()
 | |
| 96 | + | |
| 48 | 97 |          try:
 | 
| 49 | -            message_queue = queue.Queue()
 | |
| 50 | -            instance = self._get_instance(request.instance_name)
 | |
| 98 | +            instance = self._get_instance(instance_name)
 | |
| 51 | 99 |              operation = instance.execute(request.action_digest,
 | 
| 52 | 100 |                                           request.skip_cache_lookup,
 | 
| 53 | 101 |                                           message_queue)
 | 
| 54 | 102 |  | 
| 55 | -            context.add_callback(partial(instance.unregister_message_client,
 | |
| 56 | -                                         operation.name, message_queue))
 | |
| 103 | +            context.add_callback(partial(self._rpc_termination_callback,
 | |
| 104 | +                                         peer, instance_name, operation.name, message_queue))
 | |
| 57 | 105 |  | 
| 58 | -            instanced_op_name = "{}/{}".format(request.instance_name,
 | |
| 59 | -                                               operation.name)
 | |
| 106 | +            if self._is_instrumented:
 | |
| 107 | +                if peer not in self.__peers:
 | |
| 108 | +                    self.__peers_by_instance[instance_name].add(peer)
 | |
| 109 | +                    self.__peers[peer] = 1
 | |
| 110 | +                else:
 | |
| 111 | +                    self.__peers[peer] += 1
 | |
| 112 | + | |
| 113 | +            instanced_op_name = "{}/{}".format(instance_name, operation.name)
 | |
| 60 | 114 |  | 
| 61 | 115 |              self.__logger.info("Operation name: [%s]", instanced_op_name)
 | 
| 62 | 116 |  | 
| ... | ... | @@ -86,23 +140,33 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer): | 
| 86 | 140 |              yield operations_pb2.Operation()
 | 
| 87 | 141 |  | 
| 88 | 142 |      def WaitExecution(self, request, context):
 | 
| 89 | -        self.__logger.debug("WaitExecution request from [%s]", context.peer())
 | |
| 143 | +        """Handles WaitExecutionRequest messages.
 | |
| 90 | 144 |  | 
| 91 | -        try:
 | |
| 92 | -            names = request.name.split("/")
 | |
| 145 | +        Args:
 | |
| 146 | +            request (WaitExecutionRequest): The incoming RPC request.
 | |
| 147 | +            context (grpc.ServicerContext): Context for the RPC call.
 | |
| 148 | +        """
 | |
| 149 | +        self.__logger.debug("WaitExecution request from [%s]", context.peer())
 | |
| 93 | 150 |  | 
| 94 | -            # Operation name should be in format:
 | |
| 95 | -            # {instance/name}/{operation_id}
 | |
| 96 | -            instance_name = ''.join(names[0:-1])
 | |
| 151 | +        names = request.name.split('/')
 | |
| 152 | +        instance_name = '/'.join(names[:-1])
 | |
| 153 | +        operation_name = names[-1]
 | |
| 154 | +        message_queue = queue.Queue()
 | |
| 155 | +        peer = context.peer()
 | |
| 97 | 156 |  | 
| 98 | -            message_queue = queue.Queue()
 | |
| 99 | -            operation_name = names[-1]
 | |
| 157 | +        try:
 | |
| 100 | 158 |              instance = self._get_instance(instance_name)
 | 
| 101 | 159 |  | 
| 102 | 160 |              instance.register_message_client(operation_name, message_queue)
 | 
| 161 | +            context.add_callback(partial(self._rpc_termination_callback,
 | |
| 162 | +                                         peer, instance_name, operation_name, message_queue))
 | |
| 103 | 163 |  | 
| 104 | -            context.add_callback(partial(instance.unregister_message_client,
 | |
| 105 | -                                         operation_name, message_queue))
 | |
| 164 | +            if self._is_instrumented:
 | |
| 165 | +                if peer not in self.__peers:
 | |
| 166 | +                    self.__peers_by_instance[instance_name].add(peer)
 | |
| 167 | +                    self.__peers[peer] = 1
 | |
| 168 | +                else:
 | |
| 169 | +                    self.__peers[peer] += 1
 | |
| 106 | 170 |  | 
| 107 | 171 |              for operation in instance.stream_operation_updates(message_queue,
 | 
| 108 | 172 |                                                                 operation_name):
 | 
| ... | ... | @@ -123,6 +187,39 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer): | 
| 123 | 187 |              context.set_code(grpc.StatusCode.CANCELLED)
 | 
| 124 | 188 |              yield operations_pb2.Operation()
 | 
| 125 | 189 |  | 
| 190 | +    # --- Public API: Monitoring ---
 | |
| 191 | + | |
| 192 | +    @property
 | |
| 193 | +    def is_instrumented(self):
 | |
| 194 | +        return self._is_instrumented
 | |
| 195 | + | |
| 196 | +    def query_n_clients(self):
 | |
| 197 | +        if self.__peers is not None:
 | |
| 198 | +            return len(self.__peers)
 | |
| 199 | +        return 0
 | |
| 200 | + | |
| 201 | +    def query_n_clients_for_instance(self, instance_name):
 | |
| 202 | +        try:
 | |
| 203 | +            if self.__peers_by_instance is not None:
 | |
| 204 | +                return len(self.__peers_by_instance[instance_name])
 | |
| 205 | +        except KeyError:
 | |
| 206 | +            pass
 | |
| 207 | +        return 0
 | |
| 208 | + | |
| 209 | +    # --- Private API ---
 | |
| 210 | + | |
| 211 | +    def _rpc_termination_callback(self, peer, instance_name, job_name, message_queue):
 | |
| 212 | +        instance = self._get_instance(instance_name)
 | |
| 213 | + | |
| 214 | +        instance.unregister_message_client(job_name, message_queue)
 | |
| 215 | + | |
| 216 | +        if self._is_instrumented:
 | |
| 217 | +            if self.__peers[peer] > 1:
 | |
| 218 | +                self.__peers[peer] -= 1
 | |
| 219 | +            else:
 | |
| 220 | +                self.__peers_by_instance[instance_name].remove(peer)
 | |
| 221 | +                del self.__peers[peer]
 | |
| 222 | + | |
| 126 | 223 |      def _get_instance(self, name):
 | 
| 127 | 224 |          try:
 | 
| 128 | 225 |              return self._instances[name]
 | 
| ... | ... | @@ -15,12 +15,16 @@ | 
| 15 | 15 |  | 
| 16 | 16 |  import asyncio
 | 
| 17 | 17 |  from concurrent import futures
 | 
| 18 | +from datetime import timedelta
 | |
| 18 | 19 |  import logging
 | 
| 19 | 20 |  import os
 | 
| 20 | 21 |  import signal
 | 
| 22 | +import time
 | |
| 21 | 23 |  | 
| 22 | 24 |  import grpc
 | 
| 23 | 25 |  | 
| 26 | +from buildgrid._enums import BotStatus, MetricRecordDomain, MetricRecordType
 | |
| 27 | +from buildgrid._protos.buildgrid.v2 import monitoring_pb2
 | |
| 24 | 28 |  from buildgrid.server.actioncache.service import ActionCacheService
 | 
| 25 | 29 |  from buildgrid.server.bots.service import BotsService
 | 
| 26 | 30 |  from buildgrid.server.cas.service import ByteStreamService, ContentAddressableStorageService
 | 
| ... | ... | @@ -30,6 +34,7 @@ from buildgrid.server.operations.service import OperationsService | 
| 30 | 34 |  from buildgrid.server.referencestorage.service import ReferenceStorageService
 | 
| 31 | 35 |  from buildgrid.server.capabilities.instance import CapabilitiesInstance
 | 
| 32 | 36 |  from buildgrid.server.capabilities.service import CapabilitiesService
 | 
| 37 | +from buildgrid.settings import MONITORING_PERIOD
 | |
| 33 | 38 |  | 
| 34 | 39 |  | 
| 35 | 40 |  class BuildGridServer:
 | 
| ... | ... | @@ -57,6 +62,8 @@ class BuildGridServer: | 
| 57 | 62 |          self.__main_loop = asyncio.get_event_loop()
 | 
| 58 | 63 |          self.__monitoring_bus = None
 | 
| 59 | 64 |  | 
| 65 | +        self.__state_monitoring_task = None
 | |
| 66 | + | |
| 60 | 67 |          # We always want a capabilities service
 | 
| 61 | 68 |          self._capabilities_service = CapabilitiesService(self.__grpc_server)
 | 
| 62 | 69 |  | 
| ... | ... | @@ -68,6 +75,9 @@ class BuildGridServer: | 
| 68 | 75 |          self._cas_service = None
 | 
| 69 | 76 |          self._bytestream_service = None
 | 
| 70 | 77 |  | 
| 78 | +        self._schedulers = {}
 | |
| 79 | +        self._instances = set()
 | |
| 80 | + | |
| 71 | 81 |          self._is_instrumented = monitor
 | 
| 72 | 82 |  | 
| 73 | 83 |          if self._is_instrumented:
 | 
| ... | ... | @@ -84,6 +94,10 @@ class BuildGridServer: | 
| 84 | 94 |          if self._is_instrumented:
 | 
| 85 | 95 |              self.__monitoring_bus.start()
 | 
| 86 | 96 |  | 
| 97 | +            self.__state_monitoring_task = asyncio.ensure_future(
 | |
| 98 | +                self._state_monitoring_worker(period=MONITORING_PERIOD),
 | |
| 99 | +                loop=self.__main_loop)
 | |
| 100 | + | |
| 87 | 101 |          self.__main_loop.add_signal_handler(signal.SIGTERM, self.stop)
 | 
| 88 | 102 |  | 
| 89 | 103 |          self.__main_loop.run_forever()
 | 
| ... | ... | @@ -91,6 +105,9 @@ class BuildGridServer: | 
| 91 | 105 |      def stop(self):
 | 
| 92 | 106 |          """Stops the BuildGrid server."""
 | 
| 93 | 107 |          if self._is_instrumented:
 | 
| 108 | +            if self.__state_monitoring_task is not None:
 | |
| 109 | +                self.__state_monitoring_task.cancel()
 | |
| 110 | + | |
| 94 | 111 |              self.__monitoring_bus.stop()
 | 
| 95 | 112 |  | 
| 96 | 113 |          self.__main_loop.stop()
 | 
| ... | ... | @@ -130,11 +147,15 @@ class BuildGridServer: | 
| 130 | 147 |              instance_name (str): Instance name.
 | 
| 131 | 148 |          """
 | 
| 132 | 149 |          if self._execution_service is None:
 | 
| 133 | -            self._execution_service = ExecutionService(self.__grpc_server)
 | |
| 150 | +            self._execution_service = ExecutionService(
 | |
| 151 | +                self.__grpc_server, monitor=self._is_instrumented)
 | |
| 134 | 152 |  | 
| 135 | 153 |          self._execution_service.add_instance(instance_name, instance)
 | 
| 136 | 154 |          self._add_capabilities_instance(instance_name, execution_instance=instance)
 | 
| 137 | 155 |  | 
| 156 | +        self._schedulers[instance_name] = instance.scheduler
 | |
| 157 | +        self._instances.add(instance_name)
 | |
| 158 | + | |
| 138 | 159 |      def add_bots_interface(self, instance, instance_name):
 | 
| 139 | 160 |          """Adds a :obj:`BotsInterface` to the service.
 | 
| 140 | 161 |  | 
| ... | ... | @@ -145,10 +166,13 @@ class BuildGridServer: | 
| 145 | 166 |              instance_name (str): Instance name.
 | 
| 146 | 167 |          """
 | 
| 147 | 168 |          if self._bots_service is None:
 | 
| 148 | -            self._bots_service = BotsService(self.__grpc_server)
 | |
| 169 | +            self._bots_service = BotsService(
 | |
| 170 | +                self.__grpc_server, monitor=self._is_instrumented)
 | |
| 149 | 171 |  | 
| 150 | 172 |          self._bots_service.add_instance(instance_name, instance)
 | 
| 151 | 173 |  | 
| 174 | +        self._instances.add(instance_name)
 | |
| 175 | + | |
| 152 | 176 |      def add_operations_instance(self, instance, instance_name):
 | 
| 153 | 177 |          """Adds an :obj:`OperationsInstance` to the service.
 | 
| 154 | 178 |  | 
| ... | ... | @@ -221,6 +245,14 @@ class BuildGridServer: | 
| 221 | 245 |  | 
| 222 | 246 |          self._bytestream_service.add_instance(instance_name, instance)
 | 
| 223 | 247 |  | 
| 248 | +    # --- Public API: Monitoring ---
 | |
| 249 | + | |
| 250 | +    @property
 | |
| 251 | +    def is_instrumented(self):
 | |
| 252 | +        return self._is_instrumented
 | |
| 253 | + | |
| 254 | +    # --- Private API ---
 | |
| 255 | + | |
| 224 | 256 |      def _add_capabilities_instance(self, instance_name,
 | 
| 225 | 257 |                                     cas_instance=None,
 | 
| 226 | 258 |                                     action_cache_instance=None,
 | 
| ... | ... | @@ -246,8 +278,152 @@ class BuildGridServer: | 
| 246 | 278 |                                                           execution_instance)
 | 
| 247 | 279 |              self._capabilities_service.add_instance(instance_name, capabilities_instance)
 | 
| 248 | 280 |  | 
| 249 | -    # --- Public API: Monitoring ---
 | |
| 281 | +    async def _state_monitoring_worker(self, period=1.0):
 | |
| 282 | +        """Periodically publishes state metrics to the monitoring bus."""
 | |
| 283 | +        async def __state_monitoring_worker():
 | |
| 284 | +            # Emit total clients count record:
 | |
| 285 | +            _, record = self._query_n_clients()
 | |
| 286 | +            await self.__monitoring_bus.send_record(record)
 | |
| 287 | + | |
| 288 | +            # Emit total bots count record:
 | |
| 289 | +            _, record = self._query_n_bots()
 | |
| 290 | +            await self.__monitoring_bus.send_record(record)
 | |
| 291 | + | |
| 292 | +            queue_times = []
 | |
| 293 | +            # Emits records by instance:
 | |
| 294 | +            for instance_name in self._instances:
 | |
| 295 | +                # Emit instance clients count record:
 | |
| 296 | +                _, record = self._query_n_clients_for_instance(instance_name)
 | |
| 297 | +                await self.__monitoring_bus.send_record(record)
 | |
| 298 | + | |
| 299 | +                # Emit instance bots count record:
 | |
| 300 | +                _, record = self._query_n_bots_for_instance(instance_name)
 | |
| 301 | +                await self.__monitoring_bus.send_record(record)
 | |
| 302 | + | |
| 303 | +                # Emit instance average queue time record:
 | |
| 304 | +                queue_time, record = self._query_am_queue_time_for_instance(instance_name)
 | |
| 305 | +                await self.__monitoring_bus.send_record(record)
 | |
| 306 | +                if queue_time:
 | |
| 307 | +                    queue_times.append(queue_time)
 | |
| 308 | + | |
| 309 | +            # Emits records by bot status:
 | |
| 310 | +            for bot_status in [BotStatus.OK, BotStatus.UNHEALTHY]:
 | |
| 311 | +                # Emit status bots count record:
 | |
| 312 | +                _, record = self._query_n_bots_for_status(bot_status)
 | |
| 313 | +                await self.__monitoring_bus.send_record(record)
 | |
| 314 | + | |
| 315 | +            # Emit overall average queue time record:
 | |
| 316 | +            if queue_times:
 | |
| 317 | +                am_queue_time = sum(queue_times, timedelta()) / len(queue_times)
 | |
| 318 | +            else:
 | |
| 319 | +                am_queue_time = timedelta()
 | |
| 320 | +            record = self._forge_timer_metric_record(
 | |
| 321 | +                MetricRecordDomain.STATE,
 | |
| 322 | +                'average-queue-time',
 | |
| 323 | +                am_queue_time)
 | |
| 324 | + | |
| 325 | +            await self.__monitoring_bus.send_record(record)
 | |
| 250 | 326 |  | 
| 251 | -    @property
 | |
| 252 | -    def is_instrumented(self):
 | |
| 253 | -        return self._is_instrumented | |
| 327 | +        try:
 | |
| 328 | +            while True:
 | |
| 329 | +                start = time.time()
 | |
| 330 | +                await __state_monitoring_worker()
 | |
| 331 | + | |
| 332 | +                end = time.time()
 | |
| 333 | +                await asyncio.sleep(period - (end - start))
 | |
| 334 | + | |
| 335 | +        except asyncio.CancelledError:
 | |
| 336 | +            pass
 | |
| 337 | + | |
| 338 | +    def _forge_counter_metric_record(self, domain, name, count, metadata=None):
 | |
| 339 | +        counter_record = monitoring_pb2.MetricRecord()
 | |
| 340 | + | |
| 341 | +        counter_record.creation_timestamp.GetCurrentTime()
 | |
| 342 | +        counter_record.domain = domain.value
 | |
| 343 | +        counter_record.type = MetricRecordType.COUNTER.value
 | |
| 344 | +        counter_record.name = name
 | |
| 345 | +        counter_record.count = count
 | |
| 346 | +        if metadata is not None:
 | |
| 347 | +            counter_record.metadata.update(metadata)
 | |
| 348 | + | |
| 349 | +        return counter_record
 | |
| 350 | + | |
| 351 | +    def _forge_timer_metric_record(self, domain, name, duration, metadata=None):
 | |
| 352 | +        timer_record = monitoring_pb2.MetricRecord()
 | |
| 353 | + | |
| 354 | +        timer_record.creation_timestamp.GetCurrentTime()
 | |
| 355 | +        timer_record.domain = domain.value
 | |
| 356 | +        timer_record.type = MetricRecordType.TIMER.value
 | |
| 357 | +        timer_record.name = name
 | |
| 358 | +        timer_record.duration.FromTimedelta(duration)
 | |
| 359 | +        if metadata is not None:
 | |
| 360 | +            timer_record.metadata.update(metadata)
 | |
| 361 | + | |
| 362 | +        return timer_record
 | |
| 363 | + | |
| 364 | +    def _forge_gauge_metric_record(self, domain, name, value, metadata=None):
 | |
| 365 | +        gauge_record = monitoring_pb2.MetricRecord()
 | |
| 366 | + | |
| 367 | +        gauge_record.creation_timestamp.GetCurrentTime()
 | |
| 368 | +        gauge_record.domain = domain.value
 | |
| 369 | +        gauge_record.type = MetricRecordType.GAUGE.value
 | |
| 370 | +        gauge_record.name = name
 | |
| 371 | +        gauge_record.value = value
 | |
| 372 | +        if metadata is not None:
 | |
| 373 | +            gauge_record.metadata.update(metadata)
 | |
| 374 | + | |
| 375 | +        return gauge_record
 | |
| 376 | + | |
| 377 | +    # --- Private API: Monitoring ---
 | |
| 378 | + | |
| 379 | +    def _query_n_clients(self):
 | |
| 380 | +        """Queries the number of clients connected."""
 | |
| 381 | +        n_clients = self._execution_service.query_n_clients()
 | |
| 382 | +        gauge_record = self._forge_gauge_metric_record(
 | |
| 383 | +            MetricRecordDomain.STATE, 'clients-count', n_clients)
 | |
| 384 | + | |
| 385 | +        return n_clients, gauge_record
 | |
| 386 | + | |
| 387 | +    def _query_n_clients_for_instance(self, instance_name):
 | |
| 388 | +        """Queries the number of clients connected for a given instance"""
 | |
| 389 | +        n_clients = self._execution_service.query_n_clients_for_instance(instance_name)
 | |
| 390 | +        gauge_record = self._forge_gauge_metric_record(
 | |
| 391 | +            MetricRecordDomain.STATE, 'clients-count', n_clients,
 | |
| 392 | +            metadata={'instance-name': instance_name or 'void'})
 | |
| 393 | + | |
| 394 | +        return n_clients, gauge_record
 | |
| 395 | + | |
| 396 | +    def _query_n_bots(self):
 | |
| 397 | +        """Queries the number of bots connected."""
 | |
| 398 | +        n_bots = self._bots_service.query_n_bots()
 | |
| 399 | +        gauge_record = self._forge_gauge_metric_record(
 | |
| 400 | +            MetricRecordDomain.STATE, 'bots-count', n_bots)
 | |
| 401 | + | |
| 402 | +        return n_bots, gauge_record
 | |
| 403 | + | |
| 404 | +    def _query_n_bots_for_instance(self, instance_name):
 | |
| 405 | +        """Queries the number of bots connected for a given instance."""
 | |
| 406 | +        n_bots = self._bots_service.query_n_bots_for_instance(instance_name)
 | |
| 407 | +        gauge_record = self._forge_gauge_metric_record(
 | |
| 408 | +            MetricRecordDomain.STATE, 'bots-count', n_bots,
 | |
| 409 | +            metadata={'instance-name': instance_name or 'void'})
 | |
| 410 | + | |
| 411 | +        return n_bots, gauge_record
 | |
| 412 | + | |
| 413 | +    def _query_n_bots_for_status(self, bot_status):
 | |
| 414 | +        """Queries the number of bots connected for a given health status."""
 | |
| 415 | +        n_bots = self._bots_service.query_n_bots_for_status(bot_status)
 | |
| 416 | +        gauge_record = self._forge_gauge_metric_record(
 | |
| 417 | +            MetricRecordDomain.STATE, 'bots-count', n_bots,
 | |
| 418 | +            metadata={'bot-status': bot_status.name})
 | |
| 419 | + | |
| 420 | +        return n_bots, gauge_record
 | |
| 421 | + | |
| 422 | +    def _query_am_queue_time_for_instance(self, instance_name):
 | |
| 423 | +        """Queries the average job's queue time for a given instance."""
 | |
| 424 | +        am_queue_time = self._schedulers[instance_name].query_am_queue_time()
 | |
| 425 | +        timer_record = self._forge_timer_metric_record(
 | |
| 426 | +            MetricRecordDomain.STATE, 'average-queue-time', am_queue_time,
 | |
| 427 | +            metadata={'instance-name': instance_name or 'void'})
 | |
| 428 | + | |
| 429 | +        return am_queue_time, timer_record | 
| ... | ... | @@ -13,10 +13,11 @@ | 
| 13 | 13 |  # limitations under the License.
 | 
| 14 | 14 |  | 
| 15 | 15 |  | 
| 16 | +from datetime import datetime
 | |
| 16 | 17 |  import logging
 | 
| 17 | 18 |  import uuid
 | 
| 18 | 19 |  | 
| 19 | -from google.protobuf import timestamp_pb2
 | |
| 20 | +from google.protobuf import duration_pb2, timestamp_pb2
 | |
| 20 | 21 |  | 
| 21 | 22 |  from buildgrid._enums import LeaseState, OperationStage
 | 
| 22 | 23 |  from buildgrid._exceptions import CancelledError
 | 
| ... | ... | @@ -40,6 +41,7 @@ class Job: | 
| 40 | 41 |          self.__operation_metadata = remote_execution_pb2.ExecuteOperationMetadata()
 | 
| 41 | 42 |  | 
| 42 | 43 |          self.__queued_timestamp = timestamp_pb2.Timestamp()
 | 
| 44 | +        self.__queued_time_duration = duration_pb2.Duration()
 | |
| 43 | 45 |          self.__worker_start_timestamp = timestamp_pb2.Timestamp()
 | 
| 44 | 46 |          self.__worker_completed_timestamp = timestamp_pb2.Timestamp()
 | 
| 45 | 47 |  | 
| ... | ... | @@ -56,6 +58,8 @@ class Job: | 
| 56 | 58 |          self._operation.done = False
 | 
| 57 | 59 |          self._n_tries = 0
 | 
| 58 | 60 |  | 
| 61 | +    # --- Public API ---
 | |
| 62 | + | |
| 59 | 63 |      @property
 | 
| 60 | 64 |      def name(self):
 | 
| 61 | 65 |          return self._name
 | 
| ... | ... | @@ -193,7 +197,7 @@ class Job: | 
| 193 | 197 |                  result.Unpack(action_result)
 | 
| 194 | 198 |  | 
| 195 | 199 |              action_metadata = action_result.execution_metadata
 | 
| 196 | -            action_metadata.queued_timestamp.CopyFrom(self.__worker_start_timestamp)
 | |
| 200 | +            action_metadata.queued_timestamp.CopyFrom(self.__queued_timestamp)
 | |
| 197 | 201 |              action_metadata.worker_start_timestamp.CopyFrom(self.__worker_start_timestamp)
 | 
| 198 | 202 |              action_metadata.worker_completed_timestamp.CopyFrom(self.__worker_completed_timestamp)
 | 
| 199 | 203 |  | 
| ... | ... | @@ -227,6 +231,10 @@ class Job: | 
| 227 | 231 |                  self.__queued_timestamp.GetCurrentTime()
 | 
| 228 | 232 |              self._n_tries += 1
 | 
| 229 | 233 |  | 
| 234 | +        elif self.__operation_metadata.stage == OperationStage.EXECUTING.value:
 | |
| 235 | +            queue_in, queue_out = self.__queued_timestamp.ToDatetime(), datetime.now()
 | |
| 236 | +            self.__queued_time_duration.FromTimedelta(queue_out - queue_in)
 | |
| 237 | + | |
| 230 | 238 |          elif self.__operation_metadata.stage == OperationStage.COMPLETED.value:
 | 
| 231 | 239 |              if self.__execute_response is not None:
 | 
| 232 | 240 |                  self._operation.response.Pack(self.__execute_response)
 | 
| ... | ... | @@ -260,3 +268,11 @@ class Job: | 
| 260 | 268 |          self.__execute_response.status.message = "Operation cancelled by client."
 | 
| 261 | 269 |  | 
| 262 | 270 |          self.update_operation_stage(OperationStage.COMPLETED)
 | 
| 271 | + | |
| 272 | +    # --- Public API: Monitoring ---
 | |
| 273 | + | |
| 274 | +    def query_queue_time(self):
 | |
| 275 | +        return self.__queued_time_duration.ToTimedelta()
 | |
| 276 | + | |
| 277 | +    def query_n_retries(self):
 | |
| 278 | +        return self._n_tries - 1 if self._n_tries > 0 else 0 | 
| ... | ... | @@ -32,6 +32,10 @@ class OperationsInstance: | 
| 32 | 32 |  | 
| 33 | 33 |          self._scheduler = scheduler
 | 
| 34 | 34 |  | 
| 35 | +    @property
 | |
| 36 | +    def scheduler(self):
 | |
| 37 | +        return self._scheduler
 | |
| 38 | + | |
| 35 | 39 |      def register_instance_with_server(self, instance_name, server):
 | 
| 36 | 40 |          server.add_operations_instance(self, instance_name)
 | 
| 37 | 41 |  | 
| ... | ... | @@ -38,8 +38,18 @@ class OperationsService(operations_pb2_grpc.OperationsServicer): | 
| 38 | 38 |  | 
| 39 | 39 |          operations_pb2_grpc.add_OperationsServicer_to_server(self, server)
 | 
| 40 | 40 |  | 
| 41 | -    def add_instance(self, name, instance):
 | |
| 42 | -        self._instances[name] = instance
 | |
| 41 | +    # --- Public API ---
 | |
| 42 | + | |
| 43 | +    def add_instance(self, instance_name, instance):
 | |
| 44 | +        """Registers a new servicer instance.
 | |
| 45 | + | |
| 46 | +        Args:
 | |
| 47 | +            instance_name (str): The new instance's name.
 | |
| 48 | +            instance (OperationsInstance): The new instance itself.
 | |
| 49 | +        """
 | |
| 50 | +        self._instances[instance_name] = instance
 | |
| 51 | + | |
| 52 | +    # --- Public API: Servicer ---
 | |
| 43 | 53 |  | 
| 44 | 54 |      def GetOperation(self, request, context):
 | 
| 45 | 55 |          self.__logger.debug("GetOperation request from [%s]", context.peer())
 | 
| ... | ... | @@ -127,6 +137,8 @@ class OperationsService(operations_pb2_grpc.OperationsServicer): | 
| 127 | 137 |  | 
| 128 | 138 |          return Empty()
 | 
| 129 | 139 |  | 
| 140 | +    # --- Private API ---
 | |
| 141 | + | |
| 130 | 142 |      def _parse_instance_name(self, name):
 | 
| 131 | 143 |          """ If the instance name is not blank, 'name' will have the form
 | 
| 132 | 144 |          {instance_name}/{operation_uuid}. Otherwise, it will just be
 | 
| ... | ... | @@ -20,33 +20,70 @@ Schedules jobs. | 
| 20 | 20 |  """
 | 
| 21 | 21 |  | 
| 22 | 22 |  from collections import deque
 | 
| 23 | +from datetime import timedelta
 | |
| 23 | 24 |  import logging
 | 
| 24 | 25 |  | 
| 26 | +from buildgrid._enums import LeaseState, OperationStage
 | |
| 25 | 27 |  from buildgrid._exceptions import NotFoundError
 | 
| 26 | 28 |  | 
| 27 | -from .job import OperationStage, LeaseState
 | |
| 28 | - | |
| 29 | 29 |  | 
| 30 | 30 |  class Scheduler:
 | 
| 31 | 31 |  | 
| 32 | 32 |      MAX_N_TRIES = 5
 | 
| 33 | 33 |  | 
| 34 | -    def __init__(self, action_cache=None):
 | |
| 34 | +    def __init__(self, action_cache=None, monitor=False):
 | |
| 35 | 35 |          self.__logger = logging.getLogger(__name__)
 | 
| 36 | 36 |  | 
| 37 | +        self.__operations_by_stage = None
 | |
| 38 | +        self.__leases_by_state = None
 | |
| 39 | +        self.__queue_time_average = None
 | |
| 40 | +        self.__retries_count = 0
 | |
| 41 | + | |
| 37 | 42 |          self._action_cache = action_cache
 | 
| 38 | 43 |          self.jobs = {}
 | 
| 39 | 44 |          self.queue = deque()
 | 
| 40 | 45 |  | 
| 46 | +        self._is_instrumented = monitor
 | |
| 47 | + | |
| 48 | +        if self._is_instrumented:
 | |
| 49 | +            self.__operations_by_stage = {}
 | |
| 50 | +            self.__leases_by_state = {}
 | |
| 51 | +            self.__queue_time_average = 0, timedelta()
 | |
| 52 | + | |
| 53 | +            self.__operations_by_stage[OperationStage.CACHE_CHECK] = set()
 | |
| 54 | +            self.__operations_by_stage[OperationStage.QUEUED] = set()
 | |
| 55 | +            self.__operations_by_stage[OperationStage.EXECUTING] = set()
 | |
| 56 | +            self.__operations_by_stage[OperationStage.COMPLETED] = set()
 | |
| 57 | + | |
| 58 | +            self.__leases_by_state[LeaseState.PENDING] = set()
 | |
| 59 | +            self.__leases_by_state[LeaseState.ACTIVE] = set()
 | |
| 60 | +            self.__leases_by_state[LeaseState.COMPLETED] = set()
 | |
| 61 | + | |
| 62 | +    # --- Public API ---
 | |
| 63 | + | |
| 41 | 64 |      def register_client(self, job_name, queue):
 | 
| 42 | -        self.jobs[job_name].register_client(queue)
 | |
| 65 | +        job = self.jobs[job_name]
 | |
| 66 | + | |
| 67 | +        job.register_client(queue)
 | |
| 43 | 68 |  | 
| 44 | 69 |      def unregister_client(self, job_name, queue):
 | 
| 45 | -        self.jobs[job_name].unregister_client(queue)
 | |
| 70 | +        job = self.jobs[job_name]
 | |
| 46 | 71 |  | 
| 47 | -        if not self.jobs[job_name].n_clients and self.jobs[job_name].operation.done:
 | |
| 72 | +        job.unregister_client(queue)
 | |
| 73 | + | |
| 74 | +        if not job.n_clients and job.operation.done:
 | |
| 48 | 75 |              del self.jobs[job_name]
 | 
| 49 | 76 |  | 
| 77 | +            if self._is_instrumented:
 | |
| 78 | +                self.__operations_by_stage[OperationStage.CACHE_CHECK].discard(job_name)
 | |
| 79 | +                self.__operations_by_stage[OperationStage.QUEUED].discard(job_name)
 | |
| 80 | +                self.__operations_by_stage[OperationStage.EXECUTING].discard(job_name)
 | |
| 81 | +                self.__operations_by_stage[OperationStage.COMPLETED].discard(job_name)
 | |
| 82 | + | |
| 83 | +                self.__leases_by_state[LeaseState.PENDING].discard(job_name)
 | |
| 84 | +                self.__leases_by_state[LeaseState.ACTIVE].discard(job_name)
 | |
| 85 | +                self.__leases_by_state[LeaseState.COMPLETED].discard(job_name)
 | |
| 86 | + | |
| 50 | 87 |      def queue_job(self, job, skip_cache_lookup=False):
 | 
| 51 | 88 |          self.jobs[job.name] = job
 | 
| 52 | 89 |  | 
| ... | ... | @@ -62,23 +99,30 @@ class Scheduler: | 
| 62 | 99 |                  job.set_cached_result(action_result)
 | 
| 63 | 100 |                  operation_stage = OperationStage.COMPLETED
 | 
| 64 | 101 |  | 
| 102 | +                if self._is_instrumented:
 | |
| 103 | +                    self.__retries_count += 1
 | |
| 104 | + | |
| 65 | 105 |          else:
 | 
| 66 | 106 |              operation_stage = OperationStage.QUEUED
 | 
| 67 | 107 |              self.queue.append(job)
 | 
| 68 | 108 |  | 
| 69 | -        job.update_operation_stage(operation_stage)
 | |
| 109 | +        self._update_job_operation_stage(job.name, operation_stage)
 | |
| 70 | 110 |  | 
| 71 | 111 |      def retry_job(self, job_name):
 | 
| 72 | -        if job_name in self.jobs:
 | |
| 73 | -            job = self.jobs[job_name]
 | |
| 74 | -            if job.n_tries >= self.MAX_N_TRIES:
 | |
| 75 | -                # TODO: Decide what to do with these jobs
 | |
| 76 | -                job.update_operation_stage(OperationStage.COMPLETED)
 | |
| 77 | -                # TODO: Mark these jobs as done
 | |
| 78 | -            else:
 | |
| 79 | -                job.update_operation_stage(OperationStage.QUEUED)
 | |
| 80 | -                job.update_lease_state(LeaseState.PENDING)
 | |
| 81 | -                self.queue.append(job)
 | |
| 112 | +        job = self.jobs[job_name]
 | |
| 113 | + | |
| 114 | +        operation_stage = None
 | |
| 115 | +        if job.n_tries >= self.MAX_N_TRIES:
 | |
| 116 | +            # TODO: Decide what to do with these jobs
 | |
| 117 | +            operation_stage = OperationStage.COMPLETED
 | |
| 118 | +            # TODO: Mark these jobs as done
 | |
| 119 | + | |
| 120 | +        else:
 | |
| 121 | +            operation_stage = OperationStage.QUEUED
 | |
| 122 | +            job.update_lease_state(LeaseState.PENDING)
 | |
| 123 | +            self.queue.append(job)
 | |
| 124 | + | |
| 125 | +        self._update_job_operation_stage(job_name, operation_stage)
 | |
| 82 | 126 |  | 
| 83 | 127 |      def list_jobs(self):
 | 
| 84 | 128 |          return self.jobs.values()
 | 
| ... | ... | @@ -118,17 +162,27 @@ class Scheduler: | 
| 118 | 162 |              lease_result (google.protobuf.Any): the lease execution result, only
 | 
| 119 | 163 |                  required if `lease_state` is `COMPLETED`.
 | 
| 120 | 164 |          """
 | 
| 121 | - | |
| 122 | 165 |          job = self.jobs[lease.id]
 | 
| 123 | 166 |          lease_state = LeaseState(lease.state)
 | 
| 124 | 167 |  | 
| 168 | +        operation_stage = None
 | |
| 125 | 169 |          if lease_state == LeaseState.PENDING:
 | 
| 126 | 170 |              job.update_lease_state(LeaseState.PENDING)
 | 
| 127 | -            job.update_operation_stage(OperationStage.QUEUED)
 | |
| 171 | +            operation_stage = OperationStage.QUEUED
 | |
| 172 | + | |
| 173 | +            if self._is_instrumented:
 | |
| 174 | +                self.__leases_by_state[LeaseState.PENDING].add(lease.id)
 | |
| 175 | +                self.__leases_by_state[LeaseState.ACTIVE].discard(lease.id)
 | |
| 176 | +                self.__leases_by_state[LeaseState.COMPLETED].discard(lease.id)
 | |
| 128 | 177 |  | 
| 129 | 178 |          elif lease_state == LeaseState.ACTIVE:
 | 
| 130 | 179 |              job.update_lease_state(LeaseState.ACTIVE)
 | 
| 131 | -            job.update_operation_stage(OperationStage.EXECUTING)
 | |
| 180 | +            operation_stage = OperationStage.EXECUTING
 | |
| 181 | + | |
| 182 | +            if self._is_instrumented:
 | |
| 183 | +                self.__leases_by_state[LeaseState.PENDING].discard(lease.id)
 | |
| 184 | +                self.__leases_by_state[LeaseState.ACTIVE].add(lease.id)
 | |
| 185 | +                self.__leases_by_state[LeaseState.COMPLETED].discard(lease.id)
 | |
| 132 | 186 |  | 
| 133 | 187 |          elif lease_state == LeaseState.COMPLETED:
 | 
| 134 | 188 |              job.update_lease_state(LeaseState.COMPLETED,
 | 
| ... | ... | @@ -137,7 +191,14 @@ class Scheduler: | 
| 137 | 191 |              if self._action_cache is not None and not job.do_not_cache:
 | 
| 138 | 192 |                  self._action_cache.update_action_result(job.action_digest, job.action_result)
 | 
| 139 | 193 |  | 
| 140 | -            job.update_operation_stage(OperationStage.COMPLETED)
 | |
| 194 | +            operation_stage = OperationStage.COMPLETED
 | |
| 195 | + | |
| 196 | +            if self._is_instrumented:
 | |
| 197 | +                self.__leases_by_state[LeaseState.PENDING].discard(lease.id)
 | |
| 198 | +                self.__leases_by_state[LeaseState.ACTIVE].discard(lease.id)
 | |
| 199 | +                self.__leases_by_state[LeaseState.COMPLETED].add(lease.id)
 | |
| 200 | + | |
| 201 | +        self._update_job_operation_stage(lease.id, operation_stage)
 | |
| 141 | 202 |  | 
| 142 | 203 |      def get_job_lease(self, job_name):
 | 
| 143 | 204 |          """Returns the lease associated to job, if any have been emitted yet."""
 | 
| ... | ... | @@ -160,3 +221,101 @@ class Scheduler: | 
| 160 | 221 |              job_name (str): name of the job holding the operation to cancel.
 | 
| 161 | 222 |          """
 | 
| 162 | 223 |          self.jobs[job_name].cancel_operation()
 | 
| 224 | + | |
| 225 | +    # --- Public API: Monitoring ---
 | |
| 226 | + | |
| 227 | +    @property
 | |
| 228 | +    def is_instrumented(self):
 | |
| 229 | +        return self._is_instrumented
 | |
| 230 | + | |
| 231 | +    def query_n_jobs(self):
 | |
| 232 | +        return len(self.jobs)
 | |
| 233 | + | |
| 234 | +    def query_n_operations(self):
 | |
| 235 | +        # For now n_operations == n_jobs:
 | |
| 236 | +        return len(self.jobs)
 | |
| 237 | + | |
| 238 | +    def query_n_operations_by_stage(self, operation_stage):
 | |
| 239 | +        try:
 | |
| 240 | +            if self.__operations_by_stage is not None:
 | |
| 241 | +                return len(self.__operations_by_stage[operation_stage])
 | |
| 242 | +        except KeyError:
 | |
| 243 | +            pass
 | |
| 244 | +        return 0
 | |
| 245 | + | |
| 246 | +    def query_n_leases(self):
 | |
| 247 | +        return len(self.jobs)
 | |
| 248 | + | |
| 249 | +    def query_n_leases_by_state(self, lease_state):
 | |
| 250 | +        try:
 | |
| 251 | +            if self.__leases_by_state is not None:
 | |
| 252 | +                return len(self.__leases_by_state[lease_state])
 | |
| 253 | +        except KeyError:
 | |
| 254 | +            pass
 | |
| 255 | +        return 0
 | |
| 256 | + | |
| 257 | +    def query_n_retries(self):
 | |
| 258 | +        return self.__retries_count
 | |
| 259 | + | |
| 260 | +    def query_am_queue_time(self):
 | |
| 261 | +        if self.__queue_time_average is not None:
 | |
| 262 | +            return self.__queue_time_average[1]
 | |
| 263 | +        return timedelta()
 | |
| 264 | + | |
| 265 | +    # --- Private API ---
 | |
| 266 | + | |
| 267 | +    def _update_job_operation_stage(self, job_name, operation_stage):
 | |
| 268 | +        """Requests a stage transition for the job's :class:`Operations`.
 | |
| 269 | + | |
| 270 | +        Args:
 | |
| 271 | +            job_name (str): name of the job to query.
 | |
| 272 | +            operation_stage (OperationStage): the stage to transition to.
 | |
| 273 | +        """
 | |
| 274 | +        job = self.jobs[job_name]
 | |
| 275 | + | |
| 276 | +        if operation_stage == OperationStage.CACHE_CHECK:
 | |
| 277 | +            job.update_operation_stage(OperationStage.CACHE_CHECK)
 | |
| 278 | + | |
| 279 | +            if self._is_instrumented:
 | |
| 280 | +                self.__operations_by_stage[OperationStage.CACHE_CHECK].add(job_name)
 | |
| 281 | +                self.__operations_by_stage[OperationStage.QUEUED].discard(job_name)
 | |
| 282 | +                self.__operations_by_stage[OperationStage.EXECUTING].discard(job_name)
 | |
| 283 | +                self.__operations_by_stage[OperationStage.COMPLETED].discard(job_name)
 | |
| 284 | + | |
| 285 | +        elif operation_stage == OperationStage.QUEUED:
 | |
| 286 | +            job.update_operation_stage(OperationStage.QUEUED)
 | |
| 287 | + | |
| 288 | +            if self._is_instrumented:
 | |
| 289 | +                self.__operations_by_stage[OperationStage.CACHE_CHECK].discard(job_name)
 | |
| 290 | +                self.__operations_by_stage[OperationStage.QUEUED].add(job_name)
 | |
| 291 | +                self.__operations_by_stage[OperationStage.EXECUTING].discard(job_name)
 | |
| 292 | +                self.__operations_by_stage[OperationStage.COMPLETED].discard(job_name)
 | |
| 293 | + | |
| 294 | +        elif operation_stage == OperationStage.EXECUTING:
 | |
| 295 | +            job.update_operation_stage(OperationStage.EXECUTING)
 | |
| 296 | + | |
| 297 | +            if self._is_instrumented:
 | |
| 298 | +                self.__operations_by_stage[OperationStage.CACHE_CHECK].discard(job_name)
 | |
| 299 | +                self.__operations_by_stage[OperationStage.QUEUED].discard(job_name)
 | |
| 300 | +                self.__operations_by_stage[OperationStage.EXECUTING].add(job_name)
 | |
| 301 | +                self.__operations_by_stage[OperationStage.COMPLETED].discard(job_name)
 | |
| 302 | + | |
| 303 | +        elif operation_stage == OperationStage.COMPLETED:
 | |
| 304 | +            job.update_operation_stage(OperationStage.COMPLETED)
 | |
| 305 | + | |
| 306 | +            if self._is_instrumented:
 | |
| 307 | +                self.__operations_by_stage[OperationStage.CACHE_CHECK].discard(job_name)
 | |
| 308 | +                self.__operations_by_stage[OperationStage.QUEUED].discard(job_name)
 | |
| 309 | +                self.__operations_by_stage[OperationStage.EXECUTING].discard(job_name)
 | |
| 310 | +                self.__operations_by_stage[OperationStage.COMPLETED].add(job_name)
 | |
| 311 | + | |
| 312 | +                average_order, average_time = self.__queue_time_average
 | |
| 313 | + | |
| 314 | +                average_order += 1
 | |
| 315 | +                if average_order <= 1:
 | |
| 316 | +                    average_time = job.query_queue_time()
 | |
| 317 | +                else:
 | |
| 318 | +                    queue_time = job.query_queue_time()
 | |
| 319 | +                    average_time = average_time + ((queue_time - average_time) / average_order)
 | |
| 320 | + | |
| 321 | +                self.__queue_time_average = average_order, average_time | 
| 1 | +# Copyright (C) 2018 Bloomberg LP
 | |
| 2 | +#
 | |
| 3 | +# Licensed under the Apache License, Version 2.0 (the "License");
 | |
| 4 | +# you may not use this file except in compliance with the License.
 | |
| 5 | +# You may obtain a copy of the License at
 | |
| 6 | +#
 | |
| 7 | +#  <http://www.apache.org/licenses/LICENSE-2.0>
 | |
| 8 | +#
 | |
| 9 | +# Unless required by applicable law or agreed to in writing, software
 | |
| 10 | +# distributed under the License is distributed on an "AS IS" BASIS,
 | |
| 11 | +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | |
| 12 | +# See the License for the specific language governing permissions and
 | |
| 13 | +# limitations under the License.
 | |
| 14 | + | |
| 15 | + | |
| 1 | 16 |  import hashlib
 | 
| 2 | 17 |  | 
| 3 | 18 |  | 
| 4 | -# The hash function that CAS uses
 | |
| 19 | +# Hash function used for computing digests:
 | |
| 5 | 20 |  HASH = hashlib.sha256
 | 
| 21 | + | |
| 22 | +# Length in bytes of a hash string returned by HASH:
 | |
| 6 | 23 |  HASH_LENGTH = HASH().digest_size * 2
 | 
| 24 | + | |
| 25 | +# Period, in seconds, for the monitoring cycle:
 | |
| 26 | +MONITORING_PERIOD = 5.0 | 
| ... | ... | @@ -112,13 +112,15 @@ setup( | 
| 112 | 112 |      license="Apache License, Version 2.0",
 | 
| 113 | 113 |      description="A remote execution service",
 | 
| 114 | 114 |      packages=find_packages(),
 | 
| 115 | +    python_requires='>= 3.5.3',  # janus requirement
 | |
| 115 | 116 |      install_requires=[
 | 
| 116 | -        'protobuf',
 | |
| 117 | -        'grpcio',
 | |
| 118 | -        'Click',
 | |
| 119 | -        'PyYAML',
 | |
| 120 | 117 |          'boto3 < 1.8.0',
 | 
| 121 | 118 |          'botocore < 1.11.0',
 | 
| 119 | +        'click',
 | |
| 120 | +        'grpcio',
 | |
| 121 | +        'janus',
 | |
| 122 | +        'protobuf',
 | |
| 123 | +        'pyyaml',
 | |
| 122 | 124 |      ],
 | 
| 123 | 125 |      entry_points={
 | 
| 124 | 126 |          'console_scripts': [
 | 
