Raoul Hidalgo Charman pushed to branch raoul/smarter-bot-calls at BuildGrid / buildgrid
Commits:
- d762e174 by Raoul Hidalgo Charman at 2018-11-30T16:46:44Z
- 4379112b by Raoul Hidalgo Charman at 2018-11-30T16:47:02Z
11 changed files:
- buildgrid/_app/commands/cmd_bot.py
- buildgrid/bot/bot.py
- buildgrid/bot/interface.py
- buildgrid/bot/session.py
- buildgrid/bot/tenantmanager.py
- buildgrid/server/bots/instance.py
- buildgrid/server/bots/service.py
- buildgrid/settings.py
- tests/integration/bot_session.py
- tests/integration/bots_service.py
- tests/utils/bots_interface.py
Changes:
| ... | ... | @@ -31,7 +31,7 @@ from buildgrid.bot import bot, interface, session | 
| 31 | 31 |  from buildgrid.bot.hardware.interface import HardwareInterface
 | 
| 32 | 32 |  from buildgrid.bot.hardware.device import Device
 | 
| 33 | 33 |  from buildgrid.bot.hardware.worker import Worker
 | 
| 34 | - | |
| 34 | +from buildgrid.settings import INTERVAL_BUFFER
 | |
| 35 | 35 |  | 
| 36 | 36 |  from ..bots import buildbox, dummy, host
 | 
| 37 | 37 |  from ..cli import pass_context
 | 
| ... | ... | @@ -54,7 +54,7 @@ from ..cli import pass_context | 
| 54 | 54 |                help="Public CAS client certificate for TLS (PEM-encoded)")
 | 
| 55 | 55 |  @click.option('--cas-server-cert', type=click.Path(exists=True, dir_okay=False), default=None,
 | 
| 56 | 56 |                help="Public CAS server certificate for TLS (PEM-encoded)")
 | 
| 57 | -@click.option('--update-period', type=click.FLOAT, default=0.5, show_default=True,
 | |
| 57 | +@click.option('--update-period', type=click.FLOAT, default=30, show_default=True,
 | |
| 58 | 58 |                help="Time period for bot updates to the server in seconds.")
 | 
| 59 | 59 |  @click.option('--parent', type=click.STRING, default='main', show_default=True,
 | 
| 60 | 60 |                help="Targeted farm resource.")
 | 
| ... | ... | @@ -66,7 +66,6 @@ def cli(context, parent, update_period, remote, client_key, client_cert, server_ | 
| 66 | 66 |  | 
| 67 | 67 |      context.remote = '{}:{}'.format(url.hostname, url.port or 50051)
 | 
| 68 | 68 |      context.remote_url = remote
 | 
| 69 | -    context.update_period = update_period
 | |
| 70 | 69 |      context.parent = parent
 | 
| 71 | 70 |  | 
| 72 | 71 |      if url.scheme == 'http':
 | 
| ... | ... | @@ -124,7 +123,7 @@ def cli(context, parent, update_period, remote, client_key, client_cert, server_ | 
| 124 | 123 |  | 
| 125 | 124 |      click.echo("Starting for remote=[{}]".format(context.remote))
 | 
| 126 | 125 |  | 
| 127 | -    bot_interface = interface.BotInterface(context.channel)
 | |
| 126 | +    bot_interface = interface.BotInterface(context.channel, update_period + INTERVAL_BUFFER)
 | |
| 128 | 127 |      worker = Worker()
 | 
| 129 | 128 |      worker.add_device(Device())
 | 
| 130 | 129 |      hardware_interface = HardwareInterface(worker)
 | 
| ... | ... | @@ -142,7 +141,7 @@ def run_dummy(context): | 
| 142 | 141 |      try:
 | 
| 143 | 142 |          bot_session = session.BotSession(context.parent, context.bot_interface, context.hardware_interface,
 | 
| 144 | 143 |                                           dummy.work_dummy, context)
 | 
| 145 | -        b = bot.Bot(bot_session, context.update_period)
 | |
| 144 | +        b = bot.Bot(bot_session)
 | |
| 146 | 145 |          b.session()
 | 
| 147 | 146 |      except KeyboardInterrupt:
 | 
| 148 | 147 |          pass
 | 
| ... | ... | @@ -158,7 +157,7 @@ def run_host_tools(context): | 
| 158 | 157 |      try:
 | 
| 159 | 158 |          bot_session = session.BotSession(context.parent, context.bot_interface, context.hardware_interface,
 | 
| 160 | 159 |                                           host.work_host_tools, context)
 | 
| 161 | -        b = bot.Bot(bot_session, context.update_period)
 | |
| 160 | +        b = bot.Bot(bot_session)
 | |
| 162 | 161 |          b.session()
 | 
| 163 | 162 |      except KeyboardInterrupt:
 | 
| 164 | 163 |          pass
 | 
| ... | ... | @@ -180,7 +179,7 @@ def run_buildbox(context, local_cas, fuse_dir): | 
| 180 | 179 |      try:
 | 
| 181 | 180 |          bot_session = session.BotSession(context.parent, context.bot_interface, context.hardware_interface,
 | 
| 182 | 181 |                                           buildbox.work_buildbox, context)
 | 
| 183 | -        b = bot.Bot(bot_session, context.update_period)
 | |
| 182 | +        b = bot.Bot(bot_session)
 | |
| 184 | 183 |          b.session()
 | 
| 185 | 184 |      except KeyboardInterrupt:
 | 
| 186 | 185 |          pass | 
| ... | ... | @@ -20,14 +20,12 @@ import logging | 
| 20 | 20 |  class Bot:
 | 
| 21 | 21 |      """Creates a local BotSession."""
 | 
| 22 | 22 |  | 
| 23 | -    def __init__(self, bot_session, update_period=1):
 | |
| 23 | +    def __init__(self, bot_session):
 | |
| 24 | 24 |          """
 | 
| 25 | 25 |          """
 | 
| 26 | 26 |          self.__logger = logging.getLogger(__name__)
 | 
| 27 | 27 |  | 
| 28 | 28 |          self.__bot_session = bot_session
 | 
| 29 | -        self.__update_period = update_period
 | |
| 30 | - | |
| 31 | 29 |          self.__loop = None
 | 
| 32 | 30 |  | 
| 33 | 31 |      def session(self):
 | 
| ... | ... | @@ -37,7 +35,7 @@ class Bot: | 
| 37 | 35 |          self.__bot_session.create_bot_session()
 | 
| 38 | 36 |  | 
| 39 | 37 |          try:
 | 
| 40 | -            task = asyncio.ensure_future(self.__update_bot_session())
 | |
| 38 | +            task = asyncio.ensure_future(self.__bot_session.run())
 | |
| 41 | 39 |              self.__loop.run_until_complete(task)
 | 
| 42 | 40 |  | 
| 43 | 41 |          except KeyboardInterrupt:
 | 
| ... | ... | @@ -46,16 +44,6 @@ class Bot: | 
| 46 | 44 |          self.__kill_everyone()
 | 
| 47 | 45 |          self.__logger.info("Bot shutdown.")
 | 
| 48 | 46 |  | 
| 49 | -    async def __update_bot_session(self):
 | |
| 50 | -        """Calls the server periodically to inform the server the client has not died."""
 | |
| 51 | -        try:
 | |
| 52 | -            while True:
 | |
| 53 | -                self.__bot_session.update_bot_session()
 | |
| 54 | -                await asyncio.sleep(self.__update_period)
 | |
| 55 | - | |
| 56 | -        except asyncio.CancelledError:
 | |
| 57 | -            pass
 | |
| 58 | - | |
| 59 | 47 |      def __kill_everyone(self):
 | 
| 60 | 48 |          """Cancels and waits for them to stop."""
 | 
| 61 | 49 |          self.__logger.info("Cancelling remaining tasks...")
 | 
| ... | ... | @@ -31,28 +31,32 @@ class BotInterface: | 
| 31 | 31 |      Interface handles calls to the server.
 | 
| 32 | 32 |      """
 | 
| 33 | 33 |  | 
| 34 | -    def __init__(self, channel):
 | |
| 34 | +    def __init__(self, channel, interval):
 | |
| 35 | 35 |          self.__logger = logging.getLogger(__name__)
 | 
| 36 | 36 |  | 
| 37 | 37 |          self._stub = bots_pb2_grpc.BotsStub(channel)
 | 
| 38 | +        self.__interval = interval
 | |
| 39 | + | |
| 40 | +    @property
 | |
| 41 | +    def interval(self):
 | |
| 42 | +        return self.__interval
 | |
| 38 | 43 |  | 
| 39 | 44 |      def create_bot_session(self, parent, bot_session):
 | 
| 45 | +        """ Creates a bot session returning a grpc StatusCode if it failed """
 | |
| 40 | 46 |          request = bots_pb2.CreateBotSessionRequest(parent=parent,
 | 
| 41 | 47 |                                                     bot_session=bot_session)
 | 
| 42 | -        try:
 | |
| 43 | -            return self._stub.CreateBotSession(request)
 | |
| 44 | - | |
| 45 | -        except grpc.RpcError as e:
 | |
| 46 | -            self.__logger.error(e)
 | |
| 47 | -            raise
 | |
| 48 | +        return self._bot_call(self._stub.CreateBotSession, request)
 | |
| 48 | 49 |  | 
| 49 | 50 |      def update_bot_session(self, bot_session, update_mask=None):
 | 
| 51 | +        """ Updates a bot session returning a grpc StatusCode if it failed """
 | |
| 50 | 52 |          request = bots_pb2.UpdateBotSessionRequest(name=bot_session.name,
 | 
| 51 | 53 |                                                     bot_session=bot_session,
 | 
| 52 | 54 |                                                     update_mask=update_mask)
 | 
| 53 | -        try:
 | |
| 54 | -            return self._stub.UpdateBotSession(request)
 | |
| 55 | +        return self._bot_call(self._stub.UpdateBotSession, request)
 | |
| 55 | 56 |  | 
| 57 | +    def _bot_call(self, call, request):
 | |
| 58 | +        try:
 | |
| 59 | +            return call(request, timeout=self.interval)
 | |
| 56 | 60 |          except grpc.RpcError as e:
 | 
| 57 | -            self.__logger.error(e)
 | |
| 58 | -            raise | |
| 61 | +            self.__logger.error(e.code())
 | |
| 62 | +            return e.code() | 
| ... | ... | @@ -19,9 +19,12 @@ Bot Session | 
| 19 | 19 |  | 
| 20 | 20 |  Allows connections
 | 
| 21 | 21 |  """
 | 
| 22 | +import asyncio
 | |
| 22 | 23 |  import logging
 | 
| 23 | 24 |  import platform
 | 
| 24 | 25 |  | 
| 26 | +from grpc import StatusCode
 | |
| 27 | + | |
| 25 | 28 |  from buildgrid._enums import BotStatus, LeaseState
 | 
| 26 | 29 |  from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2
 | 
| 27 | 30 |  from buildgrid._protos.google.rpc import code_pb2
 | 
| ... | ... | @@ -47,6 +50,8 @@ class BotSession: | 
| 47 | 50 |          self._status = BotStatus.OK.value
 | 
| 48 | 51 |          self._tenant_manager = TenantManager()
 | 
| 49 | 52 |  | 
| 53 | +        self.connected = False
 | |
| 54 | + | |
| 50 | 55 |          self.__parent = parent
 | 
| 51 | 56 |          self.__bot_id = '{}.{}'.format(parent, platform.node())
 | 
| 52 | 57 |          self.__name = None
 | 
| ... | ... | @@ -58,10 +63,33 @@ class BotSession: | 
| 58 | 63 |      def bot_id(self):
 | 
| 59 | 64 |          return self.__bot_id
 | 
| 60 | 65 |  | 
| 66 | +    async def run(self):
 | |
| 67 | +        """ Run a bot session
 | |
| 68 | + | |
| 69 | +        This connects and reconnects via create bot session and waits on update
 | |
| 70 | +        bot session calls.
 | |
| 71 | +        """
 | |
| 72 | +        self.__logger.debug("Starting bot session")
 | |
| 73 | +        interval = self._bots_interface.interval
 | |
| 74 | +        while True:
 | |
| 75 | +            if not self.connected:
 | |
| 76 | +                self.create_bot_session()
 | |
| 77 | +            else:
 | |
| 78 | +                self.update_bot_session()
 | |
| 79 | + | |
| 80 | +            if not self.connected:
 | |
| 81 | +                await asyncio.sleep(interval)
 | |
| 82 | +            else:
 | |
| 83 | +                await self._tenant_manager.wait_on_tenants(interval)
 | |
| 84 | + | |
| 61 | 85 |      def create_bot_session(self):
 | 
| 62 | 86 |          self.__logger.debug("Creating bot session")
 | 
| 63 | 87 |  | 
| 64 | 88 |          session = self._bots_interface.create_bot_session(self.__parent, self.get_pb2())
 | 
| 89 | +        if session in list(StatusCode):
 | |
| 90 | +            self.connected = False
 | |
| 91 | +            return
 | |
| 92 | +        self.connected = True
 | |
| 65 | 93 |          self.__name = session.name
 | 
| 66 | 94 |  | 
| 67 | 95 |          self.__logger.info("Created bot session with name: [%s]", self.__name)
 | 
| ... | ... | @@ -73,6 +101,13 @@ class BotSession: | 
| 73 | 101 |          self.__logger.debug("Updating bot session: [%s]", self.__bot_id)
 | 
| 74 | 102 |  | 
| 75 | 103 |          session = self._bots_interface.update_bot_session(self.get_pb2())
 | 
| 104 | +        if session == StatusCode.DEADLINE_EXCEEDED:
 | |
| 105 | +            # try to continue to do update session if it passed the timeout
 | |
| 106 | +            return
 | |
| 107 | +        elif session in StatusCode:
 | |
| 108 | +            self.connected = False
 | |
| 109 | +            return
 | |
| 110 | +        self.connected = True
 | |
| 76 | 111 |          server_ids = []
 | 
| 77 | 112 |  | 
| 78 | 113 |          for lease in session.leases:
 | 
| ... | ... | @@ -150,6 +150,13 @@ class TenantManager: | 
| 150 | 150 |          """
 | 
| 151 | 151 |          return self._tenants[lease_id].tenant_completed
 | 
| 152 | 152 |  | 
| 153 | +    async def wait_on_tenants(self, timeout):
 | |
| 154 | +        if self._tasks:
 | |
| 155 | +            tasks = self._tasks.values()
 | |
| 156 | +            await asyncio.wait(tasks,
 | |
| 157 | +                               timeout=timeout,
 | |
| 158 | +                               return_when=asyncio.FIRST_COMPLETED)
 | |
| 159 | + | |
| 153 | 160 |      def _update_lease_result(self, lease_id, result):
 | 
| 154 | 161 |          """Updates the lease with the result."""
 | 
| 155 | 162 |          self._tenants[lease_id].update_lease_result(result)
 | 
| ... | ... | @@ -24,6 +24,7 @@ import logging | 
| 24 | 24 |  import uuid
 | 
| 25 | 25 |  | 
| 26 | 26 |  from buildgrid._exceptions import InvalidArgumentError
 | 
| 27 | +from buildgrid.settings import INTERVAL_BUFFER
 | |
| 27 | 28 |  | 
| 28 | 29 |  from ..job import LeaseState
 | 
| 29 | 30 |  | 
| ... | ... | @@ -75,7 +76,7 @@ class BotsInterface: | 
| 75 | 76 |          self._request_leases(bot_session)
 | 
| 76 | 77 |          return bot_session
 | 
| 77 | 78 |  | 
| 78 | -    def update_bot_session(self, name, bot_session):
 | |
| 79 | +    def update_bot_session(self, name, bot_session, deadline=None):
 | |
| 79 | 80 |          """ Client updates the server. Any changes in state to the Lease should be
 | 
| 80 | 81 |          registered server side. Assigns available leases with work.
 | 
| 81 | 82 |          """
 | 
| ... | ... | @@ -93,14 +94,15 @@ class BotsInterface: | 
| 93 | 94 |                      pass
 | 
| 94 | 95 |                  lease.Clear()
 | 
| 95 | 96 |  | 
| 96 | -        self._request_leases(bot_session)
 | |
| 97 | +        self._request_leases(bot_session, deadline)
 | |
| 97 | 98 |          return bot_session
 | 
| 98 | 99 |  | 
| 99 | -    def _request_leases(self, bot_session):
 | |
| 100 | +    def _request_leases(self, bot_session, deadline=None):
 | |
| 100 | 101 |          # TODO: Send worker capabilities to the scheduler!
 | 
| 101 | 102 |          # Only send one lease at a time currently.
 | 
| 102 | 103 |          if not bot_session.leases:
 | 
| 103 | -            leases = self._scheduler.request_job_leases({})
 | |
| 104 | +            leases = self._scheduler.request_job_leases(
 | |
| 105 | +                {}, timeout=deadline - INTERVAL_BUFFER if deadline else None)
 | |
| 104 | 106 |              if leases:
 | 
| 105 | 107 |                  for lease in leases:
 | 
| 106 | 108 |                      self._assigned_leases[bot_session.name].add(lease.id)
 | 
| ... | ... | @@ -138,8 +138,10 @@ class BotsService(bots_pb2_grpc.BotsServicer): | 
| 138 | 138 |              instance_name = '/'.join(names[:-1])
 | 
| 139 | 139 |  | 
| 140 | 140 |              instance = self._get_instance(instance_name)
 | 
| 141 | -            bot_session = instance.update_bot_session(request.name,
 | |
| 142 | -                                                      request.bot_session)
 | |
| 141 | +            bot_session = instance.update_bot_session(
 | |
| 142 | +                request.name,
 | |
| 143 | +                request.bot_session,
 | |
| 144 | +                deadline=context.time_remaining())
 | |
| 143 | 145 |  | 
| 144 | 146 |              if self._is_instrumented:
 | 
| 145 | 147 |                  self.__bots[bot_id].GetCurrentTime()
 | 
| ... | ... | @@ -24,3 +24,6 @@ HASH_LENGTH = HASH().digest_size * 2 | 
| 24 | 24 |  | 
| 25 | 25 |  # Period, in seconds, for the monitoring cycle:
 | 
| 26 | 26 |  MONITORING_PERIOD = 5.0
 | 
| 27 | + | |
| 28 | +# time in seconds to pad timeouts
 | |
| 29 | +INTERVAL_BUFFER = 5 | 
| ... | ... | @@ -30,6 +30,7 @@ from ..utils.utils import run_in_subprocess | 
| 30 | 30 |  from ..utils.bots_interface import serve_bots_interface
 | 
| 31 | 31 |  | 
| 32 | 32 |  | 
| 33 | +TIMEOUT = 5
 | |
| 33 | 34 |  INSTANCES = ['', 'instance']
 | 
| 34 | 35 |  | 
| 35 | 36 |  | 
| ... | ... | @@ -48,7 +49,7 @@ class ServerInterface: | 
| 48 | 49 |              bot_session = bots_pb2.BotSession()
 | 
| 49 | 50 |              bot_session.ParseFromString(string_bot_session)
 | 
| 50 | 51 |  | 
| 51 | -            interface = BotInterface(grpc.insecure_channel(remote))
 | |
| 52 | +            interface = BotInterface(grpc.insecure_channel(remote), TIMEOUT)
 | |
| 52 | 53 |  | 
| 53 | 54 |              result = interface.create_bot_session(parent, bot_session)
 | 
| 54 | 55 |              queue.put(result.SerializeToString())
 | 
| ... | ... | @@ -67,7 +68,7 @@ class ServerInterface: | 
| 67 | 68 |              bot_session = bots_pb2.BotSession()
 | 
| 68 | 69 |              bot_session.ParseFromString(string_bot_session)
 | 
| 69 | 70 |  | 
| 70 | -            interface = BotInterface(grpc.insecure_channel(remote))
 | |
| 71 | +            interface = BotInterface(grpc.insecure_channel(remote), TIMEOUT)
 | |
| 71 | 72 |  | 
| 72 | 73 |              result = interface.update_bot_session(bot_session, update_mask)
 | 
| 73 | 74 |              queue.put(result.SerializeToString())
 | 
| ... | ... | @@ -38,7 +38,9 @@ server = mock.create_autospec(grpc.server) | 
| 38 | 38 |  # GRPC context
 | 
| 39 | 39 |  @pytest.fixture
 | 
| 40 | 40 |  def context():
 | 
| 41 | -    yield mock.MagicMock(spec=_Context)
 | |
| 41 | +    context_mock = mock.MagicMock(spec=_Context)
 | |
| 42 | +    context_mock.time_remaining.return_value = None
 | |
| 43 | +    yield context_mock
 | |
| 42 | 44 |  | 
| 43 | 45 |  | 
| 44 | 46 |  @pytest.fixture
 | 
| ... | ... | @@ -123,7 +123,7 @@ class BotsInterface: | 
| 123 | 123 |          self.__bot_session_queue.put(bot_session.SerializeToString())
 | 
| 124 | 124 |          return bot_session
 | 
| 125 | 125 |  | 
| 126 | -    def update_bot_session(self, name, bot_session):
 | |
| 126 | +    def update_bot_session(self, name, bot_session, deadline=None):
 | |
| 127 | 127 |          for lease in bot_session.leases:
 | 
| 128 | 128 |              state = LeaseState(lease.state)
 | 
| 129 | 129 |              if state == LeaseState.COMPLETED:
 | 
