Arber Xhindoli pushed to branch arber/91-get-tree at BuildGrid / buildgrid
WARNING: The push did not contain any new commits, but force pushed to delete the commits and changes below.
Deleted commits:
-
dbf2c430
by Arber Xhindoli at 2018-10-26T22:29:03Z
6 changed files:
- buildgrid/client/cas.py
- buildgrid/server/cas/instance.py
- buildgrid/server/cas/service.py
- buildgrid/server/cas/storage/storage_abc.py
- buildgrid/utils.py
- setup.cfg
Changes:
| ... | ... | @@ -24,7 +24,7 @@ from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_p |
| 24 | 24 |
from buildgrid._protos.google.bytestream import bytestream_pb2, bytestream_pb2_grpc
|
| 25 | 25 |
from buildgrid._protos.google.rpc import code_pb2
|
| 26 | 26 |
from buildgrid.settings import HASH
|
| 27 |
-from buildgrid.utils import merkle_tree_maker
|
|
| 27 |
+from buildgrid.utils import merkle_tree_maker, MAX_REQUEST_COUNT
|
|
| 28 | 28 |
|
| 29 | 29 |
|
| 30 | 30 |
# Maximum size for a queueable file:
|
| ... | ... | @@ -33,9 +33,6 @@ FILE_SIZE_THRESHOLD = 1 * 1024 * 1024 |
| 33 | 33 |
# Maximum size for a single gRPC request:
|
| 34 | 34 |
MAX_REQUEST_SIZE = 2 * 1024 * 1024
|
| 35 | 35 |
|
| 36 |
-# Maximum number of elements per gRPC request:
|
|
| 37 |
-MAX_REQUEST_COUNT = 500
|
|
| 38 |
- |
|
| 39 | 36 |
|
| 40 | 37 |
class _CallCache:
|
| 41 | 38 |
"""Per remote grpc.StatusCode.UNIMPLEMENTED call cache."""
|
| ... | ... | @@ -87,8 +84,10 @@ class Downloader: |
| 87 | 84 |
|
| 88 | 85 |
self.instance_name = instance
|
| 89 | 86 |
|
| 90 |
- self.__bytestream_stub = bytestream_pb2_grpc.ByteStreamStub(self.channel)
|
|
| 91 |
- self.__cas_stub = remote_execution_pb2_grpc.ContentAddressableStorageStub(self.channel)
|
|
| 87 |
+ self.__bytestream_stub = bytestream_pb2_grpc.ByteStreamStub(
|
|
| 88 |
+ self.channel)
|
|
| 89 |
+ self.__cas_stub = remote_execution_pb2_grpc.ContentAddressableStorageStub(
|
|
| 90 |
+ self.channel)
|
|
| 92 | 91 |
|
| 93 | 92 |
self.__file_requests = {}
|
| 94 | 93 |
self.__file_request_count = 0
|
| ... | ... | @@ -245,7 +244,8 @@ class Downloader: |
| 245 | 244 |
resource_name = '/'.join([self.instance_name, 'blobs',
|
| 246 | 245 |
digest.hash, str(digest.size_bytes)])
|
| 247 | 246 |
else:
|
| 248 |
- resource_name = '/'.join(['blobs', digest.hash, str(digest.size_bytes)])
|
|
| 247 |
+ resource_name = '/'.join(['blobs', digest.hash,
|
|
| 248 |
+ str(digest.size_bytes)])
|
|
| 249 | 249 |
|
| 250 | 250 |
read_request = bytestream_pb2.ReadRequest()
|
| 251 | 251 |
read_request.resource_name = resource_name
|
| ... | ... | @@ -261,7 +261,8 @@ class Downloader: |
| 261 | 261 |
except grpc.RpcError as e:
|
| 262 | 262 |
status_code = e.code()
|
| 263 | 263 |
if status_code == grpc.StatusCode.NOT_FOUND:
|
| 264 |
- raise NotFoundError("Requested data does not exist on the remote.")
|
|
| 264 |
+ raise NotFoundError(
|
|
| 265 |
+ "Requested data does not exist on the remote.")
|
|
| 265 | 266 |
|
| 266 | 267 |
else:
|
| 267 | 268 |
assert False
|
| ... | ... | @@ -295,7 +296,8 @@ class Downloader: |
| 295 | 296 |
except grpc.RpcError as e:
|
| 296 | 297 |
status_code = e.code()
|
| 297 | 298 |
if status_code == grpc.StatusCode.UNIMPLEMENTED:
|
| 298 |
- _CallCache.mark_unimplemented(self.channel, 'BatchReadBlobs')
|
|
| 299 |
+ _CallCache.mark_unimplemented(
|
|
| 300 |
+ self.channel, 'BatchReadBlobs')
|
|
| 299 | 301 |
|
| 300 | 302 |
elif status_code == grpc.StatusCode.INVALID_ARGUMENT:
|
| 301 | 303 |
read_blobs.clear()
|
| ... | ... | @@ -317,7 +319,8 @@ class Downloader: |
| 317 | 319 |
resource_name = '/'.join([self.instance_name, 'blobs',
|
| 318 | 320 |
digest.hash, str(digest.size_bytes)])
|
| 319 | 321 |
else:
|
| 320 |
- resource_name = '/'.join(['blobs', digest.hash, str(digest.size_bytes)])
|
|
| 322 |
+ resource_name = '/'.join(['blobs', digest.hash,
|
|
| 323 |
+ str(digest.size_bytes)])
|
|
| 321 | 324 |
|
| 322 | 325 |
read_request = bytestream_pb2.ReadRequest()
|
| 323 | 326 |
read_request.resource_name = resource_name
|
| ... | ... | @@ -391,10 +394,12 @@ class Downloader: |
| 391 | 394 |
except grpc.RpcError as e:
|
| 392 | 395 |
status_code = e.code()
|
| 393 | 396 |
if status_code == grpc.StatusCode.UNIMPLEMENTED:
|
| 394 |
- _CallCache.mark_unimplemented(self.channel, 'BatchUpdateBlobs')
|
|
| 397 |
+ _CallCache.mark_unimplemented(
|
|
| 398 |
+ self.channel, 'BatchUpdateBlobs')
|
|
| 395 | 399 |
|
| 396 | 400 |
elif status_code == grpc.StatusCode.NOT_FOUND:
|
| 397 |
- raise NotFoundError("Requested directory does not exist on the remote.")
|
|
| 401 |
+ raise NotFoundError(
|
|
| 402 |
+ "Requested directory does not exist on the remote.")
|
|
| 398 | 403 |
|
| 399 | 404 |
else:
|
| 400 | 405 |
assert False
|
| ... | ... | @@ -422,7 +427,8 @@ class Downloader: |
| 422 | 427 |
directory = directories[directory_node.digest.hash]
|
| 423 | 428 |
else:
|
| 424 | 429 |
directory = remote_execution_pb2.Directory()
|
| 425 |
- directory.ParseFromString(self._fetch_blob(directory_node.digest))
|
|
| 430 |
+ directory.ParseFromString(
|
|
| 431 |
+ self._fetch_blob(directory_node.digest))
|
|
| 426 | 432 |
|
| 427 | 433 |
os.makedirs(directory_path, exist_ok=True)
|
| 428 | 434 |
|
| ... | ... | @@ -484,8 +490,10 @@ class Uploader: |
| 484 | 490 |
else:
|
| 485 | 491 |
self.u_uid = str(uuid.uuid4())
|
| 486 | 492 |
|
| 487 |
- self.__bytestream_stub = bytestream_pb2_grpc.ByteStreamStub(self.channel)
|
|
| 488 |
- self.__cas_stub = remote_execution_pb2_grpc.ContentAddressableStorageStub(self.channel)
|
|
| 493 |
+ self.__bytestream_stub = bytestream_pb2_grpc.ByteStreamStub(
|
|
| 494 |
+ self.channel)
|
|
| 495 |
+ self.__cas_stub = remote_execution_pb2_grpc.ContentAddressableStorageStub(
|
|
| 496 |
+ self.channel)
|
|
| 489 | 497 |
|
| 490 | 498 |
self.__requests = {}
|
| 491 | 499 |
self.__request_count = 0
|
| ... | ... | @@ -770,7 +778,8 @@ class Uploader: |
| 770 | 778 |
request.data = blob
|
| 771 | 779 |
|
| 772 | 780 |
try:
|
| 773 |
- batch_response = self.__cas_stub.BatchUpdateBlobs(batch_request)
|
|
| 781 |
+ batch_response = self.__cas_stub.BatchUpdateBlobs(
|
|
| 782 |
+ batch_request)
|
|
| 774 | 783 |
for response in batch_response.responses:
|
| 775 | 784 |
assert response.digest.hash in batch
|
| 776 | 785 |
|
| ... | ... | @@ -783,7 +792,8 @@ class Uploader: |
| 783 | 792 |
except grpc.RpcError as e:
|
| 784 | 793 |
status_code = e.code()
|
| 785 | 794 |
if status_code == grpc.StatusCode.UNIMPLEMENTED:
|
| 786 |
- _CallCache.mark_unimplemented(self.channel, 'BatchUpdateBlobs')
|
|
| 795 |
+ _CallCache.mark_unimplemented(
|
|
| 796 |
+ self.channel, 'BatchUpdateBlobs')
|
|
| 787 | 797 |
|
| 788 | 798 |
elif status_code == grpc.StatusCode.INVALID_ARGUMENT:
|
| 789 | 799 |
written_digests.clear()
|
| ... | ... | @@ -54,6 +54,19 @@ class ContentAddressableStorageInstance: |
| 54 | 54 |
|
| 55 | 55 |
return response
|
| 56 | 56 |
|
| 57 |
+ def get_tree(self, request, directory_list, digest_list):
|
|
| 58 |
+ """
|
|
| 59 |
+ This function will start reading directories at request.root_digest.
|
|
| 60 |
+ It will push the directories, and their corresponding digests into the
|
|
| 61 |
+ directory_list and digest_list.
|
|
| 62 |
+ |
|
| 63 |
+ It will continue to do a level-order traversal until either: directory_list reaches the end,
|
|
| 64 |
+ or we have made request.page_size reads. If the latter case, it will return len(directory_list) - 1 so
|
|
| 65 |
+ subsequent calls can pick up where it left off.
|
|
| 66 |
+ Otherwise, returns None, meaning we have read the directory tree.
|
|
| 67 |
+ """
|
|
| 68 |
+ return None
|
|
| 69 |
+ |
|
| 57 | 70 |
|
| 58 | 71 |
class ByteStreamInstance:
|
| 59 | 72 |
|
| ... | ... | @@ -127,13 +140,15 @@ class ByteStreamInstance: |
| 127 | 140 |
|
| 128 | 141 |
for request in requests:
|
| 129 | 142 |
if finished:
|
| 130 |
- raise InvalidArgumentError("Write request sent after write finished")
|
|
| 143 |
+ raise InvalidArgumentError(
|
|
| 144 |
+ "Write request sent after write finished")
|
|
| 131 | 145 |
|
| 132 | 146 |
elif request.write_offset != bytes_written:
|
| 133 | 147 |
raise InvalidArgumentError("Invalid write offset")
|
| 134 | 148 |
|
| 135 | 149 |
elif request.resource_name and request.resource_name != first_request.resource_name:
|
| 136 |
- raise InvalidArgumentError("Resource name changed mid-write")
|
|
| 150 |
+ raise InvalidArgumentError(
|
|
| 151 |
+ "Resource name changed mid-write")
|
|
| 137 | 152 |
|
| 138 | 153 |
finished = request.finish_write
|
| 139 | 154 |
bytes_written += len(request.data)
|
| ... | ... | @@ -145,7 +160,8 @@ class ByteStreamInstance: |
| 145 | 160 |
|
| 146 | 161 |
# Check that the data matches the provided digest.
|
| 147 | 162 |
if bytes_written != digest.size_bytes or not finished:
|
| 148 |
- raise NotImplementedError("Cannot close stream before finishing write")
|
|
| 163 |
+ raise NotImplementedError(
|
|
| 164 |
+ "Cannot close stream before finishing write")
|
|
| 149 | 165 |
|
| 150 | 166 |
elif hash_.hexdigest() != digest.hash:
|
| 151 | 167 |
raise InvalidArgumentError("Data does not match hash")
|
| ... | ... | @@ -30,6 +30,7 @@ from buildgrid._exceptions import InvalidArgumentError, NotFoundError, OutOfRang |
| 30 | 30 |
from buildgrid._protos.google.bytestream import bytestream_pb2, bytestream_pb2_grpc
|
| 31 | 31 |
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
|
| 32 | 32 |
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2_grpc
|
| 33 |
+from buildgrid.utils import MAX_REQUEST_COUNT
|
|
| 33 | 34 |
|
| 34 | 35 |
|
| 35 | 36 |
class ContentAddressableStorageService(remote_execution_pb2_grpc.ContentAddressableStorageServicer):
|
| ... | ... | @@ -39,7 +40,8 @@ class ContentAddressableStorageService(remote_execution_pb2_grpc.ContentAddressa |
| 39 | 40 |
|
| 40 | 41 |
self._instances = {}
|
| 41 | 42 |
|
| 42 |
- remote_execution_pb2_grpc.add_ContentAddressableStorageServicer_to_server(self, server)
|
|
| 43 |
+ remote_execution_pb2_grpc.add_ContentAddressableStorageServicer_to_server(
|
|
| 44 |
+ self, server)
|
|
| 43 | 45 |
|
| 44 | 46 |
def add_instance(self, name, instance):
|
| 45 | 47 |
self._instances[name] = instance
|
| ... | ... | @@ -49,7 +51,8 @@ class ContentAddressableStorageService(remote_execution_pb2_grpc.ContentAddressa |
| 49 | 51 |
self.logger.debug("FindMissingBlobs request: [{}]".format(request))
|
| 50 | 52 |
instance = self._get_instance(request.instance_name)
|
| 51 | 53 |
response = instance.find_missing_blobs(request.blob_digests)
|
| 52 |
- self.logger.debug("FindMissingBlobs response: [{}]".format(response))
|
|
| 54 |
+ self.logger.debug(
|
|
| 55 |
+ "FindMissingBlobs response: [{}]".format(response))
|
|
| 53 | 56 |
return response
|
| 54 | 57 |
|
| 55 | 58 |
except InvalidArgumentError as e:
|
| ... | ... | @@ -64,7 +67,8 @@ class ContentAddressableStorageService(remote_execution_pb2_grpc.ContentAddressa |
| 64 | 67 |
self.logger.debug("BatchUpdateBlobs request: [{}]".format(request))
|
| 65 | 68 |
instance = self._get_instance(request.instance_name)
|
| 66 | 69 |
response = instance.batch_update_blobs(request.requests)
|
| 67 |
- self.logger.debug("FindMissingBlobs response: [{}]".format(response))
|
|
| 70 |
+ self.logger.debug(
|
|
| 71 |
+ "FindMissingBlobs response: [{}]".format(response))
|
|
| 68 | 72 |
return response
|
| 69 | 73 |
|
| 70 | 74 |
except InvalidArgumentError as e:
|
| ... | ... | @@ -81,8 +85,63 @@ class ContentAddressableStorageService(remote_execution_pb2_grpc.ContentAddressa |
| 81 | 85 |
return remote_execution_pb2.BatchReadBlobsResponse()
|
| 82 | 86 |
|
| 83 | 87 |
def GetTree(self, request, context):
|
| 88 |
+ |
|
| 84 | 89 |
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
|
| 85 | 90 |
context.set_details('Method not implemented!')
|
| 91 |
+ return iter([remote_execution_pb2.GetTreeResponse()])
|
|
| 92 |
+ |
|
| 93 |
+ # Stores the directories as long as a page token is returned.
|
|
| 94 |
+ directories = []
|
|
| 95 |
+ # Stores the digests of those directories
|
|
| 96 |
+ digests = []
|
|
| 97 |
+ |
|
| 98 |
+ # if page_size is not set, set it to MAX_REQUEST_COUNT
|
|
| 99 |
+ if request.page_size == 0:
|
|
| 100 |
+ request.page_size = MAX_REQUEST_COUNT
|
|
| 101 |
+ |
|
| 102 |
+ """Set to 0, will be used to index into directory list,
|
|
| 103 |
+ and updated in instance.get_tree(only way this makes sense to me.)
|
|
| 104 |
+ """
|
|
| 105 |
+ request.page_token = 0
|
|
| 106 |
+ |
|
| 107 |
+ # start at index 1, to not return root
|
|
| 108 |
+ start_index = 1
|
|
| 109 |
+ |
|
| 110 |
+ try:
|
|
| 111 |
+ instance = self._get_instance(request.instance_name)
|
|
| 112 |
+ while True:
|
|
| 113 |
+ self.logger.debug("GetTree request: [{}]".format(request))
|
|
| 114 |
+ """
|
|
| 115 |
+ Returns next page_token once page_size directories is reached.
|
|
| 116 |
+ The page_token, is essentially an index into the directories/digests list.
|
|
| 117 |
+ """
|
|
| 118 |
+ page_token = instance.get_tree(
|
|
| 119 |
+ request, directories, digests)
|
|
| 120 |
+ |
|
| 121 |
+ response = remote_execution_pb2.GetTreeResponse()
|
|
| 122 |
+ if not page_token:
|
|
| 123 |
+ # get directories from last request to the end since no page_token
|
|
| 124 |
+ response.directories = directories[start_index:]
|
|
| 125 |
+ response.page_token = None
|
|
| 126 |
+ # stop the generator no more directories
|
|
| 127 |
+ return response
|
|
| 128 |
+ else:
|
|
| 129 |
+ # return from last request, to current request directories
|
|
| 130 |
+ response.directories = directories[start_index:page_token]
|
|
| 131 |
+ response.page_token = str(page_token)
|
|
| 132 |
+ yield response
|
|
| 133 |
+ |
|
| 134 |
+ # create new request using returned page token, update start_index
|
|
| 135 |
+ request = remote_execution_pb2.GetTreeRequest()
|
|
| 136 |
+ request.page_size = MAX_REQUEST_COUNT
|
|
| 137 |
+ request.page_token = page_token
|
|
| 138 |
+ request.root_digest = digests[page_token]
|
|
| 139 |
+ start_index = page_token
|
|
| 140 |
+ |
|
| 141 |
+ except InvalidArgumentError as e:
|
|
| 142 |
+ self.logger.error(e)
|
|
| 143 |
+ context.set_details(str(e))
|
|
| 144 |
+ context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
|
|
| 86 | 145 |
|
| 87 | 146 |
return iter([remote_execution_pb2.GetTreeResponse()])
|
| 88 | 147 |
|
| ... | ... | @@ -91,7 +150,8 @@ class ContentAddressableStorageService(remote_execution_pb2_grpc.ContentAddressa |
| 91 | 150 |
return self._instances[instance_name]
|
| 92 | 151 |
|
| 93 | 152 |
except KeyError:
|
| 94 |
- raise InvalidArgumentError("Invalid instance name: [{}]".format(instance_name))
|
|
| 153 |
+ raise InvalidArgumentError(
|
|
| 154 |
+ "Invalid instance name: [{}]".format(instance_name))
|
|
| 95 | 155 |
|
| 96 | 156 |
|
| 97 | 157 |
class ByteStreamService(bytestream_pb2_grpc.ByteStreamServicer):
|
| ... | ... | @@ -115,15 +175,18 @@ class ByteStreamService(bytestream_pb2_grpc.ByteStreamServicer): |
| 115 | 175 |
# TODO: Decide on default instance name
|
| 116 | 176 |
if path[0] == "blobs":
|
| 117 | 177 |
if len(path) < 3 or not path[2].isdigit():
|
| 118 |
- raise InvalidArgumentError("Invalid resource name: [{}]".format(request.resource_name))
|
|
| 178 |
+ raise InvalidArgumentError(
|
|
| 179 |
+ "Invalid resource name: [{}]".format(request.resource_name))
|
|
| 119 | 180 |
instance_name = ""
|
| 120 | 181 |
|
| 121 | 182 |
elif path[1] == "blobs":
|
| 122 | 183 |
if len(path) < 4 or not path[3].isdigit():
|
| 123 |
- raise InvalidArgumentError("Invalid resource name: [{}]".format(request.resource_name))
|
|
| 184 |
+ raise InvalidArgumentError(
|
|
| 185 |
+ "Invalid resource name: [{}]".format(request.resource_name))
|
|
| 124 | 186 |
|
| 125 | 187 |
else:
|
| 126 |
- raise InvalidArgumentError("Invalid resource name: [{}]".format(request.resource_name))
|
|
| 188 |
+ raise InvalidArgumentError(
|
|
| 189 |
+ "Invalid resource name: [{}]".format(request.resource_name))
|
|
| 127 | 190 |
|
| 128 | 191 |
instance = self._get_instance(instance_name)
|
| 129 | 192 |
yield from instance.read(path,
|
| ... | ... | @@ -154,7 +217,8 @@ class ByteStreamService(bytestream_pb2_grpc.ByteStreamServicer): |
| 154 | 217 |
try:
|
| 155 | 218 |
requests, request_probe = tee(requests, 2)
|
| 156 | 219 |
first_request = next(request_probe)
|
| 157 |
- self.logger.debug("First write request: [{}]".format(first_request))
|
|
| 220 |
+ self.logger.debug(
|
|
| 221 |
+ "First write request: [{}]".format(first_request))
|
|
| 158 | 222 |
|
| 159 | 223 |
path = first_request.resource_name.split("/")
|
| 160 | 224 |
|
| ... | ... | @@ -163,15 +227,18 @@ class ByteStreamService(bytestream_pb2_grpc.ByteStreamServicer): |
| 163 | 227 |
# TODO: Sort out no instance name
|
| 164 | 228 |
if path[0] == "uploads":
|
| 165 | 229 |
if len(path) < 5 or path[2] != "blobs" or not path[4].isdigit():
|
| 166 |
- raise InvalidArgumentError("Invalid resource name: [{}]".format(first_request.resource_name))
|
|
| 230 |
+ raise InvalidArgumentError(
|
|
| 231 |
+ "Invalid resource name: [{}]".format(first_request.resource_name))
|
|
| 167 | 232 |
instance_name = ""
|
| 168 | 233 |
|
| 169 | 234 |
elif path[1] == "uploads":
|
| 170 | 235 |
if len(path) < 6 or path[3] != "blobs" or not path[5].isdigit():
|
| 171 |
- raise InvalidArgumentError("Invalid resource name: [{}]".format(first_request.resource_name))
|
|
| 236 |
+ raise InvalidArgumentError(
|
|
| 237 |
+ "Invalid resource name: [{}]".format(first_request.resource_name))
|
|
| 172 | 238 |
|
| 173 | 239 |
else:
|
| 174 |
- raise InvalidArgumentError("Invalid resource name: [{}]".format(first_request.resource_name))
|
|
| 240 |
+ raise InvalidArgumentError(
|
|
| 241 |
+ "Invalid resource name: [{}]".format(first_request.resource_name))
|
|
| 175 | 242 |
|
| 176 | 243 |
instance = self._get_instance(instance_name)
|
| 177 | 244 |
response = instance.write(requests)
|
| ... | ... | @@ -206,4 +273,5 @@ class ByteStreamService(bytestream_pb2_grpc.ByteStreamServicer): |
| 206 | 273 |
return self._instances[instance_name]
|
| 207 | 274 |
|
| 208 | 275 |
except KeyError:
|
| 209 |
- raise InvalidArgumentError("Invalid instance name: [{}]".format(instance_name))
|
|
| 276 |
+ raise InvalidArgumentError(
|
|
| 277 |
+ "Invalid instance name: [{}]".format(instance_name))
|
| ... | ... | @@ -22,7 +22,7 @@ The abstract base class for storage providers. |
| 22 | 22 |
|
| 23 | 23 |
import abc
|
| 24 | 24 |
|
| 25 |
-from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Digest
|
|
| 25 |
+from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Digest, Directory
|
|
| 26 | 26 |
from buildgrid._protos.google.rpc.status_pb2 import Status
|
| 27 | 27 |
from buildgrid._protos.google.rpc import code_pb2
|
| 28 | 28 |
|
| ... | ... | @@ -92,7 +92,8 @@ class StorageABC(abc.ABC): |
| 92 | 92 |
write_session.write(data)
|
| 93 | 93 |
self.commit_write(digest, write_session)
|
| 94 | 94 |
except IOError as ex:
|
| 95 |
- result.append(Status(code=code_pb2.UNKNOWN, message=str(ex)))
|
|
| 95 |
+ result.append(
|
|
| 96 |
+ Status(code=code_pb2.UNKNOWN, message=str(ex)))
|
|
| 96 | 97 |
else:
|
| 97 | 98 |
result.append(Status(code=code_pb2.OK))
|
| 98 | 99 |
return result
|
| ... | ... | @@ -100,7 +101,8 @@ class StorageABC(abc.ABC): |
| 100 | 101 |
def put_message(self, message):
|
| 101 | 102 |
"""Store the given Protobuf message in CAS, returning its digest."""
|
| 102 | 103 |
message_blob = message.SerializeToString()
|
| 103 |
- digest = Digest(hash=HASH(message_blob).hexdigest(), size_bytes=len(message_blob))
|
|
| 104 |
+ digest = Digest(hash=HASH(message_blob).hexdigest(),
|
|
| 105 |
+ size_bytes=len(message_blob))
|
|
| 104 | 106 |
session = self.begin_write(digest)
|
| 105 | 107 |
session.write(message_blob)
|
| 106 | 108 |
self.commit_write(digest, session)
|
| ... | ... | @@ -19,6 +19,9 @@ import os |
| 19 | 19 |
from buildgrid.settings import HASH
|
| 20 | 20 |
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
|
| 21 | 21 |
|
| 22 |
+# Maximum number of elements per gRPC request:
|
|
| 23 |
+MAX_REQUEST_COUNT = 500
|
|
| 24 |
+ |
|
| 22 | 25 |
|
| 23 | 26 |
def create_digest(bytes_to_digest):
|
| 24 | 27 |
"""Computes the :obj:`Digest` of a piece of data.
|
| ... | ... | @@ -16,4 +16,4 @@ pep8ignore = |
| 16 | 16 |
*_pb2_grpc.py ALL
|
| 17 | 17 |
filterwarnings =
|
| 18 | 18 |
ignore::DeprecationWarning
|
| 19 |
- ignore::PendingDeprecationWarning
|
|
| \ No newline at end of file | ||
| 19 |
+ ignore::PendingDeprecationWarning
|
