Raoul Hidalgo Charman pushed to branch raoul/smarter-bot-calls at BuildGrid / buildgrid
Commits:
-
05053c36
by Raoul Hidalgo Charman at 2018-11-02T14:43:47Z
-
81f7da62
by Raoul Hidalgo Charman at 2018-11-02T14:43:55Z
9 changed files:
- buildgrid/_app/commands/cmd_bot.py
- buildgrid/bot/bot.py
- buildgrid/bot/bot_interface.py
- buildgrid/server/bots/instance.py
- buildgrid/server/bots/service.py
- buildgrid/server/scheduler.py
- buildgrid/settings.py
- buildgrid/utils.py
- tests/integration/bots_service.py
Changes:
| ... | ... | @@ -33,6 +33,7 @@ from buildgrid.bot.bot_session import BotSession, Device, Worker |
| 33 | 33 |
|
| 34 | 34 |
from ..bots import buildbox, dummy, host
|
| 35 | 35 |
from ..cli import pass_context
|
| 36 |
+from ...settings import INTERVAL_BUFFER
|
|
| 36 | 37 |
|
| 37 | 38 |
|
| 38 | 39 |
@click.group(name='bot', short_help="Create and register bot clients.")
|
| ... | ... | @@ -52,7 +53,7 @@ from ..cli import pass_context |
| 52 | 53 |
help="Public CAS client certificate for TLS (PEM-encoded)")
|
| 53 | 54 |
@click.option('--cas-server-cert', type=click.Path(exists=True, dir_okay=False), default=None,
|
| 54 | 55 |
help="Public CAS server certificate for TLS (PEM-encoded)")
|
| 55 |
-@click.option('--update-period', type=click.FLOAT, default=0.5, show_default=True,
|
|
| 56 |
+@click.option('--update-period', type=click.FLOAT, default=30, show_default=True,
|
|
| 56 | 57 |
help="Time period for bot updates to the server in seconds.")
|
| 57 | 58 |
@click.option('--parent', type=click.STRING, default='main', show_default=True,
|
| 58 | 59 |
help="Targeted farm resource.")
|
| ... | ... | @@ -64,7 +65,6 @@ def cli(context, parent, update_period, remote, client_key, client_cert, server_ |
| 64 | 65 |
|
| 65 | 66 |
context.remote = '{}:{}'.format(url.hostname, url.port or 50051)
|
| 66 | 67 |
context.remote_url = remote
|
| 67 |
- context.update_period = update_period
|
|
| 68 | 68 |
context.parent = parent
|
| 69 | 69 |
|
| 70 | 70 |
if url.scheme == 'http':
|
| ... | ... | @@ -123,7 +123,7 @@ def cli(context, parent, update_period, remote, client_key, client_cert, server_ |
| 123 | 123 |
context.logger = logging.getLogger(__name__)
|
| 124 | 124 |
context.logger.debug("Starting for remote {}".format(context.remote))
|
| 125 | 125 |
|
| 126 |
- interface = bot_interface.BotInterface(context.channel)
|
|
| 126 |
+ interface = bot_interface.BotInterface(context.channel, update_period)
|
|
| 127 | 127 |
|
| 128 | 128 |
worker = Worker()
|
| 129 | 129 |
worker.add_device(Device())
|
| ... | ... | @@ -141,7 +141,7 @@ def run_dummy(context): |
| 141 | 141 |
Creates a session, accepts leases, does fake work and updates the server.
|
| 142 | 142 |
"""
|
| 143 | 143 |
try:
|
| 144 |
- b = bot.Bot(context.bot_session, context.update_period)
|
|
| 144 |
+ b = bot.Bot(context.bot_session)
|
|
| 145 | 145 |
b.session(dummy.work_dummy,
|
| 146 | 146 |
context)
|
| 147 | 147 |
except KeyboardInterrupt:
|
| ... | ... | @@ -156,7 +156,7 @@ def run_host_tools(context): |
| 156 | 156 |
result back to CAS.
|
| 157 | 157 |
"""
|
| 158 | 158 |
try:
|
| 159 |
- b = bot.Bot(context.bot_session, context.update_period)
|
|
| 159 |
+ b = bot.Bot(context.bot_session)
|
|
| 160 | 160 |
b.session(host.work_host_tools,
|
| 161 | 161 |
context)
|
| 162 | 162 |
except KeyboardInterrupt:
|
| ... | ... | @@ -177,7 +177,7 @@ def run_buildbox(context, local_cas, fuse_dir): |
| 177 | 177 |
context.fuse_dir = fuse_dir
|
| 178 | 178 |
|
| 179 | 179 |
try:
|
| 180 |
- b = bot.Bot(context.bot_session, context.update_period)
|
|
| 180 |
+ b = bot.Bot(context.bot_session)
|
|
| 181 | 181 |
b.session(buildbox.work_buildbox,
|
| 182 | 182 |
context)
|
| 183 | 183 |
except KeyboardInterrupt:
|
| ... | ... | @@ -29,11 +29,10 @@ class Bot: |
| 29 | 29 |
Creates a local BotSession.
|
| 30 | 30 |
"""
|
| 31 | 31 |
|
| 32 |
- def __init__(self, bot_session, update_period=1):
|
|
| 32 |
+ def __init__(self, bot_session):
|
|
| 33 | 33 |
self.logger = logging.getLogger(__name__)
|
| 34 | 34 |
|
| 35 | 35 |
self._bot_session = bot_session
|
| 36 |
- self._update_period = update_period
|
|
| 37 | 36 |
|
| 38 | 37 |
def session(self, work, context):
|
| 39 | 38 |
loop = asyncio.get_event_loop()
|
| ... | ... | @@ -55,4 +54,3 @@ class Bot: |
| 55 | 54 |
"""
|
| 56 | 55 |
while True:
|
| 57 | 56 |
self._bot_session.update_bot_session()
|
| 58 |
- await asyncio.sleep(self._update_period)
|
| ... | ... | @@ -21,7 +21,10 @@ Interface to grpc |
| 21 | 21 |
"""
|
| 22 | 22 |
|
| 23 | 23 |
import logging
|
| 24 |
+from ..utils import timeout
|
|
| 25 |
+from ..settings import INTERVAL_BUFFER
|
|
| 24 | 26 |
|
| 27 |
+import grpc
|
|
| 25 | 28 |
from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2, bots_pb2_grpc
|
| 26 | 29 |
|
| 27 | 30 |
|
| ... | ... | @@ -30,10 +33,11 @@ class BotInterface: |
| 30 | 33 |
Interface handles calls to the server.
|
| 31 | 34 |
"""
|
| 32 | 35 |
|
| 33 |
- def __init__(self, channel):
|
|
| 36 |
+ def __init__(self, channel, interval):
|
|
| 34 | 37 |
self.logger = logging.getLogger(__name__)
|
| 35 | 38 |
self.logger.info(channel)
|
| 36 | 39 |
self._stub = bots_pb2_grpc.BotsStub(channel)
|
| 40 |
+ self._interval = interval
|
|
| 37 | 41 |
|
| 38 | 42 |
def create_bot_session(self, parent, bot_session):
|
| 39 | 43 |
request = bots_pb2.CreateBotSessionRequest(parent=parent,
|
| ... | ... | @@ -44,4 +48,15 @@ class BotInterface: |
| 44 | 48 |
request = bots_pb2.UpdateBotSessionRequest(name=bot_session.name,
|
| 45 | 49 |
bot_session=bot_session,
|
| 46 | 50 |
update_mask=update_mask)
|
| 47 |
- return self._stub.UpdateBotSession(request)
|
|
| 51 |
+ try:
|
|
| 52 |
+ with timeout(30):
|
|
| 53 |
+ return self._stub.UpdateBotSession(
|
|
| 54 |
+ request, timeout=self._interval + INTERVAL_BUFFER)
|
|
| 55 |
+ except grpc.StatusCode.DEADLINE_EXCEEDED:
|
|
| 56 |
+ self.logger.info("Server didn't respond")
|
|
| 57 |
+ return None
|
|
| 58 |
+ except TimeoutError:
|
|
| 59 |
+ self.logger.info("server didn't respond")
|
|
| 60 |
+ except Exception as e:
|
|
| 61 |
+ self.logger.info("Some error: {}".format(e))
|
|
| 62 |
+ |
| ... | ... | @@ -26,6 +26,7 @@ import uuid |
| 26 | 26 |
from buildgrid._exceptions import InvalidArgumentError, OutOfSyncError
|
| 27 | 27 |
|
| 28 | 28 |
from ..job import LeaseState
|
| 29 |
+from ...settings import INTERVAL_BUFFER
|
|
| 29 | 30 |
|
| 30 | 31 |
|
| 31 | 32 |
class BotsInterface:
|
| ... | ... | @@ -73,7 +74,7 @@ class BotsInterface: |
| 73 | 74 |
|
| 74 | 75 |
return bot_session
|
| 75 | 76 |
|
| 76 |
- def update_bot_session(self, name, bot_session):
|
|
| 77 |
+ def update_bot_session(self, name, bot_session, deadline=None):
|
|
| 77 | 78 |
""" Client updates the server. Any changes in state to the Lease should be
|
| 78 | 79 |
registered server side. Assigns available leases with work.
|
| 79 | 80 |
"""
|
| ... | ... | @@ -87,7 +88,9 @@ class BotsInterface: |
| 87 | 88 |
|
| 88 | 89 |
# TODO: Send worker capabilities to the scheduler!
|
| 89 | 90 |
if not bot_session.leases:
|
| 90 |
- leases = self._scheduler.request_job_leases({})
|
|
| 91 |
+ leases = self._scheduler.request_job_leases(
|
|
| 92 |
+ {}, block=True if deadline else None,
|
|
| 93 |
+ timeout=deadline - INTERVAL_BUFFER if deadline else None)
|
|
| 91 | 94 |
if leases:
|
| 92 | 95 |
bot_session.leases.extend(leases)
|
| 93 | 96 |
|
| ... | ... | @@ -64,8 +64,10 @@ class BotsService(bots_pb2_grpc.BotsServicer): |
| 64 | 64 |
instance_name = ''.join(names[0:-1])
|
| 65 | 65 |
|
| 66 | 66 |
instance = self._get_instance(instance_name)
|
| 67 |
+ # server side context time_remaining is maxint - unix time
|
|
| 67 | 68 |
return instance.update_bot_session(request.name,
|
| 68 |
- request.bot_session)
|
|
| 69 |
+ request.bot_session,
|
|
| 70 |
+ context.time_remaining())
|
|
| 69 | 71 |
|
| 70 | 72 |
except InvalidArgumentError as e:
|
| 71 | 73 |
self.logger.error(e)
|
| ... | ... | @@ -19,7 +19,7 @@ Scheduler |
| 19 | 19 |
Schedules jobs.
|
| 20 | 20 |
"""
|
| 21 | 21 |
|
| 22 |
-from collections import deque
|
|
| 22 |
+from queue import Queue, Empty
|
|
| 23 | 23 |
|
| 24 | 24 |
from buildgrid._exceptions import NotFoundError
|
| 25 | 25 |
|
| ... | ... | @@ -33,7 +33,7 @@ class Scheduler: |
| 33 | 33 |
def __init__(self, action_cache=None):
|
| 34 | 34 |
self._action_cache = action_cache
|
| 35 | 35 |
self.jobs = {}
|
| 36 |
- self.queue = deque()
|
|
| 36 |
+ self.queue = Queue()
|
|
| 37 | 37 |
|
| 38 | 38 |
def register_client(self, job_name, queue):
|
| 39 | 39 |
self.jobs[job_name].register_client(queue)
|
| ... | ... | @@ -53,7 +53,7 @@ class Scheduler: |
| 53 | 53 |
action_result = self._action_cache.get_action_result(job.action_digest)
|
| 54 | 54 |
except NotFoundError:
|
| 55 | 55 |
operation_stage = OperationStage.QUEUED
|
| 56 |
- self.queue.append(job)
|
|
| 56 |
+ self.queue.put(job)
|
|
| 57 | 57 |
|
| 58 | 58 |
else:
|
| 59 | 59 |
job.set_cached_result(action_result)
|
| ... | ... | @@ -61,7 +61,7 @@ class Scheduler: |
| 61 | 61 |
|
| 62 | 62 |
else:
|
| 63 | 63 |
operation_stage = OperationStage.QUEUED
|
| 64 |
- self.queue.append(job)
|
|
| 64 |
+ self.queue.put(job)
|
|
| 65 | 65 |
|
| 66 | 66 |
job.update_operation_stage(operation_stage)
|
| 67 | 67 |
|
| ... | ... | @@ -74,12 +74,12 @@ class Scheduler: |
| 74 | 74 |
# TODO: Mark these jobs as done
|
| 75 | 75 |
else:
|
| 76 | 76 |
job.update_operation_stage(OperationStage.QUEUED)
|
| 77 |
- self.queue.appendleft(job)
|
|
| 77 |
+ self.queue.queue.appendleft(job)
|
|
| 78 | 78 |
|
| 79 | 79 |
def list_jobs(self):
|
| 80 | 80 |
return self.jobs.values()
|
| 81 | 81 |
|
| 82 |
- def request_job_leases(self, worker_capabilities):
|
|
| 82 |
+ def request_job_leases(self, worker_capabilities, block=False, timeout=None):
|
|
| 83 | 83 |
"""Generates a list of the highest priority leases to be run.
|
| 84 | 84 |
|
| 85 | 85 |
Args:
|
| ... | ... | @@ -87,10 +87,13 @@ class Scheduler: |
| 87 | 87 |
worker properties, configuration and state at the time of the
|
| 88 | 88 |
request.
|
| 89 | 89 |
"""
|
| 90 |
- if not self.queue:
|
|
| 90 |
+ if not block and self.queue.empty():
|
|
| 91 | 91 |
return []
|
| 92 | 92 |
|
| 93 |
- job = self.queue.popleft()
|
|
| 93 |
+ try:
|
|
| 94 |
+ job = self.queue.get(block, timeout)
|
|
| 95 |
+ except Empty:
|
|
| 96 |
+ return []
|
|
| 94 | 97 |
# For now, one lease at a time:
|
| 95 | 98 |
lease = job.create_lease()
|
| 96 | 99 |
|
| ... | ... | @@ -4,3 +4,6 @@ import hashlib |
| 4 | 4 |
# The hash function that CAS uses
|
| 5 | 5 |
HASH = hashlib.sha256
|
| 6 | 6 |
HASH_LENGTH = HASH().digest_size * 2
|
| 7 |
+ |
|
| 8 |
+# time in seconds to pad timeouts
|
|
| 9 |
+INTERVAL_BUFFER = 5
|
| ... | ... | @@ -16,6 +16,8 @@ |
| 16 | 16 |
from operator import attrgetter
|
| 17 | 17 |
import os
|
| 18 | 18 |
import socket
|
| 19 |
+from contextlib import contextmanager
|
|
| 20 |
+import signal
|
|
| 19 | 21 |
|
| 20 | 22 |
from buildgrid.settings import HASH
|
| 21 | 23 |
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
|
| ... | ... | @@ -208,3 +210,15 @@ def output_directory_maker(directory_path, working_path, tree_digest): |
| 208 | 210 |
output_directory.path = os.path.relpath(directory_path, start=working_path)
|
| 209 | 211 |
|
| 210 | 212 |
return output_directory
|
| 213 |
+ |
|
| 214 |
+ |
|
| 215 |
+@contextmanager
|
|
| 216 |
+def timeout(seconds):
|
|
| 217 |
+ def raise_timeout(x, y):
|
|
| 218 |
+ raise TimeoutError
|
|
| 219 |
+ signal.signal(signal.SIGALRM, raise_timeout)
|
|
| 220 |
+ signal.alarm(seconds)
|
|
| 221 |
+ try:
|
|
| 222 |
+ yield
|
|
| 223 |
+ finally:
|
|
| 224 |
+ signal.alarm(0)
|
| ... | ... | @@ -39,7 +39,9 @@ server = mock.create_autospec(grpc.server) |
| 39 | 39 |
# GRPC context
|
| 40 | 40 |
@pytest.fixture
|
| 41 | 41 |
def context():
|
| 42 |
- yield mock.MagicMock(spec=_Context)
|
|
| 42 |
+ context_mock = mock.MagicMock(spec=_Context)
|
|
| 43 |
+ context_mock.time_remaining.return_value = None
|
|
| 44 |
+ yield context_mock
|
|
| 43 | 45 |
|
| 44 | 46 |
|
| 45 | 47 |
@pytest.fixture
|
| ... | ... | @@ -91,7 +93,6 @@ def test_update_bot_session(bot_session, context, instance): |
| 91 | 93 |
|
| 92 | 94 |
request = bots_pb2.UpdateBotSessionRequest(name=bot.name,
|
| 93 | 95 |
bot_session=bot)
|
| 94 |
- |
|
| 95 | 96 |
response = instance.UpdateBotSession(request, context)
|
| 96 | 97 |
|
| 97 | 98 |
assert isinstance(response, bots_pb2.BotSession)
|
