Martin Blanchard pushed to branch mablanch/90-actionresult-message-size at BuildGrid / buildgrid
Commits:
- 97e9d231 by Martin Blanchard at 2019-01-09T09:03:52Z
- dc19f204 by Martin Blanchard at 2019-01-09T09:03:52Z
- d7e793b0 by Martin Blanchard at 2019-01-09T09:03:52Z
- aed13219 by Martin Blanchard at 2019-01-09T09:03:52Z
- 80341fc6 by Martin Blanchard at 2019-01-10T12:15:40Z
- 6b9e3a72 by Martin Blanchard at 2019-01-10T12:15:40Z
- 9176de9b by Martin Blanchard at 2019-01-10T12:15:40Z
- 97734e28 by Martin Blanchard at 2019-01-10T12:15:40Z
- 0be7a01b by Martin Blanchard at 2019-01-10T12:15:40Z
6 changed files:
- buildgrid/_app/bots/buildbox.py
- buildgrid/_app/bots/host.py
- buildgrid/bot/interface.py
- buildgrid/server/bots/instance.py
- buildgrid/server/job.py
- buildgrid/server/scheduler.py
Changes:
buildgrid/_app/bots/buildbox.py:

| ... | ... | @@ -21,7 +21,7 @@ import tempfile |
| 21 | 21 | from buildgrid.client.cas import download, upload |
| 22 | 22 | from buildgrid._exceptions import BotError |
| 23 | 23 | from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2 |
| 24 |    | -from buildgrid.settings import HASH_LENGTH |
|    | 24 | +from buildgrid.settings import HASH_LENGTH, MAX_REQUEST_SIZE |
| 25 | 25 | from buildgrid.utils import read_file, write_file |
| 26 | 26 | |
| 27 | 27 | |
| ... | ... | @@ -52,9 +52,10 @@ def work_buildbox(lease, context, event): |
| 52 | 52 | else: |
| 53 | 53 | working_directory = '/' |
| 54 | 54 | |
| 55 |    | - logger.debug("command hash: {}".format(action.command_digest.hash)) |
| 56 |    | - logger.debug("vdir hash: {}".format(action.input_root_digest.hash)) |
| 57 |    | - logger.debug("\n{}".format(' '.join(command.arguments))) |
|    | 55 | + logger.debug("Command digest: [{}/{}]" |
|    | 56 | + .format(action.command_digest.hash, action.command_digest.size_bytes)) |
|    | 57 | + logger.debug("Input root digest: [{}/{}]" |
|    | 58 | + .format(action.input_root_digest.hash, action.input_root_digest.size_bytes)) |
| 58 | 59 | |
| 59 | 60 | os.makedirs(os.path.join(local_cas_directory, 'tmp'), exist_ok=True) |
| 60 | 61 | os.makedirs(context.fuse_dir, exist_ok=True) |
| ... | ... | @@ -87,9 +88,7 @@ def work_buildbox(lease, context, event): |
| 87 | 88 | command_line.append(context.fuse_dir) |
| 88 | 89 | command_line.extend(command.arguments) |
| 89 | 90 | |
| 90 |    | - logger.debug(' '.join(command_line)) |
| 91 |    | - logger.debug("Input root digest:\n{}".format(action.input_root_digest)) |
| 92 |    | - logger.info("Launching process") |
|    | 91 | + logger.info("Starting execution: [{}...]".format(command.arguments[0])) |
| 93 | 92 | |
| 94 | 93 | command_line = subprocess.Popen(command_line, |
| 95 | 94 | stdin=subprocess.PIPE, |
| ... | ... | @@ -97,22 +96,17 @@ def work_buildbox(lease, context, event): |
| 97 | 96 | stderr=subprocess.PIPE) |
| 98 | 97 | stdout, stderr = command_line.communicate() |
| 99 | 98 | returncode = command_line.returncode |
|    | 99 | + |
| 100 | 100 | action_result = remote_execution_pb2.ActionResult() |
| 101 |    | - # TODO: Upload to CAS or output RAW |
| 102 |    | - # For now, just pass raw |
| 103 |    | - # https://gitlab.com/BuildGrid/buildgrid/issues/90 |
| 104 |    | - action_result.stdout_raw = stdout |
| 105 |    | - action_result.stderr_raw = stderr |
| 106 | 101 | action_result.exit_code = returncode |
| 107 | 102 | |
| 108 |    | - logger.debug("BuildBox stderr: [{}]".format(stderr)) |
| 109 |    | - logger.debug("BuildBox stdout: [{}]".format(stdout)) |
| 110 |    | - logger.debug("BuildBox exit code: [{}]".format(returncode)) |
|    | 103 | + logger.info("Execution finished with code: [{}]".format(returncode)) |
| 111 | 104 | |
| 112 | 105 | output_digest = remote_execution_pb2.Digest() |
| 113 | 106 | output_digest.ParseFromString(read_file(output_digest_file.name)) |
| 114 | 107 | |
| 115 |    | - logger.debug("Output root digest: [{}]".format(output_digest)) |
|    | 108 | + logger.debug("Output root digest: [{}/{}]" |
|    | 109 | + .format(output_digest.hash, output_digest.size_bytes)) |
| 116 | 110 | |
| 117 | 111 | if len(output_digest.hash) != HASH_LENGTH: |
| 118 | 112 | raise BotError(stdout, |
| ... | ... | @@ -126,11 +120,25 @@ def work_buildbox(lease, context, event): |
| 126 | 120 | with upload(context.cas_channel) as uploader: |
| 127 | 121 | output_tree_digest = uploader.put_message(output_tree) |
| 128 | 122 | |
| 129 |    | - output_directory = remote_execution_pb2.OutputDirectory() |
| 130 |    | - output_directory.tree_digest.CopyFrom(output_tree_digest) |
| 131 |    | - output_directory.path = os.path.relpath(working_directory, start='/') |
|    | 123 | + output_directory = remote_execution_pb2.OutputDirectory() |
|    | 124 | + output_directory.tree_digest.CopyFrom(output_tree_digest) |
|    | 125 | + output_directory.path = os.path.relpath(working_directory, start='/') |
|    | 126 | + |
|    | 127 | + action_result.output_directories.extend([output_directory]) |
|    | 128 | + |
|    | 129 | + if action_result.ByteSize() + len(stdout) > MAX_REQUEST_SIZE: |
|    | 130 | + stdout_digest = uploader.put_blob(stdout) |
|    | 131 | + action_result.stdout_digest.CopyFrom(stdout_digest) |
|    | 132 | + |
|    | 133 | + else: |
|    | 134 | + action_result.stdout_raw = stdout |
|    | 135 | + |
|    | 136 | + if action_result.ByteSize() + len(stderr) > MAX_REQUEST_SIZE: |
|    | 137 | + stderr_digest = uploader.put_blob(stderr) |
|    | 138 | + action_result.stderr_digest.CopyFrom(stderr_digest) |
| 132 | 139 | |
| 133 |    | - action_result.output_directories.extend([output_directory]) |
|    | 140 | + else: |
|    | 141 | + action_result.stderr_raw = stderr |
| 134 | 142 | |
| 135 | 143 | lease.result.Pack(action_result) |
| 136 | 144 | |
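
The new stdout/stderr handling above is the fix for issue #90 (ActionResult message size): MAX_REQUEST_SIZE comes from buildgrid.settings and is presumably meant to keep the bot's lease update below the gRPC message size limit (4 MiB by default), so large outputs are stored in CAS and referenced by digest instead of travelling inline. A minimal sketch of the pattern, assuming only the names visible in the diff (ActionResult.ByteSize(), uploader.put_blob()); the helper itself is illustrative, not the merged code:

def attach_command_output(action_result, uploader, stdout, stderr, max_request_size):
    """Inline stdout/stderr when small enough, otherwise store them in CAS."""
    for blob, raw_field, digest_field in ((stdout, 'stdout_raw', 'stdout_digest'),
                                          (stderr, 'stderr_raw', 'stderr_digest')):
        if action_result.ByteSize() + len(blob) > max_request_size:
            # Too large to travel inline: upload the blob to CAS and only
            # reference it by digest in the ActionResult.
            getattr(action_result, digest_field).CopyFrom(uploader.put_blob(blob))
        else:
            # Small enough: embed the raw bytes directly.
            setattr(action_result, raw_field, blob)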

buildgrid/_app/bots/host.py:

| ... | ... | @@ -20,6 +20,7 @@ import tempfile |
| 20 | 20 | |
| 21 | 21 | from buildgrid.client.cas import download, upload |
| 22 | 22 | from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2 |
|    | 23 | +from buildgrid.settings import MAX_REQUEST_SIZE |
| 23 | 24 | from buildgrid.utils import get_hostname, output_file_maker, output_directory_maker |
| 24 | 25 | |
| 25 | 26 | |
| ... | ... | @@ -27,6 +28,7 @@ def work_host_tools(lease, context, event): |
| 27 | 28 | """Executes a lease for a build action, using host tools. |
| 28 | 29 | """ |
| 29 | 30 | instance_name = context.parent |
|    | 31 | + |
| 30 | 32 | logger = logging.getLogger(__name__) |
| 31 | 33 | |
| 32 | 34 | action_digest = remote_execution_pb2.Digest() |
| ... | ... | @@ -51,6 +53,11 @@ def work_host_tools(lease, context, event): |
| 51 | 53 | |
| 52 | 54 | downloader.download_directory(action.input_root_digest, temp_directory) |
| 53 | 55 | |
|    | 56 | + logger.debug("Command digest: [{}/{}]" |
|    | 57 | + .format(action.command_digest.hash, action.command_digest.size_bytes)) |
|    | 58 | + logger.debug("Input root digest: [{}/{}]" |
|    | 59 | + .format(action.input_root_digest.hash, action.input_root_digest.size_bytes)) |
|    | 60 | + |
| 54 | 61 | action_result.execution_metadata.input_fetch_completed_timestamp.GetCurrentTime() |
| 55 | 62 | |
| 56 | 63 | environment = os.environ.copy() |
| ... | ... | @@ -76,7 +83,7 @@ def work_host_tools(lease, context, event): |
| 76 | 83 | os.path.dirname(output_path)) |
| 77 | 84 | os.makedirs(directory_path, exist_ok=True) |
| 78 | 85 | |
| 79 |    | - logger.debug(' '.join(command_line)) |
|    | 86 | + logger.info("Starting execution: [{}...]".format(command.arguments[0])) |
| 80 | 87 | |
| 81 | 88 | action_result.execution_metadata.execution_start_timestamp.GetCurrentTime() |
| 82 | 89 | |
| ... | ... | @@ -92,16 +99,9 @@ def work_host_tools(lease, context, event): |
| 92 | 99 | |
| 93 | 100 | action_result.execution_metadata.execution_completed_timestamp.GetCurrentTime() |
| 94 | 101 | |
| 95 |    | - # TODO: Upload to CAS or output RAW |
| 96 |    | - # For now, just pass raw |
| 97 |    | - # https://gitlab.com/BuildGrid/buildgrid/issues/90 |
| 98 |    | - action_result.stdout_raw = stdout |
| 99 |    | - action_result.stderr_raw = stderr |
| 100 | 102 | action_result.exit_code = returncode |
| 101 | 103 | |
| 102 |    | - logger.debug("Command stderr: [{}]".format(stderr)) |
| 103 |    | - logger.debug("Command stdout: [{}]".format(stdout)) |
| 104 |    | - logger.debug("Command exit code: [{}]".format(returncode)) |
|    | 104 | + logger.info("Execution finished with code: [{}]".format(returncode)) |
| 105 | 105 | |
| 106 | 106 | action_result.execution_metadata.output_upload_start_timestamp.GetCurrentTime() |
| 107 | 107 | |
| ... | ... | @@ -119,6 +119,9 @@ def work_host_tools(lease, context, event): |
| 119 | 119 | file_digest) |
| 120 | 120 | output_files.append(output_file) |
| 121 | 121 | |
|    | 122 | + logger.debug("Output file digest: [{}/{}]" |
|    | 123 | + .format(file_digest.hash, file_digest.size_bytes)) |
|    | 124 | + |
| 122 | 125 | action_result.output_files.extend(output_files) |
| 123 | 126 | |
| 124 | 127 | for output_path in command.output_directories: |
| ... | ... | @@ -132,8 +135,25 @@ def work_host_tools(lease, context, event): |
| 132 | 135 | tree_digest) |
| 133 | 136 | output_directories.append(output_directory) |
| 134 | 137 | |
|    | 138 | + logger.debug("Output tree digest: [{}/{}]" |
|    | 139 | + .format(tree_digest.hash, tree_digest.size_bytes)) |
|    | 140 | + |
| 135 | 141 | action_result.output_directories.extend(output_directories) |
| 136 | 142 | |
|    | 143 | + if action_result.ByteSize() + len(stdout) > MAX_REQUEST_SIZE: |
|    | 144 | + stdout_digest = uploader.put_blob(stdout) |
|    | 145 | + action_result.stdout_digest.CopyFrom(stdout_digest) |
|    | 146 | + |
|    | 147 | + else: |
|    | 148 | + action_result.stdout_raw = stdout |
|    | 149 | + |
|    | 150 | + if action_result.ByteSize() + len(stderr) > MAX_REQUEST_SIZE: |
|    | 151 | + stderr_digest = uploader.put_blob(stderr) |
|    | 152 | + action_result.stderr_digest.CopyFrom(stderr_digest) |
|    | 153 | + |
|    | 154 | + else: |
|    | 155 | + action_result.stderr_raw = stderr |
|    | 156 | + |
| 137 | 157 | action_result.execution_metadata.output_upload_completed_timestamp.GetCurrentTime() |
| 138 | 158 | |
| 139 | 159 | lease.result.Pack(action_result) |

buildgrid/bot/interface.py:

| ... | ... | @@ -57,5 +57,5 @@ class BotInterface: |
| 57 | 57 | try: |
| 58 | 58 | return call(request) |
| 59 | 59 | except grpc.RpcError as e: |
| 60 |    | - self.__logger.error(e.code()) |
|    | 60 | + self.__logger.error(e) |
| 61 | 61 | return e.code() |

buildgrid/server/bots/instance.py:

| ... | ... | @@ -83,7 +83,7 @@ class BotsInterface: |
| 83 | 83 | self._check_bot_ids(bot_session.bot_id, name) |
| 84 | 84 | self._check_assigned_leases(bot_session) |
| 85 | 85 | |
| 86 |    | - for lease in bot_session.leases: |
|    | 86 | + for lease in list(bot_session.leases): |
| 87 | 87 | checked_lease = self._check_lease_state(lease) |
| 88 | 88 | if not checked_lease: |
| 89 | 89 | # TODO: Make sure we don't need this |
| ... | ... | @@ -91,7 +91,10 @@ class BotsInterface: |
| 91 | 91 | self._assigned_leases[name].remove(lease.id) |
| 92 | 92 | except KeyError: |
| 93 | 93 | pass |
| 94 |    | - lease.Clear() |
|    | 94 | + |
|    | 95 | + self._scheduler.delete_job_lease(lease.id) |
|    | 96 | + |
|    | 97 | + bot_session.leases.remove(lease) |
| 95 | 98 | |
| 96 | 99 | self._request_leases(bot_session) |
| 97 | 100 | return bot_session |
| ... | ... | @@ -117,7 +120,7 @@ class BotsInterface: |
| 117 | 120 | |
| 118 | 121 | try: |
| 119 | 122 | if self._scheduler.get_job_lease_cancelled(lease.id): |
| 120 |    | - lease.state.CopyFrom(LeaseState.CANCELLED.value) |
|    | 123 | + lease.state = LeaseState.CANCELLED.value |
| 121 | 124 | return lease |
| 122 | 125 | except KeyError: |
| 123 | 126 | # Job does not exist, remove from bot. |
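
The switch to for lease in list(bot_session.leases) matters because the loop body now calls bot_session.leases.remove(lease): removing items from a sequence while iterating over it directly skips elements. A plain-Python illustration of the pitfall (not BuildGrid code):

leases = ['lease-a', 'lease-b', 'lease-c']
for lease in leases:        # iterating the live list...
    leases.remove(lease)    # ...while shrinking it
print(leases)               # ['lease-b'] -- one element was skipped

leases = ['lease-a', 'lease-b', 'lease-c']
for lease in list(leases):  # iterate over a snapshot instead
    leases.remove(lease)
print(leases)               # [] -- every element was removed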

buildgrid/server/job.py:

| ... | ... | @@ -222,6 +222,13 @@ class Job: |
| 222 | 222 | if self._lease is not None: |
| 223 | 223 | self.update_lease_state(LeaseState.CANCELLED) |
| 224 | 224 | |
|    | 225 | + def delete_lease(self): |
|    | 226 | + """Discard the job's :class:Lease.""" |
|    | 227 | + self.__worker_start_timestamp.Clear() |
|    | 228 | + self.__worker_completed_timestamp.Clear() |
|    | 229 | + |
|    | 230 | + self._lease = None |
|    | 231 | + |
| 225 | 232 | def update_operation_stage(self, stage): |
| 226 | 233 | """Operates a stage transition for the job's :class:Operation. |
| 227 | 234 | |

buildgrid/server/scheduler.py:

| ... | ... | @@ -62,18 +62,8 @@ class Scheduler: |
| 62 | 62 | |
| 63 | 63 | job.unregister_client(queue) |
| 64 | 64 | |
| 65 |    | - if not job.n_clients and job.operation.done: |
| 66 |    | - del self.jobs[job_name] |
| 67 |    | - |
| 68 |    | - if self._is_instrumented: |
| 69 |    | - self.__operations_by_stage[OperationStage.CACHE_CHECK].discard(job_name) |
| 70 |    | - self.__operations_by_stage[OperationStage.QUEUED].discard(job_name) |
| 71 |    | - self.__operations_by_stage[OperationStage.EXECUTING].discard(job_name) |
| 72 |    | - self.__operations_by_stage[OperationStage.COMPLETED].discard(job_name) |
| 73 |    | - |
| 74 |    | - self.__leases_by_state[LeaseState.PENDING].discard(job_name) |
| 75 |    | - self.__leases_by_state[LeaseState.ACTIVE].discard(job_name) |
| 76 |    | - self.__leases_by_state[LeaseState.COMPLETED].discard(job_name) |
|    | 65 | + if not job.n_clients and job.operation.done and not job.lease: |
|    | 66 | + self._delete_job(job.name) |
| 77 | 67 | |
| 78 | 68 | def queue_job(self, job, skip_cache_lookup=False): |
| 79 | 69 | self.jobs[job.name] = job |
| ... | ... | @@ -199,6 +189,15 @@ class Scheduler: |
| 199 | 189 | """Returns true if the lease is cancelled""" |
| 200 | 190 | return self.jobs[job_name].lease_cancelled |
| 201 | 191 | |
|    | 192 | + def delete_job_lease(self, job_name): |
|    | 193 | + """Discards the lease associated to a job.""" |
|    | 194 | + job = self.jobs[job_name] |
|    | 195 | + |
|    | 196 | + self.jobs[job.name].delete_lease() |
|    | 197 | + |
|    | 198 | + if not job.n_clients and job.operation.done: |
|    | 199 | + self._delete_job(job.name) |
|    | 200 | + |
| 202 | 201 | def get_job_operation(self, job_name): |
| 203 | 202 | """Returns the operation associated to job.""" |
| 204 | 203 | return self.jobs[job_name].operation |
| ... | ... | @@ -296,6 +295,20 @@ class Scheduler: |
| 296 | 295 | |
| 297 | 296 | # --- Private API --- |
| 298 | 297 | |
|    | 298 | + def _delete_job(self, job_name): |
|    | 299 | + """Drops an entry from the internal list of jobs.""" |
|    | 300 | + del self.jobs[job_name] |
|    | 301 | + |
|    | 302 | + if self._is_instrumented: |
|    | 303 | + self.__operations_by_stage[OperationStage.CACHE_CHECK].discard(job_name) |
|    | 304 | + self.__operations_by_stage[OperationStage.QUEUED].discard(job_name) |
|    | 305 | + self.__operations_by_stage[OperationStage.EXECUTING].discard(job_name) |
|    | 306 | + self.__operations_by_stage[OperationStage.COMPLETED].discard(job_name) |
|    | 307 | + |
|    | 308 | + self.__leases_by_state[LeaseState.PENDING].discard(job_name) |
|    | 309 | + self.__leases_by_state[LeaseState.ACTIVE].discard(job_name) |
|    | 310 | + self.__leases_by_state[LeaseState.COMPLETED].discard(job_name) |
|    | 311 | + |
| 299 | 312 | def _update_job_operation_stage(self, job_name, operation_stage): |
| 300 | 313 | """Requests a stage transition for the job's :class:Operations. |
| 301 | 314 | |