Tom Pollard pushed to branch tpollard/494 at BuildStream / buildstream
Commits:
-
f447aedd
by Tiago Gomes at 2018-10-01T10:33:11Z
-
682dddce
by Tiago Gomes at 2018-10-01T10:35:12Z
-
fafa8136
by Tiago Gomes at 2018-10-01T10:59:54Z
-
26e1a3c7
by Jürg Billeter at 2018-10-01T14:58:06Z
-
f47895c0
by Jürg Billeter at 2018-10-01T14:58:06Z
-
cf00c0a1
by Jürg Billeter at 2018-10-01T15:32:30Z
-
5f4ae90b
by Jürg Billeter at 2018-10-02T06:34:02Z
-
0458bc4e
by Jürg Billeter at 2018-10-02T07:08:35Z
-
a9c819be
by Tom Pollard at 2018-10-02T13:18:14Z
10 changed files:
- .gitlab-ci.yml
- buildstream/_artifactcache/artifactcache.py
- buildstream/_artifactcache/cascache.py
- buildstream/_artifactcache/casserver.py
- buildstream/_frontend/cli.py
- buildstream/_scheduler/queues/pullqueue.py
- buildstream/_stream.py
- buildstream/element.py
- buildstream/sandbox/_sandboxremote.py
- tests/completions/completions.py
Changes:
| ... | ... | @@ -161,14 +161,14 @@ docs: |
| 161 | 161 |
.overnight-tests: &overnight-tests-template
|
| 162 | 162 |
stage: test
|
| 163 | 163 |
variables:
|
| 164 |
- bst_ext_url: git+https://gitlab.com/BuildStream/bst-external.git
|
|
| 165 |
- bst_ext_ref: 1d6ab71151b93c8cbc0a91a36ffe9270f3b835f1 # 0.5.1
|
|
| 166 |
- fd_sdk_ref: 88d7c22c2281b987faa02edd57df80d430eecf1f # 18.08.12
|
|
| 164 |
+ BST_EXT_URL: git+https://gitlab.com/BuildStream/bst-external.git
|
|
| 165 |
+ BST_EXT_REF: 1d6ab71151b93c8cbc0a91a36ffe9270f3b835f1 # 0.5.1
|
|
| 166 |
+ FD_SDK_REF: 88d7c22c2281b987faa02edd57df80d430eecf1f # 18.08.11-35-g88d7c22c
|
|
| 167 | 167 |
before_script:
|
| 168 | 168 |
- (cd dist && ./unpack.sh && cd buildstream && pip3 install .)
|
| 169 |
- - pip3 install --user -e ${bst_ext_url}@${bst_ext_ref}#egg=bst_ext
|
|
| 169 |
+ - pip3 install --user -e ${BST_EXT_URL}@${BST_EXT_REF}#egg=bst_ext
|
|
| 170 | 170 |
- git clone https://gitlab.com/freedesktop-sdk/freedesktop-sdk.git
|
| 171 |
- - git -C freedesktop-sdk checkout ${fd_sdk_ref}
|
|
| 171 |
+ - git -C freedesktop-sdk checkout ${FD_SDK_REF}
|
|
| 172 | 172 |
only:
|
| 173 | 173 |
- schedules
|
| 174 | 174 |
|
| ... | ... | @@ -38,8 +38,9 @@ CACHE_SIZE_FILE = "cache_size" |
| 38 | 38 |
# url (str): Location of the remote artifact cache
|
| 39 | 39 |
# push (bool): Whether we should attempt to push artifacts to this cache,
|
| 40 | 40 |
# in addition to pulling from it.
|
| 41 |
+# buildtrees (bool): Whether the default action of pull should include the artifact buildtree
|
|
| 41 | 42 |
#
|
| 42 |
-class ArtifactCacheSpec(namedtuple('ArtifactCacheSpec', 'url push server_cert client_key client_cert')):
|
|
| 43 |
+class ArtifactCacheSpec(namedtuple('ArtifactCacheSpec', 'url push buildtrees server_cert client_key client_cert')):
|
|
| 43 | 44 |
|
| 44 | 45 |
# _new_from_config_node
|
| 45 | 46 |
#
|
| ... | ... | @@ -47,9 +48,10 @@ class ArtifactCacheSpec(namedtuple('ArtifactCacheSpec', 'url push server_cert cl |
| 47 | 48 |
#
|
| 48 | 49 |
@staticmethod
|
| 49 | 50 |
def _new_from_config_node(spec_node, basedir=None):
|
| 50 |
- _yaml.node_validate(spec_node, ['url', 'push', 'server-cert', 'client-key', 'client-cert'])
|
|
| 51 |
+ _yaml.node_validate(spec_node, ['url', 'push', 'pullbuildtrees', 'server-cert', 'client-key', 'client-cert'])
|
|
| 51 | 52 |
url = _yaml.node_get(spec_node, str, 'url')
|
| 52 | 53 |
push = _yaml.node_get(spec_node, bool, 'push', default_value=False)
|
| 54 |
+ buildtrees = _yaml.node_get(spec_node, bool, 'pullbuildtrees', default_value=False)
|
|
| 53 | 55 |
if not url:
|
| 54 | 56 |
provenance = _yaml.node_get_provenance(spec_node, 'url')
|
| 55 | 57 |
raise LoadError(LoadErrorReason.INVALID_DATA,
|
| ... | ... | @@ -77,10 +79,10 @@ class ArtifactCacheSpec(namedtuple('ArtifactCacheSpec', 'url push server_cert cl |
| 77 | 79 |
raise LoadError(LoadErrorReason.INVALID_DATA,
|
| 78 | 80 |
"{}: 'client-cert' was specified without 'client-key'".format(provenance))
|
| 79 | 81 |
|
| 80 |
- return ArtifactCacheSpec(url, push, server_cert, client_key, client_cert)
|
|
| 82 |
+ return ArtifactCacheSpec(url, push, buildtrees, server_cert, client_key, client_cert)
|
|
| 81 | 83 |
|
| 82 | 84 |
|
| 83 |
-ArtifactCacheSpec.__new__.__defaults__ = (None, None, None)
|
|
| 85 |
+ArtifactCacheSpec.__new__.__defaults__ = (False, None, None, None)
|
|
| 84 | 86 |
|
| 85 | 87 |
|
| 86 | 88 |
# An ArtifactCache manages artifacts.
|
| ... | ... | @@ -426,6 +428,22 @@ class ArtifactCache(): |
| 426 | 428 |
raise ImplError("Cache '{kind}' does not implement contains()"
|
| 427 | 429 |
.format(kind=type(self).__name__))
|
| 428 | 430 |
|
| 431 |
+ # contains_subdir_artifact():
|
|
| 432 |
+ #
|
|
| 433 |
+ # Check whether an artifact element contains a digest for a subdir
|
|
| 434 |
+ # which is populated in the cache, i.e. non-dangling.
|
|
| 435 |
+ #
|
|
| 436 |
+ # Args:
|
|
| 437 |
+ # element (Element): The Element to check
|
|
| 438 |
+ # key (str): The cache key to use
|
|
| 439 |
+ # subdir (str): The subdir to check
|
|
| 440 |
+ #
|
|
| 441 |
+ # Returns: True if the subdir exists & is populated in the cache, False otherwise
|
|
| 442 |
+ #
|
|
| 443 |
+ def contains_subdir_artifact(self, element, key, subdir):
|
|
| 444 |
+ raise ImplError("Cache '{kind}' does not implement contains_subdir_artifact()"
|
|
| 445 |
+ .format(kind=type(self).__name__))
|
|
| 446 |
+ |
|
| 429 | 447 |
# list_artifacts():
|
| 430 | 448 |
#
|
| 431 | 449 |
# List artifacts in this cache in LRU order.
|
| ... | ... | @@ -551,11 +569,12 @@ class ArtifactCache(): |
| 551 | 569 |
# element (Element): The Element whose artifact is to be fetched
|
| 552 | 570 |
# key (str): The cache key to use
|
| 553 | 571 |
# progress (callable): The progress callback, if any
|
| 572 |
+ # subdir (str): The optional specific subdir to pull
|
|
| 554 | 573 |
#
|
| 555 | 574 |
# Returns:
|
| 556 | 575 |
# (bool): True if pull was successful, False if artifact was not available
|
| 557 | 576 |
#
|
| 558 |
- def pull(self, element, key, *, progress=None):
|
|
| 577 |
+ def pull(self, element, key, *, progress=None, subdir=None, excluded_subdirs=None):
|
|
| 559 | 578 |
raise ImplError("Cache '{kind}' does not implement pull()"
|
| 560 | 579 |
.format(kind=type(self).__name__))
|
| 561 | 580 |
|
| ... | ... | @@ -92,6 +92,16 @@ class CASCache(ArtifactCache): |
| 92 | 92 |
# This assumes that the repository doesn't have any dangling pointers
|
| 93 | 93 |
return os.path.exists(refpath)
|
| 94 | 94 |
|
| 95 |
+ def contains_subdir_artifact(self, element, key, subdir):
|
|
| 96 |
+ tree = self.resolve_ref(self.get_artifact_fullname(element, key))
|
|
| 97 |
+ |
|
| 98 |
+ # This assumes that the subdir digest is present in the element tree
|
|
| 99 |
+ subdirdigest = self._get_subdir(tree, subdir)
|
|
| 100 |
+ objpath = self.objpath(subdirdigest)
|
|
| 101 |
+ |
|
| 102 |
+ # True if subdir content is cached or if empty as expected
|
|
| 103 |
+ return os.path.exists(objpath)
|
|
| 104 |
+ |
|
| 95 | 105 |
def extract(self, element, key):
|
| 96 | 106 |
ref = self.get_artifact_fullname(element, key)
|
| 97 | 107 |
|
| ... | ... | @@ -228,7 +238,7 @@ class CASCache(ArtifactCache): |
| 228 | 238 |
remotes_for_project = self._remotes[element._get_project()]
|
| 229 | 239 |
return any(remote.spec.push for remote in remotes_for_project)
|
| 230 | 240 |
|
| 231 |
- def pull(self, element, key, *, progress=None):
|
|
| 241 |
+ def pull(self, element, key, *, progress=None, subdir=None, excluded_subdirs=None):
|
|
| 232 | 242 |
ref = self.get_artifact_fullname(element, key)
|
| 233 | 243 |
|
| 234 | 244 |
project = element._get_project()
|
| ... | ... | @@ -247,8 +257,14 @@ class CASCache(ArtifactCache): |
| 247 | 257 |
tree.hash = response.digest.hash
|
| 248 | 258 |
tree.size_bytes = response.digest.size_bytes
|
| 249 | 259 |
|
| 250 |
- self._fetch_directory(remote, tree)
|
|
| 260 |
+ # Check if the element artifact is present, if so just fetch subdir
|
|
| 261 |
+ if subdir and os.path.exists(self.objpath(tree)):
|
|
| 262 |
+ self._fetch_subdir(remote, tree, subdir)
|
|
| 263 |
+ else:
|
|
| 264 |
+ # Fetch artifact, excluded_subdirs determined in pullqueue
|
|
| 265 |
+ self._fetch_directory(remote, tree, excluded_subdirs=excluded_subdirs)
|
|
| 251 | 266 |
|
| 267 |
+ # tree is the remote value, so is the same with or without a dangling ref locally
|
|
| 252 | 268 |
self.set_ref(ref, tree)
|
| 253 | 269 |
|
| 254 | 270 |
element.info("Pulled artifact {} <- {}".format(display_key, remote.spec.url))
|
| ... | ... | @@ -668,8 +684,10 @@ class CASCache(ArtifactCache): |
| 668 | 684 |
stat.S_IRGRP | stat.S_IXGRP | stat.S_IROTH | stat.S_IXOTH)
|
| 669 | 685 |
|
| 670 | 686 |
for dirnode in directory.directories:
|
| 671 |
- fullpath = os.path.join(dest, dirnode.name)
|
|
| 672 |
- self._checkout(fullpath, dirnode.digest)
|
|
| 687 |
+ # Don't try to checkout a dangling ref
|
|
| 688 |
+ if os.path.exists(self.objpath(dirnode.digest)):
|
|
| 689 |
+ fullpath = os.path.join(dest, dirnode.name)
|
|
| 690 |
+ self._checkout(fullpath, dirnode.digest)
|
|
| 673 | 691 |
|
| 674 | 692 |
for symlinknode in directory.symlinks:
|
| 675 | 693 |
# symlink
|
| ... | ... | @@ -948,10 +966,12 @@ class CASCache(ArtifactCache): |
| 948 | 966 |
# remote (Remote): The remote to use.
|
| 949 | 967 |
# dir_digest (Digest): Digest object for the directory to fetch.
|
| 950 | 968 |
#
|
| 951 |
- def _fetch_directory(self, remote, dir_digest):
|
|
| 969 |
+ def _fetch_directory(self, remote, dir_digest, *, excluded_subdirs=None):
|
|
| 952 | 970 |
fetch_queue = [dir_digest]
|
| 953 | 971 |
fetch_next_queue = []
|
| 954 | 972 |
batch = _CASBatchRead(remote)
|
| 973 |
+ if not excluded_subdirs:
|
|
| 974 |
+ excluded_subdirs = []
|
|
| 955 | 975 |
|
| 956 | 976 |
while len(fetch_queue) + len(fetch_next_queue) > 0:
|
| 957 | 977 |
if len(fetch_queue) == 0:
|
| ... | ... | @@ -966,8 +986,9 @@ class CASCache(ArtifactCache): |
| 966 | 986 |
directory.ParseFromString(f.read())
|
| 967 | 987 |
|
| 968 | 988 |
for dirnode in directory.directories:
|
| 969 |
- batch = self._fetch_directory_node(remote, dirnode.digest, batch,
|
|
| 970 |
- fetch_queue, fetch_next_queue, recursive=True)
|
|
| 989 |
+ if dirnode.name not in excluded_subdirs:
|
|
| 990 |
+ batch = self._fetch_directory_node(remote, dirnode.digest, batch,
|
|
| 991 |
+ fetch_queue, fetch_next_queue, recursive=True)
|
|
| 971 | 992 |
|
| 972 | 993 |
for filenode in directory.files:
|
| 973 | 994 |
batch = self._fetch_directory_node(remote, filenode.digest, batch,
|
| ... | ... | @@ -976,6 +997,10 @@ class CASCache(ArtifactCache): |
| 976 | 997 |
# Fetch final batch
|
| 977 | 998 |
self._fetch_directory_batch(remote, batch, fetch_queue, fetch_next_queue)
|
| 978 | 999 |
|
| 1000 |
+ def _fetch_subdir(self, remote, tree, subdir):
|
|
| 1001 |
+ subdirdigest = self._get_subdir(tree, subdir)
|
|
| 1002 |
+ self._fetch_directory(remote, subdirdigest)
|
|
| 1003 |
+ |
|
| 979 | 1004 |
def _fetch_tree(self, remote, digest):
|
| 980 | 1005 |
# download but do not store the Tree object
|
| 981 | 1006 |
with tempfile.NamedTemporaryFile(dir=self.tmpdir) as out:
|
| ... | ... | @@ -1048,10 +1073,29 @@ class CASCache(ArtifactCache): |
| 1048 | 1073 |
missing_blobs[d.hash] = d
|
| 1049 | 1074 |
|
| 1050 | 1075 |
# Upload any blobs missing on the server
|
| 1051 |
- for blob_digest in missing_blobs.values():
|
|
| 1052 |
- with open(self.objpath(blob_digest), 'rb') as f:
|
|
| 1053 |
- assert os.fstat(f.fileno()).st_size == blob_digest.size_bytes
|
|
| 1054 |
- self._send_blob(remote, blob_digest, f, u_uid=u_uid)
|
|
| 1076 |
+ self._send_blobs(remote, missing_blobs.values(), u_uid)
|
|
| 1077 |
+ |
|
| 1078 |
+ def _send_blobs(self, remote, digests, u_uid=uuid.uuid4()):
|
|
| 1079 |
+ batch = _CASBatchUpdate(remote)
|
|
| 1080 |
+ |
|
| 1081 |
+ for digest in digests:
|
|
| 1082 |
+ with open(self.objpath(digest), 'rb') as f:
|
|
| 1083 |
+ assert os.fstat(f.fileno()).st_size == digest.size_bytes
|
|
| 1084 |
+ |
|
| 1085 |
+ if (digest.size_bytes >= remote.max_batch_total_size_bytes or
|
|
| 1086 |
+ not remote.batch_update_supported):
|
|
| 1087 |
+ # Too large for batch request, upload in independent request.
|
|
| 1088 |
+ self._send_blob(remote, digest, f, u_uid=u_uid)
|
|
| 1089 |
+ else:
|
|
| 1090 |
+ if not batch.add(digest, f):
|
|
| 1091 |
+ # Not enough space left in batch request.
|
|
| 1092 |
+ # Complete pending batch first.
|
|
| 1093 |
+ batch.send()
|
|
| 1094 |
+ batch = _CASBatchUpdate(remote)
|
|
| 1095 |
+ batch.add(digest, f)
|
|
| 1096 |
+ |
|
| 1097 |
+ # Send final batch
|
|
| 1098 |
+ batch.send()
|
|
| 1055 | 1099 |
|
| 1056 | 1100 |
|
| 1057 | 1101 |
# Represents a single remote CAS cache.
|
| ... | ... | @@ -1126,6 +1170,17 @@ class _CASRemote(): |
| 1126 | 1170 |
if e.code() != grpc.StatusCode.UNIMPLEMENTED:
|
| 1127 | 1171 |
raise
|
| 1128 | 1172 |
|
| 1173 |
+ # Check whether the server supports BatchUpdateBlobs()
|
|
| 1174 |
+ self.batch_update_supported = False
|
|
| 1175 |
+ try:
|
|
| 1176 |
+ request = remote_execution_pb2.BatchUpdateBlobsRequest()
|
|
| 1177 |
+ response = self.cas.BatchUpdateBlobs(request)
|
|
| 1178 |
+ self.batch_update_supported = True
|
|
| 1179 |
+ except grpc.RpcError as e:
|
|
| 1180 |
+ if (e.code() != grpc.StatusCode.UNIMPLEMENTED and
|
|
| 1181 |
+ e.code() != grpc.StatusCode.PERMISSION_DENIED):
|
|
| 1182 |
+ raise
|
|
| 1183 |
+ |
|
| 1129 | 1184 |
self._initialized = True
|
| 1130 | 1185 |
|
| 1131 | 1186 |
|
| ... | ... | @@ -1173,6 +1228,46 @@ class _CASBatchRead(): |
| 1173 | 1228 |
yield (response.digest, response.data)
|
| 1174 | 1229 |
|
| 1175 | 1230 |
|
| 1231 |
+# Represents a batch of blobs queued for upload.
|
|
| 1232 |
+#
|
|
| 1233 |
+class _CASBatchUpdate():
|
|
| 1234 |
+ def __init__(self, remote):
|
|
| 1235 |
+ self._remote = remote
|
|
| 1236 |
+ self._max_total_size_bytes = remote.max_batch_total_size_bytes
|
|
| 1237 |
+ self._request = remote_execution_pb2.BatchUpdateBlobsRequest()
|
|
| 1238 |
+ self._size = 0
|
|
| 1239 |
+ self._sent = False
|
|
| 1240 |
+ |
|
| 1241 |
+ def add(self, digest, stream):
|
|
| 1242 |
+ assert not self._sent
|
|
| 1243 |
+ |
|
| 1244 |
+ new_batch_size = self._size + digest.size_bytes
|
|
| 1245 |
+ if new_batch_size > self._max_total_size_bytes:
|
|
| 1246 |
+ # Not enough space left in current batch
|
|
| 1247 |
+ return False
|
|
| 1248 |
+ |
|
| 1249 |
+ blob_request = self._request.requests.add()
|
|
| 1250 |
+ blob_request.digest.hash = digest.hash
|
|
| 1251 |
+ blob_request.digest.size_bytes = digest.size_bytes
|
|
| 1252 |
+ blob_request.data = stream.read(digest.size_bytes)
|
|
| 1253 |
+ self._size = new_batch_size
|
|
| 1254 |
+ return True
|
|
| 1255 |
+ |
|
| 1256 |
+ def send(self):
|
|
| 1257 |
+ assert not self._sent
|
|
| 1258 |
+ self._sent = True
|
|
| 1259 |
+ |
|
| 1260 |
+ if len(self._request.requests) == 0:
|
|
| 1261 |
+ return
|
|
| 1262 |
+ |
|
| 1263 |
+ batch_response = self._remote.cas.BatchUpdateBlobs(self._request)
|
|
| 1264 |
+ |
|
| 1265 |
+ for response in batch_response.responses:
|
|
| 1266 |
+ if response.status.code != grpc.StatusCode.OK.value[0]:
|
|
| 1267 |
+ raise ArtifactError("Failed to upload blob {}: {}".format(
|
|
| 1268 |
+ response.digest.hash, response.status.code))
|
|
| 1269 |
+ |
|
| 1270 |
+ |
|
| 1176 | 1271 |
def _grouper(iterable, n):
|
| 1177 | 1272 |
while True:
|
| 1178 | 1273 |
try:
|
| ... | ... | @@ -68,7 +68,7 @@ def create_server(repo, *, enable_push): |
| 68 | 68 |
_ByteStreamServicer(artifactcache, enable_push=enable_push), server)
|
| 69 | 69 |
|
| 70 | 70 |
remote_execution_pb2_grpc.add_ContentAddressableStorageServicer_to_server(
|
| 71 |
- _ContentAddressableStorageServicer(artifactcache), server)
|
|
| 71 |
+ _ContentAddressableStorageServicer(artifactcache, enable_push=enable_push), server)
|
|
| 72 | 72 |
|
| 73 | 73 |
remote_execution_pb2_grpc.add_CapabilitiesServicer_to_server(
|
| 74 | 74 |
_CapabilitiesServicer(), server)
|
| ... | ... | @@ -222,9 +222,10 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer): |
| 222 | 222 |
|
| 223 | 223 |
|
| 224 | 224 |
class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddressableStorageServicer):
|
| 225 |
- def __init__(self, cas):
|
|
| 225 |
+ def __init__(self, cas, *, enable_push):
|
|
| 226 | 226 |
super().__init__()
|
| 227 | 227 |
self.cas = cas
|
| 228 |
+ self.enable_push = enable_push
|
|
| 228 | 229 |
|
| 229 | 230 |
def FindMissingBlobs(self, request, context):
|
| 230 | 231 |
response = remote_execution_pb2.FindMissingBlobsResponse()
|
| ... | ... | @@ -260,6 +261,46 @@ class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddres |
| 260 | 261 |
|
| 261 | 262 |
return response
|
| 262 | 263 |
|
| 264 |
+ def BatchUpdateBlobs(self, request, context):
|
|
| 265 |
+ response = remote_execution_pb2.BatchUpdateBlobsResponse()
|
|
| 266 |
+ |
|
| 267 |
+ if not self.enable_push:
|
|
| 268 |
+ context.set_code(grpc.StatusCode.PERMISSION_DENIED)
|
|
| 269 |
+ return response
|
|
| 270 |
+ |
|
| 271 |
+ batch_size = 0
|
|
| 272 |
+ |
|
| 273 |
+ for blob_request in request.requests:
|
|
| 274 |
+ digest = blob_request.digest
|
|
| 275 |
+ |
|
| 276 |
+ batch_size += digest.size_bytes
|
|
| 277 |
+ if batch_size > _MAX_PAYLOAD_BYTES:
|
|
| 278 |
+ context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
|
|
| 279 |
+ return response
|
|
| 280 |
+ |
|
| 281 |
+ blob_response = response.responses.add()
|
|
| 282 |
+ blob_response.digest.hash = digest.hash
|
|
| 283 |
+ blob_response.digest.size_bytes = digest.size_bytes
|
|
| 284 |
+ |
|
| 285 |
+ if len(blob_request.data) != digest.size_bytes:
|
|
| 286 |
+ blob_response.status.code = grpc.StatusCode.FAILED_PRECONDITION
|
|
| 287 |
+ continue
|
|
| 288 |
+ |
|
| 289 |
+ try:
|
|
| 290 |
+ _clean_up_cache(self.cas, digest.size_bytes)
|
|
| 291 |
+ |
|
| 292 |
+ with tempfile.NamedTemporaryFile(dir=self.cas.tmpdir) as out:
|
|
| 293 |
+ out.write(blob_request.data)
|
|
| 294 |
+ out.flush()
|
|
| 295 |
+ server_digest = self.cas.add_object(path=out.name)
|
|
| 296 |
+ if server_digest.hash != digest.hash:
|
|
| 297 |
+ blob_response.status.code = grpc.StatusCode.FAILED_PRECONDITION
|
|
| 298 |
+ |
|
| 299 |
+ except ArtifactTooLargeException:
|
|
| 300 |
+ blob_response.status.code = grpc.StatusCode.RESOURCE_EXHAUSTED
|
|
| 301 |
+ |
|
| 302 |
+ return response
|
|
| 303 |
+ |
|
| 263 | 304 |
|
| 264 | 305 |
class _CapabilitiesServicer(remote_execution_pb2_grpc.CapabilitiesServicer):
|
| 265 | 306 |
def GetCapabilities(self, request, context):
|
| ... | ... | @@ -305,10 +305,12 @@ def init(app, project_name, format_version, element_path, force): |
| 305 | 305 |
help="Allow tracking to cross junction boundaries")
|
| 306 | 306 |
@click.option('--track-save', default=False, is_flag=True,
|
| 307 | 307 |
help="Deprecated: This is ignored")
|
| 308 |
+@click.option('--pull-buildtrees', default=False, is_flag=True,
|
|
| 309 |
+ help="Pull buildtrees from a remote cache server")
|
|
| 308 | 310 |
@click.argument('elements', nargs=-1,
|
| 309 | 311 |
type=click.Path(readable=False))
|
| 310 | 312 |
@click.pass_obj
|
| 311 |
-def build(app, elements, all_, track_, track_save, track_all, track_except, track_cross_junctions):
|
|
| 313 |
+def build(app, elements, all_, track_, track_save, track_all, track_except, track_cross_junctions, pull_buildtrees):
|
|
| 312 | 314 |
"""Build elements in a pipeline"""
|
| 313 | 315 |
|
| 314 | 316 |
if (track_except or track_cross_junctions) and not (track_ or track_all):
|
| ... | ... | @@ -327,7 +329,8 @@ def build(app, elements, all_, track_, track_save, track_all, track_except, trac |
| 327 | 329 |
track_targets=track_,
|
| 328 | 330 |
track_except=track_except,
|
| 329 | 331 |
track_cross_junctions=track_cross_junctions,
|
| 330 |
- build_all=all_)
|
|
| 332 |
+ build_all=all_,
|
|
| 333 |
+ pull_buildtrees=pull_buildtrees)
|
|
| 331 | 334 |
|
| 332 | 335 |
|
| 333 | 336 |
##################################################################
|
| ... | ... | @@ -429,10 +432,12 @@ def track(app, elements, deps, except_, cross_junctions): |
| 429 | 432 |
help='The dependency artifacts to pull (default: none)')
|
| 430 | 433 |
@click.option('--remote', '-r',
|
| 431 | 434 |
help="The URL of the remote cache (defaults to the first configured cache)")
|
| 435 |
+@click.option('--pull-buildtrees', default=False, is_flag=True,
|
|
| 436 |
+ help="Pull buildtrees from a remote cache server")
|
|
| 432 | 437 |
@click.argument('elements', nargs=-1,
|
| 433 | 438 |
type=click.Path(readable=False))
|
| 434 | 439 |
@click.pass_obj
|
| 435 |
-def pull(app, elements, deps, remote):
|
|
| 440 |
+def pull(app, elements, deps, remote, pull_buildtrees):
|
|
| 436 | 441 |
"""Pull a built artifact from the configured remote artifact cache.
|
| 437 | 442 |
|
| 438 | 443 |
By default the artifact will be pulled one of the configured caches
|
| ... | ... | @@ -446,7 +451,7 @@ def pull(app, elements, deps, remote): |
| 446 | 451 |
all: All dependencies
|
| 447 | 452 |
"""
|
| 448 | 453 |
with app.initialized(session_name="Pull"):
|
| 449 |
- app.stream.pull(elements, selection=deps, remote=remote)
|
|
| 454 |
+ app.stream.pull(elements, selection=deps, remote=remote, pull_buildtrees=pull_buildtrees)
|
|
| 450 | 455 |
|
| 451 | 456 |
|
| 452 | 457 |
##################################################################
|
| ... | ... | @@ -32,9 +32,20 @@ class PullQueue(Queue): |
| 32 | 32 |
complete_name = "Pulled"
|
| 33 | 33 |
resources = [ResourceType.DOWNLOAD, ResourceType.CACHE]
|
| 34 | 34 |
|
| 35 |
+ def __init__(self, scheduler, buildtrees=False):
|
|
| 36 |
+ super().__init__(scheduler)
|
|
| 37 |
+ |
|
| 38 |
+ # Current default exclusions on pull
|
|
| 39 |
+ self._excluded_subdirs = ["buildtree"]
|
|
| 40 |
+ self._subdir = None
|
|
| 41 |
+ # If buildtrees are to be pulled, remove the value from exclusion list
|
|
| 42 |
+ if buildtrees:
|
|
| 43 |
+ self._subdir = "buildtree"
|
|
| 44 |
+ self._excluded_subdirs.remove(self._subdir)
|
|
| 45 |
+ |
|
| 35 | 46 |
def process(self, element):
|
| 36 | 47 |
# returns whether an artifact was downloaded or not
|
| 37 |
- if not element._pull():
|
|
| 48 |
+ if not element._pull(subdir=self._subdir, excluded_subdirs=self._excluded_subdirs):
|
|
| 38 | 49 |
raise SkipJob(self.action_name)
|
| 39 | 50 |
|
| 40 | 51 |
def status(self, element):
|
| ... | ... | @@ -49,7 +60,7 @@ class PullQueue(Queue): |
| 49 | 60 |
if not element._can_query_cache():
|
| 50 | 61 |
return QueueStatus.WAIT
|
| 51 | 62 |
|
| 52 |
- if element._pull_pending():
|
|
| 63 |
+ if element._pull_pending(subdir=self._subdir):
|
|
| 53 | 64 |
return QueueStatus.READY
|
| 54 | 65 |
else:
|
| 55 | 66 |
return QueueStatus.SKIP
|
| ... | ... | @@ -160,12 +160,14 @@ class Stream(): |
| 160 | 160 |
# track_cross_junctions (bool): Whether tracking should cross junction boundaries
|
| 161 | 161 |
# build_all (bool): Whether to build all elements, or only those
|
| 162 | 162 |
# which are required to build the target.
|
| 163 |
+ # pull_buildtrees (bool): Whether to pull buildtrees from a remote cache server
|
|
| 163 | 164 |
#
|
| 164 | 165 |
def build(self, targets, *,
|
| 165 | 166 |
track_targets=None,
|
| 166 | 167 |
track_except=None,
|
| 167 | 168 |
track_cross_junctions=False,
|
| 168 |
- build_all=False):
|
|
| 169 |
+ build_all=False,
|
|
| 170 |
+ pull_buildtrees=False):
|
|
| 169 | 171 |
|
| 170 | 172 |
if build_all:
|
| 171 | 173 |
selection = PipelineSelection.ALL
|
| ... | ... | @@ -195,7 +197,11 @@ class Stream(): |
| 195 | 197 |
self._add_queue(track_queue, track=True)
|
| 196 | 198 |
|
| 197 | 199 |
if self._artifacts.has_fetch_remotes():
|
| 198 |
- self._add_queue(PullQueue(self._scheduler))
|
|
| 200 |
+ # Query if any of the user defined artifact servers have buildtrees set
|
|
| 201 |
+ for cache in self._context.artifact_cache_specs:
|
|
| 202 |
+ if cache.buildtrees:
|
|
| 203 |
+ pull_buildtrees = True
|
|
| 204 |
+ self._add_queue(PullQueue(self._scheduler, buildtrees=pull_buildtrees))
|
|
| 199 | 205 |
|
| 200 | 206 |
self._add_queue(FetchQueue(self._scheduler, skip_cached=True))
|
| 201 | 207 |
self._add_queue(BuildQueue(self._scheduler))
|
| ... | ... | @@ -295,7 +301,8 @@ class Stream(): |
| 295 | 301 |
#
|
| 296 | 302 |
def pull(self, targets, *,
|
| 297 | 303 |
selection=PipelineSelection.NONE,
|
| 298 |
- remote=None):
|
|
| 304 |
+ remote=None,
|
|
| 305 |
+ pull_buildtrees=False):
|
|
| 299 | 306 |
|
| 300 | 307 |
use_config = True
|
| 301 | 308 |
if remote:
|
| ... | ... | @@ -310,8 +317,13 @@ class Stream(): |
| 310 | 317 |
if not self._artifacts.has_fetch_remotes():
|
| 311 | 318 |
raise StreamError("No artifact caches available for pulling artifacts")
|
| 312 | 319 |
|
| 320 |
+ # Query if any of the user defined artifact servers have buildtrees set
|
|
| 321 |
+ for cache in self._context.artifact_cache_specs:
|
|
| 322 |
+ if cache.buildtrees:
|
|
| 323 |
+ pull_buildtrees = True
|
|
| 324 |
+ |
|
| 313 | 325 |
self._pipeline.assert_consistent(elements)
|
| 314 |
- self._add_queue(PullQueue(self._scheduler))
|
|
| 326 |
+ self._add_queue(PullQueue(self._scheduler, buildtrees=pull_buildtrees))
|
|
| 315 | 327 |
self._enqueue_plan(elements)
|
| 316 | 328 |
self._run()
|
| 317 | 329 |
|
| ... | ... | @@ -1689,18 +1689,26 @@ class Element(Plugin): |
| 1689 | 1689 |
|
| 1690 | 1690 |
# _pull_pending()
|
| 1691 | 1691 |
#
|
| 1692 |
- # Check whether the artifact will be pulled.
|
|
| 1692 |
+ # Check whether the artifact will be pulled. If the pull operation is to
|
|
| 1693 |
+ # include a specific subdir of the element artifact (from cli or user conf)
|
|
| 1694 |
+ # then the local cache is queried for the subdir's existence.
|
|
| 1695 |
+ #
|
|
| 1696 |
+ # Args:
|
|
| 1697 |
+ # subdir (str): Whether the pull has been invoked with a specific subdir set
|
|
| 1693 | 1698 |
#
|
| 1694 | 1699 |
# Returns:
|
| 1695 | 1700 |
# (bool): Whether a pull operation is pending
|
| 1696 | 1701 |
#
|
| 1697 |
- def _pull_pending(self):
|
|
| 1702 |
+ def _pull_pending(self, subdir=None):
|
|
| 1698 | 1703 |
if self._get_workspace():
|
| 1699 | 1704 |
# Workspace builds are never pushed to artifact servers
|
| 1700 | 1705 |
return False
|
| 1701 | 1706 |
|
| 1702 |
- if self.__strong_cached:
|
|
| 1703 |
- # Artifact already in local cache
|
|
| 1707 |
+ if self.__strong_cached and subdir:
|
|
| 1708 |
+ # If we've specified a subdir, check if the subdir is cached locally
|
|
| 1709 |
+ if self.__artifacts.contains_subdir_artifact(self, self.__strict_cache_key, subdir):
|
|
| 1710 |
+ return False
|
|
| 1711 |
+ elif self.__strong_cached:
|
|
| 1704 | 1712 |
return False
|
| 1705 | 1713 |
|
| 1706 | 1714 |
# Pull is pending if artifact remote server available
|
| ... | ... | @@ -1722,11 +1730,10 @@ class Element(Plugin): |
| 1722 | 1730 |
|
| 1723 | 1731 |
self._update_state()
|
| 1724 | 1732 |
|
| 1725 |
- def _pull_strong(self, *, progress=None):
|
|
| 1733 |
+ def _pull_strong(self, *, progress=None, subdir=None, excluded_subdirs=None):
|
|
| 1726 | 1734 |
weak_key = self._get_cache_key(strength=_KeyStrength.WEAK)
|
| 1727 |
- |
|
| 1728 | 1735 |
key = self.__strict_cache_key
|
| 1729 |
- if not self.__artifacts.pull(self, key, progress=progress):
|
|
| 1736 |
+ if not self.__artifacts.pull(self, key, progress=progress, subdir=subdir, excluded_subdirs=excluded_subdirs):
|
|
| 1730 | 1737 |
return False
|
| 1731 | 1738 |
|
| 1732 | 1739 |
# update weak ref by pointing it to this newly fetched artifact
|
| ... | ... | @@ -1734,10 +1741,10 @@ class Element(Plugin): |
| 1734 | 1741 |
|
| 1735 | 1742 |
return True
|
| 1736 | 1743 |
|
| 1737 |
- def _pull_weak(self, *, progress=None):
|
|
| 1744 |
+ def _pull_weak(self, *, progress=None, subdir=None, excluded_subdirs=None):
|
|
| 1738 | 1745 |
weak_key = self._get_cache_key(strength=_KeyStrength.WEAK)
|
| 1739 |
- |
|
| 1740 |
- if not self.__artifacts.pull(self, weak_key, progress=progress):
|
|
| 1746 |
+ if not self.__artifacts.pull(self, weak_key, progress=progress, subdir=subdir,
|
|
| 1747 |
+ excluded_subdirs=excluded_subdirs):
|
|
| 1741 | 1748 |
return False
|
| 1742 | 1749 |
|
| 1743 | 1750 |
# extract strong cache key from this newly fetched artifact
|
| ... | ... | @@ -1755,17 +1762,17 @@ class Element(Plugin): |
| 1755 | 1762 |
#
|
| 1756 | 1763 |
# Returns: True if the artifact has been downloaded, False otherwise
|
| 1757 | 1764 |
#
|
| 1758 |
- def _pull(self):
|
|
| 1765 |
+ def _pull(self, subdir=None, excluded_subdirs=None):
|
|
| 1759 | 1766 |
context = self._get_context()
|
| 1760 | 1767 |
|
| 1761 | 1768 |
def progress(percent, message):
|
| 1762 | 1769 |
self.status(message)
|
| 1763 | 1770 |
|
| 1764 | 1771 |
# Attempt to pull artifact without knowing whether it's available
|
| 1765 |
- pulled = self._pull_strong(progress=progress)
|
|
| 1772 |
+ pulled = self._pull_strong(progress=progress, subdir=subdir, excluded_subdirs=excluded_subdirs)
|
|
| 1766 | 1773 |
|
| 1767 | 1774 |
if not pulled and not self._cached() and not context.get_strict():
|
| 1768 |
- pulled = self._pull_weak(progress=progress)
|
|
| 1775 |
+ pulled = self._pull_weak(progress=progress, subdir=subdir, excluded_subdirs=excluded_subdirs)
|
|
| 1769 | 1776 |
|
| 1770 | 1777 |
if not pulled:
|
| 1771 | 1778 |
return False
|
| ... | ... | @@ -1788,10 +1795,14 @@ class Element(Plugin): |
| 1788 | 1795 |
if not self._cached():
|
| 1789 | 1796 |
return True
|
| 1790 | 1797 |
|
| 1791 |
- # Do not push tained artifact
|
|
| 1798 |
+ # Do not push tainted artifact
|
|
| 1792 | 1799 |
if self.__get_tainted():
|
| 1793 | 1800 |
return True
|
| 1794 | 1801 |
|
| 1802 |
+ # Do not push elements that have a dangling buildtree artifact unless element type is
|
|
| 1803 |
+ # expected to have an empty buildtree directory
|
|
| 1804 |
+ if not self.__artifacts.contains_subdir_artifact(self, self.__strict_cache_key, 'buildtree'):
|
|
| 1805 |
+ return True
|
|
| 1795 | 1806 |
return False
|
| 1796 | 1807 |
|
| 1797 | 1808 |
# _push():
|
| ... | ... | @@ -177,15 +177,11 @@ class SandboxRemote(Sandbox): |
| 177 | 177 |
if not cascache.verify_digest_pushed(self._get_project(), upload_vdir.ref):
|
| 178 | 178 |
raise SandboxError("Failed to verify that source has been pushed to the remote artifact cache.")
|
| 179 | 179 |
|
| 180 |
- # Set up environment and working directory
|
|
| 181 |
- if cwd is None:
|
|
| 182 |
- cwd = self._get_work_directory()
|
|
| 183 |
- |
|
| 184 |
- if cwd is None:
|
|
| 185 |
- cwd = '/'
|
|
| 186 |
- |
|
| 187 |
- if env is None:
|
|
| 188 |
- env = self._get_environment()
|
|
| 180 |
+ # Fall back to the sandbox default settings for
|
|
| 181 |
+ # the cwd and env.
|
|
| 182 |
+ #
|
|
| 183 |
+ cwd = self._get_work_directory(cwd=cwd)
|
|
| 184 |
+ env = self._get_environment(cwd=cwd, env=env)
|
|
| 189 | 185 |
|
| 190 | 186 |
# We want command args as a list of strings
|
| 191 | 187 |
if isinstance(command, str):
|
| ... | ... | @@ -103,7 +103,7 @@ def test_commands(cli, cmd, word_idx, expected): |
| 103 | 103 |
('bst --no-colors build -', 3, ['--all ', '--track ', '--track-all ',
|
| 104 | 104 |
'--track-except ',
|
| 105 | 105 |
'--track-cross-junctions ', '-J ',
|
| 106 |
- '--track-save ']),
|
|
| 106 |
+ '--track-save ', '--pull-buildtrees ']),
|
|
| 107 | 107 |
|
| 108 | 108 |
# Test the behavior of completing after an option that has a
|
| 109 | 109 |
# parameter that cannot be completed, vs an option that has
|
