[Notes] [Git][BuildGrid/buildgrid][mablanch/75-requests-multiplexing] 28 commits: docs: Update BuildStream usage example




Martin Blanchard pushed to branch mablanch/75-requests-multiplexing at BuildGrid / buildgrid

Commits:

23 changed files:

Changes:

  • buildgrid/_app/bots/buildbox.py
    @@ -21,7 +21,7 @@ import tempfile
     from buildgrid.client.cas import download, upload
     from buildgrid._exceptions import BotError
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
    -from buildgrid.settings import HASH_LENGTH
    +from buildgrid.settings import HASH_LENGTH, MAX_REQUEST_SIZE
     from buildgrid.utils import read_file, write_file


    @@ -52,9 +52,10 @@ def work_buildbox(lease, context, event):
         else:
             working_directory = '/'

    -    logger.debug("command hash: {}".format(action.command_digest.hash))
    -    logger.debug("vdir hash: {}".format(action.input_root_digest.hash))
    -    logger.debug("\n{}".format(' '.join(command.arguments)))
    +    logger.debug("Command digest: [{}/{}]"
    +                 .format(action.command_digest.hash, action.command_digest.size_bytes))
    +    logger.debug("Input root digest: [{}/{}]"
    +                 .format(action.input_root_digest.hash, action.input_root_digest.size_bytes))

         os.makedirs(os.path.join(local_cas_directory, 'tmp'), exist_ok=True)
         os.makedirs(context.fuse_dir, exist_ok=True)
    @@ -87,9 +88,7 @@ def work_buildbox(lease, context, event):
                 command_line.append(context.fuse_dir)
                 command_line.extend(command.arguments)

    -            logger.debug(' '.join(command_line))
    -            logger.debug("Input root digest:\n{}".format(action.input_root_digest))
    -            logger.info("Launching process")
    +            logger.info("Starting execution: [{}...]".format(command.arguments[0]))

                 command_line = subprocess.Popen(command_line,
                                                 stdin=subprocess.PIPE,
    @@ -97,22 +96,17 @@ def work_buildbox(lease, context, event):
                                                 stderr=subprocess.PIPE)
                 stdout, stderr = command_line.communicate()
                 returncode = command_line.returncode
    +
                 action_result = remote_execution_pb2.ActionResult()
    -            # TODO: Upload to CAS or output RAW
    -            # For now, just pass raw
    -            # https://gitlab.com/BuildGrid/buildgrid/issues/90
    -            action_result.stdout_raw = stdout
    -            action_result.stderr_raw = stderr
                 action_result.exit_code = returncode

    -            logger.debug("BuildBox stderr: [{}]".format(stderr))
    -            logger.debug("BuildBox stdout: [{}]".format(stdout))
    -            logger.debug("BuildBox exit code: [{}]".format(returncode))
    +            logger.info("Execution finished with code: [{}]".format(returncode))

                 output_digest = remote_execution_pb2.Digest()
                 output_digest.ParseFromString(read_file(output_digest_file.name))

    -            logger.debug("Output root digest: [{}]".format(output_digest))
    +            logger.debug("Output root digest: [{}/{}]"
    +                         .format(output_digest.hash, output_digest.size_bytes))

                 if len(output_digest.hash) != HASH_LENGTH:
                     raise BotError(stdout,
    @@ -126,11 +120,25 @@ def work_buildbox(lease, context, event):
                 with upload(context.cas_channel) as uploader:
                     output_tree_digest = uploader.put_message(output_tree)

    -            output_directory = remote_execution_pb2.OutputDirectory()
    -            output_directory.tree_digest.CopyFrom(output_tree_digest)
    -            output_directory.path = os.path.relpath(working_directory, start='/')
    +                output_directory = remote_execution_pb2.OutputDirectory()
    +                output_directory.tree_digest.CopyFrom(output_tree_digest)
    +                output_directory.path = os.path.relpath(working_directory, start='/')
    +
    +                action_result.output_directories.extend([output_directory])
    +
    +                if action_result.ByteSize() + len(stdout) > MAX_REQUEST_SIZE:
    +                    stdout_digest = uploader.put_blob(stdout)
    +                    action_result.stdout_digest.CopyFrom(stdout_digest)
    +
    +                else:
    +                    action_result.stdout_raw = stdout
    +
    +                if action_result.ByteSize() + len(stderr) > MAX_REQUEST_SIZE:
    +                    stderr_digest = uploader.put_blob(stderr)
    +                    action_result.stderr_digest.CopyFrom(stderr_digest)

    -            action_result.output_directories.extend([output_directory])
    +                else:
    +                    action_result.stderr_raw = stderr

                 lease.result.Pack(action_result)
    
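  A quick illustration of the new stdout/stderr handling above (and the matching change in
  host.py below): small command output is still inlined in the ActionResult, while output that
  would push the reply past MAX_REQUEST_SIZE is uploaded to CAS and only referenced by digest.
  This is a minimal, self-contained sketch, not BuildGrid code; the 2 MiB budget and the
  FakeUploader are assumptions standing in for buildgrid.settings.MAX_REQUEST_SIZE and the
  real CAS uploader.

      import hashlib

      MAX_REQUEST_SIZE = 2 * 1024 * 1024  # assumed per-request byte budget


      class FakeUploader:
          """Stand-in for the CAS uploader: put_blob() returns a digest-like dict."""

          def put_blob(self, blob):
              return {'hash': hashlib.sha256(blob).hexdigest(), 'size_bytes': len(blob)}


      def pack_stdout(action_result, result_size, stdout, uploader):
          """Inline small output; upload large output and keep only its digest."""
          if result_size + len(stdout) > MAX_REQUEST_SIZE:
              action_result['stdout_digest'] = uploader.put_blob(stdout)
          else:
              action_result['stdout_raw'] = stdout
          return action_result


      print(pack_stdout({}, 0, b'short output', FakeUploader()))
      print(pack_stdout({}, 0, b'x' * (4 * 1024 * 1024), FakeUploader()))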

  • buildgrid/_app/bots/host.py
    @@ -20,6 +20,7 @@ import tempfile

     from buildgrid.client.cas import download, upload
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
    +from buildgrid.settings import MAX_REQUEST_SIZE
     from buildgrid.utils import get_hostname, output_file_maker, output_directory_maker


    @@ -27,6 +28,7 @@ def work_host_tools(lease, context, event):
         """Executes a lease for a build action, using host tools.
         """
         instance_name = context.parent
    +
         logger = logging.getLogger(__name__)

         action_digest = remote_execution_pb2.Digest()
    @@ -51,6 +53,11 @@ def work_host_tools(lease, context, event):

                 downloader.download_directory(action.input_root_digest, temp_directory)

    +        logger.debug("Command digest: [{}/{}]"
    +                     .format(action.command_digest.hash, action.command_digest.size_bytes))
    +        logger.debug("Input root digest: [{}/{}]"
    +                     .format(action.input_root_digest.hash, action.input_root_digest.size_bytes))
    +
             action_result.execution_metadata.input_fetch_completed_timestamp.GetCurrentTime()

             environment = os.environ.copy()
    @@ -76,7 +83,7 @@ def work_host_tools(lease, context, event):
                                           os.path.dirname(output_path))
                 os.makedirs(directory_path, exist_ok=True)

    -        logger.debug(' '.join(command_line))
    +        logger.info("Starting execution: [{}...]".format(command.arguments[0]))

             action_result.execution_metadata.execution_start_timestamp.GetCurrentTime()

    @@ -92,16 +99,9 @@ def work_host_tools(lease, context, event):

             action_result.execution_metadata.execution_completed_timestamp.GetCurrentTime()

    -        # TODO: Upload to CAS or output RAW
    -        # For now, just pass raw
    -        # https://gitlab.com/BuildGrid/buildgrid/issues/90
    -        action_result.stdout_raw = stdout
    -        action_result.stderr_raw = stderr
             action_result.exit_code = returncode

    -        logger.debug("Command stderr: [{}]".format(stderr))
    -        logger.debug("Command stdout: [{}]".format(stdout))
    -        logger.debug("Command exit code: [{}]".format(returncode))
    +        logger.info("Execution finished with code: [{}]".format(returncode))

             action_result.execution_metadata.output_upload_start_timestamp.GetCurrentTime()

    @@ -119,6 +119,9 @@ def work_host_tools(lease, context, event):
                                                     file_digest)
                     output_files.append(output_file)

    +                logger.debug("Output file digest: [{}/{}]"
    +                             .format(file_digest.hash, file_digest.size_bytes))
    +
                 action_result.output_files.extend(output_files)

                 for output_path in command.output_directories:
    @@ -132,8 +135,25 @@ def work_host_tools(lease, context, event):
                                                               tree_digest)
                     output_directories.append(output_directory)

    +                logger.debug("Output tree digest: [{}/{}]"
    +                             .format(tree_digest.hash, tree_digest.size_bytes))
    +
                 action_result.output_directories.extend(output_directories)

    +            if action_result.ByteSize() + len(stdout) > MAX_REQUEST_SIZE:
    +                stdout_digest = uploader.put_blob(stdout)
    +                action_result.stdout_digest.CopyFrom(stdout_digest)
    +
    +            else:
    +                action_result.stdout_raw = stdout
    +
    +            if action_result.ByteSize() + len(stderr) > MAX_REQUEST_SIZE:
    +                stderr_digest = uploader.put_blob(stderr)
    +                action_result.stderr_digest.CopyFrom(stderr_digest)
    +
    +            else:
    +                action_result.stderr_raw = stderr
    +
             action_result.execution_metadata.output_upload_completed_timestamp.GetCurrentTime()

             lease.result.Pack(action_result)
    

  • buildgrid/bot/interface.py
    @@ -57,5 +57,5 @@ class BotInterface:
             try:
                 return call(request)
             except grpc.RpcError as e:
    -            self.__logger.error(e.code())
    +            self.__logger.error(e)
                 return e.code()

  • buildgrid/client/authentication.py
    @@ -191,7 +191,7 @@ class AuthMetadataClientInterceptor(

             class _ClientCallDetails(
                     namedtuple('_ClientCallDetails',
    -                           ('method', 'timeout', 'credentials', 'metadata')),
    +                           ('method', 'timeout', 'credentials', 'metadata',)),
                     grpc.ClientCallDetails):
                 pass

  • buildgrid/server/_authentication.py
    @@ -13,8 +13,10 @@
     # limitations under the License.


    +from collections import namedtuple
     from datetime import datetime
     from enum import Enum
    +import functools
     import logging

     import grpc
    @@ -55,6 +57,11 @@ class AuthMetadataAlgorithm(Enum):
         JWT_RS512 = 'rs512'  # RSASSA-PKCS1-v1_5 signature algorithm using SHA-512 hash algorithm


    +class AuthContext:
    +
    +    interceptor = None
    +
    +
     class _InvalidTokenError(Exception):
         pass

    @@ -67,14 +74,66 @@ class _UnboundedTokenError(Exception):
         pass


    +def authorize(auth_context):
    +    """RPC method decorator for authorization validations.
    +
    +    This decorator is design to be used together with an :class:`AuthContext`
    +    authorization context holder::
    +
    +        @authorize(AuthContext)
    +        def Execute(self, request, context):
    +
    +    By default, any request is accepted. Authorization validation can be
    +    activated by setting up a :class:`grpc.ServerInterceptor`::
    +
    +        AuthContext.interceptor = AuthMetadataServerInterceptor()
    +
    +    Args:
    +        auth_context(AuthContext): Authorization context holder.
    +    """
    +    def __authorize_decorator(behavior):
    +        """RPC authorization method decorator."""
    +        _HandlerCallDetails = namedtuple(
    +            '_HandlerCallDetails', ('invocation_metadata', 'method',))
    +
    +        @functools.wraps(behavior)
    +        def __authorize_wrapper(self, request, context):
    +            """RPC authorization method wrapper."""
    +            if auth_context.interceptor is None:
    +                return behavior(self, request, context)
    +
    +            authorized = False
    +
    +            def __continuator(handler_call_details):
    +                nonlocal authorized
    +                authorized = True
    +
    +            details = _HandlerCallDetails(context.invocation_metadata(),
    +                                          behavior.__name__)
    +
    +            auth_context.interceptor.intercept_service(__continuator, details)
    +
    +            if authorized:
    +                return behavior(self, request, context)
    +
    +            context.abort(grpc.StatusCode.UNAUTHENTICATED,
    +                          "No valid authorization or authentication provided")
    +
    +            return None
    +
    +        return __authorize_wrapper
    +
    +    return __authorize_decorator
    +
    +
     class AuthMetadataServerInterceptor(grpc.ServerInterceptor):

         __auth_errors = {
    -        'missing-bearer': 'Missing authentication header field',
    -        'invalid-bearer': 'Invalid authentication header field',
    -        'invalid-token': 'Invalid authentication token',
    -        'expired-token': 'Expired authentication token',
    -        'unbounded-token': 'Unbounded authentication token',
    +        'missing-bearer': "Missing authentication header field",
    +        'invalid-bearer': "Invalid authentication header field",
    +        'invalid-token': "Invalid authentication token",
    +        'expired-token': "Expired authentication token",
    +        'unbounded-token': "Unbounded authentication token",
         }

         def __init__(self, method, secret=None, algorithm=AuthMetadataAlgorithm.UNSPECIFIED):
    
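  The authorize() decorator above gives every servicer method one shared authorization path:
  if AuthContext.interceptor is unset, requests pass through; otherwise the interceptor
  decides and unauthorized calls are aborted with UNAUTHENTICATED. Below is a minimal,
  self-contained sketch of that holder-plus-decorator pattern, not the BuildGrid
  implementation: the is_authorized() check and DummyContext are stand-ins for the real
  intercept_service() continuation and the gRPC ServicerContext.

      import functools


      class AuthContext:
          interceptor = None  # set to an interceptor-like object to enable checks


      def authorize(auth_context):
          def decorator(behavior):
              @functools.wraps(behavior)
              def wrapper(self, request, context):
                  if auth_context.interceptor is None:
                      return behavior(self, request, context)  # default: accept everything
                  if auth_context.interceptor.is_authorized(context):
                      return behavior(self, request, context)
                  context.abort('UNAUTHENTICATED', 'No valid authorization provided')
              return wrapper
          return decorator


      class DummyContext:
          def abort(self, code, details):
              raise RuntimeError('{}: {}'.format(code, details))


      class Service:
          @authorize(AuthContext)
          def Execute(self, request, context):
              return 'executed'


      print(Service().Execute(None, DummyContext()))  # 'executed': no interceptor installed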

  • buildgrid/server/actioncache/service.py
    @@ -27,6 +27,7 @@ import grpc
     from buildgrid._exceptions import InvalidArgumentError, NotFoundError
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2_grpc
    +from buildgrid.server._authentication import AuthContext, authorize


     class ActionCacheService(remote_execution_pb2_grpc.ActionCacheServicer):
    @@ -38,9 +39,14 @@ class ActionCacheService(remote_execution_pb2_grpc.ActionCacheServicer):

             remote_execution_pb2_grpc.add_ActionCacheServicer_to_server(self, server)

    +    # --- Public API ---
    +
         def add_instance(self, name, instance):
             self._instances[name] = instance

    +    # --- Public API: Servicer ---
    +
    +    @authorize(AuthContext)
         def GetActionResult(self, request, context):
             self.__logger.debug("GetActionResult request from [%s]", context.peer())

    @@ -59,6 +65,7 @@ class ActionCacheService(remote_execution_pb2_grpc.ActionCacheServicer):

             return remote_execution_pb2.ActionResult()

    +    @authorize(AuthContext)
         def UpdateActionResult(self, request, context):
             self.__logger.debug("UpdateActionResult request from [%s]", context.peer())

    @@ -78,6 +85,8 @@ class ActionCacheService(remote_execution_pb2_grpc.ActionCacheServicer):

             return remote_execution_pb2.ActionResult()

    +    # --- Private API ---
    +
         def _get_instance(self, instance_name):
             try:
                 return self._instances[instance_name]
    

  • buildgrid/server/bots/instance.py
    @@ -83,7 +83,7 @@ class BotsInterface:
             self._check_bot_ids(bot_session.bot_id, name)
             self._check_assigned_leases(bot_session)

    -        for lease in bot_session.leases:
    +        for lease in list(bot_session.leases):
                 checked_lease = self._check_lease_state(lease)
                 if not checked_lease:
                     # TODO: Make sure we don't need this
    @@ -91,7 +91,10 @@ class BotsInterface:
                         self._assigned_leases[name].remove(lease.id)
                     except KeyError:
                         pass
    -                lease.Clear()
    +
    +                self._scheduler.delete_job_lease(lease.id)
    +
    +                bot_session.leases.remove(lease)

             self._request_leases(bot_session)
             return bot_session
    @@ -117,13 +120,13 @@ class BotsInterface:

             try:
                 if self._scheduler.get_job_lease_cancelled(lease.id):
    -                lease.state.CopyFrom(LeaseState.CANCELLED.value)
    +                lease.state = LeaseState.CANCELLED.value
                     return lease
             except KeyError:
                 # Job does not exist, remove from bot.
                 return None

    -        self._scheduler.update_job_lease(lease)
    +        self._scheduler.update_job_lease_state(lease.id, lease)

             if lease_state == LeaseState.COMPLETED:
                 return None
    @@ -161,7 +164,7 @@ class BotsInterface:
                     self.__logger.error("Assigned lease id=[%s],"
                                         " not found on bot with name=[%s] and id=[%s]."
                                         " Retrying job", lease_id, bot_session.name, bot_session.bot_id)
    -                self._scheduler.retry_job(lease_id)
    +                self._scheduler.retry_job_lease(lease_id)

         def _close_bot_session(self, name):
             """ Before removing the session, close any leases and
    @@ -174,7 +177,7 @@ class BotsInterface:

             self.__logger.debug("Attempting to close [%s] with name: [%s]", bot_id, name)
             for lease_id in self._assigned_leases[name]:
    -            self._scheduler.retry_job(lease_id)
    +            self._scheduler.retry_job_lease(lease_id)
             self._assigned_leases.pop(name)

             self.__logger.debug("Closing bot session: [%s]", name)
    
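  The switch to iterating over list(bot_session.leases) matters because the loop body now
  removes leases from that same collection; removing from a sequence while iterating over it
  skips elements. A plain-list sketch of the pitfall the copy avoids:

      leases = ['lease-a', 'lease-b', 'lease-c']

      for lease in list(leases):    # iterate over a snapshot
          if lease != 'lease-c':
              leases.remove(lease)  # safe: mutation does not affect the snapshot

      print(leases)  # ['lease-c']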

  • buildgrid/server/bots/service.py
    @@ -29,6 +29,7 @@ from buildgrid._enums import BotStatus
     from buildgrid._exceptions import InvalidArgumentError, OutOfSyncError
     from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2
     from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2_grpc
    +from buildgrid.server._authentication import AuthContext, authorize


     class BotsService(bots_pb2_grpc.BotsServicer):
    @@ -86,6 +87,7 @@ class BotsService(bots_pb2_grpc.BotsServicer):

         # --- Public API: Servicer ---

    +    @authorize(AuthContext)
         def CreateBotSession(self, request, context):
             """Handles CreateBotSessionRequest messages.

    @@ -121,6 +123,7 @@ class BotsService(bots_pb2_grpc.BotsServicer):

             return bots_pb2.BotSession()

    +    @authorize(AuthContext)
         def UpdateBotSession(self, request, context):
             """Handles UpdateBotSessionRequest messages.

    @@ -175,6 +178,7 @@ class BotsService(bots_pb2_grpc.BotsServicer):

             return bots_pb2.BotSession()

    +    @authorize(AuthContext)
         def PostBotEventTemp(self, request, context):
             """Handles PostBotEventTempRequest messages.
    

  • buildgrid/server/capabilities/service.py
    @@ -19,15 +19,20 @@ import grpc

     from buildgrid._exceptions import InvalidArgumentError
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
    +from buildgrid.server._authentication import AuthContext, authorize


     class CapabilitiesService(remote_execution_pb2_grpc.CapabilitiesServicer):

         def __init__(self, server):
             self.__logger = logging.getLogger(__name__)
    +
             self.__instances = {}
    +
             remote_execution_pb2_grpc.add_CapabilitiesServicer_to_server(self, server)

    +    # --- Public API ---
    +
         def add_instance(self, name, instance):
             self.__instances[name] = instance

    @@ -40,6 +45,9 @@ class CapabilitiesService(remote_execution_pb2_grpc.CapabilitiesServicer):
         def add_execution_instance(self, name, instance):
             self.__instances[name].add_execution_instance(instance)

    +    # --- Public API: Servicer ---
    +
    +    @authorize(AuthContext)
         def GetCapabilities(self, request, context):
             try:
                 instance = self._get_instance(request.instance_name)
    @@ -52,6 +60,8 @@ class CapabilitiesService(remote_execution_pb2_grpc.CapabilitiesServicer):

             return remote_execution_pb2.ServerCapabilities()

    +    # --- Private API ---
    +
         def _get_instance(self, name):
             try:
                 return self.__instances[name]
    

  • buildgrid/server/cas/service.py
    @@ -29,6 +29,7 @@ from buildgrid._exceptions import InvalidArgumentError, NotFoundError, OutOfRang
     from buildgrid._protos.google.bytestream import bytestream_pb2, bytestream_pb2_grpc
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2_grpc
    +from buildgrid.server._authentication import AuthContext, authorize


     class ContentAddressableStorageService(remote_execution_pb2_grpc.ContentAddressableStorageServicer):
    @@ -40,9 +41,14 @@ class ContentAddressableStorageService(remote_execution_pb2_grpc.ContentAddressa

             remote_execution_pb2_grpc.add_ContentAddressableStorageServicer_to_server(self, server)

    +    # --- Public API ---
    +
         def add_instance(self, name, instance):
             self._instances[name] = instance

    +    # --- Public API: Servicer ---
    +
    +    @authorize(AuthContext)
         def FindMissingBlobs(self, request, context):
             self.__logger.debug("FindMissingBlobs request from [%s]", context.peer())

    @@ -59,6 +65,7 @@ class ContentAddressableStorageService(remote_execution_pb2_grpc.ContentAddressa

             return remote_execution_pb2.FindMissingBlobsResponse()

    +    @authorize(AuthContext)
         def BatchUpdateBlobs(self, request, context):
             self.__logger.debug("BatchUpdateBlobs request from [%s]", context.peer())

    @@ -75,6 +82,7 @@ class ContentAddressableStorageService(remote_execution_pb2_grpc.ContentAddressa

             return remote_execution_pb2.BatchReadBlobsResponse()

    +    @authorize(AuthContext)
         def BatchReadBlobs(self, request, context):
             self.__logger.debug("BatchReadBlobs request from [%s]", context.peer())

    @@ -83,6 +91,7 @@ class ContentAddressableStorageService(remote_execution_pb2_grpc.ContentAddressa

             return remote_execution_pb2.BatchReadBlobsResponse()

    +    @authorize(AuthContext)
         def GetTree(self, request, context):
             self.__logger.debug("GetTree request from [%s]", context.peer())

    @@ -97,6 +106,8 @@ class ContentAddressableStorageService(remote_execution_pb2_grpc.ContentAddressa

                 yield remote_execution_pb2.GetTreeResponse()

    +    # --- Private API ---
    +
         def _get_instance(self, instance_name):
             try:
                 return self._instances[instance_name]
    @@ -114,9 +125,14 @@ class ByteStreamService(bytestream_pb2_grpc.ByteStreamServicer):

             bytestream_pb2_grpc.add_ByteStreamServicer_to_server(self, server)

    +    # --- Public API ---
    +
         def add_instance(self, name, instance):
             self._instances[name] = instance

    +    # --- Public API: Servicer ---
    +
    +    @authorize(AuthContext)
         def Read(self, request, context):
             self.__logger.debug("Read request from [%s]", context.peer())

    @@ -163,6 +179,7 @@ class ByteStreamService(bytestream_pb2_grpc.ByteStreamServicer):
                 context.set_code(grpc.StatusCode.OUT_OF_RANGE)
                 yield bytestream_pb2.ReadResponse()

    +    @authorize(AuthContext)
         def Write(self, requests, context):
             self.__logger.debug("Write request from [%s]", context.peer())

    @@ -209,12 +226,15 @@ class ByteStreamService(bytestream_pb2_grpc.ByteStreamServicer):

             return bytestream_pb2.WriteResponse()

    +    @authorize(AuthContext)
         def QueryWriteStatus(self, request, context):
             context.set_code(grpc.StatusCode.UNIMPLEMENTED)
             context.set_details('Method not implemented!')

             return bytestream_pb2.QueryWriteStatusResponse()

    +    # --- Private API ---
    +
         def _get_instance(self, instance_name):
             try:
                 return self._instances[instance_name]
    

  • buildgrid/server/execution/instance.py
    @@ -21,11 +21,9 @@ An instance of the Remote Execution Service.

     import logging

    -from buildgrid._exceptions import FailedPreconditionError, InvalidArgumentError
    +from buildgrid._exceptions import FailedPreconditionError, InvalidArgumentError, NotFoundError
     from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Action
    -
    -from ..job import Job
    -from ...utils import get_hash_type
    +from buildgrid.utils import get_hash_type


     class ExecutionInstance:
    @@ -46,44 +44,45 @@ class ExecutionInstance:
         def hash_type(self):
             return get_hash_type()

    -    def execute(self, action_digest, skip_cache_lookup, message_queue=None):
    +    def execute(self, action_digest, skip_cache_lookup):
             """ Sends a job for execution.
             Queues an action and creates an Operation instance to be associated with
             this action.
             """
    -
             action = self._storage.get_message(action_digest, Action)

             if not action:
                 raise FailedPreconditionError("Could not get action from storage.")

    -        job = Job(action, action_digest)
    -        if message_queue is not None:
    -            job.register_client(message_queue)
    +        return self._scheduler.queue_job_operation(action, action_digest, skip_cache_lookup)

    -        self._scheduler.queue_job(job, skip_cache_lookup)
    +    def register_operation_client(self, operation_name, peer, message_queue):
    +        try:
    +            return self._scheduler.register_job_operation_client(operation_name,
    +                                                                 peer, message_queue)

    -        return job.operation
    +        except NotFoundError:
    +            raise InvalidArgumentError("Operation name does not exist: [{}]"
    +                                       .format(operation_name))

    -    def register_message_client(self, name, queue):
    +    def unregister_operation_client(self, operation_name, peer):
             try:
    -            self._scheduler.register_client(name, queue)
    +            self._scheduler.unregister_job_operation_client(operation_name, peer)

    -        except KeyError:
    -            raise InvalidArgumentError("Operation name does not exist: [{}]".format(name))
    +        except NotFoundError:
    +            raise InvalidArgumentError("Operation name does not exist: [{}]"
    +                                       .format(operation_name))

    -    def unregister_message_client(self, name, queue):
    -        try:
    -            self._scheduler.unregister_client(name, queue)
    +    def stream_operation_updates(self, message_queue):
    +        error, operation = message_queue.get()
    +        if error is not None:
    +            raise error

    -        except KeyError:
    -            raise InvalidArgumentError("Operation name does not exist: [{}]".format(name))
    +        while not operation.done:
    +            yield operation

    -    def stream_operation_updates(self, message_queue, operation_name):
    -        job = message_queue.get()
    -        while not job.operation.done:
    -            yield job.operation
    -            job = message_queue.get()
    -            job.check_operation_status()
    +            error, operation = message_queue.get()
    +            if error is not None:
    +                raise error

    -        yield job.operation
    +        yield operation

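  stream_operation_updates() now expects the scheduler to push (error, operation) tuples onto
  the registered message queue, re-raising any error on the client's behalf and stopping once
  the operation reports done. A small stand-alone sketch of that consumption loop, using an
  illustrative Operation type rather than the real protobuf message:

      import queue
      from collections import namedtuple

      Operation = namedtuple('Operation', ['name', 'done'])


      def stream_operation_updates(message_queue):
          error, operation = message_queue.get()
          if error is not None:
              raise error
          while not operation.done:
              yield operation
              error, operation = message_queue.get()
              if error is not None:
                  raise error
          yield operation


      updates = queue.Queue()
      updates.put((None, Operation('operation-1', False)))
      updates.put((None, Operation('operation-1', True)))

      for operation in stream_operation_updates(updates):
          print(operation.name, operation.done)
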
  • buildgrid/server/execution/service.py
    @@ -29,6 +29,7 @@ import grpc
     from buildgrid._exceptions import FailedPreconditionError, InvalidArgumentError, CancelledError
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2_grpc
     from buildgrid._protos.google.longrunning import operations_pb2
    +from buildgrid.server._authentication import AuthContext, authorize


     class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
    @@ -81,6 +82,7 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):

         # --- Public API: Servicer ---

    +    @authorize(AuthContext)
         def Execute(self, request, context):
             """Handles ExecuteRequest messages.

    @@ -96,12 +98,15 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):

             try:
                 instance = self._get_instance(instance_name)
    -            operation = instance.execute(request.action_digest,
    -                                         request.skip_cache_lookup,
    -                                         message_queue)
    +
    +            job_name = instance.execute(request.action_digest,
    +                                        request.skip_cache_lookup)
    +
    +            operation_name = instance.register_operation_client(job_name,
    +                                                                peer, message_queue)

                 context.add_callback(partial(self._rpc_termination_callback,
    -                                         peer, instance_name, operation.name, message_queue))
    +                                         peer, instance_name, operation_name))

                 if self._is_instrumented:
                     if peer not in self.__peers:
    @@ -110,16 +115,13 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
                     else:
                         self.__peers[peer] += 1

    -            instanced_op_name = "{}/{}".format(instance_name, operation.name)
    +            operation_full_name = "{}/{}".format(instance_name, operation_name)

    -            self.__logger.info("Operation name: [%s]", instanced_op_name)
    +            self.__logger.info("Operation name: [%s]", operation_full_name)

    -            for operation in instance.stream_operation_updates(message_queue,
    -                                                               operation.name):
    -                op = operations_pb2.Operation()
    -                op.CopyFrom(operation)
    -                op.name = instanced_op_name
    -                yield op
    +            for operation in instance.stream_operation_updates(message_queue):
    +                operation.name = operation_full_name
    +                yield operation

             except InvalidArgumentError as e:
                 self.__logger.error(e)
    @@ -139,6 +141,7 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
                 context.set_code(grpc.StatusCode.CANCELLED)
                 yield operations_pb2.Operation()

    +    @authorize(AuthContext)
         def WaitExecution(self, request, context):
             """Handles WaitExecutionRequest messages.

    @@ -157,9 +160,11 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
             try:
                 instance = self._get_instance(instance_name)

    -            instance.register_message_client(operation_name, message_queue)
    +            operation_name = instance.register_operation_client(operation_name,
    +                                                                peer, message_queue)
    +
                 context.add_callback(partial(self._rpc_termination_callback,
    -                                         peer, instance_name, operation_name, message_queue))
    +                                         peer, instance_name, operation_name))

                 if self._is_instrumented:
                     if peer not in self.__peers:
    @@ -168,12 +173,11 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
                     else:
                         self.__peers[peer] += 1

    -            for operation in instance.stream_operation_updates(message_queue,
    -                                                               operation_name):
    -                op = operations_pb2.Operation()
    -                op.CopyFrom(operation)
    -                op.name = request.name
    -                yield op
    +            operation_full_name = "{}/{}".format(instance_name, operation_name)
    +
    +            for operation in instance.stream_operation_updates(message_queue):
    +                operation.name = operation_full_name
    +                yield operation

             except InvalidArgumentError as e:
                 self.__logger.error(e)
    @@ -208,10 +212,10 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):

         # --- Private API ---

    -    def _rpc_termination_callback(self, peer, instance_name, job_name, message_queue):
    +    def _rpc_termination_callback(self, peer, instance_name, operation_name):
             instance = self._get_instance(instance_name)

    -        instance.unregister_message_client(job_name, message_queue)
    +        instance.unregister_operation_client(operation_name, peer)

             if self._is_instrumented:
                 if self.__peers[peer] > 1:
    

  • buildgrid/server/instance.py
    @@ -29,7 +29,8 @@ import janus
     from buildgrid._enums import BotStatus, LogRecordLevel, MetricRecordDomain, MetricRecordType
     from buildgrid._protos.buildgrid.v2 import monitoring_pb2
     from buildgrid.server.actioncache.service import ActionCacheService
    -from buildgrid.server._authentication import AuthMetadataMethod, AuthMetadataAlgorithm, AuthMetadataServerInterceptor
    +from buildgrid.server._authentication import AuthMetadataMethod, AuthMetadataAlgorithm
    +from buildgrid.server._authentication import AuthContext, AuthMetadataServerInterceptor
     from buildgrid.server.bots.service import BotsService
     from buildgrid.server.capabilities.instance import CapabilitiesInstance
     from buildgrid.server.capabilities.service import CapabilitiesService
    @@ -78,16 +79,15 @@ class BuildGridServer:
                 max_workers = (os.cpu_count() or 1) * 5

             self.__grpc_auth_interceptor = None
    +
             if auth_method != AuthMetadataMethod.NONE:
                 self.__grpc_auth_interceptor = AuthMetadataServerInterceptor(
                     method=auth_method, secret=auth_secret, algorithm=auth_algorithm)
    -        self.__grpc_executor = futures.ThreadPoolExecutor(max_workers)

    -        if self.__grpc_auth_interceptor is not None:
    -            self.__grpc_server = grpc.server(
    -                self.__grpc_executor, interceptors=(self.__grpc_auth_interceptor,))
    -        else:
    -            self.__grpc_server = grpc.server(self.__grpc_executor)
    +            AuthContext.interceptor = self.__grpc_auth_interceptor
    +
    +        self.__grpc_executor = futures.ThreadPoolExecutor(max_workers)
    +        self.__grpc_server = grpc.server(self.__grpc_executor)

             self.__main_loop = asyncio.get_event_loop()
    

  • buildgrid/server/job.py
    ... ... @@ -20,7 +20,7 @@ import uuid
    20 20
     from google.protobuf import duration_pb2, timestamp_pb2
    
    21 21
     
    
    22 22
     from buildgrid._enums import LeaseState, OperationStage
    
    23
    -from buildgrid._exceptions import CancelledError
    
    23
    +from buildgrid._exceptions import CancelledError, NotFoundError
    
    24 24
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
    
    25 25
     from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2
    
    26 26
     from buildgrid._protos.google.longrunning import operations_pb2
    
    ... ... @@ -29,35 +29,46 @@ from buildgrid._protos.google.rpc import code_pb2
    29 29
     
    
    30 30
     class Job:
    
    31 31
     
    
    32
    -    def __init__(self, action, action_digest):
    
    32
    +    def __init__(self, action, action_digest, priority=0):
    
    33 33
             self.__logger = logging.getLogger(__name__)
    
    34 34
     
    
    35 35
             self._name = str(uuid.uuid4())
    
    36
    +        self._priority = priority
    
    36 37
             self._action = remote_execution_pb2.Action()
    
    37
    -        self._operation = operations_pb2.Operation()
    
    38 38
             self._lease = None
    
    39 39
     
    
    40 40
             self.__execute_response = None
    
    41 41
             self.__operation_metadata = remote_execution_pb2.ExecuteOperationMetadata()
    
    42
    +        self.__operations_by_name = {}
    
    43
    +        self.__operations_by_peer = {}
    
    42 44
     
    
    43 45
             self.__queued_timestamp = timestamp_pb2.Timestamp()
    
    44 46
             self.__queued_time_duration = duration_pb2.Duration()
    
    45 47
             self.__worker_start_timestamp = timestamp_pb2.Timestamp()
    
    46 48
             self.__worker_completed_timestamp = timestamp_pb2.Timestamp()
    
    47 49
     
    
    48
    -        self.__operation_cancelled = False
    
    50
    +        self.__operations_message_queues = {}
    
    51
    +        self.__operations_cancelled = set()
    
    49 52
             self.__lease_cancelled = False
    
    53
    +        self.__job_cancelled = False
    
    50 54
     
    
    51 55
             self.__operation_metadata.action_digest.CopyFrom(action_digest)
    
    52 56
             self.__operation_metadata.stage = OperationStage.UNKNOWN.value
    
    53 57
     
    
    54 58
             self._action.CopyFrom(action)
    
    55 59
             self._do_not_cache = self._action.do_not_cache
    
    56
    -        self._operation_update_queues = []
    
    57
    -        self._operation.name = self._name
    
    58
    -        self._operation.done = False
    
    59 60
             self._n_tries = 0
    
    60 61
     
    
    62
    +        self._done = False
    
    63
    +
    
    64
    +    def __eq__(self, other):
    
    65
    +        if isinstance(other, Job):
    
    66
    +            return self.name == other.name
    
    67
    +        return False
    
    68
    +
    
    69
    +    def __ne__(self, other):
    
    70
    +        return not self.__eq__(other)
    
    71
    +
    
    61 72
         # --- Public API ---
    
    62 73
     
    
    63 74
         @property
    
    ... ... @@ -65,12 +76,18 @@ class Job:
    65 76
             return self._name
    
    66 77
     
    
    67 78
         @property
    
    68
    -    def do_not_cache(self):
    
    69
    -        return self._do_not_cache
    
    79
    +    def priority(self):
    
    80
    +        return self._priority
    
    70 81
     
    
    71 82
         @property
    
    72
    -    def action(self):
    
    73
    -        return self._action
    
    83
    +    def done(self):
    
    84
    +        return self._done
    
    85
    +
    
    86
    +    # --- Public API: REAPI ---
    
    87
    +
    
    88
    +    @property
    
    89
    +    def do_not_cache(self):
    
    90
    +        return self._do_not_cache
    
    74 91
     
    
    75 92
         @property
    
    76 93
         def action_digest(self):
    
    ... ... @@ -84,19 +101,176 @@ class Job:
    84 101
                 return None
    
    85 102
     
    
    86 103
         @property
    
    87
    -    def holds_cached_action_result(self):
    
    104
    +    def holds_cached_result(self):
    
    88 105
             if self.__execute_response is not None:
    
    89 106
                 return self.__execute_response.cached_result
    
    90 107
             else:
    
    91 108
                 return False
    
    92 109
     
    
    93
    -    @property
    
    94
    -    def operation(self):
    
    95
    -        return self._operation
    
    110
    +    def set_cached_result(self, action_result):
    
    111
    +        """Allows specifying an action result form the action cache for the job.
    
    112
    +
    
    113
    +        Note:
    
    114
    +            This won't trigger any :class:`Operation` stage transition.
    
    115
    +
    
    116
    +        Args:
    
    117
    +            action_result (ActionResult): The result from cache.
    
    118
    +        """
    
    119
    +        self.__execute_response = remote_execution_pb2.ExecuteResponse()
    
    120
    +        self.__execute_response.result.CopyFrom(action_result)
    
    121
    +        self.__execute_response.cached_result = True
    
    96 122
     
    
    97 123
         @property
    
    98
    -    def operation_stage(self):
    
    99
    -        return OperationStage(self.__operation_metadata.state)
    
    124
    +    def n_clients(self):
    
    125
    +        return len(self.__operations_message_queues)
    
    126
    +
    
    127
    +    def register_operation_client(self, peer, message_queue):
    
    128
    +        """Subscribes to the job's :class:`Operation` stage changes.
    
    129
    +
    
    130
    +        Args:
    
    131
    +            peer (str): a unique string identifying the client.
    
    132
    +            message_queue (queue.Queue): the event queue to register.
    
    133
    +
    
    134
    +        Returns:
    
    135
    +            str: The name of the subscribed :class:`Operation`.
    
    136
    +        """
    
    137
    +        if peer in self.__operations_by_peer:
    
    138
    +            operation = self.__operations_by_peer[peer]
    
    139
    +        else:
    
    140
    +            operation = self.create_operation(peer)
    
    141
    +
    
    142
    +        self.__operations_message_queues[peer] = message_queue
    
    143
    +
    
    144
    +        self._send_operations_updates(peers=[peer])
    
    145
    +
    
    146
    +        return operation.name
    
    147
    +
    
    148
    +    def unregister_operation_client(self, peer):
    
    149
    +        """Unsubscribes to the job's :class:`Operation` stage change.
    
    150
    +
    
    151
    +        Args:
    
    152
    +            peer (str): a unique string identifying the client.
    
    153
    +        """
    
    154
    +        if peer in self.__operations_message_queues:
    
    155
    +            del self.__operations_message_queues[peer]
    
    156
    +
    
    157
    +        # Drop the operation if nobody is watching it anymore:
    
    158
    +        if peer in self.__operations_by_peer:
    
    159
    +            operation = self.__operations_by_peer[peer]
    
    160
    +
    
    161
    +            if operation not in self.__operations_by_peer.values():
    
    162
    +                del self.__operations_by_name[operation.name]
    
    163
    +
    
    164
    +            del self.__operations_by_peer[peer]
    
    165
    +
    
    166
    +    def create_operation(self, peer):
    
    167
    +        """Generates a new :class:`Operation` for `peer`.
    
    168
    +
    
    169
    +        Args:
    
    170
    +            peer (str): a unique string identifying the client.
    
    171
    +        """
    
    172
    +        if peer in self.__operations_by_peer:
    
    173
    +            return self.__operations_by_peer[peer]
    
    174
    +
    
    175
    +        new_operation = operations_pb2.Operation()
    
    176
    +        # Copy state from first existing and non cancelled operation:
    
    177
    +        for operation in self.__operations_by_name.values():
    
    178
    +            if operation.name not in self.__operations_cancelled:
    
    179
    +                new_operation.CopyFrom(operation)
    
    180
    +                break
    
    181
    +
    
    182
    +        new_operation.name = str(uuid.uuid4())
    
    183
    +
    
    184
    +        self.__operations_by_name[new_operation.name] = new_operation
    
    185
    +        self.__operations_by_peer[peer] = new_operation
    
    186
    +
    
    187
    +        return new_operation
    
    188
    +
    
    189
    +    def list_operations(self):
    
    190
    +        """Lists the :class:`Operation` related to a job.
    
    191
    +
    
    192
    +        Returns:
    
    193
    +            list: A list of :class:`Operation` names.
    
    194
    +        """
    
    195
    +        return list(self.__operations_by_name.keys())
    
    196
    +
    
    197
    +    def get_operation(self, operation_name):
    
    198
    +        """Returns a copy of the the job's :class:`Operation`.
    
    199
    +
    
    200
    +        Args:
    
    201
    +            operation_name (str): the operation's name.
    
    202
    +
    
    203
    +        Raises:
    
    204
    +            NotFoundError: If no operation with `operation_name` exists.
    
    205
    +        """
    
    206
    +        try:
    
    207
    +            operation = self.__operations_by_name[operation_name]
    
    208
    +
    
    209
    +        except KeyError:
    
    210
    +            raise NotFoundError("Operation name does not exist: [{}]"
    
    211
    +                                .format(operation_name))
    
    212
    +
    
    213
    +        return self._copy_operation(operation)
    
    214
    +
    
    215
    +    def update_operation_stage(self, stage):
    
    216
    +        """Operates a stage transition for the job's :class:`Operation`.
    
    217
    +
    
    218
    +        Args:
    
    219
    +            stage (OperationStage): the operation stage to transition to.
    
    220
    +        """
    
    221
    +        if stage.value == self.__operation_metadata.stage:
    
    222
    +            return
    
    223
    +
    
    224
    +        self.__operation_metadata.stage = stage.value
    
    225
    +
    
    226
    +        if self.__operation_metadata.stage == OperationStage.QUEUED.value:
    
    227
    +            if self.__queued_timestamp.ByteSize() == 0:
    
    228
    +                self.__queued_timestamp.GetCurrentTime()
    
    229
    +            self._n_tries += 1
    
    230
    +
    
    231
    +        elif self.__operation_metadata.stage == OperationStage.EXECUTING.value:
    
    232
    +            queue_in, queue_out = self.__queued_timestamp.ToDatetime(), datetime.now()
    
    233
    +            self.__queued_time_duration.FromTimedelta(queue_out - queue_in)
    
    234
    +
    
    235
    +        elif self.__operation_metadata.stage == OperationStage.COMPLETED.value:
    
    236
    +            self._done = True
    
    237
    +
    
    238
    +        self._send_operations_updates()
    
    239
    +
    
    240
    +    def cancel_operation(self, peer):
    
    241
    +        """Triggers a job's :class:`Operation` cancellation.
    
    242
    +
    
    243
    +        This may also cancel the job's :class:`Lease`, if one has been issued.
    
    244
    +
    
    245
    +        Args:
    
    246
    +            peer (str): a unique string identifying the client.
    
    247
    +        """
    
    248
    +
    
    249
    +        operation_names, peers = set(), set()
    
    250
    +        if peer in self.__operations_by_peer:
    
    251
    +            operation_names.add(self.__operations_by_peer[peer].name)
    
    252
    +            peers.add(peer)
    
    253
    +
    
    254
    +        else:
    
    255
    +            operation_names.update(self.__operations_by_name.keys())
    
    256
    +            peers.update(self.__operations_by_peer.keys())
    
    257
    +
    
    258
    +        operations_cancelled = operation_names - self.__operations_cancelled
    
    259
    +        if not operations_cancelled:
    
    260
    +            return
    
    261
    +
    
    262
    +        self.__operations_cancelled.update(operations_cancelled)
    
    263
    +
    
    264
    +        operation_names = set(self.__operations_by_name.keys())
    
    265
    +        # The job is cancelled only if all of its operations are cancelled:
    
    266
    +        self.__job_cancelled = not bool(operation_names - self.__operations_cancelled)
    
    267
    +
    
    268
    +        if self.__job_cancelled and self._lease is not None:
    
    269
    +            self.cancel_lease()
    
    270
    +
    
    271
    +        self._send_operations_updates(peers=peers, notify_cancelled=True)
    
    272
    +
    
    273
    +    # --- Public API: RWAPI ---
    
    100 274
     
    
    101 275
         @property
    
    102 276
         def lease(self):
    
    ... ... @@ -117,45 +291,15 @@ class Job:
    117 291
         def n_tries(self):
    
    118 292
             return self._n_tries
    
    119 293
     
    
    120
    -    @property
    
    121
    -    def n_clients(self):
    
    122
    -        return len(self._operation_update_queues)
    
    123
    -
    
    124
    -    def register_client(self, queue):
    
    125
    -        """Subscribes to the job's :class:`Operation` stage change events.
    
    126
    -
    
    127
    -        Queues this :object:`Job` instance.
    
    128
    -
    
    129
    -        Args:
    
    130
    -            queue (queue.Queue): the event queue to register.
    
    131
    -        """
    
    132
    -        self._operation_update_queues.append(queue)
    
    133
    -        queue.put(self)
    
    134
    -
    
    135
    -    def unregister_client(self, queue):
    
    136
    -        """Unsubscribes to the job's :class:`Operation` stage change events.
    
    137
    -
    
    138
    -        Args:
    
    139
    -            queue (queue.Queue): the event queue to unregister.
    
    140
    -        """
    
    141
    -        self._operation_update_queues.remove(queue)
    
    142
    -
    
    143
    -    def set_cached_result(self, action_result):
    
    144
    -        """Allows specifying an action result form the action cache for the job.
    
    145
    -        """
    
    146
    -        self.__execute_response = remote_execution_pb2.ExecuteResponse()
    
    147
    -        self.__execute_response.result.CopyFrom(action_result)
    
    148
    -        self.__execute_response.cached_result = True
    
    149
    -
    
    150 294
         def create_lease(self):
    
    151 295
             """Emits a new :class:`Lease` for the job.
    
    152 296
     
    
    153 297
             Only one :class:`Lease` can be emitted for a given job. This method
    
    154
    -        should only be used once, any furhter calls are ignored.
    
    298
    +        should only be used once, any further calls are ignored.
    
    155 299
             """
    
    156
    -        if self.__operation_cancelled:
    
    157
    -            return None
    
    158
    -        elif self._lease is not None:
    
    300
    +        if self._lease is not None:
    
    301
    +            return self._lease
    
    302
    +        elif self.__job_cancelled:
    
    159 303
                 return None
    
    160 304
     
    
    161 305
             self._lease = bots_pb2.Lease()
    
    ... ... @@ -166,14 +310,14 @@ class Job:
    166 310
             return self._lease
    
    167 311
     
    
    168 312
         def update_lease_state(self, state, status=None, result=None):
    
    169
    -        """Operates a state transition for the job's current :class:Lease.
    
    313
    +        """Operates a state transition for the job's current :class:`Lease`.
    
    170 314
     
    
    171 315
             Args:
    
    172 316
                 state (LeaseState): the lease state to transition to.
    
    173
    -            status (google.rpc.Status): the lease execution status, only
    
    174
    -                required if `state` is `COMPLETED`.
    
    175
    -            result (google.protobuf.Any): the lease execution result, only
    
    176
    -                required if `state` is `COMPLETED`.
    
    317
    +            status (google.rpc.Status, optional): the lease execution status,
    
    318
    +                only required if `state` is `COMPLETED`.
    
    319
    +            result (google.protobuf.Any, optional): the lease execution result,
    
    320
    +                only required if `state` is `COMPLETED`.
    
    177 321
             """
    
    178 322
             if state.value == self._lease.state:
    
    179 323
                 return
    
    ... ... @@ -214,67 +358,25 @@ class Job:
    214 358
                 self.__execute_response.status.CopyFrom(status)
    
    215 359
     
    
    216 360
         def cancel_lease(self):
    
    217
    -        """Triggers a job's :class:Lease cancellation.
    
    361
    +        """Triggers a job's :class:`Lease` cancellation.
    
    218 362
     
    
    219
    -        This will not cancel the job's :class:Operation.
    
    363
    +        Note:
    
    364
    +            This will not cancel the job's :class:`Operation`.
    
    220 365
             """
    
    221 366
             self.__lease_cancelled = True
    
    222 367
             if self._lease is not None:
    
    223 368
                 self.update_lease_state(LeaseState.CANCELLED)
    
    224 369
     
    
    225
    -    def update_operation_stage(self, stage):
    
    226
    -        """Operates a stage transition for the job's :class:Operation.
    
    370
    +    def delete_lease(self):
    
    371
    +        """Discards the job's :class:`Lease`.
    
    227 372
     
    
    228
    -        Args:
    
    229
    -            stage (OperationStage): the operation stage to transition to.
    
    373
    +        Note:
    
    374
    +            This will not cancel the job's :class:`Operation`.
    
    230 375
             """
    
    231
    -        if stage.value == self.__operation_metadata.stage:
    
    232
    -            return
    
    376
    +        self.__worker_start_timestamp.Clear()
    
    377
    +        self.__worker_completed_timestamp.Clear()
    
    233 378
     
    
    234
    -        self.__operation_metadata.stage = stage.value
    
    235
    -
    
    236
    -        if self.__operation_metadata.stage == OperationStage.QUEUED.value:
    
    237
    -            if self.__queued_timestamp.ByteSize() == 0:
    
    238
    -                self.__queued_timestamp.GetCurrentTime()
    
    239
    -            self._n_tries += 1
    
    240
    -
    
    241
    -        elif self.__operation_metadata.stage == OperationStage.EXECUTING.value:
    
    242
    -            queue_in, queue_out = self.__queued_timestamp.ToDatetime(), datetime.now()
    
    243
    -            self.__queued_time_duration.FromTimedelta(queue_out - queue_in)
    
    244
    -
    
    245
    -        elif self.__operation_metadata.stage == OperationStage.COMPLETED.value:
    
    246
    -            if self.__execute_response is not None:
    
    247
    -                self._operation.response.Pack(self.__execute_response)
    
    248
    -            self._operation.done = True
    
    249
    -
    
    250
    -        self._operation.metadata.Pack(self.__operation_metadata)
    
    251
    -
    
    252
    -        for queue in self._operation_update_queues:
    
    253
    -            queue.put(self)
    
    254
    -
    
    255
    -    def check_operation_status(self):
    
    256
    -        """Reports errors on unexpected job's :class:Operation state.
    
    257
    -
    
    258
    -        Raises:
    
    259
    -            CancelledError: if the job's :class:Operation was cancelled.
    
    260
    -        """
    
    261
    -        if self.__operation_cancelled:
    
    262
    -            raise CancelledError(self.__execute_response.status.message)
    
    263
    -
    
    264
    -    def cancel_operation(self):
    
    265
    -        """Triggers a job's :class:Operation cancellation.
    
    266
    -
    
    267
    -        This will also cancel any job's :class:Lease that may have been issued.
    
    268
    -        """
    
    269
    -        self.__operation_cancelled = True
    
    270
    -        if self._lease is not None:
    
    271
    -            self.cancel_lease()
    
    272
    -
    
    273
    -        self.__execute_response = remote_execution_pb2.ExecuteResponse()
    
    274
    -        self.__execute_response.status.code = code_pb2.CANCELLED
    
    275
    -        self.__execute_response.status.message = "Operation cancelled by client."
    
    276
    -
    
    277
    -        self.update_operation_stage(OperationStage.COMPLETED)
    
    379
    +        self._lease = None
    
    278 380
     
    
    279 381
         # --- Public API: Monitoring ---
    
    280 382
     
    
    ... ... @@ -283,3 +385,57 @@ class Job:
    283 385
     
    
    284 386
         def query_n_retries(self):
    
    285 387
             return self._n_tries - 1 if self._n_tries > 0 else 0
    
    388
    +
    
    389
    +    # --- Private API ---
    
    390
    +
    
    391
    +    def _copy_operation(self, operation):
    
    392
    +        """Simply duplicates a given :class:`Operation` object."""
    
    393
    +        new_operation = operations_pb2.Operation()
    
    394
    +        new_operation.CopyFrom(operation)
    
    395
    +
    
    396
    +        return new_operation
    
    397
    +
    
    398
    +    def _send_operations_updates(self, peers=None, notify_cancelled=False):
    
    399
    +        """Sends :class:`Operation` stage change messages to watchers."""
    
    400
    +        for operation in self.__operations_by_name.values():
    
    401
    +            if operation.name not in self.__operations_cancelled:
    
    402
    +                operation_metadata = self.__operation_metadata
    
    403
    +                execute_response = self.__execute_response
    
    404
    +
    
    405
    +                operation_done = self._done
    
    406
    +
    
    407
    +            else:
    
    408
    +                operation_metadata = remote_execution_pb2.ExecuteOperationMetadata()
    
    409
    +                operation_metadata.CopyFrom(self.__operation_metadata)
    
    410
    +                operation_metadata.stage = OperationStage.COMPLETED.value
    
    411
    +
    
    412
    +                execute_response = remote_execution_pb2.ExecuteResponse()
    
    413
    +                if self.__execute_response is not None:
    
    414
    +                    execute_response.CopyFrom(self.__execute_response)
    
    415
    +                execute_response.status.code = code_pb2.CANCELLED
    
    416
    +                execute_response.status.message = "Operation cancelled by client."
    
    417
    +
    
    418
    +                operation_done = True
    
    419
    +
    
    420
    +            if execute_response is not None:
    
    421
    +                operation.response.Pack(execute_response)
    
    422
    +            operation.metadata.Pack(operation_metadata)
    
    423
    +            operation.done = operation_done
    
    424
    +
    
    425
    +        for peer, message_queue in self.__operations_message_queues.items():
    
    426
    +            if peer not in self.__operations_by_peer:
    
    427
    +                continue
    
    428
    +            elif peers and peer not in peers:
    
    429
    +                continue
    
    430
    +
    
    431
    +            operation = self.__operations_by_peer[peer]
    
    432
    +            # Messages are pairs of (Exception, Operation,):
    
    433
    +            if not notify_cancelled and operation.name in self.__operations_cancelled:
    
    434
    +                continue
    
    435
    +            elif operation.name not in self.__operations_cancelled:
    
    436
    +                message = (None, self._copy_operation(operation),)
    
    437
    +            else:
    
    438
    +                message = (CancelledError("Operation has been cancelled"),
    
    439
    +                           self._copy_operation(operation),)
    
    440
    +
    
    441
    +            message_queue.put(message)
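
    A note on the messaging model introduced above: each registered per-peer queue receives pairs of (Exception, Operation). Regular stage updates arrive as (None, operation copy); once an operation has been cancelled, watchers receive (CancelledError, final operation copy) instead. A minimal consumer sketch, assuming a standard queue.Queue and an existing Job instance (the watch_job_operation helper below is illustrative only, not part of this commit):

        from queue import Queue

        def watch_job_operation(job, peer):
            """Illustrative watcher draining a job's per-peer message queue."""
            message_queue = Queue()
            operation_name = job.register_operation_client(peer, message_queue)

            while True:
                error, operation = message_queue.get()  # (Exception, Operation) pair
                if error is not None:
                    raise error  # e.g. CancelledError once the operation is cancelled
                if operation.done:
                    break

            job.unregister_operation_client(peer)
            return operation_name
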

  • buildgrid/server/operations/instance.py
    ... ... @@ -21,7 +21,7 @@ An instance of the LongRunningOperations Service.
    21 21
     
    
    22 22
     import logging
    
    23 23
     
    
    24
    -from buildgrid._exceptions import InvalidArgumentError
    
    24
    +from buildgrid._exceptions import InvalidArgumentError, NotFoundError
    
    25 25
     from buildgrid._protos.google.longrunning import operations_pb2
    
    26 26
     
    
    27 27
     
    
    ... ... @@ -39,62 +39,43 @@ class OperationsInstance:
    39 39
         def register_instance_with_server(self, instance_name, server):
    
    40 40
             server.add_operations_instance(self, instance_name)
    
    41 41
     
    
    42
    -    def get_operation(self, name):
    
    43
    -        job = self._scheduler.jobs.get(name)
    
    42
    +    def get_operation(self, job_name):
    
    43
    +        try:
    
    44
    +            operation = self._scheduler.get_job_operation(job_name)
    
    44 45
     
    
    45
    -        if job is None:
    
    46
    -            raise InvalidArgumentError("Operation name does not exist: [{}]".format(name))
    
    46
    +        except NotFoundError:
    
    47
    +            raise InvalidArgumentError("Operation name does not exist: [{}]".format(job_name))
    
    47 48
     
    
    48
    -        else:
    
    49
    -            return job.operation
    
    49
    +        return operation
    
    50 50
     
    
    51 51
         def list_operations(self, list_filter, page_size, page_token):
    
    52 52
             # TODO: Pages
    
    53 53
             # Spec says number of pages and length of a page are optional
    
    54 54
             response = operations_pb2.ListOperationsResponse()
    
    55
    +
    
    56
    +        operation_names = [operation_name for job_name in
    
    57
    +                           self._scheduler.list_current_jobs() for operation_name in
    
    58
    +                           self._scheduler.list_job_operations(job_name)]
    
    59
    +
    
    55 60
             operations = []
    
    56
    -        for job in self._scheduler.list_jobs():
    
    57
    -            op = operations_pb2.Operation()
    
    58
    -            op.CopyFrom(job.operation)
    
    59
    -            operations.append(op)
    
    61
    +        for operation_name in operation_names:
    
    62
    +            operation = self._scheduler.get_job_operation(operation_name)
    
    63
    +            operations.append(operation)
    
    60 64
     
    
    61 65
             response.operations.extend(operations)
    
    62 66
     
    
    63 67
             return response
    
    64 68
     
    
    65
    -    def delete_operation(self, name):
    
    66
    -        try:
    
    67
    -            self._scheduler.jobs.pop(name)
    
    68
    -
    
    69
    -        except KeyError:
    
    70
    -            raise InvalidArgumentError("Operation name does not exist: [{}]".format(name))
    
    71
    -
    
    72
    -    def cancel_operation(self, name):
    
    69
    +    def delete_operation(self, job_name):
    
    73 70
             try:
    
    74
    -            self._scheduler.cancel_job_operation(name)
    
    71
    +            self._scheduler.delete_job_operation(job_name)
    
    75 72
     
    
    76
    -        except KeyError:
    
    77
    -            raise InvalidArgumentError("Operation name does not exist: [{}]".format(name))
    
    73
    +        except NotFoundError:
    
    74
    +            raise InvalidArgumentError("Operation name does not exist: [{}]".format(job_name))
    
    78 75
     
    
    79
    -    def register_message_client(self, name, queue):
    
    76
    +    def cancel_operation(self, job_name):
    
    80 77
             try:
    
    81
    -            self._scheduler.register_client(name, queue)
    
    82
    -
    
    83
    -        except KeyError:
    
    84
    -            raise InvalidArgumentError("Operation name does not exist: [{}]".format(name))
    
    85
    -
    
    86
    -    def unregister_message_client(self, name, queue):
    
    87
    -        try:
    
    88
    -            self._scheduler.unregister_client(name, queue)
    
    89
    -
    
    90
    -        except KeyError:
    
    91
    -            raise InvalidArgumentError("Operation name does not exist: [{}]".format(name))
    
    92
    -
    
    93
    -    def stream_operation_updates(self, message_queue, operation_name):
    
    94
    -        job = message_queue.get()
    
    95
    -        while not job.operation.done:
    
    96
    -            yield job.operation
    
    97
    -            job = message_queue.get()
    
    98
    -            job.check_operation_status()
    
    78
    +            self._scheduler.cancel_job_operation(job_name)
    
    99 79
     
    
    100
    -        yield job.operation
    80
    +        except NotFoundError:
    
    81
    +            raise InvalidArgumentError("Operation name does not exist: [{}]".format(job_name))
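
    With stream_operation_updates removed from OperationsInstance, operation lookups are delegated to the scheduler and its NotFoundError now surfaces to callers as InvalidArgumentError. A small sketch of handling that from calling code (describe_operation is a made-up helper, shown only to illustrate the error translation):

        from buildgrid._exceptions import InvalidArgumentError

        def describe_operation(operations_instance, operation_name):
            """Illustrative helper: summarises an operation or reports a bad name."""
            try:
                operation = operations_instance.get_operation(operation_name)
            except InvalidArgumentError:
                # Raised when the underlying scheduler reports NotFoundError:
                return "Operation name does not exist: [{}]".format(operation_name)
            return "Operation [{}] done: [{}]".format(operation.name, operation.done)
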

  • buildgrid/server/operations/service.py
    ... ... @@ -27,6 +27,7 @@ from google.protobuf.empty_pb2 import Empty
    27 27
     
    
    28 28
     from buildgrid._exceptions import InvalidArgumentError
    
    29 29
     from buildgrid._protos.google.longrunning import operations_pb2_grpc, operations_pb2
    
    30
    +from buildgrid.server._authentication import AuthContext, authorize
    
    30 31
     
    
    31 32
     
    
    32 33
     class OperationsService(operations_pb2_grpc.OperationsServicer):
    
    ... ... @@ -51,6 +52,7 @@ class OperationsService(operations_pb2_grpc.OperationsServicer):
    51 52
     
    
    52 53
         # --- Public API: Servicer ---
    
    53 54
     
    
    55
    +    @authorize(AuthContext)
    
    54 56
         def GetOperation(self, request, context):
    
    55 57
             self.__logger.debug("GetOperation request from [%s]", context.peer())
    
    56 58
     
    
    ... ... @@ -74,6 +76,7 @@ class OperationsService(operations_pb2_grpc.OperationsServicer):
    74 76
     
    
    75 77
             return operations_pb2.Operation()
    
    76 78
     
    
    79
    +    @authorize(AuthContext)
    
    77 80
         def ListOperations(self, request, context):
    
    78 81
             self.__logger.debug("ListOperations request from [%s]", context.peer())
    
    79 82
     
    
    ... ... @@ -99,6 +102,7 @@ class OperationsService(operations_pb2_grpc.OperationsServicer):
    99 102
     
    
    100 103
             return operations_pb2.ListOperationsResponse()
    
    101 104
     
    
    105
    +    @authorize(AuthContext)
    
    102 106
         def DeleteOperation(self, request, context):
    
    103 107
             self.__logger.debug("DeleteOperation request from [%s]", context.peer())
    
    104 108
     
    
    ... ... @@ -118,6 +122,7 @@ class OperationsService(operations_pb2_grpc.OperationsServicer):
    118 122
     
    
    119 123
             return Empty()
    
    120 124
     
    
    125
    +    @authorize(AuthContext)
    
    121 126
         def CancelOperation(self, request, context):
    
    122 127
             self.__logger.debug("CancelOperation request from [%s]", context.peer())
    
    123 128
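
    The @authorize(AuthContext) decorator now guarding each servicer entry point comes from buildgrid.server._authentication, whose implementation is not part of this diff. Purely as an illustration of the pattern (the is_authorized interface below is an assumption, not BuildGrid's actual API), such a decorator typically wraps the handler and rejects the RPC before the request is processed:

        import functools

        import grpc

        def authorize(auth_context):
            """Hypothetical sketch of an authorization decorator for gRPC servicers."""
            def decorator(behavior):
                @functools.wraps(behavior)
                def wrapper(self, request, context):
                    # Assumed interface: the authentication context can vet the calling peer.
                    if not auth_context.is_authorized(context):
                        context.abort(grpc.StatusCode.UNAUTHENTICATED, 'Unauthorized request')
                    return behavior(self, request, context)
                return wrapper
            return decorator
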
     
    

  • buildgrid/server/referencestorage/service.py
    ... ... @@ -20,6 +20,7 @@ import grpc
    20 20
     from buildgrid._exceptions import InvalidArgumentError, NotFoundError
    
    21 21
     from buildgrid._protos.buildstream.v2 import buildstream_pb2
    
    22 22
     from buildgrid._protos.buildstream.v2 import buildstream_pb2_grpc
    
    23
    +from buildgrid.server._authentication import AuthContext, authorize
    
    23 24
     
    
    24 25
     
    
    25 26
     class ReferenceStorageService(buildstream_pb2_grpc.ReferenceStorageServicer):
    
    ... ... @@ -31,9 +32,14 @@ class ReferenceStorageService(buildstream_pb2_grpc.ReferenceStorageServicer):
    31 32
     
    
    32 33
             buildstream_pb2_grpc.add_ReferenceStorageServicer_to_server(self, server)
    
    33 34
     
    
    35
    +    # --- Public API ---
    
    36
    +
    
    34 37
         def add_instance(self, name, instance):
    
    35 38
             self._instances[name] = instance
    
    36 39
     
    
    40
    +    # --- Public API: Servicer ---
    
    41
    +
    
    42
    +    @authorize(AuthContext)
    
    37 43
         def GetReference(self, request, context):
    
    38 44
             self.__logger.debug("GetReference request from [%s]", context.peer())
    
    39 45
     
    
    ... ... @@ -55,6 +61,7 @@ class ReferenceStorageService(buildstream_pb2_grpc.ReferenceStorageServicer):
    55 61
     
    
    56 62
             return buildstream_pb2.GetReferenceResponse()
    
    57 63
     
    
    64
    +    @authorize(AuthContext)
    
    58 65
         def UpdateReference(self, request, context):
    
    59 66
             self.__logger.debug("UpdateReference request from [%s]", context.peer())
    
    60 67
     
    
    ... ... @@ -75,6 +82,7 @@ class ReferenceStorageService(buildstream_pb2_grpc.ReferenceStorageServicer):
    75 82
     
    
    76 83
             return buildstream_pb2.UpdateReferenceResponse()
    
    77 84
     
    
    85
    +    @authorize(AuthContext)
    
    78 86
         def Status(self, request, context):
    
    79 87
             self.__logger.debug("Status request from [%s]", context.peer())
    
    80 88
     
    
    ... ... @@ -90,6 +98,8 @@ class ReferenceStorageService(buildstream_pb2_grpc.ReferenceStorageServicer):
    90 98
     
    
    91 99
             return buildstream_pb2.StatusResponse()
    
    92 100
     
    
    101
    +    # --- Private API ---
    
    102
    +
    
    93 103
         def _get_instance(self, instance_name):
    
    94 104
             try:
    
    95 105
                 return self._instances[instance_name]
    

  • buildgrid/server/scheduler.py
    ... ... @@ -25,6 +25,7 @@ import logging
    25 25
     
    
    26 26
     from buildgrid._enums import LeaseState, OperationStage
    
    27 27
     from buildgrid._exceptions import NotFoundError
    
    28
    +from buildgrid.server.job import Job
    
    28 29
     
    
    29 30
     
    
    30 31
     class Scheduler:
    
    ... ... @@ -42,8 +43,12 @@ class Scheduler:
    42 43
             self.__retries_count = 0
    
    43 44
     
    
    44 45
             self._action_cache = action_cache
    
    45
    -        self.jobs = {}
    
    46
    -        self.queue = deque()
    
    46
    +
    
    47
    +        self.__jobs_by_action = {}
    
    48
    +        self.__jobs_by_operation = {}
    
    49
    +        self.__jobs_by_name = {}
    
    50
    +
    
    51
    +        self.__queue = deque()
    
    47 52
     
    
    48 53
             self._is_instrumented = monitor
    
    49 54
     
    
    ... ... @@ -52,39 +57,132 @@ class Scheduler:
    52 57
     
    
    53 58
         # --- Public API ---
    
    54 59
     
    
    55
    -    def register_client(self, job_name, queue):
    
    56
    -        job = self.jobs[job_name]
    
    60
    +    def list_current_jobs(self):
    
    61
    +        """Returns a list of the :class:`Job` names currently managed."""
    
    62
    +        return self.__jobs_by_name.keys()
    
    57 63
     
    
    58
    -        job.register_client(queue)
    
    64
    +    def list_job_operations(self, job_name):
    
    65
    +        """Returns a list of :class:`Operation` names for a :class:`Job`."""
    
    66
    +        if job_name in self.__jobs_by_name:
    
    67
    +            return self.__jobs_by_name[job_name].list_operations()
    
    68
    +        else:
    
    69
    +            return []
    
    59 70
     
    
    60
    -    def unregister_client(self, job_name, queue):
    
    61
    -        job = self.jobs[job_name]
    
    71
    +    # --- Public API: REAPI ---
    
    62 72
     
    
    63
    -        job.unregister_client(queue)
    
    73
    +    def register_job_operation_client(self, operation_name, peer, message_queue):
    
    74
    +        """Subscribes to one of the job's :class:`Operation` stage changes.
    
    64 75
     
    
    65
    -        if not job.n_clients and job.operation.done:
    
    66
    -            del self.jobs[job_name]
    
    76
    +        Args:
    
    77
    +            operation_name (str): name of the operation to subscribe to.
    
    78
    +            peer (str): a unique string identifying the client.
    
    79
    +            message_queue (queue.Queue): the event queue to register.
    
    67 80
     
    
    68
    -            if self._is_instrumented:
    
    69
    -                self.__operations_by_stage[OperationStage.CACHE_CHECK].discard(job_name)
    
    70
    -                self.__operations_by_stage[OperationStage.QUEUED].discard(job_name)
    
    71
    -                self.__operations_by_stage[OperationStage.EXECUTING].discard(job_name)
    
    72
    -                self.__operations_by_stage[OperationStage.COMPLETED].discard(job_name)
    
    81
    +        Returns:
    
    82
    +            str: The name of the subscribed :class:`Operation`.
    
    83
    +
    
    84
    +        Raises:
    
    85
    +            NotFoundError: If no operation with `operation_name` exists.
    
    86
    +        """
    
    87
    +        if operation_name in self.__jobs_by_operation:
    
    88
    +            job = self.__jobs_by_operation[operation_name]
    
    89
    +
    
    90
    +        elif operation_name in self.__jobs_by_name:
    
    91
    +            job = self.__jobs_by_name[operation_name]
    
    92
    +
    
    93
    +        else:
    
    94
    +            raise NotFoundError("Operation name does not exist: [{}]"
    
    95
    +                                .format(operation_name))
    
    96
    +
    
    97
    +        operation_name = job.register_operation_client(peer, message_queue)
    
    98
    +
    
    99
    +        self.__jobs_by_operation[operation_name] = job
    
    100
    +
    
    101
    +        return operation_name
    
    102
    +
    
    103
    +    def unregister_job_operation_client(self, operation_name, peer):
    
    104
    +        """Unsubscribes from one of the job's :class:`Operation` stage changes.
    
    105
    +
    
    106
    +        Args:
    
    107
    +            operation_name (str): name of the operation to unsubscribe from.
    
    108
    +            peer (str): a unique string identifying the client.
    
    109
    +
    
    110
    +        Raises:
    
    111
    +            NotFoundError: If no operation with `operation_name` exists.
    
    112
    +        """
    
    113
    +        if operation_name in self.__jobs_by_operation:
    
    114
    +            job = self.__jobs_by_operation[operation_name]
    
    115
    +
    
    116
    +        elif operation_name in self.__jobs_by_name:
    
    117
    +            job = self.__jobs_by_name[operation_name]
    
    118
    +
    
    119
    +        else:
    
    120
    +            raise NotFoundError("Operation name does not exist: [{}]"
    
    121
    +                                .format(operation_name))
    
    122
    +
    
    123
    +        if operation_name in self.__jobs_by_operation:
    
    124
    +            del self.__jobs_by_operation[operation_name]
    
    125
    +
    
    126
    +        job.unregister_operation_client(peer)
    
    127
    +
    
    128
    +        if not job.n_clients and job.done and not job.lease:
    
    129
    +            self._delete_job(job.name)
    
    130
    +
    
    131
    +    def queue_job_operation(self, action, action_digest, priority=0, skip_cache_lookup=False):
    
    132
    +        """Inserts a newly created job into the execution queue.
    
    133
    +
    
    134
    +        Warning:
    
    135
    +            Priority is handled like a POSIX ``nice`` value: a higher value
    
    136
    +            means a lower priority, 0 being the default priority.
    
    137
    +
    
    138
    +        Args:
    
    139
    +            action (Action): the given action to queue for execution.
    
    140
    +            action_digest (Digest): the digest of the given action.
    
    141
    +            priority (int): the execution job's priority.
    
    142
    +            skip_cache_lookup (bool): whether or not to look for pre-computed
    
    143
    +                result for the given action.
    
    144
    +
    
    145
    +        Returns:
    
    146
    +            str: the newly created operation's name.
    
    147
    +        """
    
    148
    +        def __queue_job(jobs_queue, new_job):
    
    149
    +            index = 0
    
    150
    +            for queued_job in reversed(jobs_queue):
    
    151
    +                if new_job.priority < queued_job.priority:
    
    152
    +                    index += 1
    
    153
    +                else:
    
    154
    +                    break
    
    155
    +
    
    156
    +            index = len(jobs_queue) - index
    
    157
    +
    
    158
    +            jobs_queue.insert(index, new_job)
    
    159
    +
    
    160
    +        if action_digest.hash in self.__jobs_by_action:
    
    161
    +            job = self.__jobs_by_action[action_digest.hash]
    
    73 162
     
    
    74
    -                self.__leases_by_state[LeaseState.PENDING].discard(job_name)
    
    75
    -                self.__leases_by_state[LeaseState.ACTIVE].discard(job_name)
    
    76
    -                self.__leases_by_state[LeaseState.COMPLETED].discard(job_name)
    
    163
    +            # Reschedule if the new priority is higher (i.e. a lower nice-like value):
    
    164
    +            if priority < job.priority:
    
    165
    +                job.priority = priority
    
    77 166
     
    
    78
    -    def queue_job(self, job, skip_cache_lookup=False):
    
    79
    -        self.jobs[job.name] = job
    
    167
    +                if job in self.__queue:
    
    168
    +                    self.__queue.remove(job)
    
    169
    +                    __queue_job(self.__queue, job)
    
    170
    +
    
    171
    +            return job.name
    
    172
    +
    
    173
    +        job = Job(action, action_digest, priority=priority)
    
    174
    +
    
    175
    +        self.__jobs_by_action[job.action_digest.hash] = job
    
    176
    +        self.__jobs_by_name[job.name] = job
    
    80 177
     
    
    81 178
             operation_stage = None
    
    82 179
             if self._action_cache is not None and not skip_cache_lookup:
    
    83 180
                 try:
    
    84 181
                     action_result = self._action_cache.get_action_result(job.action_digest)
    
    182
    +
    
    85 183
                 except NotFoundError:
    
    86 184
                     operation_stage = OperationStage.QUEUED
    
    87
    -                self.queue.append(job)
    
    185
    +                __queue_job(self.__queue, job)
    
    88 186
     
    
    89 187
                 else:
    
    90 188
                     job.set_cached_result(action_result)
    
    ... ... @@ -95,28 +193,68 @@ class Scheduler:
    95 193
     
    
    96 194
             else:
    
    97 195
                 operation_stage = OperationStage.QUEUED
    
    98
    -            self.queue.append(job)
    
    196
    +            __queue_job(self.__queue, job)
    
    99 197
     
    
    100 198
             self._update_job_operation_stage(job.name, operation_stage)
    
    101 199
     
    
    102
    -    def retry_job(self, job_name):
    
    103
    -        job = self.jobs[job_name]
    
    200
    +        return job.name
    
    104 201
     
    
    105
    -        operation_stage = None
    
    106
    -        if job.n_tries >= self.MAX_N_TRIES:
    
    107
    -            # TODO: Decide what to do with these jobs
    
    108
    -            operation_stage = OperationStage.COMPLETED
    
    109
    -            # TODO: Mark these jobs as done
    
    202
    +    def get_job_operation(self, operation_name):
    
    203
    +        """Retrieves a job's :class:`Operation` by name.
    
    110 204
     
    
    111
    -        else:
    
    112
    -            operation_stage = OperationStage.QUEUED
    
    113
    -            job.update_lease_state(LeaseState.PENDING)
    
    114
    -            self.queue.append(job)
    
    205
    +        Args:
    
    206
    +            operation_name (str): name of the operation to query.
    
    115 207
     
    
    116
    -        self._update_job_operation_stage(job_name, operation_stage)
    
    208
    +        Raises:
    
    209
    +            NotFoundError: If no operation with `operation_name` exists.
    
    210
    +        """
    
    211
    +        try:
    
    212
    +            job = self.__jobs_by_operation[operation_name]
    
    213
    +
    
    214
    +        except KeyError:
    
    215
    +            raise NotFoundError("Operation name does not exist: [{}]"
    
    216
    +                                .format(operation_name))
    
    217
    +
    
    218
    +        return job.get_operation(operation_name)
    
    219
    +
    
    220
    +    def cancel_job_operation(self, operation_name):
    
    221
    +        """Cancels a job's :class:`Operation` by name.
    
    222
    +
    
    223
    +        Args:
    
    224
    +            operation_name (str): name of the operation to cancel.
    
    225
    +
    
    226
    +        Raises:
    
    227
    +            NotFoundError: If no operation with `operation_name` exists.
    
    228
    +        """
    
    229
    +        try:
    
    230
    +            job = self.__jobs_by_operation[operation_name]
    
    231
    +
    
    232
    +        except KeyError:
    
    233
    +            raise NotFoundError("Operation name does not exist: [{}]"
    
    234
    +                                .format(operation_name))
    
    235
    +
    
    236
    +        job.cancel_operation(operation_name)
    
    237
    +
    
    238
    +    def delete_job_operation(self, operation_name):
    
    239
    +        """Removes a job.
    
    117 240
     
    
    118
    -    def list_jobs(self):
    
    119
    -        return self.jobs.values()
    
    241
    +        Args:
    
    242
    +            operation_name (str): name of the operation to delete.
    
    243
    +
    
    244
    +        Raises:
    
    245
    +            NotFoundError: If no operation with `operation_name` exists.
    
    246
    +        """
    
    247
    +        try:
    
    248
    +            job = self.__jobs_by_operation[operation_name]
    
    249
    +
    
    250
    +        except KeyError:
    
    251
    +            raise NotFoundError("Operation name does not exist: [{}]"
    
    252
    +                                .format(operation_name))
    
    253
    +
    
    254
    +        if not job.n_clients and job.done and not job.lease:
    
    255
    +            self._delete_job(job.name)
    
    256
    +
    
    257
    +    # --- Public API: RWAPI ---
    
    120 258
     
    
    121 259
         def request_job_leases(self, worker_capabilities):
    
    122 260
             """Generates a list of the highest priority leases to be run.
    
    ... ... @@ -126,10 +264,10 @@ class Scheduler:
    126 264
                     worker properties, configuration and state at the time of the
    
    127 265
                     request.
    
    128 266
             """
    
    129
    -        if not self.queue:
    
    267
    +        if not self.__queue:
    
    130 268
                 return []
    
    131 269
     
    
    132
    -        job = self.queue.popleft()
    
    270
    +        job = self.__queue.popleft()
    
    133 271
     
    
    134 272
             lease = job.lease
    
    135 273
     
    
    ... ... @@ -142,18 +280,25 @@ class Scheduler:
    142 280
     
    
    143 281
             return None
    
    144 282
     
    
    145
    -    def update_job_lease(self, lease):
    
    283
    +    def update_job_lease_state(self, job_name, lease):
    
    146 284
             """Requests a state transition for a job's current :class:Lease.
    
    147 285
     
    
    286
    +        Note:
    
    287
    +            This may trigger a job's :class:`Operation` stage transition.
    
    288
    +
    
    148 289
             Args:
    
    149 290
                 job_name (str): name of the job to query.
    
    150
    -            lease_state (LeaseState): the lease state to transition to.
    
    151
    -            lease_status (google.rpc.Status): the lease execution status, only
    
    152
    -                required if `lease_state` is `COMPLETED`.
    
    153
    -            lease_result (google.protobuf.Any): the lease execution result, only
    
    154
    -                required if `lease_state` is `COMPLETED`.
    
    291
    +            lease (Lease): the lease holding the new state.
    
    292
    +
    
    293
    +        Raises:
    
    294
    +            NotFoundError: If no job with `job_name` exists.
    
    155 295
             """
    
    156
    -        job = self.jobs[lease.id]
    
    296
    +        try:
    
    297
    +            job = self.__jobs_by_name[job_name]
    
    298
    +
    
    299
    +        except KeyError:
    
    300
    +            raise NotFoundError("Job name does not exist: [{}]".format(job_name))
    
    301
    +
    
    157 302
             lease_state = LeaseState(lease.state)
    
    158 303
     
    
    159 304
             operation_stage = None
    
    ... ... @@ -189,29 +334,92 @@ class Scheduler:
    189 334
                     self.__leases_by_state[LeaseState.ACTIVE].discard(lease.id)
    
    190 335
                     self.__leases_by_state[LeaseState.COMPLETED].add(lease.id)
    
    191 336
     
    
    192
    -        self._update_job_operation_stage(lease.id, operation_stage)
    
    337
    +        self._update_job_operation_stage(job_name, operation_stage)
    
    338
    +
    
    339
    +    def retry_job_lease(self, job_name):
    
    340
    +        """Re-queues a job on lease execution failure.
    
    341
    +
    
    342
    +        Note:
    
    343
    +            This may trigger a job's :class:`Operation` stage transition.
    
    344
    +
    
    345
    +        Args:
    
    346
    +            job_name (str): name of the job to query.
    
    347
    +
    
    348
    +        Raises:
    
    349
    +            NotFoundError: If no job with `job_name` exists.
    
    350
    +        """
    
    351
    +        try:
    
    352
    +            job = self.__jobs_by_name[job_name]
    
    353
    +
    
    354
    +        except KeyError:
    
    355
    +            raise NotFoundError("Job name does not exist: [{}]".format(job_name))
    
    356
    +
    
    357
    +        operation_stage = None
    
    358
    +        if job.n_tries >= self.MAX_N_TRIES:
    
    359
    +            # TODO: Decide what to do with these jobs
    
    360
    +            operation_stage = OperationStage.COMPLETED
    
    361
    +            # TODO: Mark these jobs as done
    
    362
    +
    
    363
    +        else:
    
    364
    +            operation_stage = OperationStage.QUEUED
    
    365
    +            job.update_lease_state(LeaseState.PENDING)
    
    366
    +            self.__queue.append(job)
    
    367
    +
    
    368
    +        self._update_job_operation_stage(job_name, operation_stage)
    
    193 369
     
    
    194 370
         def get_job_lease(self, job_name):
    
    195
    -        """Returns the lease associated to job, if any have been emitted yet."""
    
    196
    -        return self.jobs[job_name].lease
    
    371
    +        """Returns the lease associated with the job, if any has been emitted yet.
    
    197 372
     
    
    198
    -    def get_job_lease_cancelled(self, job_name):
    
    199
    -        """Returns true if the lease is cancelled"""
    
    200
    -        return self.jobs[job_name].lease_cancelled
    
    373
    +        Args:
    
    374
    +            job_name (str): name of the job to query.
    
    375
    +
    
    376
    +        Raises:
    
    377
    +            NotFoundError: If no job with `job_name` exists.
    
    378
    +        """
    
    379
    +        try:
    
    380
    +            job = self.__jobs_by_name[job_name]
    
    381
    +
    
    382
    +        except KeyError:
    
    383
    +            raise NotFoundError("Job name does not exist: [{}]".format(job_name))
    
    384
    +
    
    385
    +        return job.lease
    
    386
    +
    
    387
    +    def delete_job_lease(self, job_name):
    
    388
    +        """Discards the lease associated with a job.
    
    389
    +
    
    390
    +        Args:
    
    391
    +            job_name (str): name of the job to query.
    
    392
    +
    
    393
    +        Raises:
    
    394
    +            NotFoundError: If no job with `job_name` exists.
    
    395
    +        """
    
    396
    +        try:
    
    397
    +            job = self.__jobs_by_name[job_name]
    
    201 398
     
    
    202
    -    def get_job_operation(self, job_name):
    
    203
    -        """Returns the operation associated to job."""
    
    204
    -        return self.jobs[job_name].operation
    
    399
    +        except KeyError:
    
    400
    +            raise NotFoundError("Job name does not exist: [{}]".format(job_name))
    
    205 401
     
    
    206
    -    def cancel_job_operation(self, job_name):
    
    207
    -        """"Cancels the underlying operation of a given job.
    
    402
    +        job.delete_lease()
    
    208 403
     
    
    209
    -        This will also cancel any job's lease that may have been issued.
    
    404
    +        if not job.n_clients and job.operation.done:
    
    405
    +            self._delete_job(job.name)
    
    406
    +
    
    407
    +    def get_job_lease_cancelled(self, job_name):
    
    408
    +        """Returns True if the job's lease is cancelled.
    
    210 409
     
    
    211 410
             Args:
    
    212
    -            job_name (str): name of the job holding the operation to cancel.
    
    411
    +            job_name (str): name of the job to query.
    
    412
    +
    
    413
    +        Raises:
    
    414
    +            NotFoundError: If no job with `job_name` exists.
    
    213 415
             """
    
    214
    -        self.jobs[job_name].cancel_operation()
    
    416
    +        try:
    
    417
    +            job = self.__jobs_by_name[job_name]
    
    418
    +
    
    419
    +        except KeyError:
    
    420
    +            raise NotFoundError("Job name does not exist: [{}]".format(job_name))
    
    421
    +
    
    422
    +        return job.lease_cancelled
    
    215 423
     
    
    216 424
         # --- Public API: Monitoring ---
    
    217 425
     
    
    ... ... @@ -261,11 +469,11 @@ class Scheduler:
    261 469
                 self.__build_metadata_queues.append(message_queue)
    
    262 470
     
    
    263 471
         def query_n_jobs(self):
    
    264
    -        return len(self.jobs)
    
    472
    +        return len(self.__jobs_by_name)
    
    265 473
     
    
    266 474
         def query_n_operations(self):
    
    267 475
             # For now n_operations == n_jobs:
    
    268
    -        return len(self.jobs)
    
    476
    +        return len(self.__jobs_by_operation)
    
    269 477
     
    
    270 478
         def query_n_operations_by_stage(self, operation_stage):
    
    271 479
             try:
    
    ... ... @@ -276,7 +484,7 @@ class Scheduler:
    276 484
             return 0
    
    277 485
     
    
    278 486
         def query_n_leases(self):
    
    279
    -        return len(self.jobs)
    
    487
    +        return len(self.__jobs_by_name)
    
    280 488
     
    
    281 489
         def query_n_leases_by_state(self, lease_state):
    
    282 490
             try:
    
    ... ... @@ -296,6 +504,23 @@ class Scheduler:
    296 504
     
    
    297 505
         # --- Private API ---
    
    298 506
     
    
    507
    +    def _delete_job(self, job_name):
    
    508
    +        """Drops an entry from the internal list of jobs."""
    
    509
    +        job = self.__jobs_by_name[job_name]
    
    510
    +
    
    511
    +        del self.__jobs_by_action[job.action_digest.hash]
    
    512
    +        del self.__jobs_by_name[job.name]
    
    513
    +
    
    514
    +        if self._is_instrumented:
    
    515
    +            self.__operations_by_stage[OperationStage.CACHE_CHECK].discard(job.name)
    
    516
    +            self.__operations_by_stage[OperationStage.QUEUED].discard(job.name)
    
    517
    +            self.__operations_by_stage[OperationStage.EXECUTING].discard(job.name)
    
    518
    +            self.__operations_by_stage[OperationStage.COMPLETED].discard(job.name)
    
    519
    +
    
    520
    +            self.__leases_by_state[LeaseState.PENDING].discard(job.name)
    
    521
    +            self.__leases_by_state[LeaseState.ACTIVE].discard(job.name)
    
    522
    +            self.__leases_by_state[LeaseState.COMPLETED].discard(job.name)
    
    523
    +
    
    299 524
         def _update_job_operation_stage(self, job_name, operation_stage):
    
    300 525
             """Requests a stage transition for the job's :class:Operations.
    
    301 526
     
    
    ... ... @@ -303,7 +528,7 @@ class Scheduler:
    303 528
                 job_name (str): name of the job to query.
    
    304 529
                 operation_stage (OperationStage): the stage to transition to.
    
    305 530
             """
    
    306
    -        job = self.jobs[job_name]
    
    531
    +        job = self.__jobs_by_name[job_name]
    
    307 532
     
    
    308 533
             if operation_stage == OperationStage.CACHE_CHECK:
    
    309 534
                 job.update_operation_stage(OperationStage.CACHE_CHECK)
    
    ... ... @@ -352,7 +577,7 @@ class Scheduler:
    352 577
     
    
    353 578
                     self.__queue_time_average = average_order, average_time
    
    354 579
     
    
    355
    -                if not job.holds_cached_action_result:
    
    580
    +                if not job.holds_cached_result:
    
    356 581
                         execution_metadata = job.action_result.execution_metadata
    
    357 582
                         context_metadata = {'job-is': job.name}
    
    358 583
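
    Because job priorities now behave like POSIX nice values (a lower value is scheduled first, equal values keep submission order), here is a short worked example of the resulting queue order. It assumes the Scheduler can be constructed without an action cache and uses made-up digests:

        from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
        from buildgrid.server.scheduler import Scheduler

        scheduler = Scheduler(action_cache=None)

        def queue(hash_value, priority):
            # Helper for the example: queues a dummy action with the given priority.
            action = remote_execution_pb2.Action()
            digest = remote_execution_pb2.Digest(hash=hash_value, size_bytes=0)
            return scheduler.queue_job_operation(action, digest, priority=priority,
                                                 skip_cache_lookup=True)

        low = queue('aa' * 32, priority=10)      # nice-like: large value, runs last
        default = queue('bb' * 32, priority=0)   # default priority
        urgent = queue('cc' * 32, priority=-5)   # negative value, runs first

        # Workers calling request_job_leases() will now be served in the order:
        # urgent, default, low.
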
     
    

  • docs/source/conf.py
    ... ... @@ -182,3 +182,11 @@ texinfo_documents = [
    182 182
          author, 'BuildGrid', 'One line description of project.',
    
    183 183
          'Miscellaneous'),
    
    184 184
     ]
    
    185
    +
    
    186
    +# -- Options for the autodoc extension ----------------------------------------
    
    187
    +
    
    188
    +# This value selects if automatically documented members are sorted
    
    189
    +# alphabetically (value 'alphabetical'), by member type (value 'groupwise') or
    
    190
    +# by source order (value 'bysource'). The default is alphabetical.
    
    191
    +autodoc_member_order = 'bysource'
    
    192
    +

  • docs/source/using_buildstream.rst
    ... ... @@ -4,9 +4,11 @@
    4 4
     BuildStream client
    
    5 5
     ==================
    
    6 6
     
    
    7
    -`BuildStream`_  is a free software tool for building and integrating software
    
    7
    +`BuildStream`_ is a free software tool for building and integrating software
    
    8 8
     stacks. It supports remote build execution using the remote execution API
    
    9
    -(REAPI) v2.
    
    9
    +(REAPI) v2. The project's documentation has a detailed section about its
    
    10
    +`remote execution subsystem architecture`_ that you are strongly recommended to
    
    11
    +read first.
    
    10 12
     
    
    11 13
     .. note::
    
    12 14
     
    
    ... ... @@ -15,6 +17,7 @@ stacks. It supports remote build execution using the remote execution API
    15 17
        remote execution.
    
    16 18
     
    
    17 19
     .. _BuildStream: https://buildstream.build
    
    20
    +.. _remote execution subsystem architecture: https://buildstream.gitlab.io/buildstream/arch_remote_execution.html
    
    18 21
     .. _install it from sources: https://buildstream.build/source_install.html
    
    19 22
     
    
    20 23
     
    
    ... ... @@ -43,23 +46,23 @@ Project configuration
    43 46
     In order to activate remote build execution at project-level, the project's
    
    44 47
     ``project.conf`` file must declare two specific configuration nodes:
    
    45 48
     
    
    46
    -- ``artifacts`` for `remote CAS endpoint details`_.
    
    49
    +- ``artifacts`` for `remote cache endpoint details`_.
    
    47 50
     - ``remote-execution`` for `remote execution endpoint details`_.
    
    48 51
     
    
    49 52
     .. important::
    
    50 53
     
    
    51 54
        BuildStream does not support multi-instance remote execution servers and will
    
    52 55
        always submit remote execution request omitting the instance name parameter.
    
    53
    -   Thus, you must declare an unnamed `""` instance  in your server configuration
    
    56
    +   Thus, you must declare an unnamed `''` instance in your server configuration
    
    54 57
        to workaround this.
    
    55 58
     
    
    56 59
     .. important::
    
    57 60
     
    
    58
    -   If you are using BuildGrid's artifact server, the server instance **must**
    
    59
    -   accept pushes from your client for remote execution to be possible.
    
    61
    +   If you are using BuildStream's artifact server, the server instance pointed to
    
    62
    +   by the ``storage-service`` key **must** accept pushes from your client for
    
    63
    +   remote execution to be possible.
    
    60 64
     
    
    61
    -
    
    62
    -.. _remote CAS endpoint details: https://buildstream.gitlab.io/buildstream/install_artifacts.html#user-configuration
    
    65
    +.. _remote cache endpoint details: https://buildstream.gitlab.io/buildstream/format_project.html#artifact-server
    
    63 66
     .. _remote execution endpoint details: https://buildstream.gitlab.io/buildstream/format_project.html#remote-execution
    
    64 67
     
    
    65 68
     
    
    ... ... @@ -167,7 +170,15 @@ append at the end of the ``project.conf`` file from the root directory:
    167 170
          push: true
    
    168 171
     
    
    169 172
        remote-execution:
    
    170
    -     url: http://localhost:50051
    
    173
    +     execution-service:
    
    174
    +       url: http://localhost:50051
    
    175
    +     storage-service:
    
    176
    +       url: http://localhost:50051
    
    177
    +       client-key: ''
    
    178
    +       client-cert: ''
    
    179
    +       server-cert: ''
    
    180
    +     action-cache-service:
    
    181
    +       url: http://localhost:50051
    
    171 182
     
    
    172 183
     This activates BuildGrid's remote execution mode and points to the unnamed
    
    173 184
     remote execution server instance at ``localhost:50051``.
    

  • tests/integration/bots_service.py
    ... ... @@ -25,7 +25,6 @@ import pytest
    25 25
     
    
    26 26
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
    
    27 27
     from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2
    
    28
    -from buildgrid.server import job
    
    29 28
     from buildgrid.server.controller import ExecutionController
    
    30 29
     from buildgrid.server.job import LeaseState
    
    31 30
     from buildgrid.server.bots import service
    
    ... ... @@ -159,7 +158,8 @@ def test_post_bot_event_temp(context, instance):
    159 158
     def _inject_work(scheduler, action=None, action_digest=None):
    
    160 159
         if not action:
    
    161 160
             action = remote_execution_pb2.Action()
    
    161
    +
    
    162 162
         if not action_digest:
    
    163 163
             action_digest = remote_execution_pb2.Digest()
    
    164
    -    j = job.Job(action, action_digest)
    
    165
    -    scheduler.queue_job(j, True)
    164
    +
    
    165
    +    scheduler.queue_job_operation(action, action_digest, skip_cache_lookup=True)

  • tests/integration/execution_service.py
    ... ... @@ -20,11 +20,11 @@
    20 20
     import uuid
    
    21 21
     from unittest import mock
    
    22 22
     
    
    23
    -from google.protobuf import any_pb2
    
    24 23
     import grpc
    
    25 24
     from grpc._server import _Context
    
    26 25
     import pytest
    
    27 26
     
    
    27
    +from buildgrid._enums import OperationStage
    
    28 28
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
    
    29 29
     from buildgrid._protos.google.longrunning import operations_pb2
    
    30 30
     
    
    ... ... @@ -82,7 +82,7 @@ def test_execute(skip_cache_lookup, instance, context):
    82 82
         assert isinstance(result, operations_pb2.Operation)
    
    83 83
         metadata = remote_execution_pb2.ExecuteOperationMetadata()
    
    84 84
         result.metadata.Unpack(metadata)
    
    85
    -    assert metadata.stage == job.OperationStage.QUEUED.value
    
    85
    +    assert metadata.stage == OperationStage.QUEUED.value
    
    86 86
         operation_uuid = result.name.split('/')[-1]
    
    87 87
         assert uuid.UUID(operation_uuid, version=4)
    
    88 88
         assert result.done is False
    
    ... ... @@ -106,18 +106,14 @@ def test_no_action_digest_in_storage(instance, context):
    106 106
     
    
    107 107
     
    
    108 108
     def test_wait_execution(instance, controller, context):
    
    109
    -    j = job.Job(action, action_digest)
    
    110
    -    j._operation.done = True
    
    109
    +    job_name = controller.execution_instance._scheduler.queue_job_operation(action,
    
    110
    +                                                                            action_digest,
    
    111
    +                                                                            skip_cache_lookup=True)
    
    111 112
     
    
    112
    -    request = remote_execution_pb2.WaitExecutionRequest(name=j.name)
    
    113
    +    controller.execution_instance._scheduler._update_job_operation_stage(job_name,
    
    114
    +                                                                         OperationStage.COMPLETED)
    
    113 115
     
    
    114
    -    controller.execution_instance._scheduler.jobs[j.name] = j
    
    115
    -
    
    116
    -    action_result_any = any_pb2.Any()
    
    117
    -    action_result = remote_execution_pb2.ActionResult()
    
    118
    -    action_result_any.Pack(action_result)
    
    119
    -
    
    120
    -    j.update_operation_stage(job.OperationStage.COMPLETED)
    
    116
    +    request = remote_execution_pb2.WaitExecutionRequest(name=job_name)
    
    121 117
     
    
    122 118
         response = instance.WaitExecution(request, context)
    
    123 119
     
    
    ... ... @@ -127,7 +123,6 @@ def test_wait_execution(instance, controller, context):
    127 123
         metadata = remote_execution_pb2.ExecuteOperationMetadata()
    
    128 124
         result.metadata.Unpack(metadata)
    
    129 125
         assert metadata.stage == job.OperationStage.COMPLETED.value
    
    130
    -    assert uuid.UUID(result.name, version=4)
    
    131 126
         assert result.done is True
    
    132 127
     
    
    133 128
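
    Taken together with the ``bots_service.py`` hunk above, these changes show the
    scheduler absorbing job construction: tests no longer instantiate ``job.Job``
    or flip ``_operation.done`` by hand. A condensed sketch of the new flow,
    assuming the ``controller``, ``instance`` and ``context`` fixtures and the
    module-level ``action``/``action_digest`` values defined elsewhere in the
    test file:

        # Condensed from the diff; OperationStage now comes from buildgrid._enums.
        from buildgrid._enums import OperationStage
        from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2

        scheduler = controller.execution_instance._scheduler

        # One call now builds the job, queues it and returns its name:
        job_name = scheduler.queue_job_operation(action, action_digest,
                                                 skip_cache_lookup=True)

        # State transitions go through the scheduler instead of the Job object:
        scheduler._update_job_operation_stage(job_name, OperationStage.COMPLETED)

        # WaitExecution is addressed by the returned job name:
        request = remote_execution_pb2.WaitExecutionRequest(name=job_name)
        response = instance.WaitExecution(request, context)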
     
    

  • tests/integration/operations_service.py
    ... ... @@ -17,6 +17,7 @@
    17 17
     
    
    18 18
     # pylint: disable=redefined-outer-name
    
    19 19
     
    
    20
    +import queue
    
    20 21
     from unittest import mock
    
    21 22
     
    
    22 23
     from google.protobuf import any_pb2
    
    ... ... @@ -86,8 +87,13 @@ def blank_instance(controller):
    86 87
     
    
    87 88
     # Queue an execution, get operation corresponding to that request
    
    88 89
     def test_get_operation(instance, controller, execute_request, context):
    
    89
    -    response_execute = controller.execution_instance.execute(execute_request.action_digest,
    
    90
    -                                                             execute_request.skip_cache_lookup)
    
    90
    +    job_name = controller.execution_instance.execute(execute_request.action_digest,
    
    91
    +                                                     execute_request.skip_cache_lookup)
    
    92
    +
    
    93
    +    message_queue = queue.Queue()
    
    94
    +    operation_name = controller.execution_instance.register_operation_client(job_name,
    
    95
    +                                                                             context.peer(),
    
    96
    +                                                                             message_queue)
    
    91 97
     
    
    92 98
         request = operations_pb2.GetOperationRequest()
    
    93 99
     
    
    ... ... @@ -95,25 +101,28 @@ def test_get_operation(instance, controller, execute_request, context):
    95 101
         # we're manually creating the instance here, it doesn't get a name.
    
    96 102
         # Therefore we need to manually add the instance name to the operation
    
    97 103
         # name in the GetOperation request.
    
    98
    -    request.name = "{}/{}".format(instance_name, response_execute.name)
    
    104
    +    request.name = "{}/{}".format(instance_name, operation_name)
    
    99 105
     
    
    100 106
         response = instance.GetOperation(request, context)
    
    101
    -    assert response.name == "{}/{}".format(instance_name, response_execute.name)
    
    102
    -    assert response.done == response_execute.done
    
    107
    +    assert response.name == "{}/{}".format(instance_name, operation_name)
    
    103 108
     
    
    104 109
     
    
    105 110
     # Queue an execution, get operation corresponding to that request
    
    106 111
     def test_get_operation_blank(blank_instance, controller, execute_request, context):
    
    107
    -    response_execute = controller.execution_instance.execute(execute_request.action_digest,
    
    108
    -                                                             execute_request.skip_cache_lookup)
    
    112
    +    job_name = controller.execution_instance.execute(execute_request.action_digest,
    
    113
    +                                                     execute_request.skip_cache_lookup)
    
    114
    +
    
    115
    +    message_queue = queue.Queue()
    
    116
    +    operation_name = controller.execution_instance.register_operation_client(job_name,
    
    117
    +                                                                             context.peer(),
    
    118
    +                                                                             message_queue)
    
    109 119
     
    
    110 120
         request = operations_pb2.GetOperationRequest()
    
    111 121
     
    
    112
    -    request.name = response_execute.name
    
    122
    +    request.name = operation_name
    
    113 123
     
    
    114 124
         response = blank_instance.GetOperation(request, context)
    
    115
    -    assert response.name == response_execute.name
    
    116
    -    assert response.done == response_execute.done
    
    125
    +    assert response.name == operation_name
    
    117 126
     
    
    118 127
     
    
    119 128
     def test_get_operation_fail(instance, context):
    
    ... ... @@ -133,25 +142,35 @@ def test_get_operation_instance_fail(instance, context):
    133 142
     
    
    134 143
     
    
    135 144
     def test_list_operations(instance, controller, execute_request, context):
    
    136
    -    response_execute = controller.execution_instance.execute(execute_request.action_digest,
    
    137
    -                                                             execute_request.skip_cache_lookup)
    
    145
    +    job_name = controller.execution_instance.execute(execute_request.action_digest,
    
    146
    +                                                     execute_request.skip_cache_lookup)
    
    147
    +
    
    148
    +    message_queue = queue.Queue()
    
    149
    +    operation_name = controller.execution_instance.register_operation_client(job_name,
    
    150
    +                                                                             context.peer(),
    
    151
    +                                                                             message_queue)
    
    138 152
     
    
    139 153
         request = operations_pb2.ListOperationsRequest(name=instance_name)
    
    140 154
         response = instance.ListOperations(request, context)
    
    141 155
     
    
    142 156
         names = response.operations[0].name.split('/')
    
    143 157
         assert names[0] == instance_name
    
    144
    -    assert names[1] == response_execute.name
    
    158
    +    assert names[1] == operation_name
    
    145 159
     
    
    146 160
     
    
    147 161
     def test_list_operations_blank(blank_instance, controller, execute_request, context):
    
    148
    -    response_execute = controller.execution_instance.execute(execute_request.action_digest,
    
    149
    -                                                             execute_request.skip_cache_lookup)
    
    162
    +    job_name = controller.execution_instance.execute(execute_request.action_digest,
    
    163
    +                                                     execute_request.skip_cache_lookup)
    
    164
    +
    
    165
    +    message_queue = queue.Queue()
    
    166
    +    operation_name = controller.execution_instance.register_operation_client(job_name,
    
    167
    +                                                                             context.peer(),
    
    168
    +                                                                             message_queue)
    
    150 169
     
    
    151 170
         request = operations_pb2.ListOperationsRequest(name='')
    
    152 171
         response = blank_instance.ListOperations(request, context)
    
    153 172
     
    
    154
    -    assert response.operations[0].name.split('/')[-1] == response_execute.name
    
    173
    +    assert response.operations[0].name.split('/')[-1] == operation_name
    
    155 174
     
    
    156 175
     
    
    157 176
     def test_list_operations_instance_fail(instance, controller, execute_request, context):
    
    ... ... @@ -174,14 +193,19 @@ def test_list_operations_empty(instance, context):
    174 193
     
    
    175 194
     # Send execution off, delete, try to find operation should fail
    
    176 195
     def test_delete_operation(instance, controller, execute_request, context):
    
    177
    -    response_execute = controller.execution_instance.execute(execute_request.action_digest,
    
    178
    -                                                             execute_request.skip_cache_lookup)
    
    196
    +    job_name = controller.execution_instance.execute(execute_request.action_digest,
    
    197
    +                                                     execute_request.skip_cache_lookup)
    
    198
    +
    
    199
    +    message_queue = queue.Queue()
    
    200
    +    operation_name = controller.execution_instance.register_operation_client(job_name,
    
    201
    +                                                                             context.peer(),
    
    202
    +                                                                             message_queue)
    
    179 203
     
    
    180 204
         request = operations_pb2.DeleteOperationRequest()
    
    181
    -    request.name = response_execute.name
    
    205
    +    request.name = operation_name
    
    182 206
         instance.DeleteOperation(request, context)
    
    183 207
     
    
    184
    -    request_name = "{}/{}".format(instance_name, response_execute.name)
    
    208
    +    request_name = "{}/{}".format(instance_name, operation_name)
    
    185 209
     
    
    186 210
         with pytest.raises(InvalidArgumentError):
    
    187 211
             controller.operations_instance.get_operation(request_name)
    
    ... ... @@ -189,17 +213,11 @@ def test_delete_operation(instance, controller, execute_request, context):
    189 213
     
    
    190 214
     # Send execution off, delete, try to find operation should fail
    
    191 215
     def test_delete_operation_blank(blank_instance, controller, execute_request, context):
    
    192
    -    response_execute = controller.execution_instance.execute(execute_request.action_digest,
    
    193
    -                                                             execute_request.skip_cache_lookup)
    
    194
    -
    
    195 216
         request = operations_pb2.DeleteOperationRequest()
    
    196
    -    request.name = response_execute.name
    
    217
    +    request.name = "runner"
    
    197 218
         blank_instance.DeleteOperation(request, context)
    
    198 219
     
    
    199
    -    request_name = response_execute.name
    
    200
    -
    
    201
    -    with pytest.raises(InvalidArgumentError):
    
    202
    -        controller.operations_instance.get_operation(request_name)
    
    220
    +    context.set_code.assert_called_once_with(grpc.StatusCode.INVALID_ARGUMENT)
    
    203 221
     
    
    204 222
     
    
    205 223
     def test_delete_operation_fail(instance, context):
    
    ... ... @@ -211,11 +229,16 @@ def test_delete_operation_fail(instance, context):
    211 229
     
    
    212 230
     
    
    213 231
     def test_cancel_operation(instance, controller, execute_request, context):
    
    214
    -    response_execute = controller.execution_instance.execute(execute_request.action_digest,
    
    215
    -                                                             execute_request.skip_cache_lookup)
    
    232
    +    job_name = controller.execution_instance.execute(execute_request.action_digest,
    
    233
    +                                                     execute_request.skip_cache_lookup)
    
    234
    +
    
    235
    +    message_queue = queue.Queue()
    
    236
    +    operation_name = controller.execution_instance.register_operation_client(job_name,
    
    237
    +                                                                             context.peer(),
    
    238
    +                                                                             message_queue)
    
    216 239
     
    
    217 240
         request = operations_pb2.CancelOperationRequest()
    
    218
    -    request.name = "{}/{}".format(instance_name, response_execute.name)
    
    241
    +    request.name = "{}/{}".format(instance_name, operation_name)
    
    219 242
     
    
    220 243
         instance.CancelOperation(request, context)
    
    221 244
     
    
    ... ... @@ -238,7 +261,7 @@ def test_cancel_operation_blank(blank_instance, context):
    238 261
         context.set_code.assert_called_once_with(grpc.StatusCode.INVALID_ARGUMENT)
    
    239 262
     
    
    240 263
     
    
    241
    -def test_cancel_operation_instance_fail(instance, context):
    
    264
    +def test_cancel_operation_fail(instance, context):
    
    242 265
         request = operations_pb2.CancelOperationRequest()
    
    243 266
         instance.CancelOperation(request, context)
    
    244 267
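
    The recurring pattern in these tests is the core of the multiplexing change:
    ``execute()`` now returns a job name rather than an operation, and each client
    interested in that job registers itself to obtain its own operation name and a
    message queue for updates. A condensed sketch, reusing the ``controller``,
    ``instance``, ``context`` and ``execute_request`` fixtures and the
    module-level ``instance_name`` from the test file:

        import queue

        from buildgrid._protos.google.longrunning import operations_pb2

        execution = controller.execution_instance

        # execute() queues the work and returns the job's name:
        job_name = execution.execute(execute_request.action_digest,
                                     execute_request.skip_cache_lookup)

        # Each client registers against the job and receives a per-client
        # operation name plus a queue on which status messages are delivered:
        message_queue = queue.Queue()
        operation_name = execution.register_operation_client(job_name,
                                                             context.peer(),
                                                             message_queue)

        # The Operations service is then addressed as "<instance>/<operation name>":
        request = operations_pb2.GetOperationRequest(
            name="{}/{}".format(instance_name, operation_name))
        response = instance.GetOperation(request, context)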
     
    


