Raoul Hidalgo Charman pushed to branch raoul/smarter-bot-calls at BuildGrid / buildgrid
Commits:
- 8187d48b by Raoul Hidalgo Charman at 2018-11-02T12:37:33Z
7 changed files:
- buildgrid/_app/commands/cmd_bot.py
- buildgrid/bot/bot_interface.py
- buildgrid/server/bots/instance.py
- buildgrid/server/bots/service.py
- buildgrid/server/scheduler.py
- buildgrid/settings.py
- tests/integration/bots_service.py
Changes:
| ... | ... | @@ -33,9 +33,12 @@ from buildgrid.bot.bot_session import BotSession, Device, Worker |
| 33 | 33 |
|
| 34 | 34 |
from ..bots import buildbox, dummy, host
|
| 35 | 35 |
from ..cli import pass_context
|
| 36 |
+from ...settings import INTERVAL_BUFFER
|
|
| 36 | 37 |
|
| 37 | 38 |
|
| 38 | 39 |
@click.group(name='bot', short_help="Create and register bot clients.")
|
| 40 |
+@click.option('--interval', type=click.INT, default=30,
|
|
| 41 |
+ help="Interval for calling central server")
|
|
| 39 | 42 |
@click.option('--remote', type=click.STRING, default='http://localhost:50051', show_default=True,
|
| 40 | 43 |
help="Remote execution server's URL (port defaults to 50051 if not specified).")
|
| 41 | 44 |
@click.option('--client-key', type=click.Path(exists=True, dir_okay=False), default=None,
|
| ... | ... | @@ -58,7 +61,7 @@ from ..cli import pass_context |
| 58 | 61 |
help="Targeted farm resource.")
|
| 59 | 62 |
@pass_context
|
| 60 | 63 |
def cli(context, parent, update_period, remote, client_key, client_cert, server_cert,
|
| 61 |
- remote_cas, cas_client_key, cas_client_cert, cas_server_cert):
|
|
| 64 |
+ remote_cas, cas_client_key, cas_client_cert, cas_server_cert, interval):
|
|
| 62 | 65 |
# Setup the remote execution server channel:
|
| 63 | 66 |
url = urlparse(remote)
|
| 64 | 67 |
|
| ... | ... | @@ -123,7 +126,7 @@ def cli(context, parent, update_period, remote, client_key, client_cert, server_ |
| 123 | 126 |
context.logger = logging.getLogger(__name__)
|
| 124 | 127 |
context.logger.debug("Starting for remote {}".format(context.remote))
|
| 125 | 128 |
|
| 126 |
- interface = bot_interface.BotInterface(context.channel)
|
|
| 129 |
+ interface = bot_interface.BotInterface(context.channel, interval + INTERVAL_BUFFER)
|
|
| 127 | 130 |
|
| 128 | 131 |
worker = Worker()
|
| 129 | 132 |
worker.add_device(Device())
|
| ... | ... | @@ -30,10 +30,11 @@ class BotInterface: |
| 30 | 30 |
Interface handles calls to the server.
|
| 31 | 31 |
"""
|
| 32 | 32 |
|
| 33 |
- def __init__(self, channel):
|
|
| 33 |
+ def __init__(self, channel, interval):
|
|
| 34 | 34 |
self.logger = logging.getLogger(__name__)
|
| 35 | 35 |
self.logger.info(channel)
|
| 36 | 36 |
self._stub = bots_pb2_grpc.BotsStub(channel)
|
| 37 |
+ self._interval = interval
|
|
| 37 | 38 |
|
| 38 | 39 |
def create_bot_session(self, parent, bot_session):
|
| 39 | 40 |
request = bots_pb2.CreateBotSessionRequest(parent=parent,
|
| ... | ... | @@ -44,4 +45,4 @@ class BotInterface: |
| 44 | 45 |
request = bots_pb2.UpdateBotSessionRequest(name=bot_session.name,
|
| 45 | 46 |
bot_session=bot_session,
|
| 46 | 47 |
update_mask=update_mask)
|
| 47 |
- return self._stub.UpdateBotSession(request)
|
|
| 48 |
+ return self._stub.UpdateBotSession(request, timeout=self._interval)
|
| ... | ... | @@ -26,6 +26,7 @@ import uuid |
| 26 | 26 |
from buildgrid._exceptions import InvalidArgumentError, OutOfSyncError
|
| 27 | 27 |
|
| 28 | 28 |
from ..job import LeaseState
|
| 29 |
+from ...settings import INTERVAL_BUFFER
|
|
| 29 | 30 |
|
| 30 | 31 |
|
| 31 | 32 |
class BotsInterface:
|
| ... | ... | @@ -73,7 +74,7 @@ class BotsInterface: |
| 73 | 74 |
|
| 74 | 75 |
return bot_session
|
| 75 | 76 |
|
| 76 |
- def update_bot_session(self, name, bot_session):
|
|
| 77 |
+ def update_bot_session(self, name, bot_session, deadline=None):
|
|
| 77 | 78 |
""" Client updates the server. Any changes in state to the Lease should be
|
| 78 | 79 |
registered server side. Assigns available leases with work.
|
| 79 | 80 |
"""
|
| ... | ... | @@ -87,7 +88,9 @@ class BotsInterface: |
| 87 | 88 |
|
| 88 | 89 |
# TODO: Send worker capabilities to the scheduler!
|
| 89 | 90 |
if not bot_session.leases:
|
| 90 |
- leases = self._scheduler.request_job_leases({})
|
|
| 91 |
+ leases = self._scheduler.request_job_leases(
|
|
| 92 |
+ {}, block=True if deadline else None,
|
|
| 93 |
+ timeout=deadline - INTERVAL_BUFFER if deadline else None)
|
|
| 91 | 94 |
if leases:
|
| 92 | 95 |
bot_session.leases.extend(leases)
|
| 93 | 96 |
|
| ... | ... | @@ -64,8 +64,11 @@ class BotsService(bots_pb2_grpc.BotsServicer): |
| 64 | 64 |
instance_name = ''.join(names[0:-1])
|
| 65 | 65 |
|
| 66 | 66 |
instance = self._get_instance(instance_name)
|
| 67 |
+ # server side context time_remaining is maxint - unix time
|
|
| 68 |
+ print(context.time_remaining())
|
|
| 67 | 69 |
return instance.update_bot_session(request.name,
|
| 68 |
- request.bot_session)
|
|
| 70 |
+ request.bot_session,
|
|
| 71 |
+ context.time_remaining())
|
|
| 69 | 72 |
|
| 70 | 73 |
except InvalidArgumentError as e:
|
| 71 | 74 |
self.logger.error(e)
|
| ... | ... | @@ -19,7 +19,7 @@ Scheduler |
| 19 | 19 |
Schedules jobs.
|
| 20 | 20 |
"""
|
| 21 | 21 |
|
| 22 |
-from collections import deque
|
|
| 22 |
+from queue import Queue, Empty
|
|
| 23 | 23 |
|
| 24 | 24 |
from buildgrid._exceptions import NotFoundError
|
| 25 | 25 |
|
| ... | ... | @@ -33,7 +33,7 @@ class Scheduler: |
| 33 | 33 |
def __init__(self, action_cache=None):
|
| 34 | 34 |
self._action_cache = action_cache
|
| 35 | 35 |
self.jobs = {}
|
| 36 |
- self.queue = deque()
|
|
| 36 |
+ self.queue = Queue()
|
|
| 37 | 37 |
|
| 38 | 38 |
def register_client(self, job_name, queue):
|
| 39 | 39 |
self.jobs[job_name].register_client(queue)
|
| ... | ... | @@ -53,7 +53,7 @@ class Scheduler: |
| 53 | 53 |
action_result = self._action_cache.get_action_result(job.action_digest)
|
| 54 | 54 |
except NotFoundError:
|
| 55 | 55 |
operation_stage = OperationStage.QUEUED
|
| 56 |
- self.queue.append(job)
|
|
| 56 |
+ self.queue.put(job)
|
|
| 57 | 57 |
|
| 58 | 58 |
else:
|
| 59 | 59 |
job.set_cached_result(action_result)
|
| ... | ... | @@ -61,7 +61,7 @@ class Scheduler: |
| 61 | 61 |
|
| 62 | 62 |
else:
|
| 63 | 63 |
operation_stage = OperationStage.QUEUED
|
| 64 |
- self.queue.append(job)
|
|
| 64 |
+ self.queue.put(job)
|
|
| 65 | 65 |
|
| 66 | 66 |
job.update_operation_stage(operation_stage)
|
| 67 | 67 |
|
| ... | ... | @@ -74,12 +74,12 @@ class Scheduler: |
| 74 | 74 |
# TODO: Mark these jobs as done
|
| 75 | 75 |
else:
|
| 76 | 76 |
job.update_operation_stage(OperationStage.QUEUED)
|
| 77 |
- self.queue.appendleft(job)
|
|
| 77 |
+ self.queue.queue.appendleft(job)
|
|
| 78 | 78 |
|
| 79 | 79 |
def list_jobs(self):
|
| 80 | 80 |
return self.jobs.values()
|
| 81 | 81 |
|
| 82 |
- def request_job_leases(self, worker_capabilities):
|
|
| 82 |
+ def request_job_leases(self, worker_capabilities, block=False, timeout=None):
|
|
| 83 | 83 |
"""Generates a list of the highest priority leases to be run.
|
| 84 | 84 |
|
| 85 | 85 |
Args:
|
| ... | ... | @@ -87,10 +87,13 @@ class Scheduler: |
| 87 | 87 |
worker properties, configuration and state at the time of the
|
| 88 | 88 |
request.
|
| 89 | 89 |
"""
|
| 90 |
- if not self.queue:
|
|
| 90 |
+ if not block and self.queue.empty():
|
|
| 91 | 91 |
return []
|
| 92 | 92 |
|
| 93 |
- job = self.queue.popleft()
|
|
| 93 |
+ try:
|
|
| 94 |
+ job = self.queue.get(block, timeout)
|
|
| 95 |
+ except Empty:
|
|
| 96 |
+ return []
|
|
| 94 | 97 |
# For now, one lease at a time:
|
| 95 | 98 |
lease = job.create_lease()
|
| 96 | 99 |
|
| ... | ... | @@ -4,3 +4,6 @@ import hashlib |
| 4 | 4 |
# The hash function that CAS uses
|
| 5 | 5 |
HASH = hashlib.sha256
|
| 6 | 6 |
HASH_LENGTH = HASH().digest_size * 2
|
| 7 |
+ |
|
| 8 |
+# time in seconds to pad timeouts
|
|
| 9 |
+INTERVAL_BUFFER = 5
|
| ... | ... | @@ -39,7 +39,9 @@ server = mock.create_autospec(grpc.server) |
| 39 | 39 |
# GRPC context
|
| 40 | 40 |
@pytest.fixture
|
| 41 | 41 |
def context():
|
| 42 |
- yield mock.MagicMock(spec=_Context)
|
|
| 42 |
+ context_mock = mock.MagicMock(spec=_Context)
|
|
| 43 |
+ context_mock.time_remaining.return_value = None
|
|
| 44 |
+ yield context_mock
|
|
| 43 | 45 |
|
| 44 | 46 |
|
| 45 | 47 |
@pytest.fixture
|
| ... | ... | @@ -91,7 +93,6 @@ def test_update_bot_session(bot_session, context, instance): |
| 91 | 93 |
|
| 92 | 94 |
request = bots_pb2.UpdateBotSessionRequest(name=bot.name,
|
| 93 | 95 |
bot_session=bot)
|
| 94 |
- |
|
| 95 | 96 |
response = instance.UpdateBotSession(request, context)
|
| 96 | 97 |
|
| 97 | 98 |
assert isinstance(response, bots_pb2.BotSession)
|
