Martin Blanchard pushed to branch jmac/remote_execution_client at BuildStream / buildstream
Commits:
- 
680e4fe1
by knownexus at 2018-08-30T13:37:55Z
- 
8cd719eb
by Phillip Smyth at 2018-08-30T15:47:06Z
- 
18d0bfb4
by Tom Pollard at 2018-08-30T19:14:33Z
- 
95121148
by Tiago Gomes at 2018-08-30T20:52:29Z
- 
775d3fca
by Tristan Van Berkom at 2018-08-31T06:05:32Z
- 
0682aa1c
by Josh Smith at 2018-08-31T10:58:12Z
- 
68eff9ba
by Josh Smith at 2018-08-31T10:58:20Z
- 
9b18e1bf
by Tristan Van Berkom at 2018-08-31T11:22:51Z
- 
e2d8fedf
by Josh Smith at 2018-08-31T15:53:43Z
- 
6805a2ab
by Tristan Van Berkom at 2018-09-01T08:24:05Z
- 
f8b06acc
by Tristan Van Berkom at 2018-09-02T09:33:39Z
- 
559b6fbc
by Tristan Van Berkom at 2018-09-02T09:34:10Z
- 
43ad22d7
by Tristan Van Berkom at 2018-09-02T09:34:10Z
- 
16462e9c
by Tristan Van Berkom at 2018-09-02T09:36:49Z
- 
1f9c4147
by Tristan Van Berkom at 2018-09-02T09:37:12Z
- 
f9b2f1a9
by Tristan Van Berkom at 2018-09-02T09:37:21Z
- 
88460cd2
by Tristan Van Berkom at 2018-09-03T06:23:38Z
- 
2339f0c4
by Tom Pollard at 2018-09-03T09:44:36Z
- 
c96fec5d
by Tom Pollard at 2018-09-03T09:44:36Z
- 
6a0cdedf
by Tom Pollard at 2018-09-03T10:16:39Z
- 
9bd3a625
by Jim MacArthur at 2018-09-04T09:46:02Z
- 
7e1f9bb3
by Martin Blanchard at 2018-09-04T09:46:46Z
- 
8a033d85
by Jim MacArthur at 2018-09-04T09:46:53Z
- 
32773bf7
by Jim MacArthur at 2018-09-04T09:47:09Z
- 
e7b26e07
by Jim MacArthur at 2018-09-04T09:47:15Z
- 
ee47f659
by Jim MacArthur at 2018-09-04T09:47:22Z
- 
28bd46d6
by Jim MacArthur at 2018-09-04T09:49:04Z
- 
db1b357b
by Jim MacArthur at 2018-09-04T09:49:11Z
- 
2a96530e
by Jim MacArthur at 2018-09-04T09:49:17Z
- 
ac116aed
by Jim MacArthur at 2018-09-04T09:49:22Z
22 changed files:
- buildstream/_artifactcache/cascache.py
- buildstream/_project.py
- buildstream/_scheduler/jobs/job.py
- buildstream/_scheduler/scheduler.py
- buildstream/buildelement.py
- buildstream/data/projectconfig.yaml
- buildstream/element.py
- buildstream/plugin.py
- buildstream/plugins/elements/autotools.py
- buildstream/plugins/sources/git.py
- buildstream/sandbox/__init__.py
- + buildstream/sandbox/_sandboxremote.py
- buildstream/sandbox/sandbox.py
- buildstream/source.py
- buildstream/storage/_casbaseddirectory.py
- doc/source/format_project.rst
- tests/frontend/project/sources/fetch_source.py
- tests/sources/deb.py
- tests/sources/git.py
- tests/sources/tar.py
- tests/sources/zip.py
- tests/testutils/repo/bzr.py
Changes:
| ... | ... | @@ -19,6 +19,7 @@ | 
| 19 | 19 |  | 
| 20 | 20 |  import hashlib
 | 
| 21 | 21 |  import itertools
 | 
| 22 | +import io
 | |
| 22 | 23 |  import multiprocessing
 | 
| 23 | 24 |  import os
 | 
| 24 | 25 |  import signal
 | 
| ... | ... | @@ -66,6 +67,10 @@ class CASCache(ArtifactCache): | 
| 66 | 67 |          self._calculate_cache_quota()
 | 
| 67 | 68 |  | 
| 68 | 69 |          self._enable_push = enable_push
 | 
| 70 | +        if self._enable_push:
 | |
| 71 | +            self._uuid = str(uuid.uuid4())
 | |
| 72 | +        else:
 | |
| 73 | +            self._uuid = None
 | |
| 69 | 74 |  | 
| 70 | 75 |          # Per-project list of _CASRemote instances.
 | 
| 71 | 76 |          self._remotes = {}
 | 
| ... | ... | @@ -76,6 +81,7 @@ class CASCache(ArtifactCache): | 
| 76 | 81 |      ################################################
 | 
| 77 | 82 |      #     Implementation of abstract methods       #
 | 
| 78 | 83 |      ################################################
 | 
| 84 | + | |
| 79 | 85 |      def contains(self, element, key):
 | 
| 80 | 86 |          refpath = self._refpath(self.get_artifact_fullname(element, key))
 | 
| 81 | 87 |  | 
| ... | ... | @@ -259,6 +265,25 @@ class CASCache(ArtifactCache): | 
| 259 | 265 |  | 
| 260 | 266 |          return False
 | 
| 261 | 267 |  | 
| 268 | +    def pull_tree(self, project, digest):
 | |
| 269 | +        """ Pull a single Tree rather than an artifact.
 | |
| 270 | +        Does not update local refs. """
 | |
| 271 | + | |
| 272 | +        for remote in self._remotes[project]:
 | |
| 273 | +            try:
 | |
| 274 | +                remote.init()
 | |
| 275 | + | |
| 276 | +                digest = self._fetch_tree(remote, digest)
 | |
| 277 | + | |
| 278 | +                # no need to pull from additional remotes
 | |
| 279 | +                return digest
 | |
| 280 | + | |
| 281 | +            except grpc.RpcError as e:
 | |
| 282 | +                if e.code() != grpc.StatusCode.NOT_FOUND:
 | |
| 283 | +                    raise
 | |
| 284 | + | |
| 285 | +        return None
 | |
| 286 | + | |
| 262 | 287 |      def link_key(self, element, oldkey, newkey):
 | 
| 263 | 288 |          oldref = self.get_artifact_fullname(element, oldkey)
 | 
| 264 | 289 |          newref = self.get_artifact_fullname(element, newkey)
 | 
| ... | ... | @@ -267,106 +292,123 @@ class CASCache(ArtifactCache): | 
| 267 | 292 |  | 
| 268 | 293 |          self.set_ref(newref, tree)
 | 
| 269 | 294 |  | 
| 295 | +    def _push_refs_to_remote(self, refs, remote):
 | |
| 296 | +        skipped_remote = True
 | |
| 297 | +        try:
 | |
| 298 | +            for ref in refs:
 | |
| 299 | +                tree = self.resolve_ref(ref)
 | |
| 300 | + | |
| 301 | +                # Check whether ref is already on the server in which case
 | |
| 302 | +                # there is no need to push the artifact
 | |
| 303 | +                try:
 | |
| 304 | +                    request = buildstream_pb2.GetReferenceRequest()
 | |
| 305 | +                    request.key = ref
 | |
| 306 | +                    response = remote.ref_storage.GetReference(request)
 | |
| 307 | + | |
| 308 | +                    if response.digest.hash == tree.hash and response.digest.size_bytes == tree.size_bytes:
 | |
| 309 | +                        # ref is already on the server with the same tree
 | |
| 310 | +                        continue
 | |
| 311 | + | |
| 312 | +                except grpc.RpcError as e:
 | |
| 313 | +                    if e.code() != grpc.StatusCode.NOT_FOUND:
 | |
| 314 | +                        # Intentionally re-raise RpcError for outer except block.
 | |
| 315 | +                        raise
 | |
| 316 | + | |
| 317 | +                self._send_directory(remote, tree)
 | |
| 318 | + | |
| 319 | +                request = buildstream_pb2.UpdateReferenceRequest()
 | |
| 320 | +                request.keys.append(ref)
 | |
| 321 | +                request.digest.hash = tree.hash
 | |
| 322 | +                request.digest.size_bytes = tree.size_bytes
 | |
| 323 | +                remote.ref_storage.UpdateReference(request)
 | |
| 324 | + | |
| 325 | +        except grpc.RpcError as e:
 | |
| 326 | +            if e.code() != grpc.StatusCode.RESOURCE_EXHAUSTED:
 | |
| 327 | +                raise ArtifactError("Failed to push artifact {}: {}".format(refs, e), temporary=True) from e
 | |
| 328 | + | |
| 329 | +        return not skipped_remote
 | |
| 330 | + | |
| 270 | 331 |      def push(self, element, keys):
 | 
| 271 | -        refs = [self.get_artifact_fullname(element, key) for key in keys]
 | |
| 272 | 332 |  | 
| 273 | 333 |          project = element._get_project()
 | 
| 274 | 334 |  | 
| 335 | +        refs = [self.get_artifact_fullname(element, key) for key in list(keys)]
 | |
| 275 | 336 |          push_remotes = [r for r in self._remotes[project] if r.spec.push]
 | 
| 276 | 337 |  | 
| 277 | 338 |          pushed = False
 | 
| 278 | - | |
| 279 | 339 |          for remote in push_remotes:
 | 
| 280 | 340 |              remote.init()
 | 
| 281 | -            skipped_remote = True
 | |
| 282 | -            element.info("Pushing {} -> {}".format(element._get_brief_display_key(), remote.spec.url))
 | |
| 283 | - | |
| 284 | -            try:
 | |
| 285 | -                for ref in refs:
 | |
| 286 | -                    tree = self.resolve_ref(ref)
 | |
| 287 | - | |
| 288 | -                    # Check whether ref is already on the server in which case
 | |
| 289 | -                    # there is no need to push the artifact
 | |
| 290 | -                    try:
 | |
| 291 | -                        request = buildstream_pb2.GetReferenceRequest()
 | |
| 292 | -                        request.key = ref
 | |
| 293 | -                        response = remote.ref_storage.GetReference(request)
 | |
| 294 | - | |
| 295 | -                        if response.digest.hash == tree.hash and response.digest.size_bytes == tree.size_bytes:
 | |
| 296 | -                            # ref is already on the server with the same tree
 | |
| 297 | -                            continue
 | |
| 298 | - | |
| 299 | -                    except grpc.RpcError as e:
 | |
| 300 | -                        if e.code() != grpc.StatusCode.NOT_FOUND:
 | |
| 301 | -                            # Intentionally re-raise RpcError for outer except block.
 | |
| 302 | -                            raise
 | |
| 303 | - | |
| 304 | -                    missing_blobs = {}
 | |
| 305 | -                    required_blobs = self._required_blobs(tree)
 | |
| 306 | - | |
| 307 | -                    # Limit size of FindMissingBlobs request
 | |
| 308 | -                    for required_blobs_group in _grouper(required_blobs, 512):
 | |
| 309 | -                        request = remote_execution_pb2.FindMissingBlobsRequest()
 | |
| 310 | - | |
| 311 | -                        for required_digest in required_blobs_group:
 | |
| 312 | -                            d = request.blob_digests.add()
 | |
| 313 | -                            d.hash = required_digest.hash
 | |
| 314 | -                            d.size_bytes = required_digest.size_bytes
 | |
| 315 | - | |
| 316 | -                        response = remote.cas.FindMissingBlobs(request)
 | |
| 317 | -                        for digest in response.missing_blob_digests:
 | |
| 318 | -                            d = remote_execution_pb2.Digest()
 | |
| 319 | -                            d.hash = digest.hash
 | |
| 320 | -                            d.size_bytes = digest.size_bytes
 | |
| 321 | -                            missing_blobs[d.hash] = d
 | |
| 322 | - | |
| 323 | -                    # Upload any blobs missing on the server
 | |
| 324 | -                    skipped_remote = False
 | |
| 325 | -                    for digest in missing_blobs.values():
 | |
| 326 | -                        uuid_ = uuid.uuid4()
 | |
| 327 | -                        resource_name = '/'.join(['uploads', str(uuid_), 'blobs',
 | |
| 328 | -                                                  digest.hash, str(digest.size_bytes)])
 | |
| 329 | - | |
| 330 | -                        def request_stream(resname):
 | |
| 331 | -                            with open(self.objpath(digest), 'rb') as f:
 | |
| 332 | -                                assert os.fstat(f.fileno()).st_size == digest.size_bytes
 | |
| 333 | -                                offset = 0
 | |
| 334 | -                                finished = False
 | |
| 335 | -                                remaining = digest.size_bytes
 | |
| 336 | -                                while not finished:
 | |
| 337 | -                                    chunk_size = min(remaining, 64 * 1024)
 | |
| 338 | -                                    remaining -= chunk_size
 | |
| 339 | - | |
| 340 | -                                    request = bytestream_pb2.WriteRequest()
 | |
| 341 | -                                    request.write_offset = offset
 | |
| 342 | -                                    # max. 64 kB chunks
 | |
| 343 | -                                    request.data = f.read(chunk_size)
 | |
| 344 | -                                    request.resource_name = resname
 | |
| 345 | -                                    request.finish_write = remaining <= 0
 | |
| 346 | -                                    yield request
 | |
| 347 | -                                    offset += chunk_size
 | |
| 348 | -                                    finished = request.finish_write
 | |
| 349 | -                        response = remote.bytestream.Write(request_stream(resource_name))
 | |
| 350 | - | |
| 351 | -                    request = buildstream_pb2.UpdateReferenceRequest()
 | |
| 352 | -                    request.keys.append(ref)
 | |
| 353 | -                    request.digest.hash = tree.hash
 | |
| 354 | -                    request.digest.size_bytes = tree.size_bytes
 | |
| 355 | -                    remote.ref_storage.UpdateReference(request)
 | |
| 356 | - | |
| 357 | -                    pushed = True
 | |
| 358 | - | |
| 359 | -            except grpc.RpcError as e:
 | |
| 360 | -                if e.code() != grpc.StatusCode.RESOURCE_EXHAUSTED:
 | |
| 361 | -                    raise ArtifactError("Failed to push artifact {}: {}".format(refs, e), temporary=True) from e
 | |
| 362 | 341 |  | 
| 363 | -            if skipped_remote:
 | |
| 342 | +            if element:
 | |
| 343 | +                element.info("Pushing {} -> {}".format(element._get_brief_display_key(), remote.spec.url))
 | |
| 344 | +            if self._push_refs_to_remote(refs, remote):
 | |
| 345 | +                pushed = True
 | |
| 346 | +            elif element:
 | |
| 364 | 347 |                  self.context.message(Message(
 | 
| 365 | 348 |                      None,
 | 
| 366 | 349 |                      MessageType.SKIPPED,
 | 
| 367 | 350 |                      "Remote ({}) already has {} cached".format(
 | 
| 368 | 351 |                          remote.spec.url, element._get_brief_display_key())
 | 
| 369 | 352 |                  ))
 | 
| 353 | + | |
| 354 | +        return pushed
 | |
| 355 | + | |
| 356 | +    def push_directory(self, project, directory):
 | |
| 357 | + | |
| 358 | +        push_remotes = [r for r in self._remotes[project] if r.spec.push]
 | |
| 359 | + | |
| 360 | +        if directory.ref is None:
 | |
| 361 | +            return None
 | |
| 362 | + | |
| 363 | +        for remote in push_remotes:
 | |
| 364 | +            remote.init()
 | |
| 365 | + | |
| 366 | +            self._send_directory(remote, directory.ref)
 | |
| 367 | + | |
| 368 | +        return directory.ref
 | |
| 369 | + | |
| 370 | +    def push_message(self, project, message):
 | |
| 371 | + | |
| 372 | +        push_remotes = [r for r in self._remotes[project] if r.spec.push]
 | |
| 373 | + | |
| 374 | +        message_buffer = message.SerializeToString()
 | |
| 375 | +        message_sha = hashlib.sha256(message_buffer)
 | |
| 376 | +        message_digest = remote_execution_pb2.Digest()
 | |
| 377 | +        message_digest.hash = message_sha.hexdigest()
 | |
| 378 | +        message_digest.size_bytes = len(message_buffer)
 | |
| 379 | + | |
| 380 | +        for remote in push_remotes:
 | |
| 381 | +            remote.init()
 | |
| 382 | + | |
| 383 | +            with io.BytesIO(message_buffer) as b:
 | |
| 384 | +                self._send_blob(remote, message_digest, b)
 | |
| 385 | + | |
| 386 | +        return message_digest
 | |
| 387 | + | |
| 388 | +    def _verify_digest_on_remote(self, remote, digest):
 | |
| 389 | +        # Check whether ref is already on the server in which case
 | |
| 390 | +        # there is no need to push the artifact
 | |
| 391 | +        request = remote_execution_pb2.FindMissingBlobsRequest()
 | |
| 392 | +        request.blob_digests.extend([digest])
 | |
| 393 | + | |
| 394 | +        response = remote.cas.FindMissingBlobs(request)
 | |
| 395 | +        if digest in response.missing_blob_digests:
 | |
| 396 | +            return False
 | |
| 397 | + | |
| 398 | +        return True
 | |
| 399 | + | |
| 400 | +    def verify_key_pushed(self, project, digest):
 | |
| 401 | + | |
| 402 | +        push_remotes = [r for r in self._remotes[project] if r.spec.push]
 | |
| 403 | + | |
| 404 | +        pushed = False
 | |
| 405 | + | |
| 406 | +        for remote in push_remotes:
 | |
| 407 | +            remote.init()
 | |
| 408 | + | |
| 409 | +            if self._verify_digest_on_remote(remote, digest):
 | |
| 410 | +                pushed = True
 | |
| 411 | + | |
| 370 | 412 |          return pushed
 | 
| 371 | 413 |  | 
| 372 | 414 |      ################################################
 | 
| ... | ... | @@ -599,6 +641,7 @@ class CASCache(ArtifactCache): | 
| 599 | 641 |      ################################################
 | 
| 600 | 642 |      #             Local Private Methods            #
 | 
| 601 | 643 |      ################################################
 | 
| 644 | + | |
| 602 | 645 |      def _checkout(self, dest, tree):
 | 
| 603 | 646 |          os.makedirs(dest, exist_ok=True)
 | 
| 604 | 647 |  | 
| ... | ... | @@ -782,16 +825,16 @@ class CASCache(ArtifactCache): | 
| 782 | 825 |          for dirnode in directory.directories:
 | 
| 783 | 826 |              yield from self._required_blobs(dirnode.digest)
 | 
| 784 | 827 |  | 
| 785 | -    def _fetch_blob(self, remote, digest, out):
 | |
| 828 | +    def _fetch_blob(self, remote, digest, stream):
 | |
| 786 | 829 |          resource_name = '/'.join(['blobs', digest.hash, str(digest.size_bytes)])
 | 
| 787 | 830 |          request = bytestream_pb2.ReadRequest()
 | 
| 788 | 831 |          request.resource_name = resource_name
 | 
| 789 | 832 |          request.read_offset = 0
 | 
| 790 | 833 |          for response in remote.bytestream.Read(request):
 | 
| 791 | -            out.write(response.data)
 | |
| 834 | +            stream.write(response.data)
 | |
| 835 | +        stream.flush()
 | |
| 792 | 836 |  | 
| 793 | -        out.flush()
 | |
| 794 | -        assert digest.size_bytes == os.fstat(out.fileno()).st_size
 | |
| 837 | +        assert digest.size_bytes == os.fstat(stream.fileno()).st_size
 | |
| 795 | 838 |  | 
| 796 | 839 |      def _fetch_directory(self, remote, tree):
 | 
| 797 | 840 |          objpath = self.objpath(tree)
 | 
| ... | ... | @@ -827,6 +870,92 @@ class CASCache(ArtifactCache): | 
| 827 | 870 |              digest = self.add_object(path=out.name)
 | 
| 828 | 871 |              assert digest.hash == tree.hash
 | 
| 829 | 872 |  | 
| 873 | +    def _fetch_tree(self, remote, digest):
 | |
| 874 | +        # download but do not store the Tree object
 | |
| 875 | +        with tempfile.NamedTemporaryFile(dir=self.tmpdir) as out:
 | |
| 876 | +            self._fetch_blob(remote, digest, out)
 | |
| 877 | + | |
| 878 | +            tree = remote_execution_pb2.Tree()
 | |
| 879 | + | |
| 880 | +            with open(out.name, 'rb') as f:
 | |
| 881 | +                tree.ParseFromString(f.read())
 | |
| 882 | + | |
| 883 | +            tree.children.extend([tree.root])
 | |
| 884 | +            for directory in tree.children:
 | |
| 885 | +                for filenode in directory.files:
 | |
| 886 | +                    fileobjpath = self.objpath(filenode.digest)
 | |
| 887 | +                    if os.path.exists(fileobjpath):
 | |
| 888 | +                        # already in local cache
 | |
| 889 | +                        continue
 | |
| 890 | + | |
| 891 | +                    with tempfile.NamedTemporaryFile(dir=self.tmpdir) as f:
 | |
| 892 | +                        self._fetch_blob(remote, filenode.digest, f)
 | |
| 893 | + | |
| 894 | +                        added_digest = self.add_object(path=f.name)
 | |
| 895 | +                        assert added_digest.hash == filenode.digest.hash
 | |
| 896 | + | |
| 897 | +                # place directory blob only in final location when we've downloaded
 | |
| 898 | +                # all referenced blobs to avoid dangling references in the repository
 | |
| 899 | +                dirbuffer = directory.SerializeToString()
 | |
| 900 | +                dirdigest = self.add_object(buffer=dirbuffer)
 | |
| 901 | +                assert dirdigest.size_bytes == len(dirbuffer)
 | |
| 902 | + | |
| 903 | +        return dirdigest
 | |
| 904 | + | |
| 905 | +    def _send_blob(self, remote, digest, stream):
 | |
| 906 | +        resource_name = '/'.join(['uploads', self._uuid, 'blobs',
 | |
| 907 | +                              digest.hash, str(digest.size_bytes)])
 | |
| 908 | + | |
| 909 | +        def request_stream(resname, instream):
 | |
| 910 | +            offset = 0
 | |
| 911 | +            finished = False
 | |
| 912 | +            remaining = digest.size_bytes
 | |
| 913 | +            while not finished:
 | |
| 914 | +                chunk_size = min(remaining, 64 * 1024)
 | |
| 915 | +                remaining -= chunk_size
 | |
| 916 | + | |
| 917 | +                request = bytestream_pb2.WriteRequest()
 | |
| 918 | +                request.write_offset = offset
 | |
| 919 | +                # max. 64 kB chunks
 | |
| 920 | +                request.data = instream.read(chunk_size)
 | |
| 921 | +                request.resource_name = resname
 | |
| 922 | +                request.finish_write = remaining <= 0
 | |
| 923 | + | |
| 924 | +                yield request
 | |
| 925 | + | |
| 926 | +                offset += chunk_size
 | |
| 927 | +                finished = request.finish_write
 | |
| 928 | + | |
| 929 | +        response = remote.bytestream.Write(request_stream(resource_name, stream))
 | |
| 930 | + | |
| 931 | +        assert response.committed_size == digest.size_bytes
 | |
| 932 | + | |
| 933 | +    def _send_directory(self, remote, digest):
 | |
| 934 | +        required_blobs = self._required_blobs(digest)
 | |
| 935 | + | |
| 936 | +        missing_blobs = dict()
 | |
| 937 | +        # Limit size of FindMissingBlobs request
 | |
| 938 | +        for required_blobs_group in _grouper(required_blobs, 512):
 | |
| 939 | +            request = remote_execution_pb2.FindMissingBlobsRequest()
 | |
| 940 | + | |
| 941 | +            for required_digest in required_blobs_group:
 | |
| 942 | +                d = request.blob_digests.add()
 | |
| 943 | +                d.hash = required_digest.hash
 | |
| 944 | +                d.size_bytes = required_digest.size_bytes
 | |
| 945 | + | |
| 946 | +            response = remote.cas.FindMissingBlobs(request)
 | |
| 947 | +            for digest in response.missing_blob_digests:
 | |
| 948 | +                d = remote_execution_pb2.Digest()
 | |
| 949 | +                d.hash = digest.hash
 | |
| 950 | +                d.size_bytes = digest.size_bytes
 | |
| 951 | +                missing_blobs[d.hash] = d
 | |
| 952 | + | |
| 953 | +        # Upload any blobs missing on the server
 | |
| 954 | +        for blob_digest in missing_blobs.values():
 | |
| 955 | +            with open(self.objpath(blob_digest), 'rb') as f:
 | |
| 956 | +                assert os.fstat(f.fileno()).st_size == blob_digest.size_bytes
 | |
| 957 | +                self._send_blob(remote, blob_digest, f)
 | |
| 958 | + | |
| 830 | 959 |  | 
| 831 | 960 |  # Represents a single remote CAS cache.
 | 
| 832 | 961 |  #
 | 
| ... | ... | @@ -129,6 +129,7 @@ class Project(): | 
| 129 | 129 |  | 
| 130 | 130 |          self.artifact_cache_specs = None
 | 
| 131 | 131 |          self._sandbox = None
 | 
| 132 | +        self._remote_execution = None
 | |
| 132 | 133 |          self._splits = None
 | 
| 133 | 134 |  | 
| 134 | 135 |          self._context.add_project(self)
 | 
| ... | ... | @@ -398,6 +399,17 @@ class Project(): | 
| 398 | 399 |                  "Project requested format version {}, but BuildStream {}.{} only supports up until format version {}"
 | 
| 399 | 400 |                  .format(format_version, major, minor, BST_FORMAT_VERSION))
 | 
| 400 | 401 |  | 
| 402 | +        # FIXME:
 | |
| 403 | +        #
 | |
| 404 | +        #   Performing this check manually in the absence
 | |
| 405 | +        #   of proper support from _yaml.node_get(), this should
 | |
| 406 | +        #   be removed in favor of a proper accessor function
 | |
| 407 | +        #   from the _yaml module when #591 is fixed.
 | |
| 408 | +        #
 | |
| 409 | +        if self._project_conf.get('name') is None:
 | |
| 410 | +            raise LoadError(LoadErrorReason.INVALID_DATA,
 | |
| 411 | +                            "{}: project.conf does not contain expected key '{}'".format(projectfile, 'name'))
 | |
| 412 | + | |
| 401 | 413 |          # The project name, element path and option declarations
 | 
| 402 | 414 |          # are constant and cannot be overridden by option conditional statements
 | 
| 403 | 415 |          self.name = _yaml.node_get(pre_config_node, str, 'name')
 | 
| ... | ... | @@ -460,7 +472,7 @@ class Project(): | 
| 460 | 472 |              'aliases', 'name',
 | 
| 461 | 473 |              'artifacts', 'options',
 | 
| 462 | 474 |              'fail-on-overlap', 'shell', 'fatal-warnings',
 | 
| 463 | -            'ref-storage', 'sandbox', 'mirrors'
 | |
| 475 | +            'ref-storage', 'sandbox', 'mirrors', 'remote-execution'
 | |
| 464 | 476 |          ])
 | 
| 465 | 477 |  | 
| 466 | 478 |          #
 | 
| ... | ... | @@ -478,6 +490,9 @@ class Project(): | 
| 478 | 490 |          # Load sandbox configuration
 | 
| 479 | 491 |          self._sandbox = _yaml.node_get(config, Mapping, 'sandbox')
 | 
| 480 | 492 |  | 
| 493 | +        # Load remote execution configuration
 | |
| 494 | +        self._remote_execution = _yaml.node_get(config, Mapping, 'remote-execution')
 | |
| 495 | + | |
| 481 | 496 |          # Load project split rules
 | 
| 482 | 497 |          self._splits = _yaml.node_get(config, Mapping, 'split-rules')
 | 
| 483 | 498 |  | 
| ... | ... | @@ -403,7 +403,7 @@ class Job(): | 
| 403 | 403 |                  if self._retry_flag and (self._tries <= self._max_retries):
 | 
| 404 | 404 |                      self.message(MessageType.FAIL,
 | 
| 405 | 405 |                                   "Try #{} failed, retrying".format(self._tries),
 | 
| 406 | -                                 elapsed=elapsed)
 | |
| 406 | +                                 elapsed=elapsed, logfile=filename)
 | |
| 407 | 407 |                  else:
 | 
| 408 | 408 |                      self.message(MessageType.FAIL, str(e),
 | 
| 409 | 409 |                                   elapsed=elapsed, detail=e.detail,
 | 
| ... | ... | @@ -430,7 +430,8 @@ class Job(): | 
| 430 | 430 |                  self.message(MessageType.BUG, self.action_name,
 | 
| 431 | 431 |                               elapsed=elapsed, detail=detail,
 | 
| 432 | 432 |                               logfile=filename)
 | 
| 433 | -                self._child_shutdown(RC_FAIL)
 | |
| 433 | +                # Unhandled exceptions should permanently fail
 | |
| 434 | +                self._child_shutdown(RC_PERM_FAIL)
 | |
| 434 | 435 |  | 
| 435 | 436 |              else:
 | 
| 436 | 437 |                  # No exception occurred in the action
 | 
| ... | ... | @@ -509,11 +510,6 @@ class Job(): | 
| 509 | 510 |          message.action_name = self.action_name
 | 
| 510 | 511 |          message.task_id = self._task_id
 | 
| 511 | 512 |  | 
| 512 | -        if (message.message_type == MessageType.FAIL and
 | |
| 513 | -                self._tries <= self._max_retries and self._retry_flag):
 | |
| 514 | -            # Job will be retried, display failures as warnings in the frontend
 | |
| 515 | -            message.message_type = MessageType.WARN
 | |
| 516 | - | |
| 517 | 513 |          # Send to frontend if appropriate
 | 
| 518 | 514 |          if context.silent_messages() and (message.message_type not in unconditional_messages):
 | 
| 519 | 515 |              return
 | 
| ... | ... | @@ -329,7 +329,7 @@ class Scheduler(): | 
| 329 | 329 |          self.schedule_jobs([job])
 | 
| 330 | 330 |  | 
| 331 | 331 |      def _check_cache_size_real(self):
 | 
| 332 | -        job = CacheSizeJob(self, 'cache_size', 'cache_size',
 | |
| 332 | +        job = CacheSizeJob(self, 'cache_size', 'cache_size/cache_size',
 | |
| 333 | 333 |                             resources=[ResourceType.CACHE,
 | 
| 334 | 334 |                                        ResourceType.PROCESS],
 | 
| 335 | 335 |                             exclusive_resources=[ResourceType.CACHE],
 | 
| ... | ... | @@ -155,6 +155,9 @@ class BuildElement(Element): | 
| 155 | 155 |              command_dir = build_root
 | 
| 156 | 156 |          sandbox.set_work_directory(command_dir)
 | 
| 157 | 157 |  | 
| 158 | +        # Tell sandbox which directory is preserved in the finished artifact
 | |
| 159 | +        sandbox.set_output_directory(install_root)
 | |
| 160 | + | |
| 158 | 161 |          # Setup environment
 | 
| 159 | 162 |          sandbox.set_environment(self.get_environment())
 | 
| 160 | 163 |  | 
| ... | ... | @@ -204,3 +204,6 @@ shell: | 
| 204 | 204 |    # Command to run when `bst shell` does not provide a command
 | 
| 205 | 205 |    #
 | 
| 206 | 206 |    command: [ 'sh', '-i' ]
 | 
| 207 | + | |
| 208 | +remote-execution:
 | |
| 209 | +  url: "" | |
| \ No newline at end of file | 
| ... | ... | @@ -95,6 +95,7 @@ from . import _site | 
| 95 | 95 |  from ._platform import Platform
 | 
| 96 | 96 |  from .plugin import CoreWarnings
 | 
| 97 | 97 |  from .sandbox._config import SandboxConfig
 | 
| 98 | +from .sandbox._sandboxremote import SandboxRemote
 | |
| 98 | 99 |  | 
| 99 | 100 |  from .storage.directory import Directory
 | 
| 100 | 101 |  from .storage._filebaseddirectory import FileBasedDirectory
 | 
| ... | ... | @@ -245,11 +246,14 @@ class Element(Plugin): | 
| 245 | 246 |          # Collect the composited element configuration and
 | 
| 246 | 247 |          # ask the element to configure itself.
 | 
| 247 | 248 |          self.__config = self.__extract_config(meta)
 | 
| 248 | -        self.configure(self.__config)
 | |
| 249 | +        self._configure(self.__config)
 | |
| 249 | 250 |  | 
| 250 | 251 |          # Extract Sandbox config
 | 
| 251 | 252 |          self.__sandbox_config = self.__extract_sandbox_config(meta)
 | 
| 252 | 253 |  | 
| 254 | +        # Extract remote execution URL
 | |
| 255 | +        self.__remote_execution_url = self.__extract_remote_execution_config(meta)
 | |
| 256 | + | |
| 253 | 257 |      def __lt__(self, other):
 | 
| 254 | 258 |          return self.name < other.name
 | 
| 255 | 259 |  | 
| ... | ... | @@ -1570,6 +1574,8 @@ class Element(Plugin): | 
| 1570 | 1574 |                  finally:
 | 
| 1571 | 1575 |                      if collect is not None:
 | 
| 1572 | 1576 |                          try:
 | 
| 1577 | +                            # Sandbox will probably have replaced its virtual directory, so get it again
 | |
| 1578 | +                            sandbox_vroot = sandbox.get_virtual_directory()
 | |
| 1573 | 1579 |                              collectvdir = sandbox_vroot.descend(collect.lstrip(os.sep).split(os.sep))
 | 
| 1574 | 1580 |                          except VirtualDirectoryError:
 | 
| 1575 | 1581 |                              # No collect directory existed
 | 
| ... | ... | @@ -2146,7 +2152,32 @@ class Element(Plugin): | 
| 2146 | 2152 |          project = self._get_project()
 | 
| 2147 | 2153 |          platform = Platform.get_platform()
 | 
| 2148 | 2154 |  | 
| 2149 | -        if directory is not None and os.path.exists(directory):
 | |
| 2155 | +        if self.__remote_execution_url and self.BST_VIRTUAL_DIRECTORY:
 | |
| 2156 | +            if not self.__artifacts.has_push_remotes(element=self):
 | |
| 2157 | +                # Give an early warning if remote execution will not work
 | |
| 2158 | +                raise ElementError("Artifact {} is configured to use remote execution but has no push remotes. "
 | |
| 2159 | +                                   .format(self.name) +
 | |
| 2160 | +                                   "The remote artifact server(s) may not be correctly configured or contactable.")
 | |
| 2161 | + | |
| 2162 | +            self.info("Using a remote sandbox for artifact {}".format(self.name))
 | |
| 2163 | + | |
| 2164 | +            sandbox = SandboxRemote(context, project,
 | |
| 2165 | +                                    directory,
 | |
| 2166 | +                                    stdout=stdout,
 | |
| 2167 | +                                    stderr=stderr,
 | |
| 2168 | +                                    config=config,
 | |
| 2169 | +                                    server_url=self.__remote_execution_url,
 | |
| 2170 | +                                    allow_real_directory=False)
 | |
| 2171 | +            yield sandbox
 | |
| 2172 | + | |
| 2173 | +        elif directory is not None and os.path.exists(directory):
 | |
| 2174 | +            if self.__remote_execution_url:
 | |
| 2175 | +                self.warn("Artifact {} is configured to use remote execution but element plugin does not support it."
 | |
| 2176 | +                          .format(self.name), detail="Element plugin '{kind}' does not support virtual directories."
 | |
| 2177 | +                          .format(kind=self.get_kind()), warning_token="remote-failure")
 | |
| 2178 | + | |
| 2179 | +                self.info("Falling back to local sandbox for artifact {}".format(self.name))
 | |
| 2180 | + | |
| 2150 | 2181 |              sandbox = platform.create_sandbox(context, project,
 | 
| 2151 | 2182 |                                                directory,
 | 
| 2152 | 2183 |                                                stdout=stdout,
 | 
| ... | ... | @@ -2318,6 +2349,20 @@ class Element(Plugin): | 
| 2318 | 2349 |          return SandboxConfig(self.node_get_member(sandbox_config, int, 'build-uid'),
 | 
| 2319 | 2350 |                               self.node_get_member(sandbox_config, int, 'build-gid'))
 | 
| 2320 | 2351 |  | 
| 2352 | +    # Remote execution configuration data (server URL), to be used by the remote sandbox.
 | |
| 2353 | +    #
 | |
| 2354 | +    def __extract_remote_execution_config(self, meta):
 | |
| 2355 | +        if self.__is_junction:
 | |
| 2356 | +            return None
 | |
| 2357 | +        else:
 | |
| 2358 | +            project = self._get_project()
 | |
| 2359 | +            project.ensure_fully_loaded()
 | |
| 2360 | +            if project._remote_execution:
 | |
| 2361 | +                rexec_config = _yaml.node_chain_copy(project._remote_execution)
 | |
| 2362 | +                return self.node_get_member(rexec_config, str, 'url')
 | |
| 2363 | +            else:
 | |
| 2364 | +                return None
 | |
| 2365 | + | |
| 2321 | 2366 |      # This makes a special exception for the split rules, which
 | 
| 2322 | 2367 |      # elements may extend but whose defaults are defined in the project.
 | 
| 2323 | 2368 |      #
 | 
| ... | ... | @@ -179,6 +179,7 @@ class Plugin(): | 
| 179 | 179 |          self.__provenance = provenance  # The Provenance information
 | 
| 180 | 180 |          self.__type_tag = type_tag      # The type of plugin (element or source)
 | 
| 181 | 181 |          self.__unique_id = _plugin_register(self)  # Unique ID
 | 
| 182 | +        self.__configuring = False      # Whether we are currently configuring
 | |
| 182 | 183 |  | 
| 183 | 184 |          # Infer the kind identifier
 | 
| 184 | 185 |          modulename = type(self).__module__
 | 
| ... | ... | @@ -682,7 +683,32 @@ class Plugin(): | 
| 682 | 683 |          else:
 | 
| 683 | 684 |              yield log
 | 
| 684 | 685 |  | 
| 686 | +    # _configure():
 | |
| 687 | +    #
 | |
| 688 | +    # Calls configure() for the plugin. This must be called by
 | |
| 689 | +    # the core instead of configure() directly, so that the
 | |
| 690 | +    # _get_configuring() state is up to date.
 | |
| 691 | +    #
 | |
| 692 | +    # Args:
 | |
| 693 | +    #    node (dict): The loaded configuration dictionary
 | |
| 694 | +    #
 | |
| 695 | +    def _configure(self, node):
 | |
| 696 | +        self.__configuring = True
 | |
| 697 | +        self.configure(node)
 | |
| 698 | +        self.__configuring = False
 | |
| 699 | + | |
| 700 | +    # _get_configuring():
 | |
| 701 | +    #
 | |
| 702 | +    # Checks whether the plugin is in the middle of having
 | |
| 703 | +    # its Plugin.configure() method called
 | |
| 704 | +    #
 | |
| 705 | +    # Returns:
 | |
| 706 | +    #    (bool): Whether we are currently configuring
 | |
| 707 | +    def _get_configuring(self):
 | |
| 708 | +        return self.__configuring
 | |
| 709 | + | |
| 685 | 710 |      # _preflight():
 | 
| 711 | +    #
 | |
| 686 | 712 |      # Calls preflight() for the plugin, and allows generic preflight
 | 
| 687 | 713 |      # checks to be added
 | 
| 688 | 714 |      #
 | 
| ... | ... | @@ -690,6 +716,7 @@ class Plugin(): | 
| 690 | 716 |      #    SourceError: If it's a Source implementation
 | 
| 691 | 717 |      #    ElementError: If it's an Element implementation
 | 
| 692 | 718 |      #    ProgramNotFoundError: If a required host tool is not found
 | 
| 719 | +    #
 | |
| 693 | 720 |      def _preflight(self):
 | 
| 694 | 721 |          self.preflight()
 | 
| 695 | 722 |  | 
| ... | ... | @@ -57,7 +57,7 @@ from buildstream import BuildElement | 
| 57 | 57 |  | 
| 58 | 58 |  # Element implementation for the 'autotools' kind.
 | 
| 59 | 59 |  class AutotoolsElement(BuildElement):
 | 
| 60 | -    pass
 | |
| 60 | +    BST_VIRTUAL_DIRECTORY = True
 | |
| 61 | 61 |  | 
| 62 | 62 |  | 
| 63 | 63 |  # Plugin entry point
 | 
| ... | ... | @@ -74,6 +74,9 @@ This plugin provides the following configurable warnings: | 
| 74 | 74 |  | 
| 75 | 75 |  - 'git:inconsistent-submodule' - A submodule was found to be missing from the underlying git repository.
 | 
| 76 | 76 |  | 
| 77 | +This plugin also utilises the following configurable core plugin warnings:
 | |
| 78 | + | |
| 79 | +- 'ref-not-in-track' - The provided ref was not found in the provided track in the element's git repository.
 | |
| 77 | 80 |  """
 | 
| 78 | 81 |  | 
| 79 | 82 |  import os
 | 
| ... | ... | @@ -87,6 +90,7 @@ from configparser import RawConfigParser | 
| 87 | 90 |  | 
| 88 | 91 |  from buildstream import Source, SourceError, Consistency, SourceFetcher
 | 
| 89 | 92 |  from buildstream import utils
 | 
| 93 | +from buildstream.plugin import CoreWarnings
 | |
| 90 | 94 |  | 
| 91 | 95 |  GIT_MODULES = '.gitmodules'
 | 
| 92 | 96 |  | 
| ... | ... | @@ -100,13 +104,14 @@ INCONSISTENT_SUBMODULE = "inconsistent-submodules" | 
| 100 | 104 |  #
 | 
| 101 | 105 |  class GitMirror(SourceFetcher):
 | 
| 102 | 106 |  | 
| 103 | -    def __init__(self, source, path, url, ref):
 | |
| 107 | +    def __init__(self, source, path, url, ref, *, primary=False):
 | |
| 104 | 108 |  | 
| 105 | 109 |          super().__init__()
 | 
| 106 | 110 |          self.source = source
 | 
| 107 | 111 |          self.path = path
 | 
| 108 | 112 |          self.url = url
 | 
| 109 | 113 |          self.ref = ref
 | 
| 114 | +        self.primary = primary
 | |
| 110 | 115 |          self.mirror = os.path.join(source.get_mirror_directory(), utils.url_directory_name(url))
 | 
| 111 | 116 |          self.mark_download_url(url)
 | 
| 112 | 117 |  | 
| ... | ... | @@ -124,7 +129,8 @@ class GitMirror(SourceFetcher): | 
| 124 | 129 |              # system configured tmpdir is not on the same partition.
 | 
| 125 | 130 |              #
 | 
| 126 | 131 |              with self.source.tempdir() as tmpdir:
 | 
| 127 | -                url = self.source.translate_url(self.url, alias_override=alias_override)
 | |
| 132 | +                url = self.source.translate_url(self.url, alias_override=alias_override,
 | |
| 133 | +                                                primary=self.primary)
 | |
| 128 | 134 |                  self.source.call([self.source.host_git, 'clone', '--mirror', '-n', url, tmpdir],
 | 
| 129 | 135 |                                   fail="Failed to clone git repository {}".format(url),
 | 
| 130 | 136 |                                   fail_temporarily=True)
 | 
| ... | ... | @@ -146,7 +152,9 @@ class GitMirror(SourceFetcher): | 
| 146 | 152 |                                            .format(self.source, url, tmpdir, self.mirror, e)) from e
 | 
| 147 | 153 |  | 
| 148 | 154 |      def _fetch(self, alias_override=None):
 | 
| 149 | -        url = self.source.translate_url(self.url, alias_override=alias_override)
 | |
| 155 | +        url = self.source.translate_url(self.url,
 | |
| 156 | +                                        alias_override=alias_override,
 | |
| 157 | +                                        primary=self.primary)
 | |
| 150 | 158 |  | 
| 151 | 159 |          if alias_override:
 | 
| 152 | 160 |              remote_name = utils.url_directory_name(alias_override)
 | 
| ... | ... | @@ -199,7 +207,7 @@ class GitMirror(SourceFetcher): | 
| 199 | 207 |              cwd=self.mirror)
 | 
| 200 | 208 |          return output.rstrip('\n')
 | 
| 201 | 209 |  | 
| 202 | -    def stage(self, directory):
 | |
| 210 | +    def stage(self, directory, track=None):
 | |
| 203 | 211 |          fullpath = os.path.join(directory, self.path)
 | 
| 204 | 212 |  | 
| 205 | 213 |          # Using --shared here avoids copying the objects into the checkout, in any
 | 
| ... | ... | @@ -213,10 +221,14 @@ class GitMirror(SourceFetcher): | 
| 213 | 221 |                           fail="Failed to checkout git ref {}".format(self.ref),
 | 
| 214 | 222 |                           cwd=fullpath)
 | 
| 215 | 223 |  | 
| 224 | +        # Check that the user specified ref exists in the track if provided & not already tracked
 | |
| 225 | +        if track:
 | |
| 226 | +            self.assert_ref_in_track(fullpath, track)
 | |
| 227 | + | |
| 216 | 228 |          # Remove .git dir
 | 
| 217 | 229 |          shutil.rmtree(os.path.join(fullpath, ".git"))
 | 
| 218 | 230 |  | 
| 219 | -    def init_workspace(self, directory):
 | |
| 231 | +    def init_workspace(self, directory, track=None):
 | |
| 220 | 232 |          fullpath = os.path.join(directory, self.path)
 | 
| 221 | 233 |          url = self.source.translate_url(self.url)
 | 
| 222 | 234 |  | 
| ... | ... | @@ -232,6 +244,10 @@ class GitMirror(SourceFetcher): | 
| 232 | 244 |                           fail="Failed to checkout git ref {}".format(self.ref),
 | 
| 233 | 245 |                           cwd=fullpath)
 | 
| 234 | 246 |  | 
| 247 | +        # Check that the user specified ref exists in the track if provided & not already tracked
 | |
| 248 | +        if track:
 | |
| 249 | +            self.assert_ref_in_track(fullpath, track)
 | |
| 250 | + | |
| 235 | 251 |      # List the submodules (path/url tuples) present at the given ref of this repo
 | 
| 236 | 252 |      def submodule_list(self):
 | 
| 237 | 253 |          modules = "{}:{}".format(self.ref, GIT_MODULES)
 | 
| ... | ... | @@ -296,6 +312,28 @@ class GitMirror(SourceFetcher): | 
| 296 | 312 |  | 
| 297 | 313 |              return None
 | 
| 298 | 314 |  | 
| 315 | +    # Assert that ref exists in track, if track has been specified.
 | |
| 316 | +    def assert_ref_in_track(self, fullpath, track):
 | |
| 317 | +        _, branch = self.source.check_output([self.source.host_git, 'branch', '--list', track,
 | |
| 318 | +                                              '--contains', self.ref],
 | |
| 319 | +                                             cwd=fullpath,)
 | |
| 320 | +        if branch:
 | |
| 321 | +            return True
 | |
| 322 | +        else:
 | |
| 323 | +            _, tag = self.source.check_output([self.source.host_git, 'tag', '--list', track,
 | |
| 324 | +                                               '--contains', self.ref],
 | |
| 325 | +                                              cwd=fullpath,)
 | |
| 326 | +            if tag:
 | |
| 327 | +                return True
 | |
| 328 | + | |
| 329 | +        detail = "The ref provided for the element does not exist locally in the provided track branch / tag " + \
 | |
| 330 | +                 "'{}'.\nYou may wish to track the element to update the ref from '{}' ".format(track, track) + \
 | |
| 331 | +                 "with `bst track`,\nor examine the upstream at '{}' for the specific ref.".format(self.url)
 | |
| 332 | + | |
| 333 | +        self.source.warn("{}: expected ref '{}' was not found in given track '{}' for staged repository: '{}'\n"
 | |
| 334 | +                         .format(self.source, self.ref, track, self.url),
 | |
| 335 | +                         detail=detail, warning_token=CoreWarnings.REF_NOT_IN_TRACK)
 | |
| 336 | + | |
| 299 | 337 |  | 
| 300 | 338 |  class GitSource(Source):
 | 
| 301 | 339 |      # pylint: disable=attribute-defined-outside-init
 | 
| ... | ... | @@ -307,7 +345,7 @@ class GitSource(Source): | 
| 307 | 345 |          self.node_validate(node, config_keys + Source.COMMON_CONFIG_KEYS)
 | 
| 308 | 346 |  | 
| 309 | 347 |          self.original_url = self.node_get_member(node, str, 'url')
 | 
| 310 | -        self.mirror = GitMirror(self, '', self.original_url, ref)
 | |
| 348 | +        self.mirror = GitMirror(self, '', self.original_url, ref, primary=True)
 | |
| 311 | 349 |          self.tracking = self.node_get_member(node, str, 'track', None)
 | 
| 312 | 350 |  | 
| 313 | 351 |          # At this point we now know if the source has a ref and/or a track.
 | 
| ... | ... | @@ -327,12 +365,18 @@ class GitSource(Source): | 
| 327 | 365 |          for path, _ in self.node_items(modules):
 | 
| 328 | 366 |              submodule = self.node_get_member(modules, Mapping, path)
 | 
| 329 | 367 |              url = self.node_get_member(submodule, str, 'url', None)
 | 
| 368 | + | |
| 369 | +            # Make sure to mark all URLs that are specified in the configuration
 | |
| 370 | +            if url:
 | |
| 371 | +                self.mark_download_url(url, primary=False)
 | |
| 372 | + | |
| 330 | 373 |              self.submodule_overrides[path] = url
 | 
| 331 | 374 |              if 'checkout' in submodule:
 | 
| 332 | 375 |                  checkout = self.node_get_member(submodule, bool, 'checkout')
 | 
| 333 | 376 |                  self.submodule_checkout_overrides[path] = checkout
 | 
| 334 | 377 |  | 
| 335 | 378 |          self.mark_download_url(self.original_url)
 | 
| 379 | +        self.tracked = False
 | |
| 336 | 380 |  | 
| 337 | 381 |      def preflight(self):
 | 
| 338 | 382 |          # Check if git is installed, get the binary at the same time
 | 
| ... | ... | @@ -398,6 +442,8 @@ class GitSource(Source): | 
| 398 | 442 |              # Update self.mirror.ref and node.ref from the self.tracking branch
 | 
| 399 | 443 |              ret = self.mirror.latest_commit(self.tracking)
 | 
| 400 | 444 |  | 
| 445 | +        # Record that tracking has run, so that stage() and init_workspace() skip the assert_ref_in_track() check
 | |
| 446 | +        self.tracked = True
 | |
| 401 | 447 |          return ret
 | 
| 402 | 448 |  | 
| 403 | 449 |      def init_workspace(self, directory):
 | 
| ... | ... | @@ -405,7 +451,7 @@ class GitSource(Source): | 
| 405 | 451 |          self.refresh_submodules()
 | 
| 406 | 452 |  | 
| 407 | 453 |          with self.timed_activity('Setting up workspace "{}"'.format(directory), silent_nested=True):
 | 
| 408 | -            self.mirror.init_workspace(directory)
 | |
| 454 | +            self.mirror.init_workspace(directory, track=(self.tracking if not self.tracked else None))
 | |
| 409 | 455 |              for mirror in self.submodules:
 | 
| 410 | 456 |                  mirror.init_workspace(directory)
 | 
| 411 | 457 |  | 
| ... | ... | @@ -421,7 +467,7 @@ class GitSource(Source): | 
| 421 | 467 |          # Stage the main repo in the specified directory
 | 
| 422 | 468 |          #
 | 
| 423 | 469 |          with self.timed_activity("Staging {}".format(self.mirror.url), silent_nested=True):
 | 
| 424 | -            self.mirror.stage(directory)
 | |
| 470 | +            self.mirror.stage(directory, track=(self.tracking if not self.tracked else None))
 | |
| 425 | 471 |              for mirror in self.submodules:
 | 
| 426 | 472 |                  if mirror.path in self.submodule_checkout_overrides:
 | 
| 427 | 473 |                      checkout = self.submodule_checkout_overrides[mirror.path]
 | 
| ... | ... | @@ -20,3 +20,4 @@ | 
| 20 | 20 |  from .sandbox import Sandbox, SandboxFlags
 | 
| 21 | 21 |  from ._sandboxchroot import SandboxChroot
 | 
| 22 | 22 |  from ._sandboxbwrap import SandboxBwrap
 | 
| 23 | +from ._sandboxremote import SandboxRemote | 
| 1 | +#!/usr/bin/env python3
 | |
| 2 | +#
 | |
| 3 | +#  Copyright (C) 2018 Bloomberg LP
 | |
| 4 | +#
 | |
| 5 | +#  This program is free software; you can redistribute it and/or
 | |
| 6 | +#  modify it under the terms of the GNU Lesser General Public
 | |
| 7 | +#  License as published by the Free Software Foundation; either
 | |
| 8 | +#  version 2 of the License, or (at your option) any later version.
 | |
| 9 | +#
 | |
| 10 | +#  This library is distributed in the hope that it will be useful,
 | |
| 11 | +#  but WITHOUT ANY WARRANTY; without even the implied warranty of
 | |
| 12 | +#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.	 See the GNU
 | |
| 13 | +#  Lesser General Public License for more details.
 | |
| 14 | +#
 | |
| 15 | +#  You should have received a copy of the GNU Lesser General Public
 | |
| 16 | +#  License along with this library. If not, see <http://www.gnu.org/licenses/>.
 | |
| 17 | +#
 | |
| 18 | +#  Authors:
 | |
| 19 | +#        Jim MacArthur <jim macarthur codethink co uk>
 | |
| 20 | + | |
| 21 | +import os
 | |
| 22 | +import re
 | |
| 23 | +from urllib.parse import urlparse
 | |
| 24 | + | |
| 25 | +import grpc
 | |
| 26 | + | |
| 27 | +from . import Sandbox
 | |
| 28 | +from ..storage._filebaseddirectory import FileBasedDirectory
 | |
| 29 | +from ..storage._casbaseddirectory import CasBasedDirectory
 | |
| 30 | +from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
 | |
| 31 | +from .._artifactcache.cascache import CASCache
 | |
| 32 | + | |
| 33 | + | |
| 34 | +class SandboxError(Exception):
 | |
| 35 | +    pass
 | |
| 36 | + | |
| 37 | + | |
| 38 | +# SandboxRemote()
 | |
| 39 | +#
 | |
| 40 | +# This isn't really a sandbox, it's a stub which sends all the sources and build
 | |
| 41 | +# commands to a remote server and retrieves the results from it.
 | |
| 42 | +#
 | |
| 43 | +class SandboxRemote(Sandbox):
 | |
| 44 | + | |
| 45 | +    def __init__(self, *args, **kwargs):
 | |
| 46 | +        super().__init__(*args, **kwargs)
 | |
| 47 | +        self.cascache = None
 | |
| 48 | + | |
| 49 | +        url = urlparse(kwargs['server_url'])
 | |
| 50 | +        if not url.scheme or not url.hostname or not url.port:
 | |
| 51 | +            raise SandboxError("Configured remote URL '{}' does not match the expected layout. "
 | |
| 52 | +                               .format(self.server_url) +
 | |
| 53 | +                               "It should be of the form <protocol>://<domain name>:<port>.")
 | |
| 54 | +        elif url.scheme != 'http':
 | |
| 55 | +            raise SandboxError("Configured remote '{}' uses an unsupported protocol. "
 | |
| 56 | +                               "Only plain HTTP is currenlty supported (no HTTPS).")
 | |
| 57 | + | |
| 58 | +        self.server_url = '{}:{}'.format(url.hostname, url.port)
 | |
| 59 | + | |
| 60 | +    def _get_cascache(self):
 | |
| 61 | +        if self.cascache is None:
 | |
| 62 | +            self.cascache = CASCache(self._get_context())
 | |
| 63 | +            self.cascache.setup_remotes(use_config=True)
 | |
| 64 | +        return self.cascache
 | |
| 65 | + | |
| 66 | +    def run_remote_command(self, command, input_root_digest, working_directory, environment):
 | |
| 67 | +        # Sends an execution request to the remote execution server.
 | |
| 68 | +        #
 | |
| 69 | +        # This function blocks until it gets a response from the server.
 | |
| 70 | +        #
 | |
| 71 | +        environment_variables = [remote_execution_pb2.Command.
 | |
| 72 | +                                 EnvironmentVariable(name=k, value=v)
 | |
| 73 | +                                 for (k, v) in environment.items()]
 | |
| 74 | + | |
| 75 | +        # Create and send the Command object.
 | |
| 76 | +        remote_command = remote_execution_pb2.Command(arguments=command,
 | |
| 77 | +                                                      working_directory=working_directory,
 | |
| 78 | +                                                      environment_variables=environment_variables,
 | |
| 79 | +                                                      output_files=[],
 | |
| 80 | +                                                      output_directories=[self._output_directory],
 | |
| 81 | +                                                      platform=None)
 | |
| 82 | + | |
| 83 | +        cascache = self._get_cascache()
 | |
| 84 | +        # Upload the Command message to the remote CAS server
 | |
| 85 | +        command_digest = cascache.push_message(self._get_project(), remote_command)
 | |
| 86 | +        if not command_digest or not cascache.verify_key_pushed(self._get_project(), command_digest):
 | |
| 87 | +            # Command push failed
 | |
| 88 | +            return None
 | |
| 89 | + | |
| 90 | +        # Create and send the action.
 | |
| 91 | +        action = remote_execution_pb2.Action(command_digest=command_digest,
 | |
| 92 | +                                             input_root_digest=input_root_digest,
 | |
| 93 | +                                             timeout=None,
 | |
| 94 | +                                             do_not_cache=True)
 | |
| 95 | + | |
| 96 | +        # Upload the Action message to the remote CAS server
 | |
| 97 | +        action_digest = cascache.push_message(self._get_project(), action)
 | |
| 98 | +        if not action_digest or not cascache.verify_key_pushed(self._get_project(), action_digest):
 | |
| 99 | +            # Action push failed
 | |
| 100 | +            return None
 | |
| 101 | + | |
| 102 | +        # Next, try to create a communication channel to the BuildGrid server.
 | |
| 103 | +        channel = grpc.insecure_channel(self.server_url)
 | |
| 104 | +        stub = remote_execution_pb2_grpc.ExecutionStub(channel)
 | |
| 105 | +        request = remote_execution_pb2.ExecuteRequest(action_digest=action_digest,
 | |
| 106 | +                                                      skip_cache_lookup=True)
 | |
| 107 | +        try:
 | |
| 108 | +            operation_iterator = stub.Execute(request)
 | |
| 109 | +        except grpc.RpcError:
 | |
| 110 | +            return None
 | |
| 111 | + | |
| 112 | +        operation = None
 | |
| 113 | +        with self._get_context().timed_activity("Waiting for the remote build to complete"):
 | |
| 114 | +            # It is advantageous to check operation_iterator.code() is grpc.StatusCode.OK here,
 | |
| 115 | +            # which will check the server is actually contactable. However, calling it when the
 | |
| 116 | +            # server is available seems to cause .code() to hang forever.
 | |
| 117 | +            for operation in operation_iterator:
 | |
| 118 | +                if operation.done:
 | |
| 119 | +                    break
 | |
| 120 | + | |
| 121 | +        return operation
 | |
| 122 | + | |
| 123 | +    def process_job_output(self, output_directories, output_files):
 | |
| 124 | +        # Reads the remote execution server response to an execution request.
 | |
| 125 | +        #
 | |
| 126 | +        # output_directories is an array of OutputDirectory objects.
 | |
| 127 | +        # output_files is an array of OutputFile objects.
 | |
| 128 | +        #
 | |
| 129 | +        # We only specify one output_directory, so it's an error
 | |
| 130 | +        # for there to be any output files or more than one directory at the moment.
 | |
| 131 | +        #
 | |
| 132 | +        if output_files:
 | |
| 133 | +            raise SandboxError("Output files were returned when we didn't request any.")
 | |
| 134 | +        elif not output_directories:
 | |
| 135 | +            error_text = "No output directory was returned from the build server."
 | |
| 136 | +            raise SandboxError(error_text)
 | |
| 137 | +        elif len(output_directories) > 1:
 | |
| 138 | +            error_text = "More than one output directory was returned from the build server: {}."
 | |
| 139 | +            raise SandboxError(error_text.format(output_directories))
 | |
| 140 | + | |
| 141 | +        tree_digest = output_directories[0].tree_digest
 | |
| 142 | +        if tree_digest is None or not tree_digest.hash:
 | |
| 143 | +            raise SandboxError("Output directory structure had no digest attached.")
 | |
| 144 | + | |
| 145 | +        cascache = self._get_cascache()
 | |
| 146 | +        # Now do a pull to ensure we have the necessary parts.
 | |
| 147 | +        dir_digest = cascache.pull_tree(self._get_project(), tree_digest)
 | |
| 148 | +        if dir_digest is None or not dir_digest.hash or not dir_digest.size_bytes:
 | |
| 149 | +            raise SandboxError("Output directory structure pulling from remote failed.")
 | |
| 150 | + | |
| 151 | +        path_components = os.path.split(self._output_directory)
 | |
| 152 | + | |
| 153 | +        # Now what we have is a digest for the output. Once we return, the calling process will
 | |
| 154 | +        # attempt to descend into our directory and find that directory, so we need to overwrite
 | |
| 155 | +        # that.
 | |
| 156 | + | |
| 157 | +        if not path_components:
 | |
| 158 | +            # The artifact wants the whole directory; we could just return the returned hash in its
 | |
| 159 | +            # place, but we don't have a means to do that yet.
 | |
| 160 | +            raise SandboxError("Unimplemented: Output directory is empty or equal to the sandbox root.")
 | |
| 161 | + | |
| 162 | +        # At the moment, we will get the whole directory back in the first directory argument and we need
 | |
| 163 | +        # to replace the sandbox's virtual directory with that. Creating a new virtual directory object
 | |
| 164 | +        # from another hash will be interesting, though...
 | |
| 165 | + | |
| 166 | +        new_dir = CasBasedDirectory(self._get_context(), ref=dir_digest)
 | |
| 167 | +        self._set_virtual_directory(new_dir)
 | |
| 168 | + | |
| 169 | +    def run(self, command, flags, *, cwd=None, env=None):
 | |
| 170 | +        # Upload sources
 | |
| 171 | +        upload_vdir = self.get_virtual_directory()
 | |
| 172 | + | |
| 173 | +        if isinstance(upload_vdir, FileBasedDirectory):
 | |
| 174 | +            # Make a new temporary directory to put source in
 | |
| 175 | +            upload_vdir = CasBasedDirectory(self._get_context(), ref=None)
 | |
| 176 | +            upload_vdir.import_files(self.get_virtual_directory()._get_underlying_directory())
 | |
| 177 | + | |
| 178 | +        upload_vdir.recalculate_hash()
 | |
| 179 | + | |
| 180 | +        cascache = self._get_cascache()
 | |
| 181 | +        # Now, push that key (without necessarily needing a ref) to the remote.
 | |
| 182 | +        vdir_digest = cascache.push_directory(self._get_project(), upload_vdir)
 | |
| 183 | +        if not vdir_digest or not cascache.verify_key_pushed(self._get_project(), vdir_digest):
 | |
| 184 | +            raise SandboxError("Failed to verify that source has been pushed to the remote artifact cache.")
 | |
| 185 | + | |
| 186 | +        # Set up environment and working directory
 | |
| 187 | +        if cwd is None:
 | |
| 188 | +            cwd = self._get_work_directory()
 | |
| 189 | + | |
| 190 | +        if cwd is None:
 | |
| 191 | +            cwd = '/'
 | |
| 192 | + | |
| 193 | +        if env is None:
 | |
| 194 | +            env = self._get_environment()
 | |
| 195 | + | |
| 196 | +        # We want command args as a list of strings
 | |
| 197 | +        if isinstance(command, str):
 | |
| 198 | +            command = [command]
 | |
| 199 | + | |
| 200 | +        # Now transmit the command to execute
 | |
| 201 | +        operation = self.run_remote_command(command, upload_vdir.ref, cwd, env)
 | |
| 202 | + | |
| 203 | +        if operation is None:
 | |
| 204 | +            # Failure of remote execution, usually due to an error in BuildStream
 | |
| 205 | +            # NB This error could be raised in run_remote_command
 | |
| 206 | +            raise SandboxError("No response returned from server")
 | |
| 207 | + | |
| 208 | +        assert(not operation.HasField('error') and operation.HasField('response'))
 | |
| 209 | + | |
| 210 | +        execution_response = remote_execution_pb2.ExecuteResponse()
 | |
| 211 | +        # The response is expected to be an ExecuteResponse message
 | |
| 212 | +        assert(operation.response.Is(execution_response.DESCRIPTOR))
 | |
| 213 | + | |
| 214 | +        operation.response.Unpack(execution_response)
 | |
| 215 | + | |
| 216 | +        if execution_response.status.code != 0:
 | |
| 217 | +            # A normal error during the build: the remote execution system
 | |
| 218 | +            # has worked correctly but the command failed.
 | |
| 219 | +            # execution_response.status also contains 'message' (str) and
 | |
| 220 | +            # 'details' (iterator of Any) which we ignore at the moment.
 | |
| 221 | +            return execution_response.status.code
 | |
| 222 | + | |
| 223 | +        action_result = execution_response.result
 | |
| 224 | + | |
| 225 | +        self.process_job_output(action_result.output_directories, action_result.output_files)
 | |
| 226 | + | |
| 227 | +        return 0 | 
| ... | ... | @@ -99,9 +99,11 @@ class Sandbox(): | 
| 99 | 99 |          self.__stdout = kwargs['stdout']
 | 
| 100 | 100 |          self.__stderr = kwargs['stderr']
 | 
| 101 | 101 |  | 
| 102 | -        # Setup the directories. Root should be available to subclasses, hence
 | |
| 103 | -        # being single-underscore. The others are private to this class.
 | |
| 102 | +        # Setup the directories. Root and output_directory should be
 | |
| 103 | +        # available to subclasses, hence being single-underscore. The
 | |
| 104 | +        # others are private to this class.
 | |
| 104 | 105 |          self._root = os.path.join(directory, 'root')
 | 
| 106 | +        self._output_directory = None
 | |
| 105 | 107 |          self.__directory = directory
 | 
| 106 | 108 |          self.__scratch = os.path.join(self.__directory, 'scratch')
 | 
| 107 | 109 |          for directory_ in [self._root, self.__scratch]:
 | 
| ... | ... | @@ -144,11 +146,17 @@ class Sandbox(): | 
| 144 | 146 |                  self._vdir = FileBasedDirectory(self._root)
 | 
| 145 | 147 |          return self._vdir
 | 
| 146 | 148 |  | 
| 149 | +    def _set_virtual_directory(self, virtual_directory):
 | |
| 150 | +        """ Sets virtual directory. Useful after remote execution
 | |
| 151 | +        has rewritten the working directory.
 | |
| 152 | +        """
 | |
| 153 | +        self._vdir = virtual_directory
 | |
| 154 | + | |
| 147 | 155 |      def set_environment(self, environment):
 | 
| 148 | 156 |          """Sets the environment variables for the sandbox
 | 
| 149 | 157 |  | 
| 150 | 158 |          Args:
 | 
| 151 | -           directory (dict): The environment variables to use in the sandbox
 | |
| 159 | +           environment (dict): The environment variables to use in the sandbox
 | |
| 152 | 160 |          """
 | 
| 153 | 161 |          self.__env = environment
 | 
| 154 | 162 |  | 
| ... | ... | @@ -160,6 +168,15 @@ class Sandbox(): | 
| 160 | 168 |          """
 | 
| 161 | 169 |          self.__cwd = directory
 | 
| 162 | 170 |  | 
| 171 | +    def set_output_directory(self, directory):
 | |
| 172 | +        """Sets the output directory - the directory which is preserved
 | |
| 173 | +        as an artifact after assembly.
 | |
| 174 | + | |
| 175 | +        Args:
 | |
| 176 | +           directory (str): An absolute path within the sandbox
 | |
| 177 | +        """
 | |
| 178 | +        self._output_directory = directory
 | |
| 179 | + | |
| 163 | 180 |      def mark_directory(self, directory, *, artifact=False):
 | 
| 164 | 181 |          """Marks a sandbox directory and ensures it will exist
 | 
| 165 | 182 |  | 
| ... | ... | @@ -28,6 +28,18 @@ Abstract Methods | 
| 28 | 28 |  For loading and configuration purposes, Sources must implement the
 | 
| 29 | 29 |  :ref:`Plugin base class abstract methods <core_plugin_abstract_methods>`.
 | 
| 30 | 30 |  | 
| 31 | +.. attention::
 | |
| 32 | + | |
| 33 | +   In order to ensure that all configuration data is processed at
 | |
| 34 | +   load time, it is important that all URLs have been processed during
 | |
| 35 | +   :func:`Plugin.configure() <buildstream.plugin.Plugin.configure>`.
 | |
| 36 | + | |
| 37 | +   Source implementations *must* either call
 | |
| 38 | +   :func:`Source.translate_url() <buildstream.source.Source.translate_url>` or
 | |
| 39 | +   :func:`Source.mark_download_url() <buildstream.source.Source.mark_download_url>`
 | |
| 40 | +   for every URL that has been specified in the configuration during
 | |
| 41 | +   :func:`Plugin.configure() <buildstream.plugin.Plugin.configure>`
 | |
| 42 | + | |
| 31 | 43 |  Sources expose the following abstract methods. Unless explicitly mentioned,
 | 
| 32 | 44 |  these methods are mandatory to implement.
 | 
| 33 | 45 |  | 
| ... | ... | @@ -184,6 +196,13 @@ class SourceFetcher(): | 
| 184 | 196 |      fetching and substituting aliases.
 | 
| 185 | 197 |  | 
| 186 | 198 |      *Since: 1.2*
 | 
| 199 | + | |
| 200 | +    .. attention::
 | |
| 201 | + | |
| 202 | +       When implementing a SourceFetcher, remember to call
 | |
| 203 | +       :func:`Source.mark_download_url() <buildstream.source.Source.mark_download_url>`
 | |
| 204 | +       for every URL found in the configuration data at
 | |
| 205 | +       :func:`Plugin.configure() <buildstream.plugin.Plugin.configure>` time.
 | |
| 187 | 206 |      """
 | 
| 188 | 207 |      def __init__(self):
 | 
| 189 | 208 |          self.__alias = None
 | 
| ... | ... | @@ -206,7 +225,7 @@ class SourceFetcher(): | 
| 206 | 225 |          Implementors should raise :class:`.SourceError` if there is some
 | 
| 207 | 226 |          network error or if the source reference could not be matched.
 | 
| 208 | 227 |          """
 | 
| 209 | -        raise ImplError("Source fetcher '{}' does not implement fetch()".format(type(self)))
 | |
| 228 | +        raise ImplError("SourceFetcher '{}' does not implement fetch()".format(type(self)))
 | |
| 210 | 229 |  | 
| 211 | 230 |      #############################################################
 | 
| 212 | 231 |      #                       Public Methods                      #
 | 
| ... | ... | @@ -277,8 +296,11 @@ class Source(Plugin): | 
| 277 | 296 |          self.__element_kind = meta.element_kind         # The kind of the element owning this source
 | 
| 278 | 297 |          self.__directory = meta.directory               # Staging relative directory
 | 
| 279 | 298 |          self.__consistency = Consistency.INCONSISTENT   # Cached consistency state
 | 
| 299 | + | |
| 300 | +        # The alias_override is only set on a re-instantiated Source
 | |
| 280 | 301 |          self.__alias_override = alias_override          # Tuple of alias and its override to use instead
 | 
| 281 | -        self.__expected_alias = None                    # A hacky way to store the first alias used
 | |
| 302 | +        self.__expected_alias = None                    # The primary alias
 | |
| 303 | +        self.__marked_urls = set()                      # Set of marked download URLs
 | |
| 282 | 304 |  | 
| 283 | 305 |          # FIXME: Reconstruct a MetaSource from a Source instead of storing it.
 | 
| 284 | 306 |          self.__meta = meta                              # MetaSource stored so we can copy this source later.
 | 
| ... | ... | @@ -289,7 +311,7 @@ class Source(Plugin): | 
| 289 | 311 |          self.__config = self.__extract_config(meta)
 | 
| 290 | 312 |          self.__first_pass = meta.first_pass
 | 
| 291 | 313 |  | 
| 292 | -        self.configure(self.__config)
 | |
| 314 | +        self._configure(self.__config)
 | |
| 293 | 315 |  | 
| 294 | 316 |      COMMON_CONFIG_KEYS = ['kind', 'directory']
 | 
| 295 | 317 |      """Common source config keys
 | 
| ... | ... | @@ -351,10 +373,10 @@ class Source(Plugin): | 
| 351 | 373 |          Args:
 | 
| 352 | 374 |             ref (simple object): The internal source reference to set, or ``None``
 | 
| 353 | 375 |             node (dict): The same dictionary which was previously passed
 | 
| 354 | -                        to :func:`~buildstream.source.Source.configure`
 | |
| 376 | +                        to :func:`Plugin.configure() <buildstream.plugin.Plugin.configure>`
 | |
| 355 | 377 |  | 
| 356 | -        See :func:`~buildstream.source.Source.get_ref` for a discussion on
 | |
| 357 | -        the *ref* parameter.
 | |
| 378 | +        See :func:`Source.get_ref() <buildstream.source.Source.get_ref>`
 | |
| 379 | +        for a discussion on the *ref* parameter.
 | |
| 358 | 380 |  | 
| 359 | 381 |          .. note::
 | 
| 360 | 382 |  | 
| ... | ... | @@ -384,8 +406,8 @@ class Source(Plugin): | 
| 384 | 406 |          backend store allows one to query for a new ref from a symbolic
 | 
| 385 | 407 |          tracking data without downloading then that is desirable.
 | 
| 386 | 408 |  | 
| 387 | -        See :func:`~buildstream.source.Source.get_ref` for a discussion on
 | |
| 388 | -        the *ref* parameter.
 | |
| 409 | +        See :func:`Source.get_ref() <buildstream.source.Source.get_ref>`
 | |
| 410 | +        for a discussion on the *ref* parameter.
 | |
| 389 | 411 |          """
 | 
| 390 | 412 |          # Allow a non implementation
 | 
| 391 | 413 |          return None
 | 
| ... | ... | @@ -435,7 +457,7 @@ class Source(Plugin): | 
| 435 | 457 |             :class:`.SourceError`
 | 
| 436 | 458 |  | 
| 437 | 459 |          Default implementation is to call
 | 
| 438 | -        :func:`~buildstream.source.Source.stage`.
 | |
| 460 | +        :func:`Source.stage() <buildstream.source.Source.stage>`.
 | |
| 439 | 461 |  | 
| 440 | 462 |          Implementors overriding this method should assume that *directory*
 | 
| 441 | 463 |          already exists.
 | 
| ... | ... | @@ -453,8 +475,15 @@ class Source(Plugin): | 
| 453 | 475 |          is recommended.
 | 
| 454 | 476 |  | 
| 455 | 477 |          Returns:
 | 
| 456 | -           list: A list of SourceFetchers. If SourceFetchers are not supported,
 | |
| 457 | -                 this will be an empty list.
 | |
| 478 | +           iterable: The Source's SourceFetchers, if any.
 | |
| 479 | + | |
| 480 | +        .. note::
 | |
| 481 | + | |
| 482 | +           Implementors can implement this as a generator.
 | |
| 483 | + | |
| 484 | +           The :func:`SourceFetcher.fetch() <buildstream.source.SourceFetcher.fetch>`
 | |
| 485 | +           method will be called on the returned fetchers one by one,
 | |
| 486 | +           before consuming the next fetcher in the list.
 | |
| 458 | 487 |  | 
| 459 | 488 |          *Since: 1.2*
 | 
| 460 | 489 |          """
 | 
| ... | ... | @@ -477,17 +506,27 @@ class Source(Plugin): | 
| 477 | 506 |          os.makedirs(directory, exist_ok=True)
 | 
| 478 | 507 |          return directory
 | 
| 479 | 508 |  | 
| 480 | -    def translate_url(self, url, *, alias_override=None):
 | |
| 509 | +    def translate_url(self, url, *, alias_override=None, primary=True):
 | |
| 481 | 510 |          """Translates the given url which may be specified with an alias
 | 
| 482 | 511 |          into a fully qualified url.
 | 
| 483 | 512 |  | 
| 484 | 513 |          Args:
 | 
| 485 | -           url (str): A url, which may be using an alias
 | |
| 514 | +           url (str): A URL, which may be using an alias
 | |
| 486 | 515 |             alias_override (str): Optionally, a URI to override the alias with. (*Since: 1.2*)
 | 
| 516 | +           primary (bool): Whether this is the primary URL for the source. (*Since: 1.2*)
 | |
| 487 | 517 |  | 
| 488 | 518 |          Returns:
 | 
| 489 | -           str: The fully qualified url, with aliases resolved
 | |
| 519 | +           str: The fully qualified URL, with aliases resolved
 | |
| 520 | +        .. note::
 | |
| 521 | + | |
| 522 | +           This must be called for every URL in the configuration during
 | |
| 523 | +           :func:`Plugin.configure() <buildstream.plugin.Plugin.configure>` if
 | |
| 524 | +           :func:`Source.mark_download_url() <buildstream.source.Source.mark_download_url>`
 | |
| 525 | +           is not called.
 | |
| 490 | 526 |          """
 | 
| 527 | +        # Ensure that the download URL is also marked
 | |
| 528 | +        self.mark_download_url(url, primary=primary)
 | |
| 529 | + | |
| 491 | 530 |          # Alias overriding can happen explicitly (by command-line) or
 | 
| 492 | 531 |          # implicitly (the Source being constructed with an __alias_override).
 | 
| 493 | 532 |          if alias_override or self.__alias_override:
 | 
| ... | ... | @@ -506,25 +545,55 @@ class Source(Plugin): | 
| 506 | 545 |                          url = override_url + url_body
 | 
| 507 | 546 |              return url
 | 
| 508 | 547 |          else:
 | 
| 509 | -            # Sneakily store the alias if it hasn't already been stored
 | |
| 510 | -            if not self.__expected_alias and url and utils._ALIAS_SEPARATOR in url:
 | |
| 511 | -                self.mark_download_url(url)
 | |
| 512 | - | |
| 513 | 548 |              project = self._get_project()
 | 
| 514 | 549 |              return project.translate_url(url, first_pass=self.__first_pass)
 | 
| 515 | 550 |  | 
| 516 | -    def mark_download_url(self, url):
 | |
| 551 | +    def mark_download_url(self, url, *, primary=True):
 | |
| 517 | 552 |          """Identifies the URL that this Source uses to download
 | 
| 518 | 553 |  | 
| 519 | -        This must be called during :func:`~buildstream.plugin.Plugin.configure` if
 | |
| 520 | -        :func:`~buildstream.source.Source.translate_url` is not called.
 | |
| 521 | - | |
| 522 | 554 |          Args:
 | 
| 523 | -           url (str): The url used to download
 | |
| 555 | +           url (str): The URL used to download
 | |
| 556 | +           primary (bool): Whether this is the primary URL for the source
 | |
| 557 | + | |
| 558 | +        .. note::
 | |
| 559 | + | |
| 560 | +           This must be called for every URL in the configuration during
 | |
| 561 | +           :func:`Plugin.configure() <buildstream.plugin.Plugin.configure>` if
 | |
| 562 | +           :func:`Source.translate_url() <buildstream.source.Source.translate_url>`
 | |
| 563 | +           is not called.
 | |
| 524 | 564 |  | 
| 525 | 565 |          *Since: 1.2*
 | 
| 526 | 566 |          """
 | 
| 527 | -        self.__expected_alias = _extract_alias(url)
 | |
| 567 | +        # Only mark the Source level aliases on the main instance, not in
 | |
| 568 | +        # a reinstantiated instance in mirroring.
 | |
| 569 | +        if not self.__alias_override:
 | |
| 570 | +            if primary:
 | |
| 571 | +                expected_alias = _extract_alias(url)
 | |
| 572 | + | |
| 573 | +                assert (self.__expected_alias is None or
 | |
| 574 | +                        self.__expected_alias == expected_alias), \
 | |
| 575 | +                    "Primary URL marked twice with different URLs"
 | |
| 576 | + | |
| 577 | +                self.__expected_alias = expected_alias
 | |
| 578 | + | |
| 579 | +        # Enforce proper behaviour of plugins by ensuring that all
 | |
| 580 | +        # aliased URLs have been marked at Plugin.configure() time.
 | |
| 581 | +        #
 | |
| 582 | +        if self._get_configuring():
 | |
| 583 | +            # Record marked urls while configuring
 | |
| 584 | +            #
 | |
| 585 | +            self.__marked_urls.add(url)
 | |
| 586 | +        else:
 | |
| 587 | +            # If an unknown aliased URL is seen after configuring,
 | |
| 588 | +            # this is an error.
 | |
| 589 | +            #
 | |
| 590 | +            # It is still possible that a URL that was not mentioned
 | |
| 591 | +            # in the element configuration can be marked, this is
 | |
| 592 | +            # the case for git submodules which might be automatically
 | |
| 593 | +            # discovered.
 | |
| 594 | +            #
 | |
| 595 | +            assert (url in self.__marked_urls or not _extract_alias(url)), \
 | |
| 596 | +                "URL was not seen at configure time: {}".format(url)
 | |
| 528 | 597 |  | 
| 529 | 598 |      def get_project_directory(self):
 | 
| 530 | 599 |          """Fetch the project base directory
 | 
| ... | ... | @@ -543,6 +543,15 @@ class CasBasedDirectory(Directory): | 
| 543 | 543 |                  filelist.append(k)
 | 
| 544 | 544 |          return filelist
 | 
| 545 | 545 |  | 
| 546 | +    def recalculate_hash(self):
 | |
| 547 | +        """ Recalculates the hash for this directory and stores the results in
 | |
| 548 | +        the cache. If this directory has a parent, tell it to
 | |
| 549 | +        recalculate (since changing this directory changes an entry in
 | |
| 550 | +        the parent). Hashes for subdirectories also get recalculated.
 | |
| 551 | +        """
 | |
| 552 | +        self._recalculate_recursing_up()
 | |
| 553 | +        self._recalculate_recursing_down()
 | |
| 554 | + | |
| 546 | 555 |      def _get_identifier(self):
 | 
| 547 | 556 |          path = ""
 | 
| 548 | 557 |          if self.parent:
 | 
| ... | ... | @@ -204,6 +204,24 @@ with an artifact share. | 
| 204 | 204 |  You can also specify a list of caches here; earlier entries in the list
 | 
| 205 | 205 |  will have higher priority than later ones.
 | 
| 206 | 206 |  | 
| 207 | +Remote execution
 | |
| 208 | +~~~~~~~~~~~~~~~~
 | |
| 209 | +BuildStream supports remote execution using the Google Remote Execution API
 | |
| 210 | +(REAPI). A description of how remote execution works is beyond the scope
 | |
| 211 | +of this document, but you can specify a remote server complying with the REAPI
 | |
| 212 | +using the `remote-execution` option:
 | |
| 213 | + | |
| 214 | +.. code:: yaml
 | |
| 215 | + | |
| 216 | +  remote-execution:
 | |
| 217 | + | |
| 218 | +    # A url defining a remote execution server
 | |
| 219 | +    url: http://buildserver.example.com:50051
 | |
| 220 | + | |
| 221 | +The url should contain a hostname and port separated by ':'. Only plain HTTP is
 | |
| 222 | +currently supported (no HTTPS).
 | |
| 223 | + | |
| 224 | +The Remote Execution API can be found via https://github.com/bazelbuild/remote-apis.
 | |
| 207 | 225 |  | 
| 208 | 226 |  .. _project_essentials_mirrors:
 | 
| 209 | 227 |  | 
| ... | ... | @@ -15,14 +15,17 @@ from buildstream import Source, Consistency, SourceError, SourceFetcher | 
| 15 | 15 |  | 
| 16 | 16 |  | 
| 17 | 17 |  class FetchFetcher(SourceFetcher):
 | 
| 18 | -    def __init__(self, source, url):
 | |
| 18 | +    def __init__(self, source, url, primary=False):
 | |
| 19 | 19 |          super().__init__()
 | 
| 20 | 20 |          self.source = source
 | 
| 21 | 21 |          self.original_url = url
 | 
| 22 | +        self.primary = primary
 | |
| 22 | 23 |          self.mark_download_url(url)
 | 
| 23 | 24 |  | 
| 24 | 25 |      def fetch(self, alias_override=None):
 | 
| 25 | -        url = self.source.translate_url(self.original_url, alias_override=alias_override)
 | |
| 26 | +        url = self.source.translate_url(self.original_url,
 | |
| 27 | +                                        alias_override=alias_override,
 | |
| 28 | +                                        primary=self.primary)
 | |
| 26 | 29 |          with open(self.source.output_file, "a") as f:
 | 
| 27 | 30 |              success = url in self.source.fetch_succeeds and self.source.fetch_succeeds[url]
 | 
| 28 | 31 |              message = "Fetch {} {} from {}\n".format(self.original_url,
 | 
| ... | ... | @@ -37,12 +40,21 @@ class FetchSource(Source): | 
| 37 | 40 |      # Read config to know which URLs to fetch
 | 
| 38 | 41 |      def configure(self, node):
 | 
| 39 | 42 |          self.original_urls = self.node_get_member(node, list, 'urls')
 | 
| 40 | -        self.fetchers = [FetchFetcher(self, url) for url in self.original_urls]
 | |
| 41 | 43 |          self.output_file = self.node_get_member(node, str, 'output-text')
 | 
| 42 | 44 |          self.fetch_succeeds = {}
 | 
| 43 | 45 |          if 'fetch-succeeds' in node:
 | 
| 44 | 46 |              self.fetch_succeeds = {x[0]: x[1] for x in self.node_items(node['fetch-succeeds'])}
 | 
| 45 | 47 |  | 
| 48 | +        # First URL is the primary one for this test
 | |
| 49 | +        #
 | |
| 50 | +        primary = True
 | |
| 51 | +        self.fetchers = []
 | |
| 52 | +        for url in self.original_urls:
 | |
| 53 | +            self.mark_download_url(url, primary=primary)
 | |
| 54 | +            fetcher = FetchFetcher(self, url, primary=primary)
 | |
| 55 | +            self.fetchers.append(fetcher)
 | |
| 56 | +            primary = False
 | |
| 57 | + | |
| 46 | 58 |      def get_source_fetchers(self):
 | 
| 47 | 59 |          return self.fetchers
 | 
| 48 | 60 |  | 
| ... | ... | @@ -56,7 +56,7 @@ def test_fetch_bad_url(cli, tmpdir, datafiles): | 
| 56 | 56 |      result = cli.run(project=project, args=[
 | 
| 57 | 57 |          'fetch', 'target.bst'
 | 
| 58 | 58 |      ])
 | 
| 59 | -    assert "Try #" in result.stderr
 | |
| 59 | +    assert "FAILURE Try #" in result.stderr
 | |
| 60 | 60 |      result.assert_main_error(ErrorDomain.STREAM, None)
 | 
| 61 | 61 |      result.assert_task_error(ErrorDomain.SOURCE, None)
 | 
| 62 | 62 |  | 
| ... | ... | @@ -25,6 +25,7 @@ import pytest | 
| 25 | 25 |  | 
| 26 | 26 |  from buildstream._exceptions import ErrorDomain
 | 
| 27 | 27 |  from buildstream import _yaml
 | 
| 28 | +from buildstream.plugin import CoreWarnings
 | |
| 28 | 29 |  | 
| 29 | 30 |  from tests.testutils import cli, create_repo
 | 
| 30 | 31 |  from tests.testutils.site import HAVE_GIT
 | 
| ... | ... | @@ -408,3 +409,70 @@ def test_submodule_track_no_ref_or_track(cli, tmpdir, datafiles): | 
| 408 | 409 |      result = cli.run(project=project, args=['show', 'target.bst'])
 | 
| 409 | 410 |      result.assert_main_error(ErrorDomain.SOURCE, "missing-track-and-ref")
 | 
| 410 | 411 |      result.assert_task_error(None, None)
 | 
| 412 | + | |
| 413 | + | |
| 414 | +@pytest.mark.skipif(HAVE_GIT is False, reason="git is not available")
 | |
| 415 | +@pytest.mark.datafiles(os.path.join(DATA_DIR, 'template'))
 | |
| 416 | +def test_ref_not_in_track_warn(cli, tmpdir, datafiles):
 | |
| 417 | +    project = os.path.join(datafiles.dirname, datafiles.basename)
 | |
| 418 | + | |
| 419 | +    # Create the repo from 'repofiles', create a branch without latest commit
 | |
| 420 | +    repo = create_repo('git', str(tmpdir))
 | |
| 421 | +    ref = repo.create(os.path.join(project, 'repofiles'))
 | |
| 422 | + | |
| 423 | +    gitsource = repo.source_config(ref=ref)
 | |
| 424 | + | |
| 425 | +    # Overwrite the track value to the added branch
 | |
| 426 | +    gitsource['track'] = 'foo'
 | |
| 427 | + | |
| 428 | +    # Write out our test target
 | |
| 429 | +    element = {
 | |
| 430 | +        'kind': 'import',
 | |
| 431 | +        'sources': [
 | |
| 432 | +            gitsource
 | |
| 433 | +        ]
 | |
| 434 | +    }
 | |
| 435 | +    _yaml.dump(element, os.path.join(project, 'target.bst'))
 | |
| 436 | + | |
| 437 | +    # Assert the warning is raised as ref is not in branch foo.
 | |
| 438 | +    # Assert warning not error to the user, when not set as fatal.
 | |
| 439 | +    result = cli.run(project=project, args=['build', 'target.bst'])
 | |
| 440 | +    assert "The ref provided for the element does not exist locally" in result.stderr
 | |
| 441 | + | |
| 442 | + | |
| 443 | +@pytest.mark.skipif(HAVE_GIT is False, reason="git is not available")
 | |
| 444 | +@pytest.mark.datafiles(os.path.join(DATA_DIR, 'template'))
 | |
| 445 | +def test_ref_not_in_track_warn_error(cli, tmpdir, datafiles):
 | |
| 446 | +    project = os.path.join(datafiles.dirname, datafiles.basename)
 | |
| 447 | + | |
| 448 | +    # Add fatal-warnings ref-not-in-track to project.conf
 | |
| 449 | +    project_template = {
 | |
| 450 | +        "name": "foo",
 | |
| 451 | +        "fatal-warnings": [CoreWarnings.REF_NOT_IN_TRACK]
 | |
| 452 | +    }
 | |
| 453 | + | |
| 454 | +    _yaml.dump(project_template, os.path.join(project, 'project.conf'))
 | |
| 455 | + | |
| 456 | +    # Create the repo from 'repofiles', create a branch without latest commit
 | |
| 457 | +    repo = create_repo('git', str(tmpdir))
 | |
| 458 | +    ref = repo.create(os.path.join(project, 'repofiles'))
 | |
| 459 | + | |
| 460 | +    gitsource = repo.source_config(ref=ref)
 | |
| 461 | + | |
| 462 | +    # Overwrite the track value to the added branch
 | |
| 463 | +    gitsource['track'] = 'foo'
 | |
| 464 | + | |
| 465 | +    # Write out our test target
 | |
| 466 | +    element = {
 | |
| 467 | +        'kind': 'import',
 | |
| 468 | +        'sources': [
 | |
| 469 | +            gitsource
 | |
| 470 | +        ]
 | |
| 471 | +    }
 | |
| 472 | +    _yaml.dump(element, os.path.join(project, 'target.bst'))
 | |
| 473 | + | |
| 474 | +    # Assert that build raises a warning here that is captured
 | |
| 475 | +    # as plugin error, due to the fatal warning being set
 | |
| 476 | +    result = cli.run(project=project, args=['build', 'target.bst'])
 | |
| 477 | +    result.assert_main_error(ErrorDomain.STREAM, None)
 | |
| 478 | +    result.assert_task_error(ErrorDomain.PLUGIN, CoreWarnings.REF_NOT_IN_TRACK) | 
| ... | ... | @@ -67,7 +67,7 @@ def test_fetch_bad_url(cli, tmpdir, datafiles): | 
| 67 | 67 |      result = cli.run(project=project, args=[
 | 
| 68 | 68 |          'fetch', 'target.bst'
 | 
| 69 | 69 |      ])
 | 
| 70 | -    assert "Try #" in result.stderr
 | |
| 70 | +    assert "FAILURE Try #" in result.stderr
 | |
| 71 | 71 |      result.assert_main_error(ErrorDomain.STREAM, None)
 | 
| 72 | 72 |      result.assert_task_error(ErrorDomain.SOURCE, None)
 | 
| 73 | 73 |  | 
| ... | ... | @@ -53,7 +53,7 @@ def test_fetch_bad_url(cli, tmpdir, datafiles): | 
| 53 | 53 |      result = cli.run(project=project, args=[
 | 
| 54 | 54 |          'fetch', 'target.bst'
 | 
| 55 | 55 |      ])
 | 
| 56 | -    assert "Try #" in result.stderr
 | |
| 56 | +    assert "FAILURE Try #" in result.stderr
 | |
| 57 | 57 |      result.assert_main_error(ErrorDomain.STREAM, None)
 | 
| 58 | 58 |      result.assert_task_error(ErrorDomain.SOURCE, None)
 | 
| 59 | 59 |  | 
| ... | ... | @@ -2,6 +2,7 @@ import os | 
| 2 | 2 |  import subprocess
 | 
| 3 | 3 |  import pytest
 | 
| 4 | 4 |  | 
| 5 | +from buildstream import utils
 | |
| 5 | 6 |  from .repo import Repo
 | 
| 6 | 7 |  from ..site import HAVE_BZR
 | 
| 7 | 8 |  | 
| ... | ... | @@ -16,15 +17,16 @@ class Bzr(Repo): | 
| 16 | 17 |          if not HAVE_BZR:
 | 
| 17 | 18 |              pytest.skip("bzr is not available")
 | 
| 18 | 19 |          super(Bzr, self).__init__(directory, subdir)
 | 
| 20 | +        self.bzr = utils.get_host_tool('bzr')
 | |
| 19 | 21 |  | 
| 20 | 22 |      def create(self, directory):
 | 
| 21 | 23 |          branch_dir = os.path.join(self.repo, 'trunk')
 | 
| 22 | 24 |  | 
| 23 | -        subprocess.call(['bzr', 'init-repo', self.repo], env=BZR_ENV)
 | |
| 24 | -        subprocess.call(['bzr', 'init', branch_dir], env=BZR_ENV)
 | |
| 25 | +        subprocess.call([self.bzr, 'init-repo', self.repo], env=BZR_ENV)
 | |
| 26 | +        subprocess.call([self.bzr, 'init', branch_dir], env=BZR_ENV)
 | |
| 25 | 27 |          self.copy_directory(directory, branch_dir)
 | 
| 26 | -        subprocess.call(['bzr', 'add', '.'], env=BZR_ENV, cwd=branch_dir)
 | |
| 27 | -        subprocess.call(['bzr', 'commit', '--message="Initial commit"'],
 | |
| 28 | +        subprocess.call([self.bzr, 'add', '.'], env=BZR_ENV, cwd=branch_dir)
 | |
| 29 | +        subprocess.call([self.bzr, 'commit', '--message="Initial commit"'],
 | |
| 28 | 30 |                          env=BZR_ENV, cwd=branch_dir)
 | 
| 29 | 31 |  | 
| 30 | 32 |          return self.latest_commit()
 | 
| ... | ... | @@ -42,7 +44,7 @@ class Bzr(Repo): | 
| 42 | 44 |  | 
| 43 | 45 |      def latest_commit(self):
 | 
| 44 | 46 |          output = subprocess.check_output([
 | 
| 45 | -            'bzr', 'version-info',
 | |
| 47 | +            self.bzr, 'version-info',
 | |
| 46 | 48 |              '--custom', '--template={revno}',
 | 
| 47 | 49 |              os.path.join(self.repo, 'trunk')
 | 
| 48 | 50 |          ], env=BZR_ENV)
 | 
