Jürg Billeter pushed to branch juerg/cas at BuildStream / buildstream
Commits:
- 4b585c5c by Jürg Billeter at 2018-11-05T16:35:09Z
9 changed files:
- buildstream/_artifactcache/artifactcache.py
- buildstream/_artifactcache/cascache.py
- buildstream/_artifactcache/casserver.py
- buildstream/_context.py
- buildstream/_exceptions.py
- buildstream/storage/_casbaseddirectory.py
- tests/artifactcache/pull.py
- tests/artifactcache/push.py
- tests/testutils/artifactshare.py
Changes:
| ... | ... | @@ -17,17 +17,22 @@ | 
| 17 | 17 |  #  Authors:
 | 
| 18 | 18 |  #        Tristan Maat <tristan maat codethink co uk>
 | 
| 19 | 19 |  | 
| 20 | +import multiprocessing
 | |
| 20 | 21 |  import os
 | 
| 22 | +import signal
 | |
| 21 | 23 |  import string
 | 
| 22 | 24 |  from collections import namedtuple
 | 
| 23 | 25 |  from collections.abc import Mapping
 | 
| 24 | 26 |  | 
| 25 | 27 |  from ..types import _KeyStrength
 | 
| 26 | -from .._exceptions import ArtifactError, ImplError, LoadError, LoadErrorReason
 | |
| 28 | +from .._exceptions import ArtifactError, CASError, LoadError, LoadErrorReason
 | |
| 27 | 29 |  from .._message import Message, MessageType
 | 
| 30 | +from .. import _signals
 | |
| 28 | 31 |  from .. import utils
 | 
| 29 | 32 |  from .. import _yaml
 | 
| 30 | 33 |  | 
| 34 | +from .cascache import CASCache, CASRemote
 | |
| 35 | + | |
| 31 | 36 |  | 
| 32 | 37 |  CACHE_SIZE_FILE = "cache_size"
 | 
| 33 | 38 |  | 
| ... | ... | @@ -93,7 +98,8 @@ class ArtifactCache(): | 
| 93 | 98 |      def __init__(self, context):
 | 
| 94 | 99 |          self.context = context
 | 
| 95 | 100 |          self.extractdir = os.path.join(context.artifactdir, 'extract')
 | 
| 96 | -        self.tmpdir = os.path.join(context.artifactdir, 'tmp')
 | |
| 101 | + | |
| 102 | +        self.cas = CASCache(context.artifactdir)
 | |
| 97 | 103 |  | 
| 98 | 104 |          self.global_remote_specs = []
 | 
| 99 | 105 |          self.project_remote_specs = {}
 | 
| ... | ... | @@ -104,12 +110,15 @@ class ArtifactCache(): | 
| 104 | 110 |          self._cache_lower_threshold = None    # The target cache size for a cleanup
 | 
| 105 | 111 |          self._remotes_setup = False           # Check to prevent double-setup of remotes
 | 
| 106 | 112 |  | 
| 113 | +        # Per-project list of _CASRemote instances.
 | |
| 114 | +        self._remotes = {}
 | |
| 115 | + | |
| 116 | +        self._has_fetch_remotes = False
 | |
| 117 | +        self._has_push_remotes = False
 | |
| 118 | + | |
| 107 | 119 |          os.makedirs(self.extractdir, exist_ok=True)
 | 
| 108 | -        os.makedirs(self.tmpdir, exist_ok=True)
 | |
| 109 | 120 |  | 
| 110 | -    ################################################
 | |
| 111 | -    #  Methods implemented on the abstract class   #
 | |
| 112 | -    ################################################
 | |
| 121 | +        self._calculate_cache_quota()
 | |
| 113 | 122 |  | 
| 114 | 123 |      # get_artifact_fullname()
 | 
| 115 | 124 |      #
 | 
| ... | ... | @@ -240,8 +249,10 @@ class ArtifactCache(): | 
| 240 | 249 |              for key in (strong_key, weak_key):
 | 
| 241 | 250 |                  if key:
 | 
| 242 | 251 |                      try:
 | 
| 243 | -                        self.update_mtime(element, key)
 | |
| 244 | -                    except ArtifactError:
 | |
| 252 | +                        ref = self.get_artifact_fullname(element, key)
 | |
| 253 | + | |
| 254 | +                        self.cas.update_mtime(ref)
 | |
| 255 | +                    except CASError:
 | |
| 245 | 256 |                          pass
 | 
| 246 | 257 |  | 
| 247 | 258 |      # clean():
 | 
| ... | ... | @@ -252,7 +263,7 @@ class ArtifactCache(): | 
| 252 | 263 |      #    (int): The size of the cache after having cleaned up
 | 
| 253 | 264 |      #
 | 
| 254 | 265 |      def clean(self):
 | 
| 255 | -        artifacts = self.list_artifacts()  # pylint: disable=assignment-from-no-return
 | |
| 266 | +        artifacts = self.list_artifacts()
 | |
| 256 | 267 |  | 
| 257 | 268 |          # Build a set of the cache keys which are required
 | 
| 258 | 269 |          # based on the required elements at cleanup time
 | 
| ... | ... | @@ -294,7 +305,7 @@ class ArtifactCache(): | 
| 294 | 305 |              if key not in required_artifacts:
 | 
| 295 | 306 |  | 
| 296 | 307 |                  # Remove the actual artifact, if it's not required.
 | 
| 297 | -                size = self.remove(to_remove)  # pylint: disable=assignment-from-no-return
 | |
| 308 | +                size = self.remove(to_remove)
 | |
| 298 | 309 |  | 
| 299 | 310 |                  # Remove the size from the removed size
 | 
| 300 | 311 |                  self.set_cache_size(self._cache_size - size)
 | 
| ... | ... | @@ -311,7 +322,7 @@ class ArtifactCache(): | 
| 311 | 322 |      #    (int): The size of the artifact cache.
 | 
| 312 | 323 |      #
 | 
| 313 | 324 |      def compute_cache_size(self):
 | 
| 314 | -        self._cache_size = self.calculate_cache_size()  # pylint: disable=assignment-from-no-return
 | |
| 325 | +        self._cache_size = self.cas.calculate_cache_size()
 | |
| 315 | 326 |  | 
| 316 | 327 |          return self._cache_size
 | 
| 317 | 328 |  | 
| ... | ... | @@ -380,28 +391,12 @@ class ArtifactCache(): | 
| 380 | 391 |      def has_quota_exceeded(self):
 | 
| 381 | 392 |          return self.get_cache_size() > self._cache_quota
 | 
| 382 | 393 |  | 
| 383 | -    ################################################
 | |
| 384 | -    # Abstract methods for subclasses to implement #
 | |
| 385 | -    ################################################
 | |
| 386 | - | |
| 387 | 394 |      # preflight():
 | 
| 388 | 395 |      #
 | 
| 389 | 396 |      # Preflight check.
 | 
| 390 | 397 |      #
 | 
| 391 | 398 |      def preflight(self):
 | 
| 392 | -        pass
 | |
| 393 | - | |
| 394 | -    # update_mtime()
 | |
| 395 | -    #
 | |
| 396 | -    # Update the mtime of an artifact.
 | |
| 397 | -    #
 | |
| 398 | -    # Args:
 | |
| 399 | -    #     element (Element): The Element to update
 | |
| 400 | -    #     key (str): The key of the artifact.
 | |
| 401 | -    #
 | |
| 402 | -    def update_mtime(self, element, key):
 | |
| 403 | -        raise ImplError("Cache '{kind}' does not implement update_mtime()"
 | |
| 404 | -                        .format(kind=type(self).__name__))
 | |
| 399 | +        self.cas.preflight()
 | |
| 405 | 400 |  | 
| 406 | 401 |      # initialize_remotes():
 | 
| 407 | 402 |      #
 | 
| ... | ... | @@ -411,7 +406,59 @@ class ArtifactCache(): | 
| 411 | 406 |      #     on_failure (callable): Called if we fail to contact one of the caches.
 | 
| 412 | 407 |      #
 | 
| 413 | 408 |      def initialize_remotes(self, *, on_failure=None):
 | 
| 414 | -        pass
 | |
| 409 | +        remote_specs = self.global_remote_specs
 | |
| 410 | + | |
| 411 | +        for project in self.project_remote_specs:
 | |
| 412 | +            remote_specs += self.project_remote_specs[project]
 | |
| 413 | + | |
| 414 | +        remote_specs = list(utils._deduplicate(remote_specs))
 | |
| 415 | + | |
| 416 | +        remotes = {}
 | |
| 417 | +        q = multiprocessing.Queue()
 | |
| 418 | +        for remote_spec in remote_specs:
 | |
| 419 | +            # Use subprocess to avoid creation of gRPC threads in main BuildStream process
 | |
| 420 | +            # See https://github.com/grpc/grpc/blob/master/doc/fork_support.md for details
 | |
| 421 | +            p = multiprocessing.Process(target=self.cas.initialize_remote, args=(remote_spec, q))
 | |
| 422 | + | |
| 423 | +            try:
 | |
| 424 | +                # Keep SIGINT blocked in the child process
 | |
| 425 | +                with _signals.blocked([signal.SIGINT], ignore=False):
 | |
| 426 | +                    p.start()
 | |
| 427 | + | |
| 428 | +                error = q.get()
 | |
| 429 | +                p.join()
 | |
| 430 | +            except KeyboardInterrupt:
 | |
| 431 | +                utils._kill_process_tree(p.pid)
 | |
| 432 | +                raise
 | |
| 433 | + | |
| 434 | +            if error and on_failure:
 | |
| 435 | +                on_failure(remote_spec.url, error)
 | |
| 436 | +            elif error:
 | |
| 437 | +                raise ArtifactError(error)
 | |
| 438 | +            else:
 | |
| 439 | +                self._has_fetch_remotes = True
 | |
| 440 | +                if remote_spec.push:
 | |
| 441 | +                    self._has_push_remotes = True
 | |
| 442 | + | |
| 443 | +                remotes[remote_spec.url] = CASRemote(remote_spec)
 | |
| 444 | + | |
| 445 | +        for project in self.context.get_projects():
 | |
| 446 | +            remote_specs = self.global_remote_specs
 | |
| 447 | +            if project in self.project_remote_specs:
 | |
| 448 | +                remote_specs = list(utils._deduplicate(remote_specs + self.project_remote_specs[project]))
 | |
| 449 | + | |
| 450 | +            project_remotes = []
 | |
| 451 | + | |
| 452 | +            for remote_spec in remote_specs:
 | |
| 453 | +                # Errors are already handled in the loop above,
 | |
| 454 | +                # skip unreachable remotes here.
 | |
| 455 | +                if remote_spec.url not in remotes:
 | |
| 456 | +                    continue
 | |
| 457 | + | |
| 458 | +                remote = remotes[remote_spec.url]
 | |
| 459 | +                project_remotes.append(remote)
 | |
| 460 | + | |
| 461 | +            self._remotes[project] = project_remotes
 | |
| 415 | 462 |  | 
| 416 | 463 |      # contains():
 | 
| 417 | 464 |      #
 | 
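The hunk above moves remote initialization into a short-lived child process so that gRPC never creates threads in the main BuildStream process before a fork (see the linked fork-support document). A minimal sketch of the same probe-in-a-subprocess pattern, where probe() and the URL are illustrative stand-ins rather than BuildStream API:

    import multiprocessing

    def probe(url, q):
        # A real probe would open a gRPC channel and query the remote here,
        # reporting None on success or an error string on failure.
        q.put(None)

    if __name__ == '__main__':
        q = multiprocessing.Queue()
        p = multiprocessing.Process(target=probe, args=('https://cache.example.com', q))
        p.start()
        error = q.get()   # blocks until the child reports a result
        p.join()
        if error is not None:
            raise RuntimeError(error)

Returning the result over a Queue keeps all gRPC state confined to the child, so the parent remains safe to fork afterwards.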
| ... | ... | @@ -425,8 +472,9 @@ class ArtifactCache(): | 
| 425 | 472 |      # Returns: True if the artifact is in the cache, False otherwise
 | 
| 426 | 473 |      #
 | 
| 427 | 474 |      def contains(self, element, key):
 | 
| 428 | -        raise ImplError("Cache '{kind}' does not implement contains()"
 | |
| 429 | -                        .format(kind=type(self).__name__))
 | |
| 475 | +        ref = self.get_artifact_fullname(element, key)
 | |
| 476 | + | |
| 477 | +        return self.cas.contains(ref)
 | |
| 430 | 478 |  | 
| 431 | 479 |      # list_artifacts():
 | 
| 432 | 480 |      #
 | 
| ... | ... | @@ -437,8 +485,7 @@ class ArtifactCache(): | 
| 437 | 485 |      #               `ArtifactCache.get_artifact_fullname` in LRU order
 | 
| 438 | 486 |      #
 | 
| 439 | 487 |      def list_artifacts(self):
 | 
| 440 | -        raise ImplError("Cache '{kind}' does not implement list_artifacts()"
 | |
| 441 | -                        .format(kind=type(self).__name__))
 | |
| 488 | +        return self.cas.list_refs()
 | |
| 442 | 489 |  | 
| 443 | 490 |      # remove():
 | 
| 444 | 491 |      #
 | 
| ... | ... | @@ -450,9 +497,31 @@ class ArtifactCache(): | 
| 450 | 497 |      #                          generated by
 | 
| 451 | 498 |      #                          `ArtifactCache.get_artifact_fullname`)
 | 
| 452 | 499 |      #
 | 
| 453 | -    def remove(self, artifact_name):
 | |
| 454 | -        raise ImplError("Cache '{kind}' does not implement remove()"
 | |
| 455 | -                        .format(kind=type(self).__name__))
 | |
| 500 | +    # Returns:
 | |
| 501 | +    #    (int): The amount of space pruned from the repository,
 | |
| 502 | +    #           in bytes
 | |
| 503 | +    #
 | |
| 504 | +    def remove(self, ref):
 | |
| 505 | + | |
| 506 | +        # Remove extract if not used by other ref
 | |
| 507 | +        tree = self.cas.resolve_ref(ref)
 | |
| 508 | +        ref_name, ref_hash = os.path.split(ref)
 | |
| 509 | +        extract = os.path.join(self.extractdir, ref_name, tree.hash)
 | |
| 510 | +        keys_file = os.path.join(extract, 'meta', 'keys.yaml')
 | |
| 511 | +        if os.path.exists(keys_file):
 | |
| 512 | +            keys_meta = _yaml.load(keys_file)
 | |
| 513 | +            keys = [keys_meta['strong'], keys_meta['weak']]
 | |
| 514 | +            remove_extract = True
 | |
| 515 | +            for other_hash in keys:
 | |
| 516 | +                if other_hash == ref_hash:
 | |
| 517 | +                    continue
 | |
| 518 | +                remove_extract = False
 | |
| 519 | +                break
 | |
| 520 | + | |
| 521 | +            if remove_extract:
 | |
| 522 | +                utils._force_rmtree(extract)
 | |
| 523 | + | |
| 524 | +        return self.cas.remove(ref)
 | |
| 456 | 525 |  | 
| 457 | 526 |      # extract():
 | 
| 458 | 527 |      #
 | 
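The relocated extract cleanup above keeps an extract directory alive as long as any other cache key recorded in meta/keys.yaml still points at it. A compact sketch of that predicate, with keys_meta standing in for the loaded YAML mapping:

    def extract_still_needed(keys_meta, ref_hash):
        # keys_meta holds the strong and weak cache keys recorded at
        # commit time; the extract stays if any key other than the one
        # being removed still references it.
        return any(key != ref_hash
                   for key in (keys_meta['strong'], keys_meta['weak']))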
| ... | ... | @@ -472,8 +541,11 @@ class ArtifactCache(): | 
| 472 | 541 |      # Returns: path to extracted artifact
 | 
| 473 | 542 |      #
 | 
| 474 | 543 |      def extract(self, element, key):
 | 
| 475 | -        raise ImplError("Cache '{kind}' does not implement extract()"
 | |
| 476 | -                        .format(kind=type(self).__name__))
 | |
| 544 | +        ref = self.get_artifact_fullname(element, key)
 | |
| 545 | + | |
| 546 | +        path = os.path.join(self.extractdir, element._get_project().name, element.normal_name)
 | |
| 547 | + | |
| 548 | +        return self.cas.extract(ref, path)
 | |
| 477 | 549 |  | 
| 478 | 550 |      # commit():
 | 
| 479 | 551 |      #
 | 
| ... | ... | @@ -485,8 +557,9 @@ class ArtifactCache(): | 
| 485 | 557 |      #     keys (list): The cache keys to use
 | 
| 486 | 558 |      #
 | 
| 487 | 559 |      def commit(self, element, content, keys):
 | 
| 488 | -        raise ImplError("Cache '{kind}' does not implement commit()"
 | |
| 489 | -                        .format(kind=type(self).__name__))
 | |
| 560 | +        refs = [self.get_artifact_fullname(element, key) for key in keys]
 | |
| 561 | + | |
| 562 | +        self.cas.commit(refs, content)
 | |
| 490 | 563 |  | 
| 491 | 564 |      # diff():
 | 
| 492 | 565 |      #
 | 
| ... | ... | @@ -500,8 +573,10 @@ class ArtifactCache(): | 
| 500 | 573 |      #     subdir (str): A subdirectory to limit the comparison to
 | 
| 501 | 574 |      #
 | 
| 502 | 575 |      def diff(self, element, key_a, key_b, *, subdir=None):
 | 
| 503 | -        raise ImplError("Cache '{kind}' does not implement diff()"
 | |
| 504 | -                        .format(kind=type(self).__name__))
 | |
| 576 | +        ref_a = self.get_artifact_fullname(element, key_a)
 | |
| 577 | +        ref_b = self.get_artifact_fullname(element, key_b)
 | |
| 578 | + | |
| 579 | +        return self.cas.diff(ref_a, ref_b, subdir=subdir)
 | |
| 505 | 580 |  | 
| 506 | 581 |      # has_fetch_remotes():
 | 
| 507 | 582 |      #
 | 
| ... | ... | @@ -513,7 +588,16 @@ class ArtifactCache(): | 
| 513 | 588 |      # Returns: True if any remote repositories are configured, False otherwise
 | 
| 514 | 589 |      #
 | 
| 515 | 590 |      def has_fetch_remotes(self, *, element=None):
 | 
| 516 | -        return False
 | |
| 591 | +        if not self._has_fetch_remotes:
 | |
| 592 | +            # No project has fetch remotes
 | |
| 593 | +            return False
 | |
| 594 | +        elif element is None:
 | |
| 595 | +            # At least one (sub)project has fetch remotes
 | |
| 596 | +            return True
 | |
| 597 | +        else:
 | |
| 598 | +            # Check whether the specified element's project has fetch remotes
 | |
| 599 | +            remotes_for_project = self._remotes[element._get_project()]
 | |
| 600 | +            return bool(remotes_for_project)
 | |
| 517 | 601 |  | 
| 518 | 602 |      # has_push_remotes():
 | 
| 519 | 603 |      #
 | 
| ... | ... | @@ -525,7 +609,16 @@ class ArtifactCache(): | 
| 525 | 609 |      # Returns: True if any remote repository is configured, False otherwise
 | 
| 526 | 610 |      #
 | 
| 527 | 611 |      def has_push_remotes(self, *, element=None):
 | 
| 528 | -        return False
 | |
| 612 | +        if not self._has_push_remotes:
 | |
| 613 | +            # No project has push remotes
 | |
| 614 | +            return False
 | |
| 615 | +        elif element is None:
 | |
| 616 | +            # At least one (sub)project has push remotes
 | |
| 617 | +            return True
 | |
| 618 | +        else:
 | |
| 619 | +            # Check whether the specified element's project has push remotes
 | |
| 620 | +            remotes_for_project = self._remotes[element._get_project()]
 | |
| 621 | +            return any(remote.spec.push for remote in remotes_for_project)
 | |
| 529 | 622 |  | 
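Both predicates reduce to a lookup in the new per-project remote table, where self._remotes maps a Project to its list of CASRemote instances. A sketch with illustrative names:

    def project_can_push(remotes_by_project, project):
        # A project can push if at least one of its configured remotes
        # has push enabled in its spec.
        return any(remote.spec.push
                   for remote in remotes_by_project.get(project, []))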
| 530 | 623 |      # push():
 | 
| 531 | 624 |      #
 | 
| ... | ... | @@ -542,8 +635,28 @@ class ArtifactCache(): | 
| 542 | 635 |      #   (ArtifactError): if there was an error
 | 
| 543 | 636 |      #
 | 
| 544 | 637 |      def push(self, element, keys):
 | 
| 545 | -        raise ImplError("Cache '{kind}' does not implement push()"
 | |
| 546 | -                        .format(kind=type(self).__name__))
 | |
| 638 | +        refs = [self.get_artifact_fullname(element, key) for key in list(keys)]
 | |
| 639 | + | |
| 640 | +        project = element._get_project()
 | |
| 641 | + | |
| 642 | +        push_remotes = [r for r in self._remotes[project] if r.spec.push]
 | |
| 643 | + | |
| 644 | +        pushed = False
 | |
| 645 | + | |
| 646 | +        for remote in push_remotes:
 | |
| 647 | +            remote.init()
 | |
| 648 | +            display_key = element._get_brief_display_key()
 | |
| 649 | +            element.status("Pushing artifact {} -> {}".format(display_key, remote.spec.url))
 | |
| 650 | + | |
| 651 | +            if self.cas.push(refs, remote):
 | |
| 652 | +                element.info("Pushed artifact {} -> {}".format(display_key, remote.spec.url))
 | |
| 653 | +                pushed = True
 | |
| 654 | +            else:
 | |
| 655 | +                element.info("Remote ({}) already has {} cached".format(
 | |
| 656 | +                    remote.spec.url, element._get_brief_display_key()
 | |
| 657 | +                ))
 | |
| 658 | + | |
| 659 | +        return pushed
 | |
| 547 | 660 |  | 
| 548 | 661 |      # pull():
 | 
| 549 | 662 |      #
 | 
| ... | ... | @@ -558,8 +671,130 @@ class ArtifactCache(): | 
| 558 | 671 |      #   (bool): True if pull was successful, False if artifact was not available
 | 
| 559 | 672 |      #
 | 
| 560 | 673 |      def pull(self, element, key, *, progress=None):
 | 
| 561 | -        raise ImplError("Cache '{kind}' does not implement pull()"
 | |
| 562 | -                        .format(kind=type(self).__name__))
 | |
| 674 | +        ref = self.get_artifact_fullname(element, key)
 | |
| 675 | + | |
| 676 | +        project = element._get_project()
 | |
| 677 | + | |
| 678 | +        for remote in self._remotes[project]:
 | |
| 679 | +            try:
 | |
| 680 | +                display_key = element._get_brief_display_key()
 | |
| 681 | +                element.status("Pulling artifact {} <- {}".format(display_key, remote.spec.url))
 | |
| 682 | + | |
| 683 | +                if self.cas.pull(ref, remote, progress=progress):
 | |
| 684 | +                    element.info("Pulled artifact {} <- {}".format(display_key, remote.spec.url))
 | |
| 685 | +                    # no need to pull from additional remotes
 | |
| 686 | +                    return True
 | |
| 687 | +                else:
 | |
| 688 | +                    element.info("Remote ({}) does not have {} cached".format(
 | |
| 689 | +                        remote.spec.url, element._get_brief_display_key()
 | |
| 690 | +                    ))
 | |
| 691 | + | |
| 692 | +            except CASError as e:
 | |
| 693 | +                raise ArtifactError("Failed to pull artifact {}: {}".format(
 | |
| 694 | +                    element._get_brief_display_key(), e)) from e
 | |
| 695 | + | |
| 696 | +        return False
 | |
| 697 | + | |
| 698 | +    # pull_tree():
 | |
| 699 | +    #
 | |
| 700 | +    # Pull a single Tree rather than an artifact.
 | |
| 701 | +    # Does not update local refs.
 | |
| 702 | +    #
 | |
| 703 | +    # Args:
 | |
| 704 | +    #     project (Project): The current project
 | |
| 705 | +    #     digest (Digest): The digest of the tree
 | |
| 706 | +    #
 | |
| 707 | +    def pull_tree(self, project, digest):
 | |
| 708 | +        for remote in self._remotes[project]:
 | |
| 709 | +            digest = self.cas.pull_tree(remote, digest)
 | |
| 710 | + | |
| 711 | +            if digest:
 | |
| 712 | +                # no need to pull from additional remotes
 | |
| 713 | +                return digest
 | |
| 714 | + | |
| 715 | +        return None
 | |
| 716 | + | |
| 717 | +    # push_directory():
 | |
| 718 | +    #
 | |
| 719 | +    # Push the given virtual directory to all remotes.
 | |
| 720 | +    #
 | |
| 721 | +    # Args:
 | |
| 722 | +    #     project (Project): The current project
 | |
| 723 | +    #     directory (Directory): A virtual directory object to push.
 | |
| 724 | +    #
 | |
| 725 | +    # Raises:
 | |
| 726 | +    #     (ArtifactError): if there was an error
 | |
| 727 | +    #
 | |
| 728 | +    def push_directory(self, project, directory):
 | |
| 729 | +        if self._has_push_remotes:
 | |
| 730 | +            push_remotes = [r for r in self._remotes[project] if r.spec.push]
 | |
| 731 | +        else:
 | |
| 732 | +            push_remotes = []
 | |
| 733 | + | |
| 734 | +        if not push_remotes:
 | |
| 735 | +            raise ArtifactError("push_directory was called, but no remote artifact " +
 | |
| 736 | +                                "servers are configured as push remotes.")
 | |
| 737 | + | |
| 738 | +        if directory.ref is None:
 | |
| 739 | +            return
 | |
| 740 | + | |
| 741 | +        for remote in push_remotes:
 | |
| 742 | +            self.cas.push_directory(remote, directory)
 | |
| 743 | + | |
| 744 | +    # push_message():
 | |
| 745 | +    #
 | |
| 746 | +    # Push the given protobuf message to all remotes.
 | |
| 747 | +    #
 | |
| 748 | +    # Args:
 | |
| 749 | +    #     project (Project): The current project
 | |
| 750 | +    #     message (Message): A protobuf message to push.
 | |
| 751 | +    #
 | |
| 752 | +    # Raises:
 | |
| 753 | +    #     (ArtifactError): if there was an error
 | |
| 754 | +    #
 | |
| 755 | +    def push_message(self, project, message):
 | |
| 756 | + | |
| 757 | +        if self._has_push_remotes:
 | |
| 758 | +            push_remotes = [r for r in self._remotes[project] if r.spec.push]
 | |
| 759 | +        else:
 | |
| 760 | +            push_remotes = []
 | |
| 761 | + | |
| 762 | +        if not push_remotes:
 | |
| 763 | +            raise ArtifactError("push_message was called, but no remote artifact " +
 | |
| 764 | +                                "servers are configured as push remotes.")
 | |
| 765 | + | |
| 766 | +        for remote in push_remotes:
 | |
| 767 | +            message_digest = self.cas.push_message(remote, message)
 | |
| 768 | + | |
| 769 | +        return message_digest
 | |
| 770 | + | |
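push_message() above derives the CAS digest directly from the serialized protobuf: sha256 over the wire bytes, plus their length. A sketch using a plain bytes payload in place of a real message:

    import hashlib

    def digest_of(buffer):
        # A CAS Digest pairs the sha256 of the serialized content with
        # its size in bytes; in the real code message.SerializeToString()
        # supplies the buffer.
        return hashlib.sha256(buffer).hexdigest(), len(buffer)

    digest_hash, digest_size = digest_of(b'example payload')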
| 771 | +    # verify_digest_pushed():
 | |
| 772 | +    #
 | |
| 773 | +    # Check whether the object is already on the server in which case
 | |
| 774 | +    # there is no need to upload it.
 | |
| 775 | +    #
 | |
| 776 | +    # Args:
 | |
| 777 | +    #     project (Project): The current project
 | |
| 778 | +    #     digest (Digest): The object digest.
 | |
| 779 | +    #
 | |
| 780 | +    def verify_digest_pushed(self, project, digest):
 | |
| 781 | + | |
| 782 | +        if self._has_push_remotes:
 | |
| 783 | +            push_remotes = [r for r in self._remotes[project] if r.spec.push]
 | |
| 784 | +        else:
 | |
| 785 | +            push_remotes = []
 | |
| 786 | + | |
| 787 | +        if not push_remotes:
 | |
| 788 | +            raise ArtifactError("verify_digest_pushed was called, but no remote artifact " +
 | |
| 789 | +                                "servers are configured as push remotes.")
 | |
| 790 | + | |
| 791 | +        pushed = False
 | |
| 792 | + | |
| 793 | +        for remote in push_remotes:
 | |
| 794 | +            if self.cas.verify_digest_on_remote(remote, digest):
 | |
| 795 | +                pushed = True
 | |
| 796 | + | |
| 797 | +        return pushed
 | |
| 563 | 798 |  | 
| 564 | 799 |      # link_key():
 | 
| 565 | 800 |      #
 | 
| ... | ... | @@ -571,19 +806,10 @@ class ArtifactCache(): | 
| 571 | 806 |      #     newkey (str): A new cache key for the artifact
 | 
| 572 | 807 |      #
 | 
| 573 | 808 |      def link_key(self, element, oldkey, newkey):
 | 
| 574 | -        raise ImplError("Cache '{kind}' does not implement link_key()"
 | |
| 575 | -                        .format(kind=type(self).__name__))
 | |
| 809 | +        oldref = self.get_artifact_fullname(element, oldkey)
 | |
| 810 | +        newref = self.get_artifact_fullname(element, newkey)
 | |
| 576 | 811 |  | 
| 577 | -    # calculate_cache_size()
 | |
| 578 | -    #
 | |
| 579 | -    # Return the real artifact cache size.
 | |
| 580 | -    #
 | |
| 581 | -    # Returns:
 | |
| 582 | -    #    (int): The size of the artifact cache.
 | |
| 583 | -    #
 | |
| 584 | -    def calculate_cache_size(self):
 | |
| 585 | -        raise ImplError("Cache '{kind}' does not implement calculate_cache_size()"
 | |
| 586 | -                        .format(kind=type(self).__name__))
 | |
| 812 | +        self.cas.link_ref(oldref, newref)
 | |
| 587 | 813 |  | 
| 588 | 814 |      ################################################
 | 
| 589 | 815 |      #               Local Private Methods          #
 | 
| ... | ... | @@ -20,9 +20,7 @@ | 
| 20 | 20 |  import hashlib
 | 
| 21 | 21 |  import itertools
 | 
| 22 | 22 |  import io
 | 
| 23 | -import multiprocessing
 | |
| 24 | 23 |  import os
 | 
| 25 | -import signal
 | |
| 26 | 24 |  import stat
 | 
| 27 | 25 |  import tempfile
 | 
| 28 | 26 |  import uuid
 | 
| ... | ... | @@ -31,17 +29,13 @@ from urllib.parse import urlparse | 
| 31 | 29 |  | 
| 32 | 30 |  import grpc
 | 
| 33 | 31 |  | 
| 34 | -from .. import _yaml
 | |
| 35 | - | |
| 36 | 32 |  from .._protos.google.rpc import code_pb2
 | 
| 37 | 33 |  from .._protos.google.bytestream import bytestream_pb2, bytestream_pb2_grpc
 | 
| 38 | 34 |  from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
 | 
| 39 | 35 |  from .._protos.buildstream.v2 import buildstream_pb2, buildstream_pb2_grpc
 | 
| 40 | 36 |  | 
| 41 | -from .. import _signals, utils
 | |
| 42 | -from .._exceptions import ArtifactError
 | |
| 43 | - | |
| 44 | -from . import ArtifactCache
 | |
| 37 | +from .. import utils
 | |
| 38 | +from .._exceptions import CASError
 | |
| 45 | 39 |  | 
| 46 | 40 |  | 
| 47 | 41 |  # The default limit for gRPC messages is 4 MiB.
 | 
| ... | ... | @@ -49,62 +43,68 @@ from . import ArtifactCache | 
| 49 | 43 |  _MAX_PAYLOAD_BYTES = 1024 * 1024
 | 
| 50 | 44 |  | 
| 51 | 45 |  | 
| 52 | -# A CASCache manages artifacts in a CAS repository as specified in the
 | |
| 53 | -# Remote Execution API.
 | |
| 46 | +# A CASCache manages a CAS repository as specified in the Remote Execution API.
 | |
| 54 | 47 |  #
 | 
| 55 | 48 |  # Args:
 | 
| 56 | -#     context (Context): The BuildStream context
 | |
| 57 | -#
 | |
| 58 | -# Pushing is explicitly disabled by the platform in some cases,
 | |
| 59 | -# like when we are falling back to functioning without using
 | |
| 60 | -# user namespaces.
 | |
| 49 | +#     path (str): The root directory for the CAS repository
 | |
| 61 | 50 |  #
 | 
| 62 | -class CASCache(ArtifactCache):
 | |
| 51 | +class CASCache():
 | |
| 63 | 52 |  | 
| 64 | -    def __init__(self, context):
 | |
| 65 | -        super().__init__(context)
 | |
| 66 | - | |
| 67 | -        self.casdir = os.path.join(context.artifactdir, 'cas')
 | |
| 53 | +    def __init__(self, path):
 | |
| 54 | +        self.casdir = os.path.join(path, 'cas')
 | |
| 55 | +        self.tmpdir = os.path.join(path, 'tmp')
 | |
| 68 | 56 |          os.makedirs(os.path.join(self.casdir, 'refs', 'heads'), exist_ok=True)
 | 
| 69 | 57 |          os.makedirs(os.path.join(self.casdir, 'objects'), exist_ok=True)
 | 
| 58 | +        os.makedirs(self.tmpdir, exist_ok=True)
 | |
| 70 | 59 |  | 
| 71 | -        self._calculate_cache_quota()
 | |
| 72 | - | |
| 73 | -        # Per-project list of _CASRemote instances.
 | |
| 74 | -        self._remotes = {}
 | |
| 75 | - | |
| 76 | -        self._has_fetch_remotes = False
 | |
| 77 | -        self._has_push_remotes = False
 | |
| 78 | - | |
| 79 | -    ################################################
 | |
| 80 | -    #     Implementation of abstract methods       #
 | |
| 81 | -    ################################################
 | |
| 82 | - | |
| 60 | +    # preflight():
 | |
| 61 | +    #
 | |
| 62 | +    # Preflight check.
 | |
| 63 | +    #
 | |
| 83 | 64 |      def preflight(self):
 | 
| 84 | 65 |          headdir = os.path.join(self.casdir, 'refs', 'heads')
 | 
| 85 | 66 |          objdir = os.path.join(self.casdir, 'objects')
 | 
| 86 | 67 |          if not (os.path.isdir(headdir) and os.path.isdir(objdir)):
 | 
| 87 | -            raise ArtifactError("CAS repository check failed for '{}'"
 | |
| 88 | -                                .format(self.casdir))
 | |
| 68 | +            raise CASError("CAS repository check failed for '{}'".format(self.casdir))
 | |
| 89 | 69 |  | 
| 90 | -    def contains(self, element, key):
 | |
| 91 | -        refpath = self._refpath(self.get_artifact_fullname(element, key))
 | |
| 70 | +    # contains():
 | |
| 71 | +    #
 | |
| 72 | +    # Check whether the specified ref is already available in the local CAS cache.
 | |
| 73 | +    #
 | |
| 74 | +    # Args:
 | |
| 75 | +    #     ref (str): The ref to check
 | |
| 76 | +    #
 | |
| 77 | +    # Returns: True if the ref is in the cache, False otherwise
 | |
| 78 | +    #
 | |
| 79 | +    def contains(self, ref):
 | |
| 80 | +        refpath = self._refpath(ref)
 | |
| 92 | 81 |  | 
| 93 | 82 |          # This assumes that the repository doesn't have any dangling pointers
 | 
| 94 | 83 |          return os.path.exists(refpath)
 | 
| 95 | 84 |  | 
| 96 | -    def extract(self, element, key):
 | |
| 97 | -        ref = self.get_artifact_fullname(element, key)
 | |
| 98 | - | |
| 85 | +    # extract():
 | |
| 86 | +    #
 | |
| 87 | +    # Extract cached directory for the specified ref if it hasn't
 | |
| 88 | +    # already been extracted.
 | |
| 89 | +    #
 | |
| 90 | +    # Args:
 | |
| 91 | +    #     ref (str): The ref whose directory to extract
 | |
| 92 | +    #     path (str): The destination path
 | |
| 93 | +    #
 | |
| 94 | +    # Raises:
 | |
| 95 | +    #     CASError: If there was an OSError, or if the ref did not exist.
 | |
| 96 | +    #
 | |
| 97 | +    # Returns: path to extracted directory
 | |
| 98 | +    #
 | |
| 99 | +    def extract(self, ref, path):
 | |
| 99 | 100 |          tree = self.resolve_ref(ref, update_mtime=True)
 | 
| 100 | 101 |  | 
| 101 | -        dest = os.path.join(self.extractdir, element._get_project().name,
 | |
| 102 | -                            element.normal_name, tree.hash)
 | |
| 102 | +        dest = os.path.join(path, tree.hash)
 | |
| 103 | 103 |          if os.path.isdir(dest):
 | 
| 104 | -            # artifact has already been extracted
 | |
| 104 | +            # directory has already been extracted
 | |
| 105 | 105 |              return dest
 | 
| 106 | 106 |  | 
| 107 | -        with tempfile.TemporaryDirectory(prefix='tmp', dir=self.extractdir) as tmpdir:
 | |
| 107 | +        with tempfile.TemporaryDirectory(prefix='tmp', dir=self.tmpdir) as tmpdir:
 | |
| 108 | 108 |              checkoutdir = os.path.join(tmpdir, ref)
 | 
| 109 | 109 |              self._checkout(checkoutdir, tree)
 | 
| 110 | 110 |  | 
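With CASCache decoupled from ArtifactCache, the class can be exercised on its own against the directory layout its constructor creates. A sketch of direct use on this branch (the path and ref are illustrative, and the import reaches into a private module):

    from buildstream._artifactcache.cascache import CASCache

    cas = CASCache('/home/user/.cache/buildstream/artifacts')
    cas.preflight()
    if cas.contains('myproject/element/cachekey'):
        extracted = cas.extract('myproject/element/cachekey', '/tmp/extract')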
| ... | ... | @@ -118,23 +118,35 @@ class CASCache(ArtifactCache): | 
| 118 | 118 |                  # If rename fails with these errors, another process beat
 | 
| 119 | 119 |                  # us to it so just ignore.
 | 
| 120 | 120 |                  if e.errno not in [errno.ENOTEMPTY, errno.EEXIST]:
 | 
| 121 | -                    raise ArtifactError("Failed to extract artifact for ref '{}': {}"
 | |
| 122 | -                                        .format(ref, e)) from e
 | |
| 121 | +                    raise CASError("Failed to extract directory for ref '{}': {}".format(ref, e)) from e
 | |
| 123 | 122 |  | 
| 124 | 123 |          return dest
 | 
| 125 | 124 |  | 
| 126 | -    def commit(self, element, content, keys):
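extract() above stages into a temporary directory on the same filesystem and publishes it with a single rename, tolerating ENOTEMPTY and EEXIST so that concurrent extractors race benignly. A self-contained sketch of that pattern (names are illustrative):

    import errno
    import os
    import tempfile

    def publish_atomically(tmproot, dest, populate):
        # Stage under tmproot (same filesystem as dest), then rename into
        # place; losing the rename race to another process is not an error.
        with tempfile.TemporaryDirectory(prefix='tmp', dir=tmproot) as tmpdir:
            staging = os.path.join(tmpdir, 'checkout')
            populate(staging)
            os.makedirs(os.path.dirname(dest), exist_ok=True)
            try:
                os.rename(staging, dest)
            except OSError as e:
                if e.errno not in (errno.ENOTEMPTY, errno.EEXIST):
                    raise
        return dest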
 | |
| 127 | -        refs = [self.get_artifact_fullname(element, key) for key in keys]
 | |
| 128 | - | |
| 129 | -        tree = self._commit_directory(content)
 | |
| 125 | +    # commit():
 | |
| 126 | +    #
 | |
| 127 | +    # Commit directory to cache.
 | |
| 128 | +    #
 | |
| 129 | +    # Args:
 | |
| 130 | +    #     refs (list): The refs to set
 | |
| 131 | +    #     path (str): The directory to import
 | |
| 132 | +    #
 | |
| 133 | +    def commit(self, refs, path):
 | |
| 134 | +        tree = self._commit_directory(path)
 | |
| 130 | 135 |  | 
| 131 | 136 |          for ref in refs:
 | 
| 132 | 137 |              self.set_ref(ref, tree)
 | 
| 133 | 138 |  | 
| 134 | -    def diff(self, element, key_a, key_b, *, subdir=None):
 | |
| 135 | -        ref_a = self.get_artifact_fullname(element, key_a)
 | |
| 136 | -        ref_b = self.get_artifact_fullname(element, key_b)
 | |
| 137 | - | |
| 139 | +    # diff():
 | |
| 140 | +    #
 | |
| 141 | +    # Return a list of files that have been added or modified between
 | |
| 142 | +    # the refs described by ref_a and ref_b.
 | |
| 143 | +    #
 | |
| 144 | +    # Args:
 | |
| 145 | +    #     ref_a (str): The first ref
 | |
| 146 | +    #     ref_b (str): The second ref
 | |
| 147 | +    #     subdir (str): A subdirectory to limit the comparison to
 | |
| 148 | +    #
 | |
| 149 | +    def diff(self, ref_a, ref_b, *, subdir=None):
 | |
| 138 | 150 |          tree_a = self.resolve_ref(ref_a)
 | 
| 139 | 151 |          tree_b = self.resolve_ref(ref_b)
 | 
| 140 | 152 |  | 
| ... | ... | @@ -150,158 +162,122 @@ class CASCache(ArtifactCache): | 
| 150 | 162 |  | 
| 151 | 163 |          return modified, removed, added
 | 
| 152 | 164 |  | 
| 153 | -    def initialize_remotes(self, *, on_failure=None):
 | |
| 154 | -        remote_specs = self.global_remote_specs
 | |
| 155 | - | |
| 156 | -        for project in self.project_remote_specs:
 | |
| 157 | -            remote_specs += self.project_remote_specs[project]
 | |
| 158 | - | |
| 159 | -        remote_specs = list(utils._deduplicate(remote_specs))
 | |
| 160 | - | |
| 161 | -        remotes = {}
 | |
| 162 | -        q = multiprocessing.Queue()
 | |
| 163 | -        for remote_spec in remote_specs:
 | |
| 164 | -            # Use subprocess to avoid creation of gRPC threads in main BuildStream process
 | |
| 165 | -            # See https://github.com/grpc/grpc/blob/master/doc/fork_support.md for details
 | |
| 166 | -            p = multiprocessing.Process(target=self._initialize_remote, args=(remote_spec, q))
 | |
| 165 | +    def initialize_remote(self, remote_spec, q):
 | |
| 166 | +        try:
 | |
| 167 | +            remote = CASRemote(remote_spec)
 | |
| 168 | +            remote.init()
 | |
| 167 | 169 |  | 
| 168 | -            try:
 | |
| 169 | -                # Keep SIGINT blocked in the child process
 | |
| 170 | -                with _signals.blocked([signal.SIGINT], ignore=False):
 | |
| 171 | -                    p.start()
 | |
| 172 | - | |
| 173 | -                error = q.get()
 | |
| 174 | -                p.join()
 | |
| 175 | -            except KeyboardInterrupt:
 | |
| 176 | -                utils._kill_process_tree(p.pid)
 | |
| 177 | -                raise
 | |
| 170 | +            request = buildstream_pb2.StatusRequest()
 | |
| 171 | +            response = remote.ref_storage.Status(request)
 | |
| 178 | 172 |  | 
| 179 | -            if error and on_failure:
 | |
| 180 | -                on_failure(remote_spec.url, error)
 | |
| 181 | -            elif error:
 | |
| 182 | -                raise ArtifactError(error)
 | |
| 173 | +            if remote_spec.push and not response.allow_updates:
 | |
| 174 | +                q.put('CAS server does not allow push')
 | |
| 183 | 175 |              else:
 | 
| 184 | -                self._has_fetch_remotes = True
 | |
| 185 | -                if remote_spec.push:
 | |
| 186 | -                    self._has_push_remotes = True
 | |
| 176 | +                # No error
 | |
| 177 | +                q.put(None)
 | |
| 187 | 178 |  | 
| 188 | -                remotes[remote_spec.url] = _CASRemote(remote_spec)
 | |
| 179 | +        except grpc.RpcError as e:
 | |
| 180 | +            # str(e) is too verbose for errors reported to the user
 | |
| 181 | +            q.put(e.details())
 | |
| 189 | 182 |  | 
| 190 | -        for project in self.context.get_projects():
 | |
| 191 | -            remote_specs = self.global_remote_specs
 | |
| 192 | -            if project in self.project_remote_specs:
 | |
| 193 | -                remote_specs = list(utils._deduplicate(remote_specs + self.project_remote_specs[project]))
 | |
| 183 | +        except Exception as e:               # pylint: disable=broad-except
 | |
| 184 | +            # Whatever happens, we need to return it to the calling process
 | |
| 185 | +            #
 | |
| 186 | +            q.put(str(e))
 | |
| 194 | 187 |  | 
| 195 | -            project_remotes = []
 | |
| 188 | +    # pull():
 | |
| 189 | +    #
 | |
| 190 | +    # Pull a ref from a remote repository.
 | |
| 191 | +    #
 | |
| 192 | +    # Args:
 | |
| 193 | +    #     ref (str): The ref to pull
 | |
| 194 | +    #     remote (CASRemote): The remote repository to pull from
 | |
| 195 | +    #     progress (callable): The progress callback, if any
 | |
| 196 | +    #
 | |
| 197 | +    # Returns:
 | |
| 198 | +    #   (bool): True if pull was successful, False if ref was not available
 | |
| 199 | +    #
 | |
| 200 | +    def pull(self, ref, remote, *, progress=None):
 | |
| 201 | +        try:
 | |
| 202 | +            remote.init()
 | |
| 196 | 203 |  | 
| 197 | -            for remote_spec in remote_specs:
 | |
| 198 | -                # Errors are already handled in the loop above,
 | |
| 199 | -                # skip unreachable remotes here.
 | |
| 200 | -                if remote_spec.url not in remotes:
 | |
| 201 | -                    continue
 | |
| 204 | +            request = buildstream_pb2.GetReferenceRequest()
 | |
| 205 | +            request.key = ref
 | |
| 206 | +            response = remote.ref_storage.GetReference(request)
 | |
| 202 | 207 |  | 
| 203 | -                remote = remotes[remote_spec.url]
 | |
| 204 | -                project_remotes.append(remote)
 | |
| 208 | +            tree = remote_execution_pb2.Digest()
 | |
| 209 | +            tree.hash = response.digest.hash
 | |
| 210 | +            tree.size_bytes = response.digest.size_bytes
 | |
| 205 | 211 |  | 
| 206 | -            self._remotes[project] = project_remotes
 | |
| 212 | +            self._fetch_directory(remote, tree)
 | |
| 207 | 213 |  | 
| 208 | -    def has_fetch_remotes(self, *, element=None):
 | |
| 209 | -        if not self._has_fetch_remotes:
 | |
| 210 | -            # No project has fetch remotes
 | |
| 211 | -            return False
 | |
| 212 | -        elif element is None:
 | |
| 213 | -            # At least one (sub)project has fetch remotes
 | |
| 214 | -            return True
 | |
| 215 | -        else:
 | |
| 216 | -            # Check whether the specified element's project has fetch remotes
 | |
| 217 | -            remotes_for_project = self._remotes[element._get_project()]
 | |
| 218 | -            return bool(remotes_for_project)
 | |
| 214 | +            self.set_ref(ref, tree)
 | |
| 219 | 215 |  | 
| 220 | -    def has_push_remotes(self, *, element=None):
 | |
| 221 | -        if not self._has_push_remotes:
 | |
| 222 | -            # No project has push remotes
 | |
| 223 | -            return False
 | |
| 224 | -        elif element is None:
 | |
| 225 | -            # At least one (sub)project has push remotes
 | |
| 226 | 216 |              return True
 | 
| 227 | -        else:
 | |
| 228 | -            # Check whether the specified element's project has push remotes
 | |
| 229 | -            remotes_for_project = self._remotes[element._get_project()]
 | |
| 230 | -            return any(remote.spec.push for remote in remotes_for_project)
 | |
| 231 | - | |
| 232 | -    def pull(self, element, key, *, progress=None):
 | |
| 233 | -        ref = self.get_artifact_fullname(element, key)
 | |
| 234 | - | |
| 235 | -        project = element._get_project()
 | |
| 236 | - | |
| 237 | -        for remote in self._remotes[project]:
 | |
| 238 | -            try:
 | |
| 239 | -                remote.init()
 | |
| 240 | -                display_key = element._get_brief_display_key()
 | |
| 241 | -                element.status("Pulling artifact {} <- {}".format(display_key, remote.spec.url))
 | |
| 242 | - | |
| 243 | -                request = buildstream_pb2.GetReferenceRequest()
 | |
| 244 | -                request.key = ref
 | |
| 245 | -                response = remote.ref_storage.GetReference(request)
 | |
| 246 | - | |
| 247 | -                tree = remote_execution_pb2.Digest()
 | |
| 248 | -                tree.hash = response.digest.hash
 | |
| 249 | -                tree.size_bytes = response.digest.size_bytes
 | |
| 250 | - | |
| 251 | -                self._fetch_directory(remote, tree)
 | |
| 252 | - | |
| 253 | -                self.set_ref(ref, tree)
 | |
| 254 | - | |
| 255 | -                element.info("Pulled artifact {} <- {}".format(display_key, remote.spec.url))
 | |
| 256 | -                # no need to pull from additional remotes
 | |
| 257 | -                return True
 | |
| 258 | - | |
| 259 | -            except grpc.RpcError as e:
 | |
| 260 | -                if e.code() != grpc.StatusCode.NOT_FOUND:
 | |
| 261 | -                    raise ArtifactError("Failed to pull artifact {}: {}".format(
 | |
| 262 | -                        element._get_brief_display_key(), e)) from e
 | |
| 263 | -                else:
 | |
| 264 | -                    element.info("Remote ({}) does not have {} cached".format(
 | |
| 265 | -                        remote.spec.url, element._get_brief_display_key()
 | |
| 266 | -                    ))
 | |
| 267 | - | |
| 268 | -        return False
 | |
| 269 | - | |
| 270 | -    def pull_tree(self, project, digest):
 | |
| 271 | -        """ Pull a single Tree rather than an artifact.
 | |
| 272 | -        Does not update local refs. """
 | |
| 217 | +        except grpc.RpcError as e:
 | |
| 218 | +            if e.code() != grpc.StatusCode.NOT_FOUND:
 | |
| 219 | +                raise CASError("Failed to pull ref {}: {}".format(ref, e)) from e
 | |
| 220 | +            else:
 | |
| 221 | +                return False
 | |
| 273 | 222 |  | 
| 274 | -        for remote in self._remotes[project]:
 | |
| 275 | -            try:
 | |
| 276 | -                remote.init()
 | |
| 223 | +    # pull_tree():
 | |
| 224 | +    #
 | |
| 225 | +    # Pull a single Tree rather than a ref.
 | |
| 226 | +    # Does not update local refs.
 | |
| 227 | +    #
 | |
| 228 | +    # Args:
 | |
| 229 | +    #     remote (CASRemote): The remote to pull from
 | |
| 230 | +    #     digest (Digest): The digest of the tree
 | |
| 231 | +    #
 | |
| 232 | +    def pull_tree(self, remote, digest):
 | |
| 233 | +        try:
 | |
| 234 | +            remote.init()
 | |
| 277 | 235 |  | 
| 278 | -                digest = self._fetch_tree(remote, digest)
 | |
| 236 | +            digest = self._fetch_tree(remote, digest)
 | |
| 279 | 237 |  | 
| 280 | -                # no need to pull from additional remotes
 | |
| 281 | -                return digest
 | |
| 238 | +            return digest
 | |
| 282 | 239 |  | 
| 283 | -            except grpc.RpcError as e:
 | |
| 284 | -                if e.code() != grpc.StatusCode.NOT_FOUND:
 | |
| 285 | -                    raise
 | |
| 240 | +        except grpc.RpcError as e:
 | |
| 241 | +            if e.code() != grpc.StatusCode.NOT_FOUND:
 | |
| 242 | +                raise
 | |
| 286 | 243 |  | 
| 287 | 244 |          return None
 | 
| 288 | 245 |  | 
| 289 | -    def link_key(self, element, oldkey, newkey):
 | |
| 290 | -        oldref = self.get_artifact_fullname(element, oldkey)
 | |
| 291 | -        newref = self.get_artifact_fullname(element, newkey)
 | |
| 292 | - | |
| 246 | +    # link_ref():
 | |
| 247 | +    #
 | |
| 248 | +    # Add an alias for an existing ref.
 | |
| 249 | +    #
 | |
| 250 | +    # Args:
 | |
| 251 | +    #     oldref (str): An existing ref
 | |
| 252 | +    #     newref (str): A new ref for the same directory
 | |
| 253 | +    #
 | |
| 254 | +    def link_ref(self, oldref, newref):
 | |
| 293 | 255 |          tree = self.resolve_ref(oldref)
 | 
| 294 | 256 |  | 
| 295 | 257 |          self.set_ref(newref, tree)
 | 
| 296 | 258 |  | 
| 297 | -    def _push_refs_to_remote(self, refs, remote):
 | |
| 259 | +    # push():
 | |
| 260 | +    #
 | |
| 261 | +    # Push committed refs to remote repository.
 | |
| 262 | +    #
 | |
| 263 | +    # Args:
 | |
| 264 | +    #     refs (list): The refs to push
 | |
| 265 | +    #     remote (CASRemote): The remote to push to
 | |
| 266 | +    #
 | |
| 267 | +    # Returns:
 | |
| 268 | +    #   (bool): True if the remote was updated, False if no push was required
 | |
| 269 | +    #
 | |
| 270 | +    # Raises:
 | |
| 271 | +    #   (CASError): if there was an error
 | |
| 272 | +    #
 | |
| 273 | +    def push(self, refs, remote):
 | |
| 298 | 274 |          skipped_remote = True
 | 
| 299 | 275 |          try:
 | 
| 300 | 276 |              for ref in refs:
 | 
| 301 | 277 |                  tree = self.resolve_ref(ref)
 | 
| 302 | 278 |  | 
| 303 | 279 |                  # Check whether ref is already on the server in which case
 | 
| 304 | -                # there is no need to push the artifact
 | |
| 280 | +                # there is no need to push the ref
 | |
| 305 | 281 |                  try:
 | 
| 306 | 282 |                      request = buildstream_pb2.GetReferenceRequest()
 | 
| 307 | 283 |                      request.key = ref
 | 
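pull() treats grpc NOT_FOUND as an ordinary cache miss and re-raises everything else; the existence probe in push() relies on the same convention. A sketch of that helper shape, where stub and request are assumed gRPC objects:

    import grpc

    def get_reference_or_none(stub, request):
        # NOT_FOUND means the ref simply isn't on this remote; any other
        # status is a real error and propagates to the caller.
        try:
            return stub.GetReference(request)
        except grpc.RpcError as e:
            if e.code() != grpc.StatusCode.NOT_FOUND:
                raise
            return None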
| ... | ... | @@ -327,65 +303,38 @@ class CASCache(ArtifactCache): | 
| 327 | 303 |                  skipped_remote = False
 | 
| 328 | 304 |          except grpc.RpcError as e:
 | 
| 329 | 305 |              if e.code() != grpc.StatusCode.RESOURCE_EXHAUSTED:
 | 
| 330 | -                raise ArtifactError("Failed to push artifact {}: {}".format(refs, e), temporary=True) from e
 | |
| 306 | +                raise CASError("Failed to push ref {}: {}".format(refs, e), temporary=True) from e
 | |
| 331 | 307 |  | 
| 332 | 308 |          return not skipped_remote
 | 
| 333 | 309 |  | 
| 334 | -    def push(self, element, keys):
 | |
| 335 | - | |
| 336 | -        refs = [self.get_artifact_fullname(element, key) for key in list(keys)]
 | |
| 337 | - | |
| 338 | -        project = element._get_project()
 | |
| 339 | - | |
| 340 | -        push_remotes = [r for r in self._remotes[project] if r.spec.push]
 | |
| 341 | - | |
| 342 | -        pushed = False
 | |
| 343 | - | |
| 344 | -        for remote in push_remotes:
 | |
| 345 | -            remote.init()
 | |
| 346 | -            display_key = element._get_brief_display_key()
 | |
| 347 | -            element.status("Pushing artifact {} -> {}".format(display_key, remote.spec.url))
 | |
| 348 | - | |
| 349 | -            if self._push_refs_to_remote(refs, remote):
 | |
| 350 | -                element.info("Pushed artifact {} -> {}".format(display_key, remote.spec.url))
 | |
| 351 | -                pushed = True
 | |
| 352 | -            else:
 | |
| 353 | -                element.info("Remote ({}) already has {} cached".format(
 | |
| 354 | -                    remote.spec.url, element._get_brief_display_key()
 | |
| 355 | -                ))
 | |
| 356 | - | |
| 357 | -        return pushed
 | |
| 358 | - | |
| 359 | -    def push_directory(self, project, directory):
 | |
| 360 | -        """ Push the given virtual directory to all remotes.
 | |
| 361 | - | |
| 362 | -        Args:
 | |
| 363 | -            project (Project): The current project
 | |
| 364 | -            directory (Directory): A virtual directory object to push.
 | |
| 365 | - | |
| 366 | -        Raises: ArtifactError if no push remotes are configured.
 | |
| 367 | -        """
 | |
| 368 | - | |
| 369 | -        if self._has_push_remotes:
 | |
| 370 | -            push_remotes = [r for r in self._remotes[project] if r.spec.push]
 | |
| 371 | -        else:
 | |
| 372 | -            push_remotes = []
 | |
| 373 | - | |
| 374 | -        if not push_remotes:
 | |
| 375 | -            raise ArtifactError("CASCache: push_directory was called, but no remote artifact " +
 | |
| 376 | -                                "servers are configured as push remotes.")
 | |
| 377 | - | |
| 378 | -        if directory.ref is None:
 | |
| 379 | -            return
 | |
| 380 | - | |
| 381 | -        for remote in push_remotes:
 | |
| 382 | -            remote.init()
 | |
| 383 | - | |
| 384 | -            self._send_directory(remote, directory.ref)
 | |
| 310 | +    # push_directory():
 | |
| 311 | +    #
 | |
| 312 | +    # Push the given virtual directory to a remote.
 | |
| 313 | +    #
 | |
| 314 | +    # Args:
 | |
| 315 | +    #     remote (CASRemote): The remote to push to
 | |
| 316 | +    #     directory (Directory): A virtual directory object to push.
 | |
| 317 | +    #
 | |
| 318 | +    # Raises:
 | |
| 319 | +    #     (CASError): if there was an error
 | |
| 320 | +    #
 | |
| 321 | +    def push_directory(self, remote, directory):
 | |
| 322 | +        remote.init()
 | |
| 385 | 323 |  | 
| 386 | -    def push_message(self, project, message):
 | |
| 324 | +        self._send_directory(remote, directory.ref)
 | |
| 387 | 325 |  | 
| 388 | -        push_remotes = [r for r in self._remotes[project] if r.spec.push]
 | |
| 326 | +    # push_message():
 | |
| 327 | +    #
 | |
| 328 | +    # Push the given protobuf message to a remote.
 | |
| 329 | +    #
 | |
| 330 | +    # Args:
 | |
| 331 | +    #     remote (CASRemote): The remote to push to
 | |
| 332 | +    #     message (Message): A protobuf message to push.
 | |
| 333 | +    #
 | |
| 334 | +    # Raises:
 | |
| 335 | +    #     (CASError): if there was an error
 | |
| 336 | +    #
 | |
| 337 | +    def push_message(self, remote, message):
 | |
| 389 | 338 |  | 
| 390 | 339 |          message_buffer = message.SerializeToString()
 | 
| 391 | 340 |          message_sha = hashlib.sha256(message_buffer)
 | 
| ... | ... | @@ -393,17 +342,25 @@ class CASCache(ArtifactCache): | 
| 393 | 342 |          message_digest.hash = message_sha.hexdigest()
 | 
| 394 | 343 |          message_digest.size_bytes = len(message_buffer)
 | 
| 395 | 344 |  | 
| 396 | -        for remote in push_remotes:
 | |
| 397 | -            remote.init()
 | |
| 345 | +        remote.init()
 | |
| 398 | 346 |  | 
| 399 | -            with io.BytesIO(message_buffer) as b:
 | |
| 400 | -                self._send_blob(remote, message_digest, b)
 | |
| 347 | +        with io.BytesIO(message_buffer) as b:
 | |
| 348 | +            self._send_blob(remote, message_digest, b)
 | |
| 401 | 349 |  | 
| 402 | 350 |          return message_digest
 | 
| 403 | 351 |  | 
| 404 | -    def _verify_digest_on_remote(self, remote, digest):
 | |
| 405 | -        # Check whether ref is already on the server in which case
 | |
| 406 | -        # there is no need to push the artifact
 | |
| 352 | +    # verify_digest_on_remote():
 | |
| 353 | +    #
 | |
| 354 | +    # Check whether the object is already on the server in which case
 | |
| 355 | +    # there is no need to upload it.
 | |
| 356 | +    #
 | |
| 357 | +    # Args:
 | |
| 358 | +    #     remote (CASRemote): The remote to check
 | |
| 359 | +    #     digest (Digest): The object digest.
 | |
| 360 | +    #
 | |
| 361 | +    def verify_digest_on_remote(self, remote, digest):
 | |
| 362 | +        remote.init()
 | |
| 363 | + | |
| 407 | 364 |          request = remote_execution_pb2.FindMissingBlobsRequest()
 | 
| 408 | 365 |          request.blob_digests.extend([digest])
 | 
| 409 | 366 |  | 
| ... | ... | @@ -413,24 +370,6 @@ class CASCache(ArtifactCache): | 
| 413 | 370 |  | 
| 414 | 371 |          return True
 | 
| 415 | 372 |  | 
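verify_digest_on_remote() inverts FindMissingBlobs: an object is present exactly when the server does not report it as missing. A sketch against an assumed ContentAddressableStorage stub:

    from buildstream._protos.build.bazel.remote.execution.v2 import remote_execution_pb2

    def digest_on_server(cas_stub, digest):
        # Ask which of the given blobs the server lacks; the digest is
        # present exactly when it does not come back in the missing list.
        request = remote_execution_pb2.FindMissingBlobsRequest()
        request.blob_digests.extend([digest])
        response = cas_stub.FindMissingBlobs(request)
        return all(d.hash != digest.hash for d in response.missing_blob_digests)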
| 416 | -    def verify_digest_pushed(self, project, digest):
 | |
| 417 | - | |
| 418 | -        push_remotes = [r for r in self._remotes[project] if r.spec.push]
 | |
| 419 | - | |
| 420 | -        pushed = False
 | |
| 421 | - | |
| 422 | -        for remote in push_remotes:
 | |
| 423 | -            remote.init()
 | |
| 424 | - | |
| 425 | -            if self._verify_digest_on_remote(remote, digest):
 | |
| 426 | -                pushed = True
 | |
| 427 | - | |
| 428 | -        return pushed
 | |
| 429 | - | |
| 430 | -    ################################################
 | |
| 431 | -    #                API Private Methods           #
 | |
| 432 | -    ################################################
 | |
| 433 | - | |
| 434 | 373 |      # objpath():
 | 
| 435 | 374 |      #
 | 
| 436 | 375 |      # Return the path of an object based on its digest.
 | 
| ... | ... | @@ -496,7 +435,7 @@ class CASCache(ArtifactCache): | 
| 496 | 435 |              pass
 | 
| 497 | 436 |  | 
| 498 | 437 |          except OSError as e:
 | 
| 499 | -            raise ArtifactError("Failed to hash object: {}".format(e)) from e
 | |
| 438 | +            raise CASError("Failed to hash object: {}".format(e)) from e
 | |
| 500 | 439 |  | 
| 501 | 440 |          return digest
 | 
| 502 | 441 |  | 
| ... | ... | @@ -537,26 +476,39 @@ class CASCache(ArtifactCache): | 
| 537 | 476 |                  return digest
 | 
| 538 | 477 |  | 
| 539 | 478 |          except FileNotFoundError as e:
 | 
| 540 | -            raise ArtifactError("Attempt to access unavailable artifact: {}".format(e)) from e
 | |
| 479 | +            raise CASError("Attempt to access unavailable ref: {}".format(e)) from e
 | |
| 541 | 480 |  | 
| 542 | -    def update_mtime(self, element, key):
 | |
| 481 | +    # update_mtime()
 | |
| 482 | +    #
 | |
| 483 | +    # Update the mtime of a ref.
 | |
| 484 | +    #
 | |
| 485 | +    # Args:
 | |
| 486 | +    #     ref (str): The ref to update
 | |
| 487 | +    #
 | |
| 488 | +    def update_mtime(self, ref):
 | |
| 543 | 489 |          try:
 | 
| 544 | -            ref = self.get_artifact_fullname(element, key)
 | |
| 545 | 490 |              os.utime(self._refpath(ref))
 | 
| 546 | 491 |          except FileNotFoundError as e:
 | 
| 547 | -            raise ArtifactError("Attempt to access unavailable artifact: {}".format(e)) from e
 | |
| 492 | +            raise CASError("Attempt to access unavailable ref: {}".format(e)) from e
 | |
| 548 | 493 |  | 
| 494 | +    # calculate_cache_size()
 | |
| 495 | +    #
 | |
| 496 | +    # Return the real disk usage of the CAS cache.
 | |
| 497 | +    #
 | |
| 498 | +    # Returns:
 | |
| 499 | +    #    (int): The size of the cache.
 | |
| 500 | +    #
 | |
| 549 | 501 |      def calculate_cache_size(self):
 | 
| 550 | 502 |          return utils._get_dir_size(self.casdir)
 | 
| 551 | 503 |  | 
| 552 | -    # list_artifacts():
 | |
| 504 | +    # list_refs():
 | |
| 553 | 505 |      #
 | 
| 554 | -    # List cached artifacts in Least Recently Modified (LRM) order.
 | |
| 506 | +    # List refs in Least Recently Modified (LRM) order.
 | |
| 555 | 507 |      #
 | 
| 556 | 508 |      # Returns:
 | 
| 557 | 509 |      #     (list) - A list of refs in LRM order
 | 
| 558 | 510 |      #
 | 
| 559 | -    def list_artifacts(self):
 | |
| 511 | +    def list_refs(self):
 | |
| 560 | 512 |          # string of: /path/to/repo/refs/heads
 | 
| 561 | 513 |          ref_heads = os.path.join(self.casdir, 'refs', 'heads')
 | 
| 562 | 514 |  | 
| ... | ... | @@ -571,7 +523,7 @@ class CASCache(ArtifactCache): | 
| 571 | 523 |                  mtimes.append(os.path.getmtime(ref_path))
 | 
| 572 | 524 |  | 
| 573 | 525 |          # NOTE: Sorted will sort from earliest to latest, thus the
 | 
| 574 | -        # first element of this list will be the file modified earliest.
 | |
| 526 | +        # first ref of this list will be the file modified earliest.
 | |
| 575 | 527 |          return [ref for _, ref in sorted(zip(mtimes, refs))]
 | 
| 576 | 528 |  | 
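update_mtime() and list_refs() cooperate to give cleanup its expiry order: every cache hit touches the ref, and list_refs() sorts by mtime so the least recently used refs come first. A self-contained sketch of that ordering:

    import os

    def refs_by_lrm(ref_heads):
        # Walk refs/heads and pair each ref with its mtime; sorting the
        # pairs puts the least recently modified (least recently used)
        # refs first, which is the order clean() consumes them in.
        refs, mtimes = [], []
        for root, _, files in os.walk(ref_heads):
            for name in files:
                path = os.path.join(root, name)
                refs.append(os.path.relpath(path, ref_heads))
                mtimes.append(os.path.getmtime(path))
        return [ref for _, ref in sorted(zip(mtimes, refs))]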
| 577 | 529 |      # remove():
 | 
| ... | ... | @@ -590,28 +542,10 @@ class CASCache(ArtifactCache): | 
| 590 | 542 |      #
 | 
| 591 | 543 |      def remove(self, ref, *, defer_prune=False):
 | 
| 592 | 544 |  | 
| 593 | -        # Remove extract if not used by other ref
 | |
| 594 | -        tree = self.resolve_ref(ref)
 | |
| 595 | -        ref_name, ref_hash = os.path.split(ref)
 | |
| 596 | -        extract = os.path.join(self.extractdir, ref_name, tree.hash)
 | |
| 597 | -        keys_file = os.path.join(extract, 'meta', 'keys.yaml')
 | |
| 598 | -        if os.path.exists(keys_file):
 | |
| 599 | -            keys_meta = _yaml.load(keys_file)
 | |
| 600 | -            keys = [keys_meta['strong'], keys_meta['weak']]
 | |
| 601 | -            remove_extract = True
 | |
| 602 | -            for other_hash in keys:
 | |
| 603 | -                if other_hash == ref_hash:
 | |
| 604 | -                    continue
 | |
| 605 | -                remove_extract = False
 | |
| 606 | -                break
 | |
| 607 | - | |
| 608 | -            if remove_extract:
 | |
| 609 | -                utils._force_rmtree(extract)
 | |
| 610 | - | |
| 611 | 545 |          # Remove cache ref
 | 
| 612 | 546 |          refpath = self._refpath(ref)
 | 
| 613 | 547 |          if not os.path.exists(refpath):
 | 
| 614 | -            raise ArtifactError("Could not find artifact for ref '{}'".format(ref))
 | |
| 548 | +            raise CASError("Could not find ref '{}'".format(ref))
 | |
| 615 | 549 |  | 
| 616 | 550 |          os.unlink(refpath)
 | 
| 617 | 551 |  | 
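remove() now deletes only the ref itself (the extract cleanup moves up to
ArtifactCache) and raises CASError when the ref is missing. A hypothetical
caller-side expiry loop combining it with list_refs(); the keep count is
purely illustrative:

    from buildstream._exceptions import CASError

    def expire_refs(cas, keep=100):
        refs = cas.list_refs()        # LRM order: oldest first
        for ref in refs[:-keep]:      # drop all but the newest `keep` refs
            try:
                cas.remove(ref, defer_prune=True)
            except CASError:
                pass                  # ref was removed concurrently
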
| ... | ... | @@ -721,7 +655,7 @@ class CASCache(ArtifactCache): | 
| 721 | 655 |                  # The process serving the socket can't be cached anyway
 | 
| 722 | 656 |                  pass
 | 
| 723 | 657 |              else:
 | 
| 724 | -                raise ArtifactError("Unsupported file type for {}".format(full_path))
 | |
| 658 | +                raise CASError("Unsupported file type for {}".format(full_path))
 | |
| 725 | 659 |  | 
| 726 | 660 |          return self.add_object(digest=dir_digest,
 | 
| 727 | 661 |                                 buffer=directory.SerializeToString())
 | 
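This hunk is the tail of the directory-import walk. For context, a simplified
sketch of the stat-based dispatch that leads to this CASError, with the
protobuf bookkeeping omitted:

    import os
    import stat

    from buildstream._exceptions import CASError

    def classify_entry(full_path):
        mode = os.lstat(full_path).st_mode
        if stat.S_ISDIR(mode):
            return 'directory'   # recurse and record a DirectoryNode
        elif stat.S_ISREG(mode):
            return 'file'        # hash contents into CAS as a FileNode
        elif stat.S_ISLNK(mode):
            return 'symlink'     # record the target as a SymlinkNode
        elif stat.S_ISSOCK(mode):
            return None          # sockets cannot be cached anyway
        raise CASError("Unsupported file type for {}".format(full_path))
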
| ... | ... | @@ -740,7 +674,7 @@ class CASCache(ArtifactCache): | 
| 740 | 674 |              if dirnode.name == name:
 | 
| 741 | 675 |                  return dirnode.digest
 | 
| 742 | 676 |  | 
| 743 | -        raise ArtifactError("Subdirectory {} not found".format(name))
 | |
| 677 | +        raise CASError("Subdirectory {} not found".format(name))
 | |
| 744 | 678 |  | 
| 745 | 679 |      def _diff_trees(self, tree_a, tree_b, *, added, removed, modified, path=""):
 | 
| 746 | 680 |          dir_a = remote_execution_pb2.Directory()
 | 
| ... | ... | @@ -812,29 +746,6 @@ class CASCache(ArtifactCache): | 
| 812 | 746 |          for dirnode in directory.directories:
 | 
| 813 | 747 |              self._reachable_refs_dir(reachable, dirnode.digest)
 | 
| 814 | 748 |  | 
| 815 | -    def _initialize_remote(self, remote_spec, q):
 | |
| 816 | -        try:
 | |
| 817 | -            remote = _CASRemote(remote_spec)
 | |
| 818 | -            remote.init()
 | |
| 819 | - | |
| 820 | -            request = buildstream_pb2.StatusRequest()
 | |
| 821 | -            response = remote.ref_storage.Status(request)
 | |
| 822 | - | |
| 823 | -            if remote_spec.push and not response.allow_updates:
 | |
| 824 | -                q.put('Artifact server does not allow push')
 | |
| 825 | -            else:
 | |
| 826 | -                # No error
 | |
| 827 | -                q.put(None)
 | |
| 828 | - | |
| 829 | -        except grpc.RpcError as e:
 | |
| 830 | -            # str(e) is too verbose for errors reported to the user
 | |
| 831 | -            q.put(e.details())
 | |
| 832 | - | |
| 833 | -        except Exception as e:               # pylint: disable=broad-except
 | |
| 834 | -            # Whatever happens, we need to return it to the calling process
 | |
| 835 | -            #
 | |
| 836 | -            q.put(str(e))
 | |
| 837 | - | |
| 838 | 749 |      def _required_blobs(self, directory_digest):
 | 
| 839 | 750 |          # parse directory, and recursively add blobs
 | 
| 840 | 751 |          d = remote_execution_pb2.Digest()
 | 
| ... | ... | @@ -1080,7 +991,7 @@ class CASCache(ArtifactCache): | 
| 1080 | 991 |  | 
| 1081 | 992 |  # Represents a single remote CAS cache.
 | 
| 1082 | 993 |  #
 | 
| 1083 | -class _CASRemote():
 | |
| 994 | +class CASRemote():
 | |
| 1084 | 995 |      def __init__(self, spec):
 | 
| 1085 | 996 |          self.spec = spec
 | 
| 1086 | 997 |          self._initialized = False
 | 
| ... | ... | @@ -1125,7 +1036,7 @@ class _CASRemote(): | 
| 1125 | 1036 |                                                             certificate_chain=client_cert_bytes)
 | 
| 1126 | 1037 |                  self.channel = grpc.secure_channel('{}:{}'.format(url.hostname, port), credentials)
 | 
| 1127 | 1038 |              else:
 | 
| 1128 | -                raise ArtifactError("Unsupported URL: {}".format(self.spec.url))
 | |
| 1039 | +                raise CASError("Unsupported URL: {}".format(self.spec.url))
 | |
| 1129 | 1040 |  | 
| 1130 | 1041 |              self.bytestream = bytestream_pb2_grpc.ByteStreamStub(self.channel)
 | 
| 1131 | 1042 |              self.cas = remote_execution_pb2_grpc.ContentAddressableStorageStub(self.channel)
 | 
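CASRemote.init(), now public after the rename, dispatches on the URL scheme
when opening its gRPC channel; the hunk above shows only the secure branch
and the error case. A simplified sketch of the full dispatch, with default
ports assumed and client-certificate loading omitted:

    import grpc
    from urllib.parse import urlparse

    from buildstream._exceptions import CASError

    def open_channel(url_string):
        url = urlparse(url_string)
        if url.scheme == 'http':
            return grpc.insecure_channel(
                '{}:{}'.format(url.hostname, url.port or 80))
        elif url.scheme == 'https':
            credentials = grpc.ssl_channel_credentials()
            return grpc.secure_channel(
                '{}:{}'.format(url.hostname, url.port or 443), credentials)
        raise CASError("Unsupported URL: {}".format(url_string))
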
| ... | ... | @@ -1203,10 +1114,10 @@ class _CASBatchRead(): | 
| 1203 | 1114 |  | 
| 1204 | 1115 |          for response in batch_response.responses:
 | 
| 1205 | 1116 |              if response.status.code != code_pb2.OK:
 | 
| 1206 | -                raise ArtifactError("Failed to download blob {}: {}".format(
 | |
| 1117 | +                raise CASError("Failed to download blob {}: {}".format(
 | |
| 1207 | 1118 |                      response.digest.hash, response.status.code))
 | 
| 1208 | 1119 |              if response.digest.size_bytes != len(response.data):
 | 
| 1209 | -                raise ArtifactError("Failed to download blob {}: expected {} bytes, received {} bytes".format(
 | |
| 1120 | +                raise CASError("Failed to download blob {}: expected {} bytes, received {} bytes".format(
 | |
| 1210 | 1121 |                      response.digest.hash, response.digest.size_bytes, len(response.data)))
 | 
| 1211 | 1122 |  | 
| 1212 | 1123 |              yield (response.digest, response.data)
 | 
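_CASBatchRead now raises CASError both for per-blob status failures and for
size mismatches before yielding anything to the caller. A hypothetical
consumer, assuming the batch object exposes add() and send() as the generator
above suggests, and an objpath_for() helper mapping digests to local paths:

    def fetch_blobs(remote, missing_digests, objpath_for):
        batch = _CASBatchRead(remote)
        for digest in missing_digests:
            batch.add(digest)

        for digest, data in batch.send():
            # Each blob was already validated against its expected size
            with open(objpath_for(digest), 'wb') as f:
                f.write(data)
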
| ... | ... | @@ -1248,7 +1159,7 @@ class _CASBatchUpdate(): | 
| 1248 | 1159 |  | 
| 1249 | 1160 |          for response in batch_response.responses:
 | 
| 1250 | 1161 |              if response.status.code != code_pb2.OK:
 | 
| 1251 | -                raise ArtifactError("Failed to upload blob {}: {}".format(
 | |
| 1162 | +                raise CASError("Failed to upload blob {}: {}".format(
 | |
| 1252 | 1163 |                      response.digest.hash, response.status.code))
 | 
| 1253 | 1164 |  | 
| 1254 | 1165 |  | 
| ... | ... | @@ -32,8 +32,9 @@ from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remo | 
| 32 | 32 |  from .._protos.google.bytestream import bytestream_pb2, bytestream_pb2_grpc
 | 
| 33 | 33 |  from .._protos.buildstream.v2 import buildstream_pb2, buildstream_pb2_grpc
 | 
| 34 | 34 |  | 
| 35 | -from .._exceptions import ArtifactError
 | |
| 36 | -from .._context import Context
 | |
| 35 | +from .._exceptions import CASError
 | |
| 36 | + | |
| 37 | +from .cascache import CASCache
 | |
| 37 | 38 |  | 
| 38 | 39 |  | 
| 39 | 40 |  # The default limit for gRPC messages is 4 MiB.
 | 
| ... | ... | @@ -55,26 +56,23 @@ class ArtifactTooLargeException(Exception): | 
| 55 | 56 |  #     enable_push (bool): Whether to allow blob uploads and artifact updates
 | 
| 56 | 57 |  #
 | 
| 57 | 58 |  def create_server(repo, *, enable_push):
 | 
| 58 | -    context = Context()
 | |
| 59 | -    context.artifactdir = os.path.abspath(repo)
 | |
| 60 | - | |
| 61 | -    artifactcache = context.artifactcache
 | |
| 59 | +    cas = CASCache(os.path.abspath(repo))
 | |
| 62 | 60 |  | 
| 63 | 61 |      # Use max_workers default from Python 3.5+
 | 
| 64 | 62 |      max_workers = (os.cpu_count() or 1) * 5
 | 
| 65 | 63 |      server = grpc.server(futures.ThreadPoolExecutor(max_workers))
 | 
| 66 | 64 |  | 
| 67 | 65 |      bytestream_pb2_grpc.add_ByteStreamServicer_to_server(
 | 
| 68 | -        _ByteStreamServicer(artifactcache, enable_push=enable_push), server)
 | |
| 66 | +        _ByteStreamServicer(cas, enable_push=enable_push), server)
 | |
| 69 | 67 |  | 
| 70 | 68 |      remote_execution_pb2_grpc.add_ContentAddressableStorageServicer_to_server(
 | 
| 71 | -        _ContentAddressableStorageServicer(artifactcache, enable_push=enable_push), server)
 | |
| 69 | +        _ContentAddressableStorageServicer(cas, enable_push=enable_push), server)
 | |
| 72 | 70 |  | 
| 73 | 71 |      remote_execution_pb2_grpc.add_CapabilitiesServicer_to_server(
 | 
| 74 | 72 |          _CapabilitiesServicer(), server)
 | 
| 75 | 73 |  | 
| 76 | 74 |      buildstream_pb2_grpc.add_ReferenceStorageServicer_to_server(
 | 
| 77 | -        _ReferenceStorageServicer(artifactcache, enable_push=enable_push), server)
 | |
| 75 | +        _ReferenceStorageServicer(cas, enable_push=enable_push), server)
 | |
| 78 | 76 |  | 
| 79 | 77 |      return server
 | 
| 80 | 78 |  | 
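With the Context indirection gone, create_server() needs only a repository
path. A minimal sketch of running the server; the port and repository path
are illustrative:

    import time

    server = create_server('/srv/artifacts/repo', enable_push=True)
    server.add_insecure_port('localhost:50051')
    server.start()

    try:
        while True:
            time.sleep(3600)     # gRPC servers run on background threads
    except KeyboardInterrupt:
        server.stop(0)
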
| ... | ... | @@ -333,7 +331,7 @@ class _ReferenceStorageServicer(buildstream_pb2_grpc.ReferenceStorageServicer): | 
| 333 | 331 |  | 
| 334 | 332 |              response.digest.hash = tree.hash
 | 
| 335 | 333 |              response.digest.size_bytes = tree.size_bytes
 | 
| 336 | -        except ArtifactError:
 | |
| 334 | +        except CASError:
 | |
| 337 | 335 |              context.set_code(grpc.StatusCode.NOT_FOUND)
 | 
| 338 | 336 |  | 
| 339 | 337 |          return response
 | 
| ... | ... | @@ -437,7 +435,7 @@ def _clean_up_cache(cas, object_size): | 
| 437 | 435 |          return 0
 | 
| 438 | 436 |  | 
| 439 | 437 |      # obtain a list of LRP artifacts
 | 
| 440 | -    LRP_artifacts = cas.list_artifacts()
 | |
| 438 | +    LRP_artifacts = cas.list_refs()
 | |
| 441 | 439 |  | 
| 442 | 440 |      removed_size = 0  # in bytes
 | 
| 443 | 441 |      while object_size - removed_size > free_disk_space:
 | 
| ... | ... | @@ -31,7 +31,6 @@ from ._exceptions import LoadError, LoadErrorReason, BstError | 
| 31 | 31 |  from ._message import Message, MessageType
 | 
| 32 | 32 |  from ._profile import Topics, profile_start, profile_end
 | 
| 33 | 33 |  from ._artifactcache import ArtifactCache
 | 
| 34 | -from ._artifactcache.cascache import CASCache
 | |
| 35 | 34 |  from ._workspaces import Workspaces
 | 
| 36 | 35 |  from .plugin import _plugin_lookup
 | 
| 37 | 36 |  | 
| ... | ... | @@ -233,7 +232,7 @@ class Context(): | 
| 233 | 232 |      @property
 | 
| 234 | 233 |      def artifactcache(self):
 | 
| 235 | 234 |          if not self._artifactcache:
 | 
| 236 | -            self._artifactcache = CASCache(self)
 | |
| 235 | +            self._artifactcache = ArtifactCache(self)
 | |
| 237 | 236 |  | 
| 238 | 237 |          return self._artifactcache
 | 
| 239 | 238 |  | 
| ... | ... | @@ -90,6 +90,7 @@ class ErrorDomain(Enum): | 
| 90 | 90 |      APP = 12
 | 
| 91 | 91 |      STREAM = 13
 | 
| 92 | 92 |      VIRTUAL_FS = 14
 | 
| 93 | +    CAS = 15
 | |
| 93 | 94 |  | 
| 94 | 95 |  | 
| 95 | 96 |  # BstError is an internal base exception class for BuildStream
 | 
| ... | ... | @@ -274,6 +275,15 @@ class ArtifactError(BstError): | 
| 274 | 275 |          super().__init__(message, detail=detail, domain=ErrorDomain.ARTIFACT, reason=reason, temporary=True)
 | 
| 275 | 276 |  | 
| 276 | 277 |  | 
| 278 | +# CASError
 | |
| 279 | +#
 | |
| 280 | +# Raised when errors are encountered in the CAS
 | |
| 281 | +#
 | |
| 282 | +class CASError(BstError):
 | |
| 283 | +    def __init__(self, message, *, detail=None, reason=None, temporary=False):
 | |
| 284 | +        super().__init__(message, detail=detail, domain=ErrorDomain.CAS, reason=reason, temporary=temporary)
 | |
| 285 | + | |
| 286 | + | |
| 277 | 287 |  # PipelineError
 | 
| 278 | 288 |  #
 | 
| 279 | 289 |  # Raised from pipeline operations
 | 
| ... | ... | @@ -79,7 +79,7 @@ class CasBasedDirectory(Directory): | 
| 79 | 79 |          self.filename = filename
 | 
| 80 | 80 |          self.common_name = common_name
 | 
| 81 | 81 |          self.pb2_directory = remote_execution_pb2.Directory()
 | 
| 82 | -        self.cas_cache = context.artifactcache
 | |
| 82 | +        self.cas_cache = context.artifactcache.cas
 | |
| 83 | 83 |          if ref:
 | 
| 84 | 84 |              with open(self.cas_cache.objpath(ref), 'rb') as f:
 | 
| 85 | 85 |                  self.pb2_directory.ParseFromString(f.read())
 | 
| ... | ... | @@ -190,15 +190,16 @@ def test_pull_tree(cli, tmpdir, datafiles): | 
| 190 | 190 |          # Load the project and CAS cache
 | 
| 191 | 191 |          project = Project(project_dir, context)
 | 
| 192 | 192 |          project.ensure_fully_loaded()
 | 
| 193 | -        cas = context.artifactcache
 | |
| 193 | +        artifactcache = context.artifactcache
 | |
| 194 | +        cas = artifactcache.cas
 | |
| 194 | 195 |  | 
| 195 | 196 |          # Assert that the element's artifact is cached
 | 
| 196 | 197 |          element = project.load_elements(['target.bst'])[0]
 | 
| 197 | 198 |          element_key = cli.get_element_key(project_dir, 'target.bst')
 | 
| 198 | -        assert cas.contains(element, element_key)
 | |
| 199 | +        assert artifactcache.contains(element, element_key)
 | |
| 199 | 200 |  | 
| 200 | 201 |          # Retrieve the Directory object from the cached artifact
 | 
| 201 | -        artifact_ref = cas.get_artifact_fullname(element, element_key)
 | |
| 202 | +        artifact_ref = artifactcache.get_artifact_fullname(element, element_key)
 | |
| 202 | 203 |          artifact_digest = cas.resolve_ref(artifact_ref)
 | 
| 203 | 204 |  | 
| 204 | 205 |          queue = multiprocessing.Queue()
 | 
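The tests now separate the ArtifactCache facade from its CASCache: artifact
naming and containment stay on the facade, while digest resolution and object
paths belong to the CAS. A condensed sketch of the lookup chain they exercise,
assuming element and element_key as in the test:

    from buildstream._protos.build.bazel.remote.execution.v2 import (
        remote_execution_pb2)

    def load_artifact_directory(artifactcache, element, element_key):
        cas = artifactcache.cas
        ref = artifactcache.get_artifact_fullname(element, element_key)
        digest = cas.resolve_ref(ref)

        # The digest points at a serialized Directory proto in local CAS
        directory = remote_execution_pb2.Directory()
        with open(cas.objpath(digest), 'rb') as f:
            directory.ParseFromString(f.read())
        return directory
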
| ... | ... | @@ -268,12 +269,13 @@ def _test_push_tree(user_config_file, project_dir, artifact_dir, artifact_digest | 
| 268 | 269 |      project.ensure_fully_loaded()
 | 
| 269 | 270 |  | 
| 270 | 271 |      # Create a local CAS cache handle
 | 
| 271 | -    cas = context.artifactcache
 | |
| 272 | +    artifactcache = context.artifactcache
 | |
| 273 | +    cas = artifactcache.cas
 | |
| 272 | 274 |  | 
| 273 | 275 |      # Manually setup the CAS remote
 | 
| 274 | -    cas.setup_remotes(use_config=True)
 | |
| 276 | +    artifactcache.setup_remotes(use_config=True)
 | |
| 275 | 277 |  | 
| 276 | -    if cas.has_push_remotes():
 | |
| 278 | +    if artifactcache.has_push_remotes():
 | |
| 277 | 279 |          directory = remote_execution_pb2.Directory()
 | 
| 278 | 280 |  | 
| 279 | 281 |          with open(cas.objpath(artifact_digest), 'rb') as f:
 | 
| ... | ... | @@ -284,7 +286,7 @@ def _test_push_tree(user_config_file, project_dir, artifact_dir, artifact_digest | 
| 284 | 286 |          tree_maker(cas, tree, directory)
 | 
| 285 | 287 |  | 
| 286 | 288 |          # Push the Tree as a regular message
 | 
| 287 | -        tree_digest = cas.push_message(project, tree)
 | |
| 289 | +        tree_digest = artifactcache.push_message(project, tree)
 | |
| 288 | 290 |  | 
| 289 | 291 |          queue.put((tree_digest.hash, tree_digest.size_bytes))
 | 
| 290 | 292 |      else:
 | 
| ... | ... | @@ -165,20 +165,21 @@ def test_push_directory(cli, tmpdir, datafiles): | 
| 165 | 165 |          # Load the project and CAS cache
 | 
| 166 | 166 |          project = Project(project_dir, context)
 | 
| 167 | 167 |          project.ensure_fully_loaded()
 | 
| 168 | -        cas = context.artifactcache
 | |
| 168 | +        artifactcache = context.artifactcache
 | |
| 169 | +        cas = artifactcache.cas
 | |
| 169 | 170 |  | 
| 170 | 171 |          # Assert that the element's artifact is cached
 | 
| 171 | 172 |          element = project.load_elements(['target.bst'])[0]
 | 
| 172 | 173 |          element_key = cli.get_element_key(project_dir, 'target.bst')
 | 
| 173 | -        assert cas.contains(element, element_key)
 | |
| 174 | +        assert artifactcache.contains(element, element_key)
 | |
| 174 | 175 |  | 
| 175 | 176 |          # Manually setup the CAS remote
 | 
| 176 | -        cas.setup_remotes(use_config=True)
 | |
| 177 | -        cas.initialize_remotes()
 | |
| 178 | -        assert cas.has_push_remotes(element=element)
 | |
| 177 | +        artifactcache.setup_remotes(use_config=True)
 | |
| 178 | +        artifactcache.initialize_remotes()
 | |
| 179 | +        assert artifactcache.has_push_remotes(element=element)
 | |
| 179 | 180 |  | 
| 180 | 181 |          # Recreate the CasBasedDirectory object from the cached artifact
 | 
| 181 | -        artifact_ref = cas.get_artifact_fullname(element, element_key)
 | |
| 182 | +        artifact_ref = artifactcache.get_artifact_fullname(element, element_key)
 | |
| 182 | 183 |          artifact_digest = cas.resolve_ref(artifact_ref)
 | 
| 183 | 184 |  | 
| 184 | 185 |          queue = multiprocessing.Queue()
 | 
| ... | ... | @@ -13,7 +13,7 @@ import pytest_cov | 
| 13 | 13 |  from buildstream import _yaml
 | 
| 14 | 14 |  from buildstream._artifactcache.casserver import create_server
 | 
| 15 | 15 |  from buildstream._context import Context
 | 
| 16 | -from buildstream._exceptions import ArtifactError
 | |
| 16 | +from buildstream._exceptions import CASError
 | |
| 17 | 17 |  from buildstream._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
 | 
| 18 | 18 |  | 
| 19 | 19 |  | 
| ... | ... | @@ -48,7 +48,7 @@ class ArtifactShare(): | 
| 48 | 48 |          context = Context()
 | 
| 49 | 49 |          context.artifactdir = self.repodir
 | 
| 50 | 50 |  | 
| 51 | -        self.cas = context.artifactcache
 | |
| 51 | +        self.cas = context.artifactcache.cas
 | |
| 52 | 52 |  | 
| 53 | 53 |          self.total_space = total_space
 | 
| 54 | 54 |          self.free_space = free_space
 | 
| ... | ... | @@ -135,7 +135,7 @@ class ArtifactShare(): | 
| 135 | 135 |          try:
 | 
| 136 | 136 |              tree = self.cas.resolve_ref(artifact_key)
 | 
| 137 | 137 |              return True
 | 
| 138 | -        except ArtifactError:
 | |
| 138 | +        except CASError:
 | |
| 139 | 139 |              return False
 | 
| 140 | 140 |  | 
| 141 | 141 |      # close():
 | 
