Santiago Gil pushed to branch santigl/104-platform-matching at BuildGrid / buildgrid
Commits:
- ce234854 by Santiago Gil at 2019-02-12T11:15:46Z
- 3fb2bbea by Santiago Gil at 2019-02-12T11:15:46Z
- 370cac0a by Santiago Gil at 2019-02-12T11:15:46Z
- 6e91af6b by Santiago Gil at 2019-02-12T12:01:39Z
8 changed files:
- buildgrid/_app/commands/cmd_bot.py
- buildgrid/_app/commands/cmd_execute.py
- buildgrid/server/bots/instance.py
- buildgrid/server/execution/instance.py
- buildgrid/server/job.py
- buildgrid/server/scheduler.py
- tests/integration/bots_service.py
- tests/integration/execution_service.py
Changes:
| ... | ... | @@ -59,11 +59,14 @@ from ..cli import pass_context, setup_logging | 
| 59 | 59 |                help="Time period for bot updates to the server in seconds.")
 | 
| 60 | 60 |  @click.option('--parent', type=click.STRING, default=None, show_default=True,
 | 
| 61 | 61 |                help="Targeted farm resource.")
 | 
| 62 | +@click.option('-w', '--worker-property', nargs=2, type=(click.STRING, click.STRING), multiple=True,
 | |
| 63 | +              help="List of key-value pairs of worker properties.")
 | |
| 62 | 64 |  @click.option('-v', '--verbose', count=True,
 | 
| 63 | 65 |                help='Increase log verbosity level.')
 | 
| 64 | 66 |  @pass_context
 | 
| 65 | -def cli(context, parent, update_period, remote, auth_token, client_key, client_cert, server_cert,
 | |
| 66 | -        remote_cas, cas_client_key, cas_client_cert, cas_server_cert, verbose):
 | |
| 67 | +def cli(context, parent, update_period, remote, auth_token, client_key,
 | |
| 68 | +        client_cert, server_cert, remote_cas, cas_client_key, cas_client_cert,
 | |
| 69 | +        cas_server_cert, worker_property, verbose):
 | |
| 67 | 70 |      setup_logging(verbosity=verbose)
 | 
| 68 | 71 |      # Setup the remote execution server channel:
 | 
| 69 | 72 |      try:
 | 
| ... | ... | @@ -90,8 +93,14 @@ def cli(context, parent, update_period, remote, auth_token, client_key, client_c | 
| 90 | 93 |  | 
| 91 | 94 |      bot_interface = interface.BotInterface(context.channel)
 | 
| 92 | 95 |  | 
| 96 | +    worker_properties_dict = {}
 | |
| 97 | +    for property_name, property_value in worker_property:
 | |
| 98 | +        if property_name not in worker_properties_dict:
 | |
| 99 | +            worker_properties_dict[property_name] = set()
 | |
| 100 | +        worker_properties_dict[property_name].add(property_value)
 | |
| 101 | + | |
| 93 | 102 |      worker = Worker()
 | 
| 94 | -    worker.add_device(Device())
 | |
| 103 | +    worker.add_device(Device(properties=worker_properties_dict))
 | |
| 95 | 104 |      hardware_interface = HardwareInterface(worker)
 | 
| 96 | 105 |  | 
| 97 | 106 |      context.bot_interface = bot_interface
 | 
| ... | ... | @@ -107,23 +107,31 @@ def request_dummy(context, number, wait_for_completion): | 
| 107 | 107 |                help="Tuple of expected output file and is-executeable flag.")
 | 
| 108 | 108 |  @click.option('--output-directory', default='testing', show_default=True,
 | 
| 109 | 109 |                help="Output directory for the output files.")
 | 
| 110 | +@click.option('-p', '--platform-property', nargs=2, type=(click.STRING, click.STRING), multiple=True,
 | |
| 111 | +              help="List of key-value pairs of required platform properties.")
 | |
| 110 | 112 |  @click.argument('input-root', nargs=1, type=click.Path(), required=True)
 | 
| 111 | 113 |  @click.argument('commands', nargs=-1, type=click.STRING, required=True)
 | 
| 112 | 114 |  @pass_context
 | 
| 113 | -def run_command(context, input_root, commands, output_file, output_directory):
 | |
| 115 | +def run_command(context, input_root, commands, output_file, output_directory,
 | |
| 116 | +                platform_property):
 | |
| 114 | 117 |      stub = remote_execution_pb2_grpc.ExecutionStub(context.channel)
 | 
| 115 | 118 |  | 
| 116 | -    output_executeables = []
 | |
| 119 | +    output_executables = []
 | |
| 117 | 120 |      with upload(context.channel, instance=context.instance_name) as uploader:
 | 
| 118 | 121 |          command = remote_execution_pb2.Command()
 | 
| 119 | 122 |  | 
| 120 | 123 |          for arg in commands:
 | 
| 121 | 124 |              command.arguments.extend([arg])
 | 
| 122 | 125 |  | 
| 123 | -        for file, is_executeable in output_file:
 | |
| 126 | +        for file, is_executable in output_file:
 | |
| 124 | 127 |              command.output_files.extend([file])
 | 
| 125 | -            if is_executeable:
 | |
| 126 | -                output_executeables.append(file)
 | |
| 128 | +            if is_executable:
 | |
| 129 | +                output_executables.append(file)
 | |
| 130 | + | |
| 131 | +        for attribute_name, attribute_value in platform_property:
 | |
| 132 | +            new_property = command.platform.properties.add()
 | |
| 133 | +            new_property.name = attribute_name
 | |
| 134 | +            new_property.value = attribute_value
 | |
| 127 | 135 |  | 
| 128 | 136 |          command_digest = uploader.put_message(command, queue=True)
 | 
| 129 | 137 |  | 
| ... | ... | @@ -165,6 +173,6 @@ def run_command(context, input_root, commands, output_file, output_directory): | 
| 165 | 173 |              downloader.download_file(output_file_response.digest, path)
 | 
| 166 | 174 |  | 
| 167 | 175 |      for output_file_response in execute_response.result.output_files:
 | 
| 168 | -        if output_file_response.path in output_executeables:
 | |
| 176 | +        if output_file_response.path in output_executables:
 | |
| 169 | 177 |              st = os.stat(path)
 | 
| 170 | 178 |              os.chmod(path, st.st_mode | stat.S_IXUSR) | 
| ... | ... | @@ -50,7 +50,6 @@ class BotsInterface: | 
| 50 | 50 |          register with the service, the old one should be closed along
 | 
| 51 | 51 |          with all its jobs.
 | 
| 52 | 52 |          """
 | 
| 53 | - | |
| 54 | 53 |          bot_id = bot_session.bot_id
 | 
| 55 | 54 |  | 
| 56 | 55 |          if bot_id == "":
 | 
| ... | ... | @@ -100,10 +99,25 @@ class BotsInterface: | 
| 100 | 99 |          return bot_session
 | 
| 101 | 100 |  | 
| 102 | 101 |      def _request_leases(self, bot_session):
 | 
| 103 | -        # TODO: Send worker capabilities to the scheduler!
 | |
| 104 | 102 |          # Only send one lease at a time currently.
 | 
| 105 | 103 |          if not bot_session.leases:
 | 
| 106 | -            leases = self._scheduler.request_job_leases({})
 | |
| 104 | +            worker_capabilities = {}
 | |
| 105 | + | |
| 106 | +            # TODO? Fail if there are no devices in the worker?
 | |
| 107 | +            if bot_session.worker.devices:
 | |
| 108 | +                # According to the spec:
 | |
| 109 | +                #   "The first device in the worker is the "primary device" -
 | |
| 110 | +                #   that is, the device running a bot and which is
 | |
| 111 | +                #   responsible for actually executing commands."
 | |
| 112 | +                primary_device = bot_session.worker.devices[0]
 | |
| 113 | + | |
| 114 | +                for device_property in primary_device.properties:
 | |
| 115 | +                    if device_property.key not in worker_capabilities:
 | |
| 116 | +                        worker_capabilities[device_property.key] = set()
 | |
| 117 | +                    worker_capabilities[device_property.key].add(device_property.value)
 | |
| 118 | + | |
| 119 | +            leases = self._scheduler.request_job_leases(worker_capabilities)
 | |
| 120 | + | |
| 107 | 121 |              if leases:
 | 
| 108 | 122 |                  for lease in leases:
 | 
| 109 | 123 |                      self._assigned_leases[bot_session.name].add(lease.id)
 | 
| ... | ... | @@ -22,7 +22,7 @@ An instance of the Remote Execution Service. | 
| 22 | 22 |  import logging
 | 
| 23 | 23 |  | 
| 24 | 24 |  from buildgrid._exceptions import FailedPreconditionError, InvalidArgumentError, NotFoundError
 | 
| 25 | -from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Action
 | |
| 25 | +from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Action, Command
 | |
| 26 | 26 |  from buildgrid.utils import get_hash_type
 | 
| 27 | 27 |  | 
| 28 | 28 |  | 
| ... | ... | @@ -50,11 +50,25 @@ class ExecutionInstance: | 
| 50 | 50 |          this action.
 | 
| 51 | 51 |          """
 | 
| 52 | 52 |          action = self._storage.get_message(action_digest, Action)
 | 
| 53 | - | |
| 54 | 53 |          if not action:
 | 
| 55 | 54 |              raise FailedPreconditionError("Could not get action from storage.")
 | 
| 56 | 55 |  | 
| 56 | +        platform_requirements = None
 | |
| 57 | +        if action.command_digest.size_bytes:
 | |
| 58 | +            command = self._storage.get_message(action.command_digest, Command)
 | |
| 59 | + | |
| 60 | +            if not command:
 | |
| 61 | +                raise FailedPreconditionError("Could not get command from storage.")
 | |
| 62 | + | |
| 63 | +            if command:
 | |
| 64 | +                platform_requirements = {}
 | |
| 65 | +                for platform_property in command.platform.properties:
 | |
| 66 | +                    if platform_property.name not in platform_requirements:
 | |
| 67 | +                        platform_requirements[platform_property.name] = set()
 | |
| 68 | +                    platform_requirements[platform_property.name].add(platform_property.value)
 | |
| 69 | + | |
| 57 | 70 |          return self._scheduler.queue_job_action(action, action_digest,
 | 
| 71 | +                                                platform_requirements,
 | |
| 58 | 72 |                                                  skip_cache_lookup=skip_cache_lookup)
 | 
| 59 | 73 |  | 
| 60 | 74 |      def register_job_peer(self, job_name, peer, message_queue):
 | 
| ... | ... | @@ -29,7 +29,7 @@ from buildgrid._protos.google.rpc import code_pb2 | 
| 29 | 29 |  | 
| 30 | 30 |  class Job:
 | 
| 31 | 31 |  | 
| 32 | -    def __init__(self, action, action_digest, priority=0):
 | |
| 32 | +    def __init__(self, action, action_digest, platform_requirements, priority=0):
 | |
| 33 | 33 |          self.__logger = logging.getLogger(__name__)
 | 
| 34 | 34 |  | 
| 35 | 35 |          self._name = str(uuid.uuid4())
 | 
| ... | ... | @@ -59,6 +59,8 @@ class Job: | 
| 59 | 59 |          self._do_not_cache = self._action.do_not_cache
 | 
| 60 | 60 |          self._n_tries = 0
 | 
| 61 | 61 |  | 
| 62 | +        self._platform_requirements = platform_requirements
 | |
| 63 | + | |
| 62 | 64 |          self._done = False
 | 
| 63 | 65 |  | 
| 64 | 66 |      def __lt__(self, other):
 | 
| ... | ... | @@ -113,6 +115,10 @@ class Job: | 
| 113 | 115 |  | 
| 114 | 116 |      # --- Public API: REAPI ---
 | 
| 115 | 117 |  | 
| 118 | +    @property
 | |
| 119 | +    def platform_requirements(self):
 | |
| 120 | +        return self._platform_requirements
 | |
| 121 | + | |
| 116 | 122 |      @property
 | 
| 117 | 123 |      def do_not_cache(self):
 | 
| 118 | 124 |          return self._do_not_cache
 | 
| ... | ... | @@ -145,7 +145,8 @@ class Scheduler: | 
| 145 | 145 |          if not job.n_peers and job.done and not job.lease:
 | 
| 146 | 146 |              self._delete_job(job.name)
 | 
| 147 | 147 |  | 
| 148 | -    def queue_job_action(self, action, action_digest, priority=0, skip_cache_lookup=False):
 | |
| 148 | +    def queue_job_action(self, action, action_digest, platform_requirements,
 | |
| 149 | +                         priority=0, skip_cache_lookup=False):
 | |
| 149 | 150 |          """Inserts a newly created job into the execution queue.
 | 
| 150 | 151 |  | 
| 151 | 152 |          Warning:
 | 
| ... | ... | @@ -155,6 +156,9 @@ class Scheduler: | 
| 155 | 156 |          Args:
 | 
| 156 | 157 |              action (Action): the given action to queue for execution.
 | 
| 157 | 158 |              action_digest (Digest): the digest of the given action.
 | 
| 159 | +            platform_requirements (dict(set)): platform attributes that a worker
 | |
| 160 | +                must satisfy in order to be assigned the job. (Each key can
 | |
| 161 | +                have multiple values.)
 | |
| 158 | 162 |              priority (int): the execution job's priority.
 | 
| 159 | 163 |              skip_cache_lookup (bool): whether or not to look for pre-computed
 | 
| 160 | 164 |                  result for the given action.
 | 
| ... | ... | @@ -178,7 +182,8 @@ class Scheduler: | 
| 178 | 182 |  | 
| 179 | 183 |                  return job.name
 | 
| 180 | 184 |  | 
| 181 | -        job = Job(action, action_digest, priority=priority)
 | |
| 185 | +        job = Job(action, action_digest, platform_requirements,
 | |
| 186 | +                  priority=priority)
 | |
| 182 | 187 |  | 
| 183 | 188 |          self.__logger.debug("Job created for action [%s]: [%s]",
 | 
| 184 | 189 |                              action_digest.hash[:8], job.name)
 | 
| ... | ... | @@ -271,28 +276,29 @@ class Scheduler: | 
| 271 | 276 |          """Generates a list of the highest priority leases to be run.
 | 
| 272 | 277 |  | 
| 273 | 278 |          Args:
 | 
| 274 | -            worker_capabilities (dict): a set of key-value pairs decribing the
 | |
| 279 | +            worker_capabilities (dict): a set of key-value pairs describing the
 | |
| 275 | 280 |                  worker properties, configuration and state at the time of the
 | 
| 276 | 281 |                  request.
 | 
| 277 | - | |
| 278 | -        Warning: Worker capabilities handling is not implemented at the moment!
 | |
| 279 | 282 |          """
 | 
| 280 | 283 |          if not self.__queue:
 | 
| 281 | 284 |              return []
 | 
| 282 | 285 |  | 
| 283 | -        # TODO: Try to match worker_capabilities with jobs properties.
 | |
| 284 | -        job = self.__queue.pop()
 | |
| 286 | +        # Looking for the first job that could be assigned to the worker...
 | |
| 287 | +        for job_index, job in enumerate(self.__queue):
 | |
| 288 | +            if self._worker_is_capable(worker_capabilities, job):
 | |
| 289 | +                self.__logger.info("Job scheduled to run: [%s]", job.name)
 | |
| 285 | 290 |  | 
| 286 | -        self.__logger.info("Job scheduled to run: [%s]", job.name)
 | |
| 291 | +                lease = job.lease
 | |
| 287 | 292 |  | 
| 288 | -        lease = job.lease
 | |
| 293 | +                if not lease:
 | |
| 294 | +                    # For now, one lease at a time:
 | |
| 295 | +                    lease = job.create_lease()
 | |
| 289 | 296 |  | 
| 290 | -        if not lease:
 | |
| 291 | -            # For now, one lease at a time:
 | |
| 292 | -            lease = job.create_lease()
 | |
| 297 | +                if lease:
 | |
| 298 | +                    del self.__queue[job_index]
 | |
| 299 | +                    return [lease]
 | |
| 293 | 300 |  | 
| 294 | -        if lease:
 | |
| 295 | -            return [lease]
 | |
| 301 | +                return None
 | |
| 296 | 302 |  | 
| 297 | 303 |          return None
 | 
| 298 | 304 |  | 
| ... | ... | @@ -622,3 +628,28 @@ class Scheduler: | 
| 622 | 628 |  | 
| 623 | 629 |                      for message_queue in self.__build_metadata_queues:
 | 
| 624 | 630 |                          message_queue.put(message)
 | 
| 631 | + | |
| 632 | +    def _worker_is_capable(self, worker_capabilities, job):
 | |
| 633 | +        """Returns whether the worker is suitable to run the job."""
 | |
| 634 | +        # TODO: Replace this with the logic defined in the Platform msg. standard.
 | |
| 635 | + | |
| 636 | +        job_requirements = job.platform_requirements
 | |
| 637 | +        # For now we'll only check OS and ISA properties.
 | |
| 638 | + | |
| 639 | +        if not job_requirements:
 | |
| 640 | +            return True
 | |
| 641 | + | |
| 642 | +        # OS:
 | |
| 643 | +        worker_oses = worker_capabilities.get('os', set())
 | |
| 644 | +        job_oses = job_requirements.get('os', set())
 | |
| 645 | +        if job_oses and not (job_oses & worker_oses):
 | |
| 646 | +            return False
 | |
| 647 | + | |
| 648 | +        # ISAs:
 | |
| 649 | +        worker_isas = worker_capabilities.get('isa', [])
 | |
| 650 | +        job_isas = job_requirements.get('isa', None)
 | |
| 651 | + | |
| 652 | +        if job_isas and not (job_isas & worker_isas):
 | |
| 653 | +            return False
 | |
| 654 | + | |
| 655 | +        return True | 
| ... | ... | @@ -153,11 +153,27 @@ def test_post_bot_event_temp(context, instance): | 
| 153 | 153 |      context.set_code.assert_called_once_with(grpc.StatusCode.UNIMPLEMENTED)
 | 
| 154 | 154 |  | 
| 155 | 155 |  | 
| 156 | -def _inject_work(scheduler, action=None, action_digest=None):
 | |
| 156 | +def test_unmet_platform_requirements(bot_session, context, instance):
 | |
| 157 | +    request = bots_pb2.CreateBotSessionRequest(parent='',
 | |
| 158 | +                                               bot_session=bot_session)
 | |
| 159 | + | |
| 160 | +    action_digest = remote_execution_pb2.Digest(hash='gaff')
 | |
| 161 | +    _inject_work(instance._instances[""]._scheduler,
 | |
| 162 | +                 action_digest=action_digest,
 | |
| 163 | +                 platform_requirements={'os': set('wonderful-os')})
 | |
| 164 | + | |
| 165 | +    response = instance.CreateBotSession(request, context)
 | |
| 166 | + | |
| 167 | +    assert len(response.leases) == 0
 | |
| 168 | + | |
| 169 | + | |
| 170 | +def _inject_work(scheduler, action=None, action_digest=None,
 | |
| 171 | +                 platform_requirements=None):
 | |
| 157 | 172 |      if not action:
 | 
| 158 | 173 |          action = remote_execution_pb2.Action()
 | 
| 159 | 174 |  | 
| 160 | 175 |      if not action_digest:
 | 
| 161 | 176 |          action_digest = remote_execution_pb2.Digest()
 | 
| 162 | 177 |  | 
| 163 | -    scheduler.queue_job_action(action, action_digest, skip_cache_lookup=True) | |
| 178 | +    scheduler.queue_job_action(action, action_digest, platform_requirements,
 | |
| 179 | +                               skip_cache_lookup=True) | 
| ... | ... | @@ -107,6 +107,7 @@ def test_no_action_digest_in_storage(instance, context): | 
| 107 | 107 |  def test_wait_execution(instance, controller, context):
 | 
| 108 | 108 |      job_name = controller.execution_instance._scheduler.queue_job_action(action,
 | 
| 109 | 109 |                                                                           action_digest,
 | 
| 110 | +                                                                         platform_requirements={},
 | |
| 110 | 111 |                                                                           skip_cache_lookup=True)
 | 
| 111 | 112 |  | 
| 112 | 113 |      message_queue = queue.Queue()
 | 
