Raoul Hidalgo Charman pushed to branch 725-job-cancellation-on-remote-builds at BuildGrid / buildgrid
Commits:
- 589ffa40 by Martin Blanchard at 2018-11-06T15:12:47Z
- e4fb8842 by Raoul Hidalgo Charman at 2018-11-07T17:25:15Z
- d2e70ad5 by Raoul Hidalgo Charman at 2018-11-07T17:25:15Z
10 changed files:
- buildgrid/_app/commands/cmd_bot.py
- buildgrid/bot/bot.py
- buildgrid/bot/bot_interface.py
- buildgrid/bot/bot_session.py
- buildgrid/server/bots/instance.py
- buildgrid/server/bots/service.py
- buildgrid/server/scheduler.py
- buildgrid/settings.py
- setup.py
- tests/integration/bots_service.py
Changes:
| ... | ... | @@ -52,7 +52,8 @@ from ..cli import pass_context | 
| 52 | 52 |                help="Public CAS client certificate for TLS (PEM-encoded)")
 | 
| 53 | 53 |  @click.option('--cas-server-cert', type=click.Path(exists=True, dir_okay=False), default=None,
 | 
| 54 | 54 |                help="Public CAS server certificate for TLS (PEM-encoded)")
 | 
| 55 | -@click.option('--update-period', type=click.FLOAT, default=0.5, show_default=True,
 | |
| 55 | +# TODO change default to 30
 | |
| 56 | +@click.option('--update-period', type=click.FLOAT, default=30, show_default=True,
 | |
| 56 | 57 |                help="Time period for bot updates to the server in seconds.")
 | 
| 57 | 58 |  @click.option('--parent', type=click.STRING, default='main', show_default=True,
 | 
| 58 | 59 |                help="Targeted farm resource.")
 | 
| ... | ... | @@ -64,7 +65,6 @@ def cli(context, parent, update_period, remote, client_key, client_cert, server_ | 
| 64 | 65 |  | 
| 65 | 66 |      context.remote = '{}:{}'.format(url.hostname, url.port or 50051)
 | 
| 66 | 67 |      context.remote_url = remote
 | 
| 67 | -    context.update_period = update_period
 | |
| 68 | 68 |      context.parent = parent
 | 
| 69 | 69 |  | 
| 70 | 70 |      if url.scheme == 'http':
 | 
| ... | ... | @@ -123,7 +123,7 @@ def cli(context, parent, update_period, remote, client_key, client_cert, server_ | 
| 123 | 123 |      context.logger = logging.getLogger(__name__)
 | 
| 124 | 124 |      context.logger.debug("Starting for remote {}".format(context.remote))
 | 
| 125 | 125 |  | 
| 126 | -    interface = bot_interface.BotInterface(context.channel)
 | |
| 126 | +    interface = bot_interface.BotInterface(context.channel, update_period)
 | |
| 127 | 127 |  | 
| 128 | 128 |      worker = Worker()
 | 
| 129 | 129 |      worker.add_device(Device())
 | 
| ... | ... | @@ -141,7 +141,7 @@ def run_dummy(context): | 
| 141 | 141 |      Creates a session, accepts leases, does fake work and updates the server.
 | 
| 142 | 142 |      """
 | 
| 143 | 143 |      try:
 | 
| 144 | -        b = bot.Bot(context.bot_session, context.update_period)
 | |
| 144 | +        b = bot.Bot(context.bot_session)
 | |
| 145 | 145 |          b.session(dummy.work_dummy,
 | 
| 146 | 146 |                    context)
 | 
| 147 | 147 |      except KeyboardInterrupt:
 | 
| ... | ... | @@ -156,7 +156,7 @@ def run_host_tools(context): | 
| 156 | 156 |      result back to CAS.
 | 
| 157 | 157 |      """
 | 
| 158 | 158 |      try:
 | 
| 159 | -        b = bot.Bot(context.bot_session, context.update_period)
 | |
| 159 | +        b = bot.Bot(context.bot_session)
 | |
| 160 | 160 |          b.session(host.work_host_tools,
 | 
| 161 | 161 |                    context)
 | 
| 162 | 162 |      except KeyboardInterrupt:
 | 
| ... | ... | @@ -177,7 +177,7 @@ def run_buildbox(context, local_cas, fuse_dir): | 
| 177 | 177 |      context.fuse_dir = fuse_dir
 | 
| 178 | 178 |  | 
| 179 | 179 |      try:
 | 
| 180 | -        b = bot.Bot(context.bot_session, context.update_period)
 | |
| 180 | +        b = bot.Bot(context.bot_session)
 | |
| 181 | 181 |          b.session(buildbox.work_buildbox,
 | 
| 182 | 182 |                    context)
 | 
| 183 | 183 |      except KeyboardInterrupt:
 | 
| ... | ... | @@ -29,19 +29,16 @@ class Bot: | 
| 29 | 29 |      Creates a local BotSession.
 | 
| 30 | 30 |      """
 | 
| 31 | 31 |  | 
| 32 | -    def __init__(self, bot_session, update_period=1):
 | |
| 32 | +    def __init__(self, bot_session):
 | |
| 33 | 33 |          self.logger = logging.getLogger(__name__)
 | 
| 34 | 34 |  | 
| 35 | 35 |          self._bot_session = bot_session
 | 
| 36 | -        self._update_period = update_period
 | |
| 37 | 36 |  | 
| 38 | 37 |      def session(self, work, context):
 | 
| 39 | 38 |          loop = asyncio.get_event_loop()
 | 
| 40 | 39 |  | 
| 41 | -        self._bot_session.create_bot_session(work, context)
 | |
| 42 | - | |
| 43 | 40 |          try:
 | 
| 44 | -            task = asyncio.ensure_future(self._update_bot_session())
 | |
| 41 | +            task = asyncio.ensure_future(self._bot_session.run(work, context))
 | |
| 45 | 42 |              loop.run_forever()
 | 
| 46 | 43 |          except KeyboardInterrupt:
 | 
| 47 | 44 |              pass
 | 
| ... | ... | @@ -49,10 +46,19 @@ class Bot: | 
| 49 | 46 |              task.cancel()
 | 
| 50 | 47 |              loop.close()
 | 
| 51 | 48 |  | 
| 52 | -    async def _update_bot_session(self):
 | |
| 49 | +    async def _run_bot_session(self, work, context):
 | |
| 53 | 50 |          """
 | 
| 54 | 51 |          Calls the server periodically to inform the server the client has not died.
 | 
| 55 | 52 |          """
 | 
| 56 | 53 |          while True:
 | 
| 57 | -            self._bot_session.update_bot_session()
 | |
| 58 | -            await asyncio.sleep(self._update_period) | |
| 54 | +            if self._bot_session.connected is False:
 | |
| 55 | +                self._bot_session.create_bot_session(work, context)
 | |
| 56 | +            else:
 | |
| 57 | +                self._bot_session.update_bot_session()
 | |
| 58 | + | |
| 59 | +            if self._bot_session._futures:
 | |
| 60 | +                await asyncio.wait(self._bot_session._futures.values(),
 | |
| 61 | +                                   timeout=30,
 | |
| 62 | +                                   return_when=asyncio.FIRST_COMPLETED)
 | |
| 63 | +            elif self._bot_session.connected is False:
 | |
| 64 | +                await asyncio.sleep(30) | 
| ... | ... | @@ -21,8 +21,10 @@ Interface to grpc | 
| 21 | 21 |  """
 | 
| 22 | 22 |  | 
| 23 | 23 |  import logging
 | 
| 24 | +import grpc
 | |
| 24 | 25 |  | 
| 25 | 26 |  from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2, bots_pb2_grpc
 | 
| 27 | +from ..settings import INTERVAL_BUFFER
 | |
| 26 | 28 |  | 
| 27 | 29 |  | 
| 28 | 30 |  class BotInterface:
 | 
| ... | ... | @@ -30,18 +32,27 @@ class BotInterface: | 
| 30 | 32 |      Interface handles calls to the server.
 | 
| 31 | 33 |      """
 | 
| 32 | 34 |  | 
| 33 | -    def __init__(self, channel):
 | |
| 35 | +    def __init__(self, channel, interval):
 | |
| 34 | 36 |          self.logger = logging.getLogger(__name__)
 | 
| 35 | 37 |          self.logger.info(channel)
 | 
| 36 | 38 |          self._stub = bots_pb2_grpc.BotsStub(channel)
 | 
| 39 | +        self.interval = interval
 | |
| 37 | 40 |  | 
| 38 | 41 |      def create_bot_session(self, parent, bot_session):
 | 
| 39 | 42 |          request = bots_pb2.CreateBotSessionRequest(parent=parent,
 | 
| 40 | 43 |                                                     bot_session=bot_session)
 | 
| 41 | -        return self._stub.CreateBotSession(request)
 | |
| 44 | +        return self._bot_call(self._stub.CreateBotSession, request)
 | |
| 42 | 45 |  | 
| 43 | 46 |      def update_bot_session(self, bot_session, update_mask=None):
 | 
| 44 | 47 |          request = bots_pb2.UpdateBotSessionRequest(name=bot_session.name,
 | 
| 45 | 48 |                                                     bot_session=bot_session,
 | 
| 46 | 49 |                                                     update_mask=update_mask)
 | 
| 47 | -        return self._stub.UpdateBotSession(request) | |
| 50 | +        return self._bot_call(self._stub.UpdateBotSession, request)
 | |
| 51 | + | |
| 52 | +    def _bot_call(self, call, request):
 | |
| 53 | +        try:
 | |
| 54 | +            return call(request, timeout=self.interval + INTERVAL_BUFFER)
 | |
| 55 | +        except grpc.RpcError as e:
 | |
| 56 | +            if e.code() in grpc.StatusCode:
 | |
| 57 | +                self.logger.warning("Server responded with error: {}".format(e.code()))
 | |
| 58 | +                return None | 
| ... | ... | @@ -49,7 +49,9 @@ class BotSession: | 
| 49 | 49 |          self._bot_id = '{}.{}'.format(parent, platform.node())
 | 
| 50 | 50 |          self._context = None
 | 
| 51 | 51 |          self._interface = interface
 | 
| 52 | +        self.connected = False
 | |
| 52 | 53 |          self._leases = {}
 | 
| 54 | +        self._futures = {}
 | |
| 53 | 55 |          self._name = None
 | 
| 54 | 56 |          self._parent = parent
 | 
| 55 | 57 |          self._status = BotStatus.OK.value
 | 
| ... | ... | @@ -63,12 +65,31 @@ class BotSession: | 
| 63 | 65 |      def add_worker(self, worker):
 | 
| 64 | 66 |          self._worker = worker
 | 
| 65 | 67 |  | 
| 68 | +    async def run(self, work, context=None):
 | |
| 69 | +        self.logger.info("Starting bot session runner")
 | |
| 70 | +        while True:
 | |
| 71 | +            if self.connected is False:
 | |
| 72 | +                self.create_bot_session(work, context)
 | |
| 73 | +            else:
 | |
| 74 | +                self.update_bot_session()
 | |
| 75 | + | |
| 76 | +            if self._futures:
 | |
| 77 | +                await asyncio.wait(self._futures.values(),
 | |
| 78 | +                                   timeout=self._interface.interval,
 | |
| 79 | +                                   return_when=asyncio.FIRST_COMPLETED)
 | |
| 80 | +            elif self.connected is False:
 | |
| 81 | +                await asyncio.sleep(self._interface.interval)
 | |
| 82 | + | |
| 66 | 83 |      def create_bot_session(self, work, context=None):
 | 
| 67 | 84 |          self.logger.debug("Creating bot session")
 | 
| 68 | 85 |          self._work = work
 | 
| 69 | 86 |          self._context = context
 | 
| 70 | 87 |  | 
| 71 | 88 |          session = self._interface.create_bot_session(self._parent, self.get_pb2())
 | 
| 89 | +        if session is None:
 | |
| 90 | +            self.connected = False
 | |
| 91 | +            return
 | |
| 92 | +        self.connected = True
 | |
| 72 | 93 |          self._name = session.name
 | 
| 73 | 94 |  | 
| 74 | 95 |          self.logger.info("Created bot session with name: [{}]".format(self._name))
 | 
| ... | ... | @@ -79,6 +100,10 @@ class BotSession: | 
| 79 | 100 |      def update_bot_session(self):
 | 
| 80 | 101 |          self.logger.debug("Updating bot session: [{}]".format(self._bot_id))
 | 
| 81 | 102 |          session = self._interface.update_bot_session(self.get_pb2())
 | 
| 103 | +        if session is None:
 | |
| 104 | +            self.connected = False
 | |
| 105 | +            return
 | |
| 106 | +        self.connected = True
 | |
| 82 | 107 |          for k, v in list(self._leases.items()):
 | 
| 83 | 108 |              if v.state == LeaseState.COMPLETED.value:
 | 
| 84 | 109 |                  del self._leases[k]
 | 
| ... | ... | @@ -110,7 +135,7 @@ class BotSession: | 
| 110 | 135 |              lease.state = LeaseState.ACTIVE.value
 | 
| 111 | 136 |              self._leases[lease.id] = lease
 | 
| 112 | 137 |              self.update_bot_session()
 | 
| 113 | -            asyncio.ensure_future(self.create_work(lease))
 | |
| 138 | +            self._futures[lease.id] = asyncio.ensure_future(self.create_work(lease))
 | |
| 114 | 139 |  | 
| 115 | 140 |      async def create_work(self, lease):
 | 
| 116 | 141 |          self.logger.debug("Work created: [{}]".format(lease.id))
 | 
| ... | ... | @@ -133,6 +158,7 @@ class BotSession: | 
| 133 | 158 |  | 
| 134 | 159 |          self.logger.debug("Work complete: [{}]".format(lease.id))
 | 
| 135 | 160 |          self.lease_completed(lease)
 | 
| 161 | +        del self._futures[lease.id]
 | |
| 136 | 162 |  | 
| 137 | 163 |  | 
| 138 | 164 |  class Worker:
 | 
| ... | ... | @@ -26,6 +26,7 @@ import uuid | 
| 26 | 26 |  from buildgrid._exceptions import InvalidArgumentError, OutOfSyncError
 | 
| 27 | 27 |  | 
| 28 | 28 |  from ..job import LeaseState
 | 
| 29 | +from ...settings import INTERVAL_BUFFER
 | |
| 29 | 30 |  | 
| 30 | 31 |  | 
| 31 | 32 |  class BotsInterface:
 | 
| ... | ... | @@ -73,7 +74,7 @@ class BotsInterface: | 
| 73 | 74 |  | 
| 74 | 75 |          return bot_session
 | 
| 75 | 76 |  | 
| 76 | -    def update_bot_session(self, name, bot_session):
 | |
| 77 | +    def update_bot_session(self, name, bot_session, deadline=None):
 | |
| 77 | 78 |          """ Client updates the server. Any changes in state to the Lease should be
 | 
| 78 | 79 |          registered server side. Assigns available leases with work.
 | 
| 79 | 80 |          """
 | 
| ... | ... | @@ -87,7 +88,9 @@ class BotsInterface: | 
| 87 | 88 |  | 
| 88 | 89 |          # TODO: Send worker capabilities to the scheduler!
 | 
| 89 | 90 |          if not bot_session.leases:
 | 
| 90 | -            leases = self._scheduler.request_job_leases({})
 | |
| 91 | +            leases = self._scheduler.request_job_leases(
 | |
| 92 | +                {}, block=True if deadline else None,
 | |
| 93 | +                timeout=deadline - INTERVAL_BUFFER if deadline else None)
 | |
| 91 | 94 |              if leases:
 | 
| 92 | 95 |                  bot_session.leases.extend(leases)
 | 
| 93 | 96 |  | 
| ... | ... | @@ -64,8 +64,10 @@ class BotsService(bots_pb2_grpc.BotsServicer): | 
| 64 | 64 |              instance_name = ''.join(names[0:-1])
 | 
| 65 | 65 |  | 
| 66 | 66 |              instance = self._get_instance(instance_name)
 | 
| 67 | +            # server side context time_remaining is maxint - unix time
 | |
| 67 | 68 |              return instance.update_bot_session(request.name,
 | 
| 68 | -                                               request.bot_session)
 | |
| 69 | +                                               request.bot_session,
 | |
| 70 | +                                               context.time_remaining())
 | |
| 69 | 71 |  | 
| 70 | 72 |          except InvalidArgumentError as e:
 | 
| 71 | 73 |              self.logger.error(e)
 | 
| ... | ... | @@ -19,7 +19,7 @@ Scheduler | 
| 19 | 19 |  Schedules jobs.
 | 
| 20 | 20 |  """
 | 
| 21 | 21 |  | 
| 22 | -from collections import deque
 | |
| 22 | +from queue import Queue, Empty
 | |
| 23 | 23 |  | 
| 24 | 24 |  from buildgrid._exceptions import NotFoundError
 | 
| 25 | 25 |  | 
| ... | ... | @@ -33,7 +33,7 @@ class Scheduler: | 
| 33 | 33 |      def __init__(self, action_cache=None):
 | 
| 34 | 34 |          self._action_cache = action_cache
 | 
| 35 | 35 |          self.jobs = {}
 | 
| 36 | -        self.queue = deque()
 | |
| 36 | +        self.queue = Queue()
 | |
| 37 | 37 |  | 
| 38 | 38 |      def register_client(self, job_name, queue):
 | 
| 39 | 39 |          self.jobs[job_name].register_client(queue)
 | 
| ... | ... | @@ -53,7 +53,7 @@ class Scheduler: | 
| 53 | 53 |                  action_result = self._action_cache.get_action_result(job.action_digest)
 | 
| 54 | 54 |              except NotFoundError:
 | 
| 55 | 55 |                  operation_stage = OperationStage.QUEUED
 | 
| 56 | -                self.queue.append(job)
 | |
| 56 | +                self.queue.put(job)
 | |
| 57 | 57 |  | 
| 58 | 58 |              else:
 | 
| 59 | 59 |                  job.set_cached_result(action_result)
 | 
| ... | ... | @@ -61,7 +61,7 @@ class Scheduler: | 
| 61 | 61 |  | 
| 62 | 62 |          else:
 | 
| 63 | 63 |              operation_stage = OperationStage.QUEUED
 | 
| 64 | -            self.queue.append(job)
 | |
| 64 | +            self.queue.put(job)
 | |
| 65 | 65 |  | 
| 66 | 66 |          job.update_operation_stage(operation_stage)
 | 
| 67 | 67 |  | 
| ... | ... | @@ -74,12 +74,12 @@ class Scheduler: | 
| 74 | 74 |                  # TODO: Mark these jobs as done
 | 
| 75 | 75 |              else:
 | 
| 76 | 76 |                  job.update_operation_stage(OperationStage.QUEUED)
 | 
| 77 | -                self.queue.appendleft(job)
 | |
| 77 | +                self.queue.queue.appendleft(job)
 | |
| 78 | 78 |  | 
| 79 | 79 |      def list_jobs(self):
 | 
| 80 | 80 |          return self.jobs.values()
 | 
| 81 | 81 |  | 
| 82 | -    def request_job_leases(self, worker_capabilities):
 | |
| 82 | +    def request_job_leases(self, worker_capabilities, block=False, timeout=None):
 | |
| 83 | 83 |          """Generates a list of the highest priority leases to be run.
 | 
| 84 | 84 |  | 
| 85 | 85 |          Args:
 | 
| ... | ... | @@ -87,10 +87,13 @@ class Scheduler: | 
| 87 | 87 |                  worker properties, configuration and state at the time of the
 | 
| 88 | 88 |                  request.
 | 
| 89 | 89 |          """
 | 
| 90 | -        if not self.queue:
 | |
| 90 | +        if not block and self.queue.empty():
 | |
| 91 | 91 |              return []
 | 
| 92 | 92 |  | 
| 93 | -        job = self.queue.popleft()
 | |
| 93 | +        try:
 | |
| 94 | +            job = self.queue.get(block, timeout)
 | |
| 95 | +        except Empty:
 | |
| 96 | +            return []
 | |
| 94 | 97 |          # For now, one lease at a time:
 | 
| 95 | 98 |          lease = job.create_lease()
 | 
| 96 | 99 |  | 
| ... | ... | @@ -4,3 +4,6 @@ import hashlib | 
| 4 | 4 |  # The hash function that CAS uses
 | 
| 5 | 5 |  HASH = hashlib.sha256
 | 
| 6 | 6 |  HASH_LENGTH = HASH().digest_size * 2
 | 
| 7 | + | |
| 8 | +# time in seconds to pad timeouts
 | |
| 9 | +INTERVAL_BUFFER = 5 | 
| ... | ... | @@ -87,7 +87,7 @@ def get_cmdclass(): | 
| 87 | 87 |  | 
| 88 | 88 |  tests_require = [
 | 
| 89 | 89 |      'coverage >= 4.5.0',
 | 
| 90 | -    'moto',
 | |
| 90 | +    'moto < 1.3.7',
 | |
| 91 | 91 |      'pep8',
 | 
| 92 | 92 |      'psutil',
 | 
| 93 | 93 |      'pytest >= 3.8.0',
 | 
| ... | ... | @@ -39,7 +39,9 @@ server = mock.create_autospec(grpc.server) | 
| 39 | 39 |  # GRPC context
 | 
| 40 | 40 |  @pytest.fixture
 | 
| 41 | 41 |  def context():
 | 
| 42 | -    yield mock.MagicMock(spec=_Context)
 | |
| 42 | +    context_mock = mock.MagicMock(spec=_Context)
 | |
| 43 | +    context_mock.time_remaining.return_value = None
 | |
| 44 | +    yield context_mock
 | |
| 43 | 45 |  | 
| 44 | 46 |  | 
| 45 | 47 |  @pytest.fixture
 | 
| ... | ... | @@ -91,7 +93,6 @@ def test_update_bot_session(bot_session, context, instance): | 
| 91 | 93 |  | 
| 92 | 94 |      request = bots_pb2.UpdateBotSessionRequest(name=bot.name,
 | 
| 93 | 95 |                                                 bot_session=bot)
 | 
| 94 | - | |
| 95 | 96 |      response = instance.UpdateBotSession(request, context)
 | 
| 96 | 97 |  | 
| 97 | 98 |      assert isinstance(response, bots_pb2.BotSession)
 | 
