Santiago Gil pushed to branch santi/76-batch-read-blobs at BuildGrid / buildgrid
Commits:
- 61964b57 by Santiago Gil at 2019-01-16T15:53:35Z
2 changed files:
Changes:
| ... | ... | @@ -24,6 +24,7 @@ import logging |
| 24 | 24 |
from buildgrid._exceptions import InvalidArgumentError, NotFoundError, OutOfRangeError
|
| 25 | 25 |
from buildgrid._protos.google.bytestream import bytestream_pb2
|
| 26 | 26 |
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2 as re_pb2
|
| 27 |
+from buildgrid._protos.google.rpc import code_pb2, status_pb2
|
|
| 27 | 28 |
from buildgrid.settings import HASH, HASH_LENGTH, MAX_REQUEST_SIZE, MAX_REQUEST_COUNT
|
| 28 | 29 |
from buildgrid.utils import get_hash_type
|
| 29 | 30 |
|
| ... | ... | @@ -70,6 +71,33 @@ class ContentAddressableStorageInstance: |
| 70 | 71 |
|
| 71 | 72 |
return response
|
| 72 | 73 |
|
| 74 |
+ def batch_read_blobs(self, digests):
|
|
| 75 |
+ response = re_pb2.BatchReadBlobsResponse()
|
|
| 76 |
+ |
|
| 77 |
+ requested_bytes = sum((digest.size_bytes for digest in digests))
|
|
| 78 |
+ max_batch_size = self.max_batch_total_size_bytes()
|
|
| 79 |
+ |
|
| 80 |
+ if requested_bytes > max_batch_size:
|
|
| 81 |
+ raise InvalidArgumentError('Combined total size of blobs exceeds '
|
|
| 82 |
+ 'server limit. '
|
|
| 83 |
+ '({} > {} [byte])'.format(requested_bytes,
|
|
| 84 |
+ max_batch_size))
|
|
| 85 |
+ |
|
| 86 |
+ for digest in digests:
|
|
| 87 |
+ response_proto = response.responses.add()
|
|
| 88 |
+ response_proto.digest.CopyFrom(digest)
|
|
| 89 |
+ |
|
| 90 |
+ blob = self._storage.get_blob(digest)
|
|
| 91 |
+ if blob:
|
|
| 92 |
+ response_proto.data = blob.read()
|
|
| 93 |
+ status_code = code_pb2.OK
|
|
| 94 |
+ else:
|
|
| 95 |
+ status_code = code_pb2.NOT_FOUND
|
|
| 96 |
+ |
|
| 97 |
+ response_proto.status.CopyFrom(status_pb2.Status(code=status_code))
|
|
| 98 |
+ |
|
| 99 |
+ return response
|
|
| 100 |
+ |
|
| 73 | 101 |
def get_tree(self, request):
|
| 74 | 102 |
storage = self._storage
|
| 75 | 103 |
|
| ... | ... | @@ -86,8 +86,15 @@ class ContentAddressableStorageService(remote_execution_pb2_grpc.ContentAddressa |
| 86 | 86 |
def BatchReadBlobs(self, request, context):
|
| 87 | 87 |
self.__logger.debug("BatchReadBlobs request from [%s]", context.peer())
|
| 88 | 88 |
|
| 89 |
- context.set_code(grpc.StatusCode.UNIMPLEMENTED)
|
|
| 90 |
- context.set_details('Method not implemented!')
|
|
| 89 |
+ try:
|
|
| 90 |
+ instance = self._get_instance(request.instance_name)
|
|
| 91 |
+ response = instance.batch_read_blobs(request.digests)
|
|
| 92 |
+ return response
|
|
| 93 |
+ |
|
| 94 |
+ except InvalidArgumentError as e:
|
|
| 95 |
+ self.__logger.error(e)
|
|
| 96 |
+ context.set_details(str(e))
|
|
| 97 |
+ context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
|
|
| 91 | 98 |
|
| 92 | 99 |
return remote_execution_pb2.BatchReadBlobsResponse()
|
| 93 | 100 |
|
