Valentin David pushed to branch valentindavid/cache_server_fill_up-1.2 at BuildStream / buildstream
Commits:
- 1c4b410f by Valentin David at 2018-11-29T16:17:21Z
- 2259dff1 by Valentin David at 2018-11-29T16:17:27Z
- d6b661e1 by Valentin David at 2018-11-29T16:17:27Z
- dbca7c70 by Valentin David at 2018-11-29T16:17:27Z
- 65ea6832 by Valentin David at 2018-11-29T16:17:27Z
4 changed files:
- buildstream/_artifactcache/cascache.py
- buildstream/_artifactcache/casserver.py
- tests/frontend/push.py
- tests/testutils/artifactshare.py
Changes:
--- a/buildstream/_artifactcache/cascache.py
+++ b/buildstream/_artifactcache/cascache.py
@@ -500,6 +500,41 @@ class CASCache(ArtifactCache):
         # first element of this list will be the file modified earliest.
         return [ref for _, ref in sorted(zip(mtimes, refs))]
 
+    # list_objects():
+    #
+    # List cached objects in Least Recently Modified (LRM) order.
+    #
+    # Returns:
+    #     (list) - A list of objects and timestamps in LRM order
+    #
+    def list_objects(self):
+        objs = []
+        mtimes = []
+
+        for root, _, files in os.walk(os.path.join(self.casdir, 'objects')):
+            for filename in files:
+                obj_path = os.path.join(root, filename)
+                try:
+                    mtimes.append(os.path.getmtime(obj_path))
+                except FileNotFoundError:
+                    pass
+                else:
+                    objs.append(obj_path)
+
+        # NOTE: sorted() sorts from earliest to latest, so the first
+        # element of this list is the least recently modified object.
+        return sorted(zip(mtimes, objs))
+
+    def clean_up_refs_until(self, time):
+        ref_heads = os.path.join(self.casdir, 'refs', 'heads')
+
+        for root, _, files in os.walk(ref_heads):
+            for filename in files:
+                ref_path = os.path.join(root, filename)
+                # Obtain the mtime (the time the file was last modified)
+                if os.path.getmtime(ref_path) < time:
+                    os.unlink(ref_path)
+
     # remove():
     #
     # Removes the given symbolic ref from the repo.
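The new list_objects() leans on Python's tuple ordering: sorted() compares the mtime element of each (mtime, path) pair first, so the least recently modified object lands at index 0. A minimal, self-contained sketch of that ordering (illustration only, not part of the diff):

    import os
    import tempfile
    import time

    # Illustration: sorted() on (mtime, path) tuples compares mtimes
    # first, so the least recently modified file comes out at index 0.
    with tempfile.TemporaryDirectory() as d:
        old = os.path.join(d, 'old')
        new = os.path.join(d, 'new')
        for p in (old, new):
            open(p, 'w').close()

        past = time.time() - 3600          # backdate 'old' by an hour
        os.utime(old, (past, past))

        pairs = sorted((os.path.getmtime(p), p) for p in (old, new))
        assert pairs[0][1] == old          # oldest first: evicted first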
@@ -577,6 +612,10 @@ class CASCache(ArtifactCache):
 
         return pruned
 
+    def update_tree_mtime(self, tree):
+        reachable = set()
+        self._reachable_refs_dir(reachable, tree, update_mtime=True)
+
     ################################################
     #             Local Private Methods            #
     ################################################
@@ -718,10 +757,13 @@ class CASCache(ArtifactCache):
                 a += 1
                 b += 1
 
-    def _reachable_refs_dir(self, reachable, tree):
+    def _reachable_refs_dir(self, reachable, tree, update_mtime=False):
         if tree.hash in reachable:
             return
 
+        if update_mtime:
+            os.utime(self.objpath(tree))
+
         reachable.add(tree.hash)
 
         directory = remote_execution_pb2.Directory()
@@ -730,10 +772,12 @@ class CASCache(ArtifactCache):
             directory.ParseFromString(f.read())
 
         for filenode in directory.files:
+            if update_mtime:
+                os.utime(self.objpath(filenode.digest))
             reachable.add(filenode.digest.hash)
 
         for dirnode in directory.directories:
-            self._reachable_refs_dir(reachable, dirnode.digest)
+            self._reachable_refs_dir(reachable, dirnode.digest, update_mtime=update_mtime)
 
     def _initialize_remote(self, remote_spec, q):
         try:
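With update_mtime=True, the reachability walk doubles as a "touch": every object belonging to a referenced tree gets a fresh mtime, and the reachable set guarantees each object is visited at most once even when trees share subdirectories. A simplified sketch of the pattern, with plain dicts standing in for the protobuf Directory messages:

    import os

    # Simplified sketch of the _reachable_refs_dir() traversal: dicts
    # stand in for protobuf messages, paths for CAS object files.
    def touch_tree(tree, objects, reachable):
        digest = tree['digest']
        if digest in reachable:        # visit each object at most once,
            return                     # even when trees share subtrees
        reachable.add(digest)
        os.utime(objects[digest])      # fresh mtime: spared by LRM eviction
        for child in tree.get('children', []):
            touch_tree(child, objects, reachable)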
--- a/buildstream/_artifactcache/casserver.py
+++ b/buildstream/_artifactcache/casserver.py
@@ -24,6 +24,8 @@ import signal
 import sys
 import tempfile
 import uuid
+import errno
+import threading
 
 import click
 import grpc
@@ -56,7 +58,9 @@ class ArtifactTooLargeException(Exception):
 #     repo (str): Path to CAS repository
 #     enable_push (bool): Whether to allow blob uploads and artifact updates
 #
-def create_server(repo, *, enable_push):
+def create_server(repo, *, enable_push,
+                  max_head_size=int(10e9),
+                  min_head_size=int(2e9)):
     context = Context()
     context.artifactdir = os.path.abspath(repo)
 
@@ -66,11 +70,13 @@ def create_server(repo, *, enable_push):
     max_workers = (os.cpu_count() or 1) * 5
     server = grpc.server(futures.ThreadPoolExecutor(max_workers))
 
+    cache_cleaner = _CacheCleaner(artifactcache, max_head_size, min_head_size)
+
     bytestream_pb2_grpc.add_ByteStreamServicer_to_server(
-        _ByteStreamServicer(artifactcache, enable_push=enable_push), server)
+        _ByteStreamServicer(artifactcache, cache_cleaner, enable_push=enable_push), server)
 
     remote_execution_pb2_grpc.add_ContentAddressableStorageServicer_to_server(
-        _ContentAddressableStorageServicer(artifactcache, enable_push=enable_push), server)
+        _ContentAddressableStorageServicer(artifactcache, cache_cleaner, enable_push=enable_push), server)
 
     remote_execution_pb2_grpc.add_CapabilitiesServicer_to_server(
         _CapabilitiesServicer(), server)
@@ -88,9 +94,19 @@ def create_server(repo, *, enable_push):
 @click.option('--client-certs', help="Public client certificates for TLS (PEM-encoded)")
 @click.option('--enable-push', default=False, is_flag=True,
               help="Allow clients to upload blobs and update artifact cache")
+@click.option('--head-room-min', type=click.INT,
+              help="Disk head room minimum in bytes",
+              default=2e9)
+@click.option('--head-room-max', type=click.INT,
+              help="Disk head room maximum in bytes",
+              default=10e9)
 @click.argument('repo')
-def server_main(repo, port, server_key, server_cert, client_certs, enable_push):
-    server = create_server(repo, enable_push=enable_push)
+def server_main(repo, port, server_key, server_cert, client_certs, enable_push,
+                head_room_min, head_room_max):
+    server = create_server(repo,
+                           max_head_size=head_room_max,
+                           min_head_size=head_room_min,
+                           enable_push=enable_push)
 
     use_tls = bool(server_key)
 
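Reading the new options against _CacheCleaner further down the diff: --head-room-min is the free-space reserve below which an incoming blob triggers cleanup, and --head-room-max is the larger reserve the cleaner restores, giving the server hysteresis between cleanup passes. A hedged usage sketch of the Python entry point (the repo path and port are hypothetical):

    # Usage sketch, assuming the module layout shown in this diff.
    from buildstream._artifactcache.casserver import create_server

    server = create_server('/srv/artifacts',       # hypothetical repo path
                           enable_push=True,
                           min_head_size=int(2e9),   # cleanup triggers below 2 GB head room
                           max_head_size=int(10e9))  # cleanup restores 10 GB head room
    port = server.add_insecure_port('localhost:50052')
    server.start()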
@@ -132,10 +148,11 @@ def server_main(repo, port, server_key, server_cert, client_certs, enable_push):
 
 
 class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):
-    def __init__(self, cas, *, enable_push):
+    def __init__(self, cas, cache_cleaner, *, enable_push):
         super().__init__()
         self.cas = cas
         self.enable_push = enable_push
+        self.cache_cleaner = cache_cleaner
 
     def Read(self, request, context):
         resource_name = request.resource_name
@@ -193,17 +210,34 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):
                         context.set_code(grpc.StatusCode.NOT_FOUND)
                         return response
 
-                    try:
-                        _clean_up_cache(self.cas, client_digest.size_bytes)
-                    except ArtifactTooLargeException as e:
-                        context.set_code(grpc.StatusCode.RESOURCE_EXHAUSTED)
-                        context.set_details(str(e))
-                        return response
+                    while True:
+                        if client_digest.size_bytes == 0:
+                            break
+                        try:
+                            self.cache_cleaner.clean_up(client_digest.size_bytes)
+                        except ArtifactTooLargeException as e:
+                            context.set_code(grpc.StatusCode.RESOURCE_EXHAUSTED)
+                            context.set_details(str(e))
+                            return response
+
+                        try:
+                            os.posix_fallocate(out.fileno(), 0, client_digest.size_bytes)
+                            break
+                        except OSError as e:
+                            # Multiple uploads can happen at the same time
+                            if e.errno != errno.ENOSPC:
+                                raise
+
                 elif request.resource_name:
                     # If it is set on subsequent calls, it **must** match the value of the first request.
                     if request.resource_name != resource_name:
                         context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
                         return response
+
+                if (offset + len(request.data)) > client_digest.size_bytes:
+                    context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
+                    return response
+
                 out.write(request.data)
                 offset += len(request.data)
                 if request.finish_write:
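Write() now reserves the whole blob with os.posix_fallocate() before accepting data, so an out-of-space condition surfaces as ENOSPC up front rather than mid-write; on ENOSPC it loops back to clean up again, because a concurrent upload may have consumed the space in the meantime. The pattern in isolation, as a sketch (clean_up is a stand-in callable):

    import errno
    import os

    # The clean-then-reserve loop in isolation; clean_up stands in for
    # _CacheCleaner.clean_up() and may raise when space cannot be freed.
    def reserve_space(fd, size, clean_up):
        while True:
            clean_up(size)                       # free space for the blob
            try:
                os.posix_fallocate(fd, 0, size)  # reserve it all up front
                return                           # no mid-write ENOSPC now
            except OSError as e:
                # A concurrent upload may have eaten the space we just
                # freed; anything other than ENOSPC is a real error.
                if e.errno != errno.ENOSPC:
                    raise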
@@ -224,18 +258,26 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):
 
 
 class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddressableStorageServicer):
-    def __init__(self, cas, *, enable_push):
+    def __init__(self, cas, cache_cleaner, *, enable_push):
         super().__init__()
         self.cas = cas
         self.enable_push = enable_push
+        self.cache_cleaner = cache_cleaner
 
     def FindMissingBlobs(self, request, context):
         response = remote_execution_pb2.FindMissingBlobsResponse()
         for digest in request.blob_digests:
-            if not _has_object(self.cas, digest):
-                d = response.missing_blob_digests.add()
-                d.hash = digest.hash
-                d.size_bytes = digest.size_bytes
+            objpath = self.cas.objpath(digest)
+            try:
+                os.utime(objpath)
+            except OSError as e:
+                if e.errno != errno.ENOENT:
+                    raise
+                else:
+                    d = response.missing_blob_digests.add()
+                    d.hash = digest.hash
+                    d.size_bytes = digest.size_bytes
+
         return response
 
     def BatchReadBlobs(self, request, context):
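FindMissingBlobs() swaps the old os.path.exists() check for os.utime(): a single syscall that both answers "is the blob present?" and refreshes its mtime, so a blob a client is about to depend on moves to the back of the LRM eviction queue. The same EAFP pattern as a standalone sketch:

    import errno
    import os

    # EAFP sketch: one syscall both checks existence and refreshes mtime.
    def have_blob(path):
        try:
            os.utime(path)   # "touch": pushes the blob to the back of
            return True      # the least-recently-modified eviction queue
        except OSError as e:
            if e.errno != errno.ENOENT:
                raise
            return False     # genuinely missing; report it to the client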
@@ -289,7 +331,7 @@ class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddressableStorageServicer):
                 continue
 
             try:
-                _clean_up_cache(self.cas, digest.size_bytes)
+                self.cache_cleaner.clean_up(digest.size_bytes)
 
                 with tempfile.NamedTemporaryFile(dir=self.cas.tmpdir) as out:
                     out.write(blob_request.data)
@@ -332,6 +374,12 @@ class _ReferenceStorageServicer(buildstream_pb2_grpc.ReferenceStorageServicer):
 
         try:
             tree = self.cas.resolve_ref(request.key, update_mtime=True)
+            try:
+                self.cas.update_tree_mtime(tree)
+            except FileNotFoundError:
+                self.cas.remove(request.key, defer_prune=True)
+                context.set_code(grpc.StatusCode.NOT_FOUND)
+                return response
 
             response.digest.hash = tree.hash
             response.digest.size_bytes = tree.size_bytes
@@ -404,60 +452,79 @@ def _digest_from_upload_resource_name(resource_name):
         return None
 
 
-def _has_object(cas, digest):
-    objpath = cas.objpath(digest)
-    return os.path.exists(objpath)
+class _CacheCleaner:
 
+    __cleanup_cache_lock = threading.Lock()
 
-# _clean_up_cache()
-#
-# Keep removing Least Recently Pushed (LRP) artifacts in a cache until there
-# is enough space for the incoming artifact
-#
-# Args:
-#   cas: CASCache object
-#   object_size: The size of the object being received in bytes
-#
-# Returns:
-#   int: The total bytes removed on the filesystem
-#
-def _clean_up_cache(cas, object_size):
-    # Determine the available disk space, in bytes, of the file system
-    # which mounts the repo
-    stats = os.statvfs(cas.casdir)
-    buffer_ = int(2e9)                # Add a 2 GB buffer
-    free_disk_space = (stats.f_bavail * stats.f_bsize) - buffer_
-    total_disk_space = (stats.f_blocks * stats.f_bsize) - buffer_
-
-    if object_size > total_disk_space:
-        raise ArtifactTooLargeException("Artifact of size: {} is too large for "
-                                        "the filesystem which mounts the remote "
-                                        "cache".format(object_size))
-
-    if object_size <= free_disk_space:
-        # No need to clean up
-        return 0
-
-    # obtain a list of LRP artifacts
-    LRP_artifacts = cas.list_artifacts()
-
-    removed_size = 0  # in bytes
-    while object_size - removed_size > free_disk_space:
-        try:
-            to_remove = LRP_artifacts.pop(0)  # The first element in the list is the LRP artifact
-        except IndexError:
-            # This exception is caught if there are no more artifacts in the list
-            # LRP_artifacts. This means the the artifact is too large for the filesystem
-            # so we abort the process
-            raise ArtifactTooLargeException("Artifact of size {} is too large for "
-                                            "the filesystem which mounts the remote "
-                                            "cache".format(object_size))
+    def __init__(self, cas, max_head_size, min_head_size=int(2e9)):
+        self.__cas = cas
+        self.__max_head_size = max_head_size
+        self.__min_head_size = min_head_size
 
-        removed_size += cas.remove(to_remove, defer_prune=False)
+    def __has_space(self, object_size):
+        stats = os.statvfs(self.__cas.casdir)
+        free_disk_space = (stats.f_bavail * stats.f_bsize) - self.__min_head_size
+        total_disk_space = (stats.f_blocks * stats.f_bsize) - self.__min_head_size
 
-    if removed_size > 0:
-        logging.info("Successfully removed {} bytes from the cache".format(removed_size))
-    else:
-        logging.info("No artifacts were removed from the cache.")
+        if object_size > total_disk_space:
+            raise ArtifactTooLargeException("Artifact of size: {} is too large for "
+                                            "the filesystem which mounts the remote "
+                                            "cache".format(object_size))
 
-    return removed_size
+        return object_size <= free_disk_space
+
+    # clean_up()
+    #
+    # Keep removing Least Recently Modified (LRM) objects from the cache until
+    # there is enough space for the incoming artifact
+    #
+    # Args:
+    #   object_size: The size of the object being received in bytes
+    #
+    # Returns:
+    #   int: The total bytes removed on the filesystem
+    #
+    def clean_up(self, object_size):
+        if self.__has_space(object_size):
+            return 0
+
+        with _CacheCleaner.__cleanup_cache_lock:
+            if self.__has_space(object_size):
+                # Another thread has done the cleanup for us
+                return 0
+
+            stats = os.statvfs(self.__cas.casdir)
+            target_disk_space = (stats.f_bavail * stats.f_bsize) - self.__max_head_size
+
+            # Obtain a list of objects in LRM order
+            LRP_objects = self.__cas.list_objects()
+
+            removed_size = 0  # in bytes
+            last_mtime = 0
+
+            while object_size - removed_size > target_disk_space:
+                try:
+                    last_mtime, to_remove = LRP_objects.pop(0)  # The first element in the list is the LRM object
+                except IndexError:
+                    # This exception is raised when there are no more objects in
+                    # LRP_objects. This means the artifact is too large for the
+                    # filesystem, so we abort the process
+                    raise ArtifactTooLargeException("Artifact of size {} is too large for "
+                                                    "the filesystem which mounts the remote "
+                                                    "cache".format(object_size))
+
+                try:
+                    size = os.stat(to_remove).st_size
+                    os.unlink(to_remove)
+                    removed_size += size
+                except FileNotFoundError:
+                    pass
+
+            self.__cas.clean_up_refs_until(last_mtime)
+
+            if removed_size > 0:
+                logging.info("Successfully removed {} bytes from the cache".format(removed_size))
+            else:
+                logging.info("No artifacts were removed from the cache.")
+
+            return removed_size
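clean_up() is a textbook double-checked lock: a lock-free fast path, then a re-check under the class-level lock so that concurrent uploads do not each run a full cleanup pass. The skeleton of that pattern (has_space and do_cleanup are stand-ins for the private methods above):

    import threading

    # Skeleton of the double-checked pattern in _CacheCleaner.clean_up().
    _lock = threading.Lock()

    def ensure_space(size, has_space, do_cleanup):
        if has_space(size):         # fast path, no lock taken
            return
        with _lock:                 # serialize the actual cleanup
            if has_space(size):     # another thread may have cleaned already
                return
            do_cleanup(size)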
--- a/tests/frontend/push.py
+++ b/tests/frontend/push.py
@@ -208,6 +208,8 @@ def test_artifact_expires(cli, datafiles, tmpdir):
     # Create an artifact share (remote artifact cache) in the tmpdir/artifactshare
     # Mock a file system with 12 MB free disk space
     with create_artifact_share(os.path.join(str(tmpdir), 'artifactshare'),
+                               min_head_size=int(2e9),
+                               max_head_size=int(2e9),
                                total_space=int(10e9), free_space=(int(12e6) + int(2e9))) as share:
 
         # Configure bst to push to the cache
@@ -291,6 +293,8 @@ def test_recently_pulled_artifact_does_not_expire(cli, datafiles, tmpdir):
     # Create an artifact share (remote cache) in tmpdir/artifactshare
     # Mock a file system with 12 MB free disk space
     with create_artifact_share(os.path.join(str(tmpdir), 'artifactshare'),
+                               min_head_size=int(2e9),
+                               max_head_size=int(2e9),
                                total_space=int(10e9), free_space=(int(12e6) + int(2e9))) as share:
 
         # Configure bst to push to the cache
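The tests pin min_head_size and max_head_size to the same value so the cleanup trigger and target coincide, and the mocked free_space includes the 2 GB head room that the server's __has_space() subtracts back out. The arithmetic, spelled out (illustration only):

    free_space = int(12e6) + int(2e9)  # mocked statvfs free bytes
    min_head_size = int(2e9)           # head room the server reserves

    # Effective budget the tests exercise: the "12 MB free disk space"
    # mentioned in the comment above.
    assert free_space - min_head_size == int(12e6)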
--- a/tests/testutils/artifactshare.py
+++ b/tests/testutils/artifactshare.py
@@ -29,7 +29,11 @@ from buildstream._exceptions import ArtifactError
 #
 class ArtifactShare():
 
-    def __init__(self, directory, *, total_space=None, free_space=None):
+    def __init__(self, directory, *,
+                 total_space=None,
+                 free_space=None,
+                 min_head_size=int(2e9),
+                 max_head_size=int(10e9)):
 
         # The working directory for the artifact share (in case it
         # needs to do something outside of it's backend's storage folder).
@@ -53,6 +57,9 @@ class ArtifactShare():
         self.total_space = total_space
         self.free_space = free_space
 
+        self.max_head_size = max_head_size
+        self.min_head_size = min_head_size
+
         q = Queue()
 
         self.process = Process(target=self.run, args=(q,))
@@ -76,7 +83,10 @@ class ArtifactShare():
                 self.free_space = self.total_space
             os.statvfs = self._mock_statvfs
 
-        server = create_server(self.repodir, enable_push=True)
+        server = create_server(self.repodir,
+                               max_head_size=self.max_head_size,
+                               min_head_size=self.min_head_size,
+                               enable_push=True)
         port = server.add_insecure_port('localhost:0')
 
         server.start()
@@ -118,6 +128,15 @@ class ArtifactShare():
 
         try:
             tree = self.cas.resolve_ref(artifact_key)
+            reachable = set()
+            try:
+                self.cas._reachable_refs_dir(reachable, tree, update_mtime=False)
+            except FileNotFoundError:
+                return False
+            for digest in reachable:
+                object_name = os.path.join(self.cas.casdir, 'objects', digest[:2], digest[2:])
+                if not os.path.exists(object_name):
+                    return False
             return True
         except ArtifactError:
             return False
@@ -149,8 +168,11 @@ class ArtifactShare():
 # Create an ArtifactShare for use in a test case
 #
 @contextmanager
-def create_artifact_share(directory, *, total_space=None, free_space=None):
-    share = ArtifactShare(directory, total_space=total_space, free_space=free_space)
+def create_artifact_share(directory, *, total_space=None, free_space=None,
+                          min_head_size=int(2e9),
+                          max_head_size=int(10e9)):
+    share = ArtifactShare(directory, total_space=total_space, free_space=free_space,
+                          min_head_size=min_head_size, max_head_size=max_head_size)
     try:
         yield share
     finally: