Valentin David pushed to branch valentindavid/cache_server_fill_up-1.2 at BuildStream / buildstream
Commits:
- 34e81ae1 by Tiago Gomes at 2018-09-30T06:33:34Z
- b842658c by Tiago Gomes at 2018-09-30T06:33:46Z
- e3ff069e by Tiago Gomes at 2018-09-30T06:33:49Z
- 9bca9183 by Tiago Gomes at 2018-09-30T06:34:21Z
- 2d025076 by Jürg Billeter at 2018-09-30T06:39:57Z
- 01831afe by Jürg Billeter at 2018-09-30T06:41:13Z
- 1b7245da by Jürg Billeter at 2018-09-30T06:41:18Z
- fd46a9f9 by Jürg Billeter at 2018-09-30T06:41:25Z
- 764b7517 by Jürg Billeter at 2018-10-01T15:42:08Z
- a009dcbe by Tristan Van Berkom at 2018-10-02T11:51:19Z
- 7da5104b by Tristan Van Berkom at 2018-10-02T13:18:54Z
- 6e820362 by Tristan Van Berkom at 2018-10-03T07:35:03Z
- 9568824f by Jim MacArthur at 2018-10-03T07:35:51Z
- 4a67e4e3 by Jürg Billeter at 2018-10-03T07:35:51Z
- f585b233 by Jürg Billeter at 2018-10-03T07:35:51Z
- 244e3c7c by Tristan Van Berkom at 2018-10-03T08:05:47Z
- 3f4587ab by Tristan Van Berkom at 2018-10-03T09:36:34Z
- a33fd160 by Tristan Van Berkom at 2018-10-03T10:00:50Z
- e80f435a by Tristan Van Berkom at 2018-10-03T12:06:46Z
- 013a8ad4 by Tristan Van Berkom at 2018-10-03T12:37:24Z
- 262e789f by Tristan Van Berkom at 2018-10-03T13:03:52Z
- 10d69988 by Tristan Van Berkom at 2018-10-03T13:03:56Z
- c02a1ae8 by Tristan Van Berkom at 2018-10-03T13:04:03Z
- eb92e8e9 by Tristan Van Berkom at 2018-10-03T13:44:31Z
- 26d48cc9 by Valentin David at 2018-10-04T14:55:13Z
- 96f09d48 by Valentin David at 2018-10-04T15:16:46Z
- 10abe77f by Tristan Van Berkom at 2018-10-05T07:01:57Z
- f0fc54d4 by Valentin David at 2018-10-17T15:19:21Z
- e66e32ae by Javier Jardón at 2018-10-17T15:43:16Z
- 405d6879 by Valentin David at 2018-10-25T11:55:00Z
- 0a522cad by Valentin David at 2018-10-25T11:55:00Z
- 5ebd4138 by Valentin David at 2018-10-25T11:55:00Z
- c4dc49e1 by Valentin David at 2018-10-25T11:55:00Z
- f0e5ac6f by Valentin David at 2018-10-25T11:55:00Z
- e7dad590 by Valentin David at 2018-10-25T11:55:00Z
- 4be0f7dc by Valentin David at 2018-10-25T11:59:17Z
17 changed files:
- NEWS
- buildstream/_artifactcache/artifactcache.py
- buildstream/_artifactcache/cascache.py
- buildstream/_artifactcache/casserver.py
- buildstream/_scheduler/jobs/job.py
- buildstream/_scheduler/queues/buildqueue.py
- buildstream/_scheduler/queues/queue.py
- buildstream/_scheduler/scheduler.py
- buildstream/plugins/sources/git.py
- buildstream/source.py
- buildstream/utils.py
- doc/examples/autotools/project.conf
- doc/examples/integration-commands/project.conf
- doc/examples/running-commands/project.conf
- tests/frontend/push.py
- tests/integration/project/project.conf
- tests/testutils/artifactshare.py
Changes:
| 1 |
+=================
|
|
| 2 |
+buildstream 1.2.3
|
|
| 3 |
+=================
|
|
| 4 |
+ |
|
| 5 |
+ o Fixed an unhandled exception when cleaning up a build sandbox (#153)
|
|
| 6 |
+ |
|
| 7 |
+ o Fixed race condition when calculating cache size and committing artifacts
|
|
| 8 |
+ |
|
| 9 |
+ o Fixed regression where terminating with `^C` results in a double user interrogation (#693)
|
|
| 10 |
+ |
|
| 11 |
+ o Fixed regression in summary when builds are terminated (#479)
|
|
| 12 |
+ |
|
| 13 |
+ o Fixed regression where irrelevant status messages appear from git sources
|
|
| 14 |
+ |
|
| 15 |
+ o Improved performance of artifact uploads by batching file transfers (#676/#677)
|
|
| 16 |
+ |
|
| 17 |
+ o Fixed performance of artifact downloads by batching file transfers (#554)
|
|
| 18 |
+ |
|
| 19 |
+ o Fixed checks for paths which escape the project directory (#673)
|
|
| 20 |
+ |
|
| 1 | 21 |
=================
|
| 2 | 22 |
buildstream 1.2.2
|
| 3 | 23 |
=================
|
| ... | ... | @@ -277,7 +277,7 @@ class ArtifactCache(): |
| 277 | 277 |
"Please increase the cache-quota in {}."
|
| 278 | 278 |
.format(self.context.config_origin or default_conf))
|
| 279 | 279 |
|
| 280 |
- if self.get_quota_exceeded():
|
|
| 280 |
+ if self.has_quota_exceeded():
|
|
| 281 | 281 |
raise ArtifactError("Cache too full. Aborting.",
|
| 282 | 282 |
detail=detail,
|
| 283 | 283 |
reason="cache-too-full")
|
| ... | ... | @@ -364,14 +364,14 @@ class ArtifactCache(): |
| 364 | 364 |
self._cache_size = cache_size
|
| 365 | 365 |
self._write_cache_size(self._cache_size)
|
| 366 | 366 |
|
| 367 |
- # get_quota_exceeded()
|
|
| 367 |
+ # has_quota_exceeded()
|
|
| 368 | 368 |
#
|
| 369 | 369 |
# Checks if the current artifact cache size exceeds the quota.
|
| 370 | 370 |
#
|
| 371 | 371 |
# Returns:
|
| 372 | 372 |
# (bool): True if the quota is exceeded
|
| 373 | 373 |
#
|
| 374 |
- def get_quota_exceeded(self):
|
|
| 374 |
+ def has_quota_exceeded(self):
|
|
| 375 | 375 |
return self.get_cache_size() > self._cache_quota
|
| 376 | 376 |
|
| 377 | 377 |
################################################
|
| ... | ... | @@ -26,6 +26,7 @@ import stat |
| 26 | 26 |
import tempfile
|
| 27 | 27 |
import uuid
|
| 28 | 28 |
import errno
|
| 29 |
+import contextlib
|
|
| 29 | 30 |
from urllib.parse import urlparse
|
| 30 | 31 |
|
| 31 | 32 |
import grpc
|
| ... | ... | @@ -43,6 +44,11 @@ from .._exceptions import ArtifactError |
| 43 | 44 |
from . import ArtifactCache
|
| 44 | 45 |
|
| 45 | 46 |
|
| 47 |
+# The default limit for gRPC messages is 4 MiB.
|
|
| 48 |
+# Limit payload to 1 MiB to leave sufficient headroom for metadata.
|
|
| 49 |
+_MAX_PAYLOAD_BYTES = 1024 * 1024
|
|
| 50 |
+ |
|
| 51 |
+ |
|
| 46 | 52 |
# A CASCache manages artifacts in a CAS repository as specified in the
|
| 47 | 53 |
# Remote Execution API.
|
| 48 | 54 |
#
|
| ... | ... | @@ -76,6 +82,7 @@ class CASCache(ArtifactCache): |
| 76 | 82 |
################################################
|
| 77 | 83 |
# Implementation of abstract methods #
|
| 78 | 84 |
################################################
|
| 85 |
+ |
|
| 79 | 86 |
def contains(self, element, key):
|
| 80 | 87 |
refpath = self._refpath(self.get_artifact_fullname(element, key))
|
| 81 | 88 |
|
| ... | ... | @@ -115,7 +122,7 @@ class CASCache(ArtifactCache): |
| 115 | 122 |
def commit(self, element, content, keys):
|
| 116 | 123 |
refs = [self.get_artifact_fullname(element, key) for key in keys]
|
| 117 | 124 |
|
| 118 |
- tree = self._create_tree(content)
|
|
| 125 |
+ tree = self._commit_directory(content)
|
|
| 119 | 126 |
|
| 120 | 127 |
for ref in refs:
|
| 121 | 128 |
self.set_ref(ref, tree)
|
| ... | ... | @@ -151,6 +158,7 @@ class CASCache(ArtifactCache): |
| 151 | 158 |
q = multiprocessing.Queue()
|
| 152 | 159 |
for remote_spec in remote_specs:
|
| 153 | 160 |
# Use subprocess to avoid creation of gRPC threads in main BuildStream process
|
| 161 |
+ # See https://github.com/grpc/grpc/blob/master/doc/fork_support.md for details
|
|
| 154 | 162 |
p = multiprocessing.Process(target=self._initialize_remote, args=(remote_spec, q))
|
| 155 | 163 |
|
| 156 | 164 |
try:
|
| ... | ... | @@ -263,109 +271,69 @@ class CASCache(ArtifactCache): |
| 263 | 271 |
|
| 264 | 272 |
self.set_ref(newref, tree)
|
| 265 | 273 |
|
| 274 |
+ def _push_refs_to_remote(self, refs, remote):
|
|
| 275 |
+ skipped_remote = True
|
|
| 276 |
+ try:
|
|
| 277 |
+ for ref in refs:
|
|
| 278 |
+ tree = self.resolve_ref(ref)
|
|
| 279 |
+ |
|
| 280 |
+ # Check whether ref is already on the server in which case
|
|
| 281 |
+ # there is no need to push the artifact
|
|
| 282 |
+ try:
|
|
| 283 |
+ request = buildstream_pb2.GetReferenceRequest()
|
|
| 284 |
+ request.key = ref
|
|
| 285 |
+ response = remote.ref_storage.GetReference(request)
|
|
| 286 |
+ |
|
| 287 |
+ if response.digest.hash == tree.hash and response.digest.size_bytes == tree.size_bytes:
|
|
| 288 |
+ # ref is already on the server with the same tree
|
|
| 289 |
+ continue
|
|
| 290 |
+ |
|
| 291 |
+ except grpc.RpcError as e:
|
|
| 292 |
+ if e.code() != grpc.StatusCode.NOT_FOUND:
|
|
| 293 |
+ # Intentionally re-raise RpcError for outer except block.
|
|
| 294 |
+ raise
|
|
| 295 |
+ |
|
| 296 |
+ self._send_directory(remote, tree)
|
|
| 297 |
+ |
|
| 298 |
+ request = buildstream_pb2.UpdateReferenceRequest()
|
|
| 299 |
+ request.keys.append(ref)
|
|
| 300 |
+ request.digest.hash = tree.hash
|
|
| 301 |
+ request.digest.size_bytes = tree.size_bytes
|
|
| 302 |
+ remote.ref_storage.UpdateReference(request)
|
|
| 303 |
+ |
|
| 304 |
+ skipped_remote = False
|
|
| 305 |
+ except grpc.RpcError as e:
|
|
| 306 |
+ if e.code() != grpc.StatusCode.RESOURCE_EXHAUSTED:
|
|
| 307 |
+ raise ArtifactError("Failed to push artifact {}: {}".format(refs, e), temporary=True) from e
|
|
| 308 |
+ |
|
| 309 |
+ return not skipped_remote
|
|
| 310 |
+ |
|
| 266 | 311 |
def push(self, element, keys):
|
| 267 |
- refs = [self.get_artifact_fullname(element, key) for key in keys]
|
|
| 312 |
+ |
|
| 313 |
+ refs = [self.get_artifact_fullname(element, key) for key in list(keys)]
|
|
| 268 | 314 |
|
| 269 | 315 |
project = element._get_project()
|
| 270 | 316 |
|
| 271 | 317 |
push_remotes = [r for r in self._remotes[project] if r.spec.push]
|
| 272 | 318 |
|
| 273 | 319 |
pushed = False
|
| 274 |
- display_key = element._get_brief_display_key()
|
|
| 320 |
+ |
|
| 275 | 321 |
for remote in push_remotes:
|
| 276 | 322 |
remote.init()
|
| 277 |
- skipped_remote = True
|
|
| 323 |
+ display_key = element._get_brief_display_key()
|
|
| 278 | 324 |
element.status("Pushing artifact {} -> {}".format(display_key, remote.spec.url))
|
| 279 | 325 |
|
| 280 |
- try:
|
|
| 281 |
- for ref in refs:
|
|
| 282 |
- tree = self.resolve_ref(ref)
|
|
| 283 |
- |
|
| 284 |
- # Check whether ref is already on the server in which case
|
|
| 285 |
- # there is no need to push the artifact
|
|
| 286 |
- try:
|
|
| 287 |
- request = buildstream_pb2.GetReferenceRequest()
|
|
| 288 |
- request.key = ref
|
|
| 289 |
- response = remote.ref_storage.GetReference(request)
|
|
| 290 |
- |
|
| 291 |
- if response.digest.hash == tree.hash and response.digest.size_bytes == tree.size_bytes:
|
|
| 292 |
- # ref is already on the server with the same tree
|
|
| 293 |
- continue
|
|
| 294 |
- |
|
| 295 |
- except grpc.RpcError as e:
|
|
| 296 |
- if e.code() != grpc.StatusCode.NOT_FOUND:
|
|
| 297 |
- # Intentionally re-raise RpcError for outer except block.
|
|
| 298 |
- raise
|
|
| 299 |
- |
|
| 300 |
- missing_blobs = {}
|
|
| 301 |
- required_blobs = self._required_blobs(tree)
|
|
| 302 |
- |
|
| 303 |
- # Limit size of FindMissingBlobs request
|
|
| 304 |
- for required_blobs_group in _grouper(required_blobs, 512):
|
|
| 305 |
- request = remote_execution_pb2.FindMissingBlobsRequest()
|
|
| 306 |
- |
|
| 307 |
- for required_digest in required_blobs_group:
|
|
| 308 |
- d = request.blob_digests.add()
|
|
| 309 |
- d.hash = required_digest.hash
|
|
| 310 |
- d.size_bytes = required_digest.size_bytes
|
|
| 311 |
- |
|
| 312 |
- response = remote.cas.FindMissingBlobs(request)
|
|
| 313 |
- for digest in response.missing_blob_digests:
|
|
| 314 |
- d = remote_execution_pb2.Digest()
|
|
| 315 |
- d.hash = digest.hash
|
|
| 316 |
- d.size_bytes = digest.size_bytes
|
|
| 317 |
- missing_blobs[d.hash] = d
|
|
| 318 |
- |
|
| 319 |
- # Upload any blobs missing on the server
|
|
| 320 |
- skipped_remote = False
|
|
| 321 |
- for digest in missing_blobs.values():
|
|
| 322 |
- uuid_ = uuid.uuid4()
|
|
| 323 |
- resource_name = '/'.join(['uploads', str(uuid_), 'blobs',
|
|
| 324 |
- digest.hash, str(digest.size_bytes)])
|
|
| 325 |
- |
|
| 326 |
- def request_stream(resname):
|
|
| 327 |
- with open(self.objpath(digest), 'rb') as f:
|
|
| 328 |
- assert os.fstat(f.fileno()).st_size == digest.size_bytes
|
|
| 329 |
- offset = 0
|
|
| 330 |
- finished = False
|
|
| 331 |
- remaining = digest.size_bytes
|
|
| 332 |
- while not finished:
|
|
| 333 |
- chunk_size = min(remaining, 64 * 1024)
|
|
| 334 |
- remaining -= chunk_size
|
|
| 335 |
- |
|
| 336 |
- request = bytestream_pb2.WriteRequest()
|
|
| 337 |
- request.write_offset = offset
|
|
| 338 |
- # max. 64 kB chunks
|
|
| 339 |
- request.data = f.read(chunk_size)
|
|
| 340 |
- request.resource_name = resname
|
|
| 341 |
- request.finish_write = remaining <= 0
|
|
| 342 |
- yield request
|
|
| 343 |
- offset += chunk_size
|
|
| 344 |
- finished = request.finish_write
|
|
| 345 |
- response = remote.bytestream.Write(request_stream(resource_name))
|
|
| 346 |
- |
|
| 347 |
- request = buildstream_pb2.UpdateReferenceRequest()
|
|
| 348 |
- request.keys.append(ref)
|
|
| 349 |
- request.digest.hash = tree.hash
|
|
| 350 |
- request.digest.size_bytes = tree.size_bytes
|
|
| 351 |
- remote.ref_storage.UpdateReference(request)
|
|
| 352 |
- |
|
| 353 |
- pushed = True
|
|
| 354 |
- |
|
| 355 |
- if not skipped_remote:
|
|
| 356 |
- element.info("Pushed artifact {} -> {}".format(display_key, remote.spec.url))
|
|
| 357 |
- |
|
| 358 |
- except grpc.RpcError as e:
|
|
| 359 |
- if e.code() != grpc.StatusCode.RESOURCE_EXHAUSTED:
|
|
| 360 |
- raise ArtifactError("Failed to push artifact {}: {}".format(refs, e), temporary=True) from e
|
|
| 361 |
- |
|
| 362 |
- if skipped_remote:
|
|
| 326 |
+ if self._push_refs_to_remote(refs, remote):
|
|
| 327 |
+ element.info("Pushed artifact {} -> {}".format(display_key, remote.spec.url))
|
|
| 328 |
+ pushed = True
|
|
| 329 |
+ else:
|
|
| 363 | 330 |
self.context.message(Message(
|
| 364 | 331 |
None,
|
| 365 | 332 |
MessageType.INFO,
|
| 366 | 333 |
"Remote ({}) already has {} cached".format(
|
| 367 | 334 |
remote.spec.url, element._get_brief_display_key())
|
| 368 | 335 |
))
|
| 336 |
+ |
|
| 369 | 337 |
return pushed
|
| 370 | 338 |
|
| 371 | 339 |
################################################
|
| ... | ... | @@ -393,13 +361,14 @@ class CASCache(ArtifactCache): |
| 393 | 361 |
# digest (Digest): An optional Digest object to populate
|
| 394 | 362 |
# path (str): Path to file to add
|
| 395 | 363 |
# buffer (bytes): Byte buffer to add
|
| 364 |
+ # link_directly (bool): Whether file given by path can be linked
|
|
| 396 | 365 |
#
|
| 397 | 366 |
# Returns:
|
| 398 | 367 |
# (Digest): The digest of the added object
|
| 399 | 368 |
#
|
| 400 | 369 |
# Either `path` or `buffer` must be passed, but not both.
|
| 401 | 370 |
#
|
| 402 |
- def add_object(self, *, digest=None, path=None, buffer=None):
|
|
| 371 |
+ def add_object(self, *, digest=None, path=None, buffer=None, link_directly=False):
|
|
| 403 | 372 |
# Exactly one of the two parameters has to be specified
|
| 404 | 373 |
assert (path is None) != (buffer is None)
|
| 405 | 374 |
|
| ... | ... | @@ -409,28 +378,34 @@ class CASCache(ArtifactCache): |
| 409 | 378 |
try:
|
| 410 | 379 |
h = hashlib.sha256()
|
| 411 | 380 |
# Always write out new file to avoid corruption if input file is modified
|
| 412 |
- with tempfile.NamedTemporaryFile(dir=self.tmpdir) as out:
|
|
| 413 |
- # Set mode bits to 0644
|
|
| 414 |
- os.chmod(out.name, stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH)
|
|
| 415 |
- |
|
| 416 |
- if path:
|
|
| 417 |
- with open(path, 'rb') as f:
|
|
| 418 |
- for chunk in iter(lambda: f.read(4096), b""):
|
|
| 419 |
- h.update(chunk)
|
|
| 420 |
- out.write(chunk)
|
|
| 381 |
+ with contextlib.ExitStack() as stack:
|
|
| 382 |
+ if path is not None and link_directly:
|
|
| 383 |
+ tmp = stack.enter_context(open(path, 'rb'))
|
|
| 384 |
+ for chunk in iter(lambda: tmp.read(4096), b""):
|
|
| 385 |
+ h.update(chunk)
|
|
| 421 | 386 |
else:
|
| 422 |
- h.update(buffer)
|
|
| 423 |
- out.write(buffer)
|
|
| 387 |
+ tmp = stack.enter_context(tempfile.NamedTemporaryFile(dir=self.tmpdir))
|
|
| 388 |
+ # Set mode bits to 0644
|
|
| 389 |
+ os.chmod(tmp.name, stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH)
|
|
| 390 |
+ |
|
| 391 |
+ if path:
|
|
| 392 |
+ with open(path, 'rb') as f:
|
|
| 393 |
+ for chunk in iter(lambda: f.read(4096), b""):
|
|
| 394 |
+ h.update(chunk)
|
|
| 395 |
+ tmp.write(chunk)
|
|
| 396 |
+ else:
|
|
| 397 |
+ h.update(buffer)
|
|
| 398 |
+ tmp.write(buffer)
|
|
| 424 | 399 |
|
| 425 |
- out.flush()
|
|
| 400 |
+ tmp.flush()
|
|
| 426 | 401 |
|
| 427 | 402 |
digest.hash = h.hexdigest()
|
| 428 |
- digest.size_bytes = os.fstat(out.fileno()).st_size
|
|
| 403 |
+ digest.size_bytes = os.fstat(tmp.fileno()).st_size
|
|
| 429 | 404 |
|
| 430 | 405 |
# Place file at final location
|
| 431 | 406 |
objpath = self.objpath(digest)
|
| 432 | 407 |
os.makedirs(os.path.dirname(objpath), exist_ok=True)
|
| 433 |
- os.link(out.name, objpath)
|
|
| 408 |
+ os.link(tmp.name, objpath)
|
|
| 434 | 409 |
|
| 435 | 410 |
except FileExistsError as e:
|
| 436 | 411 |
# We can ignore the failed link() if the object is already in the repo.
|
| ... | ... | @@ -451,7 +426,7 @@ class CASCache(ArtifactCache): |
| 451 | 426 |
def set_ref(self, ref, tree):
|
| 452 | 427 |
refpath = self._refpath(ref)
|
| 453 | 428 |
os.makedirs(os.path.dirname(refpath), exist_ok=True)
|
| 454 |
- with utils.save_file_atomic(refpath, 'wb') as f:
|
|
| 429 |
+ with utils.save_file_atomic(refpath, 'wb', tempdir=self.tmpdir) as f:
|
|
| 455 | 430 |
f.write(tree.SerializeToString())
|
| 456 | 431 |
|
| 457 | 432 |
# resolve_ref():
|
| ... | ... | @@ -565,7 +540,12 @@ class CASCache(ArtifactCache): |
| 565 | 540 |
#
|
| 566 | 541 |
# Prune unreachable objects from the repo.
|
| 567 | 542 |
#
|
| 568 |
- def prune(self):
|
|
| 543 |
+ # Args:
|
|
| 544 |
+ # keep_after (int|None): timestamp after which unreachable objects
|
|
| 545 |
+ # are kept. None if no unreachable object
|
|
| 546 |
+ # should be kept.
|
|
| 547 |
+ #
|
|
| 548 |
+ def prune(self, keep_after=None):
|
|
| 569 | 549 |
ref_heads = os.path.join(self.casdir, 'refs', 'heads')
|
| 570 | 550 |
|
| 571 | 551 |
pruned = 0
|
| ... | ... | @@ -586,6 +566,10 @@ class CASCache(ArtifactCache): |
| 586 | 566 |
objhash = os.path.basename(root) + filename
|
| 587 | 567 |
if objhash not in reachable:
|
| 588 | 568 |
obj_path = os.path.join(root, filename)
|
| 569 |
+ if keep_after:
|
|
| 570 |
+ st = os.stat(obj_path)
|
|
| 571 |
+ if st.st_mtime >= keep_after:
|
|
| 572 |
+ continue
|
|
| 589 | 573 |
pruned += os.stat(obj_path).st_size
|
| 590 | 574 |
os.unlink(obj_path)
|
| 591 | 575 |
|
| ... | ... | @@ -594,6 +578,7 @@ class CASCache(ArtifactCache): |
| 594 | 578 |
################################################
|
| 595 | 579 |
# Local Private Methods #
|
| 596 | 580 |
################################################
|
| 581 |
+ |
|
| 597 | 582 |
def _checkout(self, dest, tree):
|
| 598 | 583 |
os.makedirs(dest, exist_ok=True)
|
| 599 | 584 |
|
| ... | ... | @@ -623,7 +608,21 @@ class CASCache(ArtifactCache): |
| 623 | 608 |
def _refpath(self, ref):
|
| 624 | 609 |
return os.path.join(self.casdir, 'refs', 'heads', ref)
|
| 625 | 610 |
|
| 626 |
- def _create_tree(self, path, *, digest=None):
|
|
| 611 |
+ # _commit_directory():
|
|
| 612 |
+ #
|
|
| 613 |
+ # Adds local directory to content addressable store.
|
|
| 614 |
+ #
|
|
| 615 |
+ # Adds files, symbolic links and recursively other directories in
|
|
| 616 |
+ # a local directory to the content addressable store.
|
|
| 617 |
+ #
|
|
| 618 |
+ # Args:
|
|
| 619 |
+ # path (str): Path to the directory to add.
|
|
| 620 |
+ # dir_digest (Digest): An optional Digest object to use.
|
|
| 621 |
+ #
|
|
| 622 |
+ # Returns:
|
|
| 623 |
+ # (Digest): Digest object for the directory added.
|
|
| 624 |
+ #
|
|
| 625 |
+ def _commit_directory(self, path, *, dir_digest=None):
|
|
| 627 | 626 |
directory = remote_execution_pb2.Directory()
|
| 628 | 627 |
|
| 629 | 628 |
for name in sorted(os.listdir(path)):
|
| ... | ... | @@ -632,7 +631,7 @@ class CASCache(ArtifactCache): |
| 632 | 631 |
if stat.S_ISDIR(mode):
|
| 633 | 632 |
dirnode = directory.directories.add()
|
| 634 | 633 |
dirnode.name = name
|
| 635 |
- self._create_tree(full_path, digest=dirnode.digest)
|
|
| 634 |
+ self._commit_directory(full_path, dir_digest=dirnode.digest)
|
|
| 636 | 635 |
elif stat.S_ISREG(mode):
|
| 637 | 636 |
filenode = directory.files.add()
|
| 638 | 637 |
filenode.name = name
|
| ... | ... | @@ -645,7 +644,8 @@ class CASCache(ArtifactCache): |
| 645 | 644 |
else:
|
| 646 | 645 |
raise ArtifactError("Unsupported file type for {}".format(full_path))
|
| 647 | 646 |
|
| 648 |
- return self.add_object(digest=digest, buffer=directory.SerializeToString())
|
|
| 647 |
+ return self.add_object(digest=dir_digest,
|
|
| 648 |
+ buffer=directory.SerializeToString())
|
|
| 649 | 649 |
|
| 650 | 650 |
def _get_subdir(self, tree, subdir):
|
| 651 | 651 |
head, name = os.path.split(subdir)
|
| ... | ... | @@ -756,16 +756,16 @@ class CASCache(ArtifactCache): |
| 756 | 756 |
#
|
| 757 | 757 |
q.put(str(e))
|
| 758 | 758 |
|
| 759 |
- def _required_blobs(self, tree):
|
|
| 759 |
+ def _required_blobs(self, directory_digest):
|
|
| 760 | 760 |
# parse directory, and recursively add blobs
|
| 761 | 761 |
d = remote_execution_pb2.Digest()
|
| 762 |
- d.hash = tree.hash
|
|
| 763 |
- d.size_bytes = tree.size_bytes
|
|
| 762 |
+ d.hash = directory_digest.hash
|
|
| 763 |
+ d.size_bytes = directory_digest.size_bytes
|
|
| 764 | 764 |
yield d
|
| 765 | 765 |
|
| 766 | 766 |
directory = remote_execution_pb2.Directory()
|
| 767 | 767 |
|
| 768 |
- with open(self.objpath(tree), 'rb') as f:
|
|
| 768 |
+ with open(self.objpath(directory_digest), 'rb') as f:
|
|
| 769 | 769 |
directory.ParseFromString(f.read())
|
| 770 | 770 |
|
| 771 | 771 |
for filenode in directory.files:
|
| ... | ... | @@ -777,50 +777,203 @@ class CASCache(ArtifactCache): |
| 777 | 777 |
for dirnode in directory.directories:
|
| 778 | 778 |
yield from self._required_blobs(dirnode.digest)
|
| 779 | 779 |
|
| 780 |
- def _fetch_blob(self, remote, digest, out):
|
|
| 780 |
+ def _fetch_blob(self, remote, digest, stream):
|
|
| 781 | 781 |
resource_name = '/'.join(['blobs', digest.hash, str(digest.size_bytes)])
|
| 782 | 782 |
request = bytestream_pb2.ReadRequest()
|
| 783 | 783 |
request.resource_name = resource_name
|
| 784 | 784 |
request.read_offset = 0
|
| 785 | 785 |
for response in remote.bytestream.Read(request):
|
| 786 |
- out.write(response.data)
|
|
| 786 |
+ stream.write(response.data)
|
|
| 787 |
+ stream.flush()
|
|
| 787 | 788 |
|
| 788 |
- out.flush()
|
|
| 789 |
- assert digest.size_bytes == os.fstat(out.fileno()).st_size
|
|
| 789 |
+ assert digest.size_bytes == os.fstat(stream.fileno()).st_size
|
|
| 790 | 790 |
|
| 791 |
- def _fetch_directory(self, remote, tree):
|
|
| 792 |
- objpath = self.objpath(tree)
|
|
| 791 |
+ # _ensure_blob():
|
|
| 792 |
+ #
|
|
| 793 |
+ # Fetch and add blob if it's not already local.
|
|
| 794 |
+ #
|
|
| 795 |
+ # Args:
|
|
| 796 |
+ # remote (Remote): The remote to use.
|
|
| 797 |
+ # digest (Digest): Digest object for the blob to fetch.
|
|
| 798 |
+ #
|
|
| 799 |
+ # Returns:
|
|
| 800 |
+ # (str): The path of the object
|
|
| 801 |
+ #
|
|
| 802 |
+ def _ensure_blob(self, remote, digest):
|
|
| 803 |
+ objpath = self.objpath(digest)
|
|
| 793 | 804 |
if os.path.exists(objpath):
|
| 794 |
- # already in local cache
|
|
| 795 |
- return
|
|
| 805 |
+ # already in local repository
|
|
| 806 |
+ return objpath
|
|
| 796 | 807 |
|
| 797 |
- with tempfile.NamedTemporaryFile(dir=self.tmpdir) as out:
|
|
| 798 |
- self._fetch_blob(remote, tree, out)
|
|
| 808 |
+ with tempfile.NamedTemporaryFile(dir=self.tmpdir) as f:
|
|
| 809 |
+ self._fetch_blob(remote, digest, f)
|
|
| 799 | 810 |
|
| 800 |
- directory = remote_execution_pb2.Directory()
|
|
| 811 |
+ added_digest = self.add_object(path=f.name)
|
|
| 812 |
+ assert added_digest.hash == digest.hash
|
|
| 801 | 813 |
|
| 802 |
- with open(out.name, 'rb') as f:
|
|
| 803 |
- directory.ParseFromString(f.read())
|
|
| 814 |
+ return objpath
|
|
| 804 | 815 |
|
| 805 |
- for filenode in directory.files:
|
|
| 806 |
- fileobjpath = self.objpath(tree)
|
|
| 807 |
- if os.path.exists(fileobjpath):
|
|
| 808 |
- # already in local cache
|
|
| 809 |
- continue
|
|
| 816 |
+ def _batch_download_complete(self, batch):
|
|
| 817 |
+ for digest, data in batch.send():
|
|
| 818 |
+ with tempfile.NamedTemporaryFile(dir=self.tmpdir) as f:
|
|
| 819 |
+ f.write(data)
|
|
| 820 |
+ f.flush()
|
|
| 821 |
+ |
|
| 822 |
+ added_digest = self.add_object(path=f.name)
|
|
| 823 |
+ assert added_digest.hash == digest.hash
|
|
| 810 | 824 |
|
| 811 |
- with tempfile.NamedTemporaryFile(dir=self.tmpdir) as f:
|
|
| 812 |
- self._fetch_blob(remote, filenode.digest, f)
|
|
| 825 |
+ # Helper function for _fetch_directory().
|
|
| 826 |
+ def _fetch_directory_batch(self, remote, batch, fetch_queue, fetch_next_queue):
|
|
| 827 |
+ self._batch_download_complete(batch)
|
|
| 813 | 828 |
|
| 814 |
- digest = self.add_object(path=f.name)
|
|
| 815 |
- assert digest.hash == filenode.digest.hash
|
|
| 829 |
+ # All previously scheduled directories are now locally available,
|
|
| 830 |
+ # move them to the processing queue.
|
|
| 831 |
+ fetch_queue.extend(fetch_next_queue)
|
|
| 832 |
+ fetch_next_queue.clear()
|
|
| 833 |
+ return _CASBatchRead(remote)
|
|
| 834 |
+ |
|
| 835 |
+ # Helper function for _fetch_directory().
|
|
| 836 |
+ def _fetch_directory_node(self, remote, digest, batch, fetch_queue, fetch_next_queue, *, recursive=False):
|
|
| 837 |
+ in_local_cache = os.path.exists(self.objpath(digest))
|
|
| 838 |
+ |
|
| 839 |
+ if in_local_cache:
|
|
| 840 |
+ # Skip download, already in local cache.
|
|
| 841 |
+ pass
|
|
| 842 |
+ elif (digest.size_bytes >= remote.max_batch_total_size_bytes or
|
|
| 843 |
+ not remote.batch_read_supported):
|
|
| 844 |
+ # Too large for batch request, download in independent request.
|
|
| 845 |
+ self._ensure_blob(remote, digest)
|
|
| 846 |
+ in_local_cache = True
|
|
| 847 |
+ else:
|
|
| 848 |
+ if not batch.add(digest):
|
|
| 849 |
+ # Not enough space left in batch request.
|
|
| 850 |
+ # Complete pending batch first.
|
|
| 851 |
+ batch = self._fetch_directory_batch(remote, batch, fetch_queue, fetch_next_queue)
|
|
| 852 |
+ batch.add(digest)
|
|
| 853 |
+ |
|
| 854 |
+ if recursive:
|
|
| 855 |
+ if in_local_cache:
|
|
| 856 |
+ # Add directory to processing queue.
|
|
| 857 |
+ fetch_queue.append(digest)
|
|
| 858 |
+ else:
|
|
| 859 |
+ # Directory will be available after completing pending batch.
|
|
| 860 |
+ # Add directory to deferred processing queue.
|
|
| 861 |
+ fetch_next_queue.append(digest)
|
|
| 862 |
+ |
|
| 863 |
+ return batch
|
|
| 864 |
+ |
|
| 865 |
+ # _fetch_directory():
|
|
| 866 |
+ #
|
|
| 867 |
+ # Fetches remote directory and adds it to content addressable store.
|
|
| 868 |
+ #
|
|
| 869 |
+ # Fetches files, symbolic links and recursively other directories in
|
|
| 870 |
+ # the remote directory and adds them to the content addressable
|
|
| 871 |
+ # store.
|
|
| 872 |
+ #
|
|
| 873 |
+ # Args:
|
|
| 874 |
+ # remote (Remote): The remote to use.
|
|
| 875 |
+ # dir_digest (Digest): Digest object for the directory to fetch.
|
|
| 876 |
+ #
|
|
| 877 |
+ def _fetch_directory(self, remote, dir_digest):
|
|
| 878 |
+ fetch_queue = [dir_digest]
|
|
| 879 |
+ fetch_next_queue = []
|
|
| 880 |
+ batch = _CASBatchRead(remote)
|
|
| 881 |
+ |
|
| 882 |
+ while len(fetch_queue) + len(fetch_next_queue) > 0:
|
|
| 883 |
+ if len(fetch_queue) == 0:
|
|
| 884 |
+ batch = self._fetch_directory_batch(remote, batch, fetch_queue, fetch_next_queue)
|
|
| 885 |
+ |
|
| 886 |
+ dir_digest = fetch_queue.pop(0)
|
|
| 887 |
+ |
|
| 888 |
+ objpath = self._ensure_blob(remote, dir_digest)
|
|
| 889 |
+ |
|
| 890 |
+ directory = remote_execution_pb2.Directory()
|
|
| 891 |
+ with open(objpath, 'rb') as f:
|
|
| 892 |
+ directory.ParseFromString(f.read())
|
|
| 816 | 893 |
|
| 817 | 894 |
for dirnode in directory.directories:
|
| 818 |
- self._fetch_directory(remote, dirnode.digest)
|
|
| 895 |
+ batch = self._fetch_directory_node(remote, dirnode.digest, batch,
|
|
| 896 |
+ fetch_queue, fetch_next_queue, recursive=True)
|
|
| 819 | 897 |
|
| 820 |
- # place directory blob only in final location when we've downloaded
|
|
| 821 |
- # all referenced blobs to avoid dangling references in the repository
|
|
| 822 |
- digest = self.add_object(path=out.name)
|
|
| 823 |
- assert digest.hash == tree.hash
|
|
| 898 |
+ for filenode in directory.files:
|
|
| 899 |
+ batch = self._fetch_directory_node(remote, filenode.digest, batch,
|
|
| 900 |
+ fetch_queue, fetch_next_queue)
|
|
| 901 |
+ |
|
| 902 |
+ # Fetch final batch
|
|
| 903 |
+ self._fetch_directory_batch(remote, batch, fetch_queue, fetch_next_queue)
|
|
| 904 |
+ |
|
| 905 |
+ def _send_blob(self, remote, digest, stream, u_uid=uuid.uuid4()):
|
|
| 906 |
+ resource_name = '/'.join(['uploads', str(u_uid), 'blobs',
|
|
| 907 |
+ digest.hash, str(digest.size_bytes)])
|
|
| 908 |
+ |
|
| 909 |
+ def request_stream(resname, instream):
|
|
| 910 |
+ offset = 0
|
|
| 911 |
+ finished = False
|
|
| 912 |
+ remaining = digest.size_bytes
|
|
| 913 |
+ while not finished:
|
|
| 914 |
+ chunk_size = min(remaining, _MAX_PAYLOAD_BYTES)
|
|
| 915 |
+ remaining -= chunk_size
|
|
| 916 |
+ |
|
| 917 |
+ request = bytestream_pb2.WriteRequest()
|
|
| 918 |
+ request.write_offset = offset
|
|
| 919 |
+ # max. _MAX_PAYLOAD_BYTES chunks
|
|
| 920 |
+ request.data = instream.read(chunk_size)
|
|
| 921 |
+ request.resource_name = resname
|
|
| 922 |
+ request.finish_write = remaining <= 0
|
|
| 923 |
+ |
|
| 924 |
+ yield request
|
|
| 925 |
+ |
|
| 926 |
+ offset += chunk_size
|
|
| 927 |
+ finished = request.finish_write
|
|
| 928 |
+ |
|
| 929 |
+ response = remote.bytestream.Write(request_stream(resource_name, stream))
|
|
| 930 |
+ |
|
| 931 |
+ assert response.committed_size == digest.size_bytes
|
|
| 932 |
+ |
|
| 933 |
+ def _send_directory(self, remote, digest, u_uid=uuid.uuid4()):
|
|
| 934 |
+ required_blobs = self._required_blobs(digest)
|
|
| 935 |
+ |
|
| 936 |
+ missing_blobs = dict()
|
|
| 937 |
+ # Limit size of FindMissingBlobs request
|
|
| 938 |
+ for required_blobs_group in _grouper(required_blobs, 512):
|
|
| 939 |
+ request = remote_execution_pb2.FindMissingBlobsRequest()
|
|
| 940 |
+ |
|
| 941 |
+ for required_digest in required_blobs_group:
|
|
| 942 |
+ d = request.blob_digests.add()
|
|
| 943 |
+ d.hash = required_digest.hash
|
|
| 944 |
+ d.size_bytes = required_digest.size_bytes
|
|
| 945 |
+ |
|
| 946 |
+ response = remote.cas.FindMissingBlobs(request)
|
|
| 947 |
+ for missing_digest in response.missing_blob_digests:
|
|
| 948 |
+ d = remote_execution_pb2.Digest()
|
|
| 949 |
+ d.hash = missing_digest.hash
|
|
| 950 |
+ d.size_bytes = missing_digest.size_bytes
|
|
| 951 |
+ missing_blobs[d.hash] = d
|
|
| 952 |
+ |
|
| 953 |
+ # Upload any blobs missing on the server
|
|
| 954 |
+ self._send_blobs(remote, missing_blobs.values(), u_uid)
|
|
| 955 |
+ |
|
| 956 |
+ def _send_blobs(self, remote, digests, u_uid=uuid.uuid4()):
|
|
| 957 |
+ batch = _CASBatchUpdate(remote)
|
|
| 958 |
+ |
|
| 959 |
+ for digest in digests:
|
|
| 960 |
+ with open(self.objpath(digest), 'rb') as f:
|
|
| 961 |
+ assert os.fstat(f.fileno()).st_size == digest.size_bytes
|
|
| 962 |
+ |
|
| 963 |
+ if (digest.size_bytes >= remote.max_batch_total_size_bytes or
|
|
| 964 |
+ not remote.batch_update_supported):
|
|
| 965 |
+ # Too large for batch request, upload in independent request.
|
|
| 966 |
+ self._send_blob(remote, digest, f, u_uid=u_uid)
|
|
| 967 |
+ else:
|
|
| 968 |
+ if not batch.add(digest, f):
|
|
| 969 |
+ # Not enough space left in batch request.
|
|
| 970 |
+ # Complete pending batch first.
|
|
| 971 |
+ batch.send()
|
|
| 972 |
+ batch = _CASBatchUpdate(remote)
|
|
| 973 |
+ batch.add(digest, f)
|
|
| 974 |
+ |
|
| 975 |
+ # Send final batch
|
|
| 976 |
+ batch.send()
|
|
| 824 | 977 |
|
| 825 | 978 |
|
| 826 | 979 |
# Represents a single remote CAS cache.
|
| ... | ... | @@ -870,11 +1023,129 @@ class _CASRemote(): |
| 870 | 1023 |
|
| 871 | 1024 |
self.bytestream = bytestream_pb2_grpc.ByteStreamStub(self.channel)
|
| 872 | 1025 |
self.cas = remote_execution_pb2_grpc.ContentAddressableStorageStub(self.channel)
|
| 1026 |
+ self.capabilities = remote_execution_pb2_grpc.CapabilitiesStub(self.channel)
|
|
| 873 | 1027 |
self.ref_storage = buildstream_pb2_grpc.ReferenceStorageStub(self.channel)
|
| 874 | 1028 |
|
| 1029 |
+ self.max_batch_total_size_bytes = _MAX_PAYLOAD_BYTES
|
|
| 1030 |
+ try:
|
|
| 1031 |
+ request = remote_execution_pb2.GetCapabilitiesRequest()
|
|
| 1032 |
+ response = self.capabilities.GetCapabilities(request)
|
|
| 1033 |
+ server_max_batch_total_size_bytes = response.cache_capabilities.max_batch_total_size_bytes
|
|
| 1034 |
+ if 0 < server_max_batch_total_size_bytes < self.max_batch_total_size_bytes:
|
|
| 1035 |
+ self.max_batch_total_size_bytes = server_max_batch_total_size_bytes
|
|
| 1036 |
+ except grpc.RpcError as e:
|
|
| 1037 |
+ # Simply use the defaults for servers that don't implement GetCapabilities()
|
|
| 1038 |
+ if e.code() != grpc.StatusCode.UNIMPLEMENTED:
|
|
| 1039 |
+ raise
|
|
| 1040 |
+ |
|
| 1041 |
+ # Check whether the server supports BatchReadBlobs()
|
|
| 1042 |
+ self.batch_read_supported = False
|
|
| 1043 |
+ try:
|
|
| 1044 |
+ request = remote_execution_pb2.BatchReadBlobsRequest()
|
|
| 1045 |
+ response = self.cas.BatchReadBlobs(request)
|
|
| 1046 |
+ self.batch_read_supported = True
|
|
| 1047 |
+ except grpc.RpcError as e:
|
|
| 1048 |
+ if e.code() != grpc.StatusCode.UNIMPLEMENTED:
|
|
| 1049 |
+ raise
|
|
| 1050 |
+ |
|
| 1051 |
+ # Check whether the server supports BatchUpdateBlobs()
|
|
| 1052 |
+ self.batch_update_supported = False
|
|
| 1053 |
+ try:
|
|
| 1054 |
+ request = remote_execution_pb2.BatchUpdateBlobsRequest()
|
|
| 1055 |
+ response = self.cas.BatchUpdateBlobs(request)
|
|
| 1056 |
+ self.batch_update_supported = True
|
|
| 1057 |
+ except grpc.RpcError as e:
|
|
| 1058 |
+ if (e.code() != grpc.StatusCode.UNIMPLEMENTED and
|
|
| 1059 |
+ e.code() != grpc.StatusCode.PERMISSION_DENIED):
|
|
| 1060 |
+ raise
|
|
| 1061 |
+ |
|
| 875 | 1062 |
self._initialized = True
|
| 876 | 1063 |
|
| 877 | 1064 |
|
| 1065 |
+# Represents a batch of blobs queued for fetching.
|
|
| 1066 |
+#
|
|
| 1067 |
+class _CASBatchRead():
|
|
| 1068 |
+ def __init__(self, remote):
|
|
| 1069 |
+ self._remote = remote
|
|
| 1070 |
+ self._max_total_size_bytes = remote.max_batch_total_size_bytes
|
|
| 1071 |
+ self._request = remote_execution_pb2.BatchReadBlobsRequest()
|
|
| 1072 |
+ self._size = 0
|
|
| 1073 |
+ self._sent = False
|
|
| 1074 |
+ |
|
| 1075 |
+ def add(self, digest):
|
|
| 1076 |
+ assert not self._sent
|
|
| 1077 |
+ |
|
| 1078 |
+ new_batch_size = self._size + digest.size_bytes
|
|
| 1079 |
+ if new_batch_size > self._max_total_size_bytes:
|
|
| 1080 |
+ # Not enough space left in current batch
|
|
| 1081 |
+ return False
|
|
| 1082 |
+ |
|
| 1083 |
+ request_digest = self._request.digests.add()
|
|
| 1084 |
+ request_digest.hash = digest.hash
|
|
| 1085 |
+ request_digest.size_bytes = digest.size_bytes
|
|
| 1086 |
+ self._size = new_batch_size
|
|
| 1087 |
+ return True
|
|
| 1088 |
+ |
|
| 1089 |
+ def send(self):
|
|
| 1090 |
+ assert not self._sent
|
|
| 1091 |
+ self._sent = True
|
|
| 1092 |
+ |
|
| 1093 |
+ if len(self._request.digests) == 0:
|
|
| 1094 |
+ return
|
|
| 1095 |
+ |
|
| 1096 |
+ batch_response = self._remote.cas.BatchReadBlobs(self._request)
|
|
| 1097 |
+ |
|
| 1098 |
+ for response in batch_response.responses:
|
|
| 1099 |
+ if response.status.code != grpc.StatusCode.OK.value[0]:
|
|
| 1100 |
+ raise ArtifactError("Failed to download blob {}: {}".format(
|
|
| 1101 |
+ response.digest.hash, response.status.code))
|
|
| 1102 |
+ if response.digest.size_bytes != len(response.data):
|
|
| 1103 |
+ raise ArtifactError("Failed to download blob {}: expected {} bytes, received {} bytes".format(
|
|
| 1104 |
+ response.digest.hash, response.digest.size_bytes, len(response.data)))
|
|
| 1105 |
+ |
|
| 1106 |
+ yield (response.digest, response.data)
|
|
| 1107 |
+ |
|
| 1108 |
+ |
|
| 1109 |
+# Represents a batch of blobs queued for upload.
|
|
| 1110 |
+#
|
|
| 1111 |
+class _CASBatchUpdate():
|
|
| 1112 |
+ def __init__(self, remote):
|
|
| 1113 |
+ self._remote = remote
|
|
| 1114 |
+ self._max_total_size_bytes = remote.max_batch_total_size_bytes
|
|
| 1115 |
+ self._request = remote_execution_pb2.BatchUpdateBlobsRequest()
|
|
| 1116 |
+ self._size = 0
|
|
| 1117 |
+ self._sent = False
|
|
| 1118 |
+ |
|
| 1119 |
+ def add(self, digest, stream):
|
|
| 1120 |
+ assert not self._sent
|
|
| 1121 |
+ |
|
| 1122 |
+ new_batch_size = self._size + digest.size_bytes
|
|
| 1123 |
+ if new_batch_size > self._max_total_size_bytes:
|
|
| 1124 |
+ # Not enough space left in current batch
|
|
| 1125 |
+ return False
|
|
| 1126 |
+ |
|
| 1127 |
+ blob_request = self._request.requests.add()
|
|
| 1128 |
+ blob_request.digest.hash = digest.hash
|
|
| 1129 |
+ blob_request.digest.size_bytes = digest.size_bytes
|
|
| 1130 |
+ blob_request.data = stream.read(digest.size_bytes)
|
|
| 1131 |
+ self._size = new_batch_size
|
|
| 1132 |
+ return True
|
|
| 1133 |
+ |
|
| 1134 |
+ def send(self):
|
|
| 1135 |
+ assert not self._sent
|
|
| 1136 |
+ self._sent = True
|
|
| 1137 |
+ |
|
| 1138 |
+ if len(self._request.requests) == 0:
|
|
| 1139 |
+ return
|
|
| 1140 |
+ |
|
| 1141 |
+ batch_response = self._remote.cas.BatchUpdateBlobs(self._request)
|
|
| 1142 |
+ |
|
| 1143 |
+ for response in batch_response.responses:
|
|
| 1144 |
+ if response.status.code != grpc.StatusCode.OK.value[0]:
|
|
| 1145 |
+ raise ArtifactError("Failed to upload blob {}: {}".format(
|
|
| 1146 |
+ response.digest.hash, response.status.code))
|
|
| 1147 |
+ |
|
| 1148 |
+ |
|
| 878 | 1149 |
def _grouper(iterable, n):
|
| 879 | 1150 |
while True:
|
| 880 | 1151 |
try:
|
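The cascache.py changes above follow a fill-then-flush pattern: digests are accumulated into a _CASBatchRead or _CASBatchUpdate until the next blob would push the batch past max_batch_total_size_bytes, the pending batch is sent as a single BatchReadBlobs()/BatchUpdateBlobs() call, and blobs too large for any batch fall back to individual ByteStream requests. A minimal, self-contained sketch of that pattern follows; Batch, send_all and the print() calls are illustrative stand-ins, not part of the BuildStream API.

    # Sketch of the fill-then-flush batching used by _CASBatchRead/_CASBatchUpdate.
    # Uses nothing beyond the Python standard library.

    MAX_BATCH_TOTAL_SIZE_BYTES = 1024 * 1024   # mirrors _MAX_PAYLOAD_BYTES


    class Batch():
        # Stand-in for _CASBatchRead/_CASBatchUpdate: collect items until the
        # byte budget would be exceeded.
        def __init__(self, budget=MAX_BATCH_TOTAL_SIZE_BYTES):
            self._budget = budget
            self._items = []
            self._size = 0

        def add(self, name, size):
            if self._size + size > self._budget:
                return False               # Not enough space left in this batch
            self._items.append((name, size))
            self._size += size
            return True

        def send(self):
            # Stand-in for the single BatchReadBlobs()/BatchUpdateBlobs() RPC
            print("sending {} blobs, {} bytes".format(len(self._items), self._size))


    def send_all(blobs, budget=MAX_BATCH_TOTAL_SIZE_BYTES):
        batch = Batch(budget)
        for name, size in blobs:
            if size >= budget:
                # Too large for any batch: the real code falls back to an
                # individual ByteStream request here.
                print("sending {} ({} bytes) individually".format(name, size))
                continue
            if not batch.add(name, size):
                # Batch full: flush it, then retry the blob in a fresh batch.
                batch.send()
                batch = Batch(budget)
                batch.add(name, size)
        batch.send()                       # Flush the final, partial batch


    if __name__ == '__main__':
        send_all([("a", 400 * 1024), ("b", 700 * 1024), ("c", 2 * 1024 * 1024)])
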
| ... | ... | @@ -24,6 +24,10 @@ import signal |
| 24 | 24 |
import sys
|
| 25 | 25 |
import tempfile
|
| 26 | 26 |
import uuid
|
| 27 |
+import time
|
|
| 28 |
+import errno
|
|
| 29 |
+import ctypes
|
|
| 30 |
+import faulthandler
|
|
| 27 | 31 |
|
| 28 | 32 |
import click
|
| 29 | 33 |
import grpc
|
| ... | ... | @@ -38,8 +42,13 @@ from .._context import Context |
| 38 | 42 |
from .cascache import CASCache
|
| 39 | 43 |
|
| 40 | 44 |
|
| 41 |
-# The default limit for gRPC messages is 4 MiB
|
|
| 42 |
-_MAX_BATCH_TOTAL_SIZE_BYTES = 4 * 1024 * 1024
|
|
| 45 |
+# The default limit for gRPC messages is 4 MiB.
|
|
| 46 |
+# Limit payload to 1 MiB to leave sufficient headroom for metadata.
|
|
| 47 |
+_MAX_PAYLOAD_BYTES = 1024 * 1024
|
|
| 48 |
+ |
|
| 49 |
+# The minimum age in seconds for objects before they can be cleaned
|
|
| 50 |
+# up.
|
|
| 51 |
+_OBJECT_MIN_AGE = 6 * 60 * 60
|
|
| 43 | 52 |
|
| 44 | 53 |
|
| 45 | 54 |
# Trying to push an artifact that is too large
|
| ... | ... | @@ -69,7 +78,7 @@ def create_server(repo, *, enable_push): |
| 69 | 78 |
_ByteStreamServicer(artifactcache, enable_push=enable_push), server)
|
| 70 | 79 |
|
| 71 | 80 |
remote_execution_pb2_grpc.add_ContentAddressableStorageServicer_to_server(
|
| 72 |
- _ContentAddressableStorageServicer(artifactcache), server)
|
|
| 81 |
+ _ContentAddressableStorageServicer(artifactcache, enable_push=enable_push), server)
|
|
| 73 | 82 |
|
| 74 | 83 |
remote_execution_pb2_grpc.add_CapabilitiesServicer_to_server(
|
| 75 | 84 |
_CapabilitiesServicer(), server)
|
| ... | ... | @@ -89,6 +98,8 @@ def create_server(repo, *, enable_push): |
| 89 | 98 |
help="Allow clients to upload blobs and update artifact cache")
|
| 90 | 99 |
@click.argument('repo')
|
| 91 | 100 |
def server_main(repo, port, server_key, server_cert, client_certs, enable_push):
|
| 101 |
+ faulthandler.register(signal.SIGUSR1, all_threads=True)
|
|
| 102 |
+ |
|
| 92 | 103 |
server = create_server(repo, enable_push=enable_push)
|
| 93 | 104 |
|
| 94 | 105 |
use_tls = bool(server_key)
|
| ... | ... | @@ -130,11 +141,43 @@ def server_main(repo, port, server_key, server_cert, client_certs, enable_push): |
| 130 | 141 |
server.stop(0)
|
| 131 | 142 |
|
| 132 | 143 |
|
| 144 |
+class _FallocateCall:
|
|
| 145 |
+ |
|
| 146 |
+ FALLOC_FL_KEEP_SIZE = 1
|
|
| 147 |
+ FALLOC_FL_PUNCH_HOLE = 2
|
|
| 148 |
+ FALLOC_FL_NO_HIDE_STALE = 4
|
|
| 149 |
+ FALLOC_FL_COLLAPSE_RANGE = 8
|
|
| 150 |
+ FALLOC_FL_ZERO_RANGE = 16
|
|
| 151 |
+ FALLOC_FL_INSERT_RANGE = 32
|
|
| 152 |
+ FALLOC_FL_UNSHARE_RANGE = 64
|
|
| 153 |
+ |
|
| 154 |
+ def __init__(self):
|
|
| 155 |
+ self.libc = ctypes.CDLL("libc.so.6", use_errno=True)
|
|
| 156 |
+ try:
|
|
| 157 |
+ self.fallocate64 = self.libc.fallocate64
|
|
| 158 |
+ except AttributeError:
|
|
| 159 |
+ self.fallocate = self.libc.fallocate
|
|
| 160 |
+ |
|
| 161 |
+ def __call__(self, fd, mode, offset, length):
|
|
| 162 |
+ if hasattr(self, 'fallocate64'):
|
|
| 163 |
+ print(fd, mode, offset, length)
|
|
| 164 |
+ ret = self.fallocate64(ctypes.c_int(fd), ctypes.c_int(mode),
|
|
| 165 |
+ ctypes.c_int64(offset), ctypes.c_int64(length))
|
|
| 166 |
+ else:
|
|
| 167 |
+ ret = self.fallocate(ctypes.c_int(fd), ctypes.c_int(mode),
|
|
| 168 |
+ ctypes.c_int(offset), ctypes.c_int(length))
|
|
| 169 |
+ if ret == -1:
|
|
| 170 |
+ errno = ctypes.get_errno()
|
|
| 171 |
+ raise OSError(errno, os.strerror(errno))
|
|
| 172 |
+ return ret
|
|
| 173 |
+ |
|
| 174 |
+ |
|
| 133 | 175 |
class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):
|
| 134 | 176 |
def __init__(self, cas, *, enable_push):
|
| 135 | 177 |
super().__init__()
|
| 136 | 178 |
self.cas = cas
|
| 137 | 179 |
self.enable_push = enable_push
|
| 180 |
+ self.fallocate = _FallocateCall()
|
|
| 138 | 181 |
|
| 139 | 182 |
def Read(self, request, context):
|
| 140 | 183 |
resource_name = request.resource_name
|
| ... | ... | @@ -158,7 +201,7 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer): |
| 158 | 201 |
|
| 159 | 202 |
remaining = client_digest.size_bytes - request.read_offset
|
| 160 | 203 |
while remaining > 0:
|
| 161 |
- chunk_size = min(remaining, 64 * 1024)
|
|
| 204 |
+ chunk_size = min(remaining, _MAX_PAYLOAD_BYTES)
|
|
| 162 | 205 |
remaining -= chunk_size
|
| 163 | 206 |
|
| 164 | 207 |
response = bytestream_pb2.ReadResponse()
|
| ... | ... | @@ -192,25 +235,44 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer): |
| 192 | 235 |
context.set_code(grpc.StatusCode.NOT_FOUND)
|
| 193 | 236 |
return response
|
| 194 | 237 |
|
| 195 |
- try:
|
|
| 196 |
- _clean_up_cache(self.cas, client_digest.size_bytes)
|
|
| 197 |
- except ArtifactTooLargeException as e:
|
|
| 198 |
- context.set_code(grpc.StatusCode.RESOURCE_EXHAUSTED)
|
|
| 199 |
- context.set_details(str(e))
|
|
| 200 |
- return response
|
|
| 238 |
+ while True:
|
|
| 239 |
+ if client_digest.size_bytes == 0:
|
|
| 240 |
+ break
|
|
| 241 |
+ try:
|
|
| 242 |
+ _clean_up_cache(self.cas, client_digest.size_bytes)
|
|
| 243 |
+ except ArtifactTooLargeException as e:
|
|
| 244 |
+ context.set_code(grpc.StatusCode.RESOURCE_EXHAUSTED)
|
|
| 245 |
+ context.set_details(str(e))
|
|
| 246 |
+ return response
|
|
| 247 |
+ |
|
| 248 |
+ try:
|
|
| 249 |
+ self.fallocate(out.fileno(), 0, 0, client_digest.size_bytes)
|
|
| 250 |
+ break
|
|
| 251 |
+ except OSError as e:
|
|
| 252 |
+ # Multiple uploads can happen at the same time
|
|
| 253 |
+ if e.errno != errno.ENOSPC:
|
|
| 254 |
+ raise
|
|
| 255 |
+ |
|
| 201 | 256 |
elif request.resource_name:
|
| 202 | 257 |
# If it is set on subsequent calls, it **must** match the value of the first request.
|
| 203 | 258 |
if request.resource_name != resource_name:
|
| 204 | 259 |
context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
|
| 205 | 260 |
return response
|
| 261 |
+ |
|
| 262 |
+ if (offset + len(request.data)) > client_digest.size_bytes:
|
|
| 263 |
+ context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
|
|
| 264 |
+ return response
|
|
| 265 |
+ |
|
| 206 | 266 |
out.write(request.data)
|
| 267 |
+ |
|
| 207 | 268 |
offset += len(request.data)
|
| 269 |
+ |
|
| 208 | 270 |
if request.finish_write:
|
| 209 | 271 |
if client_digest.size_bytes != offset:
|
| 210 | 272 |
context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
|
| 211 | 273 |
return response
|
| 212 | 274 |
out.flush()
|
| 213 |
- digest = self.cas.add_object(path=out.name)
|
|
| 275 |
+ digest = self.cas.add_object(path=out.name, link_directly=True)
|
|
| 214 | 276 |
if digest.hash != client_digest.hash:
|
| 215 | 277 |
context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
|
| 216 | 278 |
return response
|
| ... | ... | @@ -223,17 +285,25 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer): |
| 223 | 285 |
|
| 224 | 286 |
|
| 225 | 287 |
class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddressableStorageServicer):
|
| 226 |
- def __init__(self, cas):
|
|
| 288 |
+ def __init__(self, cas, *, enable_push):
|
|
| 227 | 289 |
super().__init__()
|
| 228 | 290 |
self.cas = cas
|
| 291 |
+ self.enable_push = enable_push
|
|
| 229 | 292 |
|
| 230 | 293 |
def FindMissingBlobs(self, request, context):
|
| 231 | 294 |
response = remote_execution_pb2.FindMissingBlobsResponse()
|
| 232 | 295 |
for digest in request.blob_digests:
|
| 233 |
- if not _has_object(self.cas, digest):
|
|
| 234 |
- d = response.missing_blob_digests.add()
|
|
| 235 |
- d.hash = digest.hash
|
|
| 236 |
- d.size_bytes = digest.size_bytes
|
|
| 296 |
+ objpath = self.cas.objpath(digest)
|
|
| 297 |
+ try:
|
|
| 298 |
+ os.utime(objpath)
|
|
| 299 |
+ except OSError as e:
|
|
| 300 |
+ if e.errno != errno.ENOENT:
|
|
| 301 |
+ raise
|
|
| 302 |
+ else:
|
|
| 303 |
+ d = response.missing_blob_digests.add()
|
|
| 304 |
+ d.hash = digest.hash
|
|
| 305 |
+ d.size_bytes = digest.size_bytes
|
|
| 306 |
+ |
|
| 237 | 307 |
return response
|
| 238 | 308 |
|
| 239 | 309 |
def BatchReadBlobs(self, request, context):
|
| ... | ... | @@ -242,7 +312,7 @@ class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddres |
| 242 | 312 |
|
| 243 | 313 |
for digest in request.digests:
|
| 244 | 314 |
batch_size += digest.size_bytes
|
| 245 |
- if batch_size > _MAX_BATCH_TOTAL_SIZE_BYTES:
|
|
| 315 |
+ if batch_size > _MAX_PAYLOAD_BYTES:
|
|
| 246 | 316 |
context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
|
| 247 | 317 |
return response
|
| 248 | 318 |
|
| ... | ... | @@ -261,6 +331,46 @@ class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddres |
| 261 | 331 |
|
| 262 | 332 |
return response
|
| 263 | 333 |
|
| 334 |
+ def BatchUpdateBlobs(self, request, context):
|
|
| 335 |
+ response = remote_execution_pb2.BatchUpdateBlobsResponse()
|
|
| 336 |
+ |
|
| 337 |
+ if not self.enable_push:
|
|
| 338 |
+ context.set_code(grpc.StatusCode.PERMISSION_DENIED)
|
|
| 339 |
+ return response
|
|
| 340 |
+ |
|
| 341 |
+ batch_size = 0
|
|
| 342 |
+ |
|
| 343 |
+ for blob_request in request.requests:
|
|
| 344 |
+ digest = blob_request.digest
|
|
| 345 |
+ |
|
| 346 |
+ batch_size += digest.size_bytes
|
|
| 347 |
+ if batch_size > _MAX_PAYLOAD_BYTES:
|
|
| 348 |
+ context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
|
|
| 349 |
+ return response
|
|
| 350 |
+ |
|
| 351 |
+ blob_response = response.responses.add()
|
|
| 352 |
+ blob_response.digest.hash = digest.hash
|
|
| 353 |
+ blob_response.digest.size_bytes = digest.size_bytes
|
|
| 354 |
+ |
|
| 355 |
+ if len(blob_request.data) != digest.size_bytes:
|
|
| 356 |
+ blob_response.status.code = grpc.StatusCode.FAILED_PRECONDITION
|
|
| 357 |
+ continue
|
|
| 358 |
+ |
|
| 359 |
+ try:
|
|
| 360 |
+ _clean_up_cache(self.cas, digest.size_bytes)
|
|
| 361 |
+ |
|
| 362 |
+ with tempfile.NamedTemporaryFile(dir=self.cas.tmpdir) as out:
|
|
| 363 |
+ out.write(blob_request.data)
|
|
| 364 |
+ out.flush()
|
|
| 365 |
+ server_digest = self.cas.add_object(path=out.name)
|
|
| 366 |
+ if server_digest.hash != digest.hash:
|
|
| 367 |
+ blob_response.status.code = grpc.StatusCode.FAILED_PRECONDITION
|
|
| 368 |
+ |
|
| 369 |
+ except ArtifactTooLargeException:
|
|
| 370 |
+ blob_response.status.code = grpc.StatusCode.RESOURCE_EXHAUSTED
|
|
| 371 |
+ |
|
| 372 |
+ return response
|
|
| 373 |
+ |
|
| 264 | 374 |
|
| 265 | 375 |
class _CapabilitiesServicer(remote_execution_pb2_grpc.CapabilitiesServicer):
|
| 266 | 376 |
def GetCapabilities(self, request, context):
|
| ... | ... | @@ -269,7 +379,7 @@ class _CapabilitiesServicer(remote_execution_pb2_grpc.CapabilitiesServicer): |
| 269 | 379 |
cache_capabilities = response.cache_capabilities
|
| 270 | 380 |
cache_capabilities.digest_function.append(remote_execution_pb2.SHA256)
|
| 271 | 381 |
cache_capabilities.action_cache_update_capabilities.update_enabled = False
|
| 272 |
- cache_capabilities.max_batch_total_size_bytes = _MAX_BATCH_TOTAL_SIZE_BYTES
|
|
| 382 |
+ cache_capabilities.max_batch_total_size_bytes = _MAX_PAYLOAD_BYTES
|
|
| 273 | 383 |
cache_capabilities.symlink_absolute_path_strategy = remote_execution_pb2.CacheCapabilities.ALLOWED
|
| 274 | 384 |
|
| 275 | 385 |
response.deprecated_api_version.major = 2
|
| ... | ... | @@ -362,11 +472,6 @@ def _digest_from_upload_resource_name(resource_name): |
| 362 | 472 |
return None
|
| 363 | 473 |
|
| 364 | 474 |
|
| 365 |
-def _has_object(cas, digest):
|
|
| 366 |
- objpath = cas.objpath(digest)
|
|
| 367 |
- return os.path.exists(objpath)
|
|
| 368 |
- |
|
| 369 |
- |
|
| 370 | 475 |
# _clean_up_cache()
|
| 371 | 476 |
#
|
| 372 | 477 |
# Keep removing Least Recently Pushed (LRP) artifacts in a cache until there
|
| ... | ... | @@ -399,7 +504,14 @@ def _clean_up_cache(cas, object_size): |
| 399 | 504 |
# obtain a list of LRP artifacts
|
| 400 | 505 |
LRP_artifacts = cas.list_artifacts()
|
| 401 | 506 |
|
| 507 |
+ keep_after = time.time() - _OBJECT_MIN_AGE
|
|
| 508 |
+ |
|
| 402 | 509 |
removed_size = 0 # in bytes
|
| 510 |
+ if object_size - removed_size > free_disk_space:
|
|
| 511 |
+ # First we try to see if some unreferenced objects became old
|
|
| 512 |
+ # enough to be removed.
|
|
| 513 |
+ removed_size += cas.prune(keep_after=keep_after)
|
|
| 514 |
+ |
|
| 403 | 515 |
while object_size - removed_size > free_disk_space:
|
| 404 | 516 |
try:
|
| 405 | 517 |
to_remove = LRP_artifacts.pop(0) # The first element in the list is the LRP artifact
|
| ... | ... | @@ -411,7 +523,8 @@ def _clean_up_cache(cas, object_size): |
| 411 | 523 |
"the filesystem which mounts the remote "
|
| 412 | 524 |
"cache".format(object_size))
|
| 413 | 525 |
|
| 414 |
- removed_size += cas.remove(to_remove, defer_prune=False)
|
|
| 526 |
+ cas.remove(to_remove, defer_prune=True)
|
|
| 527 |
+ removed_size += cas.prune(keep_after=keep_after)
|
|
| 415 | 528 |
|
| 416 | 529 |
if removed_size > 0:
|
| 417 | 530 |
logging.info("Successfully removed {} bytes from the cache".format(removed_size))
|
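The reworked Write() handler in casserver.py above reserves the full blob size with fallocate() before accepting any data, and retries the cleanup when a concurrent upload fills the disk in the meantime (ENOSPC). A rough sketch of that reserve-then-retry loop, using os.posix_fallocate() from the standard library in place of the ctypes fallocate64 wrapper from the diff; reserve_upload_file() and clean_up are illustrative names, with clean_up standing in for _clean_up_cache(), which raises once nothing more can be freed.

    import errno
    import os
    import tempfile


    def reserve_upload_file(tmpdir, size_bytes, clean_up):
        # Create the temporary upload file and reserve its full size up front,
        # so a full disk is detected before any payload bytes are written.
        out = tempfile.NamedTemporaryFile(dir=tmpdir)
        while size_bytes > 0:
            # Free enough space for the incoming blob; expected to raise
            # (as _clean_up_cache() does) when that is impossible.
            clean_up(size_bytes)
            try:
                os.posix_fallocate(out.fileno(), 0, size_bytes)
                break
            except OSError as e:
                if e.errno != errno.ENOSPC:
                    raise
                # Another upload consumed the space in between; clean up again.
        return out


    if __name__ == '__main__':
        f = reserve_upload_file(tempfile.gettempdir(), 4096, lambda n: None)
        print("reserved", os.fstat(f.fileno()).st_size, "bytes")
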
| ... | ... | @@ -119,6 +119,8 @@ class Job(): |
| 119 | 119 |
self._result = None # Return value of child action in the parent
|
| 120 | 120 |
self._tries = 0 # Try count, for retryable jobs
|
| 121 | 121 |
self._skipped_flag = False # Indicate whether the job was skipped.
|
| 122 |
+ self._terminated = False # Whether this job has been explicitly terminated
|
|
| 123 |
+ |
|
| 122 | 124 |
# If False, a retry will not be attempted regardless of whether _tries is less than _max_retries.
|
| 123 | 125 |
#
|
| 124 | 126 |
self._retry_flag = True
|
| ... | ... | @@ -188,6 +190,8 @@ class Job(): |
| 188 | 190 |
# Terminate the process using multiprocessing API pathway
|
| 189 | 191 |
self._process.terminate()
|
| 190 | 192 |
|
| 193 |
+ self._terminated = True
|
|
| 194 |
+ |
|
| 191 | 195 |
# terminate_wait()
|
| 192 | 196 |
#
|
| 193 | 197 |
# Wait for terminated jobs to complete
|
| ... | ... | @@ -271,18 +275,22 @@ class Job(): |
| 271 | 275 |
# running the integration commands).
|
| 272 | 276 |
#
|
| 273 | 277 |
# Args:
|
| 274 |
- # (int): The plugin identifier for this task
|
|
| 278 |
+ # task_id (int): The plugin identifier for this task
|
|
| 275 | 279 |
#
|
| 276 | 280 |
def set_task_id(self, task_id):
|
| 277 | 281 |
self._task_id = task_id
|
| 278 | 282 |
|
| 279 | 283 |
# skipped
|
| 280 | 284 |
#
|
| 285 |
+ # This will evaluate to True if the job was skipped
|
|
| 286 |
+ # during processing, or if it was forcefully terminated.
|
|
| 287 |
+ #
|
|
| 281 | 288 |
# Returns:
|
| 282 |
- # bool: True if the job was skipped while processing.
|
|
| 289 |
+ # (bool): Whether the job should appear as skipped
|
|
| 290 |
+ #
|
|
| 283 | 291 |
@property
|
| 284 | 292 |
def skipped(self):
|
| 285 |
- return self._skipped_flag
|
|
| 293 |
+ return self._skipped_flag or self._terminated
|
|
| 286 | 294 |
|
| 287 | 295 |
#######################################################
|
| 288 | 296 |
# Abstract Methods #
|
| ... | ... | @@ -65,7 +65,7 @@ class BuildQueue(Queue): |
| 65 | 65 |
# If the estimated size outgrows the quota, ask the scheduler
|
| 66 | 66 |
# to queue a job to actually check the real cache size.
|
| 67 | 67 |
#
|
| 68 |
- if artifacts.get_quota_exceeded():
|
|
| 68 |
+ if artifacts.has_quota_exceeded():
|
|
| 69 | 69 |
self._scheduler.check_cache_size()
|
| 70 | 70 |
|
| 71 | 71 |
def done(self, job, element, result, success):
|
| ... | ... | @@ -325,15 +325,22 @@ class Queue(): |
| 325 | 325 |
detail=traceback.format_exc())
|
| 326 | 326 |
self.failed_elements.append(element)
|
| 327 | 327 |
else:
|
| 328 |
- |
|
| 329 |
- # No exception occured, handle the success/failure state in the normal way
|
|
| 330 | 328 |
#
|
| 329 |
+ # No exception occurred in post processing
|
|
| 330 |
+ #
|
|
| 331 |
+ |
|
| 332 |
+ # Only place in the output done queue if the job
|
|
| 333 |
+ # was considered successful
|
|
| 331 | 334 |
if success:
|
| 332 | 335 |
self._done_queue.append(job)
|
| 333 |
- if not job.skipped:
|
|
| 334 |
- self.processed_elements.append(element)
|
|
| 335 |
- else:
|
|
| 336 |
- self.skipped_elements.append(element)
|
|
| 336 |
+ |
|
| 337 |
+ # A Job can be skipped whether or not it has failed,
|
|
| 338 |
+ # we want to only bookkeep them as processed or failed
|
|
| 339 |
+ # if they are not skipped.
|
|
| 340 |
+ if job.skipped:
|
|
| 341 |
+ self.skipped_elements.append(element)
|
|
| 342 |
+ elif success:
|
|
| 343 |
+ self.processed_elements.append(element)
|
|
| 337 | 344 |
else:
|
| 338 | 345 |
self.failed_elements.append(element)
|
| 339 | 346 |
|
| ... | ... | @@ -349,7 +349,7 @@ class Scheduler(): |
| 349 | 349 |
platform = Platform.get_platform()
|
| 350 | 350 |
artifacts = platform.artifactcache
|
| 351 | 351 |
|
| 352 |
- if not artifacts.get_quota_exceeded():
|
|
| 352 |
+ if not artifacts.has_quota_exceeded():
|
|
| 353 | 353 |
return
|
| 354 | 354 |
|
| 355 | 355 |
job = CleanupJob(self, 'cleanup', 'cleanup/cleanup',
|
| ... | ... | @@ -387,6 +387,15 @@ class Scheduler(): |
| 387 | 387 |
# A loop registered event callback for keyboard interrupts
|
| 388 | 388 |
#
|
| 389 | 389 |
def _interrupt_event(self):
|
| 390 |
+ |
|
| 391 |
+ # FIXME: This should not be needed, but for some reason we receive an
|
|
| 392 |
+ # additional SIGINT event when the user hits ^C a second time
|
|
| 393 |
+ # to inform us that they really intend to terminate; even though
|
|
| 394 |
+ # we have disconnected our handlers at this time.
|
|
| 395 |
+ #
|
|
| 396 |
+ if self.terminated:
|
|
| 397 |
+ return
|
|
| 398 |
+ |
|
| 390 | 399 |
# Leave this to the frontend to decide, if no
|
| 391 | 400 |
# interrupt callback was specified, then just terminate.
|
| 392 | 401 |
if self._interrupt_callback:
|
| ... | ... | @@ -164,10 +164,18 @@ class GitMirror(SourceFetcher): |
| 164 | 164 |
cwd=self.mirror)
|
| 165 | 165 |
|
| 166 | 166 |
def fetch(self, alias_override=None):
|
| 167 |
- self.ensure(alias_override)
|
|
| 168 |
- if not self.has_ref():
|
|
| 169 |
- self._fetch(alias_override)
|
|
| 170 |
- self.assert_ref()
|
|
| 167 |
+ # Resolve the URL for the message
|
|
| 168 |
+ resolved_url = self.source.translate_url(self.url,
|
|
| 169 |
+ alias_override=alias_override,
|
|
| 170 |
+ primary=self.primary)
|
|
| 171 |
+ |
|
| 172 |
+ with self.source.timed_activity("Fetching from {}"
|
|
| 173 |
+ .format(resolved_url),
|
|
| 174 |
+ silent_nested=True):
|
|
| 175 |
+ self.ensure(alias_override)
|
|
| 176 |
+ if not self.has_ref():
|
|
| 177 |
+ self._fetch(alias_override)
|
|
| 178 |
+ self.assert_ref()
|
|
| 171 | 179 |
|
| 172 | 180 |
def has_ref(self):
|
| 173 | 181 |
if not self.ref:
|
| ... | ... | @@ -585,28 +585,48 @@ class Source(Plugin): |
| 585 | 585 |
#
|
| 586 | 586 |
def _fetch(self):
|
| 587 | 587 |
project = self._get_project()
|
| 588 |
- source_fetchers = self.get_source_fetchers()
|
|
| 588 |
+ context = self._get_context()
|
|
| 589 |
+ |
|
| 590 |
+ # Silence the STATUS messages which might happen as a result
|
|
| 591 |
+ # of checking the source fetchers.
|
|
| 592 |
+ with context.silence():
|
|
| 593 |
+ source_fetchers = self.get_source_fetchers()
|
|
| 589 | 594 |
|
| 590 | 595 |
# Use the source fetchers if they are provided
|
| 591 | 596 |
#
|
| 592 | 597 |
if source_fetchers:
|
| 593 |
- for fetcher in source_fetchers:
|
|
| 594 |
- alias = fetcher._get_alias()
|
|
| 595 |
- for uri in project.get_alias_uris(alias, first_pass=self.__first_pass):
|
|
| 596 |
- try:
|
|
| 597 |
- fetcher.fetch(uri)
|
|
| 598 |
- # FIXME: Need to consider temporary vs. permanent failures,
|
|
| 599 |
- # and how this works with retries.
|
|
| 600 |
- except BstError as e:
|
|
| 601 |
- last_error = e
|
|
| 602 |
- continue
|
|
| 603 |
- |
|
| 604 |
- # No error, we're done with this fetcher
|
|
| 605 |
- break
|
|
| 606 | 598 |
|
| 607 |
- else:
|
|
| 608 |
- # No break occurred, raise the last detected error
|
|
| 609 |
- raise last_error
|
|
| 599 |
+ # Use a contorted loop here, this is to allow us to
|
|
| 600 |
+ # silence the messages which can result from consuming
|
|
| 601 |
+ # the items of source_fetchers, if it happens to be a generator.
|
|
| 602 |
+ #
|
|
| 603 |
+ source_fetchers = iter(source_fetchers)
|
|
| 604 |
+ try:
|
|
| 605 |
+ |
|
| 606 |
+ while True:
|
|
| 607 |
+ |
|
| 608 |
+ with context.silence():
|
|
| 609 |
+ fetcher = next(source_fetchers)
|
|
| 610 |
+ |
|
| 611 |
+ alias = fetcher._get_alias()
|
|
| 612 |
+ for uri in project.get_alias_uris(alias, first_pass=self.__first_pass):
|
|
| 613 |
+ try:
|
|
| 614 |
+ fetcher.fetch(uri)
|
|
| 615 |
+ # FIXME: Need to consider temporary vs. permanent failures,
|
|
| 616 |
+ # and how this works with retries.
|
|
| 617 |
+ except BstError as e:
|
|
| 618 |
+ last_error = e
|
|
| 619 |
+ continue
|
|
| 620 |
+ |
|
| 621 |
+ # No error, we're done with this fetcher
|
|
| 622 |
+ break
|
|
| 623 |
+ |
|
| 624 |
+ else:
|
|
| 625 |
+ # No break occurred, raise the last detected error
|
|
| 626 |
+ raise last_error
|
|
| 627 |
+ |
|
| 628 |
+ except StopIteration:
|
|
| 629 |
+ pass
|
|
| 610 | 630 |
|
| 611 | 631 |
# Default codepath is to reinstantiate the Source
|
| 612 | 632 |
#
|
| ... | ... | @@ -496,7 +496,7 @@ def get_bst_version(): |
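The rewritten Source._fetch() above uses a while/next() loop instead of a plain for loop because source_fetchers may be a generator: only the work performed inside next() should be silenced, while messages emitted by the fetch itself must stay visible. A small sketch of that shape, where silence() and process() are illustrative stand-ins for context.silence() and the per-fetcher work:

    from contextlib import contextmanager


    @contextmanager
    def silence():
        # Stand-in for context.silence(): suppress status messages here.
        yield


    def consume(items, process):
        iterator = iter(items)
        try:
            while True:
                with silence():
                    item = next(iterator)   # only this step is silenced
                process(item)               # messages from the work stay visible
        except StopIteration:
            pass


    if __name__ == '__main__':
        consume((x * x for x in range(3)), print)
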
| 496 | 496 |
|
| 497 | 497 |
@contextmanager
|
| 498 | 498 |
def save_file_atomic(filename, mode='w', *, buffering=-1, encoding=None,
|
| 499 |
- errors=None, newline=None, closefd=True, opener=None):
|
|
| 499 |
+ errors=None, newline=None, closefd=True, opener=None, tempdir=None):
|
|
| 500 | 500 |
"""Save a file with a temporary name and rename it into place when ready.
|
| 501 | 501 |
|
| 502 | 502 |
This is a context manager which is meant for saving data to files.
|
| ... | ... | @@ -523,8 +523,9 @@ def save_file_atomic(filename, mode='w', *, buffering=-1, encoding=None, |
| 523 | 523 |
# https://bugs.python.org/issue8604
|
| 524 | 524 |
|
| 525 | 525 |
assert os.path.isabs(filename), "The utils.save_file_atomic() parameter ``filename`` must be an absolute path"
|
| 526 |
- dirname = os.path.dirname(filename)
|
|
| 527 |
- fd, tempname = tempfile.mkstemp(dir=dirname)
|
|
| 526 |
+ if tempdir is None:
|
|
| 527 |
+ tempdir = os.path.dirname(filename)
|
|
| 528 |
+ fd, tempname = tempfile.mkstemp(dir=tempdir)
|
|
| 528 | 529 |
os.close(fd)
|
| 529 | 530 |
|
| 530 | 531 |
f = open(tempname, mode=mode, buffering=buffering, encoding=encoding,
|
| ... | ... | @@ -556,6 +557,9 @@ def save_file_atomic(filename, mode='w', *, buffering=-1, encoding=None, |
| 556 | 557 |
#
|
| 557 | 558 |
# Get the disk usage of a given directory in bytes.
|
| 558 | 559 |
#
|
| 560 |
+# This function assumes that files do not inadvertently
|
|
| 561 |
+# disappear while this function is running.
|
|
| 562 |
+#
|
|
| 559 | 563 |
# Arguments:
|
| 560 | 564 |
# (str) The path whose size to check.
|
| 561 | 565 |
#
|
| ... | ... | @@ -675,7 +679,7 @@ def _force_rmtree(rootpath, **kwargs): |
| 675 | 679 |
|
| 676 | 680 |
try:
|
| 677 | 681 |
shutil.rmtree(rootpath, **kwargs)
|
| 678 |
- except shutil.Error as e:
|
|
| 682 |
+ except OSError as e:
|
|
| 679 | 683 |
raise UtilError("Failed to remove cache directory '{}': {}"
|
| 680 | 684 |
.format(rootpath, e))
|
| 681 | 685 |
|
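The new tempdir argument to save_file_atomic() exists because the temporary file has to live on the same filesystem as its final destination for the concluding rename to stay atomic (and for the os.link() in add_object() to work at all); set_ref() therefore now passes the cache's own tmpdir. A small sketch of the same pattern, assuming nothing beyond the standard library; save_atomic() and the example path are illustrative, not the BuildStream API.

    import os
    import tempfile
    from contextlib import contextmanager


    @contextmanager
    def save_atomic(filename, tempdir=None):
        # Default to the destination directory, as save_file_atomic() does, so
        # the temporary file is guaranteed to be on the same filesystem.
        if tempdir is None:
            tempdir = os.path.dirname(filename)
        fd, tempname = tempfile.mkstemp(dir=tempdir)
        try:
            with os.fdopen(fd, 'wb') as f:
                yield f
            # Atomic only when tempname and filename share a filesystem.
            os.replace(tempname, filename)
        except BaseException:
            os.unlink(tempname)
            raise


    if __name__ == '__main__':
        target = os.path.join(tempfile.gettempdir(), 'example.ref')
        with save_atomic(target) as f:
            f.write(b'digest bytes')
        with open(target, 'rb') as f:
            print(f.read())
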
| ... | ... | @@ -9,5 +9,5 @@ element-path: elements |
| 9 | 9 |
|
| 10 | 10 |
# Define some aliases for the tarballs we download
|
| 11 | 11 |
aliases:
|
| 12 |
- alpine: https://gnome7.codethink.co.uk/tarballs/
|
|
| 12 |
+ alpine: https://bst-integration-test-images.ams3.cdn.digitaloceanspaces.com/
|
|
| 13 | 13 |
gnu: http://ftpmirror.gnu.org/gnu/automake/
|
| ... | ... | @@ -9,4 +9,4 @@ element-path: elements |
| 9 | 9 |
|
| 10 | 10 |
# Define an alias for our alpine tarball
|
| 11 | 11 |
aliases:
|
| 12 |
- alpine: https://gnome7.codethink.co.uk/tarballs/
|
|
| 12 |
+ alpine: https://bst-integration-test-images.ams3.cdn.digitaloceanspaces.com/
|
| ... | ... | @@ -9,4 +9,4 @@ element-path: elements |
| 9 | 9 |
|
| 10 | 10 |
# Define an alias for our alpine tarball
|
| 11 | 11 |
aliases:
|
| 12 |
- alpine: https://gnome7.codethink.co.uk/tarballs/
|
|
| 12 |
+ alpine: https://bst-integration-test-images.ams3.cdn.digitaloceanspaces.com/
|
| ... | ... | @@ -231,6 +231,8 @@ def test_artifact_expires(cli, datafiles, tmpdir): |
| 231 | 231 |
assert cli.get_element_state(project, 'element2.bst') == 'cached'
|
| 232 | 232 |
assert_shared(cli, share, project, 'element2.bst')
|
| 233 | 233 |
|
| 234 |
+ share.make_all_objects_older()
|
|
| 235 |
+ |
|
| 234 | 236 |
# Create and build another element of 5 MB (This will exceed the free disk space available)
|
| 235 | 237 |
create_element_size('element3.bst', project, element_path, [], int(5e6))
|
| 236 | 238 |
result = cli.run(project=project, args=['build', 'element3.bst'])
|
| ... | ... | @@ -327,6 +329,8 @@ def test_recently_pulled_artifact_does_not_expire(cli, datafiles, tmpdir): |
| 327 | 329 |
# Ensure element1 is cached locally
|
| 328 | 330 |
assert cli.get_element_state(project, 'element1.bst') == 'cached'
|
| 329 | 331 |
|
| 332 |
+ share.make_all_objects_older()
|
|
| 333 |
+ |
|
| 330 | 334 |
# Create and build the element3 (of 5 MB)
|
| 331 | 335 |
create_element_size('element3.bst', project, element_path, [], int(5e6))
|
| 332 | 336 |
result = cli.run(project=project, args=['build', 'element3.bst'])
|
| ... | ... | @@ -2,7 +2,7 @@ |
| 2 | 2 |
name: test
|
| 3 | 3 |
element-path: elements
|
| 4 | 4 |
aliases:
|
| 5 |
- alpine: https://gnome7.codethink.co.uk/tarballs/
|
|
| 5 |
+ alpine: https://bst-integration-test-images.ams3.cdn.digitaloceanspaces.com/
|
|
| 6 | 6 |
project_dir: file://{project_dir}
|
| 7 | 7 |
options:
|
| 8 | 8 |
linux:
|
| ... | ... | @@ -122,6 +122,15 @@ class ArtifactShare(): |
| 122 | 122 |
except ArtifactError:
|
| 123 | 123 |
return False
|
| 124 | 124 |
|
| 125 |
+ def make_all_objects_older(self):
|
|
| 126 |
+ for root, dirs, files in os.walk(os.path.join(self.cas.casdir, 'objects')):
|
|
| 127 |
+ for name in files:
|
|
| 128 |
+ fullname = os.path.join(root, name)
|
|
| 129 |
+ st = os.stat(fullname)
|
|
| 130 |
+ mtime = st.st_mtime - 6 * 60 * 60
|
|
| 131 |
+ atime = st.st_atime - 6 * 60 * 60
|
|
| 132 |
+ os.utime(fullname, times=(atime, mtime))
|
|
| 133 |
+ |
|
| 125 | 134 |
# close():
|
| 126 | 135 |
#
|
| 127 | 136 |
# Remove the artifact share.
|
