Valentin David pushed to branch valentindavid/cache_server_fill_up-1.2 at BuildStream / buildstream
Commits:

- 075ffefa by Valentin David at 2018-09-28T09:23:19Z
- 621bd236 by Tiago Gomes at 2018-09-28T09:52:15Z
- e298af53 by Valentin David at 2018-09-28T14:42:38Z
- 0a332520 by Valentin David at 2018-09-28T14:42:38Z
- 40962772 by Valentin David at 2018-09-28T14:42:38Z
- 8dbf1d34 by Valentin David at 2018-09-28T14:42:38Z
- b6ae1c70 by Valentin David at 2018-09-28T14:42:38Z
- 3909b52e by Valentin David at 2018-09-28T14:42:38Z
6 changed files:
- buildstream/_artifactcache/cascache.py
- buildstream/_artifactcache/casserver.py
- buildstream/_yaml.py
- tests/format/project.py
- tests/frontend/push.py
- tests/testutils/artifactshare.py
Changes:
=====================================
buildstream/_artifactcache/cascache.py
=====================================
@@ -26,6 +26,7 @@ import stat
 import tempfile
 import uuid
 import errno
+import contextlib
 from urllib.parse import urlparse
 
 import grpc
@@ -393,13 +394,14 @@ class CASCache(ArtifactCache):
     #     digest (Digest): An optional Digest object to populate
     #     path (str): Path to file to add
     #     buffer (bytes): Byte buffer to add
+    #     link_directly (bool): Whether the file given by `path` can be linked
     #
     # Returns:
     #     (Digest): The digest of the added object
     #
     # Either `path` or `buffer` must be passed, but not both.
     #
-    def add_object(self, *, digest=None, path=None, buffer=None):
+    def add_object(self, *, digest=None, path=None, buffer=None, link_directly=False):
         # Exactly one of the two parameters has to be specified
         assert (path is None) != (buffer is None)
 
@@ -409,28 +411,34 @@ class CASCache(ArtifactCache):
         try:
             h = hashlib.sha256()
             # Always write out new file to avoid corruption if input file is modified
-            with tempfile.NamedTemporaryFile(dir=self.tmpdir) as out:
-                # Set mode bits to 0644
-                os.chmod(out.name, stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH)
-
-                if path:
-                    with open(path, 'rb') as f:
-                        for chunk in iter(lambda: f.read(4096), b""):
-                            h.update(chunk)
-                            out.write(chunk)
+            with contextlib.ExitStack() as stack:
+                if path is not None and link_directly:
+                    tmp = stack.enter_context(open(path, 'rb'))
+                    for chunk in iter(lambda: tmp.read(4096), b""):
+                        h.update(chunk)
                 else:
-                    h.update(buffer)
-                    out.write(buffer)
+                    tmp = stack.enter_context(tempfile.NamedTemporaryFile(dir=self.tmpdir))
+                    # Set mode bits to 0644
+                    os.chmod(tmp.name, stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH)
 
-                out.flush()
+                    if path:
+                        with open(path, 'rb') as f:
+                            for chunk in iter(lambda: f.read(4096), b""):
+                                h.update(chunk)
+                                tmp.write(chunk)
+                    else:
+                        h.update(buffer)
+                        tmp.write(buffer)
+
+                    tmp.flush()
 
                 digest.hash = h.hexdigest()
-                digest.size_bytes = os.fstat(out.fileno()).st_size
+                digest.size_bytes = os.fstat(tmp.fileno()).st_size
 
                 # Place file at final location
                 objpath = self.objpath(digest)
                 os.makedirs(os.path.dirname(objpath), exist_ok=True)
-                os.link(out.name, objpath)
+                os.link(tmp.name, objpath)
 
         except FileExistsError as e:
             # We can ignore the failed link() if the object is already in the repo.
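
The interesting pattern in the hunk above is `contextlib.ExitStack`, which lets a single `with` block enter either of two different context managers depending on `link_directly`: the source file itself (hashed in place so it can be hardlinked without a second copy) or a fresh temporary file to stage a copy into. A minimal standalone sketch of the same pattern; the function name and parameters below are illustrative, not from the codebase:

    import contextlib
    import hashlib
    import tempfile

    def sha256_digest(path=None, buffer=None, tmpdir=None, hash_in_place=False):
        # Enter exactly one of two context managers through a single
        # ExitStack, mirroring the add_object() rewrite above.
        h = hashlib.sha256()
        with contextlib.ExitStack() as stack:
            if path is not None and hash_in_place:
                src = stack.enter_context(open(path, 'rb'))
            else:
                src = stack.enter_context(tempfile.NamedTemporaryFile(dir=tmpdir))
                src.write(buffer)
                src.flush()
                src.seek(0)
            for chunk in iter(lambda: src.read(4096), b""):
                h.update(chunk)
        return h.hexdigest()
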
@@ -565,7 +573,12 @@ class CASCache(ArtifactCache):
     #
     # Prune unreachable objects from the repo.
     #
-    def prune(self):
+    # Args:
+    #    keep_after (int|None): timestamp after which unreachable objects
+    #                           are kept; None if no unreachable objects
+    #                           should be kept.
+    #
+    def prune(self, keep_after=None):
         ref_heads = os.path.join(self.casdir, 'refs', 'heads')
 
         pruned = 0
@@ -586,6 +599,10 @@ class CASCache(ArtifactCache):
                 objhash = os.path.basename(root) + filename
                 if objhash not in reachable:
                     obj_path = os.path.join(root, filename)
+                    if keep_after:
+                        st = os.stat(obj_path)
+                        if st.st_mtime >= keep_after:
+                            continue
                     pruned += os.stat(obj_path).st_size
                     os.unlink(obj_path)
 
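
Condensed, the retention rule the new `keep_after` argument adds to `prune()` is: an unreachable object survives a pruning pass if its mtime is at or after the cutoff. A self-contained sketch of that logic (directory layout and names are illustrative):

    import os
    import time

    def prune_objects(objects_dir, reachable, keep_after=None):
        # Delete unreachable objects, but keep those modified at or
        # after the keep_after cutoff; return the bytes reclaimed.
        pruned = 0
        for root, _, files in os.walk(objects_dir):
            for name in files:
                if os.path.basename(root) + name in reachable:
                    continue
                path = os.path.join(root, name)
                if keep_after is not None and os.stat(path).st_mtime >= keep_after:
                    continue  # too young to reclaim yet
                pruned += os.stat(path).st_size
                os.unlink(path)
        return pruned

    # e.g. keep unreachable objects around for six hours:
    # prune_objects('/srv/cas/objects', set(), keep_after=time.time() - 6 * 60 * 60)
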
=====================================
buildstream/_artifactcache/casserver.py
=====================================
@@ -24,6 +24,9 @@ import signal
 import sys
 import tempfile
 import uuid
+import time
+import errno
+import ctypes
 
 import click
 import grpc
@@ -41,6 +44,10 @@ from .cascache import CASCache
 # The default limit for gRPC messages is 4 MiB
 _MAX_BATCH_TOTAL_SIZE_BYTES = 4 * 1024 * 1024
 
+# The minimum age in seconds for objects before they can be cleaned
+# up.
+_OBJECT_MIN_AGE = 6 * 60 * 60
+
 
 # Trying to push an artifact that is too large
 class ArtifactTooLargeException(Exception):
@@ -130,11 +137,43 @@ def server_main(repo, port, server_key, server_cert, client_certs, enable_push):
         server.stop(0)
 
 
+class _FallocateCall:
+
+    FALLOC_FL_KEEP_SIZE = 1
+    FALLOC_FL_PUNCH_HOLE = 2
+    FALLOC_FL_NO_HIDE_STALE = 4
+    FALLOC_FL_COLLAPSE_RANGE = 8
+    FALLOC_FL_ZERO_RANGE = 16
+    FALLOC_FL_INSERT_RANGE = 32
+    FALLOC_FL_UNSHARE_RANGE = 64
+
+    def __init__(self):
+        self.libc = ctypes.CDLL("libc.so.6", use_errno=True)
+        try:
+            self.fallocate64 = self.libc.fallocate64
+        except AttributeError:
+            self.fallocate = self.libc.fallocate
+
+    def __call__(self, fd, mode, offset, length):
+        if hasattr(self, 'fallocate64'):
+            ret = self.fallocate64(ctypes.c_int(fd), ctypes.c_int(mode),
+                                   ctypes.c_int64(offset), ctypes.c_int64(length))
+        else:
+            ret = self.fallocate(ctypes.c_int(fd), ctypes.c_int(mode),
+                                 ctypes.c_int(offset), ctypes.c_int(length))
+        if ret == -1:
+            err = ctypes.get_errno()
+            raise OSError(err, os.strerror(err))
+        return ret
+
+
 class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):
     def __init__(self, cas, *, enable_push):
         super().__init__()
         self.cas = cas
         self.enable_push = enable_push
+        self.fallocate = _FallocateCall()
 
     def Read(self, request, context):
         resource_name = request.resource_name
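
A minimal standalone use of the same libc call (Linux/glibc only; the soname "libc.so.6" is an assumption carried over from the class above). Mode 0 allocates real disk blocks and extends the file, so an upload that cannot fit fails up front with ENOSPC instead of partway through:

    import ctypes
    import os
    import tempfile

    libc = ctypes.CDLL("libc.so.6", use_errno=True)  # Linux/glibc assumption

    with tempfile.NamedTemporaryFile() as out:
        # mode=0, offset=0: reserve 4 KiB of real blocks for this file.
        if libc.fallocate(out.fileno(), 0,
                          ctypes.c_int64(0), ctypes.c_int64(4096)) == -1:
            err = ctypes.get_errno()
            raise OSError(err, os.strerror(err))
        print(os.fstat(out.fileno()).st_size)  # 4096: size extended by allocation
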
@@ -192,25 +231,44 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):
                         context.set_code(grpc.StatusCode.NOT_FOUND)
                         return response
 
-                    try:
-                        _clean_up_cache(self.cas, client_digest.size_bytes)
-                    except ArtifactTooLargeException as e:
-                        context.set_code(grpc.StatusCode.RESOURCE_EXHAUSTED)
-                        context.set_details(str(e))
-                        return response
+                    while True:
+                        if client_digest.size_bytes == 0:
+                            break
+                        try:
+                            _clean_up_cache(self.cas, client_digest.size_bytes)
+                        except ArtifactTooLargeException as e:
+                            context.set_code(grpc.StatusCode.RESOURCE_EXHAUSTED)
+                            context.set_details(str(e))
+                            return response
+
+                        try:
+                            self.fallocate(out.fileno(), 0, 0, client_digest.size_bytes)
+                            break
+                        except OSError as e:
+                            # Multiple uploads can happen at the same time
+                            if e.errno != errno.ENOSPC:
+                                raise
+
                 elif request.resource_name:
                     # If it is set on subsequent calls, it **must** match the value of the first request.
                     if request.resource_name != resource_name:
                         context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
                         return response
+
+                if (offset + len(request.data)) > client_digest.size_bytes:
+                    context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
+                    return response
+
                 out.write(request.data)
+
                 offset += len(request.data)
+
                 if request.finish_write:
                     if client_digest.size_bytes != offset:
                         context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
                         return response
                     out.flush()
-                    digest = self.cas.add_object(path=out.name)
+                    digest = self.cas.add_object(path=out.name, link_directly=True)
                     if digest.hash != client_digest.hash:
                         context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
                         return response
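
The loop above exists because eviction and allocation race with concurrent uploads: space freed by `_clean_up_cache()` can be consumed by another client before this upload reserves it, so the server alternates between cleaning and `fallocate` until the reservation sticks. The same control flow, reduced to a standalone helper (the callables here are stand-ins for the real server pieces):

    import errno

    def reserve_space(fallocate, fd, size, clean_up_cache):
        # Alternate between evicting artifacts and reserving the upload's
        # full size until fallocate succeeds; clean_up_cache may raise
        # ArtifactTooLargeException, which the caller maps to a gRPC error.
        while size > 0:
            clean_up_cache(size)
            try:
                fallocate(fd, 0, 0, size)
                return
            except OSError as e:
                if e.errno != errno.ENOSPC:
                    raise
                # Another upload took the freed space; evict more and retry.
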
@@ -230,10 +288,17 @@ class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddressableStorageServicer):
     def FindMissingBlobs(self, request, context):
         response = remote_execution_pb2.FindMissingBlobsResponse()
         for digest in request.blob_digests:
-            if not _has_object(self.cas, digest):
-                d = response.missing_blob_digests.add()
-                d.hash = digest.hash
-                d.size_bytes = digest.size_bytes
+            objpath = self.cas.objpath(digest)
+            try:
+                os.utime(objpath)
+            except OSError as e:
+                if e.errno != errno.ENOENT:
+                    raise
+                else:
+                    d = response.missing_blob_digests.add()
+                    d.hash = digest.hash
+                    d.size_bytes = digest.size_bytes
+
         return response
 
     def BatchReadBlobs(self, request, context):
@@ -362,11 +427,6 @@ def _digest_from_upload_resource_name(resource_name):
         return None
 
 
-def _has_object(cas, digest):
-    objpath = cas.objpath(digest)
-    return os.path.exists(objpath)
-
-
 # _clean_up_cache()
 #
 # Keep removing Least Recently Pushed (LRP) artifacts in a cache until there
@@ -399,7 +459,14 @@ def _clean_up_cache(cas, object_size):
     # obtain a list of LRP artifacts
     LRP_artifacts = cas.list_artifacts()
 
+    keep_after = time.time() - _OBJECT_MIN_AGE
+
     removed_size = 0  # in bytes
+    if object_size - removed_size > free_disk_space:
+        # First we try to see if some unreferenced objects became old
+        # enough to be removed.
+        removed_size += cas.prune(keep_after=keep_after)
+
     while object_size - removed_size > free_disk_space:
         try:
             to_remove = LRP_artifacts.pop(0)  # The first element in the list is the LRP artifact
@@ -411,7 +478,8 @@ def _clean_up_cache(cas, object_size):
                                             "the filesystem which mounts the remote "
                                             "cache".format(object_size))
 
-        removed_size += cas.remove(to_remove, defer_prune=False)
+        cas.remove(to_remove, defer_prune=True)
+        removed_size += cas.prune(keep_after=keep_after)
 
     if removed_size > 0:
         logging.info("Successfully removed {} bytes from the cache".format(removed_size))
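
With pruning deferred, removing an artifact's ref no longer reports the bytes freed; the accounting now comes from the `prune()` pass that follows each removal, and a first prune may already satisfy the request when enough unreferenced objects have aged past the cutoff. The shape of that loop, reduced to a standalone sketch (names are illustrative; `cas.remove()` and `cas.prune()` are the calls shown in the diff):

    def evict_until_fits(cas, object_size, free_disk_space, lrp_artifacts, keep_after):
        # Two-phase eviction: drop the Least Recently Pushed ref, then
        # prune to actually reclaim (and measure) the disk space.
        removed = 0
        if object_size - removed > free_disk_space:
            removed += cas.prune(keep_after=keep_after)  # reap aged orphans first
        while object_size - removed > free_disk_space:
            to_remove = lrp_artifacts.pop(0)             # IndexError: cache too small
            cas.remove(to_remove, defer_prune=True)      # unlink the ref only
            removed += cas.prune(keep_after=keep_after)  # reclaim and account
        return removed
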
=====================================
buildstream/_yaml.py
=====================================
@@ -467,7 +467,7 @@ def node_get_project_path(node, key, project_dir, *,
                         "{}: Specified path '{}' does not exist"
                         .format(provenance, path_str))
 
-    is_inside = project_dir_path in full_resolved_path.parents or (
+    is_inside = project_dir_path.resolve() in full_resolved_path.parents or (
         full_resolved_path == project_dir_path)
 
     if path.is_absolute() or not is_inside:
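
The one-line `_yaml.py` fix matters when the project directory path itself goes through a symlink: `Path.parents` of a fully-resolved path contains only resolved ancestors, so an unresolved `project_dir_path` never matches and valid elements are reported as outside the project. A self-contained illustration (the paths are hypothetical):

    import os
    import tempfile
    from pathlib import Path

    with tempfile.TemporaryDirectory() as tmp:
        real = Path(tmp, 'real')
        (real / 'elements').mkdir(parents=True)
        link = Path(tmp, 'linked')
        os.symlink(real, link)

        child = (link / 'elements').resolve()   # ends in .../real/elements
        assert link not in child.parents        # old check: wrongly "outside"
        assert link.resolve() in child.parents  # fixed check: inside
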
=====================================
tests/format/project.py
=====================================
@@ -181,3 +181,15 @@ def test_project_refs_options(cli, datafiles):
 
     # Assert that the cache keys are different
     assert result1.output != result2.output
+
+
+@pytest.mark.datafiles(os.path.join(DATA_DIR, 'element-path'))
+def test_element_path_project_path_contains_symlinks(cli, datafiles, tmpdir):
+    real_project = str(datafiles)
+    linked_project = os.path.join(str(tmpdir), 'linked')
+    os.symlink(real_project, linked_project)
+    os.makedirs(os.path.join(real_project, 'elements'), exist_ok=True)
+    with open(os.path.join(real_project, 'elements', 'element.bst'), 'w') as f:
+        f.write("kind: manual\n")
+    result = cli.run(project=linked_project, args=['show', 'element.bst'])
+    result.assert_success()
=====================================
tests/frontend/push.py
=====================================
@@ -231,6 +231,8 @@ def test_artifact_expires(cli, datafiles, tmpdir):
         assert cli.get_element_state(project, 'element2.bst') == 'cached'
         assert_shared(cli, share, project, 'element2.bst')
 
+        share.make_all_objects_older()
+
         # Create and build another element of 5 MB (This will exceed the free disk space available)
         create_element_size('element3.bst', project, element_path, [], int(5e6))
         result = cli.run(project=project, args=['build', 'element3.bst'])
@@ -327,6 +329,8 @@ def test_recently_pulled_artifact_does_not_expire(cli, datafiles, tmpdir):
         # Ensure element1 is cached locally
         assert cli.get_element_state(project, 'element1.bst') == 'cached'
 
+        share.make_all_objects_older()
+
         # Create and build the element3 (of 5 MB)
         create_element_size('element3.bst', project, element_path, [], int(5e6))
         result = cli.run(project=project, args=['build', 'element3.bst'])
=====================================
tests/testutils/artifactshare.py
=====================================
@@ -122,6 +122,15 @@ class ArtifactShare():
         except ArtifactError:
             return False
 
+    def make_all_objects_older(self):
+        for root, dirs, files in os.walk(os.path.join(self.cas.casdir, 'objects')):
+            for name in files:
+                fullname = os.path.join(root, name)
+                st = os.stat(fullname)
+                mtime = st.st_mtime - 6 * 60 * 60
+                atime = st.st_atime - 6 * 60 * 60
+                os.utime(fullname, times=(atime, mtime))
+
     # close():
     #
     # Remove the artifact share.