Jürg Billeter pushed to branch juerg/cas at BuildStream / buildstream
Commits:
-
4b585c5c
by Jürg Billeter at 2018-11-05T16:35:09Z
9 changed files:
- buildstream/_artifactcache/artifactcache.py
- buildstream/_artifactcache/cascache.py
- buildstream/_artifactcache/casserver.py
- buildstream/_context.py
- buildstream/_exceptions.py
- buildstream/storage/_casbaseddirectory.py
- tests/artifactcache/pull.py
- tests/artifactcache/push.py
- tests/testutils/artifactshare.py
Changes:
... | ... | @@ -17,17 +17,22 @@ |
17 | 17 |
# Authors:
|
18 | 18 |
# Tristan Maat <tristan maat codethink co uk>
|
19 | 19 |
|
20 |
+import multiprocessing
|
|
20 | 21 |
import os
|
22 |
+import signal
|
|
21 | 23 |
import string
|
22 | 24 |
from collections import namedtuple
|
23 | 25 |
from collections.abc import Mapping
|
24 | 26 |
|
25 | 27 |
from ..types import _KeyStrength
|
26 |
-from .._exceptions import ArtifactError, ImplError, LoadError, LoadErrorReason
|
|
28 |
+from .._exceptions import ArtifactError, CASError, LoadError, LoadErrorReason
|
|
27 | 29 |
from .._message import Message, MessageType
|
30 |
+from .. import _signals
|
|
28 | 31 |
from .. import utils
|
29 | 32 |
from .. import _yaml
|
30 | 33 |
|
34 |
+from .cascache import CASCache, CASRemote
|
|
35 |
+ |
|
31 | 36 |
|
32 | 37 |
CACHE_SIZE_FILE = "cache_size"
|
33 | 38 |
|
... | ... | @@ -93,7 +98,8 @@ class ArtifactCache(): |
93 | 98 |
def __init__(self, context):
|
94 | 99 |
self.context = context
|
95 | 100 |
self.extractdir = os.path.join(context.artifactdir, 'extract')
|
96 |
- self.tmpdir = os.path.join(context.artifactdir, 'tmp')
|
|
101 |
+ |
|
102 |
+ self.cas = CASCache(context.artifactdir)
|
|
97 | 103 |
|
98 | 104 |
self.global_remote_specs = []
|
99 | 105 |
self.project_remote_specs = {}
|
... | ... | @@ -104,12 +110,15 @@ class ArtifactCache(): |
104 | 110 |
self._cache_lower_threshold = None # The target cache size for a cleanup
|
105 | 111 |
self._remotes_setup = False # Check to prevent double-setup of remotes
|
106 | 112 |
|
113 |
+ # Per-project list of _CASRemote instances.
|
|
114 |
+ self._remotes = {}
|
|
115 |
+ |
|
116 |
+ self._has_fetch_remotes = False
|
|
117 |
+ self._has_push_remotes = False
|
|
118 |
+ |
|
107 | 119 |
os.makedirs(self.extractdir, exist_ok=True)
|
108 |
- os.makedirs(self.tmpdir, exist_ok=True)
|
|
109 | 120 |
|
110 |
- ################################################
|
|
111 |
- # Methods implemented on the abstract class #
|
|
112 |
- ################################################
|
|
121 |
+ self._calculate_cache_quota()
|
|
113 | 122 |
|
114 | 123 |
# get_artifact_fullname()
|
115 | 124 |
#
|
... | ... | @@ -240,8 +249,10 @@ class ArtifactCache(): |
240 | 249 |
for key in (strong_key, weak_key):
|
241 | 250 |
if key:
|
242 | 251 |
try:
|
243 |
- self.update_mtime(element, key)
|
|
244 |
- except ArtifactError:
|
|
252 |
+ ref = self.get_artifact_fullname(element, key)
|
|
253 |
+ |
|
254 |
+ self.cas.update_mtime(ref)
|
|
255 |
+ except CASError:
|
|
245 | 256 |
pass
|
246 | 257 |
|
247 | 258 |
# clean():
|
... | ... | @@ -252,7 +263,7 @@ class ArtifactCache(): |
252 | 263 |
# (int): The size of the cache after having cleaned up
|
253 | 264 |
#
|
254 | 265 |
def clean(self):
|
255 |
- artifacts = self.list_artifacts() # pylint: disable=assignment-from-no-return
|
|
266 |
+ artifacts = self.list_artifacts()
|
|
256 | 267 |
|
257 | 268 |
# Build a set of the cache keys which are required
|
258 | 269 |
# based on the required elements at cleanup time
|
... | ... | @@ -294,7 +305,7 @@ class ArtifactCache(): |
294 | 305 |
if key not in required_artifacts:
|
295 | 306 |
|
296 | 307 |
# Remove the actual artifact, if it's not required.
|
297 |
- size = self.remove(to_remove) # pylint: disable=assignment-from-no-return
|
|
308 |
+ size = self.remove(to_remove)
|
|
298 | 309 |
|
299 | 310 |
# Remove the size from the removed size
|
300 | 311 |
self.set_cache_size(self._cache_size - size)
|
... | ... | @@ -311,7 +322,7 @@ class ArtifactCache(): |
311 | 322 |
# (int): The size of the artifact cache.
|
312 | 323 |
#
|
313 | 324 |
def compute_cache_size(self):
|
314 |
- self._cache_size = self.calculate_cache_size() # pylint: disable=assignment-from-no-return
|
|
325 |
+ self._cache_size = self.cas.calculate_cache_size()
|
|
315 | 326 |
|
316 | 327 |
return self._cache_size
|
317 | 328 |
|
... | ... | @@ -380,28 +391,12 @@ class ArtifactCache(): |
380 | 391 |
def has_quota_exceeded(self):
|
381 | 392 |
return self.get_cache_size() > self._cache_quota
|
382 | 393 |
|
383 |
- ################################################
|
|
384 |
- # Abstract methods for subclasses to implement #
|
|
385 |
- ################################################
|
|
386 |
- |
|
387 | 394 |
# preflight():
|
388 | 395 |
#
|
389 | 396 |
# Preflight check.
|
390 | 397 |
#
|
391 | 398 |
def preflight(self):
|
392 |
- pass
|
|
393 |
- |
|
394 |
- # update_mtime()
|
|
395 |
- #
|
|
396 |
- # Update the mtime of an artifact.
|
|
397 |
- #
|
|
398 |
- # Args:
|
|
399 |
- # element (Element): The Element to update
|
|
400 |
- # key (str): The key of the artifact.
|
|
401 |
- #
|
|
402 |
- def update_mtime(self, element, key):
|
|
403 |
- raise ImplError("Cache '{kind}' does not implement update_mtime()"
|
|
404 |
- .format(kind=type(self).__name__))
|
|
399 |
+ self.cas.preflight()
|
|
405 | 400 |
|
406 | 401 |
# initialize_remotes():
|
407 | 402 |
#
|
... | ... | @@ -411,7 +406,59 @@ class ArtifactCache(): |
411 | 406 |
# on_failure (callable): Called if we fail to contact one of the caches.
|
412 | 407 |
#
|
413 | 408 |
def initialize_remotes(self, *, on_failure=None):
|
414 |
- pass
|
|
409 |
+ remote_specs = self.global_remote_specs
|
|
410 |
+ |
|
411 |
+ for project in self.project_remote_specs:
|
|
412 |
+ remote_specs += self.project_remote_specs[project]
|
|
413 |
+ |
|
414 |
+ remote_specs = list(utils._deduplicate(remote_specs))
|
|
415 |
+ |
|
416 |
+ remotes = {}
|
|
417 |
+ q = multiprocessing.Queue()
|
|
418 |
+ for remote_spec in remote_specs:
|
|
419 |
+ # Use subprocess to avoid creation of gRPC threads in main BuildStream process
|
|
420 |
+ # See https://github.com/grpc/grpc/blob/master/doc/fork_support.md for details
|
|
421 |
+ p = multiprocessing.Process(target=self.cas.initialize_remote, args=(remote_spec, q))
|
|
422 |
+ |
|
423 |
+ try:
|
|
424 |
+ # Keep SIGINT blocked in the child process
|
|
425 |
+ with _signals.blocked([signal.SIGINT], ignore=False):
|
|
426 |
+ p.start()
|
|
427 |
+ |
|
428 |
+ error = q.get()
|
|
429 |
+ p.join()
|
|
430 |
+ except KeyboardInterrupt:
|
|
431 |
+ utils._kill_process_tree(p.pid)
|
|
432 |
+ raise
|
|
433 |
+ |
|
434 |
+ if error and on_failure:
|
|
435 |
+ on_failure(remote_spec.url, error)
|
|
436 |
+ elif error:
|
|
437 |
+ raise ArtifactError(error)
|
|
438 |
+ else:
|
|
439 |
+ self._has_fetch_remotes = True
|
|
440 |
+ if remote_spec.push:
|
|
441 |
+ self._has_push_remotes = True
|
|
442 |
+ |
|
443 |
+ remotes[remote_spec.url] = CASRemote(remote_spec)
|
|
444 |
+ |
|
445 |
+ for project in self.context.get_projects():
|
|
446 |
+ remote_specs = self.global_remote_specs
|
|
447 |
+ if project in self.project_remote_specs:
|
|
448 |
+ remote_specs = list(utils._deduplicate(remote_specs + self.project_remote_specs[project]))
|
|
449 |
+ |
|
450 |
+ project_remotes = []
|
|
451 |
+ |
|
452 |
+ for remote_spec in remote_specs:
|
|
453 |
+ # Errors are already handled in the loop above,
|
|
454 |
+ # skip unreachable remotes here.
|
|
455 |
+ if remote_spec.url not in remotes:
|
|
456 |
+ continue
|
|
457 |
+ |
|
458 |
+ remote = remotes[remote_spec.url]
|
|
459 |
+ project_remotes.append(remote)
|
|
460 |
+ |
|
461 |
+ self._remotes[project] = project_remotes
|
|
415 | 462 |
|
416 | 463 |
# contains():
|
417 | 464 |
#
|
... | ... | @@ -425,8 +472,9 @@ class ArtifactCache(): |
425 | 472 |
# Returns: True if the artifact is in the cache, False otherwise
|
426 | 473 |
#
|
427 | 474 |
def contains(self, element, key):
|
428 |
- raise ImplError("Cache '{kind}' does not implement contains()"
|
|
429 |
- .format(kind=type(self).__name__))
|
|
475 |
+ ref = self.get_artifact_fullname(element, key)
|
|
476 |
+ |
|
477 |
+ return self.cas.contains(ref)
|
|
430 | 478 |
|
431 | 479 |
# list_artifacts():
|
432 | 480 |
#
|
... | ... | @@ -437,8 +485,7 @@ class ArtifactCache(): |
437 | 485 |
# `ArtifactCache.get_artifact_fullname` in LRU order
|
438 | 486 |
#
|
439 | 487 |
def list_artifacts(self):
|
440 |
- raise ImplError("Cache '{kind}' does not implement list_artifacts()"
|
|
441 |
- .format(kind=type(self).__name__))
|
|
488 |
+ return self.cas.list_refs()
|
|
442 | 489 |
|
443 | 490 |
# remove():
|
444 | 491 |
#
|
... | ... | @@ -450,9 +497,31 @@ class ArtifactCache(): |
450 | 497 |
# generated by
|
451 | 498 |
# `ArtifactCache.get_artifact_fullname`)
|
452 | 499 |
#
|
453 |
- def remove(self, artifact_name):
|
|
454 |
- raise ImplError("Cache '{kind}' does not implement remove()"
|
|
455 |
- .format(kind=type(self).__name__))
|
|
500 |
+ # Returns:
|
|
501 |
+ # (int|None) The amount of space pruned from the repository in
|
|
502 |
+ # Bytes, or None if defer_prune is True
|
|
503 |
+ #
|
|
504 |
+ def remove(self, ref):
|
|
505 |
+ |
|
506 |
+ # Remove extract if not used by other ref
|
|
507 |
+ tree = self.cas.resolve_ref(ref)
|
|
508 |
+ ref_name, ref_hash = os.path.split(ref)
|
|
509 |
+ extract = os.path.join(self.extractdir, ref_name, tree.hash)
|
|
510 |
+ keys_file = os.path.join(extract, 'meta', 'keys.yaml')
|
|
511 |
+ if os.path.exists(keys_file):
|
|
512 |
+ keys_meta = _yaml.load(keys_file)
|
|
513 |
+ keys = [keys_meta['strong'], keys_meta['weak']]
|
|
514 |
+ remove_extract = True
|
|
515 |
+ for other_hash in keys:
|
|
516 |
+ if other_hash == ref_hash:
|
|
517 |
+ continue
|
|
518 |
+ remove_extract = False
|
|
519 |
+ break
|
|
520 |
+ |
|
521 |
+ if remove_extract:
|
|
522 |
+ utils._force_rmtree(extract)
|
|
523 |
+ |
|
524 |
+ return self.cas.remove(ref)
|
|
456 | 525 |
|
457 | 526 |
# extract():
|
458 | 527 |
#
|
... | ... | @@ -472,8 +541,11 @@ class ArtifactCache(): |
472 | 541 |
# Returns: path to extracted artifact
|
473 | 542 |
#
|
474 | 543 |
def extract(self, element, key):
|
475 |
- raise ImplError("Cache '{kind}' does not implement extract()"
|
|
476 |
- .format(kind=type(self).__name__))
|
|
544 |
+ ref = self.get_artifact_fullname(element, key)
|
|
545 |
+ |
|
546 |
+ path = os.path.join(self.extractdir, element._get_project().name, element.normal_name)
|
|
547 |
+ |
|
548 |
+ return self.cas.extract(ref, path)
|
|
477 | 549 |
|
478 | 550 |
# commit():
|
479 | 551 |
#
|
... | ... | @@ -485,8 +557,9 @@ class ArtifactCache(): |
485 | 557 |
# keys (list): The cache keys to use
|
486 | 558 |
#
|
487 | 559 |
def commit(self, element, content, keys):
|
488 |
- raise ImplError("Cache '{kind}' does not implement commit()"
|
|
489 |
- .format(kind=type(self).__name__))
|
|
560 |
+ refs = [self.get_artifact_fullname(element, key) for key in keys]
|
|
561 |
+ |
|
562 |
+ self.cas.commit(refs, content)
|
|
490 | 563 |
|
491 | 564 |
# diff():
|
492 | 565 |
#
|
... | ... | @@ -500,8 +573,10 @@ class ArtifactCache(): |
500 | 573 |
# subdir (str): A subdirectory to limit the comparison to
|
501 | 574 |
#
|
502 | 575 |
def diff(self, element, key_a, key_b, *, subdir=None):
|
503 |
- raise ImplError("Cache '{kind}' does not implement diff()"
|
|
504 |
- .format(kind=type(self).__name__))
|
|
576 |
+ ref_a = self.get_artifact_fullname(element, key_a)
|
|
577 |
+ ref_b = self.get_artifact_fullname(element, key_b)
|
|
578 |
+ |
|
579 |
+ return self.cas.diff(ref_a, ref_b, subdir=subdir)
|
|
505 | 580 |
|
506 | 581 |
# has_fetch_remotes():
|
507 | 582 |
#
|
... | ... | @@ -513,7 +588,16 @@ class ArtifactCache(): |
513 | 588 |
# Returns: True if any remote repositories are configured, False otherwise
|
514 | 589 |
#
|
515 | 590 |
def has_fetch_remotes(self, *, element=None):
|
516 |
- return False
|
|
591 |
+ if not self._has_fetch_remotes:
|
|
592 |
+ # No project has fetch remotes
|
|
593 |
+ return False
|
|
594 |
+ elif element is None:
|
|
595 |
+ # At least one (sub)project has fetch remotes
|
|
596 |
+ return True
|
|
597 |
+ else:
|
|
598 |
+ # Check whether the specified element's project has fetch remotes
|
|
599 |
+ remotes_for_project = self._remotes[element._get_project()]
|
|
600 |
+ return bool(remotes_for_project)
|
|
517 | 601 |
|
518 | 602 |
# has_push_remotes():
|
519 | 603 |
#
|
... | ... | @@ -525,7 +609,16 @@ class ArtifactCache(): |
525 | 609 |
# Returns: True if any remote repository is configured, False otherwise
|
526 | 610 |
#
|
527 | 611 |
def has_push_remotes(self, *, element=None):
|
528 |
- return False
|
|
612 |
+ if not self._has_push_remotes:
|
|
613 |
+ # No project has push remotes
|
|
614 |
+ return False
|
|
615 |
+ elif element is None:
|
|
616 |
+ # At least one (sub)project has push remotes
|
|
617 |
+ return True
|
|
618 |
+ else:
|
|
619 |
+ # Check whether the specified element's project has push remotes
|
|
620 |
+ remotes_for_project = self._remotes[element._get_project()]
|
|
621 |
+ return any(remote.spec.push for remote in remotes_for_project)
|
|
529 | 622 |
|
530 | 623 |
# push():
|
531 | 624 |
#
|
... | ... | @@ -542,8 +635,28 @@ class ArtifactCache(): |
542 | 635 |
# (ArtifactError): if there was an error
|
543 | 636 |
#
|
544 | 637 |
def push(self, element, keys):
|
545 |
- raise ImplError("Cache '{kind}' does not implement push()"
|
|
546 |
- .format(kind=type(self).__name__))
|
|
638 |
+ refs = [self.get_artifact_fullname(element, key) for key in list(keys)]
|
|
639 |
+ |
|
640 |
+ project = element._get_project()
|
|
641 |
+ |
|
642 |
+ push_remotes = [r for r in self._remotes[project] if r.spec.push]
|
|
643 |
+ |
|
644 |
+ pushed = False
|
|
645 |
+ |
|
646 |
+ for remote in push_remotes:
|
|
647 |
+ remote.init()
|
|
648 |
+ display_key = element._get_brief_display_key()
|
|
649 |
+ element.status("Pushing artifact {} -> {}".format(display_key, remote.spec.url))
|
|
650 |
+ |
|
651 |
+ if self.cas.push(refs, remote):
|
|
652 |
+ element.info("Pushed artifact {} -> {}".format(display_key, remote.spec.url))
|
|
653 |
+ pushed = True
|
|
654 |
+ else:
|
|
655 |
+ element.info("Remote ({}) already has {} cached".format(
|
|
656 |
+ remote.spec.url, element._get_brief_display_key()
|
|
657 |
+ ))
|
|
658 |
+ |
|
659 |
+ return pushed
|
|
547 | 660 |
|
548 | 661 |
# pull():
|
549 | 662 |
#
|
... | ... | @@ -558,8 +671,130 @@ class ArtifactCache(): |
558 | 671 |
# (bool): True if pull was successful, False if artifact was not available
|
559 | 672 |
#
|
560 | 673 |
def pull(self, element, key, *, progress=None):
|
561 |
- raise ImplError("Cache '{kind}' does not implement pull()"
|
|
562 |
- .format(kind=type(self).__name__))
|
|
674 |
+ ref = self.get_artifact_fullname(element, key)
|
|
675 |
+ |
|
676 |
+ project = element._get_project()
|
|
677 |
+ |
|
678 |
+ for remote in self._remotes[project]:
|
|
679 |
+ try:
|
|
680 |
+ display_key = element._get_brief_display_key()
|
|
681 |
+ element.status("Pulling artifact {} <- {}".format(display_key, remote.spec.url))
|
|
682 |
+ |
|
683 |
+ if self.cas.pull(ref, remote, progress=progress):
|
|
684 |
+ element.info("Pulled artifact {} <- {}".format(display_key, remote.spec.url))
|
|
685 |
+ # no need to pull from additional remotes
|
|
686 |
+ return True
|
|
687 |
+ else:
|
|
688 |
+ element.info("Remote ({}) does not have {} cached".format(
|
|
689 |
+ remote.spec.url, element._get_brief_display_key()
|
|
690 |
+ ))
|
|
691 |
+ |
|
692 |
+ except CASError as e:
|
|
693 |
+ raise ArtifactError("Failed to pull artifact {}: {}".format(
|
|
694 |
+ element._get_brief_display_key(), e)) from e
|
|
695 |
+ |
|
696 |
+ return False
|
|
697 |
+ |
|
698 |
+ # pull_tree():
|
|
699 |
+ #
|
|
700 |
+ # Pull a single Tree rather than an artifact.
|
|
701 |
+ # Does not update local refs.
|
|
702 |
+ #
|
|
703 |
+ # Args:
|
|
704 |
+ # project (Project): The current project
|
|
705 |
+ # digest (Digest): The digest of the tree
|
|
706 |
+ #
|
|
707 |
+ def pull_tree(self, project, digest):
|
|
708 |
+ for remote in self._remotes[project]:
|
|
709 |
+ digest = self.cas.pull_tree(remote, digest)
|
|
710 |
+ |
|
711 |
+ if digest:
|
|
712 |
+ # no need to pull from additional remotes
|
|
713 |
+ return digest
|
|
714 |
+ |
|
715 |
+ return None
|
|
716 |
+ |
|
717 |
+ # push_directory():
|
|
718 |
+ #
|
|
719 |
+ # Push the given virtual directory to all remotes.
|
|
720 |
+ #
|
|
721 |
+ # Args:
|
|
722 |
+ # project (Project): The current project
|
|
723 |
+ # directory (Directory): A virtual directory object to push.
|
|
724 |
+ #
|
|
725 |
+ # Raises:
|
|
726 |
+ # (ArtifactError): if there was an error
|
|
727 |
+ #
|
|
728 |
+ def push_directory(self, project, directory):
|
|
729 |
+ if self._has_push_remotes:
|
|
730 |
+ push_remotes = [r for r in self._remotes[project] if r.spec.push]
|
|
731 |
+ else:
|
|
732 |
+ push_remotes = []
|
|
733 |
+ |
|
734 |
+ if not push_remotes:
|
|
735 |
+ raise ArtifactError("push_directory was called, but no remote artifact " +
|
|
736 |
+ "servers are configured as push remotes.")
|
|
737 |
+ |
|
738 |
+ if directory.ref is None:
|
|
739 |
+ return
|
|
740 |
+ |
|
741 |
+ for remote in push_remotes:
|
|
742 |
+ self.cas.push_directory(remote, directory)
|
|
743 |
+ |
|
744 |
+ # push_message():
|
|
745 |
+ #
|
|
746 |
+ # Push the given protobuf message to all remotes.
|
|
747 |
+ #
|
|
748 |
+ # Args:
|
|
749 |
+ # project (Project): The current project
|
|
750 |
+ # message (Message): A protobuf message to push.
|
|
751 |
+ #
|
|
752 |
+ # Raises:
|
|
753 |
+ # (ArtifactError): if there was an error
|
|
754 |
+ #
|
|
755 |
+ def push_message(self, project, message):
|
|
756 |
+ |
|
757 |
+ if self._has_push_remotes:
|
|
758 |
+ push_remotes = [r for r in self._remotes[project] if r.spec.push]
|
|
759 |
+ else:
|
|
760 |
+ push_remotes = []
|
|
761 |
+ |
|
762 |
+ if not push_remotes:
|
|
763 |
+ raise ArtifactError("push_message was called, but no remote artifact " +
|
|
764 |
+ "servers are configured as push remotes.")
|
|
765 |
+ |
|
766 |
+ for remote in push_remotes:
|
|
767 |
+ message_digest = self.cas.push_message(remote, message)
|
|
768 |
+ |
|
769 |
+ return message_digest
|
|
770 |
+ |
|
771 |
+ # verify_digest_pushed():
|
|
772 |
+ #
|
|
773 |
+ # Check whether the object is already on the server in which case
|
|
774 |
+ # there is no need to upload it.
|
|
775 |
+ #
|
|
776 |
+ # Args:
|
|
777 |
+ # project (Project): The current project
|
|
778 |
+ # digest (Digest): The object digest.
|
|
779 |
+ #
|
|
780 |
+ def verify_digest_pushed(self, project, digest):
|
|
781 |
+ |
|
782 |
+ if self._has_push_remotes:
|
|
783 |
+ push_remotes = [r for r in self._remotes[project] if r.spec.push]
|
|
784 |
+ else:
|
|
785 |
+ push_remotes = []
|
|
786 |
+ |
|
787 |
+ if not push_remotes:
|
|
788 |
+ raise ArtifactError("verify_digest_pushed was called, but no remote artifact " +
|
|
789 |
+ "servers are configured as push remotes.")
|
|
790 |
+ |
|
791 |
+ pushed = False
|
|
792 |
+ |
|
793 |
+ for remote in push_remotes:
|
|
794 |
+ if self.cas.verify_digest_on_remote(remote, digest):
|
|
795 |
+ pushed = True
|
|
796 |
+ |
|
797 |
+ return pushed
|
|
563 | 798 |
|
564 | 799 |
# link_key():
|
565 | 800 |
#
|
... | ... | @@ -571,19 +806,10 @@ class ArtifactCache(): |
571 | 806 |
# newkey (str): A new cache key for the artifact
|
572 | 807 |
#
|
573 | 808 |
def link_key(self, element, oldkey, newkey):
|
574 |
- raise ImplError("Cache '{kind}' does not implement link_key()"
|
|
575 |
- .format(kind=type(self).__name__))
|
|
809 |
+ oldref = self.get_artifact_fullname(element, oldkey)
|
|
810 |
+ newref = self.get_artifact_fullname(element, newkey)
|
|
576 | 811 |
|
577 |
- # calculate_cache_size()
|
|
578 |
- #
|
|
579 |
- # Return the real artifact cache size.
|
|
580 |
- #
|
|
581 |
- # Returns:
|
|
582 |
- # (int): The size of the artifact cache.
|
|
583 |
- #
|
|
584 |
- def calculate_cache_size(self):
|
|
585 |
- raise ImplError("Cache '{kind}' does not implement calculate_cache_size()"
|
|
586 |
- .format(kind=type(self).__name__))
|
|
812 |
+ self.cas.link_ref(oldref, newref)
|
|
587 | 813 |
|
588 | 814 |
################################################
|
589 | 815 |
# Local Private Methods #
|
... | ... | @@ -20,9 +20,7 @@ |
20 | 20 |
import hashlib
|
21 | 21 |
import itertools
|
22 | 22 |
import io
|
23 |
-import multiprocessing
|
|
24 | 23 |
import os
|
25 |
-import signal
|
|
26 | 24 |
import stat
|
27 | 25 |
import tempfile
|
28 | 26 |
import uuid
|
... | ... | @@ -31,17 +29,13 @@ from urllib.parse import urlparse |
31 | 29 |
|
32 | 30 |
import grpc
|
33 | 31 |
|
34 |
-from .. import _yaml
|
|
35 |
- |
|
36 | 32 |
from .._protos.google.rpc import code_pb2
|
37 | 33 |
from .._protos.google.bytestream import bytestream_pb2, bytestream_pb2_grpc
|
38 | 34 |
from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
|
39 | 35 |
from .._protos.buildstream.v2 import buildstream_pb2, buildstream_pb2_grpc
|
40 | 36 |
|
41 |
-from .. import _signals, utils
|
|
42 |
-from .._exceptions import ArtifactError
|
|
43 |
- |
|
44 |
-from . import ArtifactCache
|
|
37 |
+from .. import utils
|
|
38 |
+from .._exceptions import CASError
|
|
45 | 39 |
|
46 | 40 |
|
47 | 41 |
# The default limit for gRPC messages is 4 MiB.
|
... | ... | @@ -49,62 +43,68 @@ from . import ArtifactCache |
49 | 43 |
_MAX_PAYLOAD_BYTES = 1024 * 1024
|
50 | 44 |
|
51 | 45 |
|
52 |
-# A CASCache manages artifacts in a CAS repository as specified in the
|
|
53 |
-# Remote Execution API.
|
|
46 |
+# A CASCache manages a CAS repository as specified in the Remote Execution API.
|
|
54 | 47 |
#
|
55 | 48 |
# Args:
|
56 |
-# context (Context): The BuildStream context
|
|
57 |
-#
|
|
58 |
-# Pushing is explicitly disabled by the platform in some cases,
|
|
59 |
-# like when we are falling back to functioning without using
|
|
60 |
-# user namespaces.
|
|
49 |
+# path (str): The root directory for the CAS repository
|
|
61 | 50 |
#
|
62 |
-class CASCache(ArtifactCache):
|
|
51 |
+class CASCache():
|
|
63 | 52 |
|
64 |
- def __init__(self, context):
|
|
65 |
- super().__init__(context)
|
|
66 |
- |
|
67 |
- self.casdir = os.path.join(context.artifactdir, 'cas')
|
|
53 |
+ def __init__(self, path):
|
|
54 |
+ self.casdir = os.path.join(path, 'cas')
|
|
55 |
+ self.tmpdir = os.path.join(path, 'tmp')
|
|
68 | 56 |
os.makedirs(os.path.join(self.casdir, 'refs', 'heads'), exist_ok=True)
|
69 | 57 |
os.makedirs(os.path.join(self.casdir, 'objects'), exist_ok=True)
|
58 |
+ os.makedirs(self.tmpdir, exist_ok=True)
|
|
70 | 59 |
|
71 |
- self._calculate_cache_quota()
|
|
72 |
- |
|
73 |
- # Per-project list of _CASRemote instances.
|
|
74 |
- self._remotes = {}
|
|
75 |
- |
|
76 |
- self._has_fetch_remotes = False
|
|
77 |
- self._has_push_remotes = False
|
|
78 |
- |
|
79 |
- ################################################
|
|
80 |
- # Implementation of abstract methods #
|
|
81 |
- ################################################
|
|
82 |
- |
|
60 |
+ # preflight():
|
|
61 |
+ #
|
|
62 |
+ # Preflight check.
|
|
63 |
+ #
|
|
83 | 64 |
def preflight(self):
|
84 | 65 |
headdir = os.path.join(self.casdir, 'refs', 'heads')
|
85 | 66 |
objdir = os.path.join(self.casdir, 'objects')
|
86 | 67 |
if not (os.path.isdir(headdir) and os.path.isdir(objdir)):
|
87 |
- raise ArtifactError("CAS repository check failed for '{}'"
|
|
88 |
- .format(self.casdir))
|
|
68 |
+ raise CASError("CAS repository check failed for '{}'".format(self.casdir))
|
|
89 | 69 |
|
90 |
- def contains(self, element, key):
|
|
91 |
- refpath = self._refpath(self.get_artifact_fullname(element, key))
|
|
70 |
+ # contains():
|
|
71 |
+ #
|
|
72 |
+ # Check whether the specified ref is already available in the local CAS cache.
|
|
73 |
+ #
|
|
74 |
+ # Args:
|
|
75 |
+ # ref (str): The ref to check
|
|
76 |
+ #
|
|
77 |
+ # Returns: True if the ref is in the cache, False otherwise
|
|
78 |
+ #
|
|
79 |
+ def contains(self, ref):
|
|
80 |
+ refpath = self._refpath(ref)
|
|
92 | 81 |
|
93 | 82 |
# This assumes that the repository doesn't have any dangling pointers
|
94 | 83 |
return os.path.exists(refpath)
|
95 | 84 |
|
96 |
- def extract(self, element, key):
|
|
97 |
- ref = self.get_artifact_fullname(element, key)
|
|
98 |
- |
|
85 |
+ # extract():
|
|
86 |
+ #
|
|
87 |
+ # Extract cached directory for the specified ref if it hasn't
|
|
88 |
+ # already been extracted.
|
|
89 |
+ #
|
|
90 |
+ # Args:
|
|
91 |
+ # ref (str): The ref whose directory to extract
|
|
92 |
+ # path (str): The destination path
|
|
93 |
+ #
|
|
94 |
+ # Raises:
|
|
95 |
+ # CASError: In cases there was an OSError, or if the ref did not exist.
|
|
96 |
+ #
|
|
97 |
+ # Returns: path to extracted directory
|
|
98 |
+ #
|
|
99 |
+ def extract(self, ref, path):
|
|
99 | 100 |
tree = self.resolve_ref(ref, update_mtime=True)
|
100 | 101 |
|
101 |
- dest = os.path.join(self.extractdir, element._get_project().name,
|
|
102 |
- element.normal_name, tree.hash)
|
|
102 |
+ dest = os.path.join(path, tree.hash)
|
|
103 | 103 |
if os.path.isdir(dest):
|
104 |
- # artifact has already been extracted
|
|
104 |
+ # directory has already been extracted
|
|
105 | 105 |
return dest
|
106 | 106 |
|
107 |
- with tempfile.TemporaryDirectory(prefix='tmp', dir=self.extractdir) as tmpdir:
|
|
107 |
+ with tempfile.TemporaryDirectory(prefix='tmp', dir=self.tmpdir) as tmpdir:
|
|
108 | 108 |
checkoutdir = os.path.join(tmpdir, ref)
|
109 | 109 |
self._checkout(checkoutdir, tree)
|
110 | 110 |
|
... | ... | @@ -118,23 +118,35 @@ class CASCache(ArtifactCache): |
118 | 118 |
# If rename fails with these errors, another process beat
|
119 | 119 |
# us to it so just ignore.
|
120 | 120 |
if e.errno not in [errno.ENOTEMPTY, errno.EEXIST]:
|
121 |
- raise ArtifactError("Failed to extract artifact for ref '{}': {}"
|
|
122 |
- .format(ref, e)) from e
|
|
121 |
+ raise CASError("Failed to extract directory for ref '{}': {}".format(ref, e)) from e
|
|
123 | 122 |
|
124 | 123 |
return dest
|
125 | 124 |
|
126 |
- def commit(self, element, content, keys):
|
|
127 |
- refs = [self.get_artifact_fullname(element, key) for key in keys]
|
|
128 |
- |
|
129 |
- tree = self._commit_directory(content)
|
|
125 |
+ # commit():
|
|
126 |
+ #
|
|
127 |
+ # Commit directory to cache.
|
|
128 |
+ #
|
|
129 |
+ # Args:
|
|
130 |
+ # refs (list): The refs to set
|
|
131 |
+ # path (str): The directory to import
|
|
132 |
+ #
|
|
133 |
+ def commit(self, refs, path):
|
|
134 |
+ tree = self._commit_directory(path)
|
|
130 | 135 |
|
131 | 136 |
for ref in refs:
|
132 | 137 |
self.set_ref(ref, tree)
|
133 | 138 |
|
134 |
- def diff(self, element, key_a, key_b, *, subdir=None):
|
|
135 |
- ref_a = self.get_artifact_fullname(element, key_a)
|
|
136 |
- ref_b = self.get_artifact_fullname(element, key_b)
|
|
137 |
- |
|
139 |
+ # diff():
|
|
140 |
+ #
|
|
141 |
+ # Return a list of files that have been added or modified between
|
|
142 |
+ # the refs described by ref_a and ref_b.
|
|
143 |
+ #
|
|
144 |
+ # Args:
|
|
145 |
+ # ref_a (str): The first ref
|
|
146 |
+ # ref_b (str): The second ref
|
|
147 |
+ # subdir (str): A subdirectory to limit the comparison to
|
|
148 |
+ #
|
|
149 |
+ def diff(self, ref_a, ref_b, *, subdir=None):
|
|
138 | 150 |
tree_a = self.resolve_ref(ref_a)
|
139 | 151 |
tree_b = self.resolve_ref(ref_b)
|
140 | 152 |
|
... | ... | @@ -150,158 +162,122 @@ class CASCache(ArtifactCache): |
150 | 162 |
|
151 | 163 |
return modified, removed, added
|
152 | 164 |
|
153 |
- def initialize_remotes(self, *, on_failure=None):
|
|
154 |
- remote_specs = self.global_remote_specs
|
|
155 |
- |
|
156 |
- for project in self.project_remote_specs:
|
|
157 |
- remote_specs += self.project_remote_specs[project]
|
|
158 |
- |
|
159 |
- remote_specs = list(utils._deduplicate(remote_specs))
|
|
160 |
- |
|
161 |
- remotes = {}
|
|
162 |
- q = multiprocessing.Queue()
|
|
163 |
- for remote_spec in remote_specs:
|
|
164 |
- # Use subprocess to avoid creation of gRPC threads in main BuildStream process
|
|
165 |
- # See https://github.com/grpc/grpc/blob/master/doc/fork_support.md for details
|
|
166 |
- p = multiprocessing.Process(target=self._initialize_remote, args=(remote_spec, q))
|
|
165 |
+ def initialize_remote(self, remote_spec, q):
|
|
166 |
+ try:
|
|
167 |
+ remote = CASRemote(remote_spec)
|
|
168 |
+ remote.init()
|
|
167 | 169 |
|
168 |
- try:
|
|
169 |
- # Keep SIGINT blocked in the child process
|
|
170 |
- with _signals.blocked([signal.SIGINT], ignore=False):
|
|
171 |
- p.start()
|
|
172 |
- |
|
173 |
- error = q.get()
|
|
174 |
- p.join()
|
|
175 |
- except KeyboardInterrupt:
|
|
176 |
- utils._kill_process_tree(p.pid)
|
|
177 |
- raise
|
|
170 |
+ request = buildstream_pb2.StatusRequest()
|
|
171 |
+ response = remote.ref_storage.Status(request)
|
|
178 | 172 |
|
179 |
- if error and on_failure:
|
|
180 |
- on_failure(remote_spec.url, error)
|
|
181 |
- elif error:
|
|
182 |
- raise ArtifactError(error)
|
|
173 |
+ if remote_spec.push and not response.allow_updates:
|
|
174 |
+ q.put('CAS server does not allow push')
|
|
183 | 175 |
else:
|
184 |
- self._has_fetch_remotes = True
|
|
185 |
- if remote_spec.push:
|
|
186 |
- self._has_push_remotes = True
|
|
176 |
+ # No error
|
|
177 |
+ q.put(None)
|
|
187 | 178 |
|
188 |
- remotes[remote_spec.url] = _CASRemote(remote_spec)
|
|
179 |
+ except grpc.RpcError as e:
|
|
180 |
+ # str(e) is too verbose for errors reported to the user
|
|
181 |
+ q.put(e.details())
|
|
189 | 182 |
|
190 |
- for project in self.context.get_projects():
|
|
191 |
- remote_specs = self.global_remote_specs
|
|
192 |
- if project in self.project_remote_specs:
|
|
193 |
- remote_specs = list(utils._deduplicate(remote_specs + self.project_remote_specs[project]))
|
|
183 |
+ except Exception as e: # pylint: disable=broad-except
|
|
184 |
+ # Whatever happens, we need to return it to the calling process
|
|
185 |
+ #
|
|
186 |
+ q.put(str(e))
|
|
194 | 187 |
|
195 |
- project_remotes = []
|
|
188 |
+ # pull():
|
|
189 |
+ #
|
|
190 |
+ # Pull a ref from a remote repository.
|
|
191 |
+ #
|
|
192 |
+ # Args:
|
|
193 |
+ # ref (str): The ref to pull
|
|
194 |
+ # remote (CASRemote): The remote repository to pull from
|
|
195 |
+ # progress (callable): The progress callback, if any
|
|
196 |
+ #
|
|
197 |
+ # Returns:
|
|
198 |
+ # (bool): True if pull was successful, False if ref was not available
|
|
199 |
+ #
|
|
200 |
+ def pull(self, ref, remote, *, progress=None):
|
|
201 |
+ try:
|
|
202 |
+ remote.init()
|
|
196 | 203 |
|
197 |
- for remote_spec in remote_specs:
|
|
198 |
- # Errors are already handled in the loop above,
|
|
199 |
- # skip unreachable remotes here.
|
|
200 |
- if remote_spec.url not in remotes:
|
|
201 |
- continue
|
|
204 |
+ request = buildstream_pb2.GetReferenceRequest()
|
|
205 |
+ request.key = ref
|
|
206 |
+ response = remote.ref_storage.GetReference(request)
|
|
202 | 207 |
|
203 |
- remote = remotes[remote_spec.url]
|
|
204 |
- project_remotes.append(remote)
|
|
208 |
+ tree = remote_execution_pb2.Digest()
|
|
209 |
+ tree.hash = response.digest.hash
|
|
210 |
+ tree.size_bytes = response.digest.size_bytes
|
|
205 | 211 |
|
206 |
- self._remotes[project] = project_remotes
|
|
212 |
+ self._fetch_directory(remote, tree)
|
|
207 | 213 |
|
208 |
- def has_fetch_remotes(self, *, element=None):
|
|
209 |
- if not self._has_fetch_remotes:
|
|
210 |
- # No project has fetch remotes
|
|
211 |
- return False
|
|
212 |
- elif element is None:
|
|
213 |
- # At least one (sub)project has fetch remotes
|
|
214 |
- return True
|
|
215 |
- else:
|
|
216 |
- # Check whether the specified element's project has fetch remotes
|
|
217 |
- remotes_for_project = self._remotes[element._get_project()]
|
|
218 |
- return bool(remotes_for_project)
|
|
214 |
+ self.set_ref(ref, tree)
|
|
219 | 215 |
|
220 |
- def has_push_remotes(self, *, element=None):
|
|
221 |
- if not self._has_push_remotes:
|
|
222 |
- # No project has push remotes
|
|
223 |
- return False
|
|
224 |
- elif element is None:
|
|
225 |
- # At least one (sub)project has push remotes
|
|
226 | 216 |
return True
|
227 |
- else:
|
|
228 |
- # Check whether the specified element's project has push remotes
|
|
229 |
- remotes_for_project = self._remotes[element._get_project()]
|
|
230 |
- return any(remote.spec.push for remote in remotes_for_project)
|
|
231 |
- |
|
232 |
- def pull(self, element, key, *, progress=None):
|
|
233 |
- ref = self.get_artifact_fullname(element, key)
|
|
234 |
- |
|
235 |
- project = element._get_project()
|
|
236 |
- |
|
237 |
- for remote in self._remotes[project]:
|
|
238 |
- try:
|
|
239 |
- remote.init()
|
|
240 |
- display_key = element._get_brief_display_key()
|
|
241 |
- element.status("Pulling artifact {} <- {}".format(display_key, remote.spec.url))
|
|
242 |
- |
|
243 |
- request = buildstream_pb2.GetReferenceRequest()
|
|
244 |
- request.key = ref
|
|
245 |
- response = remote.ref_storage.GetReference(request)
|
|
246 |
- |
|
247 |
- tree = remote_execution_pb2.Digest()
|
|
248 |
- tree.hash = response.digest.hash
|
|
249 |
- tree.size_bytes = response.digest.size_bytes
|
|
250 |
- |
|
251 |
- self._fetch_directory(remote, tree)
|
|
252 |
- |
|
253 |
- self.set_ref(ref, tree)
|
|
254 |
- |
|
255 |
- element.info("Pulled artifact {} <- {}".format(display_key, remote.spec.url))
|
|
256 |
- # no need to pull from additional remotes
|
|
257 |
- return True
|
|
258 |
- |
|
259 |
- except grpc.RpcError as e:
|
|
260 |
- if e.code() != grpc.StatusCode.NOT_FOUND:
|
|
261 |
- raise ArtifactError("Failed to pull artifact {}: {}".format(
|
|
262 |
- element._get_brief_display_key(), e)) from e
|
|
263 |
- else:
|
|
264 |
- element.info("Remote ({}) does not have {} cached".format(
|
|
265 |
- remote.spec.url, element._get_brief_display_key()
|
|
266 |
- ))
|
|
267 |
- |
|
268 |
- return False
|
|
269 |
- |
|
270 |
- def pull_tree(self, project, digest):
|
|
271 |
- """ Pull a single Tree rather than an artifact.
|
|
272 |
- Does not update local refs. """
|
|
217 |
+ except grpc.RpcError as e:
|
|
218 |
+ if e.code() != grpc.StatusCode.NOT_FOUND:
|
|
219 |
+ raise CASError("Failed to pull ref {}: {}".format(ref, e)) from e
|
|
220 |
+ else:
|
|
221 |
+ return False
|
|
273 | 222 |
|
274 |
- for remote in self._remotes[project]:
|
|
275 |
- try:
|
|
276 |
- remote.init()
|
|
223 |
+ # pull_tree():
|
|
224 |
+ #
|
|
225 |
+ # Pull a single Tree rather than a ref.
|
|
226 |
+ # Does not update local refs.
|
|
227 |
+ #
|
|
228 |
+ # Args:
|
|
229 |
+ # remote (CASRemote): The remote to pull from
|
|
230 |
+ # digest (Digest): The digest of the tree
|
|
231 |
+ #
|
|
232 |
+ def pull_tree(self, remote, digest):
|
|
233 |
+ try:
|
|
234 |
+ remote.init()
|
|
277 | 235 |
|
278 |
- digest = self._fetch_tree(remote, digest)
|
|
236 |
+ digest = self._fetch_tree(remote, digest)
|
|
279 | 237 |
|
280 |
- # no need to pull from additional remotes
|
|
281 |
- return digest
|
|
238 |
+ return digest
|
|
282 | 239 |
|
283 |
- except grpc.RpcError as e:
|
|
284 |
- if e.code() != grpc.StatusCode.NOT_FOUND:
|
|
285 |
- raise
|
|
240 |
+ except grpc.RpcError as e:
|
|
241 |
+ if e.code() != grpc.StatusCode.NOT_FOUND:
|
|
242 |
+ raise
|
|
286 | 243 |
|
287 | 244 |
return None
|
288 | 245 |
|
289 |
- def link_key(self, element, oldkey, newkey):
|
|
290 |
- oldref = self.get_artifact_fullname(element, oldkey)
|
|
291 |
- newref = self.get_artifact_fullname(element, newkey)
|
|
292 |
- |
|
246 |
+ # link_ref():
|
|
247 |
+ #
|
|
248 |
+ # Add an alias for an existing ref.
|
|
249 |
+ #
|
|
250 |
+ # Args:
|
|
251 |
+ # oldref (str): An existing ref
|
|
252 |
+ # newref (str): A new ref for the same directory
|
|
253 |
+ #
|
|
254 |
+ def link_ref(self, oldref, newref):
|
|
293 | 255 |
tree = self.resolve_ref(oldref)
|
294 | 256 |
|
295 | 257 |
self.set_ref(newref, tree)
|
296 | 258 |
|
297 |
- def _push_refs_to_remote(self, refs, remote):
|
|
259 |
+ # push():
|
|
260 |
+ #
|
|
261 |
+ # Push committed refs to remote repository.
|
|
262 |
+ #
|
|
263 |
+ # Args:
|
|
264 |
+ # refs (list): The refs to push
|
|
265 |
+ # remote (CASRemote): The remote to push to
|
|
266 |
+ #
|
|
267 |
+ # Returns:
|
|
268 |
+ # (bool): True if the remote was updated, False if no pushes were required
|
|
269 |
+ #
|
|
270 |
+ # Raises:
|
|
271 |
+ # (CASError): if there was an error
|
|
272 |
+ #
|
|
273 |
+ def push(self, refs, remote):
|
|
298 | 274 |
skipped_remote = True
|
299 | 275 |
try:
|
300 | 276 |
for ref in refs:
|
301 | 277 |
tree = self.resolve_ref(ref)
|
302 | 278 |
|
303 | 279 |
# Check whether ref is already on the server in which case
|
304 |
- # there is no need to push the artifact
|
|
280 |
+ # there is no need to push the ref
|
|
305 | 281 |
try:
|
306 | 282 |
request = buildstream_pb2.GetReferenceRequest()
|
307 | 283 |
request.key = ref
|
... | ... | @@ -327,65 +303,38 @@ class CASCache(ArtifactCache): |
327 | 303 |
skipped_remote = False
|
328 | 304 |
except grpc.RpcError as e:
|
329 | 305 |
if e.code() != grpc.StatusCode.RESOURCE_EXHAUSTED:
|
330 |
- raise ArtifactError("Failed to push artifact {}: {}".format(refs, e), temporary=True) from e
|
|
306 |
+ raise CASError("Failed to push ref {}: {}".format(refs, e), temporary=True) from e
|
|
331 | 307 |
|
332 | 308 |
return not skipped_remote
|
333 | 309 |
|
334 |
- def push(self, element, keys):
|
|
335 |
- |
|
336 |
- refs = [self.get_artifact_fullname(element, key) for key in list(keys)]
|
|
337 |
- |
|
338 |
- project = element._get_project()
|
|
339 |
- |
|
340 |
- push_remotes = [r for r in self._remotes[project] if r.spec.push]
|
|
341 |
- |
|
342 |
- pushed = False
|
|
343 |
- |
|
344 |
- for remote in push_remotes:
|
|
345 |
- remote.init()
|
|
346 |
- display_key = element._get_brief_display_key()
|
|
347 |
- element.status("Pushing artifact {} -> {}".format(display_key, remote.spec.url))
|
|
348 |
- |
|
349 |
- if self._push_refs_to_remote(refs, remote):
|
|
350 |
- element.info("Pushed artifact {} -> {}".format(display_key, remote.spec.url))
|
|
351 |
- pushed = True
|
|
352 |
- else:
|
|
353 |
- element.info("Remote ({}) already has {} cached".format(
|
|
354 |
- remote.spec.url, element._get_brief_display_key()
|
|
355 |
- ))
|
|
356 |
- |
|
357 |
- return pushed
|
|
358 |
- |
|
359 |
- def push_directory(self, project, directory):
|
|
360 |
- """ Push the given virtual directory to all remotes.
|
|
361 |
- |
|
362 |
- Args:
|
|
363 |
- project (Project): The current project
|
|
364 |
- directory (Directory): A virtual directory object to push.
|
|
365 |
- |
|
366 |
- Raises: ArtifactError if no push remotes are configured.
|
|
367 |
- """
|
|
368 |
- |
|
369 |
- if self._has_push_remotes:
|
|
370 |
- push_remotes = [r for r in self._remotes[project] if r.spec.push]
|
|
371 |
- else:
|
|
372 |
- push_remotes = []
|
|
373 |
- |
|
374 |
- if not push_remotes:
|
|
375 |
- raise ArtifactError("CASCache: push_directory was called, but no remote artifact " +
|
|
376 |
- "servers are configured as push remotes.")
|
|
377 |
- |
|
378 |
- if directory.ref is None:
|
|
379 |
- return
|
|
380 |
- |
|
381 |
- for remote in push_remotes:
|
|
382 |
- remote.init()
|
|
383 |
- |
|
384 |
- self._send_directory(remote, directory.ref)
|
|
310 |
+ # push_directory():
|
|
311 |
+ #
|
|
312 |
+ # Push the given virtual directory to a remote.
|
|
313 |
+ #
|
|
314 |
+ # Args:
|
|
315 |
+ # remote (CASRemote): The remote to push to
|
|
316 |
+ # directory (Directory): A virtual directory object to push.
|
|
317 |
+ #
|
|
318 |
+ # Raises:
|
|
319 |
+ # (CASError): if there was an error
|
|
320 |
+ #
|
|
321 |
+ def push_directory(self, remote, directory):
|
|
322 |
+ remote.init()
|
|
385 | 323 |
|
386 |
- def push_message(self, project, message):
|
|
324 |
+ self._send_directory(remote, directory.ref)
|
|
387 | 325 |
|
388 |
- push_remotes = [r for r in self._remotes[project] if r.spec.push]
|
|
326 |
+ # push_message():
|
|
327 |
+ #
|
|
328 |
+ # Push the given protobuf message to a remote.
|
|
329 |
+ #
|
|
330 |
+ # Args:
|
|
331 |
+ # remote (CASRemote): The remote to push to
|
|
332 |
+ # message (Message): A protobuf message to push.
|
|
333 |
+ #
|
|
334 |
+ # Raises:
|
|
335 |
+ # (CASError): if there was an error
|
|
336 |
+ #
|
|
337 |
+ def push_message(self, remote, message):
|
|
389 | 338 |
|
390 | 339 |
message_buffer = message.SerializeToString()
|
391 | 340 |
message_sha = hashlib.sha256(message_buffer)
|
... | ... | @@ -393,17 +342,25 @@ class CASCache(ArtifactCache): |
393 | 342 |
message_digest.hash = message_sha.hexdigest()
|
394 | 343 |
message_digest.size_bytes = len(message_buffer)
|
395 | 344 |
|
396 |
- for remote in push_remotes:
|
|
397 |
- remote.init()
|
|
345 |
+ remote.init()
|
|
398 | 346 |
|
399 |
- with io.BytesIO(message_buffer) as b:
|
|
400 |
- self._send_blob(remote, message_digest, b)
|
|
347 |
+ with io.BytesIO(message_buffer) as b:
|
|
348 |
+ self._send_blob(remote, message_digest, b)
|
|
401 | 349 |
|
402 | 350 |
return message_digest
|
403 | 351 |
|
404 |
- def _verify_digest_on_remote(self, remote, digest):
|
|
405 |
- # Check whether ref is already on the server in which case
|
|
406 |
- # there is no need to push the artifact
|
|
352 |
+ # verify_digest_on_remote():
|
|
353 |
+ #
|
|
354 |
+ # Check whether the object is already on the server in which case
|
|
355 |
+ # there is no need to upload it.
|
|
356 |
+ #
|
|
357 |
+ # Args:
|
|
358 |
+ # remote (CASRemote): The remote to check
|
|
359 |
+ # digest (Digest): The object digest.
|
|
360 |
+ #
|
|
361 |
+ def verify_digest_on_remote(self, remote, digest):
|
|
362 |
+ remote.init()
|
|
363 |
+ |
|
407 | 364 |
request = remote_execution_pb2.FindMissingBlobsRequest()
|
408 | 365 |
request.blob_digests.extend([digest])
|
409 | 366 |
|
... | ... | @@ -413,24 +370,6 @@ class CASCache(ArtifactCache): |
413 | 370 |
|
414 | 371 |
return True
|
415 | 372 |
|
416 |
- def verify_digest_pushed(self, project, digest):
|
|
417 |
- |
|
418 |
- push_remotes = [r for r in self._remotes[project] if r.spec.push]
|
|
419 |
- |
|
420 |
- pushed = False
|
|
421 |
- |
|
422 |
- for remote in push_remotes:
|
|
423 |
- remote.init()
|
|
424 |
- |
|
425 |
- if self._verify_digest_on_remote(remote, digest):
|
|
426 |
- pushed = True
|
|
427 |
- |
|
428 |
- return pushed
|
|
429 |
- |
|
430 |
- ################################################
|
|
431 |
- # API Private Methods #
|
|
432 |
- ################################################
|
|
433 |
- |
|
434 | 373 |
# objpath():
|
435 | 374 |
#
|
436 | 375 |
# Return the path of an object based on its digest.
|
... | ... | @@ -496,7 +435,7 @@ class CASCache(ArtifactCache): |
496 | 435 |
pass
|
497 | 436 |
|
498 | 437 |
except OSError as e:
|
499 |
- raise ArtifactError("Failed to hash object: {}".format(e)) from e
|
|
438 |
+ raise CASError("Failed to hash object: {}".format(e)) from e
|
|
500 | 439 |
|
501 | 440 |
return digest
|
502 | 441 |
|
... | ... | @@ -537,26 +476,39 @@ class CASCache(ArtifactCache): |
537 | 476 |
return digest
|
538 | 477 |
|
539 | 478 |
except FileNotFoundError as e:
|
540 |
- raise ArtifactError("Attempt to access unavailable artifact: {}".format(e)) from e
|
|
479 |
+ raise CASError("Attempt to access unavailable ref: {}".format(e)) from e
|
|
541 | 480 |
|
542 |
- def update_mtime(self, element, key):
|
|
481 |
+ # update_mtime()
|
|
482 |
+ #
|
|
483 |
+ # Update the mtime of a ref.
|
|
484 |
+ #
|
|
485 |
+ # Args:
|
|
486 |
+ # ref (str): The ref to update
|
|
487 |
+ #
|
|
488 |
+ def update_mtime(self, ref):
|
|
543 | 489 |
try:
|
544 |
- ref = self.get_artifact_fullname(element, key)
|
|
545 | 490 |
os.utime(self._refpath(ref))
|
546 | 491 |
except FileNotFoundError as e:
|
547 |
- raise ArtifactError("Attempt to access unavailable artifact: {}".format(e)) from e
|
|
492 |
+ raise CASError("Attempt to access unavailable ref: {}".format(e)) from e
|
|
548 | 493 |
|
494 |
+ # calculate_cache_size()
|
|
495 |
+ #
|
|
496 |
+ # Return the real disk usage of the CAS cache.
|
|
497 |
+ #
|
|
498 |
+ # Returns:
|
|
499 |
+ # (int): The size of the cache.
|
|
500 |
+ #
|
|
549 | 501 |
def calculate_cache_size(self):
|
550 | 502 |
return utils._get_dir_size(self.casdir)
|
551 | 503 |
|
552 |
- # list_artifacts():
|
|
504 |
+ # list_refs():
|
|
553 | 505 |
#
|
554 |
- # List cached artifacts in Least Recently Modified (LRM) order.
|
|
506 |
+ # List refs in Least Recently Modified (LRM) order.
|
|
555 | 507 |
#
|
556 | 508 |
# Returns:
|
557 | 509 |
# (list) - A list of refs in LRM order
|
558 | 510 |
#
|
559 |
- def list_artifacts(self):
|
|
511 |
+ def list_refs(self):
|
|
560 | 512 |
# string of: /path/to/repo/refs/heads
|
561 | 513 |
ref_heads = os.path.join(self.casdir, 'refs', 'heads')
|
562 | 514 |
|
... | ... | @@ -571,7 +523,7 @@ class CASCache(ArtifactCache): |
571 | 523 |
mtimes.append(os.path.getmtime(ref_path))
|
572 | 524 |
|
573 | 525 |
# NOTE: Sorted will sort from earliest to latest, thus the
|
574 |
- # first element of this list will be the file modified earliest.
|
|
526 |
+ # first ref of this list will be the file modified earliest.
|
|
575 | 527 |
return [ref for _, ref in sorted(zip(mtimes, refs))]
|
576 | 528 |
|
577 | 529 |
# remove():
|
... | ... | @@ -590,28 +542,10 @@ class CASCache(ArtifactCache): |
590 | 542 |
#
|
591 | 543 |
def remove(self, ref, *, defer_prune=False):
|
592 | 544 |
|
593 |
- # Remove extract if not used by other ref
|
|
594 |
- tree = self.resolve_ref(ref)
|
|
595 |
- ref_name, ref_hash = os.path.split(ref)
|
|
596 |
- extract = os.path.join(self.extractdir, ref_name, tree.hash)
|
|
597 |
- keys_file = os.path.join(extract, 'meta', 'keys.yaml')
|
|
598 |
- if os.path.exists(keys_file):
|
|
599 |
- keys_meta = _yaml.load(keys_file)
|
|
600 |
- keys = [keys_meta['strong'], keys_meta['weak']]
|
|
601 |
- remove_extract = True
|
|
602 |
- for other_hash in keys:
|
|
603 |
- if other_hash == ref_hash:
|
|
604 |
- continue
|
|
605 |
- remove_extract = False
|
|
606 |
- break
|
|
607 |
- |
|
608 |
- if remove_extract:
|
|
609 |
- utils._force_rmtree(extract)
|
|
610 |
- |
|
611 | 545 |
# Remove cache ref
|
612 | 546 |
refpath = self._refpath(ref)
|
613 | 547 |
if not os.path.exists(refpath):
|
614 |
- raise ArtifactError("Could not find artifact for ref '{}'".format(ref))
|
|
548 |
+ raise CASError("Could not find ref '{}'".format(ref))
|
|
615 | 549 |
|
616 | 550 |
os.unlink(refpath)
|
617 | 551 |
|
... | ... | @@ -721,7 +655,7 @@ class CASCache(ArtifactCache): |
721 | 655 |
# The process serving the socket can't be cached anyway
|
722 | 656 |
pass
|
723 | 657 |
else:
|
724 |
- raise ArtifactError("Unsupported file type for {}".format(full_path))
|
|
658 |
+ raise CASError("Unsupported file type for {}".format(full_path))
|
|
725 | 659 |
|
726 | 660 |
return self.add_object(digest=dir_digest,
|
727 | 661 |
buffer=directory.SerializeToString())
|
... | ... | @@ -740,7 +674,7 @@ class CASCache(ArtifactCache): |
740 | 674 |
if dirnode.name == name:
|
741 | 675 |
return dirnode.digest
|
742 | 676 |
|
743 |
- raise ArtifactError("Subdirectory {} not found".format(name))
|
|
677 |
+ raise CASError("Subdirectory {} not found".format(name))
|
|
744 | 678 |
|
745 | 679 |
def _diff_trees(self, tree_a, tree_b, *, added, removed, modified, path=""):
|
746 | 680 |
dir_a = remote_execution_pb2.Directory()
|
... | ... | @@ -812,29 +746,6 @@ class CASCache(ArtifactCache): |
812 | 746 |
for dirnode in directory.directories:
|
813 | 747 |
self._reachable_refs_dir(reachable, dirnode.digest)
|
814 | 748 |
|
815 |
- def _initialize_remote(self, remote_spec, q):
|
|
816 |
- try:
|
|
817 |
- remote = _CASRemote(remote_spec)
|
|
818 |
- remote.init()
|
|
819 |
- |
|
820 |
- request = buildstream_pb2.StatusRequest()
|
|
821 |
- response = remote.ref_storage.Status(request)
|
|
822 |
- |
|
823 |
- if remote_spec.push and not response.allow_updates:
|
|
824 |
- q.put('Artifact server does not allow push')
|
|
825 |
- else:
|
|
826 |
- # No error
|
|
827 |
- q.put(None)
|
|
828 |
- |
|
829 |
- except grpc.RpcError as e:
|
|
830 |
- # str(e) is too verbose for errors reported to the user
|
|
831 |
- q.put(e.details())
|
|
832 |
- |
|
833 |
- except Exception as e: # pylint: disable=broad-except
|
|
834 |
- # Whatever happens, we need to return it to the calling process
|
|
835 |
- #
|
|
836 |
- q.put(str(e))
|
|
837 |
- |
|
838 | 749 |
def _required_blobs(self, directory_digest):
|
839 | 750 |
# parse directory, and recursively add blobs
|
840 | 751 |
d = remote_execution_pb2.Digest()
|
... | ... | @@ -1080,7 +991,7 @@ class CASCache(ArtifactCache): |
1080 | 991 |
|
1081 | 992 |
# Represents a single remote CAS cache.
|
1082 | 993 |
#
|
1083 |
-class _CASRemote():
|
|
994 |
+class CASRemote():
|
|
1084 | 995 |
def __init__(self, spec):
|
1085 | 996 |
self.spec = spec
|
1086 | 997 |
self._initialized = False
|
... | ... | @@ -1125,7 +1036,7 @@ class _CASRemote(): |
1125 | 1036 |
certificate_chain=client_cert_bytes)
|
1126 | 1037 |
self.channel = grpc.secure_channel('{}:{}'.format(url.hostname, port), credentials)
|
1127 | 1038 |
else:
|
1128 |
- raise ArtifactError("Unsupported URL: {}".format(self.spec.url))
|
|
1039 |
+ raise CASError("Unsupported URL: {}".format(self.spec.url))
|
|
1129 | 1040 |
|
1130 | 1041 |
self.bytestream = bytestream_pb2_grpc.ByteStreamStub(self.channel)
|
1131 | 1042 |
self.cas = remote_execution_pb2_grpc.ContentAddressableStorageStub(self.channel)
|
... | ... | @@ -1203,10 +1114,10 @@ class _CASBatchRead(): |
1203 | 1114 |
|
1204 | 1115 |
for response in batch_response.responses:
|
1205 | 1116 |
if response.status.code != code_pb2.OK:
|
1206 |
- raise ArtifactError("Failed to download blob {}: {}".format(
|
|
1117 |
+ raise CASError("Failed to download blob {}: {}".format(
|
|
1207 | 1118 |
response.digest.hash, response.status.code))
|
1208 | 1119 |
if response.digest.size_bytes != len(response.data):
|
1209 |
- raise ArtifactError("Failed to download blob {}: expected {} bytes, received {} bytes".format(
|
|
1120 |
+ raise CASError("Failed to download blob {}: expected {} bytes, received {} bytes".format(
|
|
1210 | 1121 |
response.digest.hash, response.digest.size_bytes, len(response.data)))
|
1211 | 1122 |
|
1212 | 1123 |
yield (response.digest, response.data)
|
... | ... | @@ -1248,7 +1159,7 @@ class _CASBatchUpdate(): |
1248 | 1159 |
|
1249 | 1160 |
for response in batch_response.responses:
|
1250 | 1161 |
if response.status.code != code_pb2.OK:
|
1251 |
- raise ArtifactError("Failed to upload blob {}: {}".format(
|
|
1162 |
+ raise CASError("Failed to upload blob {}: {}".format(
|
|
1252 | 1163 |
response.digest.hash, response.status.code))
|
1253 | 1164 |
|
1254 | 1165 |
|
... | ... | @@ -32,8 +32,9 @@ from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remo |
32 | 32 |
from .._protos.google.bytestream import bytestream_pb2, bytestream_pb2_grpc
|
33 | 33 |
from .._protos.buildstream.v2 import buildstream_pb2, buildstream_pb2_grpc
|
34 | 34 |
|
35 |
-from .._exceptions import ArtifactError
|
|
36 |
-from .._context import Context
|
|
35 |
+from .._exceptions import CASError
|
|
36 |
+ |
|
37 |
+from .cascache import CASCache
|
|
37 | 38 |
|
38 | 39 |
|
39 | 40 |
# The default limit for gRPC messages is 4 MiB.
|
... | ... | @@ -55,26 +56,23 @@ class ArtifactTooLargeException(Exception): |
55 | 56 |
# enable_push (bool): Whether to allow blob uploads and artifact updates
|
56 | 57 |
#
|
57 | 58 |
def create_server(repo, *, enable_push):
|
58 |
- context = Context()
|
|
59 |
- context.artifactdir = os.path.abspath(repo)
|
|
60 |
- |
|
61 |
- artifactcache = context.artifactcache
|
|
59 |
+ cas = CASCache(os.path.abspath(repo))
|
|
62 | 60 |
|
63 | 61 |
# Use max_workers default from Python 3.5+
|
64 | 62 |
max_workers = (os.cpu_count() or 1) * 5
|
65 | 63 |
server = grpc.server(futures.ThreadPoolExecutor(max_workers))
|
66 | 64 |
|
67 | 65 |
bytestream_pb2_grpc.add_ByteStreamServicer_to_server(
|
68 |
- _ByteStreamServicer(artifactcache, enable_push=enable_push), server)
|
|
66 |
+ _ByteStreamServicer(cas, enable_push=enable_push), server)
|
|
69 | 67 |
|
70 | 68 |
remote_execution_pb2_grpc.add_ContentAddressableStorageServicer_to_server(
|
71 |
- _ContentAddressableStorageServicer(artifactcache, enable_push=enable_push), server)
|
|
69 |
+ _ContentAddressableStorageServicer(cas, enable_push=enable_push), server)
|
|
72 | 70 |
|
73 | 71 |
remote_execution_pb2_grpc.add_CapabilitiesServicer_to_server(
|
74 | 72 |
_CapabilitiesServicer(), server)
|
75 | 73 |
|
76 | 74 |
buildstream_pb2_grpc.add_ReferenceStorageServicer_to_server(
|
77 |
- _ReferenceStorageServicer(artifactcache, enable_push=enable_push), server)
|
|
75 |
+ _ReferenceStorageServicer(cas, enable_push=enable_push), server)
|
|
78 | 76 |
|
79 | 77 |
return server
|
80 | 78 |
|
... | ... | @@ -333,7 +331,7 @@ class _ReferenceStorageServicer(buildstream_pb2_grpc.ReferenceStorageServicer): |
333 | 331 |
|
334 | 332 |
response.digest.hash = tree.hash
|
335 | 333 |
response.digest.size_bytes = tree.size_bytes
|
336 |
- except ArtifactError:
|
|
334 |
+ except CASError:
|
|
337 | 335 |
context.set_code(grpc.StatusCode.NOT_FOUND)
|
338 | 336 |
|
339 | 337 |
return response
|
... | ... | @@ -437,7 +435,7 @@ def _clean_up_cache(cas, object_size): |
437 | 435 |
return 0
|
438 | 436 |
|
439 | 437 |
# obtain a list of LRP artifacts
|
440 |
- LRP_artifacts = cas.list_artifacts()
|
|
438 |
+ LRP_artifacts = cas.list_refs()
|
|
441 | 439 |
|
442 | 440 |
removed_size = 0 # in bytes
|
443 | 441 |
while object_size - removed_size > free_disk_space:
|
... | ... | @@ -31,7 +31,6 @@ from ._exceptions import LoadError, LoadErrorReason, BstError |
31 | 31 |
from ._message import Message, MessageType
|
32 | 32 |
from ._profile import Topics, profile_start, profile_end
|
33 | 33 |
from ._artifactcache import ArtifactCache
|
34 |
-from ._artifactcache.cascache import CASCache
|
|
35 | 34 |
from ._workspaces import Workspaces
|
36 | 35 |
from .plugin import _plugin_lookup
|
37 | 36 |
|
... | ... | @@ -233,7 +232,7 @@ class Context(): |
233 | 232 |
@property
|
234 | 233 |
def artifactcache(self):
|
235 | 234 |
if not self._artifactcache:
|
236 |
- self._artifactcache = CASCache(self)
|
|
235 |
+ self._artifactcache = ArtifactCache(self)
|
|
237 | 236 |
|
238 | 237 |
return self._artifactcache
|
239 | 238 |
|
... | ... | @@ -90,6 +90,7 @@ class ErrorDomain(Enum): |
90 | 90 |
APP = 12
|
91 | 91 |
STREAM = 13
|
92 | 92 |
VIRTUAL_FS = 14
|
93 |
+ CAS = 15
|
|
93 | 94 |
|
94 | 95 |
|
95 | 96 |
# BstError is an internal base exception class for BuildStream
|
... | ... | @@ -274,6 +275,15 @@ class ArtifactError(BstError): |
274 | 275 |
super().__init__(message, detail=detail, domain=ErrorDomain.ARTIFACT, reason=reason, temporary=True)
|
275 | 276 |
|
276 | 277 |
|
278 |
+# CASError
|
|
279 |
+#
|
|
280 |
+# Raised when errors are encountered in the CAS
|
|
281 |
+#
|
|
282 |
+class CASError(BstError):
|
|
283 |
+ def __init__(self, message, *, detail=None, reason=None, temporary=False):
|
|
284 |
+ super().__init__(message, detail=detail, domain=ErrorDomain.CAS, reason=reason, temporary=True)
|
|
285 |
+ |
|
286 |
+ |
|
277 | 287 |
# PipelineError
|
278 | 288 |
#
|
279 | 289 |
# Raised from pipeline operations
|
... | ... | @@ -79,7 +79,7 @@ class CasBasedDirectory(Directory): |
79 | 79 |
self.filename = filename
|
80 | 80 |
self.common_name = common_name
|
81 | 81 |
self.pb2_directory = remote_execution_pb2.Directory()
|
82 |
- self.cas_cache = context.artifactcache
|
|
82 |
+ self.cas_cache = context.artifactcache.cas
|
|
83 | 83 |
if ref:
|
84 | 84 |
with open(self.cas_cache.objpath(ref), 'rb') as f:
|
85 | 85 |
self.pb2_directory.ParseFromString(f.read())
|
... | ... | @@ -190,15 +190,16 @@ def test_pull_tree(cli, tmpdir, datafiles): |
190 | 190 |
# Load the project and CAS cache
|
191 | 191 |
project = Project(project_dir, context)
|
192 | 192 |
project.ensure_fully_loaded()
|
193 |
- cas = context.artifactcache
|
|
193 |
+ artifactcache = context.artifactcache
|
|
194 |
+ cas = artifactcache.cas
|
|
194 | 195 |
|
195 | 196 |
# Assert that the element's artifact is cached
|
196 | 197 |
element = project.load_elements(['target.bst'])[0]
|
197 | 198 |
element_key = cli.get_element_key(project_dir, 'target.bst')
|
198 |
- assert cas.contains(element, element_key)
|
|
199 |
+ assert artifactcache.contains(element, element_key)
|
|
199 | 200 |
|
200 | 201 |
# Retrieve the Directory object from the cached artifact
|
201 |
- artifact_ref = cas.get_artifact_fullname(element, element_key)
|
|
202 |
+ artifact_ref = artifactcache.get_artifact_fullname(element, element_key)
|
|
202 | 203 |
artifact_digest = cas.resolve_ref(artifact_ref)
|
203 | 204 |
|
204 | 205 |
queue = multiprocessing.Queue()
|
... | ... | @@ -268,12 +269,13 @@ def _test_push_tree(user_config_file, project_dir, artifact_dir, artifact_digest |
268 | 269 |
project.ensure_fully_loaded()
|
269 | 270 |
|
270 | 271 |
# Create a local CAS cache handle
|
271 |
- cas = context.artifactcache
|
|
272 |
+ artifactcache = context.artifactcache
|
|
273 |
+ cas = artifactcache.cas
|
|
272 | 274 |
|
273 | 275 |
# Manually setup the CAS remote
|
274 |
- cas.setup_remotes(use_config=True)
|
|
276 |
+ artifactcache.setup_remotes(use_config=True)
|
|
275 | 277 |
|
276 |
- if cas.has_push_remotes():
|
|
278 |
+ if artifactcache.has_push_remotes():
|
|
277 | 279 |
directory = remote_execution_pb2.Directory()
|
278 | 280 |
|
279 | 281 |
with open(cas.objpath(artifact_digest), 'rb') as f:
|
... | ... | @@ -284,7 +286,7 @@ def _test_push_tree(user_config_file, project_dir, artifact_dir, artifact_digest |
284 | 286 |
tree_maker(cas, tree, directory)
|
285 | 287 |
|
286 | 288 |
# Push the Tree as a regular message
|
287 |
- tree_digest = cas.push_message(project, tree)
|
|
289 |
+ tree_digest = artifactcache.push_message(project, tree)
|
|
288 | 290 |
|
289 | 291 |
queue.put((tree_digest.hash, tree_digest.size_bytes))
|
290 | 292 |
else:
|
... | ... | @@ -165,20 +165,21 @@ def test_push_directory(cli, tmpdir, datafiles): |
165 | 165 |
# Load the project and CAS cache
|
166 | 166 |
project = Project(project_dir, context)
|
167 | 167 |
project.ensure_fully_loaded()
|
168 |
- cas = context.artifactcache
|
|
168 |
+ artifactcache = context.artifactcache
|
|
169 |
+ cas = artifactcache.cas
|
|
169 | 170 |
|
170 | 171 |
# Assert that the element's artifact is cached
|
171 | 172 |
element = project.load_elements(['target.bst'])[0]
|
172 | 173 |
element_key = cli.get_element_key(project_dir, 'target.bst')
|
173 |
- assert cas.contains(element, element_key)
|
|
174 |
+ assert artifactcache.contains(element, element_key)
|
|
174 | 175 |
|
175 | 176 |
# Manually setup the CAS remote
|
176 |
- cas.setup_remotes(use_config=True)
|
|
177 |
- cas.initialize_remotes()
|
|
178 |
- assert cas.has_push_remotes(element=element)
|
|
177 |
+ artifactcache.setup_remotes(use_config=True)
|
|
178 |
+ artifactcache.initialize_remotes()
|
|
179 |
+ assert artifactcache.has_push_remotes(element=element)
|
|
179 | 180 |
|
180 | 181 |
# Recreate the CasBasedDirectory object from the cached artifact
|
181 |
- artifact_ref = cas.get_artifact_fullname(element, element_key)
|
|
182 |
+ artifact_ref = artifactcache.get_artifact_fullname(element, element_key)
|
|
182 | 183 |
artifact_digest = cas.resolve_ref(artifact_ref)
|
183 | 184 |
|
184 | 185 |
queue = multiprocessing.Queue()
|
... | ... | @@ -13,7 +13,7 @@ import pytest_cov |
13 | 13 |
from buildstream import _yaml
|
14 | 14 |
from buildstream._artifactcache.casserver import create_server
|
15 | 15 |
from buildstream._context import Context
|
16 |
-from buildstream._exceptions import ArtifactError
|
|
16 |
+from buildstream._exceptions import CASError
|
|
17 | 17 |
from buildstream._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
|
18 | 18 |
|
19 | 19 |
|
... | ... | @@ -48,7 +48,7 @@ class ArtifactShare(): |
48 | 48 |
context = Context()
|
49 | 49 |
context.artifactdir = self.repodir
|
50 | 50 |
|
51 |
- self.cas = context.artifactcache
|
|
51 |
+ self.cas = context.artifactcache.cas
|
|
52 | 52 |
|
53 | 53 |
self.total_space = total_space
|
54 | 54 |
self.free_space = free_space
|
... | ... | @@ -135,7 +135,7 @@ class ArtifactShare(): |
135 | 135 |
try:
|
136 | 136 |
tree = self.cas.resolve_ref(artifact_key)
|
137 | 137 |
return True
|
138 |
- except ArtifactError:
|
|
138 |
+ except CASError:
|
|
139 | 139 |
return False
|
140 | 140 |
|
141 | 141 |
# close():
|