Santiago Gil pushed to branch santigl/104-platform-matching at BuildGrid / buildgrid
Commits:
- b4f090d9 by Santiago Gil at 2019-02-11T14:59:04Z
- d87879ed by Santiago Gil at 2019-02-11T14:59:04Z
- da192b12 by Santiago Gil at 2019-02-11T14:59:04Z
6 changed files:
- buildgrid/server/bots/instance.py
- buildgrid/server/execution/instance.py
- buildgrid/server/job.py
- buildgrid/server/scheduler.py
- tests/integration/bots_service.py
- tests/integration/execution_service.py
Changes:
| ... | ... | @@ -50,7 +50,6 @@ class BotsInterface: | 
| 50 | 50 |          register with the service, the old one should be closed along
 | 
| 51 | 51 |          with all its jobs.
 | 
| 52 | 52 |          """
 | 
| 53 | - | |
| 54 | 53 |          bot_id = bot_session.bot_id
 | 
| 55 | 54 |  | 
| 56 | 55 |          if bot_id == "":
 | 
| ... | ... | @@ -100,10 +99,25 @@ class BotsInterface: | 
| 100 | 99 |          return bot_session
 | 
| 101 | 100 |  | 
| 102 | 101 |      def _request_leases(self, bot_session):
 | 
| 103 | -        # TODO: Send worker capabilities to the scheduler!
 | |
| 104 | 102 |          # Only send one lease at a time currently.
 | 
| 105 | 103 |          if not bot_session.leases:
 | 
| 106 | -            leases = self._scheduler.request_job_leases({})
 | |
| 104 | +            worker_capabilities = dict()
 | |
| 105 | + | |
| 106 | +            # TODO? Fail if there are no devices in the worker?
 | |
| 107 | +            if bot_session.worker.devices:
 | |
| 108 | +                # According to the spec:
 | |
| 109 | +                #   "The first device in the worker is the "primary device" -
 | |
| 110 | +                #   that is, the device running a bot and which is
 | |
| 111 | +                #   responsible for actually executing commands."
 | |
| 112 | +                primary_device = bot_session.worker.devices[0]
 | |
| 113 | + | |
| 114 | +                for property in primary_device.properties:
 | |
| 115 | +                    if property.key not in worker_capabilities:
 | |
| 116 | +                        worker_capabilities[property.key] = set()
 | |
| 117 | +                    worker_capabilities[property.key].add(property.value)
 | |
| 118 | + | |
| 119 | +            leases = self._scheduler.request_job_leases(worker_capabilities)
 | |
| 120 | + | |
| 107 | 121 |              if leases:
 | 
| 108 | 122 |                  for lease in leases:
 | 
| 109 | 123 |                      self._assigned_leases[bot_session.name].add(lease.id)
 | 
| ... | ... | @@ -22,7 +22,7 @@ An instance of the Remote Execution Service. | 
| 22 | 22 |  import logging
 | 
| 23 | 23 |  | 
| 24 | 24 |  from buildgrid._exceptions import FailedPreconditionError, InvalidArgumentError, NotFoundError
 | 
| 25 | -from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Action
 | |
| 25 | +from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Action, Command
 | |
| 26 | 26 |  from buildgrid.utils import get_hash_type
 | 
| 27 | 27 |  | 
| 28 | 28 |  | 
| ... | ... | @@ -50,11 +50,26 @@ class ExecutionInstance: | 
| 50 | 50 |          this action.
 | 
| 51 | 51 |          """
 | 
| 52 | 52 |          action = self._storage.get_message(action_digest, Action)
 | 
| 53 | - | |
| 54 | 53 |          if not action:
 | 
| 55 | 54 |              raise FailedPreconditionError("Could not get action from storage.")
 | 
| 56 | 55 |  | 
| 56 | +        platform_requirements = None
 | |
| 57 | +        if action.command_digest.size_bytes:
 | |
| 58 | +            command = self._storage.get_message(action.command_digest, Command)
 | |
| 59 | + | |
| 60 | +            if not command:
 | |
| 61 | +                raise FailedPreconditionError("Could not get command from storage.")
 | |
| 62 | + | |
| 63 | +            if command:
 | |
| 64 | +                platform_requirements = dict()
 | |
| 65 | +                for property in command.platform.properties:
 | |
| 66 | +                    if property.name not in platform_requirements:
 | |
| 67 | +                        platform_requirements[property.name] = set()
 | |
| 68 | +                    platform_requirements[property.name].add(property.value)
 | |
| 69 | +                print(platform_requirements)
 | |
| 70 | + | |
| 57 | 71 |          return self._scheduler.queue_job_action(action, action_digest,
 | 
| 72 | +                                                platform_requirements,
 | |
| 58 | 73 |                                                  skip_cache_lookup=skip_cache_lookup)
 | 
| 59 | 74 |  | 
| 60 | 75 |      def register_job_peer(self, job_name, peer, message_queue):
 | 
| ... | ... | @@ -29,7 +29,7 @@ from buildgrid._protos.google.rpc import code_pb2 | 
| 29 | 29 |  | 
| 30 | 30 |  class Job:
 | 
| 31 | 31 |  | 
| 32 | -    def __init__(self, action, action_digest, priority=0):
 | |
| 32 | +    def __init__(self, action, action_digest, platform_requirements, priority=0):
 | |
| 33 | 33 |          self.__logger = logging.getLogger(__name__)
 | 
| 34 | 34 |  | 
| 35 | 35 |          self._name = str(uuid.uuid4())
 | 
| ... | ... | @@ -59,6 +59,8 @@ class Job: | 
| 59 | 59 |          self._do_not_cache = self._action.do_not_cache
 | 
| 60 | 60 |          self._n_tries = 0
 | 
| 61 | 61 |  | 
| 62 | +        self._platform_requirements = platform_requirements
 | |
| 63 | + | |
| 62 | 64 |          self._done = False
 | 
| 63 | 65 |  | 
| 64 | 66 |      def __lt__(self, other):
 | 
| ... | ... | @@ -111,6 +113,10 @@ class Job: | 
| 111 | 113 |      def done(self):
 | 
| 112 | 114 |          return self._done
 | 
| 113 | 115 |  | 
| 116 | +    @property
 | |
| 117 | +    def platform_requirements(self):
 | |
| 118 | +        return self._platform_requirements
 | |
| 119 | + | |
| 114 | 120 |      # --- Public API: REAPI ---
 | 
| 115 | 121 |  | 
| 116 | 122 |      @property
 | 
| ... | ... | @@ -145,7 +145,8 @@ class Scheduler: | 
| 145 | 145 |          if not job.n_peers and job.done and not job.lease:
 | 
| 146 | 146 |              self._delete_job(job.name)
 | 
| 147 | 147 |  | 
| 148 | -    def queue_job_action(self, action, action_digest, priority=0, skip_cache_lookup=False):
 | |
| 148 | +    def queue_job_action(self, action, action_digest, platform_requirements,
 | |
| 149 | +                         priority=0, skip_cache_lookup=False):
 | |
| 149 | 150 |          """Inserts a newly created job into the execution queue.
 | 
| 150 | 151 |  | 
| 151 | 152 |          Warning:
 | 
| ... | ... | @@ -155,6 +156,9 @@ class Scheduler: | 
| 155 | 156 |          Args:
 | 
| 156 | 157 |              action (Action): the given action to queue for execution.
 | 
| 157 | 158 |              action_digest (Digest): the digest of the given action.
 | 
| 159 | +            platform_requirements (dict(set)): platform attributes that a worker
 | |
| 160 | +                must satisfy in order to be assigned the job. (Each key can
 | |
| 161 | +                have multiple values.)
 | |
| 158 | 162 |              priority (int): the execution job's priority.
 | 
| 159 | 163 |              skip_cache_lookup (bool): whether or not to look for pre-computed
 | 
| 160 | 164 |                  result for the given action.
 | 
| ... | ... | @@ -178,7 +182,8 @@ class Scheduler: | 
| 178 | 182 |  | 
| 179 | 183 |                  return job.name
 | 
| 180 | 184 |  | 
| 181 | -        job = Job(action, action_digest, priority=priority)
 | |
| 185 | +        job = Job(action, action_digest, platform_requirements,
 | |
| 186 | +                  priority=priority)
 | |
| 182 | 187 |  | 
| 183 | 188 |          self.__logger.debug("Job created for action [%s]: [%s]",
 | 
| 184 | 189 |                              action_digest.hash[:8], job.name)
 | 
| ... | ... | @@ -271,7 +276,7 @@ class Scheduler: | 
| 271 | 276 |          """Generates a list of the highest priority leases to be run.
 | 
| 272 | 277 |  | 
| 273 | 278 |          Args:
 | 
| 274 | -            worker_capabilities (dict): a set of key-value pairs decribing the
 | |
| 279 | +            worker_capabilities (dict): a set of key-value pairs describing the
 | |
| 275 | 280 |                  worker properties, configuration and state at the time of the
 | 
| 276 | 281 |                  request.
 | 
| 277 | 282 |  | 
| ... | ... | @@ -280,19 +285,21 @@ class Scheduler: | 
| 280 | 285 |          if not self.__queue:
 | 
| 281 | 286 |              return []
 | 
| 282 | 287 |  | 
| 283 | -        # TODO: Try to match worker_capabilities with jobs properties.
 | |
| 284 | -        job = self.__queue.pop()
 | |
| 288 | +        # For now we only look at the first job in the queue.
 | |
| 289 | +        # TODO: Try finding another job that is suitable for the worker.
 | |
| 290 | +        if self._worker_is_capable(worker_capabilities, self.__queue[0]):
 | |
| 291 | +            job = self.__queue.pop()
 | |
| 285 | 292 |  | 
| 286 | -        self.__logger.info("Job scheduled to run: [%s]", job.name)
 | |
| 293 | +            self.__logger.info("Job scheduled to run: [%s]", job.name)
 | |
| 287 | 294 |  | 
| 288 | -        lease = job.lease
 | |
| 295 | +            lease = job.lease
 | |
| 289 | 296 |  | 
| 290 | -        if not lease:
 | |
| 291 | -            # For now, one lease at a time:
 | |
| 292 | -            lease = job.create_lease()
 | |
| 297 | +            if not lease:
 | |
| 298 | +                # For now, one lease at a time:
 | |
| 299 | +                lease = job.create_lease()
 | |
| 293 | 300 |  | 
| 294 | -        if lease:
 | |
| 295 | -            return [lease]
 | |
| 301 | +            if lease:
 | |
| 302 | +                return [lease]
 | |
| 296 | 303 |  | 
| 297 | 304 |          return None
 | 
| 298 | 305 |  | 
| ... | ... | @@ -622,3 +629,28 @@ class Scheduler: | 
| 622 | 629 |  | 
| 623 | 630 |                      for message_queue in self.__build_metadata_queues:
 | 
| 624 | 631 |                          message_queue.put(message)
 | 
| 632 | + | |
| 633 | +    def _worker_is_capable(self, worker_capabilities, job):
 | |
| 634 | +        """Returns whether the worker is suitable to run the job."""
 | |
| 635 | +        # TODO: Replace this with the logic defined in the Platform msg. standard.
 | |
| 636 | + | |
| 637 | +        job_requirements = job.platform_requirements
 | |
| 638 | +        # For now we'll only check OS and ISA properties.
 | |
| 639 | + | |
| 640 | +        if not job_requirements:
 | |
| 641 | +            return True
 | |
| 642 | + | |
| 643 | +        # OS:
 | |
| 644 | +        worker_oses = worker_capabilities.get('os', set())
 | |
| 645 | +        job_oses = job_requirements.get('os', set())
 | |
| 646 | +        if job_oses and not (job_oses & worker_oses):
 | |
| 647 | +            return False
 | |
| 648 | + | |
| 649 | +        # ISAs:
 | |
| 650 | +        worker_isas = worker_capabilities.get('isa', [])
 | |
| 651 | +        job_isas = job_requirements.get('isa', None)
 | |
| 652 | + | |
| 653 | +        if job_isas and not (job_isas & worker_isas):
 | |
| 654 | +            return False
 | |
| 655 | + | |
| 656 | +        return True | 
| ... | ... | @@ -153,11 +153,27 @@ def test_post_bot_event_temp(context, instance): | 
| 153 | 153 |      context.set_code.assert_called_once_with(grpc.StatusCode.UNIMPLEMENTED)
 | 
| 154 | 154 |  | 
| 155 | 155 |  | 
| 156 | -def _inject_work(scheduler, action=None, action_digest=None):
 | |
| 156 | +def test_unmet_platform_requirements(bot_session, context, instance):
 | |
| 157 | +    request = bots_pb2.CreateBotSessionRequest(parent='',
 | |
| 158 | +                                               bot_session=bot_session)
 | |
| 159 | + | |
| 160 | +    action_digest = remote_execution_pb2.Digest(hash='gaff')
 | |
| 161 | +    _inject_work(instance._instances[""]._scheduler,
 | |
| 162 | +                 action_digest=action_digest,
 | |
| 163 | +                 platform_requirements={'os': set('wonderful-os')})
 | |
| 164 | + | |
| 165 | +    response = instance.CreateBotSession(request, context)
 | |
| 166 | + | |
| 167 | +    assert len(response.leases) == 0
 | |
| 168 | + | |
| 169 | + | |
| 170 | +def _inject_work(scheduler, action=None, action_digest=None,
 | |
| 171 | +                 platform_requirements=None):
 | |
| 157 | 172 |      if not action:
 | 
| 158 | 173 |          action = remote_execution_pb2.Action()
 | 
| 159 | 174 |  | 
| 160 | 175 |      if not action_digest:
 | 
| 161 | 176 |          action_digest = remote_execution_pb2.Digest()
 | 
| 162 | 177 |  | 
| 163 | -    scheduler.queue_job_action(action, action_digest, skip_cache_lookup=True) | |
| 178 | +    scheduler.queue_job_action(action, action_digest, platform_requirements,
 | |
| 179 | +                               skip_cache_lookup=True) | 
| ... | ... | @@ -107,6 +107,7 @@ def test_no_action_digest_in_storage(instance, context): | 
| 107 | 107 |  def test_wait_execution(instance, controller, context):
 | 
| 108 | 108 |      job_name = controller.execution_instance._scheduler.queue_job_action(action,
 | 
| 109 | 109 |                                                                           action_digest,
 | 
| 110 | +                                                                         platform_requirements={},
 | |
| 110 | 111 |                                                                           skip_cache_lookup=True)
 | 
| 111 | 112 |  | 
| 112 | 113 |      message_queue = queue.Queue()
 | 
