Martin Blanchard pushed to branch mablanch/connection-drop-simulation at BuildGrid / buildgrid
Commits:
- a539c4e9 by Finn at 2018-10-03T10:49:00Z
- 7cc13d06 by Finn at 2018-10-03T10:49:00Z
- e59fb607 by Finn at 2018-10-03T13:10:21Z
- 23f0fed1 by Jürg Billeter at 2018-10-03T13:16:10Z
- 65f4b337 by Martin Blanchard at 2018-10-03T14:08:14Z
- a88e0715 by Finn at 2018-10-03T14:35:20Z
- 4fa35bc9 by Finn at 2018-10-03T14:35:20Z
- bb2cd92b by Finn at 2018-10-03T15:06:01Z
- 74bca84f by Finn at 2018-10-03T15:06:01Z
- 0a856306 by Marios Hadjimichael at 2018-10-03T20:09:34Z
- 2a7cfd66 by Martin Blanchard at 2018-10-05T09:09:21Z
14 changed files:
- buildgrid/_app/bots/buildbox.py
- buildgrid/_app/commands/cmd_bot.py
- buildgrid/_app/settings/cas.yml
- buildgrid/client/cas.py
- buildgrid/server/actioncache/service.py
- buildgrid/server/bots/instance.py
- buildgrid/server/cas/service.py
- buildgrid/server/execution/instance.py
- buildgrid/server/execution/service.py
- buildgrid/server/job.py
- buildgrid/server/operations/service.py
- buildgrid/server/referencestorage/service.py
- buildgrid/server/scheduler.py
- tests/integration/bots_service.py
Changes:

buildgrid/_app/bots/buildbox.py:

| ... | ... | @@ -46,11 +46,6 @@ def work_buildbox(context, lease): |
| 46 | 46 | command = downloader.get_message(action.command_digest, |
| 47 | 47 | remote_execution_pb2.Command()) |
| 48 | 48 | |
| 49 |    | - environment = {} |
| 50 |    | - for variable in command.environment_variables: |
| 51 |    | - if variable.name not in ['PWD']: |
| 52 |    | - environment[variable.name] = variable.value |
| 53 |    | - |
| 54 | 49 | if command.working_directory: |
| 55 | 50 | working_directory = command.working_directory |
| 56 | 51 | else: |
| ... | ... | @@ -82,6 +77,12 @@ def work_buildbox(context, lease): |
| 82 | 77 | if context.cas_server_cert: |
| 83 | 78 | command_line.append('--server-cert={}'.format(context.cas_server_cert)) |
| 84 | 79 | |
|    | 80 | + command_line.append('--clearenv') |
|    | 81 | + for variable in command.environment_variables: |
|    | 82 | + command_line.append('--setenv') |
|    | 83 | + command_line.append(variable.name) |
|    | 84 | + command_line.append(variable.value) |
|    | 85 | + |
| 85 | 86 | command_line.append(context.fuse_dir) |
| 86 | 87 | command_line.extend(command.arguments) |
| 87 | 88 | |
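
The removed per-variable filtering is replaced by handing the environment straight to the buildbox invocation through --clearenv/--setenv. A minimal sketch of the resulting argument list, assuming a Command proto carrying a single PATH variable (the 'buildbox' argv[0] is illustrative only):

    from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2

    # Illustrative Command with one environment variable.
    command = remote_execution_pb2.Command()
    variable = command.environment_variables.add()
    variable.name = 'PATH'
    variable.value = '/usr/bin:/bin'

    command_line = ['buildbox', '--clearenv']      # start from an empty environment
    for variable in command.environment_variables:
        command_line.append('--setenv')            # forward each variable explicitly
        command_line.append(variable.name)
        command_line.append(variable.value)

    # ['buildbox', '--clearenv', '--setenv', 'PATH', '/usr/bin:/bin']
    print(command_line)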

buildgrid/_app/commands/cmd_bot.py:

| ... | ... | @@ -52,16 +52,19 @@ from ..cli import pass_context |
| 52 | 52 | help="Public CAS client certificate for TLS (PEM-encoded)") |
| 53 | 53 | @click.option('--cas-server-cert', type=click.Path(exists=True, dir_okay=False), default=None, |
| 54 | 54 | help="Public CAS server certificate for TLS (PEM-encoded)") |
|    | 55 | +@click.option('--update-period', type=click.FLOAT, default=0.5, show_default=True, |
|    | 56 | + help="Time period for bot updates to the server in seconds.") |
| 55 | 57 | @click.option('--parent', type=click.STRING, default='main', show_default=True, |
| 56 | 58 | help="Targeted farm resource.") |
| 57 | 59 | @pass_context |
| 58 |    | -def cli(context, parent, remote, client_key, client_cert, server_cert, |
|    | 60 | +def cli(context, parent, update_period, remote, client_key, client_cert, server_cert, |
| 59 | 61 | remote_cas, cas_client_key, cas_client_cert, cas_server_cert): |
| 60 | 62 | # Setup the remote execution server channel: |
| 61 | 63 | url = urlparse(remote) |
| 62 | 64 | |
| 63 | 65 | context.remote = '{}:{}'.format(url.hostname, url.port or 50051) |
| 64 | 66 | context.remote_url = remote |
|    | 67 | + context.update_period = update_period |
| 65 | 68 | context.parent = parent |
| 66 | 69 | |
| 67 | 70 | if url.scheme == 'http': |
| ... | ... | @@ -138,7 +141,7 @@ def run_dummy(context): |
| 138 | 141 | Creates a session, accepts leases, does fake work and updates the server. |
| 139 | 142 | """ |
| 140 | 143 | try: |
| 141 |     | - b = bot.Bot(context.bot_session) |
|     | 144 | + b = bot.Bot(context.bot_session, context.update_period) |
| 142 | 145 | b.session(dummy.work_dummy, |
| 143 | 146 | context) |
| 144 | 147 | except KeyboardInterrupt: |
| ... | ... | @@ -153,7 +156,7 @@ def run_host_tools(context): |
| 153 | 156 | result back to CAS. |
| 154 | 157 | """ |
| 155 | 158 | try: |
| 156 |     | - b = bot.Bot(context.bot_session) |
|     | 159 | + b = bot.Bot(context.bot_session, context.update_period) |
| 157 | 160 | b.session(host.work_host_tools, |
| 158 | 161 | context) |
| 159 | 162 | except KeyboardInterrupt: |
| ... | ... | @@ -174,7 +177,7 @@ def run_buildbox(context, local_cas, fuse_dir): |
| 174 | 177 | context.fuse_dir = fuse_dir |
| 175 | 178 | |
| 176 | 179 | try: |
| 177 |     | - b = bot.Bot(context.bot_session) |
|     | 180 | + b = bot.Bot(context.bot_session, context.update_period) |
| 178 | 181 | b.session(buildbox.work_buildbox, |
| 179 | 182 | context) |
| 180 | 183 | except KeyboardInterrupt: |
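
The new --update-period option simply rides the command context down to each bot runner. A hypothetical, stripped-down sketch of that wiring (the Bot constructor itself is not part of this diff, so only the option handling is shown; names here are illustrative):

    import click

    @click.command()
    @click.option('--update-period', type=click.FLOAT, default=0.5, show_default=True,
                  help="Time period for bot updates to the server in seconds.")
    @click.pass_context
    def cli(context, update_period):
        # Stash the period on the context so sub-commands can pass it on,
        # mirroring bot.Bot(context.bot_session, context.update_period).
        context.obj = {'update_period': update_period}
        click.echo('update period: {}s'.format(update_period))

    if __name__ == '__main__':
        cli()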

buildgrid/_app/settings/cas.yml:

| 1 | 1 | server: |
| 2 | 2 | - !channel |
| 3 |   | - port: 50051 |
|   | 3 | + port: 50052 |
| 4 | 4 | insecure_mode: true |
| 5 | 5 | # credentials: |
| 6 | 6 | # tls-server-key: null |

buildgrid/client/cas.py:

| ... | ... | @@ -241,7 +241,7 @@ class Downloader: |
| 241 | 241 | """Fetches a blob using ByteStream.Read()""" |
| 242 | 242 | read_blob = bytearray() |
| 243 | 243 | |
| 244 |     | - if self.instance_name is not None: |
|     | 244 | + if self.instance_name: |
| 245 | 245 | resource_name = '/'.join([self.instance_name, 'blobs', |
| 246 | 246 | digest.hash, str(digest.size_bytes)]) |
| 247 | 247 | else: |
| ... | ... | @@ -313,7 +313,7 @@ class Downloader: |
| 313 | 313 | |
| 314 | 314 | def _fetch_file(self, digest, file_path): |
| 315 | 315 | """Fetches a file using ByteStream.Read()""" |
| 316 |     | - if self.instance_name is not None: |
|     | 316 | + if self.instance_name: |
| 317 | 317 | resource_name = '/'.join([self.instance_name, 'blobs', |
| 318 | 318 | digest.hash, str(digest.size_bytes)]) |
| 319 | 319 | else: |
| ... | ... | @@ -699,7 +699,7 @@ class Uploader: |
| 699 | 699 | else: |
| 700 | 700 | blob_digest.hash = HASH(blob).hexdigest() |
| 701 | 701 | blob_digest.size_bytes = len(blob) |
| 702 |     | - if self.instance_name is not None: |
|     | 702 | + if self.instance_name: |
| 703 | 703 | resource_name = '/'.join([self.instance_name, 'uploads', self.u_uid, 'blobs', |
| 704 | 704 | blob_digest.hash, str(blob_digest.size_bytes)]) |
| 705 | 705 | else: |
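
Switching from 'is not None' to a plain truthiness test means an empty instance name now selects the short, instance-less resource layout as well. A minimal sketch of the naming rule, assuming only the two ByteStream layouts used by this client (the helper name is illustrative):

    def make_read_resource_name(instance_name, digest_hash, size_bytes):
        """Build a ByteStream.Read() resource name; '' and None now both
        fall back to the instance-less form."""
        if instance_name:
            return '/'.join([instance_name, 'blobs', digest_hash, str(size_bytes)])
        return '/'.join(['blobs', digest_hash, str(size_bytes)])

    assert make_read_resource_name('', 'deadbeef', 42) == 'blobs/deadbeef/42'
    assert make_read_resource_name('main', 'deadbeef', 42) == 'main/blobs/deadbeef/42'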

buildgrid/server/actioncache/service.py:

| ... | ... | @@ -52,7 +52,7 @@ class ActionCacheService(remote_execution_pb2_grpc.ActionCacheServicer): |
| 52 | 52 | context.set_code(grpc.StatusCode.INVALID_ARGUMENT) |
| 53 | 53 | |
| 54 | 54 | except NotFoundError as e: |
| 55 |    | - self.logger.info(e) |
|    | 55 | + self.logger.debug(e) |
| 56 | 56 | context.set_code(grpc.StatusCode.NOT_FOUND) |
| 57 | 57 | |
| 58 | 58 | return remote_execution_pb2.ActionResult() |

buildgrid/server/bots/instance.py:

| ... | ... | @@ -66,7 +66,9 @@ class BotsInterface: |
| 66 | 66 | self._bot_sessions[name] = bot_session |
| 67 | 67 | self.logger.info("Created bot session name=[{}] with bot_id=[{}]".format(name, bot_id)) |
| 68 | 68 | |
| 69 |    | - for lease in self._scheduler.create_leases(): |
|    | 69 | + # For now, one lease at a time. |
|    | 70 | + lease = self._scheduler.create_lease() |
|    | 71 | + if lease: |
| 70 | 72 | bot_session.leases.extend([lease]) |
| 71 | 73 | |
| 72 | 74 | return bot_session |
| ... | ... | @@ -83,8 +85,11 @@ class BotsInterface: |
| 83 | 85 | del bot_session.leases[:] |
| 84 | 86 | bot_session.leases.extend(leases) |
| 85 | 87 | |
| 86 |    | - for lease in self._scheduler.create_leases(): |
| 87 |    | - bot_session.leases.extend([lease]) |
|    | 88 | + # For now, one lease at a time |
|    | 89 | + if not bot_session.leases: |
|    | 90 | + lease = self._scheduler.create_lease() |
|    | 91 | + if lease: |
|    | 92 | + bot_session.leases.extend([lease]) |
| 88 | 93 | |
| 89 | 94 | self._bot_sessions[name] = bot_session |
| 90 | 95 | return bot_session |

buildgrid/server/cas/service.py:

| ... | ... | @@ -46,8 +46,11 @@ class ContentAddressableStorageService(remote_execution_pb2_grpc.ContentAddressa |
| 46 | 46 | |
| 47 | 47 | def FindMissingBlobs(self, request, context): |
| 48 | 48 | try: |
|    | 49 | + self.logger.debug("FindMissingBlobs request: [{}]".format(request)) |
| 49 | 50 | instance = self._get_instance(request.instance_name) |
| 50 |    | - return instance.find_missing_blobs(request.blob_digests) |
|    | 51 | + response = instance.find_missing_blobs(request.blob_digests) |
|    | 52 | + self.logger.debug("FindMissingBlobs response: [{}]".format(response)) |
|    | 53 | + return response |
| 51 | 54 | |
| 52 | 55 | except InvalidArgumentError as e: |
| 53 | 56 | self.logger.error(e) |
| ... | ... | @@ -58,8 +61,11 @@ class ContentAddressableStorageService(remote_execution_pb2_grpc.ContentAddressa |
| 58 | 61 | |
| 59 | 62 | def BatchUpdateBlobs(self, request, context): |
| 60 | 63 | try: |
|    | 64 | + self.logger.debug("BatchUpdateBlobs request: [{}]".format(request)) |
| 61 | 65 | instance = self._get_instance(request.instance_name) |
| 62 |    | - return instance.batch_update_blobs(request.requests) |
|    | 66 | + response = instance.batch_update_blobs(request.requests) |
|    | 67 | + self.logger.debug("FindMissingBlobs response: [{}]".format(response)) |
|    | 68 | + return response |
| 63 | 69 | |
| 64 | 70 | except InvalidArgumentError as e: |
| 65 | 71 | self.logger.error(e) |
| ... | ... | @@ -102,6 +108,7 @@ class ByteStreamService(bytestream_pb2_grpc.ByteStreamServicer): |
| 102 | 108 | |
| 103 | 109 | def Read(self, request, context): |
| 104 | 110 | try: |
|     | 111 | + self.logger.debug("Read request: [{}]".format(request)) |
| 105 | 112 | path = request.resource_name.split("/") |
| 106 | 113 | instance_name = path[0] |
| 107 | 114 | |
| ... | ... | @@ -141,10 +148,13 @@ class ByteStreamService(bytestream_pb2_grpc.ByteStreamServicer): |
| 141 | 148 | context.set_code(grpc.StatusCode.OUT_OF_RANGE) |
| 142 | 149 | yield bytestream_pb2.ReadResponse() |
| 143 | 150 | |
|     | 151 | + self.logger.debug("Read finished.") |
|     | 152 | + |
| 144 | 153 | def Write(self, requests, context): |
| 145 | 154 | try: |
| 146 | 155 | requests, request_probe = tee(requests, 2) |
| 147 | 156 | first_request = next(request_probe) |
|     | 157 | + self.logger.debug("First write request: [{}]".format(first_request)) |
| 148 | 158 | |
| 149 | 159 | path = first_request.resource_name.split("/") |
| 150 | 160 | |
| ... | ... | @@ -164,7 +174,9 @@ class ByteStreamService(bytestream_pb2_grpc.ByteStreamServicer): |
| 164 | 174 | raise InvalidArgumentError("Invalid resource name: [{}]".format(first_request.resource_name)) |
| 165 | 175 | |
| 166 | 176 | instance = self._get_instance(instance_name) |
| 167 |     | - return instance.write(requests) |
|     | 177 | + response = instance.write(requests) |
|     | 178 | + self.logger.debug("Write response: [{}]".format(response)) |
|     | 179 | + return response |
| 168 | 180 | |
| 169 | 181 | except NotImplementedError as e: |
| 170 | 182 | self.logger.error(e) |

buildgrid/server/execution/instance.py:

| ... | ... | @@ -21,8 +21,11 @@ An instance of the Remote Execution Service. |
| 21 | 21 | |
| 22 | 22 | import logging |
| 23 | 23 | |
|    | 24 | +import grpc |
|    | 25 | + |
| 24 | 26 | from buildgrid._exceptions import FailedPreconditionError, InvalidArgumentError |
| 25 | 27 | from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Action |
|    | 28 | +from buildgrid._protos.google.longrunning import operations_pb2 |
| 26 | 29 | |
| 27 | 30 | from ..job import Job |
| 28 | 31 | |
| ... | ... | @@ -71,7 +74,12 @@ class ExecutionInstance: |
| 71 | 74 | |
| 72 | 75 | def stream_operation_updates(self, message_queue, operation_name): |
| 73 | 76 | operation = message_queue.get() |
|    | 77 | + last_operation = operation |
| 74 | 78 | while not operation.done: |
| 75 | 79 | yield operation |
|    | 80 | + last_operation = operation |
| 76 | 81 | operation = message_queue.get() |
|    | 82 | + if not isinstance(operation, operations_pb2.Operation): |
|    | 83 | + message_queue.context_.cancel() |
|    | 84 | + operation = last_operation |
| 77 | 85 | yield operation |
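
The update stream now remembers the last genuine Operation it saw; when anything else arrives on the queue (such as the 'drop' marker pushed by the simulation below), the attached gRPC context is cancelled and the stream falls back to that last known state. A simplified, standalone sketch of the generator, assuming the queue carries Operation messages or an arbitrary sentinel and exposes the context_ attribute set by the execution service:

    from buildgrid._protos.google.longrunning import operations_pb2

    def stream_operation_updates(message_queue):
        operation = message_queue.get()
        last_operation = operation
        while not operation.done:
            yield operation
            last_operation = operation
            operation = message_queue.get()
            if not isinstance(operation, operations_pb2.Operation):
                # Simulated connection drop: cancel the client's call and
                # carry on from the last good update.
                message_queue.context_.cancel()
                operation = last_operation
        yield operation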

buildgrid/server/execution/service.py:

| ... | ... | @@ -52,6 +52,8 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer): |
| 52 | 52 | context.add_callback(partial(instance.unregister_message_client, |
| 53 | 53 | operation.name, message_queue)) |
| 54 | 54 | |
|    | 55 | + message_queue.context_ = context |
|    | 56 | + |
| 55 | 57 | yield from instance.stream_operation_updates(message_queue, |
| 56 | 58 | operation.name) |
| 57 | 59 | |
| ... | ... | @@ -68,9 +70,10 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer): |
| 68 | 70 | yield operations_pb2.Operation() |
| 69 | 71 | |
| 70 | 72 | def WaitExecution(self, request, context): |
|    | 73 | + print('Client reopened the operation stream!') |
|    | 74 | + |
| 71 | 75 | try: |
| 72 | 76 | names = request.name.split("/") |
| 73 |    | - |
| 74 | 77 | # Operation name should be in format: |
| 75 | 78 | # {instance/name}/{operation_id} |
| 76 | 79 | instance_name = ''.join(names[0:-1]) |

buildgrid/server/job.py:

| ... | ... | @@ -122,8 +122,11 @@ class Job: |
| 122 | 122 | if self.result is not None: |
| 123 | 123 | self._operation.done = True |
| 124 | 124 | response = remote_execution_pb2.ExecuteResponse(result=self.result, |
| 125 |     | - cached_result=self.result_cached, |
| 126 |     | - status=self.lease.status) |
|     | 125 | + cached_result=self.result_cached) |
|     | 126 | + |
|     | 127 | + if not self.result_cached: |
|     | 128 | + response.status.CopyFrom(self.lease.status) |
|     | 129 | + |
| 127 | 130 | self._operation.response.CopyFrom(self._pack_any(response)) |
| 128 | 131 | |
| 129 | 132 | return self._operation |
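
A cache hit never acquires a lease, so the response status is now only copied from the lease when the result was actually executed. A small sketch of the resulting construction, assuming the REAPI ExecuteResponse message and a Lease carrying a google.rpc Status (the helper name is illustrative):

    from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2

    def build_execute_response(result, result_cached, lease=None):
        response = remote_execution_pb2.ExecuteResponse(result=result,
                                                        cached_result=result_cached)
        if not result_cached:
            # Only a job that really ran has a lease with a meaningful status.
            response.status.CopyFrom(lease.status)
        return response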

buildgrid/server/operations/service.py:

| ... | ... | @@ -43,14 +43,16 @@ class OperationsService(operations_pb2_grpc.OperationsServicer): |
| 43 | 43 | |
| 44 | 44 | def GetOperation(self, request, context): |
| 45 | 45 | try: |
| 46 |    | - name = request.name |
| 47 |    | - operation_name = self._get_operation_name(name) |
|    | 46 | + operation_name, job = request.name, None |
|    | 47 | + for instance in self._instances.values(): |
|    | 48 | + if operation_name in instance._scheduler.jobs: |
|    | 49 | + job = instance._scheduler.jobs.get(operation_name) |
|    | 50 | + break |
| 48 | 51 | |
| 49 |    | - instance = self._get_instance(name) |
|    | 52 | + print('Connection drop simulation request received') |
| 50 | 53 | |
| 51 |    | - operation = instance.get_operation(operation_name) |
| 52 |    | - operation.name = name |
| 53 |    | - return operation |
|    | 54 | + for queue in job._operation_update_queues: |
|    | 55 | + queue.put('drop') |
| 54 | 56 | |
| 55 | 57 | except InvalidArgumentError as e: |
| 56 | 58 | self.logger.error(e) |
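
With GetOperation repurposed as the drop trigger, any standard Operations client call will do. A hypothetical client-side sketch, assuming the generated operations_pb2_grpc stubs next to operations_pb2, a server on localhost:50051 and an operation name matching a queued job (both values are placeholders):

    import grpc

    from buildgrid._protos.google.longrunning import operations_pb2, operations_pb2_grpc

    channel = grpc.insecure_channel('localhost:50051')  # placeholder endpoint
    stub = operations_pb2_grpc.OperationsStub(channel)

    # The reply is irrelevant here; the side effect is that the server pushes
    # a 'drop' marker into the matching job's operation update queues.
    try:
        stub.GetOperation(operations_pb2.GetOperationRequest(name='queued-job-name'))
    except grpc.RpcError:
        pass  # the simulated drop may surface as an RPC error on this call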

buildgrid/server/referencestorage/service.py:

| ... | ... | @@ -47,7 +47,8 @@ class ReferenceStorageService(buildstream_pb2_grpc.ReferenceStorageServicer): |
| 47 | 47 | context.set_details(str(e)) |
| 48 | 48 | context.set_code(grpc.StatusCode.INVALID_ARGUMENT) |
| 49 | 49 | |
| 50 |    | - except NotFoundError: |
|    | 50 | + except NotFoundError as e: |
|    | 51 | + self.logger.debug(e) |
| 51 | 52 | context.set_code(grpc.StatusCode.NOT_FOUND) |
| 52 | 53 | |
| 53 | 54 | return buildstream_pb2.GetReferenceResponse() |

buildgrid/server/scheduler.py:

| ... | ... | @@ -23,8 +23,6 @@ Schedules jobs. |
| 23 | 23 | |
| 24 | 24 | from collections import deque |
| 25 | 25 | |
| 26 |    | -from google.protobuf import any_pb2 |
| 27 |    | - |
| 28 | 26 | from buildgrid._exceptions import NotFoundError |
| 29 | 27 | from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2 |
| 30 | 28 | from buildgrid._protos.google.longrunning import operations_pb2 |
| ... | ... | @@ -60,9 +58,7 @@ class Scheduler: |
| 60 | 58 | job.update_execute_stage(ExecuteStage.QUEUED) |
| 61 | 59 | |
| 62 | 60 | else: |
| 63 |    | - cached_result_any = any_pb2.Any() |
| 64 |    | - cached_result_any.Pack(cached_result) |
| 65 |    | - job.result = cached_result_any |
|    | 61 | + job.result = cached_result |
| 66 | 62 | job.result_cached = True |
| 67 | 63 | job.update_execute_stage(ExecuteStage.COMPLETED) |
| 68 | 64 | |
| ... | ... | @@ -112,10 +108,11 @@ class Scheduler: |
| 112 | 108 | if state in (LeaseState.PENDING.value, LeaseState.ACTIVE.value): |
| 113 | 109 | self.retry_job(name) |
| 114 | 110 | |
| 115 |     | - def create_leases(self): |
| 116 |     | - while self.queue: |
|     | 111 | + def create_lease(self): |
|     | 112 | + if self.queue: |
| 117 | 113 | job = self.queue.popleft() |
| 118 | 114 | job.update_execute_stage(ExecuteStage.EXECUTING) |
| 119 | 115 | job.create_lease() |
| 120 | 116 | job.lease.state = LeaseState.PENDING.value |
| 121 |     | - yield job.lease |
|     | 117 | + return job.lease |
|     | 118 | + return None |
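
The lease generator collapses into a single-shot method: at most one lease per call, None when the queue is empty. A minimal sketch of that contract, assuming a deque-backed queue as in the rest of the class (MiniScheduler is illustrative only):

    from collections import deque

    class MiniScheduler:
        """Stripped-down illustration of the create_lease() contract."""

        def __init__(self):
            self.queue = deque()

        def create_lease(self):
            # Hand out at most one queued job per call.
            if self.queue:
                return self.queue.popleft()
            return None

    scheduler = MiniScheduler()
    assert scheduler.create_lease() is None   # empty queue, no lease
    scheduler.queue.append('job-1')
    assert scheduler.create_lease() == 'job-1'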

tests/integration/bots_service.py:

| ... | ... | @@ -129,7 +129,7 @@ def test_number_of_leases(number_of_jobs, bot_session, context, instance): |
| 129 | 129 | request = bots_pb2.CreateBotSessionRequest(bot_session=bot_session) |
| 130 | 130 | response = instance.CreateBotSession(request, context) |
| 131 | 131 | |
| 132 |     | - assert len(response.leases) == number_of_jobs |
|     | 132 | + assert len(response.leases) == min(number_of_jobs, 1) |
| 133 | 133 | |
| 134 | 134 | |
| 135 | 135 | def test_update_leases_with_work(bot_session, context, instance): |