[Notes] [Git][BuildGrid/buildgrid][mablanch/75-requests-multiplexing] 6 commits: Do not duplicate jobs for same action



Title: GitLab

Martin Blanchard pushed to branch mablanch/75-requests-multiplexing at BuildGrid / buildgrid

Commits:

9 changed files:

Changes:

  • buildgrid/server/bots/instance.py
    ... ... @@ -123,7 +123,7 @@ class BotsInterface:
    123 123
                 # Job does not exist, remove from bot.
    
    124 124
                 return None
    
    125 125
     
    
    126
    -        self._scheduler.update_job_lease(lease)
    
    126
    +        self._scheduler.update_job_lease_state(lease.id, lease)
    
    127 127
     
    
    128 128
             if lease_state == LeaseState.COMPLETED:
    
    129 129
                 return None
    
    ... ... @@ -161,7 +161,7 @@ class BotsInterface:
    161 161
                     self.__logger.error("Assigned lease id=[%s],"
    
    162 162
                                         " not found on bot with name=[%s] and id=[%s]."
    
    163 163
                                         " Retrying job", lease_id, bot_session.name, bot_session.bot_id)
    
    164
    -                self._scheduler.retry_job(lease_id)
    
    164
    +                self._scheduler.retry_job_lease(lease_id)
    
    165 165
     
    
    166 166
         def _close_bot_session(self, name):
    
    167 167
             """ Before removing the session, close any leases and
    
    ... ... @@ -174,7 +174,7 @@ class BotsInterface:
    174 174
     
    
    175 175
             self.__logger.debug("Attempting to close [%s] with name: [%s]", bot_id, name)
    
    176 176
             for lease_id in self._assigned_leases[name]:
    
    177
    -            self._scheduler.retry_job(lease_id)
    
    177
    +            self._scheduler.retry_job_lease(lease_id)
    
    178 178
             self._assigned_leases.pop(name)
    
    179 179
     
    
    180 180
             self.__logger.debug("Closing bot session: [%s]", name)
    

  • buildgrid/server/execution/instance.py
    ... ... @@ -23,9 +23,7 @@ import logging
    23 23
     
    
    24 24
     from buildgrid._exceptions import FailedPreconditionError, InvalidArgumentError, NotFoundError
    
    25 25
     from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Action
    
    26
    -
    
    27
    -from ..job import Job
    
    28
    -from ...utils import get_hash_type
    
    26
    +from buildgrid.utils import get_hash_type
    
    29 27
     
    
    30 28
     
    
    31 29
     class ExecutionInstance:
    
    ... ... @@ -46,44 +44,45 @@ class ExecutionInstance:
    46 44
         def hash_type(self):
    
    47 45
             return get_hash_type()
    
    48 46
     
    
    49
    -    def execute(self, action_digest, skip_cache_lookup, peer=None, message_queue=None):
    
    47
    +    def execute(self, action_digest, skip_cache_lookup):
    
    50 48
             """ Sends a job for execution.
    
    51 49
             Queues an action and creates an Operation instance to be associated with
    
    52 50
             this action.
    
    53 51
             """
    
    54
    -
    
    55 52
             action = self._storage.get_message(action_digest, Action)
    
    56 53
     
    
    57 54
             if not action:
    
    58 55
                 raise FailedPreconditionError("Could not get action from storage.")
    
    59 56
     
    
    60
    -        job = Job(action, action_digest)
    
    61
    -        if peer is not None and message_queue is not None:
    
    62
    -            job.register_operation_client(peer, message_queue)
    
    63
    -
    
    64
    -        self._scheduler.queue_job(job, skip_cache_lookup)
    
    57
    +        return self._scheduler.queue_job_operation(action, action_digest, skip_cache_lookup)
    
    65 58
     
    
    66
    -        return job.operation
    
    67
    -
    
    68
    -    def register_operation_client(self, job_name, peer, message_queue):
    
    59
    +    def register_operation_client(self, operation_name, peer, message_queue):
    
    69 60
             try:
    
    70
    -            self._scheduler.job_name(job_name, peer, message_queue)
    
    61
    +            return self._scheduler.register_job_operation_client(operation_name,
    
    62
    +                                                                 peer, message_queue)
    
    71 63
     
    
    72 64
             except NotFoundError:
    
    73
    -            raise InvalidArgumentError("Operation name does not exist: [{}]".format(job_name))
    
    65
    +            raise InvalidArgumentError("Operation name does not exist: [{}]"
    
    66
    +                                       .format(operation_name))
    
    74 67
     
    
    75
    -    def unregister_operation_client(self, job_name, peer):
    
    68
    +    def unregister_operation_client(self, operation_name, peer):
    
    76 69
             try:
    
    77
    -            self._scheduler.unregister_operation_client(job_name, peer)
    
    70
    +            self._scheduler.unregister_job_operation_client(operation_name, peer)
    
    78 71
     
    
    79 72
             except NotFoundError:
    
    80
    -            raise InvalidArgumentError("Operation name does not exist: [{}]".format(job_name))
    
    73
    +            raise InvalidArgumentError("Operation name does not exist: [{}]"
    
    74
    +                                       .format(operation_name))
    
    75
    +
    
    76
    +    def stream_operation_updates(self, message_queue):
    
    77
    +        error, operation = message_queue.get()
    
    78
    +        if error is not None:
    
    79
    +            raise error
    
    80
    +
    
    81
    +        while not operation.done:
    
    82
    +            yield operation
    
    81 83
     
    
    82
    -    def stream_operation_updates(self, message_queue, operation_name):
    
    83
    -        job = message_queue.get()
    
    84
    -        while not job.operation.done:
    
    85
    -            yield job.operation
    
    86
    -            job = message_queue.get()
    
    87
    -            job.check_operation_status()
    
    84
    +            error, operation = message_queue.get()
    
    85
    +            if error is not None:
    
    86
    +                raise error
    
    88 87
     
    
    89
    -        yield job.operation
    88
    +        yield operation

  • buildgrid/server/execution/service.py
    ... ... @@ -97,13 +97,14 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
    97 97
             try:
    
    98 98
                 instance = self._get_instance(instance_name)
    
    99 99
     
    
    100
    -            operation = instance.execute(request.action_digest,
    
    101
    -                                         request.skip_cache_lookup,
    
    102
    -                                         peer=peer,
    
    103
    -                                         message_queue=message_queue)
    
    100
    +            job_name = instance.execute(request.action_digest,
    
    101
    +                                        request.skip_cache_lookup)
    
    102
    +
    
    103
    +            operation_name = instance.register_operation_client(job_name,
    
    104
    +                                                                peer, message_queue)
    
    104 105
     
    
    105 106
                 context.add_callback(partial(self._rpc_termination_callback,
    
    106
    -                                         peer, instance_name, operation.name))
    
    107
    +                                         peer, instance_name, operation_name))
    
    107 108
     
    
    108 109
                 if self._is_instrumented:
    
    109 110
                     if peer not in self.__peers:
    
    ... ... @@ -112,16 +113,13 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
    112 113
                     else:
    
    113 114
                         self.__peers[peer] += 1
    
    114 115
     
    
    115
    -            instanced_op_name = "{}/{}".format(instance_name, operation.name)
    
    116
    +            operation_full_name = "{}/{}".format(instance_name, operation_name)
    
    116 117
     
    
    117
    -            self.__logger.info("Operation name: [%s]", instanced_op_name)
    
    118
    +            self.__logger.info("Operation name: [%s]", operation_full_name)
    
    118 119
     
    
    119
    -            for operation in instance.stream_operation_updates(message_queue,
    
    120
    -                                                               operation.name):
    
    121
    -                op = operations_pb2.Operation()
    
    122
    -                op.CopyFrom(operation)
    
    123
    -                op.name = instanced_op_name
    
    124
    -                yield op
    
    120
    +            for operation in instance.stream_operation_updates(message_queue):
    
    121
    +                operation.name = operation_full_name
    
    122
    +                yield operation
    
    125 123
     
    
    126 124
             except InvalidArgumentError as e:
    
    127 125
                 self.__logger.error(e)
    
    ... ... @@ -159,8 +157,8 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
    159 157
             try:
    
    160 158
                 instance = self._get_instance(instance_name)
    
    161 159
     
    
    162
    -            instance.register_operation_client(operation_name,
    
    163
    -                                               peer, message_queue)
    
    160
    +            operation_name = instance.register_operation_client(operation_name,
    
    161
    +                                                                peer, message_queue)
    
    164 162
     
    
    165 163
                 context.add_callback(partial(self._rpc_termination_callback,
    
    166 164
                                              peer, instance_name, operation_name))
    
    ... ... @@ -172,12 +170,11 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
    172 170
                     else:
    
    173 171
                         self.__peers[peer] += 1
    
    174 172
     
    
    175
    -            for operation in instance.stream_operation_updates(message_queue,
    
    176
    -                                                               operation_name):
    
    177
    -                op = operations_pb2.Operation()
    
    178
    -                op.CopyFrom(operation)
    
    179
    -                op.name = request.name
    
    180
    -                yield op
    
    173
    +            operation_full_name = "{}/{}".format(instance_name, operation_name)
    
    174
    +
    
    175
    +            for operation in instance.stream_operation_updates(message_queue):
    
    176
    +                operation.name = operation_full_name
    
    177
    +                yield operation
    
    181 178
     
    
    182 179
             except InvalidArgumentError as e:
    
    183 180
                 self.__logger.error(e)
    
    ... ... @@ -212,10 +209,10 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
    212 209
     
    
    213 210
         # --- Private API ---
    
    214 211
     
    
    215
    -    def _rpc_termination_callback(self, peer, instance_name, job_name):
    
    212
    +    def _rpc_termination_callback(self, peer, instance_name, operation_name):
    
    216 213
             instance = self._get_instance(instance_name)
    
    217 214
     
    
    218
    -        instance.unregister_operation_client(job_name, peer)
    
    215
    +        instance.unregister_operation_client(operation_name, peer)
    
    219 216
     
    
    220 217
             if self._is_instrumented:
    
    221 218
                 if self.__peers[peer] > 1:
    

  • buildgrid/server/job.py
    ... ... @@ -20,7 +20,7 @@ import uuid
    20 20
     from google.protobuf import duration_pb2, timestamp_pb2
    
    21 21
     
    
    22 22
     from buildgrid._enums import LeaseState, OperationStage
    
    23
    -from buildgrid._exceptions import CancelledError
    
    23
    +from buildgrid._exceptions import CancelledError, NotFoundError
    
    24 24
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
    
    25 25
     from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2
    
    26 26
     from buildgrid._protos.google.longrunning import operations_pb2
    
    ... ... @@ -35,30 +35,32 @@ class Job:
    35 35
             self._name = str(uuid.uuid4())
    
    36 36
             self._priority = priority
    
    37 37
             self._action = remote_execution_pb2.Action()
    
    38
    -        self._operation = operations_pb2.Operation()
    
    39 38
             self._lease = None
    
    40 39
     
    
    41 40
             self.__execute_response = None
    
    42 41
             self.__operation_metadata = remote_execution_pb2.ExecuteOperationMetadata()
    
    42
    +        self.__operations_by_name = {}
    
    43
    +        self.__operations_by_peer = {}
    
    43 44
     
    
    44 45
             self.__queued_timestamp = timestamp_pb2.Timestamp()
    
    45 46
             self.__queued_time_duration = duration_pb2.Duration()
    
    46 47
             self.__worker_start_timestamp = timestamp_pb2.Timestamp()
    
    47 48
             self.__worker_completed_timestamp = timestamp_pb2.Timestamp()
    
    48 49
     
    
    49
    -        self.__operation_message_queues = {}
    
    50
    -        self.__operation_cancelled = False
    
    50
    +        self.__operations_message_queues = {}
    
    51
    +        self.__operations_cancelled = set()
    
    51 52
             self.__lease_cancelled = False
    
    53
    +        self.__job_cancelled = False
    
    52 54
     
    
    53 55
             self.__operation_metadata.action_digest.CopyFrom(action_digest)
    
    54 56
             self.__operation_metadata.stage = OperationStage.UNKNOWN.value
    
    55 57
     
    
    56 58
             self._action.CopyFrom(action)
    
    57 59
             self._do_not_cache = self._action.do_not_cache
    
    58
    -        self._operation.name = self._name
    
    59
    -        self._operation.done = False
    
    60 60
             self._n_tries = 0
    
    61 61
     
    
    62
    +        self._done = False
    
    63
    +
    
    62 64
         def __eq__(self, other):
    
    63 65
             if isinstance(other, Job):
    
    64 66
                 return self.name == other.name
    
    ... ... @@ -78,12 +80,14 @@ class Job:
    78 80
             return self._priority
    
    79 81
     
    
    80 82
         @property
    
    81
    -    def do_not_cache(self):
    
    82
    -        return self._do_not_cache
    
    83
    +    def done(self):
    
    84
    +        return self._done
    
    85
    +
    
    86
    +    # --- Public API: REAPI ---
    
    83 87
     
    
    84 88
         @property
    
    85
    -    def action(self):
    
    86
    -        return self._action
    
    89
    +    def do_not_cache(self):
    
    90
    +        return self._do_not_cache
    
    87 91
     
    
    88 92
         @property
    
    89 93
         def action_digest(self):
    
    ... ... @@ -97,56 +101,49 @@ class Job:
    97 101
                 return None
    
    98 102
     
    
    99 103
         @property
    
    100
    -    def holds_cached_action_result(self):
    
    104
    +    def holds_cached_result(self):
    
    101 105
             if self.__execute_response is not None:
    
    102 106
                 return self.__execute_response.cached_result
    
    103 107
             else:
    
    104 108
                 return False
    
    105 109
     
    
    106
    -    @property
    
    107
    -    def operation(self):
    
    108
    -        return self._operation
    
    109
    -
    
    110
    -    @property
    
    111
    -    def operation_stage(self):
    
    112
    -        return OperationStage(self.__operation_metadata.state)
    
    113
    -
    
    114
    -    @property
    
    115
    -    def lease(self):
    
    116
    -        return self._lease
    
    110
    +    def set_cached_result(self, action_result):
    
    111
    +        """Allows specifying an action result form the action cache for the job.
    
    117 112
     
    
    118
    -    @property
    
    119
    -    def lease_state(self):
    
    120
    -        if self._lease is not None:
    
    121
    -            return LeaseState(self._lease.state)
    
    122
    -        else:
    
    123
    -            return None
    
    113
    +        Note:
    
    114
    +            This won't trigger any :class:`Operation` stage transition.
    
    124 115
     
    
    125
    -    @property
    
    126
    -    def lease_cancelled(self):
    
    127
    -        return self.__lease_cancelled
    
    128
    -
    
    129
    -    @property
    
    130
    -    def n_tries(self):
    
    131
    -        return self._n_tries
    
    116
    +        Args:
    
    117
    +            action_result (ActionResult): The result from cache.
    
    118
    +        """
    
    119
    +        self.__execute_response = remote_execution_pb2.ExecuteResponse()
    
    120
    +        self.__execute_response.result.CopyFrom(action_result)
    
    121
    +        self.__execute_response.cached_result = True
    
    132 122
     
    
    133 123
         @property
    
    134 124
         def n_clients(self):
    
    135
    -        return len(self.__operation_message_queues)
    
    125
    +        return len(self.__operations_message_queues)
    
    136 126
     
    
    137 127
         def register_operation_client(self, peer, message_queue):
    
    138 128
             """Subscribes to the job's :class:`Operation` stage changes.
    
    139 129
     
    
    140
    -        Queues this :object:`Job` instance.
    
    141
    -
    
    142 130
             Args:
    
    143 131
                 peer (str): a unique string identifying the client.
    
    144 132
                 message_queue (queue.Queue): the event queue to register.
    
    133
    +
    
    134
    +        Returns:
    
    135
    +            str: The name of the subscribed :class:`Operation`.
    
    145 136
             """
    
    146
    -        if peer not in self.__operation_message_queues:
    
    147
    -            self.__operation_message_queues[peer] = message_queue
    
    137
    +        if peer in self.__operations_by_peer:
    
    138
    +            operation = self.__operations_by_peer[peer]
    
    139
    +        else:
    
    140
    +            operation = self.create_operation(peer)
    
    141
    +
    
    142
    +        self.__operations_message_queues[peer] = message_queue
    
    143
    +
    
    144
    +        self._send_operations_updates(peers=[peer])
    
    148 145
     
    
    149
    -        message_queue.put(self)
    
    146
    +        return operation.name
    
    150 147
     
    
    151 148
         def unregister_operation_client(self, peer):
    
    152 149
             """Unsubscribes to the job's :class:`Operation` stage change.
    
    ... ... @@ -154,25 +151,181 @@ class Job:
    154 151
             Args:
    
    155 152
                 peer (str): a unique string identifying the client.
    
    156 153
             """
    
    157
    -        if peer not in self.__operation_message_queues:
    
    158
    -            del self.__operation_message_queues[peer]
    
    154
    +        if peer in self.__operations_message_queues:
    
    155
    +            del self.__operations_message_queues[peer]
    
    159 156
     
    
    160
    -    def set_cached_result(self, action_result):
    
    161
    -        """Allows specifying an action result form the action cache for the job.
    
    157
    +        # Drop the operation if nobody is watching it anymore:
    
    158
    +        if peer in self.__operations_by_peer:
    
    159
    +            operation = self.__operations_by_peer[peer]
    
    160
    +
    
    161
    +            if operation not in self.__operations_by_peer.values():
    
    162
    +                del self.__operations_by_name[operation.name]
    
    163
    +
    
    164
    +            del self.__operations_by_peer[peer]
    
    165
    +
    
    166
    +    def create_operation(self, peer):
    
    167
    +        """Generates a new :class:`Operation` for `peer`.
    
    168
    +
    
    169
    +        Args:
    
    170
    +            peer (str): a unique string identifying the client.
    
    162 171
             """
    
    163
    -        self.__execute_response = remote_execution_pb2.ExecuteResponse()
    
    164
    -        self.__execute_response.result.CopyFrom(action_result)
    
    165
    -        self.__execute_response.cached_result = True
    
    172
    +        if peer in self.__operations_by_peer:
    
    173
    +            return self.__operations_by_peer[peer]
    
    174
    +
    
    175
    +        new_operation = operations_pb2.Operation()
    
    176
    +        # Copy state from first existing and non cancelled operation:
    
    177
    +        for operation in self.__operations_by_name.values():
    
    178
    +            if operation.name not in self.__operations_cancelled:
    
    179
    +                new_operation.CopyFrom(operation)
    
    180
    +                break
    
    181
    +
    
    182
    +        new_operation.name = str(uuid.uuid4())
    
    183
    +
    
    184
    +        self.__operations_by_name[new_operation.name] = new_operation
    
    185
    +        self.__operations_by_peer[peer] = new_operation
    
    186
    +
    
    187
    +        return new_operation
    
    188
    +
    
    189
    +    def list_operations(self):
    
    190
    +        """Lists the :class:`Operation` related to a job.
    
    191
    +
    
    192
    +        Returns:
    
    193
    +            list: A list of :class:`Operation` names.
    
    194
    +        """
    
    195
    +        return list(self.__operations_by_name.keys())
    
    196
    +
    
    197
    +    def get_operation(self, operation_name):
    
    198
    +        """Returns a copy of the the job's :class:`Operation`.
    
    199
    +
    
    200
    +        Args:
    
    201
    +            operation_name (str): the operation's name.
    
    202
    +
    
    203
    +        Raises:
    
    204
    +            NotFoundError: If no operation with `operation_name` exists.
    
    205
    +        """
    
    206
    +        try:
    
    207
    +            operation = self.__operations_by_name[operation_name]
    
    208
    +
    
    209
    +        except KeyError:
    
    210
    +            raise NotFoundError("Operation name does not exist: [{}]"
    
    211
    +                                .format(operation_name))
    
    212
    +
    
    213
    +        return self._copy_operation(operation)
    
    214
    +
    
    215
    +    def update_operation_stage(self, stage):
    
    216
    +        """Operates a stage transition for the job's :class:`Operation`.
    
    217
    +
    
    218
    +        Args:
    
    219
    +            stage (OperationStage): the operation stage to transition to.
    
    220
    +        """
    
    221
    +        if stage.value == self.__operation_metadata.stage:
    
    222
    +            return
    
    223
    +
    
    224
    +        self.__operation_metadata.stage = stage.value
    
    225
    +
    
    226
    +        if self.__operation_metadata.stage == OperationStage.QUEUED.value:
    
    227
    +            if self.__queued_timestamp.ByteSize() == 0:
    
    228
    +                self.__queued_timestamp.GetCurrentTime()
    
    229
    +            self._n_tries += 1
    
    230
    +
    
    231
    +        elif self.__operation_metadata.stage == OperationStage.EXECUTING.value:
    
    232
    +            queue_in, queue_out = self.__queued_timestamp.ToDatetime(), datetime.now()
    
    233
    +            self.__queued_time_duration.FromTimedelta(queue_out - queue_in)
    
    234
    +
    
    235
    +        elif self.__operation_metadata.stage == OperationStage.COMPLETED.value:
    
    236
    +            self._done = True
    
    237
    +
    
    238
    +        for operation in self.__operations_by_name.values():
    
    239
    +            if operation.name in self.__operations_cancelled:
    
    240
    +                continue
    
    241
    +
    
    242
    +            if self.__execute_response is not None:
    
    243
    +                operation.response.Pack(self.__execute_response)
    
    244
    +
    
    245
    +            operation.metadata.Pack(self.__operation_metadata)
    
    246
    +            operation.done = self._done
    
    247
    +
    
    248
    +        self._send_operations_updates()
    
    249
    +
    
    250
    +    def cancel_operation(self, peer):
    
    251
    +        """Triggers a job's :class:`Operation` cancellation.
    
    252
    +
    
    253
    +        This may cancel any job's :class:`Lease` that may have been issued.
    
    254
    +
    
    255
    +        Args:
    
    256
    +            peer (str): a unique string identifying the client.
    
    257
    +        """
    
    258
    +        operation_names, peers = set(), set()
    
    259
    +        if peer in self.__operations_by_peer:
    
    260
    +            operation_names.add(self.__operations_by_peer[peer].name)
    
    261
    +            peers.add(peer)
    
    262
    +
    
    263
    +        else:
    
    264
    +            operation_names.update(self.__operations_by_name.keys())
    
    265
    +            peers.update(self.__operations_by_peer.keys())
    
    266
    +
    
    267
    +        operations_cancelled = operation_names - self.__operations_cancelled
    
    268
    +        if not operations_cancelled:
    
    269
    +            return
    
    270
    +
    
    271
    +        operation_metadata = remote_execution_pb2.ExecuteOperationMetadata()
    
    272
    +        operation_metadata.CopyFrom(self.__operation_metadata)
    
    273
    +        operation_metadata.stage = OperationStage.COMPLETED.value
    
    274
    +
    
    275
    +        execute_response = remote_execution_pb2.ExecuteResponse()
    
    276
    +        if self.__execute_response is not None:
    
    277
    +            execute_response.CopyFrom(self.__execute_response)
    
    278
    +        execute_response.status.code = code_pb2.CANCELLED
    
    279
    +        execute_response.status.message = "Operation cancelled by client."
    
    280
    +
    
    281
    +        for operation_name in operations_cancelled:
    
    282
    +            operation = self.__operations_by_name[operation_name]
    
    283
    +
    
    284
    +            operation.metadata.Pack(operation_metadata)
    
    285
    +            operation.response.Pack(execute_response)
    
    286
    +            operation.done = True
    
    287
    +
    
    288
    +            self.__operations_cancelled.add(operation_name)
    
    289
    +
    
    290
    +        operation_names = set(self.__operations_by_name.keys())
    
    291
    +        # Job is cancelled if all the operation are:
    
    292
    +        self.__job_cancelled = bool(operation_names - self.__operations_cancelled)
    
    293
    +
    
    294
    +        if self.__job_cancelled and self._lease is not None:
    
    295
    +            self.cancel_lease()
    
    296
    +
    
    297
    +        self._send_operations_updates(peers=peers, notify_cancelled=True)
    
    298
    +
    
    299
    +    # --- Public API: RWAPI ---
    
    300
    +
    
    301
    +    @property
    
    302
    +    def lease(self):
    
    303
    +        return self._lease
    
    304
    +
    
    305
    +    @property
    
    306
    +    def lease_state(self):
    
    307
    +        if self._lease is not None:
    
    308
    +            return LeaseState(self._lease.state)
    
    309
    +        else:
    
    310
    +            return None
    
    311
    +
    
    312
    +    @property
    
    313
    +    def lease_cancelled(self):
    
    314
    +        return self.__lease_cancelled
    
    315
    +
    
    316
    +    @property
    
    317
    +    def n_tries(self):
    
    318
    +        return self._n_tries
    
    166 319
     
    
    167 320
         def create_lease(self):
    
    168 321
             """Emits a new :class:`Lease` for the job.
    
    169 322
     
    
    170 323
             Only one :class:`Lease` can be emitted for a given job. This method
    
    171
    -        should only be used once, any furhter calls are ignored.
    
    324
    +        should only be used once, any further calls are ignored.
    
    172 325
             """
    
    173
    -        if self.__operation_cancelled:
    
    174
    -            return None
    
    175
    -        elif self._lease is not None:
    
    326
    +        if self._lease is not None:
    
    327
    +            return self._lease
    
    328
    +        elif self.__job_cancelled:
    
    176 329
                 return None
    
    177 330
     
    
    178 331
             self._lease = bots_pb2.Lease()
    
    ... ... @@ -183,14 +336,14 @@ class Job:
    183 336
             return self._lease
    
    184 337
     
    
    185 338
         def update_lease_state(self, state, status=None, result=None):
    
    186
    -        """Operates a state transition for the job's current :class:Lease.
    
    339
    +        """Operates a state transition for the job's current :class:`Lease`.
    
    187 340
     
    
    188 341
             Args:
    
    189 342
                 state (LeaseState): the lease state to transition to.
    
    190
    -            status (google.rpc.Status): the lease execution status, only
    
    191
    -                required if `state` is `COMPLETED`.
    
    192
    -            result (google.protobuf.Any): the lease execution result, only
    
    193
    -                required if `state` is `COMPLETED`.
    
    343
    +            status (google.rpc.Status, optional): the lease execution status,
    
    344
    +                only required if `state` is `COMPLETED`.
    
    345
    +            result (google.protobuf.Any, optional): the lease execution result,
    
    346
    +                only required if `state` is `COMPLETED`.
    
    194 347
             """
    
    195 348
             if state.value == self._lease.state:
    
    196 349
                 return
    
    ... ... @@ -231,72 +384,49 @@ class Job:
    231 384
                 self.__execute_response.status.CopyFrom(status)
    
    232 385
     
    
    233 386
         def cancel_lease(self):
    
    234
    -        """Triggers a job's :class:Lease cancellation.
    
    387
    +        """Triggers a job's :class:`Lease` cancellation.
    
    235 388
     
    
    236
    -        This will not cancel the job's :class:Operation.
    
    389
    +        Note:
    
    390
    +            This will not cancel the job's :class:`Operation`.
    
    237 391
             """
    
    238 392
             self.__lease_cancelled = True
    
    239 393
             if self._lease is not None:
    
    240 394
                 self.update_lease_state(LeaseState.CANCELLED)
    
    241 395
     
    
    242
    -    def update_operation_stage(self, stage):
    
    243
    -        """Operates a stage transition for the job's :class:Operation.
    
    244
    -
    
    245
    -        Args:
    
    246
    -            stage (OperationStage): the operation stage to transition to.
    
    247
    -        """
    
    248
    -        if stage.value == self.__operation_metadata.stage:
    
    249
    -            return
    
    250
    -
    
    251
    -        self.__operation_metadata.stage = stage.value
    
    252
    -
    
    253
    -        if self.__operation_metadata.stage == OperationStage.QUEUED.value:
    
    254
    -            if self.__queued_timestamp.ByteSize() == 0:
    
    255
    -                self.__queued_timestamp.GetCurrentTime()
    
    256
    -            self._n_tries += 1
    
    257
    -
    
    258
    -        elif self.__operation_metadata.stage == OperationStage.EXECUTING.value:
    
    259
    -            queue_in, queue_out = self.__queued_timestamp.ToDatetime(), datetime.now()
    
    260
    -            self.__queued_time_duration.FromTimedelta(queue_out - queue_in)
    
    261
    -
    
    262
    -        elif self.__operation_metadata.stage == OperationStage.COMPLETED.value:
    
    263
    -            if self.__execute_response is not None:
    
    264
    -                self._operation.response.Pack(self.__execute_response)
    
    265
    -            self._operation.done = True
    
    266
    -
    
    267
    -        self._operation.metadata.Pack(self.__operation_metadata)
    
    268
    -
    
    269
    -        for message_queue in self.__operation_message_queues.values():
    
    270
    -            message_queue.put(self)
    
    396
    +    # --- Public API: Monitoring ---
    
    271 397
     
    
    272
    -    def check_operation_status(self):
    
    273
    -        """Reports errors on unexpected job's :class:Operation state.
    
    398
    +    def query_queue_time(self):
    
    399
    +        return self.__queued_time_duration.ToTimedelta()
    
    274 400
     
    
    275
    -        Raises:
    
    276
    -            CancelledError: if the job's :class:Operation was cancelled.
    
    277
    -        """
    
    278
    -        if self.__operation_cancelled:
    
    279
    -            raise CancelledError(self.__execute_response.status.message)
    
    401
    +    def query_n_retries(self):
    
    402
    +        return self._n_tries - 1 if self._n_tries > 0 else 0
    
    280 403
     
    
    281
    -    def cancel_operation(self):
    
    282
    -        """Triggers a job's :class:Operation cancellation.
    
    404
    +    # --- Private API ---
    
    283 405
     
    
    284
    -        This will also cancel any job's :class:Lease that may have been issued.
    
    285
    -        """
    
    286
    -        self.__operation_cancelled = True
    
    287
    -        if self._lease is not None:
    
    288
    -            self.cancel_lease()
    
    406
    +    def _copy_operation(self, operation):
    
    407
    +        """Simply duplicates a given :class:`Operation` object."""
    
    408
    +        new_operation = operations_pb2.Operation()
    
    289 409
     
    
    290
    -        self.__execute_response = remote_execution_pb2.ExecuteResponse()
    
    291
    -        self.__execute_response.status.code = code_pb2.CANCELLED
    
    292
    -        self.__execute_response.status.message = "Operation cancelled by client."
    
    410
    +        new_operation.CopyFrom(operation)
    
    293 411
     
    
    294
    -        self.update_operation_stage(OperationStage.COMPLETED)
    
    412
    +        return new_operation
    
    295 413
     
    
    296
    -    # --- Public API: Monitoring ---
    
    414
    +    def _send_operations_updates(self, peers=None, notify_cancelled=False):
    
    415
    +        """Sends :class:`Operation` stage change messages to watchers."""
    
    416
    +        for peer, message_queue in self.__operations_message_queues.items():
    
    417
    +            if peer not in self.__operations_by_peer:
    
    418
    +                continue
    
    419
    +            elif peers and peer not in peers:
    
    420
    +                continue
    
    297 421
     
    
    298
    -    def query_queue_time(self):
    
    299
    -        return self.__queued_time_duration.ToTimedelta()
    
    422
    +            operation = self.__operations_by_peer[peer]
    
    423
    +            # Messages are pairs of (Exception, Operation,):
    
    424
    +            if not notify_cancelled and operation.name in self.__operations_cancelled:
    
    425
    +                continue
    
    426
    +            elif operation.name not in self.__operations_cancelled:
    
    427
    +                message = (None, self._copy_operation(operation),)
    
    428
    +            else:
    
    429
    +                message = (CancelledError(self.__execute_response.status.message),
    
    430
    +                           self._copy_operation(operation),)
    
    300 431
     
    
    301
    -    def query_n_retries(self):
    
    302
    -        return self._n_tries - 1 if self._n_tries > 0 else 0
    432
    +            message_queue.put(message)

  • buildgrid/server/operations/instance.py
    ... ... @@ -21,7 +21,7 @@ An instance of the LongRunningOperations Service.
    21 21
     
    
    22 22
     import logging
    
    23 23
     
    
    24
    -from buildgrid._exceptions import InvalidArgumentError
    
    24
    +from buildgrid._exceptions import InvalidArgumentError, NotFoundError
    
    25 25
     from buildgrid._protos.google.longrunning import operations_pb2
    
    26 26
     
    
    27 27
     
    
    ... ... @@ -39,39 +39,40 @@ class OperationsInstance:
    39 39
         def register_instance_with_server(self, instance_name, server):
    
    40 40
             server.add_operations_instance(self, instance_name)
    
    41 41
     
    
    42
    -    def get_operation(self, name):
    
    43
    -        job = self._scheduler.jobs.get(name)
    
    42
    +    def get_operation(self, job_name):
    
    43
    +        try:
    
    44
    +            operation = self._scheduler.get_job_operation(job_name)
    
    44 45
     
    
    45
    -        if job is None:
    
    46
    -            raise InvalidArgumentError("Operation name does not exist: [{}]".format(name))
    
    46
    +        except NotFoundError:
    
    47
    +            raise InvalidArgumentError("Operation name does not exist: [{}]".format(job_name))
    
    47 48
     
    
    48
    -        else:
    
    49
    -            return job.operation
    
    49
    +        return operation
    
    50 50
     
    
    51 51
         def list_operations(self, list_filter, page_size, page_token):
    
    52 52
             # TODO: Pages
    
    53 53
             # Spec says number of pages and length of a page are optional
    
    54 54
             response = operations_pb2.ListOperationsResponse()
    
    55
    +
    
    55 56
             operations = []
    
    56
    -        for job in self._scheduler.list_jobs():
    
    57
    -            op = operations_pb2.Operation()
    
    58
    -            op.CopyFrom(job.operation)
    
    59
    -            operations.append(op)
    
    57
    +        for job_name in self._scheduler.list_current_jobs():
    
    58
    +            operation = self._scheduler.get_job_operation(job_name)
    
    59
    +            operations.append(operation)
    
    60 60
     
    
    61 61
             response.operations.extend(operations)
    
    62 62
     
    
    63 63
             return response
    
    64 64
     
    
    65
    -    def delete_operation(self, name):
    
    65
    +    def delete_operation(self, job_name):
    
    66 66
             try:
    
    67
    -            self._scheduler.jobs.pop(name)
    
    67
    +            # TODO: Unregister the caller client
    
    68
    +            pass
    
    68 69
     
    
    69
    -        except KeyError:
    
    70
    -            raise InvalidArgumentError("Operation name does not exist: [{}]".format(name))
    
    70
    +        except NotFoundError:
    
    71
    +            raise InvalidArgumentError("Operation name does not exist: [{}]".format(job_name))
    
    71 72
     
    
    72
    -    def cancel_operation(self, name):
    
    73
    +    def cancel_operation(self, job_name):
    
    73 74
             try:
    
    74
    -            self._scheduler.cancel_job_operation(name)
    
    75
    +            self._scheduler.cancel_job_operation(job_name)
    
    75 76
     
    
    76
    -        except KeyError:
    
    77
    -            raise InvalidArgumentError("Operation name does not exist: [{}]".format(name))
    77
    +        except NotFoundError:
    
    78
    +            raise InvalidArgumentError("Operation name does not exist: [{}]".format(job_name))

  • buildgrid/server/scheduler.py
    ... ... @@ -25,6 +25,7 @@ import logging
    25 25
     
    
    26 26
     from buildgrid._enums import LeaseState, OperationStage
    
    27 27
     from buildgrid._exceptions import NotFoundError
    
    28
    +from buildgrid.server.job import Job
    
    28 29
     
    
    29 30
     
    
    30 31
     class Scheduler:
    
    ... ... @@ -42,7 +43,11 @@ class Scheduler:
    42 43
             self.__retries_count = 0
    
    43 44
     
    
    44 45
             self._action_cache = action_cache
    
    45
    -        self.jobs = {}
    
    46
    +
    
    47
    +        self.__jobs_by_action = {}
    
    48
    +        self.__jobs_by_operation = {}
    
    49
    +        self.__jobs_by_name = {}
    
    50
    +
    
    46 51
             self.__queue = deque()
    
    47 52
     
    
    48 53
             self._is_instrumented = monitor
    
    ... ... @@ -52,55 +57,94 @@ class Scheduler:
    52 57
     
    
    53 58
         # --- Public API ---
    
    54 59
     
    
    55
    -    def register_operation_client(self, job_name, peer, message_queue):
    
    60
    +    def list_current_jobs(self):
    
    61
    +        """Returns the names of the :class:`Job` objects currently managed."""
    
    62
    +        return self.__jobs_by_name.keys()
    
    63
    +
    
    64
    +    # --- Public API: REAPI ---
    
    65
    +
    
    66
    +    def register_job_operation_client(self, operation_name, peer, message_queue):
    
    56 67
             """Subscribes to one of the job's :class:`Operation` stage changes.
    
    57 68
     
    
    58 69
             Args:
    
    59
    -            job_name (str): name of the job subscribe to.
    
    70
    +            operation_name (str): name of the operation to subscribe to.
    
    60 71
                 peer (str): a unique string identifying the client.
    
    61 72
                 message_queue (queue.Queue): the event queue to register.
    
    62 73
     
    
    74
    +        Returns:
    
    75
    +            str: The name of the subscribed :class:`Operation`.
    
    76
    +
    
    63 77
             Raises:
    
    64
    -            NotFoundError: If no job with `job_name` exists.
    
    78
    +            NotFoundError: If no operation with `operation_name` exists.
    
    65 79
             """
    
    66
    -        try:
    
    67
    -            job = self.jobs[job_name]
    
    68
    -        except KeyError:
    
    69
    -            raise NotFoundError('No job named {} found.'.format(job_name))
    
    80
    +        if operation_name in self.__jobs_by_operation:
    
    81
    +            job = self.__jobs_by_operation[operation_name]
    
    70 82
     
    
    71
    -        job.register_operation_client(peer, message_queue)
    
    83
    +        elif operation_name in self.__jobs_by_name:
    
    84
    +            job = self.__jobs_by_name[operation_name]
    
    72 85
     
    
    73
    -    def unregister_operation_client(self, job_name, peer):
    
    86
    +        else:
    
    87
    +            raise NotFoundError("Operation name does not exist: [{}]"
    
    88
    +                                .format(operation_name))
    
    89
    +
    
    90
    +        operation_name = job.register_operation_client(peer, message_queue)
    
    91
    +
    
    92
    +        self.__jobs_by_operation[operation_name] = job
    
    93
    +
    
    94
    +        return operation_name
    
    95
    +
    
    96
    +    def unregister_job_operation_client(self, operation_name, peer):
    
    74 97
             """Unsubscribes from one of the job's :class:`Operation` stage changes.
    
    75 98
     
    
    76 99
             Args:
    
    77
    -            job_name (str): name of the job to unsubscribe from.
    
    100
    +            operation_name (str): name of the operation to unsubscribe from.
    
    78 101
                 peer (str): a unique string identifying the client.
    
    79 102
     
    
    80 103
             Raises:
    
    81
    -            NotFoundError: If no job with `job_name` exists.
    
    104
    +            NotFoundError: If no operation with `operation_name` exists.
    
    82 105
             """
    
    83
    -        try:
    
    84
    -            job = self.jobs[job_name]
    
    85
    -        except KeyError:
    
    86
    -            raise NotFoundError('No job named {} found.'.format(job_name))
    
    106
    +        if operation_name in self.__jobs_by_operation:
    
    107
    +            job = self.__jobs_by_operation[operation_name]
    
    108
    +
    
    109
    +        elif operation_name in self.__jobs_by_name:
    
    110
    +            job = self.__jobs_by_name[operation_name]
    
    111
    +
    
    112
    +        else:
    
    113
    +            raise NotFoundError("Operation name does not exist: [{}]"
    
    114
    +                                .format(operation_name))
    
    115
    +
    
    116
    +        if operation_name in self.__jobs_by_operation:
    
    117
    +            del self.__jobs_by_operation[operation_name]
    
    87 118
     
    
    88 119
             job.unregister_operation_client(peer)
    
    89 120
     
    
    90
    -        if job.n_clients == 0 and job.operation.done:
    
    91
    -            del self.jobs[job.name]
    
    121
    +        if job.n_clients == 0 and job.done:
    
    122
    +            del self.__jobs_by_action[job.action_digest.hash]
    
    123
    +            del self.__jobs_by_name[job.name]
    
    92 124
     
    
    93 125
                 if self._is_instrumented:
    
    94
    -                self.__operations_by_stage[OperationStage.CACHE_CHECK].discard(job_name)
    
    95
    -                self.__operations_by_stage[OperationStage.QUEUED].discard(job_name)
    
    96
    -                self.__operations_by_stage[OperationStage.EXECUTING].discard(job_name)
    
    97
    -                self.__operations_by_stage[OperationStage.COMPLETED].discard(job_name)
    
    126
    +                self.__operations_by_stage[OperationStage.CACHE_CHECK].discard(job.name)
    
    127
    +                self.__operations_by_stage[OperationStage.QUEUED].discard(job.name)
    
    128
    +                self.__operations_by_stage[OperationStage.EXECUTING].discard(job.name)
    
    129
    +                self.__operations_by_stage[OperationStage.COMPLETED].discard(job.name)
    
    98 130
     
    
    99
    -                self.__leases_by_state[LeaseState.PENDING].discard(job_name)
    
    100
    -                self.__leases_by_state[LeaseState.ACTIVE].discard(job_name)
    
    101
    -                self.__leases_by_state[LeaseState.COMPLETED].discard(job_name)
    
    131
    +                self.__leases_by_state[LeaseState.PENDING].discard(job.name)
    
    132
    +                self.__leases_by_state[LeaseState.ACTIVE].discard(job.name)
    
    133
    +                self.__leases_by_state[LeaseState.COMPLETED].discard(job.name)
    
    102 134
     
    
    103
    -    def queue_job(self, job, skip_cache_lookup=False):
    
    135
    +    def queue_job_operation(self, action, action_digest, priority=0, skip_cache_lookup=False):
    
    136
    +        """Inserts a newly created job into the execution queue.
    
    137
    +
    
    138
    +        Args:
    
    139
    +            action (Action): the given action to queue for execution.
    
    140
    +            action_digest (Digest): the digest of the given action.
    
    141
    +            priority (int): the execution job's priority.
    
    142
    +            skip_cache_lookup (bool): whether or not to look for pre-computed
    
    143
    +                result for the given action.
    
    144
    +
    
    145
    +        Returns:
    
    146
    +            str: the newly created operation's name.
    
    147
    +        """
    
    104 148
             def __queue_job(jobs_queue, new_job):
    
    105 149
                 index = 0
    
    106 150
                 for queued_job in reversed(jobs_queue):
    
    ... ... @@ -113,12 +157,29 @@ class Scheduler:
    113 157
     
    
    114 158
                 jobs_queue.insert(index, new_job)
    
    115 159
     
    
    116
    -        self.jobs[job.name] = job
    
    160
    +        if action_digest.hash in self.__jobs_by_action:
    
    161
    +            job = self.__jobs_by_action[action_digest.hash]
    
    162
    +
    
    163
    +            # Reschedule if the new request has a higher priority (lower value):
    
    164
    +            if priority < job.priority:
    
    165
    +                job.priority = priority
    
    166
    +
    
    167
    +                if job in self.__queue:
    
    168
    +                    self.__queue.remove(job)
    
    169
    +                    __queue_job(self.__queue, job)
    
    170
    +
    
    171
    +            return job.name
    
    172
    +
    
    173
    +        job = Job(action, action_digest, priority=priority)
    
    174
    +
    
    175
    +        self.__jobs_by_action[job.action_digest.hash] = job
    
    176
    +        self.__jobs_by_name[job.name] = job
    
    117 177
     
    
    118 178
             operation_stage = None
    
    119 179
             if self._action_cache is not None and not skip_cache_lookup:
    
    120 180
                 try:
    
    121 181
                     action_result = self._action_cache.get_action_result(job.action_digest)
    
    182
    +
    
    122 183
                 except NotFoundError:
    
    123 184
                     operation_stage = OperationStage.QUEUED
    
    124 185
                     __queue_job(self.__queue, job)
    
    ... ... @@ -136,24 +197,49 @@ class Scheduler:
    136 197
     
    
    137 198
             self._update_job_operation_stage(job.name, operation_stage)
    
    138 199
     
    
    139
    -    def retry_job(self, job_name):
    
    140
    -        job = self.jobs[job_name]
    
    200
    +        return job.name
    
    141 201
     
    
    142
    -        operation_stage = None
    
    143
    -        if job.n_tries >= self.MAX_N_TRIES:
    
    144
    -            # TODO: Decide what to do with these jobs
    
    145
    -            operation_stage = OperationStage.COMPLETED
    
    146
    -            # TODO: Mark these jobs as done
    
    202
    +    def get_job_operation(self, operation_name):
    
    203
    +        """Retrieves a job's :class:`Operation` by name.
    
    147 204
     
    
    148
    -        else:
    
    149
    -            operation_stage = OperationStage.QUEUED
    
    150
    -            job.update_lease_state(LeaseState.PENDING)
    
    151
    -            self.__queue.append(job)
    
    205
    +        Args:
    
    206
    +            operation_name (str): name of the operation to query.
    
    152 207
     
    
    153
    -        self._update_job_operation_stage(job_name, operation_stage)
    
    208
    +        Raises:
    
    209
    +            NotFoundError: If no operation with `operation_name` exists.
    
    210
    +        """
    
    211
    +        try:
    
    212
    +            job = self.__jobs_by_operation[operation_name]
    
    213
    +
    
    214
    +        except KeyError:
    
    215
    +            raise NotFoundError("Operation name does not exist: [{}]"
    
    216
    +                                .format(operation_name))
    
    217
    +
    
    218
    +        return job.get_operation(operation_name)
    
    154 219
     
    
    155
    -    def list_jobs(self):
    
    156
    -        return self.jobs.values()
    
    220
    +    def cancel_job_operation(self, operation_name):
    
    221
    +        """Cancels a job's :class:`Operation` by name.
    
    222
    +
    
    223
    +        Note:
    
    224
    +            This will also cancel any :class:`Lease` that may have been issued
    
    225
    +            for that job.
    
    226
    +
    
    227
    +        Args:
    
    228
    +            operation_name (str): name of the operation to cancel.
    
    229
    +
    
    230
    +        Raises:
    
    231
    +            NotFoundError: If no operation with `operation_name` exists.
    
    232
    +        """
    
    233
    +        try:
    
    234
    +            job = self.__jobs_by_operation[operation_name]
    
    235
    +
    
    236
    +        except KeyError:
    
    237
    +            raise NotFoundError("Operation name does not exist: [{}]"
    
    238
    +                                .format(operation_name))
    
    239
    +
    
    240
    +        job.cancel_operation(operation_name)
    
    241
    +
    
    242
    +    # --- Public API: RWAPI ---
    
    157 243
     
    
    158 244
         def request_job_leases(self, worker_capabilities):
    
    159 245
             """Generates a list of the highest priority leases to be run.
    
    ... ... @@ -179,18 +265,25 @@ class Scheduler:
    179 265
     
    
    180 266
             return None
    
    181 267
     
    
    182
    -    def update_job_lease(self, lease):
    
    268
    +    def update_job_lease_state(self, job_name, lease):
    
    183 269
             """Requests a state transition for a job's current :class:`Lease`.
    
    184 270
     
    
    271
    +        Note:
    
    272
    +            This may trigger a job's :class:`Operation` stage transition.
    
    273
    +
    
    185 274
             Args:
    
    186 275
                 job_name (str): name of the job to query.
    
    187
    -            lease_state (LeaseState): the lease state to transition to.
    
    188
    -            lease_status (google.rpc.Status): the lease execution status, only
    
    189
    -                required if `lease_state` is `COMPLETED`.
    
    190
    -            lease_result (google.protobuf.Any): the lease execution result, only
    
    191
    -                required if `lease_state` is `COMPLETED`.
    
    276
    +            lease (Lease): the lease holding the new state.
    
    277
    +
    
    278
    +        Raises:
    
    279
    +            NotFoundError: If no job with `job_name` exists.
    
    192 280
             """
    
    193
    -        job = self.jobs[lease.id]
    
    281
    +        try:
    
    282
    +            job = self.__jobs_by_name[job_name]
    
    283
    +
    
    284
    +        except KeyError:
    
    285
    +            raise NotFoundError("Job name does not exist: [{}]".format(job_name))
    
    286
    +
    
    194 287
             lease_state = LeaseState(lease.state)
    
    195 288
     
    
    196 289
             operation_stage = None
    
    ... ... @@ -226,29 +319,72 @@ class Scheduler:
    226 319
                     self.__leases_by_state[LeaseState.ACTIVE].discard(lease.id)
    
    227 320
                     self.__leases_by_state[LeaseState.COMPLETED].add(lease.id)
    
    228 321
     
    
    229
    -        self._update_job_operation_stage(lease.id, operation_stage)
    
    322
    +        self._update_job_operation_stage(job_name, operation_stage)
    
    323
    +
    
    324
    +    def retry_job_lease(self, job_name):
    
    325
    +        """Re-queues a job on lease execution failure.
    
    326
    +
    
    327
    +        Note:
    
    328
    +            This may trigger a job's :class:`Operation` stage transition.
    
    329
    +
    
    330
    +        Args:
    
    331
    +            job_name (str): name of the job to query.
    
    332
    +
    
    333
    +        Raises:
    
    334
    +            NotFoundError: If no job with `job_name` exists.
    
    335
    +        """
    
    336
    +        try:
    
    337
    +            job = self.__jobs_by_name[job_name]
    
    338
    +
    
    339
    +        except KeyError:
    
    340
    +            raise NotFoundError("Job name does not exist: [{}]".format(job_name))
    
    341
    +
    
    342
    +        operation_stage = None
    
    343
    +        if job.n_tries >= self.MAX_N_TRIES:
    
    344
    +            # TODO: Decide what to do with these jobs
    
    345
    +            operation_stage = OperationStage.COMPLETED
    
    346
    +            # TODO: Mark these jobs as done
    
    347
    +
    
    348
    +        else:
    
    349
    +            operation_stage = OperationStage.QUEUED
    
    350
    +            job.update_lease_state(LeaseState.PENDING)
    
    351
    +            self.__queue.append(job)
    
    352
    +
    
    353
    +        self._update_job_operation_stage(job_name, operation_stage)
    
    230 354
     
    
    231 355
         def get_job_lease(self, job_name):
    
    232
    -        """Returns the lease associated to job, if any have been emitted yet."""
    
    233
    -        return self.jobs[job_name].lease
    
    356
    +        """Returns the lease associated with the job, if one has been emitted yet.
    
    234 357
     
    
    235
    -    def get_job_lease_cancelled(self, job_name):
    
    236
    -        """Returns true if the lease is cancelled"""
    
    237
    -        return self.jobs[job_name].lease_cancelled
    
    358
    +        Args:
    
    359
    +            job_name (str): name of the job to query.
    
    360
    +
    
    361
    +        Raises:
    
    362
    +            NotFoundError: If no job with `job_name` exists.
    
    363
    +        """
    
    364
    +        try:
    
    365
    +            job = self.__jobs_by_name[job_name]
    
    238 366
     
    
    239
    -    def get_job_operation(self, job_name):
    
    240
    -        """Returns the operation associated to job."""
    
    241
    -        return self.jobs[job_name].operation
    
    367
    +        except KeyError:
    
    368
    +            raise NotFoundError("Job name does not exist: [{}]".format(job_name))
    
    242 369
     
    
    243
    -    def cancel_job_operation(self, job_name):
    
    244
    -        """"Cancels the underlying operation of a given job.
    
    370
    +        return job.lease
    
    245 371
     
    
    246
    -        This will also cancel any job's lease that may have been issued.
    
    372
    +    def get_job_lease_cancelled(self, job_name):
    
    373
    +        """Returns true if the lease is cancelled.
    
    247 374
     
    
    248 375
             Args:
    
    249
    -            job_name (str): name of the job holding the operation to cancel.
    
    376
    +            job_name (str): name of the job to query.
    
    377
    +
    
    378
    +        Raises:
    
    379
    +            NotFoundError: If no job with `job_name` exists.
    
    250 380
             """
    
    251
    -        self.jobs[job_name].cancel_operation()
    
    381
    +        try:
    
    382
    +            job = self.__jobs_by_name[job_name]
    
    383
    +
    
    384
    +        except KeyError:
    
    385
    +            raise NotFoundError("Job name does not exist: [{}]".format(job_name))
    
    386
    +
    
    387
    +        return job.lease_cancelled
    
    252 388
     
    
    253 389
         # --- Public API: Monitoring ---
    
    254 390
     
    
    ... ... @@ -298,11 +434,11 @@ class Scheduler:
    298 434
                 self.__build_metadata_queues.append(message_queue)
    
    299 435
     
    
    300 436
         def query_n_jobs(self):
    
    301
    -        return len(self.jobs)
    
    437
    +        return len(self.__jobs_by_name)
    
    302 438
     
    
    303 439
         def query_n_operations(self):
    
    304 440
             # Several operations may map to the same job, so count operations:
    
    305
    -        return len(self.jobs)
    
    441
    +        return len(self.__jobs_by_operation)
    
    306 442
     
    
    307 443
         def query_n_operations_by_stage(self, operation_stage):
    
    308 444
             try:
    
    ... ... @@ -313,7 +449,7 @@ class Scheduler:
    313 449
             return 0
    
    314 450
     
    
    315 451
         def query_n_leases(self):
    
    316
    -        return len(self.jobs)
    
    452
    +        return len(self.__jobs_by_name)
    
    317 453
     
    
    318 454
         def query_n_leases_by_state(self, lease_state):
    
    319 455
             try:
    
    ... ... @@ -340,7 +476,7 @@ class Scheduler:
    340 476
                 job_name (str): name of the job to query.
    
    341 477
                 operation_stage (OperationStage): the stage to transition to.
    
    342 478
             """
    
    343
    -        job = self.jobs[job_name]
    
    479
    +        job = self.__jobs_by_name[job_name]
    
    344 480
     
    
    345 481
             if operation_stage == OperationStage.CACHE_CHECK:
    
    346 482
                 job.update_operation_stage(OperationStage.CACHE_CHECK)
    
    ... ... @@ -389,7 +525,7 @@ class Scheduler:
    389 525
     
    
    390 526
                     self.__queue_time_average = average_order, average_time
    
    391 527
     
    
    392
    -                if not job.holds_cached_action_result:
    
    528
    +                if not job.holds_cached_result:
    
    393 529
                         execution_metadata = job.action_result.execution_metadata
    
    394 530
                         context_metadata = {'job-is': job.name}
    
    395 531
     
    

  • docs/source/conf.py
    ... ... @@ -182,3 +182,11 @@ texinfo_documents = [
    182 182
          author, 'BuildGrid', 'One line description of project.',
    
    183 183
          'Miscellaneous'),
    
    184 184
     ]
    
    185
    +
    
    186
    +# -- Options for the autodoc extension ----------------------------------------
    
    187
    +
    
    188
    +# This value selects if automatically documented members are sorted
    
    189
    +# alphabetical (value 'alphabetical'), by member type (value 'groupwise') or
    
    190
    +# by source order (value 'bysource'). The default is alphabetical.
    
    191
    +autodoc_member_order = 'bysource'
    
    192
    +

  • tests/integration/bots_service.py
    ... ... @@ -25,7 +25,6 @@ import pytest
    25 25
     
    
    26 26
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
    
    27 27
     from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2
    
    28
    -from buildgrid.server import job
    
    29 28
     from buildgrid.server.controller import ExecutionController
    
    30 29
     from buildgrid.server.job import LeaseState
    
    31 30
     from buildgrid.server.bots import service
    
    ... ... @@ -159,7 +158,8 @@ def test_post_bot_event_temp(context, instance):
    159 158
     def _inject_work(scheduler, action=None, action_digest=None):
    
    160 159
         if not action:
    
    161 160
             action = remote_execution_pb2.Action()
    
    161
    +
    
    162 162
         if not action_digest:
    
    163 163
             action_digest = remote_execution_pb2.Digest()
    
    164
    -    j = job.Job(action, action_digest)
    
    165
    -    scheduler.queue_job(j, True)
    164
    +
    
    165
    +    scheduler.queue_job_operation(action, action_digest, skip_cache_lookup=True)

  • tests/integration/execution_service.py
    ... ... @@ -106,13 +106,13 @@ def test_no_action_digest_in_storage(instance, context):
    106 106
     
    
    107 107
     
    
    108 108
     def test_wait_execution(instance, controller, context):
    
    109
    -    j = job.Job(action, action_digest)
    
    109
    +    j = controller.execution_instance._scheduler.queue_job_operation(action,
    
    110
    +                                                                     action_digest,
    
    111
    +                                                                     skip_cache_lookup=True)
    
    110 112
         j._operation.done = True
    
    111 113
     
    
    112 114
         request = remote_execution_pb2.WaitExecutionRequest(name=j.name)
    
    113 115
     
    
    114
    -    controller.execution_instance._scheduler.jobs[j.name] = j
    
    115
    -
    
    116 116
         action_result_any = any_pb2.Any()
    
    117 117
         action_result = remote_execution_pb2.ActionResult()
    
    118 118
         action_result_any.Pack(action_result)
    



  • [Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]