Valentin David pushed to branch valentindavid/cache_server_fill_up at BuildStream / buildstream
Commits:
- 4cfabce8 by Angelos Evripiotis at 2018-11-01T11:35:02Z
- 48860aac by Tristan Van Berkom at 2018-11-01T12:01:04Z
- d868b409 by Daniel Silverstone at 2018-11-01T13:40:24Z
- 7f79b9ce by Tristan Van Berkom at 2018-11-01T14:25:57Z
- a3036492 by Valentin David at 2018-11-02T10:14:01Z
- 6e35ad93 by Valentin David at 2018-11-02T10:14:01Z
- 5c589e47 by Valentin David at 2018-11-02T10:14:01Z
- ac066763 by Valentin David at 2018-11-02T10:18:58Z
- cd018d7c by Valentin David at 2018-11-02T10:18:58Z
- afc00580 by Valentin David at 2018-11-02T10:18:58Z
- 4dd0121c by Valentin David at 2018-11-02T10:18:58Z
- 66ff4426 by Valentin David at 2018-11-02T10:18:58Z
- 724b9fb1 by Valentin David at 2018-11-02T10:18:58Z
- 121f07b7 by Valentin David at 2018-11-02T10:18:58Z
- db8b48ee by Valentin David at 2018-11-02T10:18:58Z
8 changed files:
- NEWS
- buildstream/_artifactcache/cascache.py
- buildstream/_artifactcache/casserver.py
- buildstream/_versions.py
- buildstream/_yaml.py
- buildstream/plugins/elements/manual.yaml
- tests/frontend/push.py
- tests/testutils/artifactshare.py
Changes:
@@ -2,6 +2,12 @@
 buildstream 1.3.1
 =================
 
+  o BREAKING CHANGE: The 'manual' element lost its default 'MAKEFLAGS' and 'V'
+    environment variables. There is already a 'make' element with the same
+    variables. Note that this is a breaking change, it will require users to
+    make changes to their .bst files if they are expecting these environment
+    variables to be set.
+
   o Failed builds are included in the cache as well.
     `bst checkout` will provide anything in `%{install-root}`.
    A build including cached failures will cause any dependent elements
@@ -27,6 +27,7 @@ import stat
 import tempfile
 import uuid
 import errno
+import contextlib
 from urllib.parse import urlparse
 
 import grpc
@@ -49,6 +50,13 @@ from . import ArtifactCache
 _MAX_PAYLOAD_BYTES = 1024 * 1024
 
 
+class BlobNotFound(ArtifactError):
+
+    def __init__(self, blob, msg):
+        self.blob = blob
+        super().__init__(msg)
+
+
 # A CASCache manages artifacts in a CAS repository as specified in the
 # Remote Execution API.
 #
@@ -264,6 +272,10 @@ class CASCache(ArtifactCache):
                     element.info("Remote ({}) does not have {} cached".format(
                         remote.spec.url, element._get_brief_display_key()
                     ))
+            except BlobNotFound as e:
+                element.info("Remote ({}) does not have {} cached (blob {} missing)".format(
+                    remote.spec.url, element._get_brief_display_key(), e.blob
+                ))
 
         return False
 
@@ -452,13 +464,14 @@ class CASCache(ArtifactCache):
     #     digest (Digest): An optional Digest object to populate
     #     path (str): Path to file to add
     #     buffer (bytes): Byte buffer to add
+    #     link_directly (bool): Whether the file given by path can be linked
     #
     # Returns:
     #     (Digest): The digest of the added object
     #
     # Either `path` or `buffer` must be passed, but not both.
     #
-    def add_object(self, *, digest=None, path=None, buffer=None):
+    def add_object(self, *, digest=None, path=None, buffer=None, link_directly=False):
         # Exactly one of the two parameters has to be specified
         assert (path is None) != (buffer is None)
 
@@ -468,28 +481,34 @@ class CASCache(ArtifactCache):
         try:
             h = hashlib.sha256()
             # Always write out new file to avoid corruption if input file is modified
-            with tempfile.NamedTemporaryFile(dir=self.tmpdir) as out:
-                # Set mode bits to 0644
-                os.chmod(out.name, stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH)
-
-                if path:
-                    with open(path, 'rb') as f:
-                        for chunk in iter(lambda: f.read(4096), b""):
-                            h.update(chunk)
-                            out.write(chunk)
+            with contextlib.ExitStack() as stack:
+                if path is not None and link_directly:
+                    tmp = stack.enter_context(open(path, 'rb'))
+                    for chunk in iter(lambda: tmp.read(4096), b""):
+                        h.update(chunk)
                 else:
-                    h.update(buffer)
-                    out.write(buffer)
+                    tmp = stack.enter_context(tempfile.NamedTemporaryFile(dir=self.tmpdir))
+                    # Set mode bits to 0644
+                    os.chmod(tmp.name, stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH)
 
-                out.flush()
+                    if path:
+                        with open(path, 'rb') as f:
+                            for chunk in iter(lambda: f.read(4096), b""):
+                                h.update(chunk)
+                                tmp.write(chunk)
+                    else:
+                        h.update(buffer)
+                        tmp.write(buffer)
+
+                    tmp.flush()
 
                 digest.hash = h.hexdigest()
-                digest.size_bytes = os.fstat(out.fileno()).st_size
+                digest.size_bytes = os.fstat(tmp.fileno()).st_size
 
                 # Place file at final location
                 objpath = self.objpath(digest)
                 os.makedirs(os.path.dirname(objpath), exist_ok=True)
-                os.link(out.name, objpath)
+                os.link(tmp.name, objpath)
 
         except FileExistsError as e:
             # We can ignore the failed link() if the object is already in the repo.
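Note: the new `link_directly` flag lets a caller that already owns a temporary file inside the cache's `tmpdir` skip the second copy; `add_object()` then only hashes the file and hard-links it into the object store. A minimal sketch of the intended call pattern, mirroring the `_fetch_blob()` call sites further down in this diff (`cas`, `remote` and `digest` stand in for the objects in scope there):

```python
import tempfile

# Download a blob into the CAS tmpdir, then add it without an extra copy.
# This works because the temp file already lives on the same filesystem as
# the object store, so os.link() can be used directly.
with tempfile.NamedTemporaryFile(dir=cas.tmpdir) as f:
    cas._fetch_blob(remote, digest, f)                       # stream remote blob into f
    added = cas.add_object(path=f.name, link_directly=True)  # hash + hard-link, no copy
    assert added.hash == digest.hash                         # content must match the digest
```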
@@ -574,6 +593,41 @@ class CASCache(ArtifactCache):
         # first element of this list will be the file modified earliest.
         return [ref for _, ref in sorted(zip(mtimes, refs))]
 
+    # list_objects():
+    #
+    # List cached objects in Least Recently Modified (LRM) order.
+    #
+    # Returns:
+    #     (list) - A list of objects and timestamps in LRM order
+    #
+    def list_objects(self):
+        objs = []
+        mtimes = []
+
+        for root, _, files in os.walk(os.path.join(self.casdir, 'objects')):
+            for filename in files:
+                obj_path = os.path.join(root, filename)
+                try:
+                    mtimes.append(os.path.getmtime(obj_path))
+                except FileNotFoundError:
+                    pass
+                else:
+                    objs.append(obj_path)
+
+        # NOTE: Sorted will sort from earliest to latest, thus the
+        # first element of this list will be the file modified earliest.
+        return sorted(zip(mtimes, objs))
+
+    def clean_up_refs_until(self, time):
+        ref_heads = os.path.join(self.casdir, 'refs', 'heads')
+
+        for root, _, files in os.walk(ref_heads):
+            for filename in files:
+                ref_path = os.path.join(root, filename)
+                # Obtain the mtime (the time a file was last modified)
+                if os.path.getmtime(ref_path) < time:
+                    os.unlink(ref_path)
+
     # remove():
     #
     # Removes the given symbolic ref from the repo.
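Note: `list_objects()` and `clean_up_refs_until()` are meant to be used together: objects are evicted oldest-mtime-first, and any ref older than the last evicted mtime may point at deleted blobs, so it gets dropped as well. A sketch of the pairing, assuming `cas` is a `CASCache` instance:

```python
import os

lrm = cas.list_objects()             # [(mtime, path), ...], oldest first
last_mtime, victim = lrm.pop(0)      # least recently modified object
os.unlink(victim)                    # evict it
cas.clean_up_refs_until(last_mtime)  # drop refs older than the eviction horizon
```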
@@ -625,7 +679,12 @@ class CASCache(ArtifactCache):
     #
     # Prune unreachable objects from the repo.
     #
-    def prune(self):
+    # Args:
+    #    keep_after (int|None): timestamp after which unreachable objects
+    #                           are kept. None if no unreachable object
+    #                           should be kept.
+    #
+    def prune(self, keep_after=None):
         ref_heads = os.path.join(self.casdir, 'refs', 'heads')
 
         pruned = 0
@@ -646,11 +705,19 @@ class CASCache(ArtifactCache):
                 objhash = os.path.basename(root) + filename
                 if objhash not in reachable:
                     obj_path = os.path.join(root, filename)
+                    if keep_after:
+                        st = os.stat(obj_path)
+                        if st.st_mtime >= keep_after:
+                            continue
                     pruned += os.stat(obj_path).st_size
                     os.unlink(obj_path)
 
         return pruned
 
+    def update_tree_mtime(self, tree):
+        reachable = set()
+        self._reachable_refs_dir(reachable, tree, update_mtime=True)
+
     ################################################
     #             Local Private Methods            #
     ################################################
@@ -795,7 +862,7 @@ class CASCache(ArtifactCache):
                 a += 1
                 b += 1
 
-    def _reachable_refs_dir(self, reachable, tree):
+    def _reachable_refs_dir(self, reachable, tree, update_mtime=False):
         if tree.hash in reachable:
             return
 
@@ -807,10 +874,14 @@ class CASCache(ArtifactCache):
             directory.ParseFromString(f.read())
 
         for filenode in directory.files:
+            if update_mtime:
+                os.utime(self.objpath(filenode.digest))
             reachable.add(filenode.digest.hash)
 
         for dirnode in directory.directories:
-            self._reachable_refs_dir(reachable, dirnode.digest)
+            if update_mtime:
+                os.utime(self.objpath(dirnode.digest))
+            self._reachable_refs_dir(reachable, dirnode.digest, update_mtime=update_mtime)
 
     def _initialize_remote(self, remote_spec, q):
         try:
@@ -887,7 +958,7 @@ class CASCache(ArtifactCache):
         with tempfile.NamedTemporaryFile(dir=self.tmpdir) as f:
             self._fetch_blob(remote, digest, f)
 
-            added_digest = self.add_object(path=f.name)
+            added_digest = self.add_object(path=f.name, link_directly=True)
             assert added_digest.hash == digest.hash
 
         return objpath
@@ -898,7 +969,7 @@ class CASCache(ArtifactCache):
                 f.write(data)
                 f.flush()
 
-                added_digest = self.add_object(path=f.name)
+                added_digest = self.add_object(path=f.name, link_directly=True)
                 assert added_digest.hash == digest.hash
 
     # Helper function for _fetch_directory().
@@ -1202,6 +1273,9 @@ class _CASBatchRead():
         batch_response = self._remote.cas.BatchReadBlobs(self._request)
 
         for response in batch_response.responses:
+            if response.status.code == code_pb2.NOT_FOUND:
+                raise BlobNotFound(response.digest.hash, "Failed to download blob {}: {}".format(
+                    response.digest.hash, response.status.code))
             if response.status.code != code_pb2.OK:
                 raise ArtifactError("Failed to download blob {}: {}".format(
                     response.digest.hash, response.status.code))
@@ -24,6 +24,9 @@ import signal
 import sys
 import tempfile
 import uuid
+import errno
+import ctypes
+import threading
 
 import click
 import grpc
@@ -31,6 +34,7 @@ import grpc
 from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
 from .._protos.google.bytestream import bytestream_pb2, bytestream_pb2_grpc
 from .._protos.buildstream.v2 import buildstream_pb2, buildstream_pb2_grpc
+from .._protos.google.rpc import code_pb2
 
 from .._exceptions import ArtifactError
 from .._context import Context
@@ -40,6 +44,10 @@ from .._context import Context
 # Limit payload to 1 MiB to leave sufficient headroom for metadata.
 _MAX_PAYLOAD_BYTES = 1024 * 1024
 
+# The minimum age in seconds for objects before they can be cleaned
+# up.
+_OBJECT_MIN_AGE = 6 * 60 * 60
+
 
 # Trying to push an artifact that is too large
 class ArtifactTooLargeException(Exception):
@@ -54,7 +62,7 @@ class ArtifactTooLargeException(Exception):
 #     repo (str): Path to CAS repository
 #     enable_push (bool): Whether to allow blob uploads and artifact updates
 #
-def create_server(repo, *, enable_push):
+def create_server(repo, max_head_size, min_head_size, *, enable_push):
     context = Context()
     context.artifactdir = os.path.abspath(repo)
 
@@ -64,11 +72,13 @@ def create_server(repo, *, enable_push):
     max_workers = (os.cpu_count() or 1) * 5
     server = grpc.server(futures.ThreadPoolExecutor(max_workers))
 
+    cache_cleaner = _CacheCleaner(artifactcache, max_head_size, min_head_size)
+
     bytestream_pb2_grpc.add_ByteStreamServicer_to_server(
-        _ByteStreamServicer(artifactcache, enable_push=enable_push), server)
+        _ByteStreamServicer(artifactcache, cache_cleaner, enable_push=enable_push), server)
 
     remote_execution_pb2_grpc.add_ContentAddressableStorageServicer_to_server(
-        _ContentAddressableStorageServicer(artifactcache, enable_push=enable_push), server)
+        _ContentAddressableStorageServicer(artifactcache, cache_cleaner, enable_push=enable_push), server)
 
     remote_execution_pb2_grpc.add_CapabilitiesServicer_to_server(
         _CapabilitiesServicer(), server)
@@ -86,9 +96,16 @@ def create_server(repo, *, enable_push):
 @click.option('--client-certs', help="Public client certificates for TLS (PEM-encoded)")
 @click.option('--enable-push', default=False, is_flag=True,
               help="Allow clients to upload blobs and update artifact cache")
+@click.option('--head-room-min', type=click.INT,
+              help="Disk headroom minimum in bytes",
+              default=2e9)
+@click.option('--head-room-max', type=click.INT,
+              help="Disk headroom maximum in bytes",
+              default=10e9)
 @click.argument('repo')
-def server_main(repo, port, server_key, server_cert, client_certs, enable_push):
-    server = create_server(repo, enable_push=enable_push)
+def server_main(repo, port, server_key, server_cert, client_certs, enable_push,
+                head_room_min, head_room_max):
+    server = create_server(repo, head_room_max, head_room_min, enable_push=enable_push)
 
     use_tls = bool(server_key)
@@ -129,11 +146,43 @@ def server_main(repo, port, server_key, server_cert, client_certs, enable_push):
         server.stop(0)
 
 
+class _FallocateCall:
+
+    FALLOC_FL_KEEP_SIZE = 1
+    FALLOC_FL_PUNCH_HOLE = 2
+    FALLOC_FL_NO_HIDE_STALE = 4
+    FALLOC_FL_COLLAPSE_RANGE = 8
+    FALLOC_FL_ZERO_RANGE = 16
+    FALLOC_FL_INSERT_RANGE = 32
+    FALLOC_FL_UNSHARE_RANGE = 64
+
+    def __init__(self):
+        self.libc = ctypes.CDLL("libc.so.6", use_errno=True)
+        try:
+            self.fallocate64 = self.libc.fallocate64
+        except AttributeError:
+            self.fallocate = self.libc.fallocate
+
+    def __call__(self, fd, mode, offset, length):
+        if hasattr(self, 'fallocate64'):
+            ret = self.fallocate64(ctypes.c_int(fd), ctypes.c_int(mode),
+                                   ctypes.c_int64(offset), ctypes.c_int64(length))
+        else:
+            ret = self.fallocate(ctypes.c_int(fd), ctypes.c_int(mode),
+                                 ctypes.c_int(offset), ctypes.c_int(length))
+        if ret == -1:
+            err = ctypes.get_errno()
+            raise OSError(err, os.strerror(err))
+        return ret
+
+
 class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):
-    def __init__(self, cas, *, enable_push):
+    def __init__(self, cas, cache_cleaner, *, enable_push):
         super().__init__()
         self.cas = cas
         self.enable_push = enable_push
+        self.fallocate = _FallocateCall()
+        self.cache_cleaner = cache_cleaner
 
     def Read(self, request, context):
         resource_name = request.resource_name
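Note: `_FallocateCall` wraps glibc's fallocate(2) via ctypes so the server can reserve the full blob size before streaming it, turning a mid-upload disk-full into an early, catchable ENOSPC. A hedged usage sketch (size and file are illustrative; mode 0 allocates and extends the file):

```python
import tempfile

fallocate = _FallocateCall()
with tempfile.NamedTemporaryFile() as out:
    # Reserve 16 MiB up front; raises OSError with errno ENOSPC if the disk is full.
    fallocate(out.fileno(), 0, 0, 16 * 1024 * 1024)
    # ... stream the incoming chunks into `out` ...
```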
@@ -191,25 +240,44 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):
                         context.set_code(grpc.StatusCode.NOT_FOUND)
                         return response
 
-                    try:
-                        _clean_up_cache(self.cas, client_digest.size_bytes)
-                    except ArtifactTooLargeException as e:
-                        context.set_code(grpc.StatusCode.RESOURCE_EXHAUSTED)
-                        context.set_details(str(e))
-                        return response
+                    while True:
+                        if client_digest.size_bytes == 0:
+                            break
+                        try:
+                            self.cache_cleaner.clean_up(client_digest.size_bytes)
+                        except ArtifactTooLargeException as e:
+                            context.set_code(grpc.StatusCode.RESOURCE_EXHAUSTED)
+                            context.set_details(str(e))
+                            return response
+
+                        try:
+                            self.fallocate(out.fileno(), 0, 0, client_digest.size_bytes)
+                            break
+                        except OSError as e:
+                            # Multiple uploads can happen at the same time
+                            if e.errno != errno.ENOSPC:
+                                raise
+
                 elif request.resource_name:
                     # If it is set on subsequent calls, it **must** match the value of the first request.
                     if request.resource_name != resource_name:
                         context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
                         return response
+
+                if (offset + len(request.data)) > client_digest.size_bytes:
+                    context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
+                    return response
+
                 out.write(request.data)
+
                 offset += len(request.data)
+
                 if request.finish_write:
                     if client_digest.size_bytes != offset:
                         context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
                         return response
                     out.flush()
-                    digest = self.cas.add_object(path=out.name)
+                    digest = self.cas.add_object(path=out.name, link_directly=True)
                     if digest.hash != client_digest.hash:
                         context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
                         return response
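Note: the rewritten `Write()` loops over cleanup and fallocate because several uploads can race for the same free space: a reservation that fails with ENOSPC simply triggers another cleanup round instead of failing the stream. Distilled, the retry loop is:

```python
# Sketch of the reservation loop above; names match the servicer code.
while client_digest.size_bytes != 0:
    cache_cleaner.clean_up(client_digest.size_bytes)  # may raise ArtifactTooLargeException
    try:
        fallocate(out.fileno(), 0, 0, client_digest.size_bytes)
        break                          # space reserved, safe to stream
    except OSError as e:
        if e.errno != errno.ENOSPC:
            raise                      # only a full disk is retried
```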
@@ -222,18 +290,26 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):
 
 
 class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddressableStorageServicer):
-    def __init__(self, cas, *, enable_push):
+    def __init__(self, cas, cache_cleaner, *, enable_push):
         super().__init__()
         self.cas = cas
         self.enable_push = enable_push
+        self.cache_cleaner = cache_cleaner
 
     def FindMissingBlobs(self, request, context):
         response = remote_execution_pb2.FindMissingBlobsResponse()
         for digest in request.blob_digests:
-            if not _has_object(self.cas, digest):
-                d = response.missing_blob_digests.add()
-                d.hash = digest.hash
-                d.size_bytes = digest.size_bytes
+            objpath = self.cas.objpath(digest)
+            try:
+                os.utime(objpath)
+            except OSError as e:
+                if e.errno != errno.ENOENT:
+                    raise
+                else:
+                    d = response.missing_blob_digests.add()
+                    d.hash = digest.hash
+                    d.size_bytes = digest.size_bytes
+
         return response
 
     def BatchReadBlobs(self, request, context):
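Note: `FindMissingBlobs()` now doubles as an LRU touch: calling `os.utime()` on each object refreshes its mtime (protecting it from the cleaner) and reports a missing blob via ENOENT, all in a single syscall. The pattern in isolation:

```python
import errno
import os

def touch_or_missing(path):
    # True if path exists (mtime refreshed, so LRU-based cleanup keeps it);
    # False if the object is missing.
    try:
        os.utime(path)
        return True
    except OSError as e:
        if e.errno != errno.ENOENT:
            raise
        return False
```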
@@ -252,12 +328,12 @@ class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddressableStorageServicer):
             try:
                 with open(self.cas.objpath(digest), 'rb') as f:
                     if os.fstat(f.fileno()).st_size != digest.size_bytes:
-                        blob_response.status.code = grpc.StatusCode.NOT_FOUND
+                        blob_response.status.code = code_pb2.NOT_FOUND
                         continue
 
                     blob_response.data = f.read(digest.size_bytes)
             except FileNotFoundError:
-                blob_response.status.code = grpc.StatusCode.NOT_FOUND
+                blob_response.status.code = code_pb2.NOT_FOUND
 
         return response
 
| 263 | 339 |  | 
| ... | ... | @@ -287,7 +363,7 @@ class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddres | 
| 287 | 363 |                  continue
 | 
| 288 | 364 |  | 
| 289 | 365 |              try:
 | 
| 290 | -                _clean_up_cache(self.cas, digest.size_bytes)
 | |
| 366 | +                self.cache_cleaner.clean_up(digest.size_bytes)
 | |
| 291 | 367 |  | 
| 292 | 368 |                  with tempfile.NamedTemporaryFile(dir=self.cas.tmpdir) as out:
 | 
| 293 | 369 |                      out.write(blob_request.data)
 | 
@@ -330,6 +406,12 @@ class _ReferenceStorageServicer(buildstream_pb2_grpc.ReferenceStorageServicer):
 
         try:
             tree = self.cas.resolve_ref(request.key, update_mtime=True)
+            try:
+                self.cas.update_tree_mtime(tree)
+            except FileNotFoundError:
+                self.cas.remove(request.key, defer_prune=True)
+                context.set_code(grpc.StatusCode.NOT_FOUND)
+                return response
 
             response.digest.hash = tree.hash
             response.digest.size_bytes = tree.size_bytes
@@ -402,60 +484,80 @@ def _digest_from_upload_resource_name(resource_name):
         return None
 
 
-def _has_object(cas, digest):
-    objpath = cas.objpath(digest)
-    return os.path.exists(objpath)
+class _CacheCleaner:
 
+    __cleanup_cache_lock = threading.Lock()
 
-# _clean_up_cache()
-#
-# Keep removing Least Recently Pushed (LRP) artifacts in a cache until there
-# is enough space for the incoming artifact
-#
-# Args:
-#   cas: CASCache object
-#   object_size: The size of the object being received in bytes
-#
-# Returns:
-#   int: The total bytes removed on the filesystem
-#
-def _clean_up_cache(cas, object_size):
-    # Determine the available disk space, in bytes, of the file system
-    # which mounts the repo
-    stats = os.statvfs(cas.casdir)
-    buffer_ = int(2e9)                # Add a 2 GB buffer
-    free_disk_space = (stats.f_bfree * stats.f_bsize) - buffer_
-    total_disk_space = (stats.f_blocks * stats.f_bsize) - buffer_
-
-    if object_size > total_disk_space:
-        raise ArtifactTooLargeException("Artifact of size: {} is too large for "
-                                        "the filesystem which mounts the remote "
-                                        "cache".format(object_size))
-
-    if object_size <= free_disk_space:
-        # No need to clean up
-        return 0
-
-    # obtain a list of LRP artifacts
-    LRP_artifacts = cas.list_artifacts()
-
-    removed_size = 0  # in bytes
-    while object_size - removed_size > free_disk_space:
-        try:
-            to_remove = LRP_artifacts.pop(0)  # The first element in the list is the LRP artifact
-        except IndexError:
-            # This exception is caught if there are no more artifacts in the list
-            # LRP_artifacts. This means the the artifact is too large for the filesystem
-            # so we abort the process
-            raise ArtifactTooLargeException("Artifact of size {} is too large for "
-                                            "the filesystem which mounts the remote "
-                                            "cache".format(object_size))
+    def __init__(self, cas, max_head_size, min_head_size=int(2e9)):
+        self.__cas = cas
+        self.__max_head_size = max_head_size
+        self.__min_head_size = min_head_size
 
-        removed_size += cas.remove(to_remove, defer_prune=False)
+    def __has_space(self, object_size):
+        stats = os.statvfs(self.__cas.casdir)
+        free_disk_space = (stats.f_bavail * stats.f_bsize) - self.__min_head_size
+        total_disk_space = (stats.f_blocks * stats.f_bsize) - self.__min_head_size
 
-    if removed_size > 0:
-        logging.info("Successfully removed {} bytes from the cache".format(removed_size))
-    else:
-        logging.info("No artifacts were removed from the cache.")
+        if object_size > total_disk_space:
+            raise ArtifactTooLargeException("Artifact of size: {} is too large for "
+                                            "the filesystem which mounts the remote "
+                                            "cache".format(object_size))
 
-    return removed_size
+        return object_size <= free_disk_space
+
+    # clean_up()
+    #
+    # Keep removing Least Recently Pushed (LRP) artifacts in a cache until there
+    # is enough space for the incoming artifact
+    #
+    # Args:
+    #   object_size: The size of the object being received in bytes
+    #
+    # Returns:
+    #   int: The total bytes removed on the filesystem
+    #
+    def clean_up(self, object_size):
+        if self.__has_space(object_size):
+            return 0
+
+        with _CacheCleaner.__cleanup_cache_lock:
+            if self.__has_space(object_size):
+                # Another thread has done the cleanup for us
+                return 0
+
+            stats = os.statvfs(self.__cas.casdir)
+            target_disk_space = (stats.f_bavail * stats.f_bsize) - self.__max_head_size
+
+            # obtain a list of LRP objects
+            LRP_objects = self.__cas.list_objects()
+
+            removed_size = 0  # in bytes
+
+            last_mtime = 0
+
+            while object_size - removed_size > target_disk_space:
+                try:
+                    last_mtime, to_remove = LRP_objects.pop(0)  # The first element in the list is the LRP object
+                except IndexError:
+                    # This exception is caught if there are no more objects in the list
+                    # LRP_objects. This means that the artifact is too large for the filesystem
+                    # so we abort the process
+                    raise ArtifactTooLargeException("Artifact of size {} is too large for "
+                                                    "the filesystem which mounts the remote "
+                                                    "cache".format(object_size))
+
+                try:
+                    size = os.stat(to_remove).st_size
+                    os.unlink(to_remove)
+                    removed_size += size
+                except FileNotFoundError:
+                    pass
+
+            self.__cas.clean_up_refs_until(last_mtime)
+
+            if removed_size > 0:
+                logging.info("Successfully removed {} bytes from the cache".format(removed_size))
+            else:
+                logging.info("No artifacts were removed from the cache.")
+
+            return removed_size
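Note: `_CacheCleaner.clean_up()` uses double-checked locking: a lock-free fast path when there is already space, then a re-check under the class-wide lock so concurrent uploads do not all run the eviction loop. The pattern in general form:

```python
import threading

_lock = threading.Lock()

def ensure_space(object_size, has_space, clean):
    if has_space(object_size):      # fast path, no lock taken
        return
    with _lock:
        if has_space(object_size):  # re-check: another thread may have cleaned
            return
        clean(object_size)          # only one thread pays for the cleanup
```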
@@ -23,7 +23,7 @@
 # This version is bumped whenever enhancements are made
 # to the `project.conf` format or the core element format.
 #
-BST_FORMAT_VERSION = 17
+BST_FORMAT_VERSION = 18
 
 
 # The base BuildStream artifact version
@@ -1049,6 +1049,12 @@ class ChainMap(collections.ChainMap):
         for key in clearable:
             del self[key]
 
+    def get(self, key, default=None):
+        try:
+            return self[key]
+        except KeyError:
+            return default
+
 
 def node_chain_copy(source):
     copy = ChainMap({}, source)
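Note: the `get()` override routes lookups through the subclass's `__getitem__`, presumably so that keys this ChainMap treats as deleted fall back to the default instead of being found by the base-class implementation. Usage follows the standard dict contract:

```python
cm = ChainMap({}, {'flags': '-j4'})
assert cm.get('flags') == '-j4'
assert cm.get('missing', 'fallback') == 'fallback'  # no KeyError
```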
 | 
| 1 | -# No variables added for the manual element by default, set
 | |
| 2 | -# this if you plan to use make, and the sources cannot handle
 | |
| 3 | -# parallelization.
 | |
| 4 | -#
 | |
| 5 | -# variables:
 | |
| 6 | -#
 | |
| 7 | -#   notparallel: True
 | |
| 8 | - | |
| 9 | 1 |  # Manual build element does not provide any default
 | 
| 10 | 2 |  # build commands
 | 
| 11 | 3 |  config:
 | 
| ... | ... | @@ -28,14 +20,3 @@ config: | 
| 28 | 20 |    strip-commands:
 | 
| 29 | 21 |    - |
 | 
| 30 | 22 |      %{strip-binaries} | 
| 31 | - | |
| 32 | -# Use max-jobs CPUs for building and enable verbosity
 | |
| 33 | -environment:
 | |
| 34 | -  MAKEFLAGS: -j%{max-jobs}
 | |
| 35 | -  V: 1
 | |
| 36 | - | |
| 37 | -# And dont consider MAKEFLAGS or V as something which may
 | |
| 38 | -# affect build output.
 | |
| 39 | -environment-nocache:
 | |
| 40 | -- MAKEFLAGS
 | |
| 41 | -- V | 
@@ -253,6 +253,8 @@ def test_artifact_expires(cli, datafiles, tmpdir):
         assert cli.get_element_state(project, 'element2.bst') == 'cached'
         assert_shared(cli, share, project, 'element2.bst')
 
+        share.make_all_objects_older()
+
         # Create and build another element of 5 MB (This will exceed the free disk space available)
         create_element_size('element3.bst', project, element_path, [], int(5e6))
         result = cli.run(project=project, args=['build', 'element3.bst'])
@@ -350,6 +352,7 @@ def test_recently_pulled_artifact_does_not_expire(cli, datafiles, tmpdir):
         assert cli.get_element_state(project, 'element1.bst') == 'cached'
 
         wait_for_cache_granularity()
+        share.make_all_objects_older()
 
         # Create and build the element3 (of 5 MB)
         create_element_size('element3.bst', project, element_path, [], int(5e6))
@@ -138,6 +138,15 @@ class ArtifactShare():
         except ArtifactError:
             return False
 
+    def make_all_objects_older(self):
+        for root, dirs, files in os.walk(os.path.join(self.cas.casdir, 'objects')):
+            for name in files:
+                fullname = os.path.join(root, name)
+                st = os.stat(fullname)
+                mtime = st.st_mtime - 6 * 60 * 60
+                atime = st.st_atime - 6 * 60 * 60
+                os.utime(fullname, times=(atime, mtime))
+
     # close():
     #
     # Remove the artifact share.
