Tiago Gomes pushed to branch tiagogomes/issue-573 at BuildStream / buildstream
Commits:
- 55956762 by Richard Maw at 2018-09-13T16:50:33Z
- fc7f83ac by richardmaw-codethink at 2018-09-13T17:14:31Z
- 233a7d83 by Richard Maw at 2018-09-14T08:53:14Z
- f06f234a by Richard Maw at 2018-09-14T08:53:14Z
- f86ab8f6 by richardmaw-codethink at 2018-09-14T09:46:07Z
- e7427462 by Richard Maw at 2018-09-14T10:28:17Z
- 800a8403 by Richard Maw at 2018-09-14T10:28:17Z
- d7152ef4 by richardmaw-codethink at 2018-09-14T10:55:16Z
- 160bb0c6 by Tristan Van Berkom at 2018-09-14T12:07:46Z
- 39125d24 by Tristan Van Berkom at 2018-09-14T12:07:46Z
- f60558a3 by Tristan Van Berkom at 2018-09-14T12:07:46Z
- ce68fd27 by Tristan Van Berkom at 2018-09-14T12:07:46Z
- 532ec1eb by Tristan Van Berkom at 2018-09-14T12:07:46Z
- 20b797cb by Tristan Van Berkom at 2018-09-14T12:07:46Z
- c2af0d51 by Tristan Van Berkom at 2018-09-14T12:44:42Z
- 924cdc75 by Tiago Gomes at 2018-09-14T15:32:01Z
- 510ccbfd by Tiago Gomes at 2018-09-14T15:32:21Z
- 82d4e2ac by Tiago Gomes at 2018-09-14T15:32:21Z
- 18b37aab by Tiago Gomes at 2018-09-14T15:34:10Z
- 32fad24f by Tiago Gomes at 2018-09-14T15:38:41Z
- 46cbd889 by Tiago Gomes at 2018-09-14T15:38:43Z
- 2fa92716 by Tiago Gomes at 2018-09-14T15:38:43Z
- c285f244 by Tiago Gomes at 2018-09-14T16:08:07Z
- 3bd589b9 by Tiago Gomes at 2018-09-14T16:36:06Z
- fccb722a by Tiago Gomes at 2018-09-14T16:46:57Z
- 2d9cb0c0 by Tiago Gomes at 2018-09-14T16:47:15Z
- 372c48f2 by Tiago Gomes at 2018-09-14T16:47:18Z
- e2af2d51 by Tiago Gomes at 2018-09-14T16:47:18Z
- 23b54e60 by Tiago Gomes at 2018-09-14T16:47:19Z
23 changed files:
- buildstream/_artifactcache/artifactcache.py
- buildstream/_artifactcache/cascache.py
- buildstream/_artifactcache/casserver.py
- buildstream/_context.py
- buildstream/_scheduler/jobs/__init__.py
- − buildstream/_scheduler/jobs/cachesizejob.py
- buildstream/_scheduler/jobs/cleanupjob.py
- buildstream/_scheduler/queues/buildqueue.py
- buildstream/_scheduler/queues/pullqueue.py
- buildstream/_scheduler/scheduler.py
- buildstream/_stream.py
- buildstream/element.py
- buildstream/storage/_casbaseddirectory.py
- buildstream/utils.py
- tests/artifactcache/expiry.py
- tests/frontend/logging.py
- tests/frontend/workspace.py
- + tests/integration/project/elements/sockets/make-builddir-socket.bst
- + tests/integration/project/elements/sockets/make-install-root-socket.bst
- + tests/integration/sockets.py
- tests/testutils/__init__.py
- tests/testutils/element_generators.py
- tests/testutils/repo/git.py
Changes:
| 1 | 1 |  #
 | 
| 2 | -#  Copyright (C) 2017-2018 Codethink Limited
 | |
| 2 | +#  Copyright (C) 2018 Codethink Limited
 | |
| 3 | 3 |  #
 | 
| 4 | 4 |  #  This program is free software; you can redistribute it and/or
 | 
| 5 | 5 |  #  modify it under the terms of the GNU Lesser General Public
 | 
| ... | ... | @@ -16,6 +16,7 @@ | 
| 16 | 16 |  #
 | 
| 17 | 17 |  #  Authors:
 | 
| 18 | 18 |  #        Tristan Maat <tristan maat codethink co uk>
 | 
| 19 | +#        Tiago Gomes <tiago gomes codethink co uk>
 | |
| 19 | 20 |  | 
| 20 | 21 |  import os
 | 
| 21 | 22 |  import string
 | 
| ... | ... | @@ -87,8 +88,8 @@ class ArtifactCache(): | 
| 87 | 88 |          self.global_remote_specs = []
 | 
| 88 | 89 |          self.project_remote_specs = {}
 | 
| 89 | 90 |  | 
| 90 | -        self._required_artifacts = set()      # The artifacts required for this session
 | |
| 91 | -        self._cache_size = None               # The current cache size, sometimes it's an estimate
 | |
| 91 | +        self._required_elements = set()       # The elements required for this session
 | |
| 92 | +        self._cache_size = None               # The current cache size
 | |
| 92 | 93 |          self._cache_quota = None              # The cache quota
 | 
| 93 | 94 |          self._cache_lower_threshold = None    # The target cache size for a cleanup
 | 
| 94 | 95 |  | 
| ... | ... | @@ -189,33 +190,40 @@ class ArtifactCache(): | 
| 189 | 190 |                                    (str(provenance)))
 | 
| 190 | 191 |          return cache_specs
 | 
| 191 | 192 |  | 
| 192 | -    # append_required_artifacts():
 | |
| 193 | +    # mark_required_elements():
 | |
| 193 | 194 |      #
 | 
| 194 | -    # Append to the list of elements whose artifacts are required for
 | |
| 195 | -    # the current run. Artifacts whose elements are in this list will
 | |
| 196 | -    # be locked by the artifact cache and not touched for the duration
 | |
| 197 | -    # of the current pipeline.
 | |
| 195 | +    # Mark elements whose artifacts are required for the current run.
 | |
| 196 | +    #
 | |
| 197 | +    # Artifacts whose elements are in this list will be locked by the artifact
 | |
| 198 | +    # cache and not touched for the duration of the current pipeline.
 | |
| 198 | 199 |      #
 | 
| 199 | 200 |      # Args:
 | 
| 200 | 201 |      #     elements (iterable): A set of elements to mark as required
 | 
| 201 | 202 |      #
 | 
| 202 | -    def append_required_artifacts(self, elements):
 | |
| 203 | -        # We lock both strong and weak keys - deleting one but not the
 | |
| 204 | -        # other won't save space in most cases anyway, but would be a
 | |
| 205 | -        # user inconvenience.
 | |
| 203 | +    def mark_required_elements(self, elements):
 | |
| 204 | + | |
| 205 | +        # We risk calling this function with a generator, so we
 | |
| 206 | +        # better consume it first.
 | |
| 207 | +        #
 | |
| 208 | +        elements = list(elements)
 | |
| 209 | + | |
| 210 | +        # Mark the elements as required. We cannot assume that the cache
 | |
| 211 | +        # keys are known yet, so we only check them later, when deleting.
 | |
| 212 | +        #
 | |
| 213 | +        self._required_elements.update(elements)
 | |
| 206 | 214 |  | 
| 215 | +        # For the cache keys which have been resolved so far, we bump
 | |
| 216 | +        # their atime.
 | |
| 217 | +        #
 | |
| 218 | +        # This is just in case we have concurrent instances of
 | |
| 219 | +        # BuildStream running with the same artifact cache; it will
 | |
| 220 | +        # reduce the likelihood of one instance deleting artifacts
 | |
| 221 | +        # which are required by the other.
 | |
| 207 | 222 |          for element in elements:
 | 
| 208 | 223 |              strong_key = element._get_cache_key(strength=_KeyStrength.STRONG)
 | 
| 209 | 224 |              weak_key = element._get_cache_key(strength=_KeyStrength.WEAK)
 | 
| 210 | - | |
| 211 | 225 |              for key in (strong_key, weak_key):
 | 
| 212 | -                if key and key not in self._required_artifacts:
 | |
| 213 | -                    self._required_artifacts.add(key)
 | |
| 214 | - | |
| 215 | -                    # We also update the usage times of any artifacts
 | |
| 216 | -                    # we will be using, which helps preventing a
 | |
| 217 | -                    # buildstream process that runs in parallel with
 | |
| 218 | -                    # this one from removing artifacts in-use.
 | |
| 226 | +                if key:
 | |
| 219 | 227 |                      try:
 | 
| 220 | 228 |                          self.update_atime(key)
 | 
| 221 | 229 |                      except ArtifactError:
 | 
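
To make the locking scheme in this hunk concrete: elements are recorded as required up front, and only at cleanup time are their (possibly late-resolved) strong and weak keys expanded into the set of artifacts that must not be deleted. The standalone sketch below models that expansion with plain tuples standing in for Element objects and cache keys; it is illustrative only, not the real ArtifactCache code.

    def required_artifact_keys(required_elements):
        """Sketch: expand required elements into the cache keys to protect.
        Each element is modelled as a (strong_key, weak_key) tuple; keys
        that are not resolved yet are None and are simply skipped."""
        required = set()
        for strong_key, weak_key in required_elements:
            for key in (strong_key, weak_key):
                if key:
                    required.add(key)
        return required

    # e.g. required_artifact_keys([("abc", "def"), (None, "def")]) == {"abc", "def"}
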
| ... | ... | @@ -226,13 +234,25 @@ class ArtifactCache(): | 
| 226 | 234 |      # Clean the artifact cache as much as possible.
 | 
| 227 | 235 |      #
 | 
| 228 | 236 |      # Returns:
 | 
| 229 | -    #    (int): The size of the cache after having cleaned up
 | |
| 237 | +    #     (int): Amount of bytes cleaned from the cache.
 | |
| 230 | 238 |      #
 | 
| 231 | 239 |      def clean(self):
 | 
| 232 | 240 |          artifacts = self.list_artifacts()
 | 
| 233 | 241 |  | 
| 242 | +        # Build a set of the cache keys which are required
 | |
| 243 | +        # based on the required elements at cleanup time
 | |
| 244 | +        #
 | |
| 245 | +        # We lock both strong and weak keys - deleting one but not the
 | |
| 246 | +        # other won't save space, but would be a user inconvenience.
 | |
| 247 | +        required_artifacts = set()
 | |
| 248 | +        for element in self._required_elements:
 | |
| 249 | +            required_artifacts.update([
 | |
| 250 | +                element._get_cache_key(strength=_KeyStrength.STRONG),
 | |
| 251 | +                element._get_cache_key(strength=_KeyStrength.WEAK)
 | |
| 252 | +            ])
 | |
| 253 | + | |
| 234 | 254 |          # Do a real computation of the cache size once, just in case
 | 
| 235 | -        self.compute_cache_size()
 | |
| 255 | +        old_cache_size = self._cache_size = self.calculate_cache_size()
 | |
| 236 | 256 |  | 
| 237 | 257 |          while self.get_cache_size() >= self._cache_lower_threshold:
 | 
| 238 | 258 |              try:
 | 
| ... | ... | @@ -248,7 +268,7 @@ class ArtifactCache(): | 
| 248 | 268 |                            "Please increase the cache-quota in {}."
 | 
| 249 | 269 |                            .format(self.context.config_origin or default_conf))
 | 
| 250 | 270 |  | 
| 251 | -                if self.get_quota_exceeded():
 | |
| 271 | +                if self.has_quota_exceeded():
 | |
| 252 | 272 |                      raise ArtifactError("Cache too full. Aborting.",
 | 
| 253 | 273 |                                          detail=detail,
 | 
| 254 | 274 |                                          reason="cache-too-full")
 | 
| ... | ... | @@ -256,93 +276,69 @@ class ArtifactCache(): | 
| 256 | 276 |                      break
 | 
| 257 | 277 |  | 
| 258 | 278 |              key = to_remove.rpartition('/')[2]
 | 
| 259 | -            if key not in self._required_artifacts:
 | |
| 279 | +            if key not in required_artifacts:
 | |
| 260 | 280 |  | 
| 261 | 281 |                  # Remove the actual artifact, if it's not required.
 | 
| 262 | 282 |                  size = self.remove(to_remove)
 | 
| 283 | +                self._cache_size -= size
 | |
| 284 | +                self._message(MessageType.DEBUG,
 | |
| 285 | +                              "Removed artifact {} ({})".format(
 | |
| 286 | +                                  to_remove[:-(len(key) - self.context.log_key_length)],
 | |
| 287 | +                                  utils._pretty_size(size)))
 | |
| 263 | 288 |  | 
| 264 | -                # Remove the size from the removed size
 | |
| 265 | -                self.set_cache_size(self._cache_size - size)
 | |
| 289 | +        self._message(MessageType.INFO,
 | |
| 290 | +                      "New artifact cache size: {}".format(
 | |
| 291 | +                          utils._pretty_size(self._cache_size)))
 | |
| 266 | 292 |  | 
| 267 | -        # This should be O(1) if implemented correctly
 | |
| 268 | -        return self.get_cache_size()
 | |
| 293 | +        return old_cache_size - self._cache_size
 | |
| 269 | 294 |  | 
| 270 | -    # compute_cache_size()
 | |
| 295 | +    # add_artifact_size()
 | |
| 271 | 296 |      #
 | 
| 272 | -    # Computes the real artifact cache size by calling
 | |
| 273 | -    # the abstract calculate_cache_size() method.
 | |
| 297 | +    # Adds the given artifact size to the cache size
 | |
| 274 | 298 |      #
 | 
| 275 | -    # Returns:
 | |
| 276 | -    #    (int): The size of the artifact cache.
 | |
| 299 | +    # Args:
 | |
| 300 | +    #     artifact_size (int): The artifact size to add.
 | |
| 277 | 301 |      #
 | 
| 278 | -    def compute_cache_size(self):
 | |
| 279 | -        self._cache_size = self.calculate_cache_size()
 | |
| 302 | +    def add_artifact_size(self, artifact_size):
 | |
| 303 | +        assert utils._is_main_process()
 | |
| 280 | 304 |  | 
| 281 | -        return self._cache_size
 | |
| 305 | +        self._cache_size = self.get_cache_size() + artifact_size
 | |
| 306 | +        self._write_cache_size(self._cache_size)
 | |
| 282 | 307 |  | 
| 283 | -    # add_artifact_size()
 | |
| 308 | +    # subtract_artifact_size()
 | |
| 284 | 309 |      #
 | 
| 285 | -    # Adds the reported size of a newly cached artifact to the
 | |
| 286 | -    # overall estimated size.
 | |
| 310 | +    # Subtracts the given artifact size from the cache size
 | |
| 287 | 311 |      #
 | 
| 288 | 312 |      # Args:
 | 
| 289 | -    #     artifact_size (int): The size to add.
 | |
| 313 | +    #     artifact_size (int): The artifact size to subtract.
 | |
| 290 | 314 |      #
 | 
| 291 | -    def add_artifact_size(self, artifact_size):
 | |
| 292 | -        cache_size = self.get_cache_size()
 | |
| 293 | -        cache_size += artifact_size
 | |
| 294 | - | |
| 295 | -        self.set_cache_size(cache_size)
 | |
| 315 | +    def subtract_artifact_size(self, artifact_size):
 | |
| 316 | +        self.add_artifact_size(artifact_size * -1)
 | |
| 296 | 317 |  | 
| 297 | 318 |      # get_cache_size()
 | 
| 298 | 319 |      #
 | 
| 299 | -    # Fetches the cached size of the cache, this is sometimes
 | |
| 300 | -    # an estimate and periodically adjusted to the real size
 | |
| 301 | -    # when a cache size calculation job runs.
 | |
| 302 | -    #
 | |
| 303 | -    # When it is an estimate, the value is either correct, or
 | |
| 304 | -    # it is greater than the actual cache size.
 | |
| 320 | +    # Returns the size of the artifact cache.
 | |
| 305 | 321 |      #
 | 
| 306 | 322 |      # Returns:
 | 
| 307 | -    #     (int) An approximation of the artifact cache size.
 | |
| 323 | +    #     (int): The size of the artifact cache.
 | |
| 308 | 324 |      #
 | 
| 309 | 325 |      def get_cache_size(self):
 | 
| 326 | +        if self._cache_size is None:
 | |
| 327 | +            self._cache_size = self._read_cache_size()
 | |
| 310 | 328 |  | 
| 311 | -        # If we don't currently have an estimate, figure out the real cache size.
 | |
| 312 | 329 |          if self._cache_size is None:
 | 
| 313 | -            stored_size = self._read_cache_size()
 | |
| 314 | -            if stored_size is not None:
 | |
| 315 | -                self._cache_size = stored_size
 | |
| 316 | -            else:
 | |
| 317 | -                self.compute_cache_size()
 | |
| 330 | +            self._cache_size = self.calculate_cache_size()
 | |
| 318 | 331 |  | 
| 319 | 332 |          return self._cache_size
 | 
| 320 | 333 |  | 
| 321 | -    # set_cache_size()
 | |
| 322 | -    #
 | |
| 323 | -    # Forcefully set the overall cache size.
 | |
| 324 | -    #
 | |
| 325 | -    # This is used to update the size in the main process after
 | |
| 326 | -    # having calculated in a cleanup or a cache size calculation job.
 | |
| 327 | -    #
 | |
| 328 | -    # Args:
 | |
| 329 | -    #     cache_size (int): The size to set.
 | |
| 330 | -    #
 | |
| 331 | -    def set_cache_size(self, cache_size):
 | |
| 332 | - | |
| 333 | -        assert cache_size is not None
 | |
| 334 | - | |
| 335 | -        self._cache_size = cache_size
 | |
| 336 | -        self._write_cache_size(self._cache_size)
 | |
| 337 | - | |
| 338 | -    # get_quota_exceeded()
 | |
| 334 | +    # has_quota_exceeded()
 | |
| 339 | 335 |      #
 | 
| 340 | 336 |      # Checks if the current artifact cache size exceeds the quota.
 | 
| 341 | 337 |      #
 | 
| 342 | 338 |      # Returns:
 | 
| 343 | 339 |      #    (bool): True if the quota is exceeded
 | 
| 344 | 340 |      #
 | 
| 345 | -    def get_quota_exceeded(self):
 | |
| 341 | +    def has_quota_exceeded(self):
 | |
| 346 | 342 |          return self.get_cache_size() > self._cache_quota
 | 
| 347 | 343 |  | 
| 348 | 344 |      ################################################
 | 
| ... | ... | @@ -441,6 +437,10 @@ class ArtifactCache(): | 
| 441 | 437 |      #     content (str): The element's content directory
 | 
| 442 | 438 |      #     keys (list): The cache keys to use
 | 
| 443 | 439 |      #
 | 
| 440 | +    # Returns:
 | |
| 441 | +    #     (int): Disk size overhead in bytes required to cache the
 | |
| 442 | +    #            artifact
 | |
| 443 | +    #
 | |
| 444 | 444 |      def commit(self, element, content, keys):
 | 
| 445 | 445 |          raise ImplError("Cache '{kind}' does not implement commit()"
 | 
| 446 | 446 |                          .format(kind=type(self).__name__))
 | 
| ... | ... | @@ -512,8 +512,9 @@ class ArtifactCache(): | 
| 512 | 512 |      #     progress (callable): The progress callback, if any
 | 
| 513 | 513 |      #
 | 
| 514 | 514 |      # Returns:
 | 
| 515 | -    #   (bool): True if pull was successful, False if artifact was not available
 | |
| 516 | -    #
 | |
| 515 | +    #     (bool): True if pull was successful, False if artifact was not available
 | |
| 516 | +    #     (int): Disk size overhead in bytes required to cache the
 | |
| 517 | +    #            artifact
 | |
| 517 | 518 |      def pull(self, element, key, *, progress=None):
 | 
| 518 | 519 |          raise ImplError("Cache '{kind}' does not implement pull()"
 | 
| 519 | 520 |                          .format(kind=type(self).__name__))
 | 
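
For readers following the artifactcache.py changes above: the cache now keeps a single byte counter that build and pull jobs increase by the exact overhead they report, and that a cleanup job decreases by the bytes it frees. The standalone sketch below models that accounting; the class and attribute names are simplified stand-ins rather than the real ArtifactCache API.

    class CacheSizeModel:
        """Toy model of the size accounting introduced in artifactcache.py."""

        def __init__(self, quota):
            self._quota = quota
            self._cache_size = None   # unknown until first queried

        def get_cache_size(self):
            # The real code first tries a stored value and then falls back to
            # walking the cache directory; here we just start from zero.
            if self._cache_size is None:
                self._cache_size = 0
            return self._cache_size

        def add_artifact_size(self, artifact_size):
            # Called with the byte overhead reported by commit() or pull()
            self._cache_size = self.get_cache_size() + artifact_size

        def subtract_artifact_size(self, artifact_size):
            # Called with the number of bytes a cleanup job removed
            self.add_artifact_size(-artifact_size)

        def has_quota_exceeded(self):
            return self.get_cache_size() > self._quota
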
| ... | ... | @@ -16,6 +16,7 @@ | 
| 16 | 16 |  #
 | 
| 17 | 17 |  #  Authors:
 | 
| 18 | 18 |  #        Jürg Billeter <juerg billeter codethink co uk>
 | 
| 19 | +#        Tiago Gomes <tiago gomes codethink co uk>
 | |
| 19 | 20 |  | 
| 20 | 21 |  import hashlib
 | 
| 21 | 22 |  import itertools
 | 
| ... | ... | @@ -117,11 +118,13 @@ class CASCache(ArtifactCache): | 
| 117 | 118 |      def commit(self, element, content, keys):
 | 
| 118 | 119 |          refs = [self.get_artifact_fullname(element, key) for key in keys]
 | 
| 119 | 120 |  | 
| 120 | -        tree = self._create_tree(content)
 | |
| 121 | +        tree, size = self._commit_directory(content)
 | |
| 121 | 122 |  | 
| 122 | 123 |          for ref in refs:
 | 
| 123 | 124 |              self.set_ref(ref, tree)
 | 
| 124 | 125 |  | 
| 126 | +        return size
 | |
| 127 | + | |
| 125 | 128 |      def diff(self, element, key_a, key_b, *, subdir=None):
 | 
| 126 | 129 |          ref_a = self.get_artifact_fullname(element, key_a)
 | 
| 127 | 130 |          ref_b = self.get_artifact_fullname(element, key_b)
 | 
| ... | ... | @@ -239,12 +242,12 @@ class CASCache(ArtifactCache): | 
| 239 | 242 |                  tree.hash = response.digest.hash
 | 
| 240 | 243 |                  tree.size_bytes = response.digest.size_bytes
 | 
| 241 | 244 |  | 
| 242 | -                self._fetch_directory(remote, tree)
 | |
| 245 | +                size = self._fetch_directory(remote, tree)
 | |
| 243 | 246 |  | 
| 244 | 247 |                  self.set_ref(ref, tree)
 | 
| 245 | 248 |  | 
| 246 | 249 |                  # no need to pull from additional remotes
 | 
| 247 | -                return True
 | |
| 250 | +                return True, size
 | |
| 248 | 251 |  | 
| 249 | 252 |              except grpc.RpcError as e:
 | 
| 250 | 253 |                  if e.code() != grpc.StatusCode.NOT_FOUND:
 | 
| ... | ... | @@ -258,7 +261,7 @@ class CASCache(ArtifactCache): | 
| 258 | 261 |                              remote.spec.url, element._get_brief_display_key())
 | 
| 259 | 262 |                      ))
 | 
| 260 | 263 |  | 
| 261 | -        return False
 | |
| 264 | +        return False, 0
 | |
| 262 | 265 |  | 
| 263 | 266 |      def pull_tree(self, project, digest):
 | 
| 264 | 267 |          """ Pull a single Tree rather than an artifact.
 | 
| ... | ... | @@ -437,6 +440,7 @@ class CASCache(ArtifactCache): | 
| 437 | 440 |      #
 | 
| 438 | 441 |      # Returns:
 | 
| 439 | 442 |      #     (Digest): The digest of the added object
 | 
| 443 | +    #     (int): The number of bytes required to store the object
 | |
| 440 | 444 |      #
 | 
| 441 | 445 |      # Either `path` or `buffer` must be passed, but not both.
 | 
| 442 | 446 |      #
 | 
| ... | ... | @@ -465,22 +469,38 @@ class CASCache(ArtifactCache): | 
| 465 | 469 |  | 
| 466 | 470 |                  out.flush()
 | 
| 467 | 471 |  | 
| 472 | +                file_size = os.fstat(out.fileno()).st_size
 | |
| 473 | + | |
| 468 | 474 |                  digest.hash = h.hexdigest()
 | 
| 469 | -                digest.size_bytes = os.fstat(out.fileno()).st_size
 | |
| 475 | +                digest.size_bytes = file_size
 | |
| 470 | 476 |  | 
| 471 | 477 |                  # Place file at final location
 | 
| 472 | 478 |                  objpath = self.objpath(digest)
 | 
| 473 | -                os.makedirs(os.path.dirname(objpath), exist_ok=True)
 | |
| 479 | +                dirpath = os.path.dirname(objpath)
 | |
| 480 | + | |
| 481 | +                # Track the increased size on the parent directory caused by
 | |
| 482 | +                # adding a new entry, as these directories can contain a large
 | |
| 483 | +                # number of files.
 | |
| 484 | +                new_dir_size = 0
 | |
| 485 | +                old_dir_size = 0
 | |
| 486 | +                try:
 | |
| 487 | +                    os.makedirs(dirpath)
 | |
| 488 | +                except FileExistsError:
 | |
| 489 | +                    old_dir_size = os.stat(dirpath).st_size
 | |
| 490 | +                else:
 | |
| 491 | +                    new_dir_size = os.stat(dirpath).st_size
 | |
| 492 | + | |
| 474 | 493 |                  os.link(out.name, objpath)
 | 
| 494 | +                new_dir_size = os.stat(dirpath).st_size - old_dir_size
 | |
| 475 | 495 |  | 
| 476 | 496 |          except FileExistsError as e:
 | 
| 477 | 497 |              # We can ignore the failed link() if the object is already in the repo.
 | 
| 478 | -            pass
 | |
| 498 | +            file_size = 0
 | |
| 479 | 499 |  | 
| 480 | 500 |          except OSError as e:
 | 
| 481 | 501 |              raise ArtifactError("Failed to hash object: {}".format(e)) from e
 | 
| 482 | 502 |  | 
| 483 | -        return digest
 | |
| 503 | +        return digest, file_size + new_dir_size
 | |
| 484 | 504 |  | 
| 485 | 505 |      # set_ref():
 | 
| 486 | 506 |      #
 | 
| ... | ... | @@ -489,6 +509,8 @@ class CASCache(ArtifactCache): | 
| 489 | 509 |      # Args:
 | 
| 490 | 510 |      #     ref (str): The name of the ref
 | 
| 491 | 511 |      #
 | 
| 512 | +    # Note: as setting a ref has very low disk size overhead, don't
 | |
| 513 | +    # bother to track this.
 | |
| 492 | 514 |      def set_ref(self, ref, tree):
 | 
| 493 | 515 |          refpath = self._refpath(ref)
 | 
| 494 | 516 |          os.makedirs(os.path.dirname(refpath), exist_ok=True)
 | 
| ... | ... | @@ -665,7 +687,23 @@ class CASCache(ArtifactCache): | 
| 665 | 687 |      def _refpath(self, ref):
 | 
| 666 | 688 |          return os.path.join(self.casdir, 'refs', 'heads', ref)
 | 
| 667 | 689 |  | 
| 668 | -    def _create_tree(self, path, *, digest=None):
 | |
| 690 | +    # _commit_directory():
 | |
| 691 | +    #
 | |
| 692 | +    # Adds local directory to content addressable store.
 | |
| 693 | +    #
 | |
| 694 | +    # Adds files, symbolic links and recursively other directories in
 | |
| 695 | +    # a local directory to the content addressable store.
 | |
| 696 | +    #
 | |
| 697 | +    # Args:
 | |
| 698 | +    #     path (str): Path to the directory to add.
 | |
| 699 | +    #     dir_digest (Digest): An optional Digest object to use.
 | |
| 700 | +    #
 | |
| 701 | +    # Returns:
 | |
| 702 | +    #     (Digest): Digest object for the directory added.
 | |
| 703 | +    #     (int): Bytes required to cache local directory
 | |
| 704 | +    #
 | |
| 705 | +    def _commit_directory(self, path, *, dir_digest=None):
 | |
| 706 | +        size = 0
 | |
| 669 | 707 |          directory = remote_execution_pb2.Directory()
 | 
| 670 | 708 |  | 
| 671 | 709 |          for name in sorted(os.listdir(path)):
 | 
| ... | ... | @@ -674,20 +712,26 @@ class CASCache(ArtifactCache): | 
| 674 | 712 |              if stat.S_ISDIR(mode):
 | 
| 675 | 713 |                  dirnode = directory.directories.add()
 | 
| 676 | 714 |                  dirnode.name = name
 | 
| 677 | -                self._create_tree(full_path, digest=dirnode.digest)
 | |
| 715 | +                size += self._commit_directory(full_path, dir_digest=dirnode.digest)[1]
 | |
| 678 | 716 |              elif stat.S_ISREG(mode):
 | 
| 679 | 717 |                  filenode = directory.files.add()
 | 
| 680 | 718 |                  filenode.name = name
 | 
| 681 | -                self.add_object(path=full_path, digest=filenode.digest)
 | |
| 719 | +                size += self.add_object(path=full_path, digest=filenode.digest)[1]
 | |
| 682 | 720 |                  filenode.is_executable = (mode & stat.S_IXUSR) == stat.S_IXUSR
 | 
| 683 | 721 |              elif stat.S_ISLNK(mode):
 | 
| 684 | 722 |                  symlinknode = directory.symlinks.add()
 | 
| 685 | 723 |                  symlinknode.name = name
 | 
| 686 | 724 |                  symlinknode.target = os.readlink(full_path)
 | 
| 725 | +            elif stat.S_ISSOCK(mode):
 | |
| 726 | +                # The process serving the socket can't be cached anyway
 | |
| 727 | +                pass
 | |
| 687 | 728 |              else:
 | 
| 688 | 729 |                  raise ArtifactError("Unsupported file type for {}".format(full_path))
 | 
| 689 | 730 |  | 
| 690 | -        return self.add_object(digest=digest, buffer=directory.SerializeToString())
 | |
| 731 | +        dir_digest, dir_object_size = self.add_object(
 | |
| 732 | +            digest=dir_digest, buffer=directory.SerializeToString())
 | |
| 733 | + | |
| 734 | +        return dir_digest, size + dir_object_size
 | |
| 691 | 735 |  | 
| 692 | 736 |      def _get_subdir(self, tree, subdir):
 | 
| 693 | 737 |          head, name = os.path.split(subdir)
 | 
| ... | ... | @@ -830,14 +874,30 @@ class CASCache(ArtifactCache): | 
| 830 | 874 |  | 
| 831 | 875 |          assert digest.size_bytes == os.fstat(stream.fileno()).st_size
 | 
| 832 | 876 |  | 
| 833 | -    def _fetch_directory(self, remote, tree):
 | |
| 834 | -        objpath = self.objpath(tree)
 | |
| 877 | +    # _fetch_directory():
 | |
| 878 | +    #
 | |
| 879 | +    # Fetches remote directory and adds it to content addressable store.
 | |
| 880 | +    #
 | |
| 881 | +    # Fetches files, symbolic links and recursively other directories in
 | |
| 882 | +    # the remote directory and adds them to the content addressable
 | |
| 883 | +    # store.
 | |
| 884 | +    #
 | |
| 885 | +    # Args:
 | |
| 886 | +    #     remote (Remote): The remote to use.
 | |
| 887 | +    #     dir_digest (Digest): Digest object for the directory to fetch.
 | |
| 888 | +    #
 | |
| 889 | +    # Returns:
 | |
| 890 | +    #     (int): Bytes required to cache fetched directory
 | |
| 891 | +    #
 | |
| 892 | +    def _fetch_directory(self, remote, dir_digest):
 | |
| 893 | +        size = 0
 | |
| 894 | +        objpath = self.objpath(dir_digest)
 | |
| 835 | 895 |          if os.path.exists(objpath):
 | 
| 836 | 896 |              # already in local cache
 | 
| 837 | -            return
 | |
| 897 | +            return 0
 | |
| 838 | 898 |  | 
| 839 | 899 |          with tempfile.NamedTemporaryFile(dir=self.tmpdir) as out:
 | 
| 840 | -            self._fetch_blob(remote, tree, out)
 | |
| 900 | +            self._fetch_blob(remote, dir_digest, out)
 | |
| 841 | 901 |  | 
| 842 | 902 |              directory = remote_execution_pb2.Directory()
 | 
| 843 | 903 |  | 
| ... | ... | @@ -845,7 +905,7 @@ class CASCache(ArtifactCache): | 
| 845 | 905 |                  directory.ParseFromString(f.read())
 | 
| 846 | 906 |  | 
| 847 | 907 |              for filenode in directory.files:
 | 
| 848 | -                fileobjpath = self.objpath(tree)
 | |
| 908 | +                fileobjpath = self.objpath(filenode.digest)
 | |
| 849 | 909 |                  if os.path.exists(fileobjpath):
 | 
| 850 | 910 |                      # already in local cache
 | 
| 851 | 911 |                      continue
 | 
| ... | ... | @@ -853,16 +913,23 @@ class CASCache(ArtifactCache): | 
| 853 | 913 |                  with tempfile.NamedTemporaryFile(dir=self.tmpdir) as f:
 | 
| 854 | 914 |                      self._fetch_blob(remote, filenode.digest, f)
 | 
| 855 | 915 |  | 
| 856 | -                    digest = self.add_object(path=f.name)
 | |
| 916 | +                    digest, obj_size = self.add_object(path=f.name)
 | |
| 917 | +                    size += obj_size
 | |
| 857 | 918 |                      assert digest.hash == filenode.digest.hash
 | 
| 858 | 919 |  | 
| 859 | 920 |              for dirnode in directory.directories:
 | 
| 860 | -                self._fetch_directory(remote, dirnode.digest)
 | |
| 921 | +                size += self._fetch_directory(remote, dirnode.digest)
 | |
| 922 | + | |
| 923 | +            # Place directory blob only in final location when we've
 | |
| 924 | +            # downloaded all referenced blobs to avoid dangling
 | |
| 925 | +            # references in the repository.
 | |
| 926 | +            digest, obj_size = self.add_object(path=out.name)
 | |
| 927 | + | |
| 928 | +            assert digest.hash == dir_digest.hash
 | |
| 929 | + | |
| 930 | +            size += obj_size
 | |
| 861 | 931 |  | 
| 862 | -            # place directory blob only in final location when we've downloaded
 | |
| 863 | -            # all referenced blobs to avoid dangling references in the repository
 | |
| 864 | -            digest = self.add_object(path=out.name)
 | |
| 865 | -            assert digest.hash == tree.hash
 | |
| 932 | +            return size
 | |
| 866 | 933 |  | 
| 867 | 934 |      def _fetch_tree(self, remote, digest):
 | 
| 868 | 935 |          # download but do not store the Tree object
 | 
| ... | ... | @@ -885,13 +952,13 @@ class CASCache(ArtifactCache): | 
| 885 | 952 |                      with tempfile.NamedTemporaryFile(dir=self.tmpdir) as f:
 | 
| 886 | 953 |                          self._fetch_blob(remote, filenode.digest, f)
 | 
| 887 | 954 |  | 
| 888 | -                        added_digest = self.add_object(path=f.name)
 | |
| 955 | +                        added_digest = self.add_object(path=f.name)[0]
 | |
| 889 | 956 |                          assert added_digest.hash == filenode.digest.hash
 | 
| 890 | 957 |  | 
| 891 | 958 |                  # place directory blob only in final location when we've downloaded
 | 
| 892 | 959 |                  # all referenced blobs to avoid dangling references in the repository
 | 
| 893 | 960 |                  dirbuffer = directory.SerializeToString()
 | 
| 894 | -                dirdigest = self.add_object(buffer=dirbuffer)
 | |
| 961 | +                dirdigest = self.add_object(buffer=dirbuffer)[0]
 | |
| 895 | 962 |                  assert dirdigest.size_bytes == len(dirbuffer)
 | 
| 896 | 963 |  | 
| 897 | 964 |          return dirdigest
 | 
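
The cascache.py changes above make add_object() and _commit_directory() return a (digest, bytes-added) pair so that callers can aggregate the disk overhead of what they store; callers that only need the digest index [0]. A simplified, self-contained sketch of that pattern, using plain SHA-256 hex digests instead of protobuf Digest objects and handling only regular files, directories and sockets, could look like:

    import hashlib
    import os
    import stat

    def commit_directory(path):
        """Return (digest, size): a content digest for the tree at 'path' and
        the number of bytes that would be added to the store (sketch only)."""
        size = 0
        h = hashlib.sha256()
        for name in sorted(os.listdir(path)):
            full_path = os.path.join(path, name)
            mode = os.lstat(full_path).st_mode
            if stat.S_ISDIR(mode):
                child_digest, child_size = commit_directory(full_path)
                h.update(child_digest.encode())
                size += child_size
            elif stat.S_ISREG(mode):
                with open(full_path, 'rb') as f:
                    data = f.read()
                h.update(hashlib.sha256(data).hexdigest().encode())
                size += len(data)
            elif stat.S_ISSOCK(mode):
                pass   # sockets are skipped, as in the diff above
        return h.hexdigest(), size
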
| ... | ... | @@ -210,7 +210,7 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer): | 
| 210 | 210 |                          context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
 | 
| 211 | 211 |                          return response
 | 
| 212 | 212 |                      out.flush()
 | 
| 213 | -                    digest = self.cas.add_object(path=out.name)
 | |
| 213 | +                    digest = self.cas.add_object(path=out.name)[0]
 | |
| 214 | 214 |                      if digest.hash != client_digest.hash:
 | 
| 215 | 215 |                          context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
 | 
| 216 | 216 |                          return response
 | 
| ... | ... | @@ -119,7 +119,6 @@ class Context(): | 
| 119 | 119 |          self._log_handle = None
 | 
| 120 | 120 |          self._log_filename = None
 | 
| 121 | 121 |          self.config_cache_quota = 'infinity'
 | 
| 122 | -        self.artifactdir_volume = None
 | |
| 123 | 122 |  | 
| 124 | 123 |      # load()
 | 
| 125 | 124 |      #
 | 
| 1 | +#
 | |
| 2 | +#  Copyright (C) 2018 Codethink Limited
 | |
| 3 | +#
 | |
| 4 | +#  This program is free software; you can redistribute it and/or
 | |
| 5 | +#  modify it under the terms of the GNU Lesser General Public
 | |
| 6 | +#  License as published by the Free Software Foundation; either
 | |
| 7 | +#  version 2 of the License, or (at your option) any later version.
 | |
| 8 | +#
 | |
| 9 | +#  This library is distributed in the hope that it will be useful,
 | |
| 10 | +#  but WITHOUT ANY WARRANTY; without even the implied warranty of
 | |
| 11 | +#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 | |
| 12 | +#  Lesser General Public License for more details.
 | |
| 13 | +#
 | |
| 14 | +#  You should have received a copy of the GNU Lesser General Public
 | |
| 15 | +#  License along with this library. If not, see <http://www.gnu.org/licenses/>.
 | |
| 16 | +#
 | |
| 17 | +#  Authors:
 | |
| 18 | +#        Tristan Maat <tristan maat codethink co uk>
 | |
| 19 | + | |
| 1 | 20 |  from .elementjob import ElementJob
 | 
| 2 | -from .cachesizejob import CacheSizeJob
 | |
| 3 | 21 |  from .cleanupjob import CleanupJob | 
| 1 | -#  Copyright (C) 2018 Codethink Limited
 | |
| 2 | -#
 | |
| 3 | -#  This program is free software; you can redistribute it and/or
 | |
| 4 | -#  modify it under the terms of the GNU Lesser General Public
 | |
| 5 | -#  License as published by the Free Software Foundation; either
 | |
| 6 | -#  version 2 of the License, or (at your option) any later version.
 | |
| 7 | -#
 | |
| 8 | -#  This library is distributed in the hope that it will be useful,
 | |
| 9 | -#  but WITHOUT ANY WARRANTY; without even the implied warranty of
 | |
| 10 | -#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.	 See the GNU
 | |
| 11 | -#  Lesser General Public License for more details.
 | |
| 12 | -#
 | |
| 13 | -#  You should have received a copy of the GNU Lesser General Public
 | |
| 14 | -#  License along with this library. If not, see <http://www.gnu.org/licenses/>.
 | |
| 15 | -#
 | |
| 16 | -#  Author:
 | |
| 17 | -#        Tristan Daniël Maat <tristan maat codethink co uk>
 | |
| 18 | -#
 | |
| 19 | -from .job import Job
 | |
| 20 | -from ..._platform import Platform
 | |
| 21 | - | |
| 22 | - | |
| 23 | -class CacheSizeJob(Job):
 | |
| 24 | -    def __init__(self, *args, complete_cb, **kwargs):
 | |
| 25 | -        super().__init__(*args, **kwargs)
 | |
| 26 | -        self._complete_cb = complete_cb
 | |
| 27 | - | |
| 28 | -        platform = Platform.get_platform()
 | |
| 29 | -        self._artifacts = platform.artifactcache
 | |
| 30 | - | |
| 31 | -    def child_process(self):
 | |
| 32 | -        return self._artifacts.compute_cache_size()
 | |
| 33 | - | |
| 34 | -    def parent_complete(self, success, result):
 | |
| 35 | -        if success:
 | |
| 36 | -            self._artifacts.set_cache_size(result)
 | |
| 37 | - | |
| 38 | -            if self._complete_cb:
 | |
| 39 | -                self._complete_cb(result)
 | |
| 40 | - | |
| 41 | -    def child_process_data(self):
 | |
| 42 | -        return {} | 
| ... | ... | @@ -21,9 +21,8 @@ from ..._platform import Platform | 
| 21 | 21 |  | 
| 22 | 22 |  | 
| 23 | 23 |  class CleanupJob(Job):
 | 
| 24 | -    def __init__(self, *args, complete_cb, **kwargs):
 | |
| 24 | +    def __init__(self, *args, **kwargs):
 | |
| 25 | 25 |          super().__init__(*args, **kwargs)
 | 
| 26 | -        self._complete_cb = complete_cb
 | |
| 27 | 26 |  | 
| 28 | 27 |          platform = Platform.get_platform()
 | 
| 29 | 28 |          self._artifacts = platform.artifactcache
 | 
| ... | ... | @@ -33,10 +32,4 @@ class CleanupJob(Job): | 
| 33 | 32 |  | 
| 34 | 33 |      def parent_complete(self, success, result):
 | 
| 35 | 34 |          if success:
 | 
| 36 | -            self._artifacts.set_cache_size(result)
 | |
| 37 | - | |
| 38 | -            if self._complete_cb:
 | |
| 39 | -                self._complete_cb()
 | |
| 40 | - | |
| 41 | -    def child_process_data(self):
 | |
| 42 | -        return {} | |
| 35 | +            self._artifacts.subtract_artifact_size(result) | 
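
With cachesizejob.py removed, the cleanup job above is the only child process left that touches the size counter: it returns the number of bytes clean() freed, and the parent subtracts that amount in the main process. A minimal sketch of that contract, with the Job base class and its callback machinery simplified away:

    class CleanupJobSketch:
        """Illustrative model of the new CleanupJob contract."""

        def __init__(self, artifacts):
            # 'artifacts' is any object exposing clean() and subtract_artifact_size()
            self._artifacts = artifacts

        def child_process(self):
            # Runs in a subprocess; clean() now returns the bytes it removed
            return self._artifacts.clean()

        def parent_complete(self, success, result):
            # Runs in the main process; keeps the shared counter in sync
            if success:
                self._artifacts.subtract_artifact_size(result)
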
| 1 | 1 |  #
 | 
| 2 | -#  Copyright (C) 2016 Codethink Limited
 | |
| 2 | +#  Copyright (C) 2018 Codethink Limited
 | |
| 3 | 3 |  #
 | 
| 4 | 4 |  #  This program is free software; you can redistribute it and/or
 | 
| 5 | 5 |  #  modify it under the terms of the GNU Lesser General Public
 | 
| ... | ... | @@ -87,31 +87,17 @@ class BuildQueue(Queue): | 
| 87 | 87 |  | 
| 88 | 88 |          return QueueStatus.READY
 | 
| 89 | 89 |  | 
| 90 | -    def _check_cache_size(self, job, element, artifact_size):
 | |
| 91 | - | |
| 92 | -        # After completing a build job, add the artifact size
 | |
| 93 | -        # as returned from Element._assemble() to the estimated
 | |
| 94 | -        # artifact cache size
 | |
| 95 | -        #
 | |
| 96 | -        platform = Platform.get_platform()
 | |
| 97 | -        artifacts = platform.artifactcache
 | |
| 98 | - | |
| 99 | -        artifacts.add_artifact_size(artifact_size)
 | |
| 100 | - | |
| 101 | -        # If the estimated size outgrows the quota, ask the scheduler
 | |
| 102 | -        # to queue a job to actually check the real cache size.
 | |
| 103 | -        #
 | |
| 104 | -        if artifacts.get_quota_exceeded():
 | |
| 105 | -            self._scheduler.check_cache_size()
 | |
| 106 | - | |
| 107 | 90 |      def done(self, job, element, result, success):
 | 
| 91 | +        if not success:
 | |
| 92 | +            return False
 | |
| 93 | + | |
| 94 | +        element._assemble_done()
 | |
| 108 | 95 |  | 
| 109 | -        if success:
 | |
| 110 | -            # Inform element in main process that assembly is done
 | |
| 111 | -            element._assemble_done()
 | |
| 96 | +        artifacts = Platform.get_platform().artifactcache
 | |
| 97 | +        artifacts.add_artifact_size(result)
 | |
| 112 | 98 |  | 
| 113 | -            # This has to be done after _assemble_done, such that the
 | |
| 114 | -            # element may register its cache key as required
 | |
| 115 | -            self._check_cache_size(job, element, result)
 | |
| 99 | +        # This has to be done after _assemble_done, such that the
 | |
| 100 | +        # element may register its cache key as required
 | |
| 101 | +        self._scheduler.check_cache_size()
 | |
| 116 | 102 |  | 
| 117 | -        return True | |
| 103 | +        return success | 
| 1 | 1 |  #
 | 
| 2 | -#  Copyright (C) 2016 Codethink Limited
 | |
| 2 | +#  Copyright (C) 2018 Codethink Limited
 | |
| 3 | 3 |  #
 | 
| 4 | 4 |  #  This program is free software; you can redistribute it and/or
 | 
| 5 | 5 |  #  modify it under the terms of the GNU Lesser General Public
 | 
| ... | ... | @@ -21,6 +21,7 @@ | 
| 21 | 21 |  # Local imports
 | 
| 22 | 22 |  from . import Queue, QueueStatus
 | 
| 23 | 23 |  from ..resources import ResourceType
 | 
| 24 | +from ..._platform import Platform
 | |
| 24 | 25 |  | 
| 25 | 26 |  | 
| 26 | 27 |  # A queue which pulls element artifacts
 | 
| ... | ... | @@ -52,18 +53,21 @@ class PullQueue(Queue): | 
| 52 | 53 |          else:
 | 
| 53 | 54 |              return QueueStatus.SKIP
 | 
| 54 | 55 |  | 
| 55 | -    def done(self, _, element, result, success):
 | |
| 56 | - | |
| 56 | +    def done(self, job, element, result, success):
 | |
| 57 | 57 |          if not success:
 | 
| 58 | 58 |              return False
 | 
| 59 | 59 |  | 
| 60 | 60 |          element._pull_done()
 | 
| 61 | 61 |  | 
| 62 | -        # Build jobs will check the "approximate" size first. Since we
 | |
| 63 | -        # do not get an artifact size from pull jobs, we have to
 | |
| 64 | -        # actually check the cache size.
 | |
| 62 | +        pulled, artifact_size = result
 | |
| 63 | + | |
| 64 | +        artifacts = Platform.get_platform().artifactcache
 | |
| 65 | +        artifacts.add_artifact_size(artifact_size)
 | |
| 66 | + | |
| 67 | +        # This has to be done after _pull_done, such that the
 | |
| 68 | +        # element may register its cache key as required
 | |
| 65 | 69 |          self._scheduler.check_cache_size()
 | 
| 66 | 70 |  | 
| 67 | 71 |          # Element._pull() returns True if it downloaded an artifact,
 | 
| 68 | 72 |          # here we want to appear skipped if we did not download.
 | 
| 69 | -        return result | |
| 73 | +        return pulled | 
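
Both queue changes above follow the same shape: on success the queue feeds the byte overhead reported by the job into the artifact cache's counter and then asks the scheduler whether a cleanup is needed. A rough sketch of the pull-side flow, with the collaborating objects passed in explicitly instead of looked up through Platform (all names here are illustrative):

    def pull_queue_done(element, result, success, artifacts, scheduler):
        """Sketch of the PullQueue.done() flow after this change."""
        if not success:
            return False
        element.pull_done()                     # stand-in for Element._pull_done()
        pulled, artifact_size = result          # Element._pull() now returns a pair
        artifacts.add_artifact_size(artifact_size)
        scheduler.check_cache_size()            # may queue a cleanup job
        # Appear skipped when no artifact was actually downloaded
        return pulled
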
| 1 | 1 |  #
 | 
| 2 | -#  Copyright (C) 2016 Codethink Limited
 | |
| 2 | +#  Copyright (C) 2018 Codethink Limited
 | |
| 3 | 3 |  #
 | 
| 4 | 4 |  #  This program is free software; you can redistribute it and/or
 | 
| 5 | 5 |  #  modify it under the terms of the GNU Lesser General Public
 | 
| ... | ... | @@ -28,7 +28,7 @@ from contextlib import contextmanager | 
| 28 | 28 |  | 
| 29 | 29 |  # Local imports
 | 
| 30 | 30 |  from .resources import Resources, ResourceType
 | 
| 31 | -from .jobs import CacheSizeJob, CleanupJob
 | |
| 31 | +from .jobs import CleanupJob
 | |
| 32 | 32 |  from .._platform import Platform
 | 
| 33 | 33 |  | 
| 34 | 34 |  | 
| ... | ... | @@ -243,22 +243,19 @@ class Scheduler(): | 
| 243 | 243 |  | 
| 244 | 244 |      # check_cache_size():
 | 
| 245 | 245 |      #
 | 
| 246 | -    # Queues a cache size calculation job, after the cache
 | |
| 247 | -    # size is calculated, a cleanup job will be run automatically
 | |
| 248 | -    # if needed.
 | |
| 249 | -    #
 | |
| 250 | -    # FIXME: This should ensure that only one cache size job
 | |
| 251 | -    #        is ever pending at a given time. If a cache size
 | |
| 252 | -    #        job is already running, it is correct to queue
 | |
| 253 | -    #        a new one, it is incorrect to have more than one
 | |
| 254 | -    #        of these jobs pending at a given time, though.
 | |
| 246 | +    # Queues a cleanup job if the size of the artifact cache exceeds
 | |
| 247 | +    # the quota.
 | |
| 255 | 248 |      #
 | 
| 256 | 249 |      def check_cache_size(self):
 | 
| 257 | -        job = CacheSizeJob(self, 'cache_size', 'cache_size/cache_size',
 | |
| 258 | -                           resources=[ResourceType.CACHE,
 | |
| 259 | -                                      ResourceType.PROCESS],
 | |
| 260 | -                           complete_cb=self._run_cleanup)
 | |
| 261 | -        self.schedule_jobs([job])
 | |
| 250 | +        artifacts = Platform.get_platform().artifactcache
 | |
| 251 | + | |
| 252 | +        if artifacts.has_quota_exceeded():
 | |
| 253 | +            job = CleanupJob(self, 'Clean artifact cache',
 | |
| 254 | +                             'cleanup/cleanup',
 | |
| 255 | +                             resources=[ResourceType.CACHE,
 | |
| 256 | +                                        ResourceType.PROCESS],
 | |
| 257 | +                             exclusive_resources=[ResourceType.CACHE])
 | |
| 258 | +            self.schedule_jobs([job])
 | |
| 262 | 259 |  | 
| 263 | 260 |      #######################################################
 | 
| 264 | 261 |      #                  Local Private Methods              #
 | 
| ... | ... | @@ -335,32 +332,6 @@ class Scheduler(): | 
| 335 | 332 |          self.schedule_jobs(ready)
 | 
| 336 | 333 |          self._sched()
 | 
| 337 | 334 |  | 
| 338 | -    # _run_cleanup()
 | |
| 339 | -    #
 | |
| 340 | -    # Schedules the cache cleanup job if the passed size
 | |
| 341 | -    # exceeds the cache quota.
 | |
| 342 | -    #
 | |
| 343 | -    # Args:
 | |
| 344 | -    #    cache_size (int): The calculated cache size (ignored)
 | |
| 345 | -    #
 | |
| 346 | -    # NOTE: This runs in response to completion of the cache size
 | |
| 347 | -    #       calculation job lauched by Scheduler.check_cache_size(),
 | |
| 348 | -    #       which will report the calculated cache size.
 | |
| 349 | -    #
 | |
| 350 | -    def _run_cleanup(self, cache_size):
 | |
| 351 | -        platform = Platform.get_platform()
 | |
| 352 | -        artifacts = platform.artifactcache
 | |
| 353 | - | |
| 354 | -        if not artifacts.get_quota_exceeded():
 | |
| 355 | -            return
 | |
| 356 | - | |
| 357 | -        job = CleanupJob(self, 'cleanup', 'cleanup/cleanup',
 | |
| 358 | -                         resources=[ResourceType.CACHE,
 | |
| 359 | -                                    ResourceType.PROCESS],
 | |
| 360 | -                         exclusive_resources=[ResourceType.CACHE],
 | |
| 361 | -                         complete_cb=None)
 | |
| 362 | -        self.schedule_jobs([job])
 | |
| 363 | - | |
| 364 | 335 |      # _suspend_jobs()
 | 
| 365 | 336 |      #
 | 
| 366 | 337 |      # Suspend all ongoing jobs.
 | 
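
Because the size counter is kept current by the build, pull and cleanup jobs themselves, the scheduler above no longer needs a separate cache-size job; it only has to decide whether a cleanup job should run. A condensed model of that decision, with CleanupJob and the resource arguments elided:

    class SchedulerSketch:
        """Illustrative model of the simplified Scheduler.check_cache_size()."""

        def __init__(self, artifacts):
            self._artifacts = artifacts       # exposes has_quota_exceeded()
            self.scheduled = []               # stands in for schedule_jobs()

        def check_cache_size(self):
            if self._artifacts.has_quota_exceeded():
                # The real code constructs a CleanupJob holding the CACHE
                # resource exclusively; a placeholder is enough here.
                self.scheduled.append("cleanup")

With the CacheSizeModel sketched earlier, a build that pushes the counter over the quota would cause exactly one cleanup job to be queued here.
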
| ... | ... | @@ -938,13 +938,10 @@ class Stream(): | 
| 938 | 938 |          # Set the "required" artifacts that should not be removed
 | 
| 939 | 939 |          # while this pipeline is active
 | 
| 940 | 940 |          #
 | 
| 941 | -        # FIXME: The set of required artifacts is only really needed
 | |
| 942 | -        #        for build and pull tasks.
 | |
| 941 | +        # It must include all the artifacts which are required by the
 | |
| 942 | +        # final product. Note that this is a superset of the build plan.
 | |
| 943 | 943 |          #
 | 
| 944 | -        #        It must include all the artifacts which are required by the
 | |
| 945 | -        #        final product. Note that this is a superset of the build plan.
 | |
| 946 | -        #
 | |
| 947 | -        self._artifacts.append_required_artifacts((e for e in self._pipeline.dependencies(elements, Scope.ALL)))
 | |
| 944 | +        self._artifacts.mark_required_elements(self._pipeline.dependencies(elements, Scope.ALL))
 | |
| 948 | 945 |  | 
| 949 | 946 |          if selection == PipelineSelection.PLAN and dynamic_plan:
 | 
| 950 | 947 |              # We use a dynamic build plan, only request artifacts of top-level targets,
 | 
| ... | ... | @@ -200,7 +200,6 @@ class Element(Plugin): | 
| 200 | 200 |          self.__strict_cache_key = None          # Our cached cache key for strict builds
 | 
| 201 | 201 |          self.__artifacts = artifacts            # Artifact cache
 | 
| 202 | 202 |          self.__consistency = Consistency.INCONSISTENT  # Cached overall consistency state
 | 
| 203 | -        self.__cached = None                    # Whether we have a cached artifact
 | |
| 204 | 203 |          self.__strong_cached = None             # Whether we have a cached artifact
 | 
| 205 | 204 |          self.__weak_cached = None               # Whether we have a cached artifact
 | 
| 206 | 205 |          self.__assemble_scheduled = False       # Element is scheduled to be assembled
 | 
| ... | ... | @@ -1126,8 +1125,6 @@ class Element(Plugin): | 
| 1126 | 1125 |  | 
| 1127 | 1126 |          # Query caches now that the weak and strict cache keys are available
 | 
| 1128 | 1127 |          key_for_cache_lookup = self.__strict_cache_key if context.get_strict() else self.__weak_cache_key
 | 
| 1129 | -        if not self.__cached:
 | |
| 1130 | -            self.__cached = self.__artifacts.contains(self, key_for_cache_lookup)
 | |
| 1131 | 1128 |          if not self.__strong_cached:
 | 
| 1132 | 1129 |              self.__strong_cached = self.__artifacts.contains(self, self.__strict_cache_key)
 | 
| 1133 | 1130 |          if key_for_cache_lookup == self.__weak_cache_key:
 | 
| ... | ... | @@ -1489,15 +1486,20 @@ class Element(Plugin): | 
| 1489 | 1486 |              workspace.clear_running_files()
 | 
| 1490 | 1487 |              self._get_context().get_workspaces().save_config()
 | 
| 1491 | 1488 |  | 
| 1492 | -            # We also need to update the required artifacts, since
 | |
| 1493 | -            # workspaced dependencies do not have a fixed cache key
 | |
| 1494 | -            # when the build starts.
 | |
| 1489 | +            # This element will have already been marked as
 | |
| 1490 | +            # required, but we bump the atime again, in case
 | |
| 1491 | +            # we did not know the cache key until now.
 | |
| 1495 | 1492 |              #
 | 
| 1496 | -            # This does *not* cause a race condition, because
 | |
| 1497 | -            # _assemble_done is called before a cleanup job may be
 | |
| 1498 | -            # launched.
 | |
| 1493 | +            # FIXME: This is not exactly correct; we should be
 | |
| 1494 | +            #        doing this at the time when we discover
 | |
| 1495 | +            #        a new cache key. This just happens to be the
 | |
| 1496 | +            #        last place where that can happen.
 | |
| 1499 | 1497 |              #
 | 
| 1500 | -            self.__artifacts.append_required_artifacts([self])
 | |
| 1498 | +            #        Ultimately, we should be refactoring
 | |
| 1499 | +            #        Element._update_state() such that we know
 | |
| 1500 | +            #        when a cache key is actually discovered.
 | |
| 1501 | +            #
 | |
| 1502 | +            self.__artifacts.mark_required_elements([self])
 | |
| 1501 | 1503 |  | 
| 1502 | 1504 |      # _assemble():
 | 
| 1503 | 1505 |      #
 | 
| ... | ... | @@ -1657,8 +1659,8 @@ class Element(Plugin): | 
| 1657 | 1659 |                      }), os.path.join(metadir, 'workspaced-dependencies.yaml'))
 | 
| 1658 | 1660 |  | 
| 1659 | 1661 |                      with self.timed_activity("Caching artifact"):
 | 
| 1660 | -                        artifact_size = utils._get_dir_size(assembledir)
 | |
| 1661 | -                        self.__artifacts.commit(self, assembledir, self.__get_cache_keys_for_commit())
 | |
| 1662 | +                        artifact_size = self.__artifacts.commit(
 | |
| 1663 | +                            self, assembledir, self.__get_cache_keys_for_commit())
 | |
| 1662 | 1664 |  | 
| 1663 | 1665 |                      if collect is not None and collectvdir is None:
 | 
| 1664 | 1666 |                          raise ElementError(
 | 
| ... | ... | @@ -1710,31 +1712,31 @@ class Element(Plugin): | 
| 1710 | 1712 |          self._update_state()
 | 
| 1711 | 1713 |  | 
| 1712 | 1714 |      def _pull_strong(self, *, progress=None):
 | 
| 1713 | -        weak_key = self._get_cache_key(strength=_KeyStrength.WEAK)
 | |
| 1714 | - | |
| 1715 | 1715 |          key = self.__strict_cache_key
 | 
| 1716 | -        if not self.__artifacts.pull(self, key, progress=progress):
 | |
| 1717 | -            return False
 | |
| 1716 | +        pulled, artifact_size = self.__artifacts.pull(self, key,
 | |
| 1717 | +                                                      progress=progress)
 | |
| 1718 | 1718 |  | 
| 1719 | -        # update weak ref by pointing it to this newly fetched artifact
 | |
| 1720 | -        self.__artifacts.link_key(self, key, weak_key)
 | |
| 1719 | +        if pulled:
 | |
| 1720 | +            # update weak ref by pointing it to this newly fetched artifact
 | |
| 1721 | +            weak_key = self._get_cache_key(strength=_KeyStrength.WEAK)
 | |
| 1722 | +            self.__artifacts.link_key(self, key, weak_key)
 | |
| 1721 | 1723 |  | 
| 1722 | -        return True
 | |
| 1724 | +        return pulled, artifact_size
 | |
| 1723 | 1725 |  | 
| 1724 | 1726 |      def _pull_weak(self, *, progress=None):
 | 
| 1725 | 1727 |          weak_key = self._get_cache_key(strength=_KeyStrength.WEAK)
 | 
| 1728 | +        pulled, artifact_size = self.__artifacts.pull(self, weak_key,
 | |
| 1729 | +                                                      progress=progress)
 | |
| 1726 | 1730 |  | 
| 1727 | -        if not self.__artifacts.pull(self, weak_key, progress=progress):
 | |
| 1728 | -            return False
 | |
| 1729 | - | |
| 1730 | -        # extract strong cache key from this newly fetched artifact
 | |
| 1731 | -        self._pull_done()
 | |
| 1731 | +        if pulled:
 | |
| 1732 | +            # extract strong cache key from this newly fetched artifact
 | |
| 1733 | +            self._pull_done()
 | |
| 1732 | 1734 |  | 
| 1733 | -        # create tag for strong cache key
 | |
| 1734 | -        key = self._get_cache_key(strength=_KeyStrength.STRONG)
 | |
| 1735 | -        self.__artifacts.link_key(self, weak_key, key)
 | |
| 1735 | +            # create tag for strong cache key
 | |
| 1736 | +            key = self._get_cache_key(strength=_KeyStrength.STRONG)
 | |
| 1737 | +            self.__artifacts.link_key(self, weak_key, key)
 | |
| 1736 | 1738 |  | 
| 1737 | -        return True
 | |
| 1739 | +        return pulled, artifact_size
 | |
| 1738 | 1740 |  | 
| 1739 | 1741 |      # _pull():
 | 
| 1740 | 1742 |      #
 | 
| ... | ... | @@ -1749,18 +1751,17 @@ class Element(Plugin): | 
| 1749 | 1751 |              self.status(message)
 | 
| 1750 | 1752 |  | 
| 1751 | 1753 |          # Attempt to pull artifact without knowing whether it's available
 | 
| 1752 | -        pulled = self._pull_strong(progress=progress)
 | |
| 1754 | +        pulled, artifact_size = self._pull_strong(progress=progress)
 | |
| 1753 | 1755 |  | 
| 1754 | 1756 |          if not pulled and not self._cached() and not context.get_strict():
 | 
| 1755 | -            pulled = self._pull_weak(progress=progress)
 | |
| 1757 | +            pulled, artifact_size = self._pull_weak(progress=progress)
 | |
| 1756 | 1758 |  | 
| 1757 | -        if not pulled:
 | |
| 1758 | -            return False
 | |
| 1759 | +        if pulled:
 | |
| 1760 | +            # Notify successful download
 | |
| 1761 | +            display_key = self._get_brief_display_key()
 | |
| 1762 | +            self.info("Downloaded artifact {}".format(display_key))
 | |
| 1759 | 1763 |  | 
| 1760 | -        # Notify successfull download
 | |
| 1761 | -        display_key = self._get_brief_display_key()
 | |
| 1762 | -        self.info("Downloaded artifact {}".format(display_key))
 | |
| 1763 | -        return True
 | |
| 1764 | +        return pulled, artifact_size
 | |
| 1764 | 1765 |  | 
| 1765 | 1766 |      # _skip_push():
 | 
| 1766 | 1767 |      #
 | 
| ... | ... | @@ -2079,7 +2080,7 @@ class Element(Plugin): | 
| 2079 | 2080 |  | 
| 2080 | 2081 |      def __is_cached(self, keystrength):
 | 
| 2081 | 2082 |          if keystrength is None:
 | 
| 2082 | -            return self.__cached
 | |
| 2083 | +            keystrength = _KeyStrength.STRONG if self._get_context().get_strict() else _KeyStrength.WEAK
 | |
| 2083 | 2084 |  | 
| 2084 | 2085 |          return self.__strong_cached if keystrength == _KeyStrength.STRONG else self.__weak_cached
 | 
| 2085 | 2086 |  | 
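
element.py above drops the separate __cached flag; when no key strength is given, the cached-ness query now falls back to the strong key in strict mode and the weak key otherwise. A small sketch of that selection logic, detached from the Element class (argument names are illustrative):

    def is_cached(strict_mode, strong_cached, weak_cached, keystrength=None):
        """Sketch of the new Element.__is_cached() fallback behaviour."""
        if keystrength is None:
            keystrength = 'strong' if strict_mode else 'weak'
        return strong_cached if keystrength == 'strong' else weak_cached

    # e.g. is_cached(strict_mode=False, strong_cached=False, weak_cached=True) -> True
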
| ... | ... | @@ -111,7 +111,7 @@ class CasBasedDirectory(Directory): | 
| 111 | 111 |          the parent).
 | 
| 112 | 112 |  | 
| 113 | 113 |          """
 | 
| 114 | -        self.ref = self.cas_cache.add_object(buffer=self.pb2_directory.SerializeToString())
 | |
| 114 | +        self.ref = self.cas_cache.add_object(buffer=self.pb2_directory.SerializeToString())[0]
 | |
| 115 | 115 |          if caller:
 | 
| 116 | 116 |              old_dir = self._find_pb2_entry(caller.filename)
 | 
| 117 | 117 |              self.cas_cache.add_object(digest=old_dir.digest, buffer=caller.pb2_directory.SerializeToString())
 | 
| ... | ... | @@ -130,9 +130,10 @@ class CasBasedDirectory(Directory): | 
| 130 | 130 |              self.index[entry.name].buildstream_object._recalculate_recursing_down(entry)
 | 
| 131 | 131 |  | 
| 132 | 132 |          if parent:
 | 
| 133 | -            self.ref = self.cas_cache.add_object(digest=parent.digest, buffer=self.pb2_directory.SerializeToString())
 | |
| 133 | +            self.ref = self.cas_cache.add_object(digest=parent.digest,
 | |
| 134 | +                                                 buffer=self.pb2_directory.SerializeToString())[0]
 | |
| 134 | 135 |          else:
 | 
| 135 | -            self.ref = self.cas_cache.add_object(buffer=self.pb2_directory.SerializeToString())
 | |
| 136 | +            self.ref = self.cas_cache.add_object(buffer=self.pb2_directory.SerializeToString())[0]
 | |
| 136 | 137 |          # We don't need to do anything more than that; files were already added earlier, and symlinks are
 | 
| 137 | 138 |          # part of the directory structure.
 | 
| 138 | 139 |  | 
| ... | ... | @@ -372,6 +372,8 @@ def copy_files(src, dest, *, files=None, ignore_missing=False, report_written=Fa | 
| 372 | 372 |         Directories in `dest` are replaced with files from `src`,
 | 
| 373 | 373 |         unless the existing directory in `dest` is not empty in which
 | 
| 374 | 374 |         case the path will be reported in the return value.
 | 
| 375 | + | |
| 376 | +       UNIX domain socket files from `src` are ignored.
 | |
| 375 | 377 |      """
 | 
| 376 | 378 |      presorted = False
 | 
| 377 | 379 |      if files is None:
 | 
| ... | ... | @@ -414,6 +416,8 @@ def link_files(src, dest, *, files=None, ignore_missing=False, report_written=Fa | 
| 414 | 416 |  | 
| 415 | 417 |         If a hardlink cannot be created due to crossing filesystems,
 | 
| 416 | 418 |         then the file will be copied instead.
 | 
| 419 | + | |
| 420 | +       UNIX domain socket files from `src` are ignored.
 | |
| 417 | 421 |      """
 | 
| 418 | 422 |      presorted = False
 | 
| 419 | 423 |      if files is None:
 | 
| ... | ... | @@ -841,6 +845,13 @@ def _process_list(srcdir, destdir, filelist, actionfunc, result, | 
| 841 | 845 |              os.mknod(destpath, file_stat.st_mode, file_stat.st_rdev)
 | 
| 842 | 846 |              os.chmod(destpath, file_stat.st_mode)
 | 
| 843 | 847 |  | 
| 848 | +        elif stat.S_ISFIFO(mode):
 | |
| 849 | +            os.mkfifo(destpath, mode)
 | |
| 850 | + | |
| 851 | +        elif stat.S_ISSOCK(mode):
 | |
| 852 | +            # We can't duplicate the process serving the socket anyway
 | |
| 853 | +            pass
 | |
| 854 | + | |
| 844 | 855 |          else:
 | 
| 845 | 856 |              # Unsupported type.
 | 
| 846 | 857 |              raise UtilError('Cannot extract {} into staging-area. Unsupported type.'.format(srcpath))
 | 
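For readers skimming the hunk above: the new branches rely on the standard stat module to decide how each special file is staged. The following is a small standalone sketch (not BuildStream code; the function name is invented for illustration) of the same decision: named pipes are recreated at the destination, while UNIX domain sockets are skipped because the process that bound them cannot be staged along with the file.

    import os
    import stat

    def stage_special_file(srcpath, destpath):
        # Illustrative only: mirrors the branches added to _process_list() above.
        mode = os.lstat(srcpath).st_mode
        if stat.S_ISFIFO(mode):
            # A named pipe can simply be recreated with the same permission bits.
            os.mkfifo(destpath, mode)
        elif stat.S_ISSOCK(mode):
            # A UNIX domain socket is useless without the process serving it,
            # so it is ignored rather than recreated.
            pass
        else:
            raise ValueError('Unsupported special file: {}'.format(srcpath))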
| ... | ... | @@ -24,7 +24,7 @@ import pytest | 
| 24 | 24 |  from buildstream import _yaml
 | 
| 25 | 25 |  from buildstream._exceptions import ErrorDomain, LoadErrorReason
 | 
| 26 | 26 |  | 
| 27 | -from tests.testutils import cli, create_element_size, wait_for_cache_granularity
 | |
| 27 | +from tests.testutils import cli, create_element_size, update_element_size, wait_for_cache_granularity
 | |
| 28 | 28 |  | 
| 29 | 29 |  | 
| 30 | 30 |  DATA_DIR = os.path.join(
 | 
| ... | ... | @@ -93,6 +93,7 @@ def test_artifact_too_large(cli, datafiles, tmpdir, size): | 
| 93 | 93 |      create_element_size('target.bst', project, element_path, [], size)
 | 
| 94 | 94 |      res = cli.run(project=project, args=['build', 'target.bst'])
 | 
| 95 | 95 |      res.assert_main_error(ErrorDomain.STREAM, None)
 | 
| 96 | +    res.assert_task_error(ErrorDomain.ARTIFACT, 'cache-too-full')
 | |
| 96 | 97 |  | 
| 97 | 98 |  | 
| 98 | 99 |  @pytest.mark.datafiles(DATA_DIR)
 | 
| ... | ... | @@ -196,24 +197,8 @@ def test_keep_dependencies(cli, datafiles, tmpdir): | 
| 196 | 197 |  | 
| 197 | 198 |  | 
| 198 | 199 |  # Assert that we never delete a dependency required for a build tree
 | 
| 199 | -#
 | |
| 200 | -# NOTE: This test expects that a build will fail if it attempts to
 | |
| 201 | -#       put more artifacts in the cache than the quota can hold,
 | |
| 202 | -#       and expects that the last two elements which don't fit into
 | |
| 203 | -#       the quota wont even be built.
 | |
| 204 | -#
 | |
| 205 | -#       In real life, this will not be the case, since once we reach
 | |
| 206 | -#       the estimated quota we launch a cache size calculation job and
 | |
| 207 | -#       only launch a cleanup job when the size is calculated; and
 | |
| 208 | -#       other build tasks will be scheduled while the cache size job
 | |
| 209 | -#       is running.
 | |
| 210 | -#
 | |
| 211 | -#       This test only passes because we configure `builders` to 1,
 | |
| 212 | -#       ensuring that the cache size job runs exclusively since it
 | |
| 213 | -#       also requires a compute resource (a "builder").
 | |
| 214 | -#
 | |
| 215 | 200 |  @pytest.mark.datafiles(DATA_DIR)
 | 
| 216 | -def test_never_delete_dependencies(cli, datafiles, tmpdir):
 | |
| 201 | +def test_never_delete_required(cli, datafiles, tmpdir):
 | |
| 217 | 202 |      project = os.path.join(datafiles.dirname, datafiles.basename)
 | 
| 218 | 203 |      element_path = 'elements'
 | 
| 219 | 204 |  | 
| ... | ... | @@ -226,37 +211,94 @@ def test_never_delete_dependencies(cli, datafiles, tmpdir): | 
| 226 | 211 |          }
 | 
| 227 | 212 |      })
 | 
| 228 | 213 |  | 
| 229 | -    # Create a build tree
 | |
| 230 | -    create_element_size('dependency.bst', project,
 | |
| 231 | -                        element_path, [], 8000000)
 | |
| 232 | -    create_element_size('related.bst', project,
 | |
| 233 | -                        element_path, ['dependency.bst'], 8000000)
 | |
| 234 | -    create_element_size('target.bst', project,
 | |
| 235 | -                        element_path, ['related.bst'], 8000000)
 | |
| 236 | -    create_element_size('target2.bst', project,
 | |
| 237 | -                        element_path, ['target.bst'], 8000000)
 | |
| 214 | +    # Create a linear build tree
 | |
| 215 | +    create_element_size('dep1.bst', project, element_path, [], 8000000)
 | |
| 216 | +    create_element_size('dep2.bst', project, element_path, ['dep1.bst'], 8000000)
 | |
| 217 | +    create_element_size('dep3.bst', project, element_path, ['dep2.bst'], 8000000)
 | |
| 218 | +    create_element_size('target.bst', project, element_path, ['dep3.bst'], 8000000)
 | |
| 238 | 219 |  | 
| 239 | 220 |      # We try to build this pipeline, but it's too big for the
 | 
| 240 | 221 |      # cache. Since all elements are required, the build should fail.
 | 
| 241 | -    res = cli.run(project=project, args=['build', 'target2.bst'])
 | |
| 222 | +    res = cli.run(project=project, args=['build', 'target.bst'])
 | |
| 242 | 223 |      res.assert_main_error(ErrorDomain.STREAM, None)
 | 
| 224 | +    res.assert_task_error(ErrorDomain.ARTIFACT, 'cache-too-full')
 | |
| 243 | 225 |  | 
| 244 | -    assert cli.get_element_state(project, 'dependency.bst') == 'cached'
 | |
| 226 | +    # Only the first artifact fits in the cache, but we expect
 | |
| 227 | +    # that the first *two* artifacts will be cached.
 | |
| 228 | +    #
 | |
| 229 | +    # This is because after caching the first artifact we must
 | |
| 230 | +    # proceed to build the next artifact, and we cannot really
 | |
| 231 | +    # know how large an artifact will be until we try to cache it.
 | |
| 232 | +    #
 | |
| 233 | +    # In this case, we deem it more acceptable to not delete an
 | |
| 234 | +    # artifact which caused the cache to outgrow the quota.
 | |
| 235 | +    #
 | |
| 236 | +    # Note that this test only works because we have forced
 | |
| 237 | +    # the configuration to build one element at a time. In real
 | |
| 238 | +    # life, with N builders there may be up to N cached artifacts
 | |
| 239 | +    # which exceed the quota.
 | |
| 240 | +    #
 | |
| 241 | +    assert cli.get_element_state(project, 'dep1.bst') == 'cached'
 | |
| 242 | +    assert cli.get_element_state(project, 'dep2.bst') == 'cached'
 | |
| 243 | + | |
| 244 | +    assert cli.get_element_state(project, 'dep3.bst') != 'cached'
 | |
| 245 | +    assert cli.get_element_state(project, 'target.bst') != 'cached'
 | |
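The assertions above follow from the quota arithmetic configured in this test; a minimal sketch of that arithmetic, using the same numbers as the test, is:

    quota = 10000000      # bytes, as configured above
    artifact = 8000000    # size of each generated element

    assert artifact <= quota        # dep1 fits and is cached
    assert 2 * artifact > quota     # caching dep2 overflows the quota, but it is kept
    # dep3 then fails with 'cache-too-full': every cached artifact is still
    # required for the ongoing build, so nothing can be expired to make room.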
| 246 | + | |
| 247 | + | |
| 248 | +# Assert that we never delete a dependency required for a build tree,
 | |
| 249 | +# even when the artifact cache was previously populated with
 | |
| 250 | +# artifacts we do not require, and the new build is run with dynamic tracking.
 | |
| 251 | +#
 | |
| 252 | +@pytest.mark.datafiles(DATA_DIR)
 | |
| 253 | +def test_never_delete_required_track(cli, datafiles, tmpdir):
 | |
| 254 | +    project = os.path.join(datafiles.dirname, datafiles.basename)
 | |
| 255 | +    element_path = 'elements'
 | |
| 256 | + | |
| 257 | +    cli.configure({
 | |
| 258 | +        'cache': {
 | |
| 259 | +            'quota': 10000000
 | |
| 260 | +        },
 | |
| 261 | +        'scheduler': {
 | |
| 262 | +            'builders': 1
 | |
| 263 | +        }
 | |
| 264 | +    })
 | |
| 245 | 265 |  | 
| 246 | -    # This is *technically* above the cache limit. BuildStream accepts
 | |
| 247 | -    # some fuzziness, since it's hard to assert that we don't create
 | |
| 248 | -    # an artifact larger than the cache quota. We would have to remove
 | |
| 249 | -    # the artifact after-the-fact, but since it is required for the
 | |
| 250 | -    # current build and nothing broke yet, it's nicer to keep it
 | |
| 251 | -    # around.
 | |
| 266 | +    # Create a linear build tree
 | |
| 267 | +    repo_dep1 = create_element_size('dep1.bst', project, element_path, [], 2000000)
 | |
| 268 | +    repo_dep2 = create_element_size('dep2.bst', project, element_path, ['dep1.bst'], 2000000)
 | |
| 269 | +    repo_dep3 = create_element_size('dep3.bst', project, element_path, ['dep2.bst'], 2000000)
 | |
| 270 | +    repo_target = create_element_size('target.bst', project, element_path, ['dep3.bst'], 2000000)
 | |
| 271 | + | |
| 272 | +    # This should all fit into the artifact cache
 | |
| 273 | +    res = cli.run(project=project, args=['build', 'target.bst'])
 | |
| 274 | +    res.assert_success()
 | |
| 275 | + | |
| 276 | +    # They should all be cached
 | |
| 277 | +    assert cli.get_element_state(project, 'dep1.bst') == 'cached'
 | |
| 278 | +    assert cli.get_element_state(project, 'dep2.bst') == 'cached'
 | |
| 279 | +    assert cli.get_element_state(project, 'dep3.bst') == 'cached'
 | |
| 280 | +    assert cli.get_element_state(project, 'target.bst') == 'cached'
 | |
| 281 | + | |
| 282 | +    # Now increase the size of all the elements
 | |
| 252 | 283 |      #
 | 
| 253 | -    # This scenario is quite unlikely, and the cache overflow will be
 | |
| 254 | -    # resolved if the user does something about it anyway.
 | |
| 284 | +    update_element_size('dep1.bst', project, repo_dep1, 8000000)
 | |
| 285 | +    update_element_size('dep2.bst', project, repo_dep2, 8000000)
 | |
| 286 | +    update_element_size('dep3.bst', project, repo_dep3, 8000000)
 | |
| 287 | +    update_element_size('target.bst', project, repo_target, 8000000)
 | |
| 288 | + | |
| 289 | +    # Now repeat the same test we did in test_never_delete_required(),
 | |
| 290 | +    # except this time let's add dynamic tracking
 | |
| 255 | 291 |      #
 | 
| 256 | -    assert cli.get_element_state(project, 'related.bst') == 'cached'
 | |
| 292 | +    res = cli.run(project=project, args=['build', '--track-all', 'target.bst'])
 | |
| 293 | +    res.assert_main_error(ErrorDomain.STREAM, None)
 | |
| 294 | +    res.assert_task_error(ErrorDomain.ARTIFACT, 'cache-too-full')
 | |
| 257 | 295 |  | 
| 296 | +    # Expect the same result as in test_never_delete_required()
 | |
| 297 | +    #
 | |
| 298 | +    assert cli.get_element_state(project, 'dep1.bst') == 'cached'
 | |
| 299 | +    assert cli.get_element_state(project, 'dep2.bst') == 'cached'
 | |
| 300 | +    assert cli.get_element_state(project, 'dep3.bst') != 'cached'
 | |
| 258 | 301 |      assert cli.get_element_state(project, 'target.bst') != 'cached'
 | 
| 259 | -    assert cli.get_element_state(project, 'target2.bst') != 'cached'
 | |
| 260 | 302 |  | 
| 261 | 303 |  | 
| 262 | 304 |  # Ensure that only valid cache quotas make it through the loading
 | 
| ... | ... | @@ -54,8 +54,7 @@ def test_custom_logging(cli, tmpdir, datafiles): | 
| 54 | 54 |  | 
| 55 | 55 |      custom_log_format = '%{elapsed},%{elapsed-us},%{wallclock},%{key},%{element},%{action},%{message}'
 | 
| 56 | 56 |      user_config = {'logging': {'message-format': custom_log_format}}
 | 
| 57 | -    user_config_file = str(tmpdir.join('buildstream.conf'))
 | |
| 58 | -    _yaml.dump(_yaml.node_sanitize(user_config), filename=user_config_file)
 | |
| 57 | +    cli.configure(user_config)
 | |
| 59 | 58 |  | 
| 60 | 59 |      # Create our repo object of the given source type with
 | 
| 61 | 60 |      # the bin files, and then collect the initial ref.
 | 
| ... | ... | @@ -75,7 +74,7 @@ def test_custom_logging(cli, tmpdir, datafiles): | 
| 75 | 74 |                              element_name))
 | 
| 76 | 75 |  | 
| 77 | 76 |      # Now try to fetch it
 | 
| 78 | -    result = cli.run(project=project, args=['-c', user_config_file, 'fetch', element_name])
 | |
| 77 | +    result = cli.run(project=project, args=['fetch', element_name])
 | |
| 79 | 78 |      result.assert_success()
 | 
| 80 | 79 |  | 
| 81 | 80 |      m = re.search("\d\d:\d\d:\d\d,\d\d:\d\d:\d\d.\d{6},\d\d:\d\d:\d\d,,,SUCCESS,Checking sources", result.stderr)
 | 
| ... | ... | @@ -43,10 +43,13 @@ DATA_DIR = os.path.join( | 
| 43 | 43 |  )
 | 
| 44 | 44 |  | 
| 45 | 45 |  | 
| 46 | -def open_workspace(cli, tmpdir, datafiles, kind, track, suffix='', workspace_dir=None):
 | |
| 46 | +def open_workspace(cli, tmpdir, datafiles, kind, track, suffix='', workspace_dir=None, project_path=None):
 | |
| 47 | 47 |      if not workspace_dir:
 | 
| 48 | 48 |          workspace_dir = os.path.join(str(tmpdir), 'workspace{}'.format(suffix))
 | 
| 49 | -    project_path = os.path.join(datafiles.dirname, datafiles.basename)
 | |
| 49 | +    if not project_path:
 | |
| 50 | +        project_path = os.path.join(datafiles.dirname, datafiles.basename)
 | |
| 51 | +    else:
 | |
| 52 | +        shutil.copytree(os.path.join(datafiles.dirname, datafiles.basename), project_path)
 | |
| 50 | 53 |      bin_files_path = os.path.join(project_path, 'files', 'bin-files')
 | 
| 51 | 54 |      element_path = os.path.join(project_path, 'elements')
 | 
| 52 | 55 |      element_name = 'workspace-test-{}{}.bst'.format(kind, suffix)
 | 
| ... | ... | @@ -218,41 +221,42 @@ def test_close(cli, tmpdir, datafiles, kind): | 
| 218 | 221 |  | 
| 219 | 222 |  @pytest.mark.datafiles(DATA_DIR)
 | 
| 220 | 223 |  def test_close_external_after_move_project(cli, tmpdir, datafiles):
 | 
| 221 | -    tmp_parent = os.path.dirname(str(tmpdir))
 | |
| 222 | -    workspace_dir = os.path.join(tmp_parent, "workspace")
 | |
| 223 | -    element_name, project_path, _ = open_workspace(cli, tmpdir, datafiles, 'git', False, "", workspace_dir)
 | |
| 224 | +    workspace_dir = os.path.join(str(tmpdir), "workspace")
 | |
| 225 | +    project_path = os.path.join(str(tmpdir), 'initial_project')
 | |
| 226 | +    element_name, _, _ = open_workspace(cli, tmpdir, datafiles, 'git', False, "", workspace_dir, project_path)
 | |
| 224 | 227 |      assert os.path.exists(workspace_dir)
 | 
| 225 | -    tmp_dir = os.path.join(tmp_parent, 'external_project')
 | |
| 226 | -    shutil.move(project_path, tmp_dir)
 | |
| 227 | -    assert os.path.exists(tmp_dir)
 | |
| 228 | +    moved_dir = os.path.join(str(tmpdir), 'external_project')
 | |
| 229 | +    shutil.move(project_path, moved_dir)
 | |
| 230 | +    assert os.path.exists(moved_dir)
 | |
| 228 | 231 |  | 
| 229 | 232 |      # Close the workspace
 | 
| 230 | -    result = cli.run(configure=False, project=tmp_dir, args=[
 | |
| 233 | +    result = cli.run(project=moved_dir, args=[
 | |
| 231 | 234 |          'workspace', 'close', '--remove-dir', element_name
 | 
| 232 | 235 |      ])
 | 
| 233 | 236 |      result.assert_success()
 | 
| 234 | 237 |  | 
| 235 | 238 |      # Assert the workspace dir has been deleted
 | 
| 236 | 239 |      assert not os.path.exists(workspace_dir)
 | 
| 237 | -    # Move directory back inside tmp directory so it can be recognised
 | |
| 238 | -    shutil.move(tmp_dir, project_path)
 | |
| 239 | 240 |  | 
| 240 | 241 |  | 
| 241 | 242 |  @pytest.mark.datafiles(DATA_DIR)
 | 
| 242 | 243 |  def test_close_internal_after_move_project(cli, tmpdir, datafiles):
 | 
| 243 | -    element_name, project, _ = open_workspace(cli, tmpdir, datafiles, 'git', False)
 | |
| 244 | -    tmp_dir = os.path.join(os.path.dirname(str(tmpdir)), 'external_project')
 | |
| 245 | -    shutil.move(str(tmpdir), tmp_dir)
 | |
| 246 | -    assert os.path.exists(tmp_dir)
 | |
| 244 | +    initial_dir = os.path.join(str(tmpdir), 'initial_project')
 | |
| 245 | +    initial_workspace = os.path.join(initial_dir, 'workspace')
 | |
| 246 | +    element_name, _, _ = open_workspace(cli, tmpdir, datafiles, 'git', False,
 | |
| 247 | +                                        workspace_dir=initial_workspace, project_path=initial_dir)
 | |
| 248 | +    moved_dir = os.path.join(str(tmpdir), 'internal_project')
 | |
| 249 | +    shutil.move(initial_dir, moved_dir)
 | |
| 250 | +    assert os.path.exists(moved_dir)
 | |
| 247 | 251 |  | 
| 248 | 252 |      # Close the workspace
 | 
| 249 | -    result = cli.run(configure=False, project=tmp_dir, args=[
 | |
| 253 | +    result = cli.run(project=moved_dir, args=[
 | |
| 250 | 254 |          'workspace', 'close', '--remove-dir', element_name
 | 
| 251 | 255 |      ])
 | 
| 252 | 256 |      result.assert_success()
 | 
| 253 | 257 |  | 
| 254 | 258 |      # Assert the workspace dir has been deleted
 | 
| 255 | -    workspace = os.path.join(tmp_dir, 'workspace')
 | |
| 259 | +    workspace = os.path.join(moved_dir, 'workspace')
 | |
| 256 | 260 |      assert not os.path.exists(workspace)
 | 
| 257 | 261 |  | 
| 258 | 262 |  | 
| 1 | +kind: manual
 | |
| 2 | + | |
| 3 | +depends:
 | |
| 4 | +- filename: base.bst
 | |
| 5 | +  type: build
 | |
| 6 | + | |
| 7 | +config:
 | |
| 8 | +  build-commands:
 | |
| 9 | +    - |
 | |
| 10 | +      python3 -c '
 | |
| 11 | +      from socket import socket, AF_UNIX, SOCK_STREAM
 | |
| 12 | +      s = socket(AF_UNIX, SOCK_STREAM)
 | |
| 13 | +      s.bind("testsocket")
 | |
| 14 | +      ' | 
| 1 | +kind: manual
 | |
| 2 | + | |
| 3 | +depends:
 | |
| 4 | +- filename: base.bst
 | |
| 5 | +  type: build
 | |
| 6 | + | |
| 7 | +config:
 | |
| 8 | +  install-commands:
 | |
| 9 | +    - |
 | |
| 10 | +      python3 -c '
 | |
| 11 | +      from os.path import join
 | |
| 12 | +      from sys import argv
 | |
| 13 | +      from socket import socket, AF_UNIX, SOCK_STREAM
 | |
| 14 | +      s = socket(AF_UNIX, SOCK_STREAM)
 | |
| 15 | +      s.bind(join(argv[1], "testsocket"))
 | |
| 16 | +      ' %{install-root} | 
| 1 | +import os
 | |
| 2 | +import pytest
 | |
| 3 | + | |
| 4 | +from buildstream import _yaml
 | |
| 5 | + | |
| 6 | +from tests.testutils import cli_integration as cli
 | |
| 7 | +from tests.testutils.integration import assert_contains
 | |
| 8 | + | |
| 9 | + | |
| 10 | +pytestmark = pytest.mark.integration
 | |
| 11 | + | |
| 12 | +DATA_DIR = os.path.join(
 | |
| 13 | +    os.path.dirname(os.path.realpath(__file__)),
 | |
| 14 | +    "project"
 | |
| 15 | +)
 | |
| 16 | + | |
| 17 | + | |
| 18 | +@pytest.mark.datafiles(DATA_DIR)
 | |
| 19 | +def test_builddir_socket_ignored(cli, tmpdir, datafiles):
 | |
| 20 | +    project = os.path.join(datafiles.dirname, datafiles.basename)
 | |
| 21 | +    element_name = 'sockets/make-builddir-socket.bst'
 | |
| 22 | + | |
| 23 | +    result = cli.run(project=project, args=['build', element_name])
 | |
| 24 | +    assert result.exit_code == 0
 | |
| 25 | + | |
| 26 | + | |
| 27 | +@pytest.mark.datafiles(DATA_DIR)
 | |
| 28 | +def test_install_root_socket_ignored(cli, tmpdir, datafiles):
 | |
| 29 | +    project = os.path.join(datafiles.dirname, datafiles.basename)
 | |
| 30 | +    element_name = 'sockets/make-install-root-socket.bst'
 | |
| 31 | + | |
| 32 | +    result = cli.run(project=project, args=['build', element_name])
 | |
| 33 | +    assert result.exit_code == 0 | 
| ... | ... | @@ -26,6 +26,6 @@ | 
| 26 | 26 |  from .runcli import cli, cli_integration
 | 
| 27 | 27 |  from .repo import create_repo, ALL_REPO_KINDS
 | 
| 28 | 28 |  from .artifactshare import create_artifact_share
 | 
| 29 | -from .element_generators import create_element_size
 | |
| 29 | +from .element_generators import create_element_size, update_element_size
 | |
| 30 | 30 |  from .junction import generate_junction
 | 
| 31 | 31 |  from .runner_integration import wait_for_cache_granularity | 
| 1 | 1 |  import os
 | 
| 2 | 2 |  | 
| 3 | 3 |  from buildstream import _yaml
 | 
| 4 | +from buildstream import utils
 | |
| 5 | + | |
| 6 | +from . import create_repo
 | |
| 4 | 7 |  | 
| 5 | 8 |  | 
| 6 | 9 |  # create_element_size()
 | 
| 7 | 10 |  #
 | 
| 8 | -# This will open a "<name>_data" file for writing and write
 | |
| 9 | -# <size> MB of urandom (/dev/urandom) "stuff" into the file.
 | |
| 10 | -# A bst import element file is then created: <name>.bst
 | |
| 11 | +# Creates an import element with a git repo, using random
 | |
| 12 | +# data to create a file in that repo of the specified size,
 | |
| 13 | +# such that building it will add an artifact of the specified
 | |
| 14 | +# size to the artifact cache.
 | |
| 11 | 15 |  #
 | 
| 12 | 16 |  # Args:
 | 
| 13 | -#  name: (str) of the element name (e.g. target.bst)
 | |
| 14 | -#  path: (str) pathway to the project/elements directory
 | |
| 15 | -#  dependencies: A list of strings (can also be an empty list)
 | |
| 16 | -#  size: (int) size of the element in bytes
 | |
| 17 | +#    name (str): The element name (e.g. target.bst)
 | |
| 18 | +#    project_dir (str): The path to the project
 | |
| 19 | +#    element_path (str): The element path within the project
 | |
| 20 | +#    dependencies: A list of strings (can also be an empty list)
 | |
| 21 | +#    size: (int) size of the element in bytes
 | |
| 17 | 22 |  #
 | 
| 18 | 23 |  # Returns:
 | 
| 19 | -#  Nothing (creates a .bst file of specified size)
 | |
| 24 | +#    (Repo): A git repo which can be used to introduce trackable changes
 | |
| 25 | +#            by using the update_element_size() function below.
 | |
| 20 | 26 |  #
 | 
| 21 | 27 |  def create_element_size(name, project_dir, elements_path, dependencies, size):
 | 
| 22 | 28 |      full_elements_path = os.path.join(project_dir, elements_path)
 | 
| 23 | 29 |      os.makedirs(full_elements_path, exist_ok=True)
 | 
| 24 | 30 |  | 
| 25 | -    # Create a file to be included in this element's artifact
 | |
| 26 | -    with open(os.path.join(project_dir, name + '_data'), 'wb+') as f:
 | |
| 27 | -        f.write(os.urandom(size))
 | |
| 31 | +    # Create a git repo
 | |
| 32 | +    repodir = os.path.join(project_dir, 'repos')
 | |
| 33 | +    repo = create_repo('git', repodir, subdir=name)
 | |
| 34 | + | |
| 35 | +    with utils._tempdir(dir=project_dir) as tmp:
 | |
| 36 | + | |
| 37 | +        # We use a data/ subdir in the git repo we create,
 | |
| 38 | +        # and we set the import element to only extract that
 | |
| 39 | +        # part; this ensures we never include a .git/ directory
 | |
| 40 | +        # in the cached artifacts for these sized elements.
 | |
| 41 | +        #
 | |
| 42 | +        datadir = os.path.join(tmp, 'data')
 | |
| 43 | +        os.makedirs(datadir)
 | |
| 44 | + | |
| 45 | +        # Use /dev/urandom to create the sized file in the datadir
 | |
| 46 | +        with open(os.path.join(datadir, name), 'wb+') as f:
 | |
| 47 | +            f.write(os.urandom(size))
 | |
| 48 | + | |
| 49 | +        # Create the git repo from the temp directory
 | |
| 50 | +        ref = repo.create(tmp)
 | |
| 28 | 51 |  | 
| 29 | -    # Simplest case: We want this file (of specified size) to just
 | |
| 30 | -    # be an import element.
 | |
| 31 | 52 |      element = {
 | 
| 32 | 53 |          'kind': 'import',
 | 
| 33 | 54 |          'sources': [
 | 
| 34 | -            {
 | |
| 35 | -                'kind': 'local',
 | |
| 36 | -                'path': name + '_data'
 | |
| 37 | -            }
 | |
| 55 | +            repo.source_config(ref=ref)
 | |
| 38 | 56 |          ],
 | 
| 57 | +        'config': {
 | |
| 58 | +            # Extract only the data directory
 | |
| 59 | +            'source': 'data'
 | |
| 60 | +        },
 | |
| 39 | 61 |          'depends': dependencies
 | 
| 40 | 62 |      }
 | 
| 41 | 63 |      _yaml.dump(element, os.path.join(project_dir, elements_path, name))
 | 
| 64 | + | |
| 65 | +    # Return the repo, so that it can later be used to add commits
 | |
| 66 | +    return repo
 | |
| 67 | + | |
| 68 | + | |
| 69 | +# update_element_size()
 | |
| 70 | +#
 | |
| 71 | +# Adds a new commit to a repo returned by create_element_size(),
 | |
| 72 | +# replacing the element's data so that, once tracked, building the
 | |
| 73 | +# element produces an artifact of the newly specified size.
 | |
| 74 | +#
 | |
| 75 | +# The name and project_dir arguments must match the arguments
 | |
| 76 | +# previously given to create_element_size()
 | |
| 77 | +#
 | |
| 78 | +# Args:
 | |
| 79 | +#    name (str): The element name (e.g. target.bst)
 | |
| 80 | +#    project_dir (str): The path to the project
 | |
| 81 | +#    repo: (Repo) The Repo returned by create_element_size()
 | |
| 82 | +#    size: (int) The new size which the element generates, in bytes
 | |
| 83 | +#
 | |
| 84 | +# Returns:
 | |
| 85 | +#    Nothing; the repo passed in gains a new commit which replaces
 | |
| 86 | +#            the element's data with random data of the given size.
 | |
| 87 | +#
 | |
| 88 | +def update_element_size(name, project_dir, repo, size):
 | |
| 89 | + | |
| 90 | +    with utils._tempdir(dir=project_dir) as tmp:
 | |
| 91 | + | |
| 92 | +        new_file = os.path.join(tmp, name)
 | |
| 93 | + | |
| 94 | +        # Use /dev/urandom to create the sized file in a temporary directory
 | |
| 95 | +        with open(new_file, 'wb+') as f:
 | |
| 96 | +            f.write(os.urandom(size))
 | |
| 97 | + | |
| 98 | +        # Modify the git repo with a new commit to the same path,
 | |
| 99 | +        # replacing the original file with a new one.
 | |
| 100 | +        repo.modify_file(new_file, os.path.join('data', name)) | 
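To make the intended workflow of these two helpers concrete, a test would typically use them roughly as follows (the test name, element name, sizes and DATA_DIR layout here are illustrative, not taken from this branch):

    import os
    import pytest

    from tests.testutils import cli, create_element_size, update_element_size

    DATA_DIR = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'project')

    @pytest.mark.datafiles(DATA_DIR)
    def test_example_resize(cli, datafiles, tmpdir):
        project = os.path.join(datafiles.dirname, datafiles.basename)

        # Create a 4MB element backed by a git repo, then build it
        repo = create_element_size('dep.bst', project, 'elements', [], 4000000)
        res = cli.run(project=project, args=['build', 'dep.bst'])
        res.assert_success()

        # Push a new commit that doubles the element's size, then rebuild
        # with tracking so the new ref is picked up
        update_element_size('dep.bst', project, repo, 8000000)
        res = cli.run(project=project, args=['build', '--track-all', 'dep.bst'])
        res.assert_success()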
| ... | ... | @@ -52,6 +52,13 @@ class Git(Repo): | 
| 52 | 52 |          self._run_git('commit', '-m', 'Added {}'.format(os.path.basename(filename)))
 | 
| 53 | 53 |          return self.latest_commit()
 | 
| 54 | 54 |  | 
| 55 | +    def modify_file(self, new_file, path):
 | |
| 56 | +        shutil.copy(new_file, os.path.join(self.repo, path))
 | |
| 57 | +        subprocess.call([
 | |
| 58 | +            'git', 'commit', path, '-m', 'Modified {}'.format(os.path.basename(path))
 | |
| 59 | +        ], env=GIT_ENV, cwd=self.repo)
 | |
| 60 | +        return self.latest_commit()
 | |
| 61 | + | |
| 55 | 62 |      def add_submodule(self, subdir, url=None, checkout=None):
 | 
| 56 | 63 |          submodule = {}
 | 
| 57 | 64 |          if checkout is not None:
 | 
