Valentin David pushed to branch valentindavid/cache_server_fill_up-1.2 at BuildStream / buildstream
Commits:
- 075ffefa by Valentin David at 2018-09-28T09:23:19Z
- 621bd236 by Tiago Gomes at 2018-09-28T09:52:15Z
- e298af53 by Valentin David at 2018-09-28T14:42:38Z
- 0a332520 by Valentin David at 2018-09-28T14:42:38Z
- 40962772 by Valentin David at 2018-09-28T14:42:38Z
- 8dbf1d34 by Valentin David at 2018-09-28T14:42:38Z
- b6ae1c70 by Valentin David at 2018-09-28T14:42:38Z
- 3909b52e by Valentin David at 2018-09-28T14:42:38Z
6 changed files:
- buildstream/_artifactcache/cascache.py
- buildstream/_artifactcache/casserver.py
- buildstream/_yaml.py
- tests/format/project.py
- tests/frontend/push.py
- tests/testutils/artifactshare.py
Changes:
--- a/buildstream/_artifactcache/cascache.py
+++ b/buildstream/_artifactcache/cascache.py
@@ -26,6 +26,7 @@ import stat
 import tempfile
 import uuid
 import errno
+import contextlib
 from urllib.parse import urlparse
 
 import grpc
@@ -393,13 +394,14 @@ class CASCache(ArtifactCache):
     #     digest (Digest): An optional Digest object to populate
     #     path (str): Path to file to add
     #     buffer (bytes): Byte buffer to add
+    #     link_directly (bool): Whether file given by path can be linked
     #
     # Returns:
     #     (Digest): The digest of the added object
     #
     # Either `path` or `buffer` must be passed, but not both.
     #
-    def add_object(self, *, digest=None, path=None, buffer=None):
+    def add_object(self, *, digest=None, path=None, buffer=None, link_directly=False):
         # Exactly one of the two parameters has to be specified
         assert (path is None) != (buffer is None)
 
@@ -409,28 +411,34 @@ class CASCache(ArtifactCache):
         try:
             h = hashlib.sha256()
             # Always write out new file to avoid corruption if input file is modified
-            with tempfile.NamedTemporaryFile(dir=self.tmpdir) as out:
-                # Set mode bits to 0644
-                os.chmod(out.name, stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH)
-
-                if path:
-                    with open(path, 'rb') as f:
-                        for chunk in iter(lambda: f.read(4096), b""):
-                            h.update(chunk)
-                            out.write(chunk)
+            with contextlib.ExitStack() as stack:
+                if path is not None and link_directly:
+                    tmp = stack.enter_context(open(path, 'rb'))
+                    for chunk in iter(lambda: tmp.read(4096), b""):
+                        h.update(chunk)
                 else:
-                    h.update(buffer)
-                    out.write(buffer)
+                    tmp = stack.enter_context(tempfile.NamedTemporaryFile(dir=self.tmpdir))
+                    # Set mode bits to 0644
+                    os.chmod(tmp.name, stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH)
 
-                out.flush()
+                    if path:
+                        with open(path, 'rb') as f:
+                            for chunk in iter(lambda: f.read(4096), b""):
+                                h.update(chunk)
+                                tmp.write(chunk)
+                    else:
+                        h.update(buffer)
+                        tmp.write(buffer)
+
+                    tmp.flush()
 
                 digest.hash = h.hexdigest()
-                digest.size_bytes = os.fstat(out.fileno()).st_size
+                digest.size_bytes = os.fstat(tmp.fileno()).st_size
 
                 # Place file at final location
                 objpath = self.objpath(digest)
                 os.makedirs(os.path.dirname(objpath), exist_ok=True)
-                os.link(out.name, objpath)
+                os.link(tmp.name, objpath)
 
         except FileExistsError as e:
             # We can ignore the failed link() if the object is already in the repo.
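
Note: with the new link_directly flag, add_object() can skip the copy into a
temporary file when the caller already owns a file on the CAS filesystem (the
receiving end of an upload): the file is hashed in place and then hard-linked
into the object store. A small usage sketch; `cas` stands for a CASCache
instance and the paths are purely illustrative:

    # Upload path: the temporary file already lives next to the CAS, so hash it
    # in place and hard-link it into objects/ instead of copying it again.
    digest = cas.add_object(path='/srv/cas/tmp/upload-1234', link_directly=True)

    # Default path: the source may live anywhere (or be modified concurrently),
    # so it is first copied into a fresh temporary file under cas.tmpdir.
    digest = cas.add_object(path='/home/user/some-file')
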
@@ -565,7 +573,12 @@ class CASCache(ArtifactCache):
     #
     # Prune unreachable objects from the repo.
     #
-    def prune(self):
+    # Args:
+    #     keep_after (int|None): timestamp after which unreachable objects
+    #                            are kept. None if no unreachable object
+    #                            should be kept.
+    #
+    def prune(self, keep_after=None):
         ref_heads = os.path.join(self.casdir, 'refs', 'heads')
 
         pruned = 0
@@ -586,6 +599,10 @@ class CASCache(ArtifactCache):
                 objhash = os.path.basename(root) + filename
                 if objhash not in reachable:
                     obj_path = os.path.join(root, filename)
+                    if keep_after:
+                        st = os.stat(obj_path)
+                        if st.st_mtime >= keep_after:
+                            continue
                     pruned += os.stat(obj_path).st_size
                     os.unlink(obj_path)
 
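
Note: prune() now takes an optional cut-off timestamp. A minimal usage sketch;
the six-hour window mirrors the _OBJECT_MIN_AGE constant introduced in
casserver.py below, and the return value is taken to be the number of bytes
freed, which is how the _clean_up_cache hunk further down uses it:

    import time

    # Unreachable objects whose mtime is newer than keep_after survive this
    # pass; older ones are unlinked and their sizes summed into the result.
    keep_after = time.time() - 6 * 60 * 60
    freed = cas.prune(keep_after=keep_after)   # 'cas' is a CASCache instance
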
--- a/buildstream/_artifactcache/casserver.py
+++ b/buildstream/_artifactcache/casserver.py
@@ -24,6 +24,9 @@ import signal
 import sys
 import tempfile
 import uuid
+import time
+import errno
+import ctypes
 
 import click
 import grpc
@@ -41,6 +44,10 @@ from .cascache import CASCache
 # The default limit for gRPC messages is 4 MiB
 _MAX_BATCH_TOTAL_SIZE_BYTES = 4 * 1024 * 1024
 
+# The minimum age in seconds for objects before they can be cleaned
+# up.
+_OBJECT_MIN_AGE = 6 * 60 * 60
+
 
 # Trying to push an artifact that is too large
 class ArtifactTooLargeException(Exception):
@@ -130,11 +137,43 @@ def server_main(repo, port, server_key, server_cert, client_certs, enable_push):
         server.stop(0)
 
 
+class _FallocateCall:
+
+    FALLOC_FL_KEEP_SIZE = 1
+    FALLOC_FL_PUNCH_HOLE = 2
+    FALLOC_FL_NO_HIDE_STALE = 4
+    FALLOC_FL_COLLAPSE_RANGE = 8
+    FALLOC_FL_ZERO_RANGE = 16
+    FALLOC_FL_INSERT_RANGE = 32
+    FALLOC_FL_UNSHARE_RANGE = 64
+
+    def __init__(self):
+        self.libc = ctypes.CDLL("libc.so.6", use_errno=True)
+        try:
+            self.fallocate64 = self.libc.fallocate64
+        except AttributeError:
+            self.fallocate = self.libc.fallocate
+
+    def __call__(self, fd, mode, offset, length):
+        if hasattr(self, 'fallocate64'):
+            print(fd, mode, offset, length)
+            ret = self.fallocate64(ctypes.c_int(fd), ctypes.c_int(mode),
+                                   ctypes.c_int64(offset), ctypes.c_int64(length))
+        else:
+            ret = self.fallocate(ctypes.c_int(fd), ctypes.c_int(mode),
+                                 ctypes.c_int(offset), ctypes.c_int(length))
+        if ret == -1:
+            errno = ctypes.get_errno()
+            raise OSError(errno, os.strerror(errno))
+        return ret
+
+
 class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):
     def __init__(self, cas, *, enable_push):
         super().__init__()
         self.cas = cas
         self.enable_push = enable_push
+        self.fallocate = _FallocateCall()
 
     def Read(self, request, context):
         resource_name = request.resource_name
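
Note: _FallocateCall pre-allocates the full object size on the upload's
temporary file, so an out-of-space condition surfaces as ENOSPC before any data
is written (see the Write() hunk below). The standard library offers a similar
primitive; this is only a sketch of the idea on a Unix system, not what the
commit uses (the commit calls glibc's fallocate/fallocate64 through ctypes):

    import os
    import tempfile

    with tempfile.NamedTemporaryFile() as out:
        # Reserve 4 KiB up front; raises OSError with errno set to ENOSPC
        # if the filesystem cannot provide the space.
        os.posix_fallocate(out.fileno(), 0, 4096)
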
@@ -192,25 +231,44 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):
                         context.set_code(grpc.StatusCode.NOT_FOUND)
                         return response
 
-                    try:
-                        _clean_up_cache(self.cas, client_digest.size_bytes)
-                    except ArtifactTooLargeException as e:
-                        context.set_code(grpc.StatusCode.RESOURCE_EXHAUSTED)
-                        context.set_details(str(e))
-                        return response
+                    while True:
+                        if client_digest.size_bytes == 0:
+                            break
+                        try:
+                            _clean_up_cache(self.cas, client_digest.size_bytes)
+                        except ArtifactTooLargeException as e:
+                            context.set_code(grpc.StatusCode.RESOURCE_EXHAUSTED)
+                            context.set_details(str(e))
+                            return response
+
+                        try:
+                            self.fallocate(out.fileno(), 0, 0, client_digest.size_bytes)
+                            break
+                        except OSError as e:
+                            # Multiple upload can happen in the same time
+                            if e.errno != errno.ENOSPC:
+                                raise
+
                 elif request.resource_name:
                     # If it is set on subsequent calls, it **must** match the value of the first request.
                     if request.resource_name != resource_name:
                         context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
                         return response
+
+                if (offset + len(request.data)) > client_digest.size_bytes:
+                    context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
+                    return response
+
                 out.write(request.data)
+
                 offset += len(request.data)
+
                 if request.finish_write:
                     if client_digest.size_bytes != offset:
                         context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
                         return response
                     out.flush()
-                    digest = self.cas.add_object(path=out.name)
+                    digest = self.cas.add_object(path=out.name, link_directly=True)
                     if digest.hash != client_digest.hash:
                         context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
                         return response
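
Note: Write() now wraps the cleanup in a reserve-then-retry loop: free enough
space, try to pre-allocate the whole upload, and go around again on ENOSPC,
because a concurrent upload may consume the space that was just freed. The
shape of the pattern in isolation; free_enough_space and reserve are
hypothetical stand-ins for _clean_up_cache and self.fallocate:

    import errno

    def free_enough_space(needed):
        # Hypothetical stand-in for _clean_up_cache(): evict artifacts until
        # 'needed' bytes fit, or raise if that is impossible.
        pass

    def reserve(fd, needed):
        # Hypothetical stand-in for self.fallocate(): preallocate 'needed'
        # bytes, raising OSError(errno.ENOSPC) when the disk is full.
        pass

    def reserve_with_retry(fd, needed):
        while True:
            free_enough_space(needed)
            try:
                reserve(fd, needed)
                return                 # reservation succeeded, safe to write
            except OSError as e:
                # Another upload may have eaten the space we just freed;
                # only ENOSPC is retried, anything else is a real error.
                if e.errno != errno.ENOSPC:
                    raise
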
@@ -230,10 +288,17 @@ class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddressableStorageServicer):
     def FindMissingBlobs(self, request, context):
         response = remote_execution_pb2.FindMissingBlobsResponse()
         for digest in request.blob_digests:
-            if not _has_object(self.cas, digest):
-                d = response.missing_blob_digests.add()
-                d.hash = digest.hash
-                d.size_bytes = digest.size_bytes
+            objpath = self.cas.objpath(digest)
+            try:
+                os.utime(objpath)
+            except OSError as e:
+                if e.errno != errno.ENOENT:
+                    raise
+                else:
+                    d = response.missing_blob_digests.add()
+                    d.hash = digest.hash
+                    d.size_bytes = digest.size_bytes
+
         return response
 
     def BatchReadBlobs(self, request, context):
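
Note: FindMissingBlobs() now doubles as a "touch": os.utime() without a times
argument stamps the object with the current time, so a blob a client has just
asked about stays newer than the prune cut-off and is not deleted out from
under the upload that follows. A standalone illustration using a throwaway
file:

    import os
    import tempfile
    import time

    keep_after = time.time() - 6 * 60 * 60        # same window as _OBJECT_MIN_AGE

    with tempfile.NamedTemporaryFile() as f:
        os.utime(f.name, times=(0, 0))            # pretend the object is ancient
        assert os.stat(f.name).st_mtime < keep_after     # would be pruned

        os.utime(f.name)                          # FindMissingBlobs-style touch
        assert os.stat(f.name).st_mtime >= keep_after    # now survives the prune pass
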
@@ -362,11 +427,6 @@ def _digest_from_upload_resource_name(resource_name):
     return None
 
 
-def _has_object(cas, digest):
-    objpath = cas.objpath(digest)
-    return os.path.exists(objpath)
-
-
 # _clean_up_cache()
 #
 # Keep removing Least Recently Pushed (LRP) artifacts in a cache until there
@@ -399,7 +459,14 @@ def _clean_up_cache(cas, object_size):
     # obtain a list of LRP artifacts
     LRP_artifacts = cas.list_artifacts()
 
+    keep_after = time.time() - _OBJECT_MIN_AGE
+
     removed_size = 0  # in bytes
+    if object_size - removed_size > free_disk_space:
+        # First we try to see if some unreferenced objects became old
+        # enough to be removed.
+        removed_size += cas.prune(keep_after=keep_after)
+
     while object_size - removed_size > free_disk_space:
         try:
             to_remove = LRP_artifacts.pop(0)  # The first element in the list is the LRP artifact
@@ -411,7 +478,8 @@ def _clean_up_cache(cas, object_size):
                                             "the filesystem which mounts the remote "
                                             "cache".format(object_size))
 
-        removed_size += cas.remove(to_remove, defer_prune=False)
+        cas.remove(to_remove, defer_prune=True)
+        removed_size += cas.prune(keep_after=keep_after)
 
     if removed_size > 0:
         logging.info("Successfully removed {} bytes from the cache".format(removed_size))
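
Note: taken together, the two _clean_up_cache hunks change the eviction model:
removing an artifact ref no longer deletes objects directly (defer_prune=True);
space is only reclaimed by prune(), and only for objects that are both
unreachable and older than _OBJECT_MIN_AGE. A condensed sketch of the resulting
flow, reusing the names from the diff:

    import time

    def reclaim(cas, lrp_refs):
        # Sketch only: drop Least Recently Pushed refs, then prune whatever
        # became unreachable and is older than the six-hour safety window.
        keep_after = time.time() - 6 * 60 * 60             # _OBJECT_MIN_AGE
        freed = 0
        for ref in lrp_refs:
            cas.remove(ref, defer_prune=True)               # unreference only
            freed += cas.prune(keep_after=keep_after)       # reclaim old, unreachable objects
        return freed
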
--- a/buildstream/_yaml.py
+++ b/buildstream/_yaml.py
@@ -467,7 +467,7 @@ def node_get_project_path(node, key, project_dir, *,
                         "{}: Specified path '{}' does not exist"
                         .format(provenance, path_str))
 
-    is_inside = project_dir_path in full_resolved_path.parents or (
+    is_inside = project_dir_path.resolve() in full_resolved_path.parents or (
         full_resolved_path == project_dir_path)
 
     if path.is_absolute() or not is_inside:
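
Note: the one-line _yaml.py change handles project directories reached through
a symlink: full_resolved_path is fully resolved, so the unresolved
project_dir_path never shows up among its parents. A self-contained
reproduction of the mismatch (the temporary directory layout is illustrative):

    import os
    import tempfile
    from pathlib import Path

    base = Path(tempfile.mkdtemp())
    real = base / 'real-project'
    (real / 'elements').mkdir(parents=True)
    link = base / 'linked-project'
    os.symlink(real, link)                       # project dir reached via a symlink

    child = (link / 'elements').resolve()        # resolves through the symlink
    assert link not in child.parents             # why the old check rejected the path
    assert link.resolve() in child.parents       # what the new check accepts
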
--- a/tests/format/project.py
+++ b/tests/format/project.py
@@ -181,3 +181,15 @@ def test_project_refs_options(cli, datafiles):
 
     # Assert that the cache keys are different
     assert result1.output != result2.output
+
+
+@pytest.mark.datafiles(os.path.join(DATA_DIR, 'element-path'))
+def test_element_path_project_path_contains_symlinks(cli, datafiles, tmpdir):
+    real_project = str(datafiles)
+    linked_project = os.path.join(str(tmpdir), 'linked')
+    os.symlink(real_project, linked_project)
+    os.makedirs(os.path.join(real_project, 'elements'), exist_ok=True)
+    with open(os.path.join(real_project, 'elements', 'element.bst'), 'w') as f:
+        f.write("kind: manual\n")
+    result = cli.run(project=linked_project, args=['show', 'element.bst'])
+    result.assert_success()
--- a/tests/frontend/push.py
+++ b/tests/frontend/push.py
@@ -231,6 +231,8 @@ def test_artifact_expires(cli, datafiles, tmpdir):
     assert cli.get_element_state(project, 'element2.bst') == 'cached'
     assert_shared(cli, share, project, 'element2.bst')
 
+    share.make_all_objects_older()
+
     # Create and build another element of 5 MB (This will exceed the free disk space available)
     create_element_size('element3.bst', project, element_path, [], int(5e6))
     result = cli.run(project=project, args=['build', 'element3.bst'])
@@ -327,6 +329,8 @@ def test_recently_pulled_artifact_does_not_expire(cli, datafiles, tmpdir):
     # Ensure element1 is cached locally
     assert cli.get_element_state(project, 'element1.bst') == 'cached'
 
+    share.make_all_objects_older()
+
     # Create and build the element3 (of 5 MB)
     create_element_size('element3.bst', project, element_path, [], int(5e6))
     result = cli.run(project=project, args=['build', 'element3.bst'])
--- a/tests/testutils/artifactshare.py
+++ b/tests/testutils/artifactshare.py
@@ -122,6 +122,15 @@ class ArtifactShare():
         except ArtifactError:
             return False
 
+    def make_all_objects_older(self):
+        for root, dirs, files in os.walk(os.path.join(self.cas.casdir, 'objects')):
+            for name in files:
+                fullname = os.path.join(root, name)
+                st = os.stat(fullname)
+                mtime = st.st_mtime - 6 * 60 * 60
+                atime = st.st_atime - 6 * 60 * 60
+                os.utime(fullname, times=(atime, mtime))
+
     # close():
     #
     # Remove the artifact share.
