Raoul Hidalgo Charman pushed to branch raoul/802-refactor-artifactcache at BuildStream / buildstream
Commits:
-
f112bfae
by Raoul Hidalgo Charman at 2018-12-18T12:35:04Z
-
5d05a079
by Raoul Hidalgo Charman at 2018-12-18T14:01:24Z
3 changed files:
Changes:
| ... | ... | @@ -656,6 +656,11 @@ class ArtifactCache(): |
| 656 | 656 |
remote.request_blob(blob_digest)
|
| 657 | 657 |
for blob_file in remote.get_blobs():
|
| 658 | 658 |
self.cas.add_object(path=blob_file.name, link_directly=True)
|
| 659 |
+ |
|
| 660 |
+ # request the final CAS batch
|
|
| 661 |
+ for blob_file in remote.get_blobs(request_batch=True):
|
|
| 662 |
+ self.cas.add_object(path=blob_file.name, link_directly=True)
|
|
| 663 |
+ |
|
| 659 | 664 |
self.cas.set_ref(ref, root_digest)
|
| 660 | 665 |
except BlobNotFound:
|
| 661 | 666 |
element.info("Remote ({}) is missing blobs for {}".format(
|
| ... | ... | @@ -689,15 +694,36 @@ class ArtifactCache(): |
| 689 | 694 |
#
|
| 690 | 695 |
# Args:
|
| 691 | 696 |
# project (Project): The current project
|
| 692 |
- # digest (Digest): The digest of the tree
|
|
| 697 |
+ # tree_digest (Digest): The digest of the tree
|
|
| 693 | 698 |
#
|
| 694 |
- def pull_tree(self, project, digest):
|
|
| 699 |
+ def pull_tree(self, project, tree_digest):
|
|
| 695 | 700 |
for remote in self._remotes[project]:
|
| 696 |
- digest = self.cas.pull_tree(remote, digest)
|
|
| 697 |
- |
|
| 698 |
- if digest:
|
|
| 699 |
- # no need to pull from additional remotes
|
|
| 700 |
- return digest
|
|
| 701 |
+ try:
|
|
| 702 |
+ # get tree
|
|
| 703 |
+ tree = remote.get_tree_blob(tree_digest)
|
|
| 704 |
+ |
|
| 705 |
+ # request files
|
|
| 706 |
+ tree.children.extend([tree.root])
|
|
| 707 |
+ for directory in tree.children:
|
|
| 708 |
+ for filenode in directory.files:
|
|
| 709 |
+ if self.cas.check_blob(filenode.digest):
|
|
| 710 |
+ continue
|
|
| 711 |
+ remote.request_blob(blob_digest)
|
|
| 712 |
+ for blob_file in remote.get_blobs():
|
|
| 713 |
+ self.cas.add_object(path=blob_file.name)
|
|
| 714 |
+ |
|
| 715 |
+ # Get the last batch
|
|
| 716 |
+ for blob in remote.get_blobs(request_batch=True):
|
|
| 717 |
+ self.cas.add_object(blob)
|
|
| 718 |
+ |
|
| 719 |
+ # add the directories to CAS
|
|
| 720 |
+ for directory in tree.children:
|
|
| 721 |
+ self.cas.add_object(buffer=directory.SerializeToString())
|
|
| 722 |
+ |
|
| 723 |
+ except BlobNotFound:
|
|
| 724 |
+ continue
|
|
| 725 |
+ else:
|
|
| 726 |
+ return tree_digest
|
|
| 701 | 727 |
|
| 702 | 728 |
return None
|
| 703 | 729 |
|
| ... | ... | @@ -183,29 +183,6 @@ class CASCache(): |
| 183 | 183 |
|
| 184 | 184 |
return modified, removed, added
|
| 185 | 185 |
|
| 186 |
- # pull_tree():
|
|
| 187 |
- #
|
|
| 188 |
- # Pull a single Tree rather than a ref.
|
|
| 189 |
- # Does not update local refs.
|
|
| 190 |
- #
|
|
| 191 |
- # Args:
|
|
| 192 |
- # remote (CASRemote): The remote to pull from
|
|
| 193 |
- # digest (Digest): The digest of the tree
|
|
| 194 |
- #
|
|
| 195 |
- def pull_tree(self, remote, digest):
|
|
| 196 |
- try:
|
|
| 197 |
- remote.init()
|
|
| 198 |
- |
|
| 199 |
- digest = self._fetch_tree(remote, digest)
|
|
| 200 |
- |
|
| 201 |
- return digest
|
|
| 202 |
- |
|
| 203 |
- except grpc.RpcError as e:
|
|
| 204 |
- if e.code() != grpc.StatusCode.NOT_FOUND:
|
|
| 205 |
- raise
|
|
| 206 |
- |
|
| 207 |
- return None
|
|
| 208 |
- |
|
| 209 | 186 |
# link_ref():
|
| 210 | 187 |
#
|
| 211 | 188 |
# Add an alias for an existing ref.
|
| ... | ... | @@ -771,29 +748,6 @@ class CASCache(): |
| 771 | 748 |
|
| 772 | 749 |
return objpath
|
| 773 | 750 |
|
| 774 |
- def _fetch_tree(self, remote, digest):
|
|
| 775 |
- # download but do not store the Tree object
|
|
| 776 |
- with tempfile.NamedTemporaryFile(dir=self.tmpdir) as out:
|
|
| 777 |
- remote._fetch_blob(digest, out)
|
|
| 778 |
- |
|
| 779 |
- tree = remote_execution_pb2.Tree()
|
|
| 780 |
- |
|
| 781 |
- with open(out.name, 'rb') as f:
|
|
| 782 |
- tree.ParseFromString(f.read())
|
|
| 783 |
- |
|
| 784 |
- tree.children.extend([tree.root])
|
|
| 785 |
- for directory in tree.children:
|
|
| 786 |
- for filenode in directory.files:
|
|
| 787 |
- self._ensure_blob(remote, filenode.digest)
|
|
| 788 |
- |
|
| 789 |
- # place directory blob only in final location when we've downloaded
|
|
| 790 |
- # all referenced blobs to avoid dangling references in the repository
|
|
| 791 |
- dirbuffer = directory.SerializeToString()
|
|
| 792 |
- dirdigest = self.add_object(buffer=dirbuffer)
|
|
| 793 |
- assert dirdigest.size_bytes == len(dirbuffer)
|
|
| 794 |
- |
|
| 795 |
- return dirdigest
|
|
| 796 |
- |
|
| 797 | 751 |
def _send_directory(self, remote, digest, u_uid=uuid.uuid4()):
|
| 798 | 752 |
required_blobs = self._required_blobs(digest)
|
| 799 | 753 |
|
| ... | ... | @@ -95,6 +95,9 @@ class CASRemote(): |
| 95 | 95 |
|
| 96 | 96 |
self.__tmp_downloads = [] # files in the tmpdir waiting to be added to local caches
|
| 97 | 97 |
|
| 98 |
+ self.__batch_read = None
|
|
| 99 |
+ self.__batch_update = None
|
|
| 100 |
+ |
|
| 98 | 101 |
def init(self):
|
| 99 | 102 |
if not self._initialized:
|
| 100 | 103 |
url = urlparse(self.spec.url)
|
| ... | ... | @@ -152,6 +155,7 @@ class CASRemote(): |
| 152 | 155 |
request = remote_execution_pb2.BatchReadBlobsRequest()
|
| 153 | 156 |
response = self.cas.BatchReadBlobs(request)
|
| 154 | 157 |
self.batch_read_supported = True
|
| 158 |
+ self.__batch_read = _CASBatchRead(self)
|
|
| 155 | 159 |
except grpc.RpcError as e:
|
| 156 | 160 |
if e.code() != grpc.StatusCode.UNIMPLEMENTED:
|
| 157 | 161 |
raise
|
| ... | ... | @@ -162,6 +166,7 @@ class CASRemote(): |
| 162 | 166 |
request = remote_execution_pb2.BatchUpdateBlobsRequest()
|
| 163 | 167 |
response = self.cas.BatchUpdateBlobs(request)
|
| 164 | 168 |
self.batch_update_supported = True
|
| 169 |
+ self.__batch_update = _CASBatchUpdate(self)
|
|
| 165 | 170 |
except grpc.RpcError as e:
|
| 166 | 171 |
if (e.code() != grpc.StatusCode.UNIMPLEMENTED and
|
| 167 | 172 |
e.code() != grpc.StatusCode.PERMISSION_DENIED):
|
| ... | ... | @@ -276,6 +281,17 @@ class CASRemote(): |
| 276 | 281 |
else:
|
| 277 | 282 |
return None
|
| 278 | 283 |
|
| 284 |
+ def get_tree_blob(self, tree_digest):
|
|
| 285 |
+ self.init()
|
|
| 286 |
+ f = tempfile.NamedTemporaryFile(dir=self.tmpdir)
|
|
| 287 |
+ self._fetch_blob(tree_digest, f)
|
|
| 288 |
+ |
|
| 289 |
+ tree = remote_execution_pb2.Tree()
|
|
| 290 |
+ with open(f.name, 'rb') as tmp:
|
|
| 291 |
+ tree.ParseFromString(tmp.read())
|
|
| 292 |
+ |
|
| 293 |
+ return tree
|
|
| 294 |
+ |
|
| 279 | 295 |
# yield_directory_digests():
|
| 280 | 296 |
#
|
| 281 | 297 |
# Iterate over blobs digests starting from a root digest
|
| ... | ... | @@ -296,20 +312,30 @@ class CASRemote(): |
| 296 | 312 |
# Fetch artifact, excluded_subdirs determined in pullqueue
|
| 297 | 313 |
yield from self._yield_directory_digests(root_digest, excluded_subdirs=excluded_subdirs)
|
| 298 | 314 |
|
| 315 |
+ def yield_tree_digests(self, tree_digest):
|
|
| 316 |
+ self.init()
|
|
| 317 |
+ tree.children.extend([tree.root])
|
|
| 318 |
+ for directory in tree.children:
|
|
| 319 |
+ for filenode in directory.files:
|
|
| 320 |
+ yield filenode.digest
|
|
| 321 |
+ |
|
| 299 | 322 |
# request_blob():
|
| 300 | 323 |
#
|
| 301 |
- # Request blob and returns path to tmpdir location
|
|
| 324 |
+ # Request blob, triggering download via bytestream or CAS
|
|
| 325 |
+ # BatchReadBlobs depending on size.
|
|
| 302 | 326 |
#
|
| 303 | 327 |
# Args:
|
| 304 | 328 |
# digest (Digest): digest of the requested blob
|
| 305 |
- # path (str): tmpdir locations of downloaded blobs
|
|
| 306 | 329 |
#
|
| 307 | 330 |
def request_blob(self, digest):
|
| 308 |
- # TODO expand for adding to batches some other logic
|
|
| 309 |
- f = tempfile.NamedTemporaryFile(dir=self.tmpdir)
|
|
| 310 |
- self._fetch_blob(digest, f)
|
|
| 311 |
- self.__tmp_downloads.append(f)
|
|
| 312 |
- return f.name
|
|
| 331 |
+ if (not self.batch_read_supported or
|
|
| 332 |
+ digest.size_bytes > self.max_batch_total_size_bytes):
|
|
| 333 |
+ f = tempfile.NamedTemporaryFile(dir=self.tmpdir)
|
|
| 334 |
+ self._fetch_blob(digest, f)
|
|
| 335 |
+ self.__tmp_downloads.append(f)
|
|
| 336 |
+ elif self.__batch_read.add(digest) is False:
|
|
| 337 |
+ self._download_batch()
|
|
| 338 |
+ self.__batch_read.add(digest)
|
|
| 313 | 339 |
|
| 314 | 340 |
# get_blobs():
|
| 315 | 341 |
#
|
| ... | ... | @@ -318,7 +344,12 @@ class CASRemote(): |
| 318 | 344 |
#
|
| 319 | 345 |
# Returns:
|
| 320 | 346 |
# iterator over NamedTemporaryFile
|
| 321 |
- def get_blobs(self):
|
|
| 347 |
+ def get_blobs(self, request_batch=False):
|
|
| 348 |
+ # Send read batch request and download
|
|
| 349 |
+ if (request_batch is True and
|
|
| 350 |
+ self.batch_read_supported is True):
|
|
| 351 |
+ self._download_batch()
|
|
| 352 |
+ |
|
| 322 | 353 |
while self.__tmp_downloads:
|
| 323 | 354 |
yield self.__tmp_downloads.pop()
|
| 324 | 355 |
|
| ... | ... | @@ -349,18 +380,18 @@ class CASRemote(): |
| 349 | 380 |
# excluded_subdirs (list): The optional list of subdirs to not fetch
|
| 350 | 381 |
#
|
| 351 | 382 |
def _yield_directory_digests(self, dir_digest, *, excluded_subdirs=[]):
|
| 383 |
+ # get directory blob
|
|
| 384 |
+ f = tempfile.NamedTemporaryFile(dir=self.tmpdir)
|
|
| 385 |
+ self._fetch_blob(dir_digest, f)
|
|
| 352 | 386 |
|
| 353 |
- objpath = self.request_blob(dir_digest)
|
|
| 354 |
- |
|
| 387 |
+ # need to read in directory structure to iterate over it
|
|
| 355 | 388 |
directory = remote_execution_pb2.Directory()
|
| 356 |
- |
|
| 357 |
- with open(objpath, 'rb') as f:
|
|
| 358 |
- directory.ParseFromString(f.read())
|
|
| 389 |
+ with open(f.name, 'rb') as tmp:
|
|
| 390 |
+ directory.ParseFromString(tmp.read())
|
|
| 359 | 391 |
|
| 360 | 392 |
yield dir_digest
|
| 361 | 393 |
for filenode in directory.files:
|
| 362 | 394 |
yield filenode.digest
|
| 363 |
- |
|
| 364 | 395 |
for dirnode in directory.directories:
|
| 365 | 396 |
if dirnode.name not in excluded_subdirs:
|
| 366 | 397 |
yield from self._yield_directory_digests(dirnode.digest)
|
| ... | ... | @@ -393,6 +424,15 @@ class CASRemote(): |
| 393 | 424 |
|
| 394 | 425 |
assert response.committed_size == digest.size_bytes
|
| 395 | 426 |
|
| 427 |
+ def _download_batch(self):
|
|
| 428 |
+ for _, data in self.__batch_read.send():
|
|
| 429 |
+ f = tempfile.NamedTemporaryFile(dir=self.tmpdir)
|
|
| 430 |
+ f.write(data)
|
|
| 431 |
+ f.flush()
|
|
| 432 |
+ self.__tmp_downloads.append(f)
|
|
| 433 |
+ |
|
| 434 |
+ self.__batch_read = _CASBatchRead(self)
|
|
| 435 |
+ |
|
| 396 | 436 |
|
| 397 | 437 |
# Represents a batch of blobs queued for fetching.
|
| 398 | 438 |
#
|
