Tom Pollard pushed to branch tpollard/566 at BuildStream / buildstream
Commits:
-
d1f1ba10
by Tom Pollard at 2018-12-06T17:39:54Z
3 changed files:
- buildstream/_artifactcache/artifactcache.py
- buildstream/_artifactcache/cascache.py
- buildstream/element.py
Changes:
... | ... | @@ -74,6 +74,7 @@ class ArtifactCache(): |
74 | 74 |
|
75 | 75 |
self._has_fetch_remotes = False
|
76 | 76 |
self._has_push_remotes = False
|
77 |
+ self._has_partial_push_remotes = False
|
|
77 | 78 |
|
78 | 79 |
os.makedirs(self.extractdir, exist_ok=True)
|
79 | 80 |
|
... | ... | @@ -398,6 +399,8 @@ class ArtifactCache(): |
398 | 399 |
self._has_fetch_remotes = True
|
399 | 400 |
if remote_spec.push:
|
400 | 401 |
self._has_push_remotes = True
|
402 |
+ if remote_spec.partial_push:
|
|
403 |
+ self._has_partial_push_remotes = True
|
|
401 | 404 |
|
402 | 405 |
remotes[remote_spec.url] = CASRemote(remote_spec)
|
403 | 406 |
|
... | ... | @@ -596,6 +599,31 @@ class ArtifactCache(): |
596 | 599 |
remotes_for_project = self._remotes[element._get_project()]
|
597 | 600 |
return any(remote.spec.push for remote in remotes_for_project)
|
598 | 601 |
|
602 |
+ # has_partial_push_remotes():
|
|
603 |
+ #
|
|
604 |
+ # Check whether any remote repositories are available for pushing
|
|
605 |
+ # non-complete artifacts
|
|
606 |
+ #
|
|
607 |
+ # Args:
|
|
608 |
+ # element (Element): The Element to check
|
|
609 |
+ #
|
|
610 |
+ # Returns:
|
|
611 |
+ # (bool): True if any remote repository is configured for optional
|
|
612 |
+ # partial pushes, False otherwise
|
|
613 |
+ #
|
|
614 |
+ def has_partial_push_remotes(self, *, element=None):
|
|
615 |
+ # If there's no partial push remotes available, we can't partial push at all
|
|
616 |
+ if not self._has_partial_push_remotes:
|
|
617 |
+ return False
|
|
618 |
+ elif element is None:
|
|
619 |
+ # At least one remote is set to allow partial pushes
|
|
620 |
+ return True
|
|
621 |
+ else:
|
|
622 |
+ # Check whether the specified element's project has push remotes configured
|
|
623 |
+ # to accept partial artifact pushes
|
|
624 |
+ remotes_for_project = self._remotes[element._get_project()]
|
|
625 |
+ return any(remote.spec.partial_push for remote in remotes_for_project)
|
|
626 |
+ |
|
599 | 627 |
# push():
|
600 | 628 |
#
|
601 | 629 |
# Push committed artifact to remote repository.
|
... | ... | @@ -603,6 +631,8 @@ class ArtifactCache(): |
603 | 631 |
# Args:
|
604 | 632 |
# element (Element): The Element whose artifact is to be pushed
|
605 | 633 |
# keys (list): The cache keys to use
|
634 |
+ # partial (bool): Whether the artifact is cached in a partial state
|
|
635 |
+ # subdir (string): Optional subdir to exclude from the push
|
|
606 | 636 |
#
|
607 | 637 |
# Returns:
|
608 | 638 |
# (bool): True if any remote was updated, False if no pushes were required
|
... | ... | @@ -610,12 +640,23 @@ class ArtifactCache(): |
610 | 640 |
# Raises:
|
611 | 641 |
# (ArtifactError): if there was an error
|
612 | 642 |
#
|
613 |
- def push(self, element, keys):
|
|
643 |
+ def push(self, element, keys, partial=False, subdir=None):
|
|
614 | 644 |
refs = [self.get_artifact_fullname(element, key) for key in list(keys)]
|
615 | 645 |
|
616 | 646 |
project = element._get_project()
|
617 | 647 |
|
618 |
- push_remotes = [r for r in self._remotes[project] if r.spec.push]
|
|
648 |
+ push_remotes = []
|
|
649 |
+ partial_remotes = []
|
|
650 |
+ |
|
651 |
+ # Create list of remotes to push to, given current element and partial push config
|
|
652 |
+ if not partial:
|
|
653 |
+ push_remotes = [r for r in self._remotes[project] if r.spec.push]
|
|
654 |
+ |
|
655 |
+ if self._has_partial_push_remotes:
|
|
656 |
+ # Create a specific list of the remotes expecting the artifact to be pushed in a partial
|
|
657 |
+ # state. This list needs to be pushed in a partial state, without the optional subdir if
|
|
658 |
+ # it exists locally
|
|
659 |
+ partial_remotes = [r for r in self._remotes[project] if (r.spec.partial_push and r.spec.push)]
|
|
619 | 660 |
|
620 | 661 |
pushed = False
|
621 | 662 |
|
... | ... | @@ -632,6 +673,19 @@ class ArtifactCache(): |
632 | 673 |
remote.spec.url, element._get_brief_display_key()
|
633 | 674 |
))
|
634 | 675 |
|
676 |
+ for remote in partial_remotes:
|
|
677 |
+ remote.init()
|
|
678 |
+ display_key = element._get_brief_display_key()
|
|
679 |
+ element.status("Pushing partial artifact {} -> {}".format(display_key, remote.spec.url))
|
|
680 |
+ |
|
681 |
+ if self.cas.push(refs, remote, subdir=subdir):
|
|
682 |
+ element.info("Pushed partial artifact {} -> {}".format(display_key, remote.spec.url))
|
|
683 |
+ pushed = True
|
|
684 |
+ else:
|
|
685 |
+ element.info("Remote ({}) already has {} partial cached".format(
|
|
686 |
+ remote.spec.url, element._get_brief_display_key()
|
|
687 |
+ ))
|
|
688 |
+ |
|
635 | 689 |
return pushed
|
636 | 690 |
|
637 | 691 |
# pull():
|
... | ... | @@ -45,7 +45,7 @@ from .. import _yaml |
45 | 45 |
_MAX_PAYLOAD_BYTES = 1024 * 1024
|
46 | 46 |
|
47 | 47 |
|
48 |
-class CASRemoteSpec(namedtuple('CASRemoteSpec', 'url push server_cert client_key client_cert')):
|
|
48 |
+class CASRemoteSpec(namedtuple('CASRemoteSpec', 'url push partial_push server_cert client_key client_cert')):
|
|
49 | 49 |
|
50 | 50 |
# _new_from_config_node
|
51 | 51 |
#
|
... | ... | @@ -53,9 +53,12 @@ class CASRemoteSpec(namedtuple('CASRemoteSpec', 'url push server_cert client_key |
53 | 53 |
#
|
54 | 54 |
@staticmethod
|
55 | 55 |
def _new_from_config_node(spec_node, basedir=None):
|
56 |
- _yaml.node_validate(spec_node, ['url', 'push', 'server-cert', 'client-key', 'client-cert'])
|
|
56 |
+ _yaml.node_validate(spec_node,
|
|
57 |
+ ['url', 'push', 'allow-partial-push', 'server-cert', 'client-key', 'client-cert'])
|
|
57 | 58 |
url = _yaml.node_get(spec_node, str, 'url')
|
58 | 59 |
push = _yaml.node_get(spec_node, bool, 'push', default_value=False)
|
60 |
+ partial_push = _yaml.node_get(spec_node, bool, 'allow-partial-push', default_value=False)
|
|
61 |
+ |
|
59 | 62 |
if not url:
|
60 | 63 |
provenance = _yaml.node_get_provenance(spec_node, 'url')
|
61 | 64 |
raise LoadError(LoadErrorReason.INVALID_DATA,
|
... | ... | @@ -83,10 +86,10 @@ class CASRemoteSpec(namedtuple('CASRemoteSpec', 'url push server_cert client_key |
83 | 86 |
raise LoadError(LoadErrorReason.INVALID_DATA,
|
84 | 87 |
"{}: 'client-cert' was specified without 'client-key'".format(provenance))
|
85 | 88 |
|
86 |
- return CASRemoteSpec(url, push, server_cert, client_key, client_cert)
|
|
89 |
+ return CASRemoteSpec(url, push, partial_push, server_cert, client_key, client_cert)
|
|
87 | 90 |
|
88 | 91 |
|
89 |
-CASRemoteSpec.__new__.__defaults__ = (None, None, None)
|
|
92 |
+CASRemoteSpec.__new__.__defaults__ = (False, None, None, None)
|
|
90 | 93 |
|
91 | 94 |
|
92 | 95 |
class BlobNotFound(CASError):
|
... | ... | @@ -353,6 +356,7 @@ class CASCache(): |
353 | 356 |
# Args:
|
354 | 357 |
# refs (list): The refs to push
|
355 | 358 |
# remote (CASRemote): The remote to push to
|
359 |
+ # subdir (string): Optional specific subdir to exempt from the push
|
|
356 | 360 |
#
|
357 | 361 |
# Returns:
|
358 | 362 |
# (bool): True if any remote was updated, False if no pushes were required
|
... | ... | @@ -360,7 +364,7 @@ class CASCache(): |
360 | 364 |
# Raises:
|
361 | 365 |
# (CASError): if there was an error
|
362 | 366 |
#
|
363 |
- def push(self, refs, remote):
|
|
367 |
+ def push(self, refs, remote, subdir=None):
|
|
364 | 368 |
skipped_remote = True
|
365 | 369 |
try:
|
366 | 370 |
for ref in refs:
|
... | ... | @@ -382,7 +386,7 @@ class CASCache(): |
382 | 386 |
# Intentionally re-raise RpcError for outer except block.
|
383 | 387 |
raise
|
384 | 388 |
|
385 |
- self._send_directory(remote, tree)
|
|
389 |
+ self._send_directory(remote, tree, excluded_dir=subdir)
|
|
386 | 390 |
|
387 | 391 |
request = buildstream_pb2.UpdateReferenceRequest()
|
388 | 392 |
request.keys.append(ref)
|
... | ... | @@ -886,7 +890,7 @@ class CASCache(): |
886 | 890 |
for dirnode in directory.directories:
|
887 | 891 |
self._reachable_refs_dir(reachable, dirnode.digest, update_mtime=update_mtime)
|
888 | 892 |
|
889 |
- def _required_blobs(self, directory_digest):
|
|
893 |
+ def _required_blobs(self, directory_digest, excluded_dir=None):
|
|
890 | 894 |
# parse directory, and recursively add blobs
|
891 | 895 |
d = remote_execution_pb2.Digest()
|
892 | 896 |
d.hash = directory_digest.hash
|
... | ... | @@ -905,7 +909,8 @@ class CASCache(): |
905 | 909 |
yield d
|
906 | 910 |
|
907 | 911 |
for dirnode in directory.directories:
|
908 |
- yield from self._required_blobs(dirnode.digest)
|
|
912 |
+ if dirnode.name != excluded_dir:
|
|
913 |
+ yield from self._required_blobs(dirnode.digest)
|
|
909 | 914 |
|
910 | 915 |
def _fetch_blob(self, remote, digest, stream):
|
911 | 916 |
resource_name = '/'.join(['blobs', digest.hash, str(digest.size_bytes)])
|
... | ... | @@ -1091,8 +1096,8 @@ class CASCache(): |
1091 | 1096 |
|
1092 | 1097 |
assert response.committed_size == digest.size_bytes
|
1093 | 1098 |
|
1094 |
- def _send_directory(self, remote, digest, u_uid=uuid.uuid4()):
|
|
1095 |
- required_blobs = self._required_blobs(digest)
|
|
1099 |
+ def _send_directory(self, remote, digest, u_uid=uuid.uuid4(), excluded_dir=None):
|
|
1100 |
+ required_blobs = self._required_blobs(digest, excluded_dir=excluded_dir)
|
|
1096 | 1101 |
|
1097 | 1102 |
missing_blobs = dict()
|
1098 | 1103 |
# Limit size of FindMissingBlobs request
|
... | ... | @@ -1800,13 +1800,19 @@ class Element(Plugin): |
1800 | 1800 |
# (bool): True if this element does not need a push job to be created
|
1801 | 1801 |
#
|
1802 | 1802 |
def _skip_push(self):
|
1803 |
+ |
|
1803 | 1804 |
if not self.__artifacts.has_push_remotes(element=self):
|
1804 | 1805 |
# No push remotes for this element's project
|
1805 | 1806 |
return True
|
1806 | 1807 |
|
1807 | 1808 |
# Do not push elements that aren't cached, or that are cached with a dangling buildtree
|
1808 |
- # artifact unless element type is expected to have an an empty buildtree directory
|
|
1809 |
- if not self._cached_buildtree():
|
|
1809 |
+ # artifact unless element type is expected to have an empty buildtree directory. Check
|
|
1810 |
+ # that this default behaviour is not overridden via a remote configured to allow pushing
|
|
1811 |
+ # artifacts without their corresponding buildtree.
|
|
1812 |
+ if not self._cached():
|
|
1813 |
+ return True
|
|
1814 |
+ |
|
1815 |
+ if not self._cached_buildtree() and not self.__artifacts.has_partial_push_remotes(element=self):
|
|
1810 | 1816 |
return True
|
1811 | 1817 |
|
1812 | 1818 |
# Do not push tainted artifact
|
... | ... | @@ -1817,7 +1823,8 @@ class Element(Plugin): |
1817 | 1823 |
|
1818 | 1824 |
# _push():
|
1819 | 1825 |
#
|
1820 |
- # Push locally cached artifact to remote artifact repository.
|
|
1826 |
+ # Push locally cached artifact to remote artifact repository. An attempt
|
|
1827 |
+ # will be made to push partial artifacts given current config
|
|
1821 | 1828 |
#
|
1822 | 1829 |
# Returns:
|
1823 | 1830 |
# (bool): True if the remote was updated, False if it already existed
|
... | ... | @@ -1830,8 +1837,20 @@ class Element(Plugin): |
1830 | 1837 |
self.warn("Not pushing tainted artifact.")
|
1831 | 1838 |
return False
|
1832 | 1839 |
|
1833 |
- # Push all keys used for local commit
|
|
1834 |
- pushed = self.__artifacts.push(self, self.__get_cache_keys_for_commit())
|
|
1840 |
+ # Push all keys used for local commit, this could be full or partial,
|
|
1841 |
+ # given previous _skip_push() logic. If buildtree isn't cached, then
|
|
1842 |
+ # set partial push
|
|
1843 |
+ |
|
1844 |
+ partial = False
|
|
1845 |
+ subdir = 'buildtree'
|
|
1846 |
+ if not self._cached_buildtree():
|
|
1847 |
+ partial = True
|
|
1848 |
+ subdir = ''
|
|
1849 |
+ |
|
1850 |
+ pushed = self.__artifacts.push(self, self.__get_cache_keys_for_commit(), partial=partial, subdir=subdir)
|
|
1851 |
+ |
|
1852 |
+ # Artifact might be cached in the server partially with the top level ref existing.
|
|
1853 |
+ # Check if we need to attempt a push of a locally cached buildtree given current config
|
|
1835 | 1854 |
if not pushed:
|
1836 | 1855 |
return False
|
1837 | 1856 |
|