Jürg Billeter pushed to branch juerg/sandbox at BuildStream / buildstream
Commits:
- 14e1a3b3 by Jürg Billeter at 2018-10-01T08:18:05Z
- 232662f1 by Jürg Billeter at 2018-10-01T08:53:06Z
- f447aedd by Tiago Gomes at 2018-10-01T10:33:11Z
- 682dddce by Tiago Gomes at 2018-10-01T10:35:12Z
- fafa8136 by Tiago Gomes at 2018-10-01T10:59:54Z
- 26e1a3c7 by Jürg Billeter at 2018-10-01T14:58:06Z
- f47895c0 by Jürg Billeter at 2018-10-01T14:58:06Z
- cf00c0a1 by Jürg Billeter at 2018-10-01T15:32:30Z
- 5f4ae90b by Jürg Billeter at 2018-10-02T06:34:02Z
6 changed files:
- .gitlab-ci.yml
- buildstream/_artifactcache/cascache.py
- buildstream/_artifactcache/casserver.py
- buildstream/_platform/darwin.py
- buildstream/_platform/platform.py
- buildstream/sandbox/_sandboxremote.py
Changes:
.gitlab-ci.yml:

@@ -161,14 +161,14 @@ docs:
 .overnight-tests: &overnight-tests-template
   stage: test
   variables:
-    bst_ext_url: git+https://gitlab.com/BuildStream/bst-external.git
-    bst_ext_ref: 1d6ab71151b93c8cbc0a91a36ffe9270f3b835f1 # 0.5.1
-    fd_sdk_ref: 88d7c22c2281b987faa02edd57df80d430eecf1f # 18.08.12
+    BST_EXT_URL: git+https://gitlab.com/BuildStream/bst-external.git
+    BST_EXT_REF: 1d6ab71151b93c8cbc0a91a36ffe9270f3b835f1 # 0.5.1
+    FD_SDK_REF: 88d7c22c2281b987faa02edd57df80d430eecf1f # 18.08.11-35-g88d7c22c
   before_script:
   - (cd dist && ./unpack.sh && cd buildstream && pip3 install .)
-  - pip3 install --user -e ${bst_ext_url}@${bst_ext_ref}#egg=bst_ext
+  - pip3 install --user -e ${BST_EXT_URL}@${BST_EXT_REF}#egg=bst_ext
   - git clone https://gitlab.com/freedesktop-sdk/freedesktop-sdk.git
-  - git -C freedesktop-sdk checkout ${fd_sdk_ref}
+  - git -C freedesktop-sdk checkout ${FD_SDK_REF}
   only:
   - schedules

buildstream/_artifactcache/cascache.py:

@@ -1048,10 +1048,29 @@ class CASCache(ArtifactCache):
             missing_blobs[d.hash] = d

         # Upload any blobs missing on the server
-        for blob_digest in missing_blobs.values():
-            with open(self.objpath(blob_digest), 'rb') as f:
-                assert os.fstat(f.fileno()).st_size == blob_digest.size_bytes
-                self._send_blob(remote, blob_digest, f, u_uid=u_uid)
+        self._send_blobs(remote, missing_blobs.values(), u_uid)
+
+    def _send_blobs(self, remote, digests, u_uid=uuid.uuid4()):
+        batch = _CASBatchUpdate(remote)
+
+        for digest in digests:
+            with open(self.objpath(digest), 'rb') as f:
+                assert os.fstat(f.fileno()).st_size == digest.size_bytes
+
+                if (digest.size_bytes >= remote.max_batch_total_size_bytes or
+                        not remote.batch_update_supported):
+                    # Too large for batch request, upload in independent request.
+                    self._send_blob(remote, digest, f, u_uid=u_uid)
+                else:
+                    if not batch.add(digest, f):
+                        # Not enough space left in batch request.
+                        # Complete pending batch first.
+                        batch.send()
+                        batch = _CASBatchUpdate(remote)
+                        batch.add(digest, f)
+
+        # Send final batch
+        batch.send()


 # Represents a single remote CAS cache.
@@ -1126,6 +1145,17 @@ class _CASRemote():
                 if e.code() != grpc.StatusCode.UNIMPLEMENTED:
                     raise

+            # Check whether the server supports BatchUpdateBlobs()
+            self.batch_update_supported = False
+            try:
+                request = remote_execution_pb2.BatchUpdateBlobsRequest()
+                response = self.cas.BatchUpdateBlobs(request)
+                self.batch_update_supported = True
+            except grpc.RpcError as e:
+                if (e.code() != grpc.StatusCode.UNIMPLEMENTED and
+                        e.code() != grpc.StatusCode.PERMISSION_DENIED):
+                    raise
+
             self._initialized = True

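The probe added to _CASRemote above detects batch support by sending an empty BatchUpdateBlobsRequest and falling back to single-blob uploads when the server answers UNIMPLEMENTED (an older server) or PERMISSION_DENIED (push disabled, as returned by the casserver.py change further down). A minimal sketch of that probe-and-fallback idiom, using a stand-in exception rather than grpc.RpcError:

# Hedged sketch: UnsupportedRPC stands in for a grpc.RpcError whose code()
# is UNIMPLEMENTED or PERMISSION_DENIED; the real probe lives in _CASRemote.
class UnsupportedRPC(Exception):
    pass

def probe_batch_support(call_batch_update):
    # An empty request is enough to learn whether the RPC is usable at all.
    try:
        call_batch_update()
        return True
    except UnsupportedRPC:
        return False

def old_server():
    raise UnsupportedRPC()

print(probe_batch_support(old_server))     # False -> fall back to single-blob uploads
print(probe_batch_support(lambda: None))   # True  -> batched uploads allowed
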
@@ -1173,6 +1203,46 @@ class _CASBatchRead():
             yield (response.digest, response.data)


+# Represents a batch of blobs queued for upload.
+#
+class _CASBatchUpdate():
+    def __init__(self, remote):
+        self._remote = remote
+        self._max_total_size_bytes = remote.max_batch_total_size_bytes
+        self._request = remote_execution_pb2.BatchUpdateBlobsRequest()
+        self._size = 0
+        self._sent = False
+
+    def add(self, digest, stream):
+        assert not self._sent
+
+        new_batch_size = self._size + digest.size_bytes
+        if new_batch_size > self._max_total_size_bytes:
+            # Not enough space left in current batch
+            return False
+
+        blob_request = self._request.requests.add()
+        blob_request.digest.hash = digest.hash
+        blob_request.digest.size_bytes = digest.size_bytes
+        blob_request.data = stream.read(digest.size_bytes)
+        self._size = new_batch_size
+        return True
+
+    def send(self):
+        assert not self._sent
+        self._sent = True
+
+        if len(self._request.requests) == 0:
+            return
+
+        batch_response = self._remote.cas.BatchUpdateBlobs(self._request)
+
+        for response in batch_response.responses:
+            if response.status.code != grpc.StatusCode.OK.value[0]:
+                raise ArtifactError("Failed to upload blob {}: {}".format(
+                    response.digest.hash, response.status.code))
+
+
 def _grouper(iterable, n):
     while True:
         try:
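Taken together, _send_blobs() and _CASBatchUpdate implement a fill-then-flush pattern: blobs are appended to the current batch until the next one would push it past max_batch_total_size_bytes, at which point the pending batch is sent and a fresh one is started; oversized blobs bypass batching entirely. A self-contained sketch of the same pattern with hypothetical names (Batch, send_all, MAX_BATCH_BYTES) and plain sizes instead of real blobs:

MAX_BATCH_BYTES = 4 * 1024 * 1024  # assumed limit, analogous to max_batch_total_size_bytes

class Batch:
    def __init__(self):
        self.items = []
        self.size = 0

    def add(self, name, size):
        if self.size + size > MAX_BATCH_BYTES:
            return False  # would overflow, caller must flush first
        self.items.append(name)
        self.size += size
        return True

    def send(self):
        if self.items:
            print("sending batch of {} items ({} bytes)".format(len(self.items), self.size))

def send_all(blobs):
    batch = Batch()
    for name, size in blobs:
        if size >= MAX_BATCH_BYTES:
            print("sending {} as an independent request".format(name))
            continue
        if not batch.add(name, size):
            batch.send()           # flush the full batch
            batch = Batch()
            batch.add(name, size)  # then queue the blob in a fresh one
    batch.send()                   # flush whatever is left

send_all([("a", 1024), ("b", 3 * 1024 * 1024), ("c", 2 * 1024 * 1024)])
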
buildstream/_artifactcache/casserver.py:

@@ -68,7 +68,7 @@ def create_server(repo, *, enable_push):
         _ByteStreamServicer(artifactcache, enable_push=enable_push), server)

     remote_execution_pb2_grpc.add_ContentAddressableStorageServicer_to_server(
-        _ContentAddressableStorageServicer(artifactcache), server)
+        _ContentAddressableStorageServicer(artifactcache, enable_push=enable_push), server)

     remote_execution_pb2_grpc.add_CapabilitiesServicer_to_server(
         _CapabilitiesServicer(), server)
@@ -222,9 +222,10 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):


 class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddressableStorageServicer):
-    def __init__(self, cas):
+    def __init__(self, cas, *, enable_push):
         super().__init__()
         self.cas = cas
+        self.enable_push = enable_push

     def FindMissingBlobs(self, request, context):
         response = remote_execution_pb2.FindMissingBlobsResponse()
@@ -260,6 +261,46 @@ class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddressableStorageServicer):

         return response

+    def BatchUpdateBlobs(self, request, context):
+        response = remote_execution_pb2.BatchUpdateBlobsResponse()
+
+        if not self.enable_push:
+            context.set_code(grpc.StatusCode.PERMISSION_DENIED)
+            return response
+
+        batch_size = 0
+
+        for blob_request in request.requests:
+            digest = blob_request.digest
+
+            batch_size += digest.size_bytes
+            if batch_size > _MAX_PAYLOAD_BYTES:
+                context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
+                return response
+
+            blob_response = response.responses.add()
+            blob_response.digest.hash = digest.hash
+            blob_response.digest.size_bytes = digest.size_bytes
+
+            if len(blob_request.data) != digest.size_bytes:
+                blob_response.status.code = grpc.StatusCode.FAILED_PRECONDITION
+                continue
+
+            try:
+                _clean_up_cache(self.cas, digest.size_bytes)
+
+                with tempfile.NamedTemporaryFile(dir=self.cas.tmpdir) as out:
+                    out.write(blob_request.data)
+                    out.flush()
+                    server_digest = self.cas.add_object(path=out.name)
+                    if server_digest.hash != digest.hash:
+                        blob_response.status.code = grpc.StatusCode.FAILED_PRECONDITION
+
+            except ArtifactTooLargeException:
+                blob_response.status.code = grpc.StatusCode.RESOURCE_EXHAUSTED
+
+        return response
+

 class _CapabilitiesServicer(remote_execution_pb2_grpc.CapabilitiesServicer):
     def GetCapabilities(self, request, context):
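On the server side, BatchUpdateBlobs() rejects the whole request when the summed payload exceeds _MAX_PAYLOAD_BYTES, and marks individual blobs FAILED_PRECONDITION when the data length does not match the digest or the stored object hashes to something else. A hedged sketch of that per-blob check, assuming SHA-256 digests (the REAPI default); check_blob is an illustrative name, not BuildStream API:

import hashlib

def check_blob(data, digest_hash, digest_size):
    # Mirrors the two per-blob failure cases handled in BatchUpdateBlobs():
    # a length mismatch and a content hash mismatch.
    if len(data) != digest_size:
        return False, "size mismatch"
    if hashlib.sha256(data).hexdigest() != digest_hash:
        return False, "hash mismatch"
    return True, "ok"

payload = b"hello"
print(check_blob(payload, hashlib.sha256(payload).hexdigest(), len(payload)))   # (True, 'ok')
print(check_blob(payload, hashlib.sha256(b"other").hexdigest(), len(payload)))  # (False, 'hash mismatch')
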
buildstream/_platform/darwin.py:

@@ -41,10 +41,11 @@ class Darwin(Platform):
         return True

     def get_cpu_count(self, cap=None):
-        if cap < os.cpu_count():
-            return cap
+        cpu_count = os.cpu_count()
+        if cap is None:
+            return cpu_count
         else:
-            return os.cpu_count()
+            return min(cpu_count, cap)

     def set_resource_limits(self, soft_limit=OPEN_MAX, hard_limit=None):
         super().set_resource_limits(soft_limit)
buildstream/_platform/platform.py:

@@ -67,7 +67,11 @@ class Platform():
         return cls._instance

     def get_cpu_count(self, cap=None):
-        return min(len(os.sched_getaffinity(0)), cap)
+        cpu_count = len(os.sched_getaffinity(0))
+        if cap is None:
+            return cpu_count
+        else:
+            return min(cpu_count, cap)

     ##################################################################
     #                        Sandbox functions                       #
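Both platform hunks fix the same problem: with the default cap=None, the old comparisons (cap < os.cpu_count() and min(..., cap)) raise TypeError on Python 3, since an int cannot be ordered against None. The corrected behaviour, sketched outside the Platform classes with os.cpu_count() (os.sched_getaffinity() is not available on Darwin, which is why that platform overrides the method):

import os

def get_cpu_count(cap=None):
    # Same logic as the fixed get_cpu_count() implementations:
    # no cap means "use everything", otherwise clamp to the cap.
    cpu_count = os.cpu_count()
    if cap is None:
        return cpu_count
    return min(cpu_count, cap)

print(get_cpu_count())       # full CPU count
print(get_cpu_count(cap=2))  # at most 2
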
buildstream/sandbox/_sandboxremote.py:

@@ -177,15 +177,11 @@ class SandboxRemote(Sandbox):
         if not cascache.verify_digest_pushed(self._get_project(), upload_vdir.ref):
             raise SandboxError("Failed to verify that source has been pushed to the remote artifact cache.")

-        # Set up environment and working directory
-        if cwd is None:
-            cwd = self._get_work_directory()
-
-        if cwd is None:
-            cwd = '/'
-
-        if env is None:
-            env = self._get_environment()
+        # Fallback to the sandbox default settings for
+        # the cwd and env.
+        #
+        cwd = self._get_work_directory(cwd=cwd)
+        env = self._get_environment(cwd=cwd, env=env)

         # We want command args as a list of strings
         if isinstance(command, str):