Raoul Hidalgo Charman pushed to branch raoul/smarter-bot-calls at BuildGrid / buildgrid
Commits:
-
6c6fa9f9
by Raoul Hidalgo Charman at 2018-12-04T17:33:47Z
-
519ae206
by Raoul Hidalgo Charman at 2018-12-04T17:43:04Z
11 changed files:
- buildgrid/_app/commands/cmd_bot.py
- buildgrid/bot/bot.py
- buildgrid/bot/interface.py
- buildgrid/bot/session.py
- buildgrid/bot/tenantmanager.py
- buildgrid/server/bots/instance.py
- buildgrid/server/bots/service.py
- buildgrid/settings.py
- tests/integration/bot_session.py
- tests/integration/bots_service.py
- tests/utils/bots_interface.py
Changes:
... | ... | @@ -31,7 +31,7 @@ from buildgrid.bot import bot, interface, session |
31 | 31 |
from buildgrid.bot.hardware.interface import HardwareInterface
|
32 | 32 |
from buildgrid.bot.hardware.device import Device
|
33 | 33 |
from buildgrid.bot.hardware.worker import Worker
|
34 |
- |
|
34 |
+from buildgrid.settings import NETWORK_TIMEOUT
|
|
35 | 35 |
|
36 | 36 |
from ..bots import buildbox, dummy, host
|
37 | 37 |
from ..cli import pass_context
|
... | ... | @@ -54,7 +54,7 @@ from ..cli import pass_context |
54 | 54 |
help="Public CAS client certificate for TLS (PEM-encoded)")
|
55 | 55 |
@click.option('--cas-server-cert', type=click.Path(exists=True, dir_okay=False), default=None,
|
56 | 56 |
help="Public CAS server certificate for TLS (PEM-encoded)")
|
57 |
-@click.option('--update-period', type=click.FLOAT, default=0.5, show_default=True,
|
|
57 |
+@click.option('--update-period', type=click.FLOAT, default=30, show_default=True,
|
|
58 | 58 |
help="Time period for bot updates to the server in seconds.")
|
59 | 59 |
@click.option('--parent', type=click.STRING, default='main', show_default=True,
|
60 | 60 |
help="Targeted farm resource.")
|
... | ... | @@ -66,7 +66,6 @@ def cli(context, parent, update_period, remote, client_key, client_cert, server_ |
66 | 66 |
|
67 | 67 |
context.remote = '{}:{}'.format(url.hostname, url.port or 50051)
|
68 | 68 |
context.remote_url = remote
|
69 |
- context.update_period = update_period
|
|
70 | 69 |
context.parent = parent
|
71 | 70 |
|
72 | 71 |
if url.scheme == 'http':
|
... | ... | @@ -124,7 +123,7 @@ def cli(context, parent, update_period, remote, client_key, client_cert, server_ |
124 | 123 |
|
125 | 124 |
click.echo("Starting for remote=[{}]".format(context.remote))
|
126 | 125 |
|
127 |
- bot_interface = interface.BotInterface(context.channel)
|
|
126 |
+ bot_interface = interface.BotInterface(context.channel, update_period + NETWORK_TIMEOUT)
|
|
128 | 127 |
worker = Worker()
|
129 | 128 |
worker.add_device(Device())
|
130 | 129 |
hardware_interface = HardwareInterface(worker)
|
... | ... | @@ -142,7 +141,7 @@ def run_dummy(context): |
142 | 141 |
try:
|
143 | 142 |
bot_session = session.BotSession(context.parent, context.bot_interface, context.hardware_interface,
|
144 | 143 |
dummy.work_dummy, context)
|
145 |
- b = bot.Bot(bot_session, context.update_period)
|
|
144 |
+ b = bot.Bot(bot_session)
|
|
146 | 145 |
b.session()
|
147 | 146 |
except KeyboardInterrupt:
|
148 | 147 |
pass
|
... | ... | @@ -158,7 +157,7 @@ def run_host_tools(context): |
158 | 157 |
try:
|
159 | 158 |
bot_session = session.BotSession(context.parent, context.bot_interface, context.hardware_interface,
|
160 | 159 |
host.work_host_tools, context)
|
161 |
- b = bot.Bot(bot_session, context.update_period)
|
|
160 |
+ b = bot.Bot(bot_session)
|
|
162 | 161 |
b.session()
|
163 | 162 |
except KeyboardInterrupt:
|
164 | 163 |
pass
|
... | ... | @@ -180,7 +179,7 @@ def run_buildbox(context, local_cas, fuse_dir): |
180 | 179 |
try:
|
181 | 180 |
bot_session = session.BotSession(context.parent, context.bot_interface, context.hardware_interface,
|
182 | 181 |
buildbox.work_buildbox, context)
|
183 |
- b = bot.Bot(bot_session, context.update_period)
|
|
182 |
+ b = bot.Bot(bot_session)
|
|
184 | 183 |
b.session()
|
185 | 184 |
except KeyboardInterrupt:
|
186 | 185 |
pass
|
... | ... | @@ -20,14 +20,12 @@ import logging |
20 | 20 |
class Bot:
|
21 | 21 |
"""Creates a local BotSession."""
|
22 | 22 |
|
23 |
- def __init__(self, bot_session, update_period=1):
|
|
23 |
+ def __init__(self, bot_session):
|
|
24 | 24 |
"""
|
25 | 25 |
"""
|
26 | 26 |
self.__logger = logging.getLogger(__name__)
|
27 | 27 |
|
28 | 28 |
self.__bot_session = bot_session
|
29 |
- self.__update_period = update_period
|
|
30 |
- |
|
31 | 29 |
self.__loop = None
|
32 | 30 |
|
33 | 31 |
def session(self):
|
... | ... | @@ -37,7 +35,7 @@ class Bot: |
37 | 35 |
self.__bot_session.create_bot_session()
|
38 | 36 |
|
39 | 37 |
try:
|
40 |
- task = asyncio.ensure_future(self.__update_bot_session())
|
|
38 |
+ task = asyncio.ensure_future(self.__bot_session.run())
|
|
41 | 39 |
self.__loop.run_until_complete(task)
|
42 | 40 |
|
43 | 41 |
except KeyboardInterrupt:
|
... | ... | @@ -46,16 +44,6 @@ class Bot: |
46 | 44 |
self.__kill_everyone()
|
47 | 45 |
self.__logger.info("Bot shutdown.")
|
48 | 46 |
|
49 |
- async def __update_bot_session(self):
|
|
50 |
- """Calls the server periodically to inform the server the client has not died."""
|
|
51 |
- try:
|
|
52 |
- while True:
|
|
53 |
- self.__bot_session.update_bot_session()
|
|
54 |
- await asyncio.sleep(self.__update_period)
|
|
55 |
- |
|
56 |
- except asyncio.CancelledError:
|
|
57 |
- pass
|
|
58 |
- |
|
59 | 47 |
def __kill_everyone(self):
|
60 | 48 |
"""Cancels and waits for them to stop."""
|
61 | 49 |
self.__logger.info("Cancelling remaining tasks...")
|
... | ... | @@ -24,6 +24,7 @@ import logging |
24 | 24 |
import grpc
|
25 | 25 |
|
26 | 26 |
from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2, bots_pb2_grpc
|
27 |
+from buildgrid.settings import MAX_NETWORK_INTERVAL
|
|
27 | 28 |
|
28 | 29 |
|
29 | 30 |
class BotInterface:
|
... | ... | @@ -31,28 +32,35 @@ class BotInterface: |
31 | 32 |
Interface handles calls to the server.
|
32 | 33 |
"""
|
33 | 34 |
|
34 |
- def __init__(self, channel):
|
|
35 |
+ def __init__(self, channel, interval):
|
|
35 | 36 |
self.__logger = logging.getLogger(__name__)
|
36 | 37 |
|
37 | 38 |
self._stub = bots_pb2_grpc.BotsStub(channel)
|
39 |
+ if interval > MAX_NETWORK_INTERVAL:
|
|
40 |
+ self.__interval = MAX_NETWORK_INTERVAL
|
|
41 |
+ else:
|
|
42 |
+ self.__interval = interval
|
|
43 |
+ |
|
44 |
+ @property
|
|
45 |
+ def interval(self):
|
|
46 |
+ return self.__interval
|
|
38 | 47 |
|
39 | 48 |
def create_bot_session(self, parent, bot_session):
|
49 |
+ """ Creates a bot session returning a grpc StatusCode if it failed """
|
|
40 | 50 |
request = bots_pb2.CreateBotSessionRequest(parent=parent,
|
41 | 51 |
bot_session=bot_session)
|
42 |
- try:
|
|
43 |
- return self._stub.CreateBotSession(request)
|
|
44 |
- |
|
45 |
- except grpc.RpcError as e:
|
|
46 |
- self.__logger.error(e)
|
|
47 |
- raise
|
|
52 |
+ return self._bot_call(self._stub.CreateBotSession, request)
|
|
48 | 53 |
|
49 | 54 |
def update_bot_session(self, bot_session, update_mask=None):
|
55 |
+ """ Updates a bot session returning a grpc StatusCode if it failed """
|
|
50 | 56 |
request = bots_pb2.UpdateBotSessionRequest(name=bot_session.name,
|
51 | 57 |
bot_session=bot_session,
|
52 | 58 |
update_mask=update_mask)
|
53 |
- try:
|
|
54 |
- return self._stub.UpdateBotSession(request)
|
|
59 |
+ return self._bot_call(self._stub.UpdateBotSession, request)
|
|
55 | 60 |
|
61 |
+ def _bot_call(self, call, request):
|
|
62 |
+ try:
|
|
63 |
+ return call(request, timeout=self.interval)
|
|
56 | 64 |
except grpc.RpcError as e:
|
57 |
- self.__logger.error(e)
|
|
58 |
- raise
|
|
65 |
+ self.__logger.error(e.code())
|
|
66 |
+ return e.code()
|
... | ... | @@ -19,9 +19,12 @@ Bot Session |
19 | 19 |
|
20 | 20 |
Allows connections
|
21 | 21 |
"""
|
22 |
+import asyncio
|
|
22 | 23 |
import logging
|
23 | 24 |
import platform
|
24 | 25 |
|
26 |
+from grpc import StatusCode
|
|
27 |
+ |
|
25 | 28 |
from buildgrid._enums import BotStatus, LeaseState
|
26 | 29 |
from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2
|
27 | 30 |
from buildgrid._protos.google.rpc import code_pb2
|
... | ... | @@ -47,6 +50,8 @@ class BotSession: |
47 | 50 |
self._status = BotStatus.OK.value
|
48 | 51 |
self._tenant_manager = TenantManager()
|
49 | 52 |
|
53 |
+ self.connected = False
|
|
54 |
+ |
|
50 | 55 |
self.__parent = parent
|
51 | 56 |
self.__bot_id = '{}.{}'.format(parent, platform.node())
|
52 | 57 |
self.__name = None
|
... | ... | @@ -58,10 +63,33 @@ class BotSession: |
58 | 63 |
def bot_id(self):
|
59 | 64 |
return self.__bot_id
|
60 | 65 |
|
66 |
+ async def run(self):
|
|
67 |
+ """ Run a bot session
|
|
68 |
+ |
|
69 |
+ This connects and reconnects via create bot session and waits on update
|
|
70 |
+ bot session calls.
|
|
71 |
+ """
|
|
72 |
+ self.__logger.debug("Starting bot session")
|
|
73 |
+ interval = self._bots_interface.interval
|
|
74 |
+ while True:
|
|
75 |
+ if not self.connected:
|
|
76 |
+ self.create_bot_session()
|
|
77 |
+ else:
|
|
78 |
+ self.update_bot_session()
|
|
79 |
+ |
|
80 |
+ if not self.connected:
|
|
81 |
+ await asyncio.sleep(interval)
|
|
82 |
+ else:
|
|
83 |
+ await self._tenant_manager.wait_on_tenants(interval)
|
|
84 |
+ |
|
61 | 85 |
def create_bot_session(self):
|
62 | 86 |
self.__logger.debug("Creating bot session")
|
63 | 87 |
|
64 | 88 |
session = self._bots_interface.create_bot_session(self.__parent, self.get_pb2())
|
89 |
+ if session in list(StatusCode):
|
|
90 |
+ self.connected = False
|
|
91 |
+ return
|
|
92 |
+ self.connected = True
|
|
65 | 93 |
self.__name = session.name
|
66 | 94 |
|
67 | 95 |
self.__logger.info("Created bot session with name: [%s]", self.__name)
|
... | ... | @@ -73,6 +101,13 @@ class BotSession: |
73 | 101 |
self.__logger.debug("Updating bot session: [%s]", self.__bot_id)
|
74 | 102 |
|
75 | 103 |
session = self._bots_interface.update_bot_session(self.get_pb2())
|
104 |
+ if session == StatusCode.DEADLINE_EXCEEDED:
|
|
105 |
+ # try to continue to do update session if it passed the timeout
|
|
106 |
+ return
|
|
107 |
+ elif session in StatusCode:
|
|
108 |
+ self.connected = False
|
|
109 |
+ return
|
|
110 |
+ self.connected = True
|
|
76 | 111 |
server_ids = []
|
77 | 112 |
|
78 | 113 |
for lease in session.leases:
|
... | ... | @@ -150,6 +150,13 @@ class TenantManager: |
150 | 150 |
"""
|
151 | 151 |
return self._tenants[lease_id].tenant_completed
|
152 | 152 |
|
153 |
+ async def wait_on_tenants(self, timeout):
|
|
154 |
+ if self._tasks:
|
|
155 |
+ tasks = self._tasks.values()
|
|
156 |
+ await asyncio.wait(tasks,
|
|
157 |
+ timeout=timeout,
|
|
158 |
+ return_when=asyncio.FIRST_COMPLETED)
|
|
159 |
+ |
|
153 | 160 |
def _update_lease_result(self, lease_id, result):
|
154 | 161 |
"""Updates the lease with the result."""
|
155 | 162 |
self._tenants[lease_id].update_lease_result(result)
|
... | ... | @@ -24,6 +24,7 @@ import logging |
24 | 24 |
import uuid
|
25 | 25 |
|
26 | 26 |
from buildgrid._exceptions import InvalidArgumentError
|
27 |
+from buildgrid.settings import NETWORK_TIMEOUT
|
|
27 | 28 |
|
28 | 29 |
from ..job import LeaseState
|
29 | 30 |
|
... | ... | @@ -75,7 +76,7 @@ class BotsInterface: |
75 | 76 |
self._request_leases(bot_session)
|
76 | 77 |
return bot_session
|
77 | 78 |
|
78 |
- def update_bot_session(self, name, bot_session):
|
|
79 |
+ def update_bot_session(self, name, bot_session, deadline=None):
|
|
79 | 80 |
""" Client updates the server. Any changes in state to the Lease should be
|
80 | 81 |
registered server side. Assigns available leases with work.
|
81 | 82 |
"""
|
... | ... | @@ -93,14 +94,15 @@ class BotsInterface: |
93 | 94 |
pass
|
94 | 95 |
lease.Clear()
|
95 | 96 |
|
96 |
- self._request_leases(bot_session)
|
|
97 |
+ self._request_leases(bot_session, deadline)
|
|
97 | 98 |
return bot_session
|
98 | 99 |
|
99 |
- def _request_leases(self, bot_session):
|
|
100 |
+ def _request_leases(self, bot_session, deadline=None):
|
|
100 | 101 |
# TODO: Send worker capabilities to the scheduler!
|
101 | 102 |
# Only send one lease at a time currently.
|
102 | 103 |
if not bot_session.leases:
|
103 |
- leases = self._scheduler.request_job_leases({})
|
|
104 |
+ leases = self._scheduler.request_job_leases(
|
|
105 |
+ {}, timeout=deadline - NETWORK_TIMEOUT if deadline else None)
|
|
104 | 106 |
if leases:
|
105 | 107 |
for lease in leases:
|
106 | 108 |
self._assigned_leases[bot_session.name].add(lease.id)
|
... | ... | @@ -138,8 +138,10 @@ class BotsService(bots_pb2_grpc.BotsServicer): |
138 | 138 |
instance_name = '/'.join(names[:-1])
|
139 | 139 |
|
140 | 140 |
instance = self._get_instance(instance_name)
|
141 |
- bot_session = instance.update_bot_session(request.name,
|
|
142 |
- request.bot_session)
|
|
141 |
+ bot_session = instance.update_bot_session(
|
|
142 |
+ request.name,
|
|
143 |
+ request.bot_session,
|
|
144 |
+ deadline=context.time_remaining())
|
|
143 | 145 |
|
144 | 146 |
if self._is_instrumented:
|
145 | 147 |
self.__bots[bot_id].GetCurrentTime()
|
... | ... | @@ -30,3 +30,10 @@ MAX_REQUEST_SIZE = 2 * 1024 * 1024 |
30 | 30 |
|
31 | 31 |
# Maximum number of elements per gRPC request:
|
32 | 32 |
MAX_REQUEST_COUNT = 500
|
33 |
+ |
|
34 |
+# time in seconds to pad timeouts
|
|
35 |
+NETWORK_TIMEOUT = 5
|
|
36 |
+ |
|
37 |
+# Hard limit for grpc calls, avoids timeouts not being set defaulting the
|
|
38 |
+# intervals to the max int64 value
|
|
39 |
+MAX_NETWORK_INTERVAL = 300
|
... | ... | @@ -30,6 +30,7 @@ from ..utils.utils import run_in_subprocess |
30 | 30 |
from ..utils.bots_interface import serve_bots_interface
|
31 | 31 |
|
32 | 32 |
|
33 |
+TIMEOUT = 5
|
|
33 | 34 |
INSTANCES = ['', 'instance']
|
34 | 35 |
|
35 | 36 |
|
... | ... | @@ -48,7 +49,7 @@ class ServerInterface: |
48 | 49 |
bot_session = bots_pb2.BotSession()
|
49 | 50 |
bot_session.ParseFromString(string_bot_session)
|
50 | 51 |
|
51 |
- interface = BotInterface(grpc.insecure_channel(remote))
|
|
52 |
+ interface = BotInterface(grpc.insecure_channel(remote), TIMEOUT)
|
|
52 | 53 |
|
53 | 54 |
result = interface.create_bot_session(parent, bot_session)
|
54 | 55 |
queue.put(result.SerializeToString())
|
... | ... | @@ -67,7 +68,7 @@ class ServerInterface: |
67 | 68 |
bot_session = bots_pb2.BotSession()
|
68 | 69 |
bot_session.ParseFromString(string_bot_session)
|
69 | 70 |
|
70 |
- interface = BotInterface(grpc.insecure_channel(remote))
|
|
71 |
+ interface = BotInterface(grpc.insecure_channel(remote), TIMEOUT)
|
|
71 | 72 |
|
72 | 73 |
result = interface.update_bot_session(bot_session, update_mask)
|
73 | 74 |
queue.put(result.SerializeToString())
|
... | ... | @@ -38,7 +38,9 @@ server = mock.create_autospec(grpc.server) |
38 | 38 |
# GRPC context
|
39 | 39 |
@pytest.fixture
|
40 | 40 |
def context():
|
41 |
- yield mock.MagicMock(spec=_Context)
|
|
41 |
+ context_mock = mock.MagicMock(spec=_Context)
|
|
42 |
+ context_mock.time_remaining.return_value = None
|
|
43 |
+ yield context_mock
|
|
42 | 44 |
|
43 | 45 |
|
44 | 46 |
@pytest.fixture
|
... | ... | @@ -123,7 +123,7 @@ class BotsInterface: |
123 | 123 |
self.__bot_session_queue.put(bot_session.SerializeToString())
|
124 | 124 |
return bot_session
|
125 | 125 |
|
126 |
- def update_bot_session(self, name, bot_session):
|
|
126 |
+ def update_bot_session(self, name, bot_session, deadline=None):
|
|
127 | 127 |
for lease in bot_session.leases:
|
128 | 128 |
state = LeaseState(lease.state)
|
129 | 129 |
if state == LeaseState.COMPLETED:
|