| ... | ... | @@ -27,6 +27,7 @@ import uuid | 
| 27 | 27 |  import time
 | 
| 28 | 28 |  import errno
 | 
| 29 | 29 |  import ctypes
 | 
|  | 30 | +import threading
 | 
| 30 | 31 |  
 | 
| 31 | 32 |  import click
 | 
| 32 | 33 |  import grpc
 | 
| ... | ... | @@ -62,7 +63,7 @@ class ArtifactTooLargeException(Exception): | 
| 62 | 63 |  #     repo (str): Path to CAS repository
 | 
| 63 | 64 |  #     enable_push (bool): Whether to allow blob uploads and artifact updates
 | 
| 64 | 65 |  #
 | 
| 65 |  | -def create_server(repo, *, enable_push):
 | 
|  | 66 | +def create_server(repo, max_head_size, min_head_size, *, enable_push):
 | 
| 66 | 67 |      context = Context()
 | 
| 67 | 68 |      context.artifactdir = os.path.abspath(repo)
 | 
| 68 | 69 |  
 | 
| ... | ... | @@ -72,11 +73,13 @@ def create_server(repo, *, enable_push): | 
| 72 | 73 |      max_workers = (os.cpu_count() or 1) * 5
 | 
| 73 | 74 |      server = grpc.server(futures.ThreadPoolExecutor(max_workers))
 | 
| 74 | 75 |  
 | 
|  | 76 | +    cache_cleaner = _CacheCleaner(artifactcache, max_head_size, min_head_size)
 | 
|  | 77 | +
 | 
| 75 | 78 |      bytestream_pb2_grpc.add_ByteStreamServicer_to_server(
 | 
| 76 |  | -        _ByteStreamServicer(artifactcache, enable_push=enable_push), server)
 | 
|  | 79 | +        _ByteStreamServicer(artifactcache, cache_cleaner, enable_push=enable_push), server)
 | 
| 77 | 80 |  
 | 
| 78 | 81 |      remote_execution_pb2_grpc.add_ContentAddressableStorageServicer_to_server(
 | 
| 79 |  | -        _ContentAddressableStorageServicer(artifactcache, enable_push=enable_push), server)
 | 
|  | 82 | +        _ContentAddressableStorageServicer(artifactcache, cache_cleaner, enable_push=enable_push), server)
 | 
| 80 | 83 |  
 | 
| 81 | 84 |      remote_execution_pb2_grpc.add_CapabilitiesServicer_to_server(
 | 
| 82 | 85 |          _CapabilitiesServicer(), server)
 | 
| ... | ... | @@ -94,9 +97,16 @@ def create_server(repo, *, enable_push): | 
| 94 | 97 |  @click.option('--client-certs', help="Public client certificates for TLS (PEM-encoded)")
 | 
| 95 | 98 |  @click.option('--enable-push', default=False, is_flag=True,
 | 
| 96 | 99 |                help="Allow clients to upload blobs and update artifact cache")
 | 
|  | 100 | +@click.option('--head-room-min', type=click.INT,
 | 
|  | 101 | +              help="Disk head room minimum in bytes",
 | 
|  | 102 | +              default=2e9)
 | 
|  | 103 | +@click.option('--head-room-max', type=click.INT,
 | 
|  | 104 | +              help="Disk head room maximum in bytes",
 | 
|  | 105 | +              default=10e9)
 | 
| 97 | 106 |  @click.argument('repo')
 | 
| 98 |  | -def server_main(repo, port, server_key, server_cert, client_certs, enable_push):
 | 
| 99 |  | -    server = create_server(repo, enable_push=enable_push)
 | 
|  | 107 | +def server_main(repo, port, server_key, server_cert, client_certs, enable_push,
 | 
|  | 108 | +                head_room_min, head_room_max):
 | 
|  | 109 | +    server = create_server(repo, head_room_max, head_room_min, enable_push=enable_push)
 | 
| 100 | 110 |  
 | 
| 101 | 111 |      use_tls = bool(server_key)
 | 
| 102 | 112 |  
 | 
| ... | ... | @@ -168,11 +178,12 @@ class _FallocateCall: | 
| 168 | 178 |  
 | 
| 169 | 179 |  
 | 
| 170 | 180 |  class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):
 | 
| 171 |  | -    def __init__(self, cas, *, enable_push):
 | 
|  | 181 | +    def __init__(self, cas, cache_cleaner, *, enable_push):
 | 
| 172 | 182 |          super().__init__()
 | 
| 173 | 183 |          self.cas = cas
 | 
| 174 | 184 |          self.enable_push = enable_push
 | 
| 175 | 185 |          self.fallocate = _FallocateCall()
 | 
|  | 186 | +        self.cache_cleaner = cache_cleaner
 | 
| 176 | 187 |  
 | 
| 177 | 188 |      def Read(self, request, context):
 | 
| 178 | 189 |          resource_name = request.resource_name
 | 
| ... | ... | @@ -234,7 +245,7 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer): | 
| 234 | 245 |                          if client_digest.size_bytes == 0:
 | 
| 235 | 246 |                              break
 | 
| 236 | 247 |                          try:
 | 
| 237 |  | -                            _clean_up_cache(self.cas, client_digest.size_bytes)
 | 
|  | 248 | +                            self.cache_cleaner.clean_up(client_digest.size_bytes)
 | 
| 238 | 249 |                          except ArtifactTooLargeException as e:
 | 
| 239 | 250 |                              context.set_code(grpc.StatusCode.RESOURCE_EXHAUSTED)
 | 
| 240 | 251 |                              context.set_details(str(e))
 | 
| ... | ... | @@ -280,10 +291,11 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer): | 
| 280 | 291 |  
 | 
| 281 | 292 |  
 | 
| 282 | 293 |  class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddressableStorageServicer):
 | 
| 283 |  | -    def __init__(self, cas, *, enable_push):
 | 
|  | 294 | +    def __init__(self, cas, cache_cleaner, *, enable_push):
 | 
| 284 | 295 |          super().__init__()
 | 
| 285 | 296 |          self.cas = cas
 | 
| 286 | 297 |          self.enable_push = enable_push
 | 
|  | 298 | +        self.cache_cleaner = cache_cleaner
 | 
| 287 | 299 |  
 | 
| 288 | 300 |      def FindMissingBlobs(self, request, context):
 | 
| 289 | 301 |          response = remote_execution_pb2.FindMissingBlobsResponse()
 | 
| ... | ... | @@ -352,7 +364,7 @@ class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddres | 
| 352 | 364 |                  continue
 | 
| 353 | 365 |  
 | 
| 354 | 366 |              try:
 | 
| 355 |  | -                _clean_up_cache(self.cas, digest.size_bytes)
 | 
|  | 367 | +                self.cache_cleaner.clean_up(digest.size_bytes)
 | 
| 356 | 368 |  
 | 
| 357 | 369 |                  with tempfile.NamedTemporaryFile(dir=self.cas.tmpdir) as out:
 | 
| 358 | 370 |                      out.write(blob_request.data)
 | 
| ... | ... | @@ -473,63 +485,80 @@ def _digest_from_upload_resource_name(resource_name): | 
| 473 | 485 |          return None
 | 
| 474 | 486 |  
 | 
| 475 | 487 |  
 | 
| 476 |  | -# _clean_up_cache()
 | 
| 477 |  | -#
 | 
| 478 |  | -# Keep removing Least Recently Pushed (LRP) artifacts in a cache until there
 | 
| 479 |  | -# is enough space for the incoming artifact
 | 
| 480 |  | -#
 | 
| 481 |  | -# Args:
 | 
| 482 |  | -#   cas: CASCache object
 | 
| 483 |  | -#   object_size: The size of the object being received in bytes
 | 
| 484 |  | -#
 | 
| 485 |  | -# Returns:
 | 
| 486 |  | -#   int: The total bytes removed on the filesystem
 | 
| 487 |  | -#
 | 
| 488 |  | -def _clean_up_cache(cas, object_size):
 | 
| 489 |  | -    # Determine the available disk space, in bytes, of the file system
 | 
| 490 |  | -    # which mounts the repo
 | 
| 491 |  | -    stats = os.statvfs(cas.casdir)
 | 
| 492 |  | -    buffer_ = int(2e9)                # Add a 2 GB buffer
 | 
| 493 |  | -    free_disk_space = (stats.f_bavail * stats.f_bsize) - buffer_
 | 
| 494 |  | -    total_disk_space = (stats.f_blocks * stats.f_bsize) - buffer_
 | 
| 495 |  | -
 | 
| 496 |  | -    if object_size > total_disk_space:
 | 
| 497 |  | -        raise ArtifactTooLargeException("Artifact of size: {} is too large for "
 | 
| 498 |  | -                                        "the filesystem which mounts the remote "
 | 
| 499 |  | -                                        "cache".format(object_size))
 | 
| 500 |  | -
 | 
| 501 |  | -    if object_size <= free_disk_space:
 | 
| 502 |  | -        # No need to clean up
 | 
| 503 |  | -        return 0
 | 
| 504 |  | -
 | 
| 505 |  | -    # obtain a list of LRP artifacts
 | 
| 506 |  | -    LRP_artifacts = cas.list_artifacts()
 | 
| 507 |  | -
 | 
| 508 |  | -    keep_after = time.time() - _OBJECT_MIN_AGE
 | 
| 509 |  | -
 | 
| 510 |  | -    removed_size = 0  # in bytes
 | 
| 511 |  | -    if object_size - removed_size > free_disk_space:
 | 
| 512 |  | -        # First we try to see if some unreferenced objects became old
 | 
| 513 |  | -        # enough to be removed.
 | 
| 514 |  | -        removed_size += cas.prune(keep_after=keep_after)
 | 
| 515 |  | -
 | 
| 516 |  | -    while object_size - removed_size > free_disk_space:
 | 
| 517 |  | -        try:
 | 
| 518 |  | -            to_remove = LRP_artifacts.pop(0)  # The first element in the list is the LRP artifact
 | 
| 519 |  | -        except IndexError:
 | 
| 520 |  | -            # This exception is caught if there are no more artifacts in the list
 | 
| 521 |  | -            # LRP_artifacts. This means the the artifact is too large for the filesystem
 | 
| 522 |  | -            # so we abort the process
 | 
| 523 |  | -            raise ArtifactTooLargeException("Artifact of size {} is too large for "
 | 
| 524 |  | -                                            "the filesystem which mounts the remote "
 | 
| 525 |  | -                                            "cache".format(object_size))
 | 
|  | 488 | +class _CacheCleaner:
 | 
| 526 | 489 |  
 | 
| 527 |  | -        cas.remove(to_remove, defer_prune=True)
 | 
| 528 |  | -        removed_size += cas.prune(keep_after=keep_after)
 | 
|  | 490 | +    __cleanup_cache_lock = threading.Lock()
 | 
| 529 | 491 |  
 | 
| 530 |  | -    if removed_size > 0:
 | 
| 531 |  | -        logging.info("Successfully removed {} bytes from the cache".format(removed_size))
 | 
| 532 |  | -    else:
 | 
| 533 |  | -        logging.info("No artifacts were removed from the cache.")
 | 
|  | 492 | +    def __init__(self, cas, max_head_size, min_head_size = int(2e9)):
 | 
|  | 493 | +        self.__cas = cas
 | 
|  | 494 | +        self.__max_head_size = max_head_size
 | 
|  | 495 | +        self.__min_head_size = min_head_size
 | 
|  | 496 | +
 | 
|  | 497 | +    def __has_space(self, object_size):
 | 
|  | 498 | +        stats = os.statvfs(self.__cas.casdir)
 | 
|  | 499 | +        free_disk_space = (stats.f_bavail * stats.f_bsize) - self.__min_head_size
 | 
|  | 500 | +        total_disk_space = (stats.f_blocks * stats.f_bsize) - self.__min_head_size
 | 
|  | 501 | +
 | 
|  | 502 | +        if object_size > total_disk_space:
 | 
|  | 503 | +            raise ArtifactTooLargeException("Artifact of size: {} is too large for "
 | 
|  | 504 | +                                            "the filesystem which mounts the remote "
 | 
|  | 505 | +                                            "cache".format(object_size))
 | 
| 534 | 506 |  
 | 
| 535 |  | -    return removed_size | 
|  | 507 | +        return object_size <= free_disk_space
 | 
|  | 508 | +
 | 
|  | 509 | +    # _clean_up_cache()
 | 
|  | 510 | +    #
 | 
|  | 511 | +    # Keep removing Least Recently Pushed (LRP) artifacts in a cache until there
 | 
|  | 512 | +    # is enough space for the incoming artifact
 | 
|  | 513 | +    #
 | 
|  | 514 | +    # Args:
 | 
|  | 515 | +    #   object_size: The size of the object being received in bytes
 | 
|  | 516 | +    #
 | 
|  | 517 | +    # Returns:
 | 
|  | 518 | +    #   int: The total bytes removed on the filesystem
 | 
|  | 519 | +    #
 | 
|  | 520 | +    def clean_up(self, object_size):
 | 
|  | 521 | +        if self.__has_space(object_size):
 | 
|  | 522 | +            return 0
 | 
|  | 523 | +
 | 
|  | 524 | +        with _CacheCleaner.__cleanup_cache_lock:
 | 
|  | 525 | +            if self.__has_space(object_size):
 | 
|  | 526 | +                # Another thread has done the cleanup for us
 | 
|  | 527 | +                return 0
 | 
|  | 528 | +
 | 
|  | 529 | +            stats = os.statvfs(self.__cas.casdir)
 | 
|  | 530 | +            target_disk_space = (stats.f_bavail * stats.f_bsize) - self.__max_head_size
 | 
|  | 531 | +
 | 
|  | 532 | +            # obtain a list of LRP artifacts
 | 
|  | 533 | +            LRP_objects = self.__cas.list_objects()
 | 
|  | 534 | +
 | 
|  | 535 | +            removed_size = 0  # in bytes
 | 
|  | 536 | +
 | 
|  | 537 | +            last_mtime = 0
 | 
|  | 538 | +
 | 
|  | 539 | +            while object_size - removed_size > target_disk_space:
 | 
|  | 540 | +                try:
 | 
|  | 541 | +                    last_mtime, to_remove = LRP_objects.pop(0)  # The first element in the list is the LRP object
 | 
|  | 542 | +                except IndexError:
 | 
|  | 543 | +                    # This exception is caught if there are no more artifacts in the list
 | 
|  | 544 | +                    # LRP_objects. This means the artifact is too large for the filesystem
 | 
|  | 545 | +                    # so we abort the process
 | 
|  | 546 | +                    raise ArtifactTooLargeException("Artifact of size {} is too large for "
 | 
|  | 547 | +                                                    "the filesystem which mounts the remote "
 | 
|  | 548 | +                                                    "cache".format(object_size))
 | 
|  | 549 | +
 | 
|  | 550 | +                try:
 | 
|  | 551 | +                    size = os.stat(to_remove).st_size
 | 
|  | 552 | +                    os.unlink(to_remove)
 | 
|  | 553 | +                    removed_size += size
 | 
|  | 554 | +                except FileNotFoundError:
 | 
|  | 555 | +                    pass
 | 
|  | 556 | +
 | 
|  | 557 | +            self.__cas.clean_up_refs_until(last_mtime)
 | 
|  | 558 | +
 | 
|  | 559 | +            if removed_size > 0:
 | 
|  | 560 | +                logging.info("Successfully removed {} bytes from the cache".format(removed_size))
 | 
|  | 561 | +            else:
 | 
|  | 562 | +                logging.info("No artifacts were removed from the cache.")
 | 
|  | 563 | +
 | 
|  | 564 | +            return removed_size |