[Notes] [Git][BuildGrid/buildgrid][mablanch/75-requests-multiplexing] 20 commits: server/instance.py: Deactivate SO_REUSEPORT option



Title: GitLab

Martin Blanchard pushed to branch mablanch/75-requests-multiplexing at BuildGrid / buildgrid

Commits:

15 changed files:

Changes:

  • buildgrid/_app/commands/cmd_operation.py
    ... ... @@ -27,6 +27,7 @@ from textwrap import indent
    27 27
     
    
    28 28
     import click
    
    29 29
     from google.protobuf import json_format
    
    30
    +import grpc
    
    30 31
     
    
    31 32
     from buildgrid.client.authentication import setup_channel
    
    32 33
     from buildgrid._enums import OperationStage
    
    ... ... @@ -213,10 +214,17 @@ def wait(context, operation_name, json):
    213 214
     
    
    214 215
         operation_iterator = stub.WaitExecution(request)
    
    215 216
     
    
    216
    -    for operation in operation_iterator:
    
    217
    -        if not json and operation.done:
    
    218
    -            _print_operation_status(operation, print_details=True)
    
    219
    -        elif not json:
    
    220
    -            _print_operation_status(operation)
    
    221
    -        else:
    
    222
    -            click.echo(json_format.MessageToJson(operation))
    217
    +    try:
    
    218
    +        for operation in operation_iterator:
    
    219
    +            if not json and operation.done:
    
    220
    +                _print_operation_status(operation, print_details=True)
    
    221
    +            elif not json:
    
    222
    +                _print_operation_status(operation)
    
    223
    +            else:
    
    224
    +                click.echo(json_format.MessageToJson(operation))
    
    225
    +
    
    226
    +    except grpc.RpcError as e:
    
    227
    +        if e.code() != grpc.StatusCode.CANCELLED:
    
    228
    +            click.echo('Error: {}'
    
    229
    +                       .format(e.details), err=True)
    
    230
    +            sys.exit(-1)

  • buildgrid/_app/commands/cmd_server.py
    ... ... @@ -24,6 +24,7 @@ import sys
    24 24
     
    
    25 25
     import click
    
    26 26
     
    
    27
    +from buildgrid._exceptions import PermissionDeniedError
    
    27 28
     from buildgrid.server._authentication import AuthMetadataMethod, AuthMetadataAlgorithm
    
    28 29
     from buildgrid.server.instance import BuildGridServer
    
    29 30
     from buildgrid.server._monitoring import MonitoringOutputType, MonitoringOutputFormat
    
    ... ... @@ -120,8 +121,13 @@ def _create_server_from_config(configuration):
    120 121
     
    
    121 122
         server = BuildGridServer(**kargs)
    
    122 123
     
    
    123
    -    for channel in network:
    
    124
    -        server.add_port(channel.address, channel.credentials)
    
    124
    +    try:
    
    125
    +        for channel in network:
    
    126
    +            server.add_port(channel.address, channel.credentials)
    
    127
    +
    
    128
    +    except PermissionDeniedError as e:
    
    129
    +        click.echo("Error: {}.".format(e), err=True)
    
    130
    +        sys.exit(-1)
    
    125 131
     
    
    126 132
         for instance in instances:
    
    127 133
             instance_name = instance['name']
    

  • buildgrid/_enums.py
    ... ... @@ -25,7 +25,7 @@ from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2
    25 25
     
    
    26 26
     class BotStatus(Enum):
    
    27 27
         # Initially unknown state.
    
    28
    -    BOT_STATUS_UNSPECIFIED = bots_pb2.BotStatus.Value('BOT_STATUS_UNSPECIFIED')
    
    28
    +    UNSPECIFIED = bots_pb2.BotStatus.Value('BOT_STATUS_UNSPECIFIED')
    
    29 29
         # The bot is healthy, and will accept leases as normal.
    
    30 30
         OK = bots_pb2.BotStatus.Value('OK')
    
    31 31
         # The bot is unhealthy and will not accept new leases.
    
    ... ... @@ -38,7 +38,7 @@ class BotStatus(Enum):
    38 38
     
    
    39 39
     class LeaseState(Enum):
    
    40 40
         # Initially unknown state.
    
    41
    -    LEASE_STATE_UNSPECIFIED = bots_pb2.LeaseState.Value('LEASE_STATE_UNSPECIFIED')
    
    41
    +    UNSPECIFIED = bots_pb2.LeaseState.Value('LEASE_STATE_UNSPECIFIED')
    
    42 42
         # The server expects the bot to accept this lease.
    
    43 43
         PENDING = bots_pb2.LeaseState.Value('PENDING')
    
    44 44
         # The bot has accepted this lease.
    

  • buildgrid/_exceptions.py
    ... ... @@ -56,6 +56,7 @@ class CancelledError(BgdError):
    56 56
         """The job was cancelled and any callers should be notified"""
    
    57 57
         def __init__(self, message, detail=None, reason=None):
    
    58 58
             super().__init__(message, detail=detail, domain=ErrorDomain.SERVER, reason=reason)
    
    59
    +        self.last_response = None
    
    59 60
     
    
    60 61
     
    
    61 62
     class InvalidArgumentError(BgdError):
    
    ... ... @@ -89,3 +90,9 @@ class FailedPreconditionError(BgdError):
    89 90
         able to fix the errors and retry."""
    
    90 91
         def __init__(self, message, detail=None, reason=None):
    
    91 92
             super().__init__(message, detail=detail, domain=ErrorDomain.SERVER, reason=reason)
    
    93
    +
    
    94
    +
    
    95
    +class PermissionDeniedError(BgdError):
    
    96
    +    """The caller does not have permission to execute the specified operation."""
    
    97
    +    def __init__(self, message, detail=None, reason=None):
    
    98
    +        super().__init__(message, detail=detail, domain=ErrorDomain.SERVER, reason=reason)

  • buildgrid/server/bots/instance.py
    ... ... @@ -126,7 +126,7 @@ class BotsInterface:
    126 126
                 # Job does not exist, remove from bot.
    
    127 127
                 return None
    
    128 128
     
    
    129
    -        self._scheduler.update_job_lease(lease)
    
    129
    +        self._scheduler.update_job_lease_state(lease.id, lease)
    
    130 130
     
    
    131 131
             if lease_state == LeaseState.COMPLETED:
    
    132 132
                 return None
    
    ... ... @@ -164,7 +164,7 @@ class BotsInterface:
    164 164
                     self.__logger.error("Assigned lease id=[%s],"
    
    165 165
                                         " not found on bot with name=[%s] and id=[%s]."
    
    166 166
                                         " Retrying job", lease_id, bot_session.name, bot_session.bot_id)
    
    167
    -                self._scheduler.retry_job(lease_id)
    
    167
    +                self._scheduler.retry_job_lease(lease_id)
    
    168 168
     
    
    169 169
         def _close_bot_session(self, name):
    
    170 170
             """ Before removing the session, close any leases and
    
    ... ... @@ -177,7 +177,7 @@ class BotsInterface:
    177 177
     
    
    178 178
             self.__logger.debug("Attempting to close [%s] with name: [%s]", bot_id, name)
    
    179 179
             for lease_id in self._assigned_leases[name]:
    
    180
    -            self._scheduler.retry_job(lease_id)
    
    180
    +            self._scheduler.retry_job_lease(lease_id)
    
    181 181
             self._assigned_leases.pop(name)
    
    182 182
     
    
    183 183
             self.__logger.debug("Closing bot session: [%s]", name)
    

  • buildgrid/server/execution/instance.py
    ... ... @@ -21,11 +21,9 @@ An instance of the Remote Execution Service.
    21 21
     
    
    22 22
     import logging
    
    23 23
     
    
    24
    -from buildgrid._exceptions import FailedPreconditionError, InvalidArgumentError
    
    24
    +from buildgrid._exceptions import FailedPreconditionError, InvalidArgumentError, NotFoundError
    
    25 25
     from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Action
    
    26
    -
    
    27
    -from ..job import Job
    
    28
    -from ...utils import get_hash_type
    
    26
    +from buildgrid.utils import get_hash_type
    
    29 27
     
    
    30 28
     
    
    31 29
     class ExecutionInstance:
    
    ... ... @@ -46,44 +44,56 @@ class ExecutionInstance:
    46 44
         def hash_type(self):
    
    47 45
             return get_hash_type()
    
    48 46
     
    
    49
    -    def execute(self, action_digest, skip_cache_lookup, message_queue=None):
    
    47
    +    def execute(self, action_digest, skip_cache_lookup):
    
    50 48
             """ Sends a job for execution.
    
    51 49
             Queues an action and creates an Operation instance to be associated with
    
    52 50
             this action.
    
    53 51
             """
    
    54
    -
    
    55 52
             action = self._storage.get_message(action_digest, Action)
    
    56 53
     
    
    57 54
             if not action:
    
    58 55
                 raise FailedPreconditionError("Could not get action from storage.")
    
    59 56
     
    
    60
    -        job = Job(action, action_digest)
    
    61
    -        if message_queue is not None:
    
    62
    -            job.register_client(message_queue)
    
    57
    +        return self._scheduler.queue_job_action(action, action_digest,
    
    58
    +                                                skip_cache_lookup=skip_cache_lookup)
    
    63 59
     
    
    64
    -        self._scheduler.queue_job(job, skip_cache_lookup)
    
    60
    +    def register_job_peer(self, job_name, peer, message_queue):
    
    61
    +        try:
    
    62
    +            return self._scheduler.register_job_peer(job_name,
    
    63
    +                                                     peer, message_queue)
    
    65 64
     
    
    66
    -        return job.operation
    
    65
    +        except NotFoundError:
    
    66
    +            raise InvalidArgumentError("Job name does not exist: [{}]"
    
    67
    +                                       .format(job_name))
    
    67 68
     
    
    68
    -    def register_message_client(self, name, queue):
    
    69
    +    def register_operation_peer(self, operation_name, peer, message_queue):
    
    69 70
             try:
    
    70
    -            self._scheduler.register_client(name, queue)
    
    71
    +            self._scheduler.register_job_operation_peer(operation_name,
    
    72
    +                                                        peer, message_queue)
    
    71 73
     
    
    72
    -        except KeyError:
    
    73
    -            raise InvalidArgumentError("Operation name does not exist: [{}]".format(name))
    
    74
    +        except NotFoundError:
    
    75
    +            raise InvalidArgumentError("Operation name does not exist: [{}]"
    
    76
    +                                       .format(operation_name))
    
    74 77
     
    
    75
    -    def unregister_message_client(self, name, queue):
    
    78
    +    def unregister_operation_peer(self, operation_name, peer):
    
    76 79
             try:
    
    77
    -            self._scheduler.unregister_client(name, queue)
    
    80
    +            self._scheduler.unregister_job_operation_peer(operation_name, peer)
    
    81
    +
    
    82
    +        except NotFoundError:
    
    83
    +            raise InvalidArgumentError("Operation name does not exist: [{}]"
    
    84
    +                                       .format(operation_name))
    
    85
    +
    
    86
    +    def stream_operation_updates(self, message_queue):
    
    87
    +        error, operation = message_queue.get()
    
    88
    +        if error is not None:
    
    89
    +            raise error
    
    78 90
     
    
    79
    -        except KeyError:
    
    80
    -            raise InvalidArgumentError("Operation name does not exist: [{}]".format(name))
    
    91
    +        while not operation.done:
    
    92
    +            yield operation
    
    81 93
     
    
    82
    -    def stream_operation_updates(self, message_queue, operation_name):
    
    83
    -        job = message_queue.get()
    
    84
    -        while not job.operation.done:
    
    85
    -            yield job.operation
    
    86
    -            job = message_queue.get()
    
    87
    -            job.check_operation_status()
    
    94
    +            error, operation = message_queue.get()
    
    95
    +            if error is not None:
    
    96
    +                error.last_response = operation
    
    97
    +                raise error
    
    88 98
     
    
    89
    -        yield job.operation
    99
    +        yield operation

  • buildgrid/server/execution/service.py
    ... ... @@ -98,12 +98,15 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
    98 98
     
    
    99 99
             try:
    
    100 100
                 instance = self._get_instance(instance_name)
    
    101
    -            operation = instance.execute(request.action_digest,
    
    102
    -                                         request.skip_cache_lookup,
    
    103
    -                                         message_queue)
    
    101
    +
    
    102
    +            job_name = instance.execute(request.action_digest,
    
    103
    +                                        request.skip_cache_lookup)
    
    104
    +
    
    105
    +            operation_name = instance.register_job_peer(job_name,
    
    106
    +                                                        peer, message_queue)
    
    104 107
     
    
    105 108
                 context.add_callback(partial(self._rpc_termination_callback,
    
    106
    -                                         peer, instance_name, operation.name, message_queue))
    
    109
    +                                         peer, instance_name, operation_name))
    
    107 110
     
    
    108 111
                 if self._is_instrumented:
    
    109 112
                     if peer not in self.__peers:
    
    ... ... @@ -112,16 +115,14 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
    112 115
                     else:
    
    113 116
                         self.__peers[peer] += 1
    
    114 117
     
    
    115
    -            instanced_op_name = "{}/{}".format(instance_name, operation.name)
    
    118
    +            operation_full_name = "{}/{}".format(instance_name, operation_name)
    
    116 119
     
    
    117
    -            self.__logger.info("Operation name: [%s]", instanced_op_name)
    
    120
    +            self.__logger.info("Operation [%s] created for job [%s]",
    
    121
    +                               operation_full_name, job_name)
    
    118 122
     
    
    119
    -            for operation in instance.stream_operation_updates(message_queue,
    
    120
    -                                                               operation.name):
    
    121
    -                op = operations_pb2.Operation()
    
    122
    -                op.CopyFrom(operation)
    
    123
    -                op.name = instanced_op_name
    
    124
    -                yield op
    
    123
    +            for operation in instance.stream_operation_updates(message_queue):
    
    124
    +                operation.name = operation_full_name
    
    125
    +                yield operation
    
    125 126
     
    
    126 127
             except InvalidArgumentError as e:
    
    127 128
                 self.__logger.error(e)
    
    ... ... @@ -136,10 +137,10 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
    136 137
                 yield operations_pb2.Operation()
    
    137 138
     
    
    138 139
             except CancelledError as e:
    
    139
    -            self.__logger.error(e)
    
    140
    +            self.__logger.info("Operation cancelled [%s]", operation_full_name)
    
    140 141
                 context.set_details(str(e))
    
    141 142
                 context.set_code(grpc.StatusCode.CANCELLED)
    
    142
    -            yield operations_pb2.Operation()
    
    143
    +            yield e.last_response
    
    143 144
     
    
    144 145
         @authorize(AuthContext)
    
    145 146
         def WaitExecution(self, request, context):
    
    ... ... @@ -160,9 +161,11 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
    160 161
             try:
    
    161 162
                 instance = self._get_instance(instance_name)
    
    162 163
     
    
    163
    -            instance.register_message_client(operation_name, message_queue)
    
    164
    +            instance.register_operation_peer(operation_name,
    
    165
    +                                             peer, message_queue)
    
    166
    +
    
    164 167
                 context.add_callback(partial(self._rpc_termination_callback,
    
    165
    -                                         peer, instance_name, operation_name, message_queue))
    
    168
    +                                         peer, instance_name, operation_name))
    
    166 169
     
    
    167 170
                 if self._is_instrumented:
    
    168 171
                     if peer not in self.__peers:
    
    ... ... @@ -171,12 +174,11 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
    171 174
                     else:
    
    172 175
                         self.__peers[peer] += 1
    
    173 176
     
    
    174
    -            for operation in instance.stream_operation_updates(message_queue,
    
    175
    -                                                               operation_name):
    
    176
    -                op = operations_pb2.Operation()
    
    177
    -                op.CopyFrom(operation)
    
    178
    -                op.name = request.name
    
    179
    -                yield op
    
    177
    +            operation_full_name = "{}/{}".format(instance_name, operation_name)
    
    178
    +
    
    179
    +            for operation in instance.stream_operation_updates(message_queue):
    
    180
    +                operation.name = operation_full_name
    
    181
    +                yield operation
    
    180 182
     
    
    181 183
             except InvalidArgumentError as e:
    
    182 184
                 self.__logger.error(e)
    
    ... ... @@ -185,10 +187,10 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
    185 187
                 yield operations_pb2.Operation()
    
    186 188
     
    
    187 189
             except CancelledError as e:
    
    188
    -            self.__logger.error(e)
    
    190
    +            self.__logger.info("Operation cancelled [%s]", operation_full_name)
    
    189 191
                 context.set_details(str(e))
    
    190 192
                 context.set_code(grpc.StatusCode.CANCELLED)
    
    191
    -            yield operations_pb2.Operation()
    
    193
    +            yield e.last_response
    
    192 194
     
    
    193 195
         # --- Public API: Monitoring ---
    
    194 196
     
    
    ... ... @@ -211,10 +213,10 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
    211 213
     
    
    212 214
         # --- Private API ---
    
    213 215
     
    
    214
    -    def _rpc_termination_callback(self, peer, instance_name, job_name, message_queue):
    
    216
    +    def _rpc_termination_callback(self, peer, instance_name, operation_name):
    
    215 217
             instance = self._get_instance(instance_name)
    
    216 218
     
    
    217
    -        instance.unregister_message_client(job_name, message_queue)
    
    219
    +        instance.unregister_operation_peer(operation_name, peer)
    
    218 220
     
    
    219 221
             if self._is_instrumented:
    
    220 222
                 if self.__peers[peer] > 1:
    

  • buildgrid/server/instance.py
    ... ... @@ -27,6 +27,7 @@ import grpc
    27 27
     import janus
    
    28 28
     
    
    29 29
     from buildgrid._enums import BotStatus, LogRecordLevel, MetricRecordDomain, MetricRecordType
    
    30
    +from buildgrid._exceptions import PermissionDeniedError
    
    30 31
     from buildgrid._protos.buildgrid.v2 import monitoring_pb2
    
    31 32
     from buildgrid.server.actioncache.service import ActionCacheService
    
    32 33
     from buildgrid.server._authentication import AuthMetadataMethod, AuthMetadataAlgorithm
    
    ... ... @@ -87,7 +88,8 @@ class BuildGridServer:
    87 88
                 AuthContext.interceptor = self.__grpc_auth_interceptor
    
    88 89
     
    
    89 90
             self.__grpc_executor = futures.ThreadPoolExecutor(max_workers)
    
    90
    -        self.__grpc_server = grpc.server(self.__grpc_executor)
    
    91
    +        self.__grpc_server = grpc.server(self.__grpc_executor,
    
    92
    +                                         options=(('grpc.so_reuseport', 0),))
    
    91 93
     
    
    92 94
             self.__main_loop = asyncio.get_event_loop()
    
    93 95
     
    
    ... ... @@ -205,6 +207,9 @@ class BuildGridServer:
    205 207
     
    
    206 208
             Returns:
    
    207 209
                 int: Number of the bound port.
    
    210
    +
    
    211
    +        Raises:
    
    212
    +            PermissionDeniedError: If socket binding fails.
    
    208 213
             """
    
    209 214
             if credentials is not None:
    
    210 215
                 self.__logger.info("Adding secure connection on: [%s]", address)
    
    ... ... @@ -214,6 +219,9 @@ class BuildGridServer:
    214 219
                 self.__logger.info("Adding insecure connection on [%s]", address)
    
    215 220
                 port_number = self.__grpc_server.add_insecure_port(address)
    
    216 221
     
    
    222
    +        if not port_number:
    
    223
    +            raise PermissionDeniedError("Unable to configure socket")
    
    224
    +
    
    217 225
             return port_number
    
    218 226
     
    
    219 227
         def add_execution_instance(self, instance, instance_name):
    

  • buildgrid/server/job.py
    ... ... @@ -20,7 +20,7 @@ import uuid
    20 20
     from google.protobuf import duration_pb2, timestamp_pb2
    
    21 21
     
    
    22 22
     from buildgrid._enums import LeaseState, OperationStage
    
    23
    -from buildgrid._exceptions import CancelledError
    
    23
    +from buildgrid._exceptions import CancelledError, NotFoundError
    
    24 24
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
    
    25 25
     from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2
    
    26 26
     from buildgrid._protos.google.longrunning import operations_pb2
    
    ... ... @@ -29,35 +29,70 @@ from buildgrid._protos.google.rpc import code_pb2
    29 29
     
    
    30 30
     class Job:
    
    31 31
     
    
    32
    -    def __init__(self, action, action_digest):
    
    32
    +    def __init__(self, action, action_digest, priority=0):
    
    33 33
             self.__logger = logging.getLogger(__name__)
    
    34 34
     
    
    35 35
             self._name = str(uuid.uuid4())
    
    36
    +        self._priority = priority
    
    36 37
             self._action = remote_execution_pb2.Action()
    
    37
    -        self._operation = operations_pb2.Operation()
    
    38 38
             self._lease = None
    
    39 39
     
    
    40 40
             self.__execute_response = None
    
    41 41
             self.__operation_metadata = remote_execution_pb2.ExecuteOperationMetadata()
    
    42
    +        self.__operations_by_name = {}  # Name to Operation 1:1 mapping
    
    43
    +        self.__operations_by_peer = {}  # Peer to Operation 1:1 mapping
    
    42 44
     
    
    43 45
             self.__queued_timestamp = timestamp_pb2.Timestamp()
    
    44 46
             self.__queued_time_duration = duration_pb2.Duration()
    
    45 47
             self.__worker_start_timestamp = timestamp_pb2.Timestamp()
    
    46 48
             self.__worker_completed_timestamp = timestamp_pb2.Timestamp()
    
    47 49
     
    
    48
    -        self.__operation_cancelled = False
    
    50
    +        self.__operations_message_queues = {}
    
    51
    +        self.__operations_cancelled = set()
    
    49 52
             self.__lease_cancelled = False
    
    53
    +        self.__job_cancelled = False
    
    50 54
     
    
    51 55
             self.__operation_metadata.action_digest.CopyFrom(action_digest)
    
    52 56
             self.__operation_metadata.stage = OperationStage.UNKNOWN.value
    
    53 57
     
    
    54 58
             self._action.CopyFrom(action)
    
    55 59
             self._do_not_cache = self._action.do_not_cache
    
    56
    -        self._operation_update_queues = []
    
    57
    -        self._operation.name = self._name
    
    58
    -        self._operation.done = False
    
    59 60
             self._n_tries = 0
    
    60 61
     
    
    62
    +        self._done = False
    
    63
    +
    
    64
    +    def __lt__(self, other):
    
    65
    +        try:
    
    66
    +            return self.priority < other.priority
    
    67
    +        except AttributeError:
    
    68
    +            return NotImplemented
    
    69
    +
    
    70
    +    def __le__(self, other):
    
    71
    +        try:
    
    72
    +            return self.priority <= other.priority
    
    73
    +        except AttributeError:
    
    74
    +            return NotImplemented
    
    75
    +
    
    76
    +    def __eq__(self, other):
    
    77
    +        if isinstance(other, Job):
    
    78
    +            return self.name == other.name
    
    79
    +        return False
    
    80
    +
    
    81
    +    def __ne__(self, other):
    
    82
    +        return not self.__eq__(other)
    
    83
    +
    
    84
    +    def __gt__(self, other):
    
    85
    +        try:
    
    86
    +            return self.priority > other.priority
    
    87
    +        except AttributeError:
    
    88
    +            return NotImplemented
    
    89
    +
    
    90
    +    def __ge__(self, other):
    
    91
    +        try:
    
    92
    +            return self.priority >= other.priority
    
    93
    +        except AttributeError:
    
    94
    +            return NotImplemented
    
    95
    +
    
    61 96
         # --- Public API ---
    
    62 97
     
    
    63 98
         @property
    
    ... ... @@ -65,17 +100,31 @@ class Job:
    65 100
             return self._name
    
    66 101
     
    
    67 102
         @property
    
    68
    -    def do_not_cache(self):
    
    69
    -        return self._do_not_cache
    
    103
    +    def cancelled(self):
    
    104
    +        return self.__job_cancelled
    
    105
    +
    
    106
    +    @property
    
    107
    +    def priority(self):
    
    108
    +        return self._priority
    
    70 109
     
    
    71 110
         @property
    
    72
    -    def action(self):
    
    73
    -        return self._action
    
    111
    +    def done(self):
    
    112
    +        return self._done
    
    113
    +
    
    114
    +    # --- Public API: REAPI ---
    
    115
    +
    
    116
    +    @property
    
    117
    +    def do_not_cache(self):
    
    118
    +        return self._do_not_cache
    
    74 119
     
    
    75 120
         @property
    
    76 121
         def action_digest(self):
    
    77 122
             return self.__operation_metadata.action_digest
    
    78 123
     
    
    124
    +    @property
    
    125
    +    def operation_stage(self):
    
    126
    +        return OperationStage(self.__operation_metadata.stage)
    
    127
    +
    
    79 128
         @property
    
    80 129
         def action_result(self):
    
    81 130
             if self.__execute_response is not None:
    
    ... ... @@ -84,19 +133,222 @@ class Job:
    84 133
                 return None
    
    85 134
     
    
    86 135
         @property
    
    87
    -    def holds_cached_action_result(self):
    
    136
    +    def holds_cached_result(self):
    
    88 137
             if self.__execute_response is not None:
    
    89 138
                 return self.__execute_response.cached_result
    
    90 139
             else:
    
    91 140
                 return False
    
    92 141
     
    
    93
    -    @property
    
    94
    -    def operation(self):
    
    95
    -        return self._operation
    
    142
    +    def set_cached_result(self, action_result):
    
    143
    +        """Allows specifying an action result from the action cache for the job.
    
    144
    +
    
    145
    +        Note:
    
    146
    +            This won't trigger any :class:`Operation` stage transition.
    
    147
    +
    
    148
    +        Args:
    
    149
    +            action_result (ActionResult): The result from cache.
    
    150
    +        """
    
    151
    +        self.__execute_response = remote_execution_pb2.ExecuteResponse()
    
    152
    +        self.__execute_response.result.CopyFrom(action_result)
    
    153
    +        self.__execute_response.cached_result = True
    
    96 154
     
    
    97 155
         @property
    
    98
    -    def operation_stage(self):
    
    99
    -        return OperationStage(self.__operation_metadata.state)
    
    156
    +    def n_peers(self):
    
    157
    +        return len(self.__operations_message_queues)
    
    158
    +
    
    159
    +    def n_peers_for_operation(self, operation_name):
    
    160
    +        return len([operation for operation in self.__operations_by_peer.values()
    
    161
    +                    if operation.name == operation_name])
    
    162
    +
    
    163
    +    def register_new_operation_peer(self, peer, message_queue):
    
    164
    +        """Subscribes to a new job's :class:`Operation` stage changes.
    
    165
    +
    
    166
    +        Args:
    
    167
    +            peer (str): a unique string identifying the client.
    
    168
    +            message_queue (queue.Queue): the event queue to register.
    
    169
    +
    
    170
    +        Returns:
    
    171
    +            str: The name of the subscribed :class:`Operation`.
    
    172
    +        """
    
    173
    +        new_operation = operations_pb2.Operation()
    
    174
    +        # Copy state from first existing and non cancelled operation:
    
    175
    +        for operation in self.__operations_by_name.values():
    
    176
    +            if operation.name not in self.__operations_cancelled:
    
    177
    +                new_operation.CopyFrom(operation)
    
    178
    +                break
    
    179
    +
    
    180
    +        new_operation.name = str(uuid.uuid4())
    
    181
    +
    
    182
    +        self.__logger.debug("Operation created for job [%s]: [%s]",
    
    183
    +                            self._name, new_operation.name)
    
    184
    +
    
    185
    +        self.__operations_by_name[new_operation.name] = new_operation
    
    186
    +        self.__operations_by_peer[peer] = new_operation
    
    187
    +        self.__operations_message_queues[peer] = message_queue
    
    188
    +
    
    189
    +        self._send_operations_updates(peers=[peer])
    
    190
    +
    
    191
    +        return new_operation.name
    
    192
    +
    
    193
    +    def register_operation_peer(self, operation_name, peer, message_queue):
    
    194
    +        """Subscribes to one of the job's :class:`Operation` stage changes.
    
    195
    +
    
    196
    +        Args:
    
    197
    +            operation_name (str): an existing operation's name to subscribe to.
    
    198
    +            peer (str): a unique string identifying the client.
    
    199
    +            message_queue (queue.Queue): the event queue to register.
    
    200
    +
    
    201
    +        Returns:
    
    202
    +            str: The name of the subscribed :class:`Operation`.
    
    203
    +
    
    204
    +        Raises:
    
    205
    +            NotFoundError: If no operation with `operation_name` exists.
    
    206
    +        """
    
    207
    +        try:
    
    208
    +            operation = self.__operations_by_name[operation_name]
    
    209
    +
    
    210
    +        except KeyError:
    
    211
    +            raise NotFoundError("Operation name does not exist: [{}]"
    
    212
    +                                .format(operation_name))
    
    213
    +
    
    214
    +        self.__operations_by_peer[peer] = operation
    
    215
    +        self.__operations_message_queues[peer] = message_queue
    
    216
    +
    
    217
    +        self._send_operations_updates(peers=[peer])
    
    218
    +
    
    219
    +    def unregister_operation_peer(self, operation_name, peer):
    
    220
    +        """Unsubscribes from the job's :class:`Operation` stage changes.
    
    221
    +
    
    222
    +        Args:
    
    223
    +            operation_name (str): an existing operation's name to unsubscribe from.
    
    224
    +            peer (str): a unique string identifying the client.
    
    225
    +
    
    226
    +        Raises:
    
    227
    +            NotFoundError: If no operation with `operation_name` exists.
    
    228
    +        """
    
    229
    +        try:
    
    230
    +            operation = self.__operations_by_name[operation_name]
    
    231
    +
    
    232
    +        except KeyError:
    
    233
    +            raise NotFoundError("Operation name does not exist: [{}]"
    
    234
    +                                .format(operation_name))
    
    235
    +
    
    236
    +        if peer in self.__operations_message_queues:
    
    237
    +            del self.__operations_message_queues[peer]
    
    238
    +
    
    239
    +        del self.__operations_by_peer[peer]
    
    240
    +
    
    241
    +        # Drop the operation if nobody is watching it anymore:
    
    242
    +        if operation not in self.__operations_by_peer.values():
    
    243
    +            del self.__operations_by_name[operation.name]
    
    244
    +
    
    245
    +            self.__operations_cancelled.discard(operation.name)
    
    246
    +
    
    247
    +            self.__logger.debug("Operation deleted for job [%s]: [%s]",
    
    248
    +                                self._name, operation.name)
    
    249
    +
    
    250
    +    def list_operations(self):
    
    251
    +        """Lists the :class:`Operation` related to a job.
    
    252
    +
    
    253
    +        Returns:
    
    254
    +            list: A list of :class:`Operation` names.
    
    255
    +        """
    
    256
    +        return list(self.__operations_by_name.keys())
    
    257
    +
    
    258
    +    def get_operation(self, operation_name):
    
    259
    +        """Returns a copy of the job's :class:`Operation`.
    
    260
    +
    
    261
    +        Args:
    
    262
    +            operation_name (str): the operation's name.
    
    263
    +
    
    264
    +        Raises:
    
    265
    +            NotFoundError: If no operation with `operation_name` exists.
    
    266
    +        """
    
    267
    +        try:
    
    268
    +            operation = self.__operations_by_name[operation_name]
    
    269
    +
    
    270
    +        except KeyError:
    
    271
    +            raise NotFoundError("Operation name does not exist: [{}]"
    
    272
    +                                .format(operation_name))
    
    273
    +
    
    274
    +        return self._copy_operation(operation)
    
    275
    +
    
    276
    +    def update_operation_stage(self, stage):
    
    277
    +        """Operates a stage transition for the job's :class:`Operation`.
    
    278
    +
    
    279
    +        Args:
    
    280
    +            stage (OperationStage): the operation stage to transition to.
    
    281
    +        """
    
    282
    +        if stage.value == self.__operation_metadata.stage:
    
    283
    +            return
    
    284
    +
    
    285
    +        self.__operation_metadata.stage = stage.value
    
    286
    +
    
    287
    +        self.__logger.debug("Stage changed for job [%s]: [%s] (operation)",
    
    288
    +                            self._name, stage.name)
    
    289
    +
    
    290
    +        if self.__operation_metadata.stage == OperationStage.QUEUED.value:
    
    291
    +            if self.__queued_timestamp.ByteSize() == 0:
    
    292
    +                self.__queued_timestamp.GetCurrentTime()
    
    293
    +            self._n_tries += 1
    
    294
    +
    
    295
    +        elif self.__operation_metadata.stage == OperationStage.EXECUTING.value:
    
    296
    +            queue_in, queue_out = self.__queued_timestamp.ToDatetime(), datetime.now()
    
    297
    +            self.__queued_time_duration.FromTimedelta(queue_out - queue_in)
    
    298
    +
    
    299
    +        elif self.__operation_metadata.stage == OperationStage.COMPLETED.value:
    
    300
    +            self._done = True
    
    301
    +
    
    302
    +        self._send_operations_updates()
    
    303
    +
    
    304
    +    def cancel_operation(self, operation_name):
    
    305
    +        """Triggers a job's :class:`Operation` cancellation.
    
    306
    +
    
    307
    +        This may cancel any job's :class:`Lease` that may have been issued.
    
    308
    +
    
    309
    +        Args:
    
    310
    +            operation_name (str): the operation's name.
    
    311
    +
    
    312
    +        Raises:
    
    313
    +            NotFoundError: If no operation with `operation_name` exists.
    
    314
    +        """
    
    315
    +        try:
    
    316
    +            operation = self.__operations_by_name[operation_name]
    
    317
    +
    
    318
    +        except KeyError:
    
    319
    +            raise NotFoundError("Operation name does not exist: [{}]"
    
    320
    +                                .format(operation_name))
    
    321
    +
    
    322
    +        self.__operations_cancelled.add(operation.name)
    
    323
    +
    
    324
    +        self.__logger.debug("Operation cancelled for job [%s]: [%s]",
    
    325
    +                            self._name, operation.name)
    
    326
    +
    
    327
    +        ongoing_operations = set(self.__operations_by_name.keys())
    
    328
    +        # Job is cancelled if all of its operations are:
    
    329
    +        self.__job_cancelled = ongoing_operations.issubset(self.__operations_cancelled)
    
    330
    +
    
    331
    +        if self.__job_cancelled and self._lease is not None:
    
    332
    +            self.cancel_lease()
    
    333
    +
    
    334
    +        peers_to_notify = set()
    
    335
    +        # If the job is not cancelled, notify all the peers watching the given
    
    336
    +        # operation; if the job is cancelled, only notify the peers for which
    
    337
    +        # the operation status changed.
    
    338
    +        for peer, operation in self.__operations_by_peer.items():
    
    339
    +            if self.__job_cancelled:
    
    340
    +                if operation.name not in self.__operations_cancelled:
    
    341
    +                    peers_to_notify.add(peer)
    
    342
    +                elif operation.name == operation_name:
    
    343
    +                    peers_to_notify.add(peer)
    
    344
    +
    
    345
    +            else:
    
    346
    +                if operation.name == operation_name:
    
    347
    +                    peers_to_notify.add(peer)
    
    348
    +
    
    349
    +        self._send_operations_updates(peers=peers_to_notify, notify_cancelled=True)
    
    350
    +
    
    351
    +    # --- Public API: RWAPI ---
    
    100 352
     
    
    101 353
         @property
    
    102 354
         def lease(self):
    
    ... ... @@ -117,69 +369,47 @@ class Job:
    117 369
         def n_tries(self):
    
    118 370
             return self._n_tries
    
    119 371
     
    
    120
    -    @property
    
    121
    -    def n_clients(self):
    
    122
    -        return len(self._operation_update_queues)
    
    123
    -
    
    124
    -    def register_client(self, queue):
    
    125
    -        """Subscribes to the job's :class:`Operation` stage change events.
    
    126
    -
    
    127
    -        Queues this :object:`Job` instance.
    
    128
    -
    
    129
    -        Args:
    
    130
    -            queue (queue.Queue): the event queue to register.
    
    131
    -        """
    
    132
    -        self._operation_update_queues.append(queue)
    
    133
    -        queue.put(self)
    
    134
    -
    
    135
    -    def unregister_client(self, queue):
    
    136
    -        """Unsubscribes to the job's :class:`Operation` stage change events.
    
    137
    -
    
    138
    -        Args:
    
    139
    -            queue (queue.Queue): the event queue to unregister.
    
    140
    -        """
    
    141
    -        self._operation_update_queues.remove(queue)
    
    142
    -
    
    143
    -    def set_cached_result(self, action_result):
    
    144
    -        """Allows specifying an action result form the action cache for the job.
    
    145
    -        """
    
    146
    -        self.__execute_response = remote_execution_pb2.ExecuteResponse()
    
    147
    -        self.__execute_response.result.CopyFrom(action_result)
    
    148
    -        self.__execute_response.cached_result = True
    
    149
    -
    
    150 372
         def create_lease(self):
    
    151 373
             """Emits a new :class:`Lease` for the job.
    
    152 374
     
    
    153 375
             Only one :class:`Lease` can be emitted for a given job. This method
    
    154
    -        should only be used once, any furhter calls are ignored.
    
    376
    +        should only be used once, any further calls are ignored.
    
    155 377
             """
    
    156
    -        if self.__operation_cancelled:
    
    157
    -            return None
    
    158
    -        elif self._lease is not None:
    
    378
    +        if self._lease is not None:
    
    379
    +            return self._lease
    
    380
    +        elif self.__job_cancelled:
    
    159 381
                 return None
    
    160 382
     
    
    161 383
             self._lease = bots_pb2.Lease()
    
    162 384
             self._lease.id = self._name
    
    163 385
             self._lease.payload.Pack(self.__operation_metadata.action_digest)
    
    164
    -        self._lease.state = LeaseState.PENDING.value
    
    386
    +        self._lease.state = LeaseState.UNSPECIFIED.value
    
    387
    +
    
    388
    +        self.__logger.debug("Lease created for job [%s]: [%s]",
    
    389
    +                            self._name, self._lease.id)
    
    390
    +
    
    391
    +        self.update_lease_state(LeaseState.PENDING)
    
    165 392
     
    
    166 393
             return self._lease
    
    167 394
     
    
    168 395
         def update_lease_state(self, state, status=None, result=None):
    
    169
    -        """Operates a state transition for the job's current :class:Lease.
    
    396
    +        """Operates a state transition for the job's current :class:`Lease`.
    
    170 397
     
    
    171 398
             Args:
    
    172 399
                 state (LeaseState): the lease state to transition to.
    
    173
    -            status (google.rpc.Status): the lease execution status, only
    
    174
    -                required if `state` is `COMPLETED`.
    
    175
    -            result (google.protobuf.Any): the lease execution result, only
    
    176
    -                required if `state` is `COMPLETED`.
    
    400
    +            status (google.rpc.Status, optional): the lease execution status,
    
    401
    +                only required if `state` is `COMPLETED`.
    
    402
    +            result (google.protobuf.Any, optional): the lease execution result,
    
    403
    +                only required if `state` is `COMPLETED`.
    
    177 404
             """
    
    178 405
             if state.value == self._lease.state:
    
    179 406
                 return
    
    180 407
     
    
    181 408
             self._lease.state = state.value
    
    182 409
     
    
    410
    +        self.__logger.debug("State changed for job [%s]: [%s] (lease)",
    
    411
    +                            self._name, state.name)
    
    412
    +
    
    183 413
             if self._lease.state == LeaseState.PENDING.value:
    
    184 414
                 self.__worker_start_timestamp.Clear()
    
    185 415
                 self.__worker_completed_timestamp.Clear()
    
    ... ... @@ -214,79 +444,103 @@ class Job:
    214 444
                 self.__execute_response.status.CopyFrom(status)
    
    215 445
     
    
    216 446
         def cancel_lease(self):
    
    217
    -        """Triggers a job's :class:Lease cancellation.
    
    447
    +        """Triggers a job's :class:`Lease` cancellation.
    
    218 448
     
    
    219
    -        This will not cancel the job's :class:Operation.
    
    449
    +        Note:
    
    450
    +            This will not cancel the job's :class:`Operation`.
    
    220 451
             """
    
    221 452
             self.__lease_cancelled = True
    
    453
    +
    
    454
    +        self.__logger.debug("Lease cancelled for job [%s]: [%s]",
    
    455
    +                            self._name, self._lease.id)
    
    456
    +
    
    222 457
             if self._lease is not None:
    
    223 458
                 self.update_lease_state(LeaseState.CANCELLED)
    
    224 459
     
    
    225 460
         def delete_lease(self):
    
    226
    -        """Discard the job's :class:Lease."""
    
    461
    +        """Discard the job's :class:`Lease`.
    
    462
    +
    
    463
    +        Note:
    
    464
    +            This will not cancel the job's :class:`Operation`.
    
    465
    +        """
    
    227 466
             self.__worker_start_timestamp.Clear()
    
    228 467
             self.__worker_completed_timestamp.Clear()
    
    229 468
     
    
    469
    +        self.__logger.debug("Lease deleted for job [%s]: [%s]",
    
    470
    +                            self._name, self._lease.id)
    
    471
    +
    
    230 472
             self._lease = None
    
    231 473
     
    
    232
    -    def update_operation_stage(self, stage):
    
    233
    -        """Operates a stage transition for the job's :class:Operation.
    
    474
    +    # --- Public API: Monitoring ---
    
    234 475
     
    
    235
    -        Args:
    
    236
    -            stage (OperationStage): the operation stage to transition to.
    
    237
    -        """
    
    238
    -        if stage.value == self.__operation_metadata.stage:
    
    239
    -            return
    
    476
    +    def query_queue_time(self):
    
    477
    +        return self.__queued_time_duration.ToTimedelta()
    
    240 478
     
    
    241
    -        self.__operation_metadata.stage = stage.value
    
    479
    +    def query_n_retries(self):
    
    480
    +        return self._n_tries - 1 if self._n_tries > 0 else 0
    
    242 481
     
    
    243
    -        if self.__operation_metadata.stage == OperationStage.QUEUED.value:
    
    244
    -            if self.__queued_timestamp.ByteSize() == 0:
    
    245
    -                self.__queued_timestamp.GetCurrentTime()
    
    246
    -            self._n_tries += 1
    
    482
    +    # --- Private API ---
    
    247 483
     
    
    248
    -        elif self.__operation_metadata.stage == OperationStage.EXECUTING.value:
    
    249
    -            queue_in, queue_out = self.__queued_timestamp.ToDatetime(), datetime.now()
    
    250
    -            self.__queued_time_duration.FromTimedelta(queue_out - queue_in)
    
    484
    +    def _copy_operation(self, operation):
    
    485
    +        """Simply duplicates a given :class:`Operation` object."""
    
    486
    +        new_operation = operations_pb2.Operation()
    
    487
    +        new_operation.CopyFrom(operation)
    
    251 488
     
    
    252
    -        elif self.__operation_metadata.stage == OperationStage.COMPLETED.value:
    
    253
    -            if self.__execute_response is not None:
    
    254
    -                self._operation.response.Pack(self.__execute_response)
    
    255
    -            self._operation.done = True
    
    489
    +        return new_operation
    
    256 490
     
    
    257
    -        self._operation.metadata.Pack(self.__operation_metadata)
    
    491
    +    def _update_operation(self, operation, operation_metadata, execute_response=None, done=False):
    
    492
    +        """Forges a :class:`Operation` message given input data."""
    
    493
    +        operation.metadata.Pack(operation_metadata)
    
    258 494
     
    
    259
    -        for queue in self._operation_update_queues:
    
    260
    -            queue.put(self)
    
    495
    +        if execute_response is not None:
    
    496
    +            operation.response.Pack(execute_response)
    
    261 497
     
    
    262
    -    def check_operation_status(self):
    
    263
    -        """Reports errors on unexpected job's :class:Operation state.
    
    498
    +        operation.done = done
    
    264 499
     
    
    265
    -        Raises:
    
    266
    -            CancelledError: if the job's :class:Operation was cancelled.
    
    267
    -        """
    
    268
    -        if self.__operation_cancelled:
    
    269
    -            raise CancelledError(self.__execute_response.status.message)
    
    500
    +    def _update_cancelled_operation(self, operation, operation_metadata, execute_response=None):
    
    501
    +        """Forges a cancelled :class:`Operation` message given input data."""
    
    502
    +        cancelled_operation_metadata = remote_execution_pb2.ExecuteOperationMetadata()
    
    503
    +        cancelled_operation_metadata.CopyFrom(operation_metadata)
    
    504
    +        cancelled_operation_metadata.stage = OperationStage.COMPLETED.value
    
    270 505
     
    
    271
    -    def cancel_operation(self):
    
    272
    -        """Triggers a job's :class:Operation cancellation.
    
    506
    +        operation.metadata.Pack(cancelled_operation_metadata)
    
    273 507
     
    
    274
    -        This will also cancel any job's :class:Lease that may have been issued.
    
    275
    -        """
    
    276
    -        self.__operation_cancelled = True
    
    277
    -        if self._lease is not None:
    
    278
    -            self.cancel_lease()
    
    508
    +        cancelled_execute_response = remote_execution_pb2.ExecuteResponse()
    
    509
    +        if execute_response is not None:
    
    510
    +            cancelled_execute_response.CopyFrom(self.__execute_response)
    
    511
    +        cancelled_execute_response.status.code = code_pb2.CANCELLED
    
    512
    +        cancelled_execute_response.status.message = "Operation cancelled by client."
    
    279 513
     
    
    280
    -        self.__execute_response = remote_execution_pb2.ExecuteResponse()
    
    281
    -        self.__execute_response.status.code = code_pb2.CANCELLED
    
    282
    -        self.__execute_response.status.message = "Operation cancelled by client."
    
    514
    +        operation.response.Pack(cancelled_execute_response)
    
    283 515
     
    
    284
    -        self.update_operation_stage(OperationStage.COMPLETED)
    
    516
    +        operation.done = True
    
    285 517
     
    
    286
    -    # --- Public API: Monitoring ---
    
    518
    +    def _send_operations_updates(self, peers=None, notify_cancelled=False):
    
    519
    +        """Sends :class:`Operation` stage change messages to watchers."""
    
    520
    +        for operation in self.__operations_by_name.values():
    
    521
    +            if operation.name in self.__operations_cancelled:
    
    522
    +                self._update_cancelled_operation(operation, self.__operation_metadata,
    
    523
    +                                                 execute_response=self.__execute_response)
    
    287 524
     
    
    288
    -    def query_queue_time(self):
    
    289
    -        return self.__queued_time_duration.ToTimedelta()
    
    525
    +            else:
    
    526
    +                self._update_operation(operation, self.__operation_metadata,
    
    527
    +                                       execute_response=self.__execute_response,
    
    528
    +                                       done=self._done)
    
    290 529
     
    
    291
    -    def query_n_retries(self):
    
    292
    -        return self._n_tries - 1 if self._n_tries > 0 else 0
    530
    +        for peer, message_queue in self.__operations_message_queues.items():
    
    531
    +            if peer not in self.__operations_by_peer:
    
    532
    +                continue
    
    533
    +            elif peers and peer not in peers:
    
    534
    +                continue
    
    535
    +
    
    536
    +            operation = self.__operations_by_peer[peer]
    
    537
    +            # Messages are pairs of (Exception, Operation,):
    
    538
    +            if not notify_cancelled and operation.name in self.__operations_cancelled:
    
    539
    +                continue
    
    540
    +            elif operation.name not in self.__operations_cancelled:
    
    541
    +                message = (None, self._copy_operation(operation),)
    
    542
    +            else:
    
    543
    +                message = (CancelledError("Operation has been cancelled"),
    
    544
    +                           self._copy_operation(operation),)
    
    545
    +
    
    546
    +            message_queue.put(message)

  • buildgrid/server/operations/instance.py
    ... ... @@ -21,7 +21,7 @@ An instance of the LongRunningOperations Service.
    21 21
     
    
    22 22
     import logging
    
    23 23
     
    
    24
    -from buildgrid._exceptions import InvalidArgumentError
    
    24
    +from buildgrid._exceptions import InvalidArgumentError, NotFoundError
    
    25 25
     from buildgrid._protos.google.longrunning import operations_pb2
    
    26 26
     
    
    27 27
     
    
    ... ... @@ -39,62 +39,43 @@ class OperationsInstance:
    39 39
         def register_instance_with_server(self, instance_name, server):
    
    40 40
             server.add_operations_instance(self, instance_name)
    
    41 41
     
    
    42
    -    def get_operation(self, name):
    
    43
    -        job = self._scheduler.jobs.get(name)
    
    42
    +    def get_operation(self, job_name):
    
    43
    +        try:
    
    44
    +            operation = self._scheduler.get_job_operation(job_name)
    
    44 45
     
    
    45
    -        if job is None:
    
    46
    -            raise InvalidArgumentError("Operation name does not exist: [{}]".format(name))
    
    46
    +        except NotFoundError:
    
    47
    +            raise InvalidArgumentError("Operation name does not exist: [{}]".format(job_name))
    
    47 48
     
    
    48
    -        else:
    
    49
    -            return job.operation
    
    49
    +        return operation
    
    50 50
     
    
    51 51
         def list_operations(self, list_filter, page_size, page_token):
    
    52 52
             # TODO: Pages
    
    53 53
             # Spec says number of pages and length of a page are optional
    
    54 54
             response = operations_pb2.ListOperationsResponse()
    
    55
    +
    
    56
    +        operation_names = [operation_name for job_name in
    
    57
    +                           self._scheduler.list_current_jobs() for operation_name in
    
    58
    +                           self._scheduler.list_job_operations(job_name)]
    
    59
    +
    
    55 60
             operations = []
    
    56
    -        for job in self._scheduler.list_jobs():
    
    57
    -            op = operations_pb2.Operation()
    
    58
    -            op.CopyFrom(job.operation)
    
    59
    -            operations.append(op)
    
    61
    +        for operation_name in operation_names:
    
    62
    +            operation = self._scheduler.get_job_operation(operation_name)
    
    63
    +            operations.append(operation)
    
    60 64
     
    
    61 65
             response.operations.extend(operations)
    
    62 66
     
    
    63 67
             return response
    
    64 68
     
    
    65
    -    def delete_operation(self, name):
    
    66
    -        try:
    
    67
    -            self._scheduler.jobs.pop(name)
    
    68
    -
    
    69
    -        except KeyError:
    
    70
    -            raise InvalidArgumentError("Operation name does not exist: [{}]".format(name))
    
    71
    -
    
    72
    -    def cancel_operation(self, name):
    
    69
    +    def delete_operation(self, job_name):
    
    73 70
             try:
    
    74
    -            self._scheduler.cancel_job_operation(name)
    
    71
    +            self._scheduler.delete_job_operation(job_name)
    
    75 72
     
    
    76
    -        except KeyError:
    
    77
    -            raise InvalidArgumentError("Operation name does not exist: [{}]".format(name))
    
    73
    +        except NotFoundError:
    
    74
    +            raise InvalidArgumentError("Operation name does not exist: [{}]".format(job_name))
    
    78 75
     
    
    79
    -    def register_message_client(self, name, queue):
    
    76
    +    def cancel_operation(self, job_name):
    
    80 77
             try:
    
    81
    -            self._scheduler.register_client(name, queue)
    
    82
    -
    
    83
    -        except KeyError:
    
    84
    -            raise InvalidArgumentError("Operation name does not exist: [{}]".format(name))
    
    85
    -
    
    86
    -    def unregister_message_client(self, name, queue):
    
    87
    -        try:
    
    88
    -            self._scheduler.unregister_client(name, queue)
    
    89
    -
    
    90
    -        except KeyError:
    
    91
    -            raise InvalidArgumentError("Operation name does not exist: [{}]".format(name))
    
    92
    -
    
    93
    -    def stream_operation_updates(self, message_queue, operation_name):
    
    94
    -        job = message_queue.get()
    
    95
    -        while not job.operation.done:
    
    96
    -            yield job.operation
    
    97
    -            job = message_queue.get()
    
    98
    -            job.check_operation_status()
    
    78
    +            self._scheduler.cancel_job_operation(job_name)
    
    99 79
     
    
    100
    -        yield job.operation
    80
    +        except NotFoundError:
    
    81
    +            raise InvalidArgumentError("Operation name does not exist: [{}]".format(job_name))

  • buildgrid/server/scheduler.py
    ... ... @@ -19,12 +19,13 @@ Scheduler
    19 19
     Schedules jobs.
    
    20 20
     """
    
    21 21
     
    
    22
    -from collections import deque
    
    22
    +import bisect
    
    23 23
     from datetime import timedelta
    
    24 24
     import logging
    
    25 25
     
    
    26 26
     from buildgrid._enums import LeaseState, OperationStage
    
    27 27
     from buildgrid._exceptions import NotFoundError
    
    28
    +from buildgrid.server.job import Job
    
    28 29
     
    
    29 30
     
    
    30 31
     class Scheduler:
    
    ... ... @@ -42,8 +43,12 @@ class Scheduler:
    42 43
             self.__retries_count = 0
    
    43 44
     
    
    44 45
             self._action_cache = action_cache
    
    45
    -        self.jobs = {}
    
    46
    -        self.queue = deque()
    
    46
    +
    
    47
    +        self.__jobs_by_action = {}  # Action to Job 1:1 mapping
    
    48
    +        self.__jobs_by_operation = {}  # Operation to Job 1:1 mapping
    
    49
    +        self.__jobs_by_name = {}  # Name to Job 1:1 mapping
    
    50
    +
    
    51
    +        self.__queue = []
    
    47 52
     
    
    48 53
             self._is_instrumented = monitor
    
    49 54
     
    
    ... ... @@ -52,61 +57,215 @@ class Scheduler:
    52 57
     
    
    53 58
         # --- Public API ---
    
    54 59
     
    
    55
    -    def register_client(self, job_name, queue):
    
    56
    -        job = self.jobs[job_name]
    
    60
    +    def list_current_jobs(self):
    
    61
    +        """Returns a list of the :class:`Job` names currently managed."""
    
    62
    +        return self.__jobs_by_name.keys()
    
    63
    +
    
    64
    +    def list_job_operations(self, job_name):
    
    65
    +        """Returns a list of :class:`Operation` names for a :class:`Job`."""
    
    66
    +        if job_name in self.__jobs_by_name:
    
    67
    +            return self.__jobs_by_name[job_name].list_operations()
    
    68
    +        else:
    
    69
    +            return []
    
    70
    +
    
    71
    +    # --- Public API: REAPI ---
    
    72
    +
    
    73
    +    def register_job_peer(self, job_name, peer, message_queue):
    
    74
    +        """Subscribes to the job's :class:`Operation` stage changes.
    
    75
    +
    
    76
    +        Args:
    
    77
    +            job_name (str): name of the job to subscribe to.
    
    78
    +            peer (str): a unique string identifying the client.
    
    79
    +            message_queue (queue.Queue): the event queue to register.
    
    80
    +
    
    81
    +        Returns:
    
    82
    +            str: The name of the subscribed :class:`Operation`.
    
    83
    +
    
    84
    +        Raises:
    
    85
    +            NotFoundError: If no job with `job_name` exists.
    
    86
    +        """
    
    87
    +        try:
    
    88
    +            job = self.__jobs_by_name[job_name]
    
    89
    +
    
    90
    +        except KeyError:
    
    91
    +            raise NotFoundError("Job name does not exist: [{}]"
    
    92
    +                                .format(job_name))
    
    93
    +
    
    94
    +        operation_name = job.register_new_operation_peer(peer, message_queue)
    
    95
    +
    
    96
    +        self.__jobs_by_operation[operation_name] = job
    
    97
    +
    
    98
    +        return operation_name
    
    99
    +
    
    100
    +    def register_job_operation_peer(self, operation_name, peer, message_queue):
    
    101
    +        """Subscribes to one of the job's existing :class:`Operation` stage changes.
    
    57 102
     
    
    58
    -        job.register_client(queue)
    
    103
    +        Args:
    
    104
    +            operation_name (str): name of the operation to subscribe to.
    
    105
    +            peer (str): a unique string identifying the client.
    
    106
    +            message_queue (queue.Queue): the event queue to register.
    
    107
    +
    
    108
    +        Returns:
    
    109
    +            str: The name of the subscribed :class:`Operation`.
    
    110
    +
    
    111
    +        Raises:
    
    112
    +            NotFoundError: If no operation with `operation_name` exists.
    
    113
    +        """
    
    114
    +        try:
    
    115
    +            job = self.__jobs_by_operation[operation_name]
    
    59 116
     
    
    60
    -    def unregister_client(self, job_name, queue):
    
    61
    -        job = self.jobs[job_name]
    
    117
    +        except KeyError:
    
    118
    +            raise NotFoundError("Operation name does not exist: [{}]"
    
    119
    +                                .format(operation_name))
    
    120
    +
    
    121
    +        job.register_operation_peer(operation_name, peer, message_queue)
    
    62 122
     
    
    63
    -        job.unregister_client(queue)
    
    123
    +    def unregister_job_operation_peer(self, operation_name, peer):
    
    124
    +        """Unsubscribes from one of the job's :class:`Operation` stage changes.
    
    64 125
     
    
    65
    -        if not job.n_clients and job.operation.done and not job.lease:
    
    126
    +        Args:
    
    127
    +            operation_name (str): name of the operation to unsubscribe from.
    
    128
    +            peer (str): a unique string identifying the client.
    
    129
    +
    
    130
    +        Raises:
    
    131
    +            NotFoundError: If no operation with `operation_name` exists.
    
    132
    +        """
    
    133
    +        try:
    
    134
    +            job = self.__jobs_by_operation[operation_name]
    
    135
    +
    
    136
    +        except KeyError:
    
    137
    +            raise NotFoundError("Operation name does not exist: [{}]"
    
    138
    +                                .format(operation_name))
    
    139
    +
    
    140
    +        job.unregister_operation_peer(operation_name, peer)
    
    141
    +
    
    142
    +        if not job.n_peers_for_operation(operation_name):
    
    143
    +            del self.__jobs_by_operation[operation_name]
    
    144
    +
    
    145
    +        if not job.n_peers and job.done and not job.lease:
    
    66 146
                 self._delete_job(job.name)
    
    67 147
     
    
    68
    -    def queue_job(self, job, skip_cache_lookup=False):
    
    69
    -        self.jobs[job.name] = job
    
    148
    +    def queue_job_action(self, action, action_digest, priority=0, skip_cache_lookup=False):
    
    149
    +        """Inserts a newly created job into the execution queue.
    
    150
    +
    
    151
    +        Warning:
    
    152
    +            Priority is handled like a POSIX ``nice`` value: a higher value
    
    153
    +            means a low priority, 0 being default priority.
    
    154
    +
    
    155
    +        Args:
    
    156
    +            action (Action): the given action to queue for execution.
    
    157
    +            action_digest (Digest): the digest of the given action.
    
    158
    +            priority (int): the execution job's priority.
    
    159
    +            skip_cache_lookup (bool): whether or not to look for pre-computed
    
    160
    +                result for the given action.
    
    161
    +
    
    162
    +        Returns:
    
    163
    +            str: the newly created job's name.
    
    164
    +        """
    
    165
    +        if action_digest.hash in self.__jobs_by_action:
    
    166
    +            job = self.__jobs_by_action[action_digest.hash]
    
    167
    +            # If existing job has been cancelled create a new one:
    
    168
    +            if not job.cancelled:
    
    169
    +                # Reschedule if priority is now greater:
    
    170
    +                if priority < job.priority:
    
    171
    +                    job.priority = priority
    
    172
    +
    
    173
    +                    if job.operation_stage == OperationStage.QUEUED:
    
    174
    +                        self._queue_job(job.name)
    
    175
    +
    
    176
    +                self.__logger.debug("Job deduplicated for action [%s]: [%s]",
    
    177
    +                                    action_digest.hash[:8], job.name)
    
    178
    +
    
    179
    +                return job.name
    
    180
    +
    
    181
    +        job = Job(action, action_digest, priority=priority)
    
    182
    +
    
    183
    +        self.__logger.debug("Job created for action [%s]: [%s]",
    
    184
    +                            action_digest.hash[:8], job.name)
    
    185
    +
    
    186
    +        self.__jobs_by_action[job.action_digest.hash] = job
    
    187
    +        self.__jobs_by_name[job.name] = job
    
    70 188
     
    
    71 189
             operation_stage = None
    
    190
    +
    
    72 191
             if self._action_cache is not None and not skip_cache_lookup:
    
    73 192
                 try:
    
    74 193
                     action_result = self._action_cache.get_action_result(job.action_digest)
    
    75
    -            except NotFoundError:
    
    76
    -                operation_stage = OperationStage.QUEUED
    
    77
    -                self.queue.append(job)
    
    78 194
     
    
    79
    -            else:
    
    80
    -                job.set_cached_result(action_result)
    
    195
    +                self.__logger.debug("Job cache hit for action [%s]: [%s]",
    
    196
    +                                    action_digest.hash[:8], job.name)
    
    197
    +
    
    81 198
                     operation_stage = OperationStage.COMPLETED
    
    199
    +                job.set_cached_result(action_result)
    
    82 200
     
    
    83
    -                if self._is_instrumented:
    
    84
    -                    self.__retries_count += 1
    
    201
    +            except NotFoundError:
    
    202
    +                operation_stage = OperationStage.QUEUED
    
    203
    +                self._queue_job(job.name)
    
    85 204
     
    
    86 205
             else:
    
    87 206
                 operation_stage = OperationStage.QUEUED
    
    88
    -            self.queue.append(job)
    
    207
    +            self._queue_job(job.name)
    
    89 208
     
    
    90 209
             self._update_job_operation_stage(job.name, operation_stage)
    
    91 210
     
    
    92
    -    def retry_job(self, job_name):
    
    93
    -        job = self.jobs[job_name]
    
    211
    +        return job.name
    
    94 212
     
    
    95
    -        operation_stage = None
    
    96
    -        if job.n_tries >= self.MAX_N_TRIES:
    
    97
    -            # TODO: Decide what to do with these jobs
    
    98
    -            operation_stage = OperationStage.COMPLETED
    
    99
    -            # TODO: Mark these jobs as done
    
    213
    +    def get_job_operation(self, operation_name):
    
    214
    +        """Retrieves a job's :class:`Operation` by name.
    
    100 215
     
    
    101
    -        else:
    
    102
    -            operation_stage = OperationStage.QUEUED
    
    103
    -            job.update_lease_state(LeaseState.PENDING)
    
    104
    -            self.queue.append(job)
    
    216
    +        Args:
    
    217
    +            operation_name (str): name of the operation to query.
    
    105 218
     
    
    106
    -        self._update_job_operation_stage(job_name, operation_stage)
    
    219
    +        Raises:
    
    220
    +            NotFoundError: If no operation with `operation_name` exists.
    
    221
    +        """
    
    222
    +        try:
    
    223
    +            job = self.__jobs_by_operation[operation_name]
    
    224
    +
    
    225
    +        except KeyError:
    
    226
    +            raise NotFoundError("Operation name does not exist: [{}]"
    
    227
    +                                .format(operation_name))
    
    228
    +
    
    229
    +        return job.get_operation(operation_name)
    
    230
    +
    
    231
    +    def cancel_job_operation(self, operation_name):
    
    232
    +        """Cancels a job's :class:`Operation` by name.
    
    233
    +
    
    234
    +        Args:
    
    235
    +            operation_name (str): name of the operation to cancel.
    
    236
    +
    
    237
    +        Raises:
    
    238
    +            NotFoundError: If no operation with `operation_name` exists.
    
    239
    +        """
    
    240
    +        try:
    
    241
    +            job = self.__jobs_by_operation[operation_name]
    
    242
    +
    
    243
    +        except KeyError:
    
    244
    +            raise NotFoundError("Operation name does not exist: [{}]"
    
    245
    +                                .format(operation_name))
    
    246
    +
    
    247
    +        job.cancel_operation(operation_name)
    
    248
    +
    
    249
    +    def delete_job_operation(self, operation_name):
    
    250
    +        """Removes a job.
    
    251
    +
    
    252
    +        Args:
    
    253
    +            operation_name (str): name of the operation to delete.
    
    107 254
     
    
    108
    -    def list_jobs(self):
    
    109
    -        return self.jobs.values()
    
    255
    +        Raises:
    
    256
    +            NotFoundError: If no operation with `operation_name` exists.
    
    257
    +        """
    
    258
    +        try:
    
    259
    +            job = self.__jobs_by_operation[operation_name]
    
    260
    +
    
    261
    +        except KeyError:
    
    262
    +            raise NotFoundError("Operation name does not exist: [{}]"
    
    263
    +                                .format(operation_name))
    
    264
    +
    
    265
    +        if not job.n_peers and job.done and not job.lease:
    
    266
    +            self._delete_job(job.name)
    
    267
    +
    
    268
    +    # --- Public API: RWAPI ---
    
    110 269
     
    
    111 270
         def request_job_leases(self, worker_capabilities):
    
    112 271
             """Generates a list of the highest priority leases to be run.
    
    ... ... @@ -115,11 +274,16 @@ class Scheduler:
    115 274
                 worker_capabilities (dict): a set of key-value pairs describing the
    
    116 275
                     worker properties, configuration and state at the time of the
    
    117 276
                     request.
    
    277
    +
    
    278
    +        Warning: Worker capabilities handling is not implemented at the moment!
    
    118 279
             """
    
    119
    -        if not self.queue:
    
    280
    +        if not self.__queue:
    
    120 281
                 return []
    
    121 282
     
    
    122
    -        job = self.queue.popleft()
    
    283
    +        # TODO: Try to match worker_capabilities with jobs properties.
    
    284
    +        job = self.__queue.pop()
    
    285
    +
    
    286
    +        self.__logger.info("Job de-queued: [%s]", job.name)
    
    123 287
     
    
    124 288
             lease = job.lease
    
    125 289
     
    
    ... ... @@ -132,18 +296,25 @@ class Scheduler:
    132 296
     
    
    133 297
             return None
    
    134 298
     
    
    135
    -    def update_job_lease(self, lease):
    
    299
    +    def update_job_lease_state(self, job_name, lease):
    
    136 300
             """Requests a state transition for a job's current :class:Lease.
    
    137 301
     
    
    302
    +        Note:
    
    303
    +            This may trigger a job's :class:`Operation` stage transition.
    
    304
    +
    
    138 305
             Args:
    
    139
    -            job_name (str): name of the job to query.
    
    140
    -            lease_state (LeaseState): the lease state to transition to.
    
    141
    -            lease_status (google.rpc.Status): the lease execution status, only
    
    142
    -                required if `lease_state` is `COMPLETED`.
    
    143
    -            lease_result (google.protobuf.Any): the lease execution result, only
    
    144
    -                required if `lease_state` is `COMPLETED`.
    
    306
    +            job_name (str): name of the job to update lease state from.
    
    307
    +            lease (Lease): the lease holding the new state.
    
    308
    +
    
    309
    +        Raises:
    
    310
    +            NotFoundError: If no job with `job_name` exists.
    
    145 311
             """
    
    146
    -        job = self.jobs[lease.id]
    
    312
    +        try:
    
    313
    +            job = self.__jobs_by_name[job_name]
    
    314
    +
    
    315
    +        except KeyError:
    
    316
    +            raise NotFoundError("Job name does not exist: [{}]".format(job_name))
    
    317
    +
    
    147 318
             lease_state = LeaseState(lease.state)
    
    148 319
     
    
    149 320
             operation_stage = None
    
    ... ... @@ -179,38 +350,96 @@ class Scheduler:
    179 350
                     self.__leases_by_state[LeaseState.ACTIVE].discard(lease.id)
    
    180 351
                     self.__leases_by_state[LeaseState.COMPLETED].add(lease.id)
    
    181 352
     
    
    182
    -        self._update_job_operation_stage(lease.id, operation_stage)
    
    353
    +        self._update_job_operation_stage(job_name, operation_stage)
    
    354
    +
    
    355
    +    def retry_job_lease(self, job_name):
    
    356
    +        """Re-queues a job on lease execution failure.
    
    357
    +
    
    358
    +        Note:
    
    359
    +            This may trigger a job's :class:`Operation` stage transition.
    
    360
    +
    
    361
    +        Args:
    
    362
    +            job_name (str): name of the job to retry the lease from.
    
    363
    +
    
    364
    +        Raises:
    
    365
    +            NotFoundError: If no job with `job_name` exists.
    
    366
    +        """
    
    367
    +        try:
    
    368
    +            job = self.__jobs_by_name[job_name]
    
    369
    +
    
    370
    +        except KeyError:
    
    371
    +            raise NotFoundError("Job name does not exist: [{}]".format(job_name))
    
    372
    +
    
    373
    +        operation_stage = None
    
    374
    +        if job.n_tries >= self.MAX_N_TRIES:
    
    375
    +            # TODO: Decide what to do with these jobs
    
    376
    +            operation_stage = OperationStage.COMPLETED
    
    377
    +            # TODO: Mark these jobs as done
    
    378
    +
    
    379
    +        else:
    
    380
    +            operation_stage = OperationStage.QUEUED
    
    381
    +            self._queue_job(job.name)
    
    382
    +
    
    383
    +            job.update_lease_state(LeaseState.PENDING)
    
    384
    +
    
    385
    +            if self._is_instrumented:
    
    386
    +                self.__retries_count += 1
    
    387
    +
    
    388
    +        self._update_job_operation_stage(job_name, operation_stage)
    
    183 389
     
    
    184 390
         def get_job_lease(self, job_name):
    
    185
    -        """Returns the lease associated to job, if any have been emitted yet."""
    
    186
    -        return self.jobs[job_name].lease
    
    391
    +        """Returns the lease associated with the job, if one has been emitted yet.
    
    187 392
     
    
    188
    -    def get_job_lease_cancelled(self, job_name):
    
    189
    -        """Returns true if the lease is cancelled"""
    
    190
    -        return self.jobs[job_name].lease_cancelled
    
    393
    +        Args:
    
    394
    +            job_name (str): name of the job to query the lease from.
    
    395
    +
    
    396
    +        Raises:
    
    397
    +            NotFoundError: If no job with `job_name` exists.
    
    398
    +        """
    
    399
    +        try:
    
    400
    +            job = self.__jobs_by_name[job_name]
    
    401
    +
    
    402
    +        except KeyError:
    
    403
    +            raise NotFoundError("Job name does not exist: [{}]".format(job_name))
    
    404
    +
    
    405
    +        return job.lease
    
    191 406
     
    
    192 407
         def delete_job_lease(self, job_name):
    
    193
    -        """Discards the lease associated to a job."""
    
    194
    -        job = self.jobs[job_name]
    
    408
    +        """Discards the lease associated with a job.
    
    195 409
     
    
    196
    -        self.jobs[job.name].delete_lease()
    
    410
    +        Args:
    
    411
    +            job_name (str): name of the job to delete the lease from.
    
    197 412
     
    
    198
    -        if not job.n_clients and job.operation.done:
    
    199
    -            self._delete_job(job.name)
    
    413
    +        Raises:
    
    414
    +            NotFoundError: If no job with `job_name` exists.
    
    415
    +        """
    
    416
    +        try:
    
    417
    +            job = self.__jobs_by_name[job_name]
    
    200 418
     
    
    201
    -    def get_job_operation(self, job_name):
    
    202
    -        """Returns the operation associated to job."""
    
    203
    -        return self.jobs[job_name].operation
    
    419
    +        except KeyError:
    
    420
    +            raise NotFoundError("Job name does not exist: [{}]".format(job_name))
    
    204 421
     
    
    205
    -    def cancel_job_operation(self, job_name):
    
    206
    -        """"Cancels the underlying operation of a given job.
    
    422
    +        job.delete_lease()
    
    207 423
     
    
    208
    -        This will also cancel any job's lease that may have been issued.
    
    424
    +        if not job.n_peers and job.done:
    
    425
    +            self._delete_job(job.name)
    
    426
    +
    
    427
    +    def get_job_lease_cancelled(self, job_name):
    
    428
    +        """Returns true if the lease is cancelled.
    
    209 429
     
    
    210 430
             Args:
    
    211
    -            job_name (str): name of the job holding the operation to cancel.
    
    431
    +            job_name (str): name of the job to query the lease state from.
    
    432
    +
    
    433
    +        Raises:
    
    434
    +            NotFoundError: If no job with `job_name` exists.
    
    212 435
             """
    
    213
    -        self.jobs[job_name].cancel_operation()
    
    436
    +        try:
    
    437
    +            job = self.__jobs_by_name[job_name]
    
    438
    +
    
    439
    +        except KeyError:
    
    440
    +            raise NotFoundError("Job name does not exist: [{}]".format(job_name))
    
    441
    +
    
    442
    +        return job.lease_cancelled
    
    214 443
     
    
    215 444
         # --- Public API: Monitoring ---
    
    216 445
     
    
    ... ... @@ -260,11 +489,11 @@ class Scheduler:
    260 489
                 self.__build_metadata_queues.append(message_queue)
    
    261 490
     
    
    262 491
         def query_n_jobs(self):
    
    263
    -        return len(self.jobs)
    
    492
    +        return len(self.__jobs_by_name)
    
    264 493
     
    
    265 494
         def query_n_operations(self):
    
    266 495
             # For now n_operations == n_jobs:
    
    267
    -        return len(self.jobs)
    
    496
    +        return len(self.__jobs_by_operation)
    
    268 497
     
    
    269 498
         def query_n_operations_by_stage(self, operation_stage):
    
    270 499
             try:
    
    ... ... @@ -275,7 +504,7 @@ class Scheduler:
    275 504
             return 0
    
    276 505
     
    
    277 506
         def query_n_leases(self):
    
    278
    -        return len(self.jobs)
    
    507
    +        return len(self.__jobs_by_name)
    
    279 508
     
    
    280 509
         def query_n_leases_by_state(self, lease_state):
    
    281 510
             try:
    
    ... ... @@ -295,19 +524,39 @@ class Scheduler:
    295 524
     
    
    296 525
         # --- Private API ---
    
    297 526
     
    
    527
    +    def _queue_job(self, job_name):
    
    528
    +        """Schedules or reschedules a job."""
    
    529
    +        job = self.__jobs_by_name[job_name]
    
    530
    +
    
    531
    +        if job.operation_stage == OperationStage.QUEUED:
    
    532
    +            self.__queue.sort()
    
    533
    +
    
    534
    +        else:
    
    535
    +            bisect.insort(self.__queue, job)
    
    536
    +
    
    537
    +        self.__logger.info("Job queued: [%s]", job.name)
    
    538
    +
    
    298 539
         def _delete_job(self, job_name):
    
    299 540
             """Drops an entry from the internal list of jobs."""
    
    300
    -        del self.jobs[job_name]
    
    541
    +        job = self.__jobs_by_name[job_name]
    
    542
    +
    
    543
    +        if job.operation_stage == OperationStage.QUEUED:
    
    544
    +            self.__queue.remove(job)
    
    545
    +
    
    546
    +        del self.__jobs_by_action[job.action_digest.hash]
    
    547
    +        del self.__jobs_by_name[job.name]
    
    548
    +
    
    549
    +        self.__logger.info("Job deleted: [%s]", job.name)
    
    301 550
     
    
    302 551
             if self._is_instrumented:
    
    303
    -            self.__operations_by_stage[OperationStage.CACHE_CHECK].discard(job_name)
    
    304
    -            self.__operations_by_stage[OperationStage.QUEUED].discard(job_name)
    
    305
    -            self.__operations_by_stage[OperationStage.EXECUTING].discard(job_name)
    
    306
    -            self.__operations_by_stage[OperationStage.COMPLETED].discard(job_name)
    
    552
    +            self.__operations_by_stage[OperationStage.CACHE_CHECK].discard(job.name)
    
    553
    +            self.__operations_by_stage[OperationStage.QUEUED].discard(job.name)
    
    554
    +            self.__operations_by_stage[OperationStage.EXECUTING].discard(job.name)
    
    555
    +            self.__operations_by_stage[OperationStage.COMPLETED].discard(job.name)
    
    307 556
     
    
    308
    -            self.__leases_by_state[LeaseState.PENDING].discard(job_name)
    
    309
    -            self.__leases_by_state[LeaseState.ACTIVE].discard(job_name)
    
    310
    -            self.__leases_by_state[LeaseState.COMPLETED].discard(job_name)
    
    557
    +            self.__leases_by_state[LeaseState.PENDING].discard(job.name)
    
    558
    +            self.__leases_by_state[LeaseState.ACTIVE].discard(job.name)
    
    559
    +            self.__leases_by_state[LeaseState.COMPLETED].discard(job.name)
    
    311 560
     
    
    312 561
         def _update_job_operation_stage(self, job_name, operation_stage):
    
    313 562
             """Requests a stage transition for the job's :class:Operations.
    
    ... ... @@ -316,7 +565,7 @@ class Scheduler:
    316 565
                 job_name (str): name of the job to query.
    
    317 566
                 operation_stage (OperationStage): the stage to transition to.
    
    318 567
             """
    
    319
    -        job = self.jobs[job_name]
    
    568
    +        job = self.__jobs_by_name[job_name]
    
    320 569
     
    
    321 570
             if operation_stage == OperationStage.CACHE_CHECK:
    
    322 571
                 job.update_operation_stage(OperationStage.CACHE_CHECK)
    
    ... ... @@ -365,7 +614,7 @@ class Scheduler:
    365 614
     
    
    366 615
                     self.__queue_time_average = average_order, average_time
    
    367 616
     
    
    368
    -                if not job.holds_cached_action_result:
    
    617
    +                if not job.holds_cached_result:
    
    369 618
                         execution_metadata = job.action_result.execution_metadata
    
    370 619
                         context_metadata = {'job-is': job.name}
    
    371 620
     
    

  • docs/source/conf.py
    ... ... @@ -182,3 +182,11 @@ texinfo_documents = [
    182 182
          author, 'BuildGrid', 'One line description of project.',
    
    183 183
          'Miscellaneous'),
    
    184 184
     ]
    
    185
    +
    
    186
    +# -- Options for the autodoc extension ----------------------------------------
    
    187
    +
    
    188
    +# This value selects if automatically documented members are sorted
    
    189
    +# alphabetical (value 'alphabetical'), by member type (value 'groupwise') or
    
    190
    +# by source order (value 'bysource'). The default is alphabetical.
    
    191
    +autodoc_member_order = 'bysource'
    
    192
    +

  • tests/integration/bots_service.py
    ... ... @@ -25,7 +25,6 @@ import pytest
    25 25
     
    
    26 26
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
    
    27 27
     from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2
    
    28
    -from buildgrid.server import job
    
    29 28
     from buildgrid.server.controller import ExecutionController
    
    30 29
     from buildgrid.server.job import LeaseState
    
    31 30
     from buildgrid.server.bots import service
    
    ... ... @@ -159,7 +158,8 @@ def test_post_bot_event_temp(context, instance):
    159 158
     def _inject_work(scheduler, action=None, action_digest=None):
    
    160 159
         if not action:
    
    161 160
             action = remote_execution_pb2.Action()
    
    161
    +
    
    162 162
         if not action_digest:
    
    163 163
             action_digest = remote_execution_pb2.Digest()
    
    164
    -    j = job.Job(action, action_digest)
    
    165
    -    scheduler.queue_job(j, True)
    164
    +
    
    165
    +    scheduler.queue_job_action(action, action_digest, skip_cache_lookup=True)

  • tests/integration/execution_service.py
    ... ... @@ -17,14 +17,15 @@
    17 17
     
    
    18 18
     # pylint: disable=redefined-outer-name
    
    19 19
     
    
    20
    +import queue
    
    20 21
     import uuid
    
    21 22
     from unittest import mock
    
    22 23
     
    
    23
    -from google.protobuf import any_pb2
    
    24 24
     import grpc
    
    25 25
     from grpc._server import _Context
    
    26 26
     import pytest
    
    27 27
     
    
    28
    +from buildgrid._enums import OperationStage
    
    28 29
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
    
    29 30
     from buildgrid._protos.google.longrunning import operations_pb2
    
    30 31
     
    
    ... ... @@ -82,7 +83,7 @@ def test_execute(skip_cache_lookup, instance, context):
    82 83
         assert isinstance(result, operations_pb2.Operation)
    
    83 84
         metadata = remote_execution_pb2.ExecuteOperationMetadata()
    
    84 85
         result.metadata.Unpack(metadata)
    
    85
    -    assert metadata.stage == job.OperationStage.QUEUED.value
    
    86
    +    assert metadata.stage == OperationStage.QUEUED.value
    
    86 87
         operation_uuid = result.name.split('/')[-1]
    
    87 88
         assert uuid.UUID(operation_uuid, version=4)
    
    88 89
         assert result.done is False
    
    ... ... @@ -106,18 +107,19 @@ def test_no_action_digest_in_storage(instance, context):
    106 107
     
    
    107 108
     
    
    108 109
     def test_wait_execution(instance, controller, context):
    
    109
    -    j = job.Job(action, action_digest)
    
    110
    -    j._operation.done = True
    
    110
    +    job_name = controller.execution_instance._scheduler.queue_job_action(action,
    
    111
    +                                                                         action_digest,
    
    112
    +                                                                         skip_cache_lookup=True)
    
    111 113
     
    
    112
    -    request = remote_execution_pb2.WaitExecutionRequest(name=j.name)
    
    114
    +    message_queue = queue.Queue()
    
    115
    +    operation_name = controller.execution_instance.register_job_peer(job_name,
    
    116
    +                                                                     context.peer(),
    
    117
    +                                                                     message_queue)
    
    113 118
     
    
    114
    -    controller.execution_instance._scheduler.jobs[j.name] = j
    
    119
    +    controller.execution_instance._scheduler._update_job_operation_stage(job_name,
    
    120
    +                                                                         OperationStage.COMPLETED)
    
    115 121
     
    
    116
    -    action_result_any = any_pb2.Any()
    
    117
    -    action_result = remote_execution_pb2.ActionResult()
    
    118
    -    action_result_any.Pack(action_result)
    
    119
    -
    
    120
    -    j.update_operation_stage(job.OperationStage.COMPLETED)
    
    122
    +    request = remote_execution_pb2.WaitExecutionRequest(name=operation_name)
    
    121 123
     
    
    122 124
         response = instance.WaitExecution(request, context)
    
    123 125
     
    
    ... ... @@ -127,7 +129,6 @@ def test_wait_execution(instance, controller, context):
    127 129
         metadata = remote_execution_pb2.ExecuteOperationMetadata()
    
    128 130
         result.metadata.Unpack(metadata)
    
    129 131
         assert metadata.stage == job.OperationStage.COMPLETED.value
    
    130
    -    assert uuid.UUID(result.name, version=4)
    
    131 132
         assert result.done is True
    
    132 133
     
    
    133 134
     
    

  • tests/integration/operations_service.py
    ... ... @@ -17,6 +17,7 @@
    17 17
     
    
    18 18
     # pylint: disable=redefined-outer-name
    
    19 19
     
    
    20
    +import queue
    
    20 21
     from unittest import mock
    
    21 22
     
    
    22 23
     from google.protobuf import any_pb2
    
    ... ... @@ -86,8 +87,13 @@ def blank_instance(controller):
    86 87
     
    
    87 88
     # Queue an execution, get operation corresponding to that request
    
    88 89
     def test_get_operation(instance, controller, execute_request, context):
    
    89
    -    response_execute = controller.execution_instance.execute(execute_request.action_digest,
    
    90
    -                                                             execute_request.skip_cache_lookup)
    
    90
    +    job_name = controller.execution_instance.execute(execute_request.action_digest,
    
    91
    +                                                     execute_request.skip_cache_lookup)
    
    92
    +
    
    93
    +    message_queue = queue.Queue()
    
    94
    +    operation_name = controller.execution_instance.register_job_peer(job_name,
    
    95
    +                                                                     context.peer(),
    
    96
    +                                                                     message_queue)
    
    91 97
     
    
    92 98
         request = operations_pb2.GetOperationRequest()
    
    93 99
     
    
    ... ... @@ -95,25 +101,28 @@ def test_get_operation(instance, controller, execute_request, context):
    95 101
         # we're manually creating the instance here, it doesn't get a name.
    
    96 102
         # Therefore we need to manually add the instance name to the operation
    
    97 103
         # name in the GetOperation request.
    
    98
    -    request.name = "{}/{}".format(instance_name, response_execute.name)
    
    104
    +    request.name = "{}/{}".format(instance_name, operation_name)
    
    99 105
     
    
    100 106
         response = instance.GetOperation(request, context)
    
    101
    -    assert response.name == "{}/{}".format(instance_name, response_execute.name)
    
    102
    -    assert response.done == response_execute.done
    
    107
    +    assert response.name == "{}/{}".format(instance_name, operation_name)
    
    103 108
     
    
    104 109
     
    
    105 110
     # Queue an execution, get operation corresponding to that request
    
    106 111
     def test_get_operation_blank(blank_instance, controller, execute_request, context):
    
    107
    -    response_execute = controller.execution_instance.execute(execute_request.action_digest,
    
    108
    -                                                             execute_request.skip_cache_lookup)
    
    112
    +    job_name = controller.execution_instance.execute(execute_request.action_digest,
    
    113
    +                                                     execute_request.skip_cache_lookup)
    
    114
    +
    
    115
    +    message_queue = queue.Queue()
    
    116
    +    operation_name = controller.execution_instance.register_job_peer(job_name,
    
    117
    +                                                                     context.peer(),
    
    118
    +                                                                     message_queue)
    
    109 119
     
    
    110 120
         request = operations_pb2.GetOperationRequest()
    
    111 121
     
    
    112
    -    request.name = response_execute.name
    
    122
    +    request.name = operation_name
    
    113 123
     
    
    114 124
         response = blank_instance.GetOperation(request, context)
    
    115
    -    assert response.name == response_execute.name
    
    116
    -    assert response.done == response_execute.done
    
    125
    +    assert response.name == operation_name
    
    117 126
     
    
    118 127
     
    
    119 128
     def test_get_operation_fail(instance, context):
    
    ... ... @@ -133,25 +142,35 @@ def test_get_operation_instance_fail(instance, context):
    133 142
     
    
    134 143
     
    
    135 144
     def test_list_operations(instance, controller, execute_request, context):
    
    136
    -    response_execute = controller.execution_instance.execute(execute_request.action_digest,
    
    137
    -                                                             execute_request.skip_cache_lookup)
    
    145
    +    job_name = controller.execution_instance.execute(execute_request.action_digest,
    
    146
    +                                                     execute_request.skip_cache_lookup)
    
    147
    +
    
    148
    +    message_queue = queue.Queue()
    
    149
    +    operation_name = controller.execution_instance.register_job_peer(job_name,
    
    150
    +                                                                     context.peer(),
    
    151
    +                                                                     message_queue)
    
    138 152
     
    
    139 153
         request = operations_pb2.ListOperationsRequest(name=instance_name)
    
    140 154
         response = instance.ListOperations(request, context)
    
    141 155
     
    
    142 156
         names = response.operations[0].name.split('/')
    
    143 157
         assert names[0] == instance_name
    
    144
    -    assert names[1] == response_execute.name
    
    158
    +    assert names[1] == operation_name
    
    145 159
     
    
    146 160
     
    
    147 161
     def test_list_operations_blank(blank_instance, controller, execute_request, context):
    
    148
    -    response_execute = controller.execution_instance.execute(execute_request.action_digest,
    
    149
    -                                                             execute_request.skip_cache_lookup)
    
    162
    +    job_name = controller.execution_instance.execute(execute_request.action_digest,
    
    163
    +                                                     execute_request.skip_cache_lookup)
    
    164
    +
    
    165
    +    message_queue = queue.Queue()
    
    166
    +    operation_name = controller.execution_instance.register_job_peer(job_name,
    
    167
    +                                                                     context.peer(),
    
    168
    +                                                                     message_queue)
    
    150 169
     
    
    151 170
         request = operations_pb2.ListOperationsRequest(name='')
    
    152 171
         response = blank_instance.ListOperations(request, context)
    
    153 172
     
    
    154
    -    assert response.operations[0].name.split('/')[-1] == response_execute.name
    
    173
    +    assert response.operations[0].name.split('/')[-1] == operation_name
    
    155 174
     
    
    156 175
     
    
    157 176
     def test_list_operations_instance_fail(instance, controller, execute_request, context):
    
    ... ... @@ -174,14 +193,19 @@ def test_list_operations_empty(instance, context):
    174 193
     
    
    175 194
     # Send execution off, delete, try to find operation should fail
    
    176 195
     def test_delete_operation(instance, controller, execute_request, context):
    
    177
    -    response_execute = controller.execution_instance.execute(execute_request.action_digest,
    
    178
    -                                                             execute_request.skip_cache_lookup)
    
    196
    +    job_name = controller.execution_instance.execute(execute_request.action_digest,
    
    197
    +                                                     execute_request.skip_cache_lookup)
    
    198
    +
    
    199
    +    message_queue = queue.Queue()
    
    200
    +    operation_name = controller.execution_instance.register_job_peer(job_name,
    
    201
    +                                                                     context.peer(),
    
    202
    +                                                                     message_queue)
    
    179 203
     
    
    180 204
         request = operations_pb2.DeleteOperationRequest()
    
    181
    -    request.name = response_execute.name
    
    205
    +    request.name = operation_name
    
    182 206
         instance.DeleteOperation(request, context)
    
    183 207
     
    
    184
    -    request_name = "{}/{}".format(instance_name, response_execute.name)
    
    208
    +    request_name = "{}/{}".format(instance_name, operation_name)
    
    185 209
     
    
    186 210
         with pytest.raises(InvalidArgumentError):
    
    187 211
             controller.operations_instance.get_operation(request_name)
    
    ... ... @@ -189,17 +213,11 @@ def test_delete_operation(instance, controller, execute_request, context):
    189 213
     
    
    190 214
     # Send execution off, delete, try to find operation should fail
    
    191 215
     def test_delete_operation_blank(blank_instance, controller, execute_request, context):
    
    192
    -    response_execute = controller.execution_instance.execute(execute_request.action_digest,
    
    193
    -                                                             execute_request.skip_cache_lookup)
    
    194
    -
    
    195 216
         request = operations_pb2.DeleteOperationRequest()
    
    196
    -    request.name = response_execute.name
    
    217
    +    request.name = "runner"
    
    197 218
         blank_instance.DeleteOperation(request, context)
    
    198 219
     
    
    199
    -    request_name = response_execute.name
    
    200
    -
    
    201
    -    with pytest.raises(InvalidArgumentError):
    
    202
    -        controller.operations_instance.get_operation(request_name)
    
    220
    +    context.set_code.assert_called_once_with(grpc.StatusCode.INVALID_ARGUMENT)
    
    203 221
     
    
    204 222
     
    
    205 223
     def test_delete_operation_fail(instance, context):
    
    ... ... @@ -211,11 +229,16 @@ def test_delete_operation_fail(instance, context):
    211 229
     
    
    212 230
     
    
    213 231
     def test_cancel_operation(instance, controller, execute_request, context):
    
    214
    -    response_execute = controller.execution_instance.execute(execute_request.action_digest,
    
    215
    -                                                             execute_request.skip_cache_lookup)
    
    232
    +    job_name = controller.execution_instance.execute(execute_request.action_digest,
    
    233
    +                                                     execute_request.skip_cache_lookup)
    
    234
    +
    
    235
    +    message_queue = queue.Queue()
    
    236
    +    operation_name = controller.execution_instance.register_job_peer(job_name,
    
    237
    +                                                                     context.peer(),
    
    238
    +                                                                     message_queue)
    
    216 239
     
    
    217 240
         request = operations_pb2.CancelOperationRequest()
    
    218
    -    request.name = "{}/{}".format(instance_name, response_execute.name)
    
    241
    +    request.name = "{}/{}".format(instance_name, operation_name)
    
    219 242
     
    
    220 243
         instance.CancelOperation(request, context)
    
    221 244
     
    
    ... ... @@ -238,7 +261,7 @@ def test_cancel_operation_blank(blank_instance, context):
    238 261
         context.set_code.assert_called_once_with(grpc.StatusCode.INVALID_ARGUMENT)
    
    239 262
     
    
    240 263
     
    
    241
    -def test_cancel_operation_instance_fail(instance, context):
    
    264
    +def test_cancel_operation__fail(instance, context):
    
    242 265
         request = operations_pb2.CancelOperationRequest()
    
    243 266
         instance.CancelOperation(request, context)
    
    244 267
     
    



  • [Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]