Santiago Gil pushed to branch santigl/104-platform-matching at BuildGrid / buildgrid
Commits:
- 095164dd by Santiago Gil at 2019-02-07T17:21:15Z
5 changed files:
- buildgrid/server/execution/instance.py
- buildgrid/server/job.py
- buildgrid/server/scheduler.py
- tests/integration/bots_service.py
- tests/integration/execution_service.py
Changes:
| ... | ... | @@ -22,7 +22,7 @@ An instance of the Remote Execution Service. | 
| 22 | 22 |  import logging
 | 
| 23 | 23 |  | 
| 24 | 24 |  from buildgrid._exceptions import FailedPreconditionError, InvalidArgumentError, NotFoundError
 | 
| 25 | -from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Action
 | |
| 25 | +from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Action, Command
 | |
| 26 | 26 |  from buildgrid.utils import get_hash_type
 | 
| 27 | 27 |  | 
| 28 | 28 |  | 
| ... | ... | @@ -50,11 +50,18 @@ class ExecutionInstance: | 
| 50 | 50 |          this action.
 | 
| 51 | 51 |          """
 | 
| 52 | 52 |          action = self._storage.get_message(action_digest, Action)
 | 
| 53 | - | |
| 54 | 53 |          if not action:
 | 
| 55 | 54 |              raise FailedPreconditionError("Could not get action from storage.")
 | 
| 56 | 55 |  | 
| 56 | +        platform_requirements = None
 | |
| 57 | +        if action.command_digest.hash:
 | |
| 58 | +            command = self._storage.get_message(action.command_digest, Command)
 | |
| 59 | + | |
| 60 | +            if command:
 | |
| 61 | +                platform_requirements = command.platform
 | |
| 62 | + | |
| 57 | 63 |          return self._scheduler.queue_job_action(action, action_digest,
 | 
| 64 | +                                                platform_requirements,
 | |
| 58 | 65 |                                                  skip_cache_lookup=skip_cache_lookup)
 | 
| 59 | 66 |  | 
| 60 | 67 |      def register_job_peer(self, job_name, peer, message_queue):
 | 
| ... | ... | @@ -29,7 +29,7 @@ from buildgrid._protos.google.rpc import code_pb2 | 
| 29 | 29 |  | 
| 30 | 30 |  class Job:
 | 
| 31 | 31 |  | 
| 32 | -    def __init__(self, action, action_digest, priority=0):
 | |
| 32 | +    def __init__(self, action, action_digest, platform_requirements, priority=0):
 | |
| 33 | 33 |          self.__logger = logging.getLogger(__name__)
 | 
| 34 | 34 |  | 
| 35 | 35 |          self._name = str(uuid.uuid4())
 | 
| ... | ... | @@ -59,6 +59,8 @@ class Job: | 
| 59 | 59 |          self._do_not_cache = self._action.do_not_cache
 | 
| 60 | 60 |          self._n_tries = 0
 | 
| 61 | 61 |  | 
| 62 | +        self._platform_requirements = platform_requirements
 | |
| 63 | + | |
| 62 | 64 |          self._done = False
 | 
| 63 | 65 |  | 
| 64 | 66 |      def __lt__(self, other):
 | 
| ... | ... | @@ -111,6 +113,10 @@ class Job: | 
| 111 | 113 |      def done(self):
 | 
| 112 | 114 |          return self._done
 | 
| 113 | 115 |  | 
| 116 | +    @property
 | |
| 117 | +    def platform_requirements(self):
 | |
| 118 | +        return self._platform_requirements
 | |
| 119 | + | |
| 114 | 120 |      # --- Public API: REAPI ---
 | 
| 115 | 121 |  | 
| 116 | 122 |      @property
 | 
| ... | ... | @@ -145,7 +145,8 @@ class Scheduler: | 
| 145 | 145 |          if not job.n_peers and job.done and not job.lease:
 | 
| 146 | 146 |              self._delete_job(job.name)
 | 
| 147 | 147 |  | 
| 148 | -    def queue_job_action(self, action, action_digest, priority=0, skip_cache_lookup=False):
 | |
| 148 | +    def queue_job_action(self, action, action_digest, platform_requirements,
 | |
| 149 | +                         priority=0, skip_cache_lookup=False):
 | |
| 149 | 150 |          """Inserts a newly created job into the execution queue.
 | 
| 150 | 151 |  | 
| 151 | 152 |          Warning:
 | 
| ... | ... | @@ -155,6 +156,9 @@ class Scheduler: | 
| 155 | 156 |          Args:
 | 
| 156 | 157 |              action (Action): the given action to queue for execution.
 | 
| 157 | 158 |              action_digest (Digest): the digest of the given action.
 | 
| 159 | +            platform_requirements (dict(list)): platform attributes that a worker
 | |
| 160 | +                must satisfy in order to be assigned the job. (Each key can
 | |
| 161 | +                have multiple values.)
 | |
| 158 | 162 |              priority (int): the execution job's priority.
 | 
| 159 | 163 |              skip_cache_lookup (bool): whether or not to look for pre-computed
 | 
| 160 | 164 |                  result for the given action.
 | 
| ... | ... | @@ -178,7 +182,8 @@ class Scheduler: | 
| 178 | 182 |  | 
| 179 | 183 |                  return job.name
 | 
| 180 | 184 |  | 
| 181 | -        job = Job(action, action_digest, priority=priority)
 | |
| 185 | +        job = Job(action, action_digest, platform_requirements,
 | |
| 186 | +                  priority=priority)
 | |
| 182 | 187 |  | 
| 183 | 188 |          self.__logger.debug("Job created for action [%s]: [%s]",
 | 
| 184 | 189 |                              action_digest.hash[:8], job.name)
 | 
| ... | ... | @@ -271,7 +276,7 @@ class Scheduler: | 
| 271 | 276 |          """Generates a list of the highest priority leases to be run.
 | 
| 272 | 277 |  | 
| 273 | 278 |          Args:
 | 
| 274 | -            worker_capabilities (dict): a set of key-value pairs decribing the
 | |
| 279 | +            worker_capabilities (dict): a set of key-value pairs describing the
 | |
| 275 | 280 |                  worker properties, configuration and state at the time of the
 | 
| 276 | 281 |                  request.
 | 
| 277 | 282 |  | 
| ... | ... | @@ -280,7 +285,7 @@ class Scheduler: | 
| 280 | 285 |          if not self.__queue:
 | 
| 281 | 286 |              return []
 | 
| 282 | 287 |  | 
| 283 | -        # TODO: Try to match worker_capabilities with jobs properties.
 | |
| 288 | +        # TODO: Try to match worker_capabilities with jobs properties (job.platform_requirements)
 | |
| 284 | 289 |          job = self.__queue.pop()
 | 
| 285 | 290 |  | 
| 286 | 291 |          self.__logger.info("Job scheduled to run: [%s]", job.name)
 | 
| ... | ... | @@ -153,11 +153,13 @@ def test_post_bot_event_temp(context, instance): | 
| 153 | 153 |      context.set_code.assert_called_once_with(grpc.StatusCode.UNIMPLEMENTED)
 | 
| 154 | 154 |  | 
| 155 | 155 |  | 
| 156 | -def _inject_work(scheduler, action=None, action_digest=None):
 | |
| 156 | +def _inject_work(scheduler, action=None, action_digest=None,
 | |
| 157 | +                 platform_requirements=None):
 | |
| 157 | 158 |      if not action:
 | 
| 158 | 159 |          action = remote_execution_pb2.Action()
 | 
| 159 | 160 |  | 
| 160 | 161 |      if not action_digest:
 | 
| 161 | 162 |          action_digest = remote_execution_pb2.Digest()
 | 
| 162 | 163 |  | 
| 163 | -    scheduler.queue_job_action(action, action_digest, skip_cache_lookup=True) | |
| 164 | +    scheduler.queue_job_action(action, action_digest, platform_requirements,
 | |
| 165 | +                               skip_cache_lookup=True) | 
| ... | ... | @@ -107,6 +107,7 @@ def test_no_action_digest_in_storage(instance, context): | 
| 107 | 107 |  def test_wait_execution(instance, controller, context):
 | 
| 108 | 108 |      job_name = controller.execution_instance._scheduler.queue_job_action(action,
 | 
| 109 | 109 |                                                                           action_digest,
 | 
| 110 | +                                                                         platform_requirements={},
 | |
| 110 | 111 |                                                                           skip_cache_lookup=True)
 | 
| 111 | 112 |  | 
| 112 | 113 |      message_queue = queue.Queue()
 | 
