@@ -38,6 +38,10 @@ from .._context import Context
 from .cascache import CASCache
 
 
+# The default limit for gRPC messages is 4 MiB
+_MAX_BATCH_TOTAL_SIZE_BYTES = 4 * 1024 * 1024
+
+
 # Trying to push an artifact that is too large
 class ArtifactTooLargeException(Exception):
     pass
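The 4 MiB constant mirrors gRPC's default maximum message size, so batches that respect it should normally also fit through an unconfigured channel. As a rough sketch of how the limit is meant to be used from the other side (not part of this change; `_group_into_batches` is an invented helper), a client could split its digests into batches against the same bound:

```python
# Hypothetical client-side helper: split digests into batches whose combined
# payload stays under the server's batch limit (the same 4 MiB used above).
_MAX_BATCH_TOTAL_SIZE_BYTES = 4 * 1024 * 1024


def _group_into_batches(digests, limit=_MAX_BATCH_TOTAL_SIZE_BYTES):
    batch = []
    batch_size = 0
    for digest in digests:
        if digest.size_bytes > limit:
            # A blob bigger than the limit can never be batched; it would have
            # to be transferred via the ByteStream Read/Write services instead.
            raise ValueError("blob too large for a batch request")
        if batch and batch_size + digest.size_bytes > limit:
            yield batch
            batch = []
            batch_size = 0
        batch.append(digest)
        batch_size += digest.size_bytes
    if batch:
        yield batch
```

The servicer below enforces the same bound cumulatively and rejects oversized batches with `INVALID_ARGUMENT`, so grouping on the request side is what keeps batch reads usable.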
@@ -67,6 +71,9 @@ def create_server(repo, *, enable_push):
     remote_execution_pb2_grpc.add_ContentAddressableStorageServicer_to_server(
         _ContentAddressableStorageServicer(artifactcache), server)
 
+    remote_execution_pb2_grpc.add_CapabilitiesServicer_to_server(
+        _CapabilitiesServicer(), server)
+
     buildstream_pb2_grpc.add_ReferenceStorageServicer_to_server(
         _ReferenceStorageServicer(artifactcache, enable_push=enable_push), server)
 
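With the Capabilities servicer registered, clients can discover the batch limit (and the other values filled in by `_CapabilitiesServicer.GetCapabilities` further down) with a plain RPC. A minimal sketch, assuming the vendored proto modules under `buildstream._protos` and a made-up endpoint address:

```python
import grpc

from buildstream._protos.build.bazel.remote.execution.v2 import (
    remote_execution_pb2,
    remote_execution_pb2_grpc,
)

# Hypothetical address; in practice this comes from the artifact cache configuration.
channel = grpc.insecure_channel('localhost:50052')
capabilities_stub = remote_execution_pb2_grpc.CapabilitiesStub(channel)

capabilities = capabilities_stub.GetCapabilities(
    remote_execution_pb2.GetCapabilitiesRequest())
print(capabilities.cache_capabilities.max_batch_total_size_bytes)
```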
@@ -229,6 +236,48 @@ class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddres
                 d.size_bytes = digest.size_bytes
         return response
 
+    def BatchReadBlobs(self, request, context):
+        response = remote_execution_pb2.BatchReadBlobsResponse()
+        batch_size = 0
+
+        for digest in request.digests:
+            batch_size += digest.size_bytes
+            if batch_size > _MAX_BATCH_TOTAL_SIZE_BYTES:
+                context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
+                return response
+
+            blob_response = response.responses.add()
+            blob_response.digest.hash = digest.hash
+            blob_response.digest.size_bytes = digest.size_bytes
+            try:
+                with open(self.cas.objpath(digest), 'rb') as f:
+                    if os.fstat(f.fileno()).st_size != digest.size_bytes:
+                        blob_response.status.code = grpc.StatusCode.NOT_FOUND.value[0]  # Status.code is an int
+                        continue
+
+                    blob_response.data = f.read(digest.size_bytes)
+            except FileNotFoundError:
+                blob_response.status.code = grpc.StatusCode.NOT_FOUND.value[0]
+
+        return response
+
+
+class _CapabilitiesServicer(remote_execution_pb2_grpc.CapabilitiesServicer):
+    def GetCapabilities(self, request, context):
+        response = remote_execution_pb2.ServerCapabilities()
+
+        cache_capabilities = response.cache_capabilities
+        cache_capabilities.digest_function.append(remote_execution_pb2.SHA256)
+        cache_capabilities.action_cache_update_capabilities.update_enabled = False
+        cache_capabilities.max_batch_total_size_bytes = _MAX_BATCH_TOTAL_SIZE_BYTES
+        cache_capabilities.symlink_absolute_path_strategy = remote_execution_pb2.CacheCapabilities.ALLOWED
+
+        response.deprecated_api_version.major = 2
+        response.low_api_version.major = 2
+        response.high_api_version.major = 2
+
+        return response
+
 
 class _ReferenceStorageServicer(buildstream_pb2_grpc.ReferenceStorageServicer):
     def __init__(self, cas, *, enable_push):
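For completeness, this is roughly what the client side of the new `BatchReadBlobs` call looks like; a minimal sketch, again assuming the vendored proto modules, with `_fetch_blobs` being an invented helper rather than anything in this patch. Each per-blob `status.code` is a `google.rpc` code carried as a plain integer, where `0` means OK:

```python
from buildstream._protos.build.bazel.remote.execution.v2 import (
    remote_execution_pb2,
    remote_execution_pb2_grpc,
)


def _fetch_blobs(channel, digests):
    # Fetch several small blobs from the CAS in a single round trip.
    cas_stub = remote_execution_pb2_grpc.ContentAddressableStorageStub(channel)

    request = remote_execution_pb2.BatchReadBlobsRequest()
    request.digests.extend(digests)

    blobs = {}
    for blob_response in cas_stub.BatchReadBlobs(request).responses:
        # A non-zero status code means this particular blob was not returned.
        if blob_response.status.code != 0:
            continue
        blobs[blob_response.digest.hash] = blob_response.data
    return blobs
```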