Jim MacArthur pushed to branch jmac/no-verify-digests at BuildStream / buildstream
Commits:
- 499c70fd by Valentin David at 2018-11-28T09:11:21Z
- 3513580c by Valentin David at 2018-11-28T14:02:32Z
- 26cdee08 by Valentin David at 2018-11-28T14:29:52Z
- 58ca298f by Valentin David at 2018-11-28T14:29:52Z
- 227fa26d by Valentin David at 2018-11-28T14:29:52Z
- 5ef19a0b by Valentin David at 2018-11-28T14:29:52Z
- 8d2946ff by Valentin David at 2018-11-28T14:29:52Z
- b587953a by Valentin David at 2018-11-28T14:29:52Z
- a64f667d by Valentin David at 2018-11-28T14:29:52Z
- 353b90dd by Valentin David at 2018-11-28T14:29:52Z
- ba9afa98 by Valentin David at 2018-11-28T14:29:52Z
- 9a458402 by Valentin David at 2018-11-29T08:47:40Z
- 47b7a9ba by Jim MacArthur at 2018-11-29T09:58:48Z
6 changed files:
- buildstream/_artifactcache/cascache.py
- buildstream/_artifactcache/casserver.py
- buildstream/sandbox/_sandboxremote.py
- setup.py
- tests/frontend/push.py
- tests/testutils/artifactshare.py
Changes:
--- a/buildstream/_artifactcache/cascache.py
+++ b/buildstream/_artifactcache/cascache.py
@@ -25,6 +25,7 @@ import os
 import stat
 import tempfile
 import uuid
+import contextlib
 from urllib.parse import urlparse
 
 import grpc
@@ -88,6 +89,13 @@ class CASRemoteSpec(namedtuple('CASRemoteSpec', 'url push server_cert client_key
 CASRemoteSpec.__new__.__defaults__ = (None, None, None)
 
 
+class BlobNotFound(CASError):
+
+    def __init__(self, blob, msg):
+        self.blob = blob
+        super().__init__(msg)
+
+
 # A CASCache manages a CAS repository as specified in the Remote Execution API.
 #
 # Args:
@@ -299,6 +307,8 @@ class CASCache():
                 raise CASError("Failed to pull ref {}: {}".format(ref, e)) from e
             else:
                 return False
+        except BlobNotFound as e:
+            return False
 
     # pull_tree():
     #
@@ -471,13 +481,14 @@ class CASCache():
     #     digest (Digest): An optional Digest object to populate
     #     path (str): Path to file to add
     #     buffer (bytes): Byte buffer to add
+    #     link_directly (bool): Whether file given by path can be linked
     #
     # Returns:
     #     (Digest): The digest of the added object
     #
     # Either `path` or `buffer` must be passed, but not both.
     #
-    def add_object(self, *, digest=None, path=None, buffer=None):
+    def add_object(self, *, digest=None, path=None, buffer=None, link_directly=False):
         # Exactly one of the two parameters has to be specified
         assert (path is None) != (buffer is None)
 
@@ -487,28 +498,34 @@ class CASCache():
         try:
             h = hashlib.sha256()
             # Always write out new file to avoid corruption if input file is modified
-            with tempfile.NamedTemporaryFile(dir=self.tmpdir) as out:
-                # Set mode bits to 0644
-                os.chmod(out.name, stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH)
-
-                if path:
-                    with open(path, 'rb') as f:
-                        for chunk in iter(lambda: f.read(4096), b""):
-                            h.update(chunk)
-                            out.write(chunk)
+            with contextlib.ExitStack() as stack:
+                if path is not None and link_directly:
+                    tmp = stack.enter_context(open(path, 'rb'))
+                    for chunk in iter(lambda: tmp.read(4096), b""):
+                        h.update(chunk)
                 else:
-                    h.update(buffer)
-                    out.write(buffer)
+                    tmp = stack.enter_context(tempfile.NamedTemporaryFile(dir=self.tmpdir))
+                    # Set mode bits to 0644
+                    os.chmod(tmp.name, stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH)
 
-                out.flush()
+                    if path:
+                        with open(path, 'rb') as f:
+                            for chunk in iter(lambda: f.read(4096), b""):
+                                h.update(chunk)
+                                tmp.write(chunk)
+                    else:
+                        h.update(buffer)
+                        tmp.write(buffer)
+
+                    tmp.flush()
 
                 digest.hash = h.hexdigest()
-                digest.size_bytes = os.fstat(out.fileno()).st_size
+                digest.size_bytes = os.fstat(tmp.fileno()).st_size
 
                 # Place file at final location
                 objpath = self.objpath(digest)
                 os.makedirs(os.path.dirname(objpath), exist_ok=True)
-                os.link(out.name, objpath)
+                os.link(tmp.name, objpath)
 
         except FileExistsError as e:
             # We can ignore the failed link() if the object is already in the repo.
@@ -606,6 +623,41 @@ class CASCache():
         # first ref of this list will be the file modified earliest.
         return [ref for _, ref in sorted(zip(mtimes, refs))]
 
+    # list_objects():
+    #
+    # List cached objects in Least Recently Modified (LRM) order.
+    #
+    # Returns:
+    #     (list) - A list of objects and timestamps in LRM order
+    #
+    def list_objects(self):
+        objs = []
+        mtimes = []
+
+        for root, _, files in os.walk(os.path.join(self.casdir, 'objects')):
+            for filename in files:
+                obj_path = os.path.join(root, filename)
+                try:
+                    mtimes.append(os.path.getmtime(obj_path))
+                except FileNotFoundError:
+                    pass
+                else:
+                    objs.append(obj_path)
+
+        # NOTE: Sorted will sort from earliest to latest, thus the
+        # first element of this list will be the file modified earliest.
+        return sorted(zip(mtimes, objs))
+
+    def clean_up_refs_until(self, time):
+        ref_heads = os.path.join(self.casdir, 'refs', 'heads')
+
+        for root, _, files in os.walk(ref_heads):
+            for filename in files:
+                ref_path = os.path.join(root, filename)
+                # Obtain the mtime (the time a file was last modified)
+                if os.path.getmtime(ref_path) < time:
+                    os.unlink(ref_path)
+
     # remove():
     #
     # Removes the given symbolic ref from the repo.
@@ -665,6 +717,10 @@ class CASCache():
 
         return pruned
 
+    def update_tree_mtime(self, tree):
+        reachable = set()
+        self._reachable_refs_dir(reachable, tree, update_mtime=True)
+
     ################################################
     #             Local Private Methods            #
     ################################################
@@ -811,10 +867,13 @@ class CASCache():
                 a += 1
                 b += 1
 
-    def _reachable_refs_dir(self, reachable, tree):
+    def _reachable_refs_dir(self, reachable, tree, update_mtime=False):
         if tree.hash in reachable:
             return
 
+        if update_mtime:
+            os.utime(self.objpath(tree))
+
         reachable.add(tree.hash)
 
         directory = remote_execution_pb2.Directory()
@@ -823,10 +882,12 @@ class CASCache():
             directory.ParseFromString(f.read())
 
         for filenode in directory.files:
+            if update_mtime:
+                os.utime(self.objpath(filenode.digest))
            reachable.add(filenode.digest.hash)
 
         for dirnode in directory.directories:
-            self._reachable_refs_dir(reachable, dirnode.digest)
+            self._reachable_refs_dir(reachable, dirnode.digest, update_mtime=update_mtime)
 
     def _required_blobs(self, directory_digest):
         # parse directory, and recursively add blobs
@@ -880,7 +941,7 @@ class CASCache():
         with tempfile.NamedTemporaryFile(dir=self.tmpdir) as f:
             self._fetch_blob(remote, digest, f)
 
-            added_digest = self.add_object(path=f.name)
+            added_digest = self.add_object(path=f.name, link_directly=True)
             assert added_digest.hash == digest.hash
 
         return objpath
@@ -891,7 +952,7 @@ class CASCache():
                 f.write(data)
                 f.flush()
 
-                added_digest = self.add_object(path=f.name)
+                added_digest = self.add_object(path=f.name, link_directly=True)
                 assert added_digest.hash == digest.hash
 
     # Helper function for _fetch_directory().
@@ -1203,6 +1264,9 @@ class _CASBatchRead():
         batch_response = self._remote.cas.BatchReadBlobs(self._request)
 
         for response in batch_response.responses:
+            if response.status.code == code_pb2.NOT_FOUND:
+                raise BlobNotFound(response.digest.hash, "Failed to download blob {}: {}".format(
+                    response.digest.hash, response.status.code))
             if response.status.code != code_pb2.OK:
                 raise CASError("Failed to download blob {}: {}".format(
                     response.digest.hash, response.status.code))
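
Note: the new link_directly flag lets add_object() hash an existing file in place and hard-link it straight into the object store, instead of copying it through a second temporary file. A minimal sketch of how a caller might use it, mirroring what _fetch_blob() and push_message() do above; the helper name is illustrative and not part of this commit, and the pattern is only safe when the caller owns the file and nothing else will modify it afterwards:

    # Illustrative helper, not part of the commit.
    import tempfile

    def ingest_blob(cascache, data):
        # Write the payload to a private temporary file inside the CAS tmpdir,
        # then let add_object() hash it in place and hard-link it into
        # objects/ rather than copying it again.
        with tempfile.NamedTemporaryFile(dir=cascache.tmpdir) as f:
            f.write(data)
            f.flush()
            return cascache.add_object(path=f.name, link_directly=True)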

--- a/buildstream/_artifactcache/casserver.py
+++ b/buildstream/_artifactcache/casserver.py
@@ -24,6 +24,8 @@ import signal
 import sys
 import tempfile
 import uuid
+import errno
+import threading
 
 import click
 import grpc
@@ -31,6 +33,7 @@ import grpc
 from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
 from .._protos.google.bytestream import bytestream_pb2, bytestream_pb2_grpc
 from .._protos.buildstream.v2 import buildstream_pb2, buildstream_pb2_grpc
+from .._protos.google.rpc import code_pb2
 
 from .._exceptions import CASError
 
@@ -55,18 +58,22 @@ class ArtifactTooLargeException(Exception):
 #     repo (str): Path to CAS repository
 #     enable_push (bool): Whether to allow blob uploads and artifact updates
 #
-def create_server(repo, *, enable_push):
+def create_server(repo, *, enable_push,
+                  max_head_size=int(10e9),
+                  min_head_size=int(2e9)):
     cas = CASCache(os.path.abspath(repo))
 
     # Use max_workers default from Python 3.5+
     max_workers = (os.cpu_count() or 1) * 5
     server = grpc.server(futures.ThreadPoolExecutor(max_workers))
 
+    cache_cleaner = _CacheCleaner(cas, max_head_size, min_head_size)
+
     bytestream_pb2_grpc.add_ByteStreamServicer_to_server(
-        _ByteStreamServicer(cas, enable_push=enable_push), server)
+        _ByteStreamServicer(cas, cache_cleaner, enable_push=enable_push), server)
 
     remote_execution_pb2_grpc.add_ContentAddressableStorageServicer_to_server(
-        _ContentAddressableStorageServicer(cas, enable_push=enable_push), server)
+        _ContentAddressableStorageServicer(cas, cache_cleaner, enable_push=enable_push), server)
 
     remote_execution_pb2_grpc.add_CapabilitiesServicer_to_server(
         _CapabilitiesServicer(), server)
@@ -84,9 +91,19 @@ def create_server(repo, *, enable_push):
 @click.option('--client-certs', help="Public client certificates for TLS (PEM-encoded)")
 @click.option('--enable-push', default=False, is_flag=True,
               help="Allow clients to upload blobs and update artifact cache")
+@click.option('--head-room-min', type=click.INT,
+              help="Disk head room minimum in bytes",
+              default=2e9)
+@click.option('--head-room-max', type=click.INT,
+              help="Disk head room maximum in bytes",
+              default=10e9)
 @click.argument('repo')
-def server_main(repo, port, server_key, server_cert, client_certs, enable_push):
-    server = create_server(repo, enable_push=enable_push)
+def server_main(repo, port, server_key, server_cert, client_certs, enable_push,
+                head_room_min, head_room_max):
+    server = create_server(repo,
+                           max_head_size=head_room_max,
+                           min_head_size=head_room_min,
+                           enable_push=enable_push)
 
     use_tls = bool(server_key)
 
@@ -128,10 +145,11 @@ def server_main(repo, port, server_key, server_cert, client_certs, enable_push):
 
 
 class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):
-    def __init__(self, cas, *, enable_push):
+    def __init__(self, cas, cache_cleaner, *, enable_push):
         super().__init__()
         self.cas = cas
         self.enable_push = enable_push
+        self.cache_cleaner = cache_cleaner
 
     def Read(self, request, context):
         resource_name = request.resource_name
@@ -189,17 +207,34 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):
                         context.set_code(grpc.StatusCode.NOT_FOUND)
                         return response
 
-                    try:
-                        _clean_up_cache(self.cas, client_digest.size_bytes)
-                    except ArtifactTooLargeException as e:
-                        context.set_code(grpc.StatusCode.RESOURCE_EXHAUSTED)
-                        context.set_details(str(e))
-                        return response
+                    while True:
+                        if client_digest.size_bytes == 0:
+                            break
+                        try:
+                            self.cache_cleaner.clean_up(client_digest.size_bytes)
+                        except ArtifactTooLargeException as e:
+                            context.set_code(grpc.StatusCode.RESOURCE_EXHAUSTED)
+                            context.set_details(str(e))
+                            return response
+
+                        try:
+                            os.posix_fallocate(out.fileno(), 0, client_digest.size_bytes)
+                            break
+                        except OSError as e:
+                            # Multiple upload can happen in the same time
+                            if e.errno != errno.ENOSPC:
+                                raise
+
                 elif request.resource_name:
                     # If it is set on subsequent calls, it **must** match the value of the first request.
                     if request.resource_name != resource_name:
                         context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
                         return response
+
+                if (offset + len(request.data)) > client_digest.size_bytes:
+                    context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
+                    return response
+
                 out.write(request.data)
                 offset += len(request.data)
                 if request.finish_write:
@@ -207,7 +242,7 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):
                         context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
                         return response
                     out.flush()
-                    digest = self.cas.add_object(path=out.name)
+                    digest = self.cas.add_object(path=out.name, link_directly=True)
                     if digest.hash != client_digest.hash:
                         context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
                         return response
@@ -220,18 +255,26 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):
 
 
 class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddressableStorageServicer):
-    def __init__(self, cas, *, enable_push):
+    def __init__(self, cas, cache_cleaner, *, enable_push):
         super().__init__()
         self.cas = cas
         self.enable_push = enable_push
+        self.cache_cleaner = cache_cleaner
 
     def FindMissingBlobs(self, request, context):
        response = remote_execution_pb2.FindMissingBlobsResponse()
        for digest in request.blob_digests:
-            if not _has_object(self.cas, digest):
-                d = response.missing_blob_digests.add()
-                d.hash = digest.hash
-                d.size_bytes = digest.size_bytes
+            objpath = self.cas.objpath(digest)
+            try:
+                os.utime(objpath)
+            except OSError as e:
+                if e.errno != errno.ENOENT:
+                    raise
+                else:
+                    d = response.missing_blob_digests.add()
+                    d.hash = digest.hash
+                    d.size_bytes = digest.size_bytes
+
         return response
 
     def BatchReadBlobs(self, request, context):
@@ -250,12 +293,12 @@ class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddres
             try:
                 with open(self.cas.objpath(digest), 'rb') as f:
                     if os.fstat(f.fileno()).st_size != digest.size_bytes:
-                        blob_response.status.code = grpc.StatusCode.NOT_FOUND
+                        blob_response.status.code = code_pb2.NOT_FOUND
                         continue
 
                     blob_response.data = f.read(digest.size_bytes)
             except FileNotFoundError:
-                blob_response.status.code = grpc.StatusCode.NOT_FOUND
+                blob_response.status.code = code_pb2.NOT_FOUND
 
         return response
 
@@ -285,7 +328,7 @@ class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddres
                 continue
 
             try:
-                _clean_up_cache(self.cas, digest.size_bytes)
+                self.cache_cleaner.clean_up(digest.size_bytes)
 
                 with tempfile.NamedTemporaryFile(dir=self.cas.tmpdir) as out:
                     out.write(blob_request.data)
@@ -328,6 +371,12 @@ class _ReferenceStorageServicer(buildstream_pb2_grpc.ReferenceStorageServicer):
 
         try:
             tree = self.cas.resolve_ref(request.key, update_mtime=True)
+            try:
+                self.cas.update_tree_mtime(tree)
+            except FileNotFoundError:
+                self.cas.remove(request.key, defer_prune=True)
+                context.set_code(grpc.StatusCode.NOT_FOUND)
+                return response
 
             response.digest.hash = tree.hash
             response.digest.size_bytes = tree.size_bytes
@@ -400,60 +449,79 @@ def _digest_from_upload_resource_name(resource_name):
         return None
 
 
-def _has_object(cas, digest):
-    objpath = cas.objpath(digest)
-    return os.path.exists(objpath)
+class _CacheCleaner:
 
+    __cleanup_cache_lock = threading.Lock()
 
-# _clean_up_cache()
-#
-# Keep removing Least Recently Pushed (LRP) artifacts in a cache until there
-# is enough space for the incoming artifact
-#
-# Args:
-#   cas: CASCache object
-#   object_size: The size of the object being received in bytes
-#
-# Returns:
-#   int: The total bytes removed on the filesystem
-#
-def _clean_up_cache(cas, object_size):
-    # Determine the available disk space, in bytes, of the file system
-    # which mounts the repo
-    stats = os.statvfs(cas.casdir)
-    buffer_ = int(2e9)                # Add a 2 GB buffer
-    free_disk_space = (stats.f_bfree * stats.f_bsize) - buffer_
-    total_disk_space = (stats.f_blocks * stats.f_bsize) - buffer_
-
-    if object_size > total_disk_space:
-        raise ArtifactTooLargeException("Artifact of size: {} is too large for "
-                                        "the filesystem which mounts the remote "
-                                        "cache".format(object_size))
-
-    if object_size <= free_disk_space:
-        # No need to clean up
-        return 0
-
-    # obtain a list of LRP artifacts
-    LRP_artifacts = cas.list_refs()
-
-    removed_size = 0  # in bytes
-    while object_size - removed_size > free_disk_space:
-        try:
-            to_remove = LRP_artifacts.pop(0)  # The first element in the list is the LRP artifact
-        except IndexError:
-            # This exception is caught if there are no more artifacts in the list
-            # LRP_artifacts. This means the the artifact is too large for the filesystem
-            # so we abort the process
-            raise ArtifactTooLargeException("Artifact of size {} is too large for "
-                                            "the filesystem which mounts the remote "
-                                            "cache".format(object_size))
+    def __init__(self, cas, max_head_size, min_head_size=int(2e9)):
+        self.__cas = cas
+        self.__max_head_size = max_head_size
+        self.__min_head_size = min_head_size
 
-        removed_size += cas.remove(to_remove, defer_prune=False)
+    def __has_space(self, object_size):
+        stats = os.statvfs(self.__cas.casdir)
+        free_disk_space = (stats.f_bavail * stats.f_bsize) - self.__min_head_size
+        total_disk_space = (stats.f_blocks * stats.f_bsize) - self.__min_head_size
 
-    if removed_size > 0:
-        logging.info("Successfully removed {} bytes from the cache".format(removed_size))
-    else:
-        logging.info("No artifacts were removed from the cache.")
+        if object_size > total_disk_space:
+            raise ArtifactTooLargeException("Artifact of size: {} is too large for "
                                            "the filesystem which mounts the remote "
                                            "cache".format(object_size))
 
-    return removed_size
+        return object_size <= free_disk_space
+
+    # _clean_up_cache()
+    #
+    # Keep removing Least Recently Pushed (LRP) artifacts in a cache until there
+    # is enough space for the incoming artifact
+    #
+    # Args:
+    #   object_size: The size of the object being received in bytes
+    #
+    # Returns:
+    #   int: The total bytes removed on the filesystem
+    #
+    def clean_up(self, object_size):
+        if self.__has_space(object_size):
+            return 0
+
+        with _CacheCleaner.__cleanup_cache_lock:
+            if self.__has_space(object_size):
+                # Another thread has done the cleanup for us
+                return 0
+
+            stats = os.statvfs(self.__cas.casdir)
+            target_disk_space = (stats.f_bavail * stats.f_bsize) - self.__max_head_size
+
+            # obtain a list of LRP artifacts
+            LRP_objects = self.__cas.list_objects()
+
+            removed_size = 0  # in bytes
+            last_mtime = 0
+
+            while object_size - removed_size > target_disk_space:
+                try:
+                    last_mtime, to_remove = LRP_objects.pop(0)  # The first element in the list is the LRP artifact
+                except IndexError:
+                    # This exception is caught if there are no more artifacts in the list
+                    # LRP_artifacts. This means the the artifact is too large for the filesystem
+                    # so we abort the process
+                    raise ArtifactTooLargeException("Artifact of size {} is too large for "
+                                                    "the filesystem which mounts the remote "
+                                                    "cache".format(object_size))
+
+                try:
+                    size = os.stat(to_remove).st_size
+                    os.unlink(to_remove)
+                    removed_size += size
+                except FileNotFoundError:
+                    pass
+
+            self.__cas.clean_up_refs_until(last_mtime)
+
+            if removed_size > 0:
+                logging.info("Successfully removed {} bytes from the cache".format(removed_size))
+            else:
+                logging.info("No artifacts were removed from the cache.")
+
+            return removed_size
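
Note: the server now exposes the two head-room thresholds as --head-room-min and --head-room-max, which map to min_head_size and max_head_size in create_server(). A rough sketch of the check that decides whether a cleanup is needed, assuming the same os.statvfs fields used above (function name and values illustrative, not part of the commit): an incoming blob triggers a cleanup when it does not fit under the minimum head room, and the cleanup then keeps evicting least-recently-used objects until there is room for the blob plus the larger head room, so a burst of uploads does not trigger an eviction pass for every single blob.

    # Illustrative sketch of the head-room check, not part of the commit.
    import os

    def fits_under_min_head_room(casdir, object_size, min_head_size=int(2e9)):
        stats = os.statvfs(casdir)
        # Space a client may consume: what the filesystem reports as
        # available, minus the reserved minimum head room.
        usable = (stats.f_bavail * stats.f_bsize) - min_head_size
        return object_size <= usable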

--- a/buildstream/sandbox/_sandboxremote.py
+++ b/buildstream/sandbox/_sandboxremote.py
@@ -139,8 +139,7 @@ class SandboxRemote(Sandbox):
 
         # Upload the Command message to the remote CAS server
         command_digest = cascache.push_message(casremote, remote_command)
-        if not command_digest or not cascache.verify_digest_on_remote(casremote, command_digest):
-            raise SandboxError("Failed pushing build command to remote CAS.")
+
         # Create and send the action.
         action = remote_execution_pb2.Action(command_digest=command_digest,
                                              input_root_digest=input_root_digest,
@@ -149,8 +148,6 @@ class SandboxRemote(Sandbox):
 
         # Upload the Action message to the remote CAS server
         action_digest = cascache.push_message(casremote, action)
-        if not action_digest or not cascache.verify_digest_on_remote(casremote, action_digest):
-            raise SandboxError("Failed pushing build action to remote CAS.")
 
         # Next, try to create a communication channel to the BuildGrid server.
         url = urlparse(self.exec_url)
@@ -299,15 +296,11 @@ class SandboxRemote(Sandbox):
 
         casremote = CASRemote(self.storage_remote_spec)
         # Now, push that key (without necessarily needing a ref) to the remote.
-
         try:
             cascache.push_directory(casremote, upload_vdir)
         except grpc.RpcError as e:
             raise SandboxError("Failed to push source directory to remote: {}".format(e)) from e
 
-        if not cascache.verify_digest_on_remote(casremote, upload_vdir.ref):
-            raise SandboxError("Failed to verify that source has been pushed to the remote artifact cache.")
-
         # Now transmit the command to execute
         operation = self.run_remote_command(command, upload_vdir.ref, cwd, env)
 

--- a/setup.py
+++ b/setup.py
@@ -337,7 +337,14 @@ setup(name='BuildStream',
       install_requires=[
           'setuptools',
           'psutil',
-          'ruamel.yaml < 0.15.52',
+          # According to ruamel.yaml's PyPI page, we are suppose to use
+          # "<=0.15" in production until 0.15 becomes API stable.
+          # However we need ruamel.yaml 0.15.41 or greater for Python 3.7.
+          # We know that ruamel.yaml 0.15.52 breaks API in a way that
+          # is incompatible with BuildStream.
+          #
+          # See issues #571 and #790.
+          'ruamel.yaml >= 0.15.41, < 0.15.52',
           'pluginbase',
           'Click',
           'jinja2 >= 2.10',
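
Note: the tightened pin accepts any ruamel.yaml release from 0.15.41 up to, but not including, 0.15.52. A quick, illustrative way to sanity-check the range with the same requirement string:

    # Illustrative check, not part of the commit.
    from pkg_resources import Requirement

    req = Requirement.parse('ruamel.yaml >= 0.15.41, < 0.15.52')
    print('0.15.51' in req)   # True  - newest release the pin accepts
    print('0.15.52' in req)   # False - the release known to break the API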

--- a/tests/frontend/push.py
+++ b/tests/frontend/push.py
@@ -230,6 +230,8 @@ def test_artifact_expires(cli, datafiles, tmpdir):
     # Create an artifact share (remote artifact cache) in the tmpdir/artifactshare
     # Mock a file system with 12 MB free disk space
     with create_artifact_share(os.path.join(str(tmpdir), 'artifactshare'),
+                               min_head_size=int(2e9),
+                               max_head_size=int(2e9),
                                total_space=int(10e9), free_space=(int(12e6) + int(2e9))) as share:
 
         # Configure bst to push to the cache
@@ -313,6 +315,8 @@ def test_recently_pulled_artifact_does_not_expire(cli, datafiles, tmpdir):
     # Create an artifact share (remote cache) in tmpdir/artifactshare
     # Mock a file system with 12 MB free disk space
     with create_artifact_share(os.path.join(str(tmpdir), 'artifactshare'),
+                               min_head_size=int(2e9),
+                               max_head_size=int(2e9),
                                total_space=int(10e9), free_space=(int(12e6) + int(2e9))) as share:
 
         # Configure bst to push to the cache
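
Note: the head-room parameters explain the "12 MB free disk space" comment in these tests. With the mocked filesystem, the space the server considers usable is the reported free space minus the minimum head room; the worked numbers below are taken from the test arguments above:

    # Worked numbers from the test setup (illustrative).
    free_space = int(12e6) + int(2e9)   # what the mocked statvfs reports
    min_head_size = int(2e9)            # head room reserved by the server
    usable = free_space - min_head_size
    print(usable)                       # 12000000 bytes, i.e. 12 MB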

--- a/tests/testutils/artifactshare.py
+++ b/tests/testutils/artifactshare.py
@@ -29,7 +29,11 @@ from buildstream._protos.build.bazel.remote.execution.v2 import remote_execution
 #
 class ArtifactShare():
 
-    def __init__(self, directory, *, total_space=None, free_space=None):
+    def __init__(self, directory, *,
+                 total_space=None,
+                 free_space=None,
+                 min_head_size=int(2e9),
+                 max_head_size=int(10e9)):
 
         # The working directory for the artifact share (in case it
         # needs to do something outside of its backend's storage folder).
@@ -50,6 +54,9 @@ class ArtifactShare():
         self.total_space = total_space
         self.free_space = free_space
 
+        self.max_head_size = max_head_size
+        self.min_head_size = min_head_size
+
         q = Queue()
 
         self.process = Process(target=self.run, args=(q,))
@@ -74,7 +81,10 @@ class ArtifactShare():
                     self.free_space = self.total_space
                 os.statvfs = self._mock_statvfs
 
-            server = create_server(self.repodir, enable_push=True)
+            server = create_server(self.repodir,
+                                   max_head_size=self.max_head_size,
+                                   min_head_size=self.min_head_size,
+                                   enable_push=True)
             port = server.add_insecure_port('localhost:0')
 
             server.start()
@@ -136,6 +146,15 @@ class ArtifactShare():
 
         try:
             tree = self.cas.resolve_ref(artifact_key)
+            reachable = set()
+            try:
+                self.cas._reachable_refs_dir(reachable, tree, update_mtime=False)
+            except FileNotFoundError:
+                return None
+            for digest in reachable:
+                object_name = os.path.join(self.cas.casdir, 'objects', digest[:2], digest[2:])
+                if not os.path.exists(object_name):
+                    return None
             return tree
         except CASError:
             return None
@@ -167,8 +186,11 @@ class ArtifactShare():
 # Create an ArtifactShare for use in a test case
 #
 @contextmanager
-def create_artifact_share(directory, *, total_space=None, free_space=None):
-    share = ArtifactShare(directory, total_space=total_space, free_space=free_space)
+def create_artifact_share(directory, *, total_space=None, free_space=None,
+                          min_head_size=int(2e9),
+                          max_head_size=int(10e9)):
+    share = ArtifactShare(directory, total_space=total_space, free_space=free_space,
+                          min_head_size=min_head_size, max_head_size=max_head_size)
     try:
         yield share
     finally:
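
Note: the stricter has_artifact() check above walks every digest reachable from the artifact tree and verifies that the corresponding object file still exists, so a test does not count a partially expired artifact as present. A small sketch of the objects/ layout it assumes, where a blob with SHA-256 hex digest aabb... lives at objects/aa/bb... under the CAS directory (helper name illustrative, not part of the commit):

    # Illustrative helper showing the objects/ layout, not part of the commit.
    import os

    def object_path(casdir, hex_digest):
        return os.path.join(casdir, 'objects', hex_digest[:2], hex_digest[2:])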
