Jürg Billeter pushed to branch juerg/cas-batch at BuildStream / buildstream
Commits:
-
68ef69e4
by Tristan Van Berkom at 2018-09-21T05:20:46Z
-
662c729f
by Tristan Van Berkom at 2018-09-21T05:59:30Z
-
461a0588
by Jim MacArthur at 2018-09-21T10:53:11Z
-
aa9caaac
by Jim MacArthur at 2018-09-21T10:53:11Z
-
2aae68c7
by Jim MacArthur at 2018-09-21T10:53:11Z
-
ca1bb72c
by Jim MacArthur at 2018-09-21T10:53:11Z
-
55c93a82
by Jim MacArthur at 2018-09-21T11:26:55Z
-
e209beb0
by Chandan Singh at 2018-09-21T13:10:08Z
-
0b000518
by Chandan Singh at 2018-09-21T13:56:55Z
-
ef26043a
by Chandan Singh at 2018-09-21T17:14:16Z
-
1b2aed40
by Chandan Singh at 2018-09-21T17:40:11Z
-
c3176024
by Jürg Billeter at 2018-09-24T08:55:47Z
-
dd2fb1aa
by Jürg Billeter at 2018-09-24T09:04:18Z
-
d457f14e
by Jürg Billeter at 2018-09-24T09:08:48Z
-
7966dfa0
by Jürg Billeter at 2018-09-24T09:08:48Z
11 changed files:
- buildstream/_artifactcache/cascache.py
- buildstream/_artifactcache/casserver.py
- buildstream/_stream.py
- buildstream/element.py
- buildstream/sandbox/_sandboxremote.py
- buildstream/source.py
- setup.py
- tests/artifactcache/push.py
- + tests/frontend/project/elements/source-bundle/source-bundle-hello.bst
- + tests/frontend/project/files/source-bundle/llamas.txt
- + tests/frontend/source_bundle.py
Changes:
| ... | ... | @@ -44,6 +44,11 @@ from .._exceptions import ArtifactError |
| 44 | 44 |
from . import ArtifactCache
|
| 45 | 45 |
|
| 46 | 46 |
|
| 47 |
+# The default limit for gRPC messages is 4 MiB.
|
|
| 48 |
+# Limit payload to 1 MiB to leave sufficient headroom for metadata.
|
|
| 49 |
+_MAX_PAYLOAD_BYTES = 1024 * 1024
|
|
| 50 |
+ |
|
| 51 |
+ |
|
| 47 | 52 |
# A CASCache manages artifacts in a CAS repository as specified in the
|
| 48 | 53 |
# Remote Execution API.
|
| 49 | 54 |
#
|
| ... | ... | @@ -348,19 +353,29 @@ class CASCache(ArtifactCache): |
| 348 | 353 |
return pushed
|
| 349 | 354 |
|
| 350 | 355 |
def push_directory(self, project, directory):
|
| 356 |
+ """ Push the given virtual directory to all remotes.
|
|
| 357 |
+ |
|
| 358 |
+ Args:
|
|
| 359 |
+ project (Project): The current project
|
|
| 360 |
+ directory (Directory): A virtual directory object to push.
|
|
| 361 |
+ |
|
| 362 |
+ Raises: ArtifactError if no push remotes are configured.
|
|
| 363 |
+ """
|
|
| 351 | 364 |
|
| 352 | 365 |
push_remotes = [r for r in self._remotes[project] if r.spec.push]
|
| 353 | 366 |
|
| 367 |
+ if not push_remotes:
|
|
| 368 |
+ raise ArtifactError("CASCache: push_directory was called, but no remote artifact " +
|
|
| 369 |
+ "servers are configured as push remotes.")
|
|
| 370 |
+ |
|
| 354 | 371 |
if directory.ref is None:
|
| 355 |
- return None
|
|
| 372 |
+ return
|
|
| 356 | 373 |
|
| 357 | 374 |
for remote in push_remotes:
|
| 358 | 375 |
remote.init()
|
| 359 | 376 |
|
| 360 | 377 |
self._send_directory(remote, directory.ref)
|
| 361 | 378 |
|
| 362 |
- return directory.ref
|
|
| 363 |
- |
|
| 364 | 379 |
def push_message(self, project, message):
|
| 365 | 380 |
|
| 366 | 381 |
push_remotes = [r for r in self._remotes[project] if r.spec.push]
|
| ... | ... | @@ -844,6 +859,40 @@ class CASCache(ArtifactCache): |
| 844 | 859 |
|
| 845 | 860 |
assert digest.size_bytes == os.fstat(stream.fileno()).st_size
|
| 846 | 861 |
|
| 862 |
+ # _ensure_blob():
|
|
| 863 |
+ #
|
|
| 864 |
+ # Fetch and add blob if it's not already local.
|
|
| 865 |
+ #
|
|
| 866 |
+ # Args:
|
|
| 867 |
+ # remote (Remote): The remote to use.
|
|
| 868 |
+ # digest (Digest): Digest object for the blob to fetch.
|
|
| 869 |
+ #
|
|
| 870 |
+ # Returns:
|
|
| 871 |
+ # (str): The path of the object
|
|
| 872 |
+ #
|
|
| 873 |
+ def _ensure_blob(self, remote, digest):
|
|
| 874 |
+ objpath = self.objpath(digest)
|
|
| 875 |
+ if os.path.exists(objpath):
|
|
| 876 |
+ # already in local repository
|
|
| 877 |
+ return objpath
|
|
| 878 |
+ |
|
| 879 |
+ with tempfile.NamedTemporaryFile(dir=self.tmpdir) as f:
|
|
| 880 |
+ self._fetch_blob(remote, digest, f)
|
|
| 881 |
+ |
|
| 882 |
+ added_digest = self.add_object(path=f.name)
|
|
| 883 |
+ assert added_digest.hash == digest.hash
|
|
| 884 |
+ |
|
| 885 |
+ return objpath
|
|
| 886 |
+ |
|
| 887 |
+ def _batch_download_complete(self, batch):
|
|
| 888 |
+ for digest, data in batch.send():
|
|
| 889 |
+ with tempfile.NamedTemporaryFile(dir=self.tmpdir) as f:
|
|
| 890 |
+ f.write(data)
|
|
| 891 |
+ f.flush()
|
|
| 892 |
+ |
|
| 893 |
+ added_digest = self.add_object(path=f.name)
|
|
| 894 |
+ assert added_digest.hash == digest.hash
|
|
| 895 |
+ |
|
| 847 | 896 |
# _fetch_directory():
|
| 848 | 897 |
#
|
| 849 | 898 |
# Fetches remote directory and adds it to content addressable store.
|
| ... | ... | @@ -857,39 +906,73 @@ class CASCache(ArtifactCache): |
| 857 | 906 |
# dir_digest (Digest): Digest object for the directory to fetch.
|
| 858 | 907 |
#
|
| 859 | 908 |
def _fetch_directory(self, remote, dir_digest):
|
| 860 |
- objpath = self.objpath(dir_digest)
|
|
| 861 |
- if os.path.exists(objpath):
|
|
| 862 |
- # already in local cache
|
|
| 863 |
- return
|
|
| 909 |
+ fetch_queue = [dir_digest]
|
|
| 910 |
+ fetch_next_queue = []
|
|
| 911 |
+ batch = _CASBatchRead(remote)
|
|
| 864 | 912 |
|
| 865 |
- with tempfile.NamedTemporaryFile(dir=self.tmpdir) as out:
|
|
| 866 |
- self._fetch_blob(remote, dir_digest, out)
|
|
| 913 |
+ def fetch_batch(batch):
|
|
| 914 |
+ self._batch_download_complete(batch)
|
|
| 867 | 915 |
|
| 868 |
- directory = remote_execution_pb2.Directory()
|
|
| 916 |
+ # All previously scheduled directories are now locally available,
|
|
| 917 |
+ # move them to the processing queue.
|
|
| 918 |
+ fetch_queue.extend(fetch_next_queue)
|
|
| 919 |
+ fetch_next_queue.clear()
|
|
| 920 |
+ return _CASBatchRead(remote)
|
|
| 869 | 921 |
|
| 870 |
- with open(out.name, 'rb') as f:
|
|
| 922 |
+ while len(fetch_queue) + len(fetch_next_queue) > 0:
|
|
| 923 |
+ if len(fetch_queue) == 0:
|
|
| 924 |
+ batch = fetch_batch(batch)
|
|
| 925 |
+ |
|
| 926 |
+ dir_digest = fetch_queue.pop(0)
|
|
| 927 |
+ |
|
| 928 |
+ objpath = self._ensure_blob(remote, dir_digest)
|
|
| 929 |
+ |
|
| 930 |
+ directory = remote_execution_pb2.Directory()
|
|
| 931 |
+ with open(objpath, 'rb') as f:
|
|
| 871 | 932 |
directory.ParseFromString(f.read())
|
| 872 | 933 |
|
| 873 |
- for filenode in directory.files:
|
|
| 874 |
- fileobjpath = self.objpath(filenode.digest)
|
|
| 875 |
- if os.path.exists(fileobjpath):
|
|
| 876 |
- # already in local cache
|
|
| 934 |
+ for dirnode in directory.directories:
|
|
| 935 |
+ if os.path.exists(self.objpath(dirnode.digest)):
|
|
| 936 |
+ # Skip download, already in local cache.
|
|
| 937 |
+ # Add directory to processing queue.
|
|
| 938 |
+ fetch_queue.append(dirnode.digest)
|
|
| 877 | 939 |
continue
|
| 878 | 940 |
|
| 879 |
- with tempfile.NamedTemporaryFile(dir=self.tmpdir) as f:
|
|
| 880 |
- self._fetch_blob(remote, filenode.digest, f)
|
|
| 941 |
+ if (dirnode.digest.size_bytes >= remote.max_batch_total_size_bytes or
|
|
| 942 |
+ not remote.batch_read_supported):
|
|
| 943 |
+ # Too large for batch request, download in independent request.
|
|
| 944 |
+ self._ensure_blob(remote, dirnode.digest)
|
|
| 945 |
+ # Add directory to processing queue.
|
|
| 946 |
+ fetch_queue.append(dirnode.digest)
|
|
| 947 |
+ else:
|
|
| 948 |
+ if not batch.add(dirnode.digest):
|
|
| 949 |
+ # Not enough space left in batch request.
|
|
| 950 |
+ # Complete pending batch first.
|
|
| 951 |
+ batch = fetch_batch(batch)
|
|
| 952 |
+ batch.add(dirnode.digest)
|
|
| 881 | 953 |
|
| 882 |
- digest = self.add_object(path=f.name)
|
|
| 883 |
- assert digest.hash == filenode.digest.hash
|
|
| 954 |
+ # Directory will be available after completing pending batch.
|
|
| 955 |
+ # Add directory to deferred processing queue.
|
|
| 956 |
+ fetch_next_queue.append(dirnode.digest)
|
|
| 884 | 957 |
|
| 885 |
- for dirnode in directory.directories:
|
|
| 886 |
- self._fetch_directory(remote, dirnode.digest)
|
|
| 958 |
+ for filenode in directory.files:
|
|
| 959 |
+ if os.path.exists(self.objpath(filenode.digest)):
|
|
| 960 |
+ # Skip download, already in local cache.
|
|
| 961 |
+ continue
|
|
| 962 |
+ |
|
| 963 |
+ if (filenode.digest.size_bytes >= remote.max_batch_total_size_bytes or
|
|
| 964 |
+ not remote.batch_read_supported):
|
|
| 965 |
+ # Too large for batch request, download in independent request.
|
|
| 966 |
+ self._ensure_blob(remote, filenode.digest)
|
|
| 967 |
+ else:
|
|
| 968 |
+ if not batch.add(filenode.digest):
|
|
| 969 |
+ # Not enough space left in batch request.
|
|
| 970 |
+ # Complete pending batch first.
|
|
| 971 |
+ batch = fetch_batch(batch)
|
|
| 972 |
+ batch.add(filenode.digest)
|
|
| 887 | 973 |
|
| 888 |
- # Place directory blob only in final location when we've
|
|
| 889 |
- # downloaded all referenced blobs to avoid dangling
|
|
| 890 |
- # references in the repository.
|
|
| 891 |
- digest = self.add_object(path=out.name)
|
|
| 892 |
- assert digest.hash == dir_digest.hash
|
|
| 974 |
+ # Fetch final batch
|
|
| 975 |
+ fetch_batch(batch)
|
|
| 893 | 976 |
|
| 894 | 977 |
def _fetch_tree(self, remote, digest):
|
| 895 | 978 |
# download but do not store the Tree object
|
| ... | ... | @@ -904,16 +987,7 @@ class CASCache(ArtifactCache): |
| 904 | 987 |
tree.children.extend([tree.root])
|
| 905 | 988 |
for directory in tree.children:
|
| 906 | 989 |
for filenode in directory.files:
|
| 907 |
- fileobjpath = self.objpath(filenode.digest)
|
|
| 908 |
- if os.path.exists(fileobjpath):
|
|
| 909 |
- # already in local cache
|
|
| 910 |
- continue
|
|
| 911 |
- |
|
| 912 |
- with tempfile.NamedTemporaryFile(dir=self.tmpdir) as f:
|
|
| 913 |
- self._fetch_blob(remote, filenode.digest, f)
|
|
| 914 |
- |
|
| 915 |
- added_digest = self.add_object(path=f.name)
|
|
| 916 |
- assert added_digest.hash == filenode.digest.hash
|
|
| 990 |
+ self._ensure_blob(remote, filenode.digest)
|
|
| 917 | 991 |
|
| 918 | 992 |
# place directory blob only in final location when we've downloaded
|
| 919 | 993 |
# all referenced blobs to avoid dangling references in the repository
|
| ... | ... | @@ -932,12 +1006,12 @@ class CASCache(ArtifactCache): |
| 932 | 1006 |
finished = False
|
| 933 | 1007 |
remaining = digest.size_bytes
|
| 934 | 1008 |
while not finished:
|
| 935 |
- chunk_size = min(remaining, 64 * 1024)
|
|
| 1009 |
+ chunk_size = min(remaining, _MAX_PAYLOAD_BYTES)
|
|
| 936 | 1010 |
remaining -= chunk_size
|
| 937 | 1011 |
|
| 938 | 1012 |
request = bytestream_pb2.WriteRequest()
|
| 939 | 1013 |
request.write_offset = offset
|
| 940 |
- # max. 64 kB chunks
|
|
| 1014 |
+ # max. _MAX_PAYLOAD_BYTES chunks
|
|
| 941 | 1015 |
request.data = instream.read(chunk_size)
|
| 942 | 1016 |
request.resource_name = resname
|
| 943 | 1017 |
request.finish_write = remaining <= 0
|
| ... | ... | @@ -1025,11 +1099,78 @@ class _CASRemote(): |
| 1025 | 1099 |
|
| 1026 | 1100 |
self.bytestream = bytestream_pb2_grpc.ByteStreamStub(self.channel)
|
| 1027 | 1101 |
self.cas = remote_execution_pb2_grpc.ContentAddressableStorageStub(self.channel)
|
| 1102 |
+ self.capabilities = remote_execution_pb2_grpc.CapabilitiesStub(self.channel)
|
|
| 1028 | 1103 |
self.ref_storage = buildstream_pb2_grpc.ReferenceStorageStub(self.channel)
|
| 1029 | 1104 |
|
| 1105 |
+ self.max_batch_total_size_bytes = _MAX_PAYLOAD_BYTES
|
|
| 1106 |
+ try:
|
|
| 1107 |
+ request = remote_execution_pb2.GetCapabilitiesRequest()
|
|
| 1108 |
+ response = self.capabilities.GetCapabilities(request)
|
|
| 1109 |
+ server_max_batch_total_size_bytes = response.cache_capabilities.max_batch_total_size_bytes
|
|
| 1110 |
+ if 0 < server_max_batch_total_size_bytes < self.max_batch_total_size_bytes:
|
|
| 1111 |
+ self.max_batch_total_size_bytes = server_max_batch_total_size_bytes
|
|
| 1112 |
+ except grpc.RpcError as e:
|
|
| 1113 |
+ # Simply use the defaults for servers that don't implement GetCapabilities()
|
|
| 1114 |
+ if e.code() != grpc.StatusCode.UNIMPLEMENTED:
|
|
| 1115 |
+ raise
|
|
| 1116 |
+ |
|
| 1117 |
+ # Check whether the server supports BatchReadBlobs()
|
|
| 1118 |
+ self.batch_read_supported = False
|
|
| 1119 |
+ try:
|
|
| 1120 |
+ request = remote_execution_pb2.BatchReadBlobsRequest()
|
|
| 1121 |
+ response = self.cas.BatchReadBlobs(request)
|
|
| 1122 |
+ self.batch_read_supported = True
|
|
| 1123 |
+ except grpc.RpcError as e:
|
|
| 1124 |
+ if e.code() != grpc.StatusCode.UNIMPLEMENTED:
|
|
| 1125 |
+ raise
|
|
| 1126 |
+ |
|
| 1030 | 1127 |
self._initialized = True
|
| 1031 | 1128 |
|
| 1032 | 1129 |
|
| 1130 |
+# Represents a batch of blobs queued for fetching.
|
|
| 1131 |
+#
|
|
| 1132 |
+class _CASBatchRead():
|
|
| 1133 |
+ def __init__(self, remote):
|
|
| 1134 |
+ self._remote = remote
|
|
| 1135 |
+ self._max_total_size_bytes = remote.max_batch_total_size_bytes
|
|
| 1136 |
+ self._request = remote_execution_pb2.BatchReadBlobsRequest()
|
|
| 1137 |
+ self._size = 0
|
|
| 1138 |
+ self._sent = False
|
|
| 1139 |
+ |
|
| 1140 |
+ def add(self, digest):
|
|
| 1141 |
+ assert not self._sent
|
|
| 1142 |
+ |
|
| 1143 |
+ new_batch_size = self._size + digest.size_bytes
|
|
| 1144 |
+ if new_batch_size > self._max_total_size_bytes:
|
|
| 1145 |
+ # Not enough space left in current batch
|
|
| 1146 |
+ return False
|
|
| 1147 |
+ |
|
| 1148 |
+ request_digest = self._request.digests.add()
|
|
| 1149 |
+ request_digest.hash = digest.hash
|
|
| 1150 |
+ request_digest.size_bytes = digest.size_bytes
|
|
| 1151 |
+ self._size = new_batch_size
|
|
| 1152 |
+ return True
|
|
| 1153 |
+ |
|
| 1154 |
+ def send(self):
|
|
| 1155 |
+ assert not self._sent
|
|
| 1156 |
+ self._sent = True
|
|
| 1157 |
+ |
|
| 1158 |
+ if len(self._request.digests) == 0:
|
|
| 1159 |
+ return
|
|
| 1160 |
+ |
|
| 1161 |
+ batch_response = self._remote.cas.BatchReadBlobs(self._request)
|
|
| 1162 |
+ |
|
| 1163 |
+ for response in batch_response.responses:
|
|
| 1164 |
+ if response.status.code != grpc.StatusCode.OK.value[0]:
|
|
| 1165 |
+ raise ArtifactError("Failed to download blob {}: {}".format(
|
|
| 1166 |
+ response.digest.hash, response.status.code))
|
|
| 1167 |
+ if response.digest.size_bytes != len(response.data):
|
|
| 1168 |
+ raise ArtifactError("Failed to download blob {}: expected {} bytes, received {} bytes".format(
|
|
| 1169 |
+ response.digest.hash, response.digest.size_bytes, len(response.data)))
|
|
| 1170 |
+ |
|
| 1171 |
+ yield (response.digest, response.data)
|
|
| 1172 |
+ |
|
| 1173 |
+ |
|
| 1033 | 1174 |
def _grouper(iterable, n):
|
| 1034 | 1175 |
while True:
|
| 1035 | 1176 |
try:
|
| ... | ... | @@ -38,8 +38,9 @@ from .._context import Context |
| 38 | 38 |
from .cascache import CASCache
|
| 39 | 39 |
|
| 40 | 40 |
|
| 41 |
-# The default limit for gRPC messages is 4 MiB
|
|
| 42 |
-_MAX_BATCH_TOTAL_SIZE_BYTES = 4 * 1024 * 1024
|
|
| 41 |
+# The default limit for gRPC messages is 4 MiB.
|
|
| 42 |
+# Limit payload to 1 MiB to leave sufficient headroom for metadata.
|
|
| 43 |
+_MAX_PAYLOAD_BYTES = 1024 * 1024
|
|
| 43 | 44 |
|
| 44 | 45 |
|
| 45 | 46 |
# Trying to push an artifact that is too large
|
| ... | ... | @@ -158,7 +159,7 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer): |
| 158 | 159 |
|
| 159 | 160 |
remaining = client_digest.size_bytes - request.read_offset
|
| 160 | 161 |
while remaining > 0:
|
| 161 |
- chunk_size = min(remaining, 64 * 1024)
|
|
| 162 |
+ chunk_size = min(remaining, _MAX_PAYLOAD_BYTES)
|
|
| 162 | 163 |
remaining -= chunk_size
|
| 163 | 164 |
|
| 164 | 165 |
response = bytestream_pb2.ReadResponse()
|
| ... | ... | @@ -242,7 +243,7 @@ class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddres |
| 242 | 243 |
|
| 243 | 244 |
for digest in request.digests:
|
| 244 | 245 |
batch_size += digest.size_bytes
|
| 245 |
- if batch_size > _MAX_BATCH_TOTAL_SIZE_BYTES:
|
|
| 246 |
+ if batch_size > _MAX_PAYLOAD_BYTES:
|
|
| 246 | 247 |
context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
|
| 247 | 248 |
return response
|
| 248 | 249 |
|
| ... | ... | @@ -269,7 +270,7 @@ class _CapabilitiesServicer(remote_execution_pb2_grpc.CapabilitiesServicer): |
| 269 | 270 |
cache_capabilities = response.cache_capabilities
|
| 270 | 271 |
cache_capabilities.digest_function.append(remote_execution_pb2.SHA256)
|
| 271 | 272 |
cache_capabilities.action_cache_update_capabilities.update_enabled = False
|
| 272 |
- cache_capabilities.max_batch_total_size_bytes = _MAX_BATCH_TOTAL_SIZE_BYTES
|
|
| 273 |
+ cache_capabilities.max_batch_total_size_bytes = _MAX_PAYLOAD_BYTES
|
|
| 273 | 274 |
cache_capabilities.symlink_absolute_path_strategy = remote_execution_pb2.CacheCapabilities.ALLOWED
|
| 274 | 275 |
|
| 275 | 276 |
response.deprecated_api_version.major = 2
|
| ... | ... | @@ -703,6 +703,7 @@ class Stream(): |
| 703 | 703 |
|
| 704 | 704 |
# Create a temporary directory to build the source tree in
|
| 705 | 705 |
builddir = self._context.builddir
|
| 706 |
+ os.makedirs(builddir, exist_ok=True)
|
|
| 706 | 707 |
prefix = "{}-".format(target.normal_name)
|
| 707 | 708 |
|
| 708 | 709 |
with TemporaryDirectory(prefix=prefix, dir=builddir) as tempdir:
|
| ... | ... | @@ -1085,6 +1086,7 @@ class Stream(): |
| 1085 | 1086 |
for element in elements:
|
| 1086 | 1087 |
source_dir = os.path.join(directory, "source")
|
| 1087 | 1088 |
element_source_dir = os.path.join(source_dir, element.normal_name)
|
| 1089 |
+ os.makedirs(element_source_dir)
|
|
| 1088 | 1090 |
|
| 1089 | 1091 |
element._stage_sources_at(element_source_dir)
|
| 1090 | 1092 |
|
| ... | ... | @@ -2137,14 +2137,11 @@ class Element(Plugin): |
| 2137 | 2137 |
project = self._get_project()
|
| 2138 | 2138 |
platform = Platform.get_platform()
|
| 2139 | 2139 |
|
| 2140 |
- if self.__remote_execution_url and self.BST_VIRTUAL_DIRECTORY:
|
|
| 2141 |
- if not self.__artifacts.has_push_remotes(element=self):
|
|
| 2142 |
- # Give an early warning if remote execution will not work
|
|
| 2143 |
- raise ElementError("Artifact {} is configured to use remote execution but has no push remotes. "
|
|
| 2144 |
- .format(self.name) +
|
|
| 2145 |
- "The remote artifact server(s) may not be correctly configured or contactable.")
|
|
| 2146 |
- |
|
| 2147 |
- self.info("Using a remote sandbox for artifact {}".format(self.name))
|
|
| 2140 |
+ if (directory is not None and
|
|
| 2141 |
+ self.__remote_execution_url and
|
|
| 2142 |
+ self.BST_VIRTUAL_DIRECTORY):
|
|
| 2143 |
+ |
|
| 2144 |
+ self.info("Using a remote sandbox for artifact {} with directory '{}'".format(self.name, directory))
|
|
| 2148 | 2145 |
|
| 2149 | 2146 |
sandbox = SandboxRemote(context, project,
|
| 2150 | 2147 |
directory,
|
| ... | ... | @@ -173,8 +173,8 @@ class SandboxRemote(Sandbox): |
| 173 | 173 |
platform = Platform.get_platform()
|
| 174 | 174 |
cascache = platform.artifactcache
|
| 175 | 175 |
# Now, push that key (without necessarily needing a ref) to the remote.
|
| 176 |
- vdir_digest = cascache.push_directory(self._get_project(), upload_vdir)
|
|
| 177 |
- if not vdir_digest or not cascache.verify_digest_pushed(self._get_project(), vdir_digest):
|
|
| 176 |
+ cascache.push_directory(self._get_project(), upload_vdir)
|
|
| 177 |
+ if not cascache.verify_digest_pushed(self._get_project(), upload_vdir.ref):
|
|
| 178 | 178 |
raise SandboxError("Failed to verify that source has been pushed to the remote artifact cache.")
|
| 179 | 179 |
|
| 180 | 180 |
# Set up environment and working directory
|
| ... | ... | @@ -930,6 +930,38 @@ class Source(Plugin): |
| 930 | 930 |
# Local Private Methods #
|
| 931 | 931 |
#############################################################
|
| 932 | 932 |
|
| 933 |
+ # __clone_for_uri()
|
|
| 934 |
+ #
|
|
| 935 |
+ # Clone the source with an alternative URI setup for the alias
|
|
| 936 |
+ # which this source uses.
|
|
| 937 |
+ #
|
|
| 938 |
+ # This is used for iteration over source mirrors.
|
|
| 939 |
+ #
|
|
| 940 |
+ # Args:
|
|
| 941 |
+ # uri (str): The alternative URI for this source's alias
|
|
| 942 |
+ #
|
|
| 943 |
+ # Returns:
|
|
| 944 |
+ # (Source): A new clone of this Source, with the specified URI
|
|
| 945 |
+ # as the value of the alias this Source has marked as
|
|
| 946 |
+ # primary with either mark_download_url() or
|
|
| 947 |
+ # translate_url().
|
|
| 948 |
+ #
|
|
| 949 |
+ def __clone_for_uri(self, uri):
|
|
| 950 |
+ project = self._get_project()
|
|
| 951 |
+ context = self._get_context()
|
|
| 952 |
+ alias = self._get_alias()
|
|
| 953 |
+ source_kind = type(self)
|
|
| 954 |
+ |
|
| 955 |
+ clone = source_kind(context, project, self.__meta, alias_override=(alias, uri))
|
|
| 956 |
+ |
|
| 957 |
+ # Do the necessary post instantiation routines here
|
|
| 958 |
+ #
|
|
| 959 |
+ clone._preflight()
|
|
| 960 |
+ clone._load_ref()
|
|
| 961 |
+ clone._update_state()
|
|
| 962 |
+ |
|
| 963 |
+ return clone
|
|
| 964 |
+ |
|
| 933 | 965 |
# Tries to call fetch for every mirror, stopping once it succeeds
|
| 934 | 966 |
def __do_fetch(self, **kwargs):
|
| 935 | 967 |
project = self._get_project()
|
| ... | ... | @@ -968,12 +1000,8 @@ class Source(Plugin): |
| 968 | 1000 |
self.fetch(**kwargs)
|
| 969 | 1001 |
return
|
| 970 | 1002 |
|
| 971 |
- context = self._get_context()
|
|
| 972 |
- source_kind = type(self)
|
|
| 973 | 1003 |
for uri in project.get_alias_uris(alias, first_pass=self.__first_pass):
|
| 974 |
- new_source = source_kind(context, project, self.__meta,
|
|
| 975 |
- alias_override=(alias, uri))
|
|
| 976 |
- new_source._preflight()
|
|
| 1004 |
+ new_source = self.__clone_for_uri(uri)
|
|
| 977 | 1005 |
try:
|
| 978 | 1006 |
new_source.fetch(**kwargs)
|
| 979 | 1007 |
# FIXME: Need to consider temporary vs. permanent failures,
|
| ... | ... | @@ -1006,9 +1034,7 @@ class Source(Plugin): |
| 1006 | 1034 |
# NOTE: We are assuming here that tracking only requires substituting the
|
| 1007 | 1035 |
# first alias used
|
| 1008 | 1036 |
for uri in reversed(project.get_alias_uris(alias, first_pass=self.__first_pass)):
|
| 1009 |
- new_source = source_kind(context, project, self.__meta,
|
|
| 1010 |
- alias_override=(alias, uri))
|
|
| 1011 |
- new_source._preflight()
|
|
| 1037 |
+ new_source = self.__clone_for_uri(uri)
|
|
| 1012 | 1038 |
try:
|
| 1013 | 1039 |
ref = new_source.track(**kwargs)
|
| 1014 | 1040 |
# FIXME: Need to consider temporary vs. permanent failures,
|
| ... | ... | @@ -264,8 +264,9 @@ setup(name='BuildStream', |
| 264 | 264 |
license='LGPL',
|
| 265 | 265 |
long_description=long_description,
|
| 266 | 266 |
long_description_content_type='text/x-rst; charset=UTF-8',
|
| 267 |
- url='https://gitlab.com/BuildStream/buildstream',
|
|
| 267 |
+ url='https://buildstream.build',
|
|
| 268 | 268 |
project_urls={
|
| 269 |
+ 'Source': 'https://gitlab.com/BuildStream/buildstream',
|
|
| 269 | 270 |
'Documentation': 'https://buildstream.gitlab.io/buildstream/',
|
| 270 | 271 |
'Tracker': 'https://gitlab.com/BuildStream/buildstream/issues',
|
| 271 | 272 |
'Mailing List': 'https://mail.gnome.org/mailman/listinfo/buildstream-list'
|
| ... | ... | @@ -228,9 +228,9 @@ def _test_push_directory(user_config_file, project_dir, artifact_dir, artifact_d |
| 228 | 228 |
directory = CasBasedDirectory(context, ref=artifact_digest)
|
| 229 | 229 |
|
| 230 | 230 |
# Push the CasBasedDirectory object
|
| 231 |
- directory_digest = cas.push_directory(project, directory)
|
|
| 231 |
+ cas.push_directory(project, directory)
|
|
| 232 | 232 |
|
| 233 |
- queue.put(directory_digest.hash)
|
|
| 233 |
+ queue.put(directory.ref.hash)
|
|
| 234 | 234 |
else:
|
| 235 | 235 |
queue.put("No remote configured")
|
| 236 | 236 |
|
| 1 |
+kind: import
|
|
| 2 |
+description: the kind of this element must implement generate_script() method
|
|
| 3 |
+ |
|
| 4 |
+sources:
|
|
| 5 |
+- kind: local
|
|
| 6 |
+ path: files/source-bundle
|
| 1 |
+llamas
|
| 1 |
+#
|
|
| 2 |
+# Copyright (C) 2018 Bloomberg Finance LP
|
|
| 3 |
+#
|
|
| 4 |
+# This program is free software; you can redistribute it and/or
|
|
| 5 |
+# modify it under the terms of the GNU Lesser General Public
|
|
| 6 |
+# License as published by the Free Software Foundation; either
|
|
| 7 |
+# version 2 of the License, or (at your option) any later version.
|
|
| 8 |
+#
|
|
| 9 |
+# This library is distributed in the hope that it will be useful,
|
|
| 10 |
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
| 11 |
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
|
| 12 |
+# Lesser General Public License for more details.
|
|
| 13 |
+#
|
|
| 14 |
+# You should have received a copy of the GNU Lesser General Public
|
|
| 15 |
+# License along with this library. If not, see <http://www.gnu.org/licenses/>.
|
|
| 16 |
+#
|
|
| 17 |
+# Authors: Chandan Singh <csingh43 bloomberg net>
|
|
| 18 |
+#
|
|
| 19 |
+ |
|
| 20 |
+import os
|
|
| 21 |
+import tarfile
|
|
| 22 |
+ |
|
| 23 |
+import pytest
|
|
| 24 |
+ |
|
| 25 |
+from tests.testutils import cli
|
|
| 26 |
+ |
|
| 27 |
+# Project directory
|
|
| 28 |
+DATA_DIR = os.path.join(
|
|
| 29 |
+ os.path.dirname(os.path.realpath(__file__)),
|
|
| 30 |
+ "project",
|
|
| 31 |
+)
|
|
| 32 |
+ |
|
| 33 |
+ |
|
| 34 |
+@pytest.mark.datafiles(DATA_DIR)
|
|
| 35 |
+def test_source_bundle(cli, tmpdir, datafiles):
|
|
| 36 |
+ project_path = os.path.join(datafiles.dirname, datafiles.basename)
|
|
| 37 |
+ element_name = 'source-bundle/source-bundle-hello.bst'
|
|
| 38 |
+ normal_name = 'source-bundle-source-bundle-hello'
|
|
| 39 |
+ |
|
| 40 |
+ # Verify that we can correctly produce a source-bundle
|
|
| 41 |
+ args = ['source-bundle', element_name, '--directory', str(tmpdir)]
|
|
| 42 |
+ result = cli.run(project=project_path, args=args)
|
|
| 43 |
+ result.assert_success()
|
|
| 44 |
+ |
|
| 45 |
+ # Verify that the source-bundle contains our sources and a build script
|
|
| 46 |
+ with tarfile.open(os.path.join(str(tmpdir), '{}.tar.gz'.format(normal_name))) as bundle:
|
|
| 47 |
+ assert os.path.join(normal_name, 'source', normal_name, 'llamas.txt') in bundle.getnames()
|
|
| 48 |
+ assert os.path.join(normal_name, 'build.sh') in bundle.getnames()
|
