Santiago Gil pushed to branch santigl/106-request-metadata at BuildGrid / buildgrid
Commits:
- 
9fc7da02
by Santiago Gil at 2019-02-21T17:43:15Z
7 changed files:
- buildgrid/_app/commands/cmd_execute.py
- buildgrid/server/execution/instance.py
- buildgrid/server/execution/service.py
- buildgrid/server/job.py
- + buildgrid/server/peer.py
- buildgrid/server/scheduler.py
- buildgrid/settings.py
Changes:
| ... | ... | @@ -30,6 +30,8 @@ from buildgrid.client.authentication import setup_channel | 
| 30 | 30 |  from buildgrid.client.cas import download, upload
 | 
| 31 | 31 |  from buildgrid._exceptions import InvalidArgumentError
 | 
| 32 | 32 |  from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
 | 
| 33 | +from buildgrid.settings import REQUEST_METADATA_HEADER_NAME, \
 | |
| 34 | +    REQUEST_METADATA_TOOL_NAME, REQUEST_METADATA_TOOL_VERSION
 | |
| 33 | 35 |  from buildgrid.utils import create_digest
 | 
| 34 | 36 |  | 
| 35 | 37 |  from ..cli import pass_context
 | 
| ... | ... | @@ -66,10 +68,12 @@ def cli(context, remote, instance_name, auth_token, client_key, client_cert, ser | 
| 66 | 68 |  @cli.command('request-dummy', short_help="Send a dummy action.")
 | 
| 67 | 69 |  @click.option('--number', type=click.INT, default=1, show_default=True,
 | 
| 68 | 70 |                help="Number of request to send.")
 | 
| 71 | +@click.option('--request-metadata', is_flag=True,
 | |
| 72 | +              help="Attach RequestMetadata to the request header.")
 | |
| 69 | 73 |  @click.option('--wait-for-completion', is_flag=True,
 | 
| 70 | 74 |                help="Stream updates until jobs are completed.")
 | 
| 71 | 75 |  @pass_context
 | 
| 72 | -def request_dummy(context, number, wait_for_completion):
 | |
| 76 | +def request_dummy(context, number, request_metadata, wait_for_completion):
 | |
| 73 | 77 |  | 
| 74 | 78 |      click.echo("Sending execution request...")
 | 
| 75 | 79 |      command = remote_execution_pb2.Command()
 | 
| ... | ... | @@ -85,12 +89,21 @@ def request_dummy(context, number, wait_for_completion): | 
| 85 | 89 |                                                    action_digest=action_digest,
 | 
| 86 | 90 |                                                    skip_cache_lookup=True)
 | 
| 87 | 91 |  | 
| 92 | +    # If enabled, we attach some `RequestMetadata` information to the request:
 | |
| 93 | +    execute_arguments = {}
 | |
| 94 | +    if request_metadata:
 | |
| 95 | +        metadata = request_metadata_header_entry(tool_name=REQUEST_METADATA_TOOL_NAME,
 | |
| 96 | +                                                 tool_version=REQUEST_METADATA_TOOL_VERSION,
 | |
| 97 | +                                                 action_id='2',
 | |
| 98 | +                                                 tool_invocation_id='3',
 | |
| 99 | +                                                 correlated_invocations_id='4')
 | |
| 100 | +        execute_arguments['metadata'] = metadata
 | |
| 101 | + | |
| 88 | 102 |      responses = []
 | 
| 89 | 103 |      for _ in range(0, number):
 | 
| 90 | -        responses.append(stub.Execute(request))
 | |
| 104 | +        responses.append(stub.Execute(request, **execute_arguments))
 | |
| 91 | 105 |  | 
| 92 | 106 |      for response in responses:
 | 
| 93 | - | |
| 94 | 107 |          if wait_for_completion:
 | 
| 95 | 108 |              result = None
 | 
| 96 | 109 |              for stream in response:
 | 
| ... | ... | @@ -113,14 +126,22 @@ def request_dummy(context, number, wait_for_completion): | 
| 113 | 126 |                help="Output directory for the output files.")
 | 
| 114 | 127 |  @click.option('-p', '--platform-property', nargs=2, type=(click.STRING, click.STRING), multiple=True,
 | 
| 115 | 128 |                help="List of key-value pairs of required platform properties.")
 | 
| 129 | +@click.option('-t', '--tool-details', nargs=2, type=str,
 | |
| 130 | +              default=(REQUEST_METADATA_TOOL_NAME, REQUEST_METADATA_TOOL_VERSION),
 | |
| 131 | +              help="Tool name and version.")
 | |
| 132 | +@click.option('-a', '--action-id', type=str, help='Action ID.')
 | |
| 133 | +@click.option('-i', '--invocation-id', type=str, help='Tool invocation ID.')
 | |
| 134 | +@click.option('-c', '--correlation-id', type=str, help='Correlated invocation ID.')
 | |
| 116 | 135 |  @click.argument('input-root', nargs=1, type=click.Path(), required=True)
 | 
| 117 | 136 |  @click.argument('commands', nargs=-1, type=click.STRING, required=True)
 | 
| 118 | 137 |  @pass_context
 | 
| 119 | 138 |  def run_command(context, input_root, commands, output_file, output_directory,
 | 
| 120 | -                platform_property):
 | |
| 139 | +                platform_property, tool_details, action_id, invocation_id,
 | |
| 140 | +                correlation_id):
 | |
| 121 | 141 |      stub = remote_execution_pb2_grpc.ExecutionStub(context.channel)
 | 
| 122 | 142 |  | 
| 123 | 143 |      output_executables = []
 | 
| 144 | + | |
| 124 | 145 |      with upload(context.channel, instance=context.instance_name) as uploader:
 | 
| 125 | 146 |          command = remote_execution_pb2.Command()
 | 
| 126 | 147 |  | 
| ... | ... | @@ -157,7 +178,14 @@ def run_command(context, input_root, commands, output_file, output_directory, | 
| 157 | 178 |      request = remote_execution_pb2.ExecuteRequest(instance_name=context.instance_name,
 | 
| 158 | 179 |                                                    action_digest=action_digest,
 | 
| 159 | 180 |                                                    skip_cache_lookup=True)
 | 
| 160 | -    response = stub.Execute(request)
 | |
| 181 | + | |
| 182 | +    metadata = request_metadata_header_entry(tool_name=tool_details[0],
 | |
| 183 | +                                             tool_version=tool_details[1],
 | |
| 184 | +                                             action_id=action_id,
 | |
| 185 | +                                             tool_invocation_id=invocation_id,
 | |
| 186 | +                                             correlated_invocations_id=correlation_id)
 | |
| 187 | + | |
| 188 | +    response = stub.Execute(request, metadata=metadata)
 | |
| 161 | 189 |  | 
| 162 | 190 |      stream = None
 | 
| 163 | 191 |      for stream in response:
 | 
| ... | ... | @@ -180,3 +208,25 @@ def run_command(context, input_root, commands, output_file, output_directory, | 
| 180 | 208 |          if output_file_response.path in output_executables:
 | 
| 181 | 209 |              st = os.stat(path)
 | 
| 182 | 210 |              os.chmod(path, st.st_mode | stat.S_IXUSR)
 | 
| 211 | + | |
| 212 | + | |
def request_metadata_header_entry(tool_name=None, tool_version=None,
                                  action_id=None, tool_invocation_id=None,
                                  correlated_invocations_id=None):
    """Builds the gRPC metadata carrying a serialized `RequestMetadata`.

    Only arguments with a truthy value are copied into the message; the
    others are left unset.

    Args:
        tool_name (str): stored in `tool_details.tool_name`.
        tool_version (str): stored in `tool_details.tool_version`.
        action_id (str): stored in `action_id`.
        tool_invocation_id (str): stored in `tool_invocation_id`.
        correlated_invocations_id (str): stored in
            `correlated_invocations_id`.

    Returns:
        tuple: a one-element tuple holding the pair
        `(REQUEST_METADATA_HEADER_NAME, <serialized message>)`, in the
        shape passed as the `metadata` argument of the stub calls in
        this module.
    """
    message = remote_execution_pb2.RequestMetadata()

    # Fields that live directly on the `RequestMetadata` message:
    top_level_fields = (
        ('action_id', action_id),
        ('tool_invocation_id', tool_invocation_id),
        ('correlated_invocations_id', correlated_invocations_id),
    )
    for field_name, value in top_level_fields:
        if value:
            setattr(message, field_name, value)

    # Fields that live on the nested `tool_details` sub-message:
    tool_fields = (
        ('tool_name', tool_name),
        ('tool_version', tool_version),
    )
    for field_name, value in tool_fields:
        if value:
            setattr(message.tool_details, field_name, value)

    header_entry = (REQUEST_METADATA_HEADER_NAME, message.SerializeToString())
    return (header_entry,)
| ... | ... | @@ -62,7 +62,7 @@ class ExecutionInstance: | 
| 62 | 62 |          return get_hash_type()
 | 
| 63 | 63 |  | 
| 64 | 64 |      def execute(self, action_digest, skip_cache_lookup):
 | 
| 65 | -        """ Sends a job for execution.
 | |
| 65 | +        """Sends a job for execution.
 | |
| 66 | 66 |          Queues an action and creates an Operation instance to be associated with
 | 
| 67 | 67 |          this action.
 | 
| 68 | 68 |          """
 | 
| ... | ... | @@ -27,9 +27,11 @@ from functools import partial | 
| 27 | 27 |  import grpc
 | 
| 28 | 28 |  | 
| 29 | 29 |  from buildgrid._exceptions import FailedPreconditionError, InvalidArgumentError, CancelledError
 | 
| 30 | -from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2_grpc
 | |
| 30 | +from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
 | |
| 31 | 31 |  from buildgrid._protos.google.longrunning import operations_pb2
 | 
| 32 | +from buildgrid.server.peer import Peer
 | |
| 32 | 33 |  from buildgrid.server._authentication import AuthContext, authorize
 | 
| 34 | +from buildgrid.settings import REQUEST_METADATA_HEADER_NAME
 | |
| 33 | 35 |  | 
| 34 | 36 |  | 
| 35 | 37 |  class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
 | 
| ... | ... | @@ -94,7 +96,10 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer): | 
| 94 | 96 |  | 
| 95 | 97 |          instance_name = request.instance_name
 | 
| 96 | 98 |          message_queue = queue.Queue()
 | 
| 97 | -        peer = context.peer()
 | |
| 99 | +        peer_id = context.peer()
 | |
| 100 | + | |
| 101 | +        request_metadata = self._context_extract_request_metadata(context)
 | |
| 102 | +        peer = Peer(peer_id=peer_id, request_metadata=request_metadata)
 | |
| 98 | 103 |  | 
| 99 | 104 |          try:
 | 
| 100 | 105 |              instance = self._get_instance(instance_name)
 | 
| ... | ... | @@ -102,18 +107,18 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer): | 
| 102 | 107 |              job_name = instance.execute(request.action_digest,
 | 
| 103 | 108 |                                          request.skip_cache_lookup)
 | 
| 104 | 109 |  | 
| 105 | -            operation_name = instance.register_job_peer(job_name,
 | |
| 106 | -                                                        peer, message_queue)
 | |
| 110 | +            operation_name = instance.register_job_peer(job_name, peer,
 | |
| 111 | +                                                        message_queue)
 | |
| 107 | 112 |  | 
| 108 | 113 |              context.add_callback(partial(self._rpc_termination_callback,
 | 
| 109 | 114 |                                           peer, instance_name, operation_name))
 | 
| 110 | 115 |  | 
| 111 | 116 |              if self._is_instrumented:
 | 
| 112 | -                if peer not in self.__peers:
 | |
| 113 | -                    self.__peers_by_instance[instance_name].add(peer)
 | |
| 114 | -                    self.__peers[peer] = 1
 | |
| 117 | +                if peer_id not in self.__peers:
 | |
| 118 | +                    self.__peers_by_instance[instance_name].add(peer_id)
 | |
| 119 | +                    self.__peers[peer_id] = 1
 | |
| 115 | 120 |                  else:
 | 
| 116 | -                    self.__peers[peer] += 1
 | |
| 121 | +                    self.__peers[peer_id] += 1
 | |
| 117 | 122 |  | 
| 118 | 123 |              operation_full_name = "{}/{}".format(instance_name, operation_name)
 | 
| 119 | 124 |  | 
| ... | ... | @@ -161,8 +166,7 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer): | 
| 161 | 166 |          try:
 | 
| 162 | 167 |              instance = self._get_instance(instance_name)
 | 
| 163 | 168 |  | 
| 164 | -            instance.register_operation_peer(operation_name,
 | |
| 165 | -                                             peer, message_queue)
 | |
| 169 | +            instance.register_operation_peer(operation_name, peer, message_queue)
 | |
| 166 | 170 |  | 
| 167 | 171 |              context.add_callback(partial(self._rpc_termination_callback,
 | 
| 168 | 172 |                                           peer, instance_name, operation_name))
 | 
| ... | ... | @@ -219,11 +223,11 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer): | 
| 219 | 223 |          instance.unregister_operation_peer(operation_name, peer)
 | 
| 220 | 224 |  | 
| 221 | 225 |          if self._is_instrumented:
 | 
| 222 | -            if self.__peers[peer] > 1:
 | |
| 223 | -                self.__peers[peer] -= 1
 | |
| 226 | +            if self.__peers[peer.peer_id] > 1:
 | |
| 227 | +                self.__peers[peer.peer_id] -= 1
 | |
| 224 | 228 |              else:
 | 
| 225 | -                self.__peers_by_instance[instance_name].remove(peer)
 | |
| 226 | -                del self.__peers[peer]
 | |
| 229 | +                self.__peers_by_instance[instance_name].remove(peer.peer_id)
 | |
| 230 | +                del self.__peers[peer.peer_id]
 | |
| 227 | 231 |  | 
| 228 | 232 |      def _get_instance(self, name):
 | 
| 229 | 233 |          try:
 | 
| ... | ... | @@ -231,3 +235,21 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer): | 
| 231 | 235 |  | 
| 232 | 236 |          except KeyError:
 | 
| 233 | 237 |              raise InvalidArgumentError("Instance doesn't exist on server: [{}]".format(name))
 | 
| 238 | + | |
| 239 | +    @classmethod
 | |
| 240 | +    def _context_extract_request_metadata(cls, context):
 | |
| 241 | +        """Given a context object, extract the RequestMetadata header
 | |
| 242 | +        values if they are present. If they were not provided,
 | |
| 243 | +        returns None.
 | |
| 244 | +        """
 | |
| 245 | +        invocation_metadata = context.invocation_metadata()
 | |
| 246 | +        request_metadata_entry = next((entry for entry in invocation_metadata
 | |
| 247 | +                                       if entry.key == REQUEST_METADATA_HEADER_NAME),
 | |
| 248 | +                                      None)
 | |
| 249 | +        if not request_metadata_entry:
 | |
| 250 | +            return None
 | |
| 251 | + | |
| 252 | +        request_metadata = remote_execution_pb2.RequestMetadata()
 | |
| 253 | +        request_metadata.ParseFromString(request_metadata_entry.value)
 | |
| 254 | + | |
| 255 | +        return request_metadata | 
| ... | ... | @@ -12,9 +12,9 @@ | 
| 12 | 12 |  # See the License for the specific language governing permissions and
 | 
| 13 | 13 |  # limitations under the License.
 | 
| 14 | 14 |  | 
| 15 | - | |
| 16 | 15 |  from datetime import datetime
 | 
| 17 | 16 |  import logging
 | 
| 17 | +from threading import Lock
 | |
| 18 | 18 |  import uuid
 | 
| 19 | 19 |  | 
| 20 | 20 |  from google.protobuf import duration_pb2, timestamp_pb2
 | 
| ... | ... | @@ -62,6 +62,9 @@ class Job: | 
| 62 | 62 |          self._platform_requirements = platform_requirements \
 | 
| 63 | 63 |              if platform_requirements else dict()
 | 
| 64 | 64 |  | 
| 65 | +        self.__peers_lock = Lock()
 | |
| 66 | +        self.__peers = set()
 | |
| 67 | + | |
| 65 | 68 |          self._done = False
 | 
| 66 | 69 |  | 
| 67 | 70 |      def __lt__(self, other):
 | 
| ... | ... | @@ -175,7 +178,7 @@ class Job: | 
| 175 | 178 |          """Subscribes to a new job's :class:`Operation` stage changes.
 | 
| 176 | 179 |  | 
| 177 | 180 |          Args:
 | 
| 178 | -            peer (str): a unique string identifying the client.
 | |
| 181 | +            peer (Peer): an object that represents the client.
 | |
| 179 | 182 |              message_queue (queue.Queue): the event queue to register.
 | 
| 180 | 183 |  | 
| 181 | 184 |          Returns:
 | 
| ... | ... | @@ -194,10 +197,13 @@ class Job: | 
| 194 | 197 |                              self._name, new_operation.name)
 | 
| 195 | 198 |  | 
| 196 | 199 |          self.__operations_by_name[new_operation.name] = new_operation
 | 
| 197 | -        self.__operations_by_peer[peer] = new_operation
 | |
| 198 | -        self.__operations_message_queues[peer] = message_queue
 | |
| 200 | +        self.__operations_by_peer[peer.peer_id] = new_operation
 | |
| 201 | +        self.__operations_message_queues[peer.peer_id] = message_queue
 | |
| 202 | + | |
| 203 | +        self._send_operations_updates(peers=[peer.peer_id])
 | |
| 199 | 204 |  | 
| 200 | -        self._send_operations_updates(peers=[peer])
 | |
| 205 | +        with self.__peers_lock:
 | |
| 206 | +            self.__peers.add(peer)
 | |
| 201 | 207 |  | 
| 202 | 208 |          return new_operation.name
 | 
| 203 | 209 |  | 
| ... | ... | @@ -206,7 +212,7 @@ class Job: | 
| 206 | 212 |  | 
| 207 | 213 |          Args:
 | 
| 208 | 214 |              operation_name (str): an existing operation's name to subscribe to.
 | 
| 209 | -            peer (str): a unique string identifying the client.
 | |
| 215 | +            peer (Peer): an object that represents the client.
 | |
| 210 | 216 |              message_queue (queue.Queue): the event queue to register.
 | 
| 211 | 217 |  | 
| 212 | 218 |          Returns:
 | 
| ... | ... | @@ -222,18 +228,20 @@ class Job: | 
| 222 | 228 |              raise NotFoundError("Operation name does not exist: [{}]"
 | 
| 223 | 229 |                                  .format(operation_name))
 | 
| 224 | 230 |  | 
| 225 | -        self.__operations_by_peer[peer] = operation
 | |
| 226 | -        self.__operations_message_queues[peer] = message_queue
 | |
| 231 | +        self.__operations_by_peer[peer.peer_id] = operation
 | |
| 232 | +        self.__operations_message_queues[peer.peer_id] = message_queue
 | |
| 227 | 233 |  | 
| 228 | -        self._send_operations_updates(peers=[peer])
 | |
| 234 | +        self._send_operations_updates(peers=[peer.peer_id])
 | |
| 235 | + | |
| 236 | +        with self.__peers_lock:
 | |
| 237 | +            self.__peers.add(peer)
 | |
| 229 | 238 |  | 
| 230 | 239 |      def unregister_operation_peer(self, operation_name, peer):
 | 
| 231 | 240 |          """Unsubscribes to the job's :class:`Operation` stage change.
 | 
| 232 | 241 |  | 
| 233 | 242 |          Args:
 | 
| 234 | 243 |              operation_name (str): an existing operation's name to unsubscribe from.
 | 
| 235 | -            peer (str): a unique string identifying the client.
 | |
| 236 | - | |
| 244 | +            peer (Peer): an object that represents the client.
 | |
| 237 | 245 |          Raises:
 | 
| 238 | 246 |              NotFoundError: If no operation with `operation_name` exists.
 | 
| 239 | 247 |          """
 | 
| ... | ... | @@ -244,10 +252,10 @@ class Job: | 
| 244 | 252 |              raise NotFoundError("Operation name does not exist: [{}]"
 | 
| 245 | 253 |                                  .format(operation_name))
 | 
| 246 | 254 |  | 
| 247 | -        if peer in self.__operations_message_queues:
 | |
| 248 | -            del self.__operations_message_queues[peer]
 | |
| 255 | +        if peer.peer_id in self.__operations_message_queues:
 | |
| 256 | +            del self.__operations_message_queues[peer.peer_id]
 | |
| 249 | 257 |  | 
| 250 | -        del self.__operations_by_peer[peer]
 | |
| 258 | +        del self.__operations_by_peer[peer.peer_id]
 | |
| 251 | 259 |  | 
| 252 | 260 |          # Drop the operation if nobody is watching it anymore:
 | 
| 253 | 261 |          if operation not in self.__operations_by_peer.values():
 | 
| ... | ... | @@ -258,6 +266,9 @@ class Job: | 
| 258 | 266 |              self.__logger.debug("Operation deleted for job [%s]: [%s]",
 | 
| 259 | 267 |                                  self._name, operation.name)
 | 
| 260 | 268 |  | 
| 269 | +        with self.__peers_lock:
 | |
| 270 | +            self.__peers.remove(peer)
 | |
| 271 | + | |
| 261 | 272 |      def list_operations(self):
 | 
| 262 | 273 |          """Lists the :class:`Operation` related to a job.
 | 
| 263 | 274 |  | 
| 1 | +# Copyright (C) 2019 Bloomberg LP
 | |
| 2 | +#
 | |
| 3 | +# Licensed under the Apache License, Version 2.0 (the "License");
 | |
| 4 | +# you may not use this file except in compliance with the License.
 | |
| 5 | +# You may obtain a copy of the License at
 | |
| 6 | +#
 | |
| 7 | +#  <http://www.apache.org/licenses/LICENSE-2.0>
 | |
| 8 | +#
 | |
| 9 | +# Unless required by applicable law or agreed to in writing, software
 | |
| 10 | +# distributed under the License is distributed on an "AS IS" BASIS,
 | |
| 11 | +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | |
| 12 | +# See the License for the specific language governing permissions and
 | |
| 13 | +# limitations under the License.
 | |
| 14 | + | |
| 15 | + | |
class Peer:
    """Represents a client of the server.

    Two peers compare equal, and hash identically, when they share the same
    `peer_id`; the token and the request metadata are ignored for
    comparison purposes.
    """

    def __init__(self, peer_id, token=None, request_metadata=None):
        """Initialises a new :class:`Peer`.

        Args:
            peer_id: value that uniquely identifies a client. It must be
                hashable, since it backs `__hash__`.
            token: optional token, stored as given.
            request_metadata: optional metadata attached to the Execute()
                call (presumably a `RequestMetadata` message -- only its
                `tool_details`, `action_id`, `tool_invocation_id` and
                `correlated_invocations_id` attributes are read here).
        """
        self._id = peer_id  # This uniquely identifies a client
        self._token = token

        self.__request_metadata = request_metadata

    def __eq__(self, other):
        if isinstance(other, Peer):
            return self.peer_id == other.peer_id
        return False

    def __hash__(self):
        return hash(self.peer_id)

    @property
    def peer_id(self):
        """The identifier given at construction time."""
        return self._id

    @property
    def token(self):
        """The token given at construction time, or None."""
        return self._token

    # -- `RequestMetadata` optional values (attached to the Execute() call) --
    @property
    def request_metadata(self):
        """The request metadata given at construction time, or None."""
        return self.__request_metadata

    @property
    def tool_name(self):
        """Tool name from the metadata's `tool_details`, or None."""
        if self.has_request_metadata():
            if self.__request_metadata.tool_details:
                return self.__request_metadata.tool_details.tool_name
        return None

    # Exposed as a property, like every other metadata accessor.
    @property
    def tool_version(self):
        """Tool version from the metadata's `tool_details`, or None."""
        if self.has_request_metadata():
            if self.__request_metadata.tool_details:
                return self.__request_metadata.tool_details.tool_version
        return None

    @property
    def action_id(self):
        """Action ID from the metadata, or None without metadata."""
        if self.has_request_metadata():
            return self.__request_metadata.action_id
        return None

    @property
    def tool_invocation_id(self):
        """Tool invocation ID from the metadata, or None without metadata."""
        if self.has_request_metadata():
            return self.__request_metadata.tool_invocation_id
        return None

    @property
    def correlated_invocations_id(self):
        """Correlated invocations ID from the metadata, or None without
        metadata.
        """
        if self.has_request_metadata():
            return self.__request_metadata.correlated_invocations_id
        return None

    def has_request_metadata(self):
        """Tells whether request metadata was attached to this peer."""
        return self.__request_metadata is not None
| ... | ... | @@ -54,9 +54,8 @@ class Scheduler: | 
| 54 | 54 |  | 
| 55 | 55 |          self.__queue = []
 | 
| 56 | 56 |  | 
| 57 | -        self._is_instrumented = monitor
 | |
| 58 | - | |
| 59 | -        if self._is_instrumented:
 | |
| 57 | +        self._is_instrumented = False
 | |
| 58 | +        if monitor:
 | |
| 60 | 59 |              self.activate_monitoring()
 | 
| 61 | 60 |  | 
| 62 | 61 |      # --- Public API ---
 | 
| ... | ... | @@ -87,7 +86,7 @@ class Scheduler: | 
| 87 | 86 |  | 
| 88 | 87 |          Args:
 | 
| 89 | 88 |              job_name (str): name of the job to subscribe to.
 | 
| 90 | -            peer (str): a unique string identifying the client.
 | |
| 89 | +            peer (Peer): an object that represents the client.
 | |
| 91 | 90 |              message_queue (queue.Queue): the event queue to register.
 | 
| 92 | 91 |  | 
| 93 | 92 |          Returns:
 | 
| ... | ... | @@ -114,7 +113,7 @@ class Scheduler: | 
| 114 | 113 |  | 
| 115 | 114 |          Args:
 | 
| 116 | 115 |              operation_name (str): name of the operation to subscribe to.
 | 
| 117 | -            peer (str): a unique string identifying the client.
 | |
| 116 | +            peer (Peer): an object that represents the client.
 | |
| 118 | 117 |              message_queue (queue.Queue): the event queue to register.
 | 
| 119 | 118 |  | 
| 120 | 119 |          Returns:
 | 
| ... | ... | @@ -137,7 +136,7 @@ class Scheduler: | 
| 137 | 136 |  | 
| 138 | 137 |          Args:
 | 
| 139 | 138 |              operation_name (str): name of the operation to unsubscribe from.
 | 
| 140 | -            peer (str): a unique string identifying the client.
 | |
| 139 | +            peer (Peer): an object that represents the client.
 | |
| 141 | 140 |  | 
| 142 | 141 |          Raises:
 | 
| 143 | 142 |              NotFoundError: If no operation with `operation_name` exists.
 | 
| ... | ... | @@ -43,3 +43,13 @@ BROWSER_URL_FORMAT = '%(type)s/%(instance)s/%(hash)s/%(sizebytes)s/' | 
| 43 | 43 |  #  type       - Type of CAS object, eg. 'action_result', 'command'...
 | 
| 44 | 44 |  #  hash       - Object's digest hash.
 | 
| 45 | 45 |  #  sizebytes  - Object's digest size in bytes.
 | 
| 46 | + | |
| 47 | + | |
| 48 | +# Name of the header key to attach optional `RequestMetadata` values.
 | |
| 49 | +# (This is defined in the REAPI specification.)
 | |
| 50 | +REQUEST_METADATA_HEADER_NAME = 'requestmetadata-bin'
 | |
| 51 | + | |
| 52 | +# 'RequestMetadata' header values. These values will be used when
 | |
| 53 | +# attaching optional metadata to a gRPC request's header:
 | |
| 54 | +REQUEST_METADATA_TOOL_NAME = 'BuildGrid'
 | |
| 55 | +REQUEST_METADATA_TOOL_VERSION = '0.1' | 
