... |
... |
@@ -23,6 +23,7 @@ import os |
23
|
23
|
import signal
|
24
|
24
|
import sys
|
25
|
25
|
import tempfile
|
|
26
|
+import uuid
|
26
|
27
|
|
27
|
28
|
import click
|
28
|
29
|
import grpc
|
... |
... |
@@ -130,12 +131,21 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer): |
130
|
131
|
|
131
|
132
|
def Read(self, request, context):
|
132
|
133
|
resource_name = request.resource_name
|
133
|
|
- client_digest = _digest_from_resource_name(resource_name)
|
134
|
|
- assert request.read_offset <= client_digest.size_bytes
|
|
134
|
+ client_digest = _digest_from_download_resource_name(resource_name)
|
|
135
|
+ if client_digest is None:
|
|
136
|
+ context.set_code(grpc.StatusCode.NOT_FOUND)
|
|
137
|
+ return
|
|
138
|
+
|
|
139
|
+ if request.read_offset > client_digest.size_bytes:
|
|
140
|
+ context.set_code(grpc.StatusCode.OUT_OF_RANGE)
|
|
141
|
+ return
|
135
|
142
|
|
136
|
143
|
try:
|
137
|
144
|
with open(self.cas.objpath(client_digest), 'rb') as f:
|
138
|
|
- assert os.fstat(f.fileno()).st_size == client_digest.size_bytes
|
|
145
|
+ if os.fstat(f.fileno()).st_size != client_digest.size_bytes:
|
|
146
|
+ context.set_code(grpc.StatusCode.NOT_FOUND)
|
|
147
|
+ return
|
|
148
|
+
|
139
|
149
|
if request.read_offset > 0:
|
140
|
150
|
f.seek(request.read_offset)
|
141
|
151
|
|
... |
... |
@@ -163,12 +173,18 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer): |
163
|
173
|
resource_name = None
|
164
|
174
|
with tempfile.NamedTemporaryFile(dir=self.cas.tmpdir) as out:
|
165
|
175
|
for request in request_iterator:
|
166
|
|
- assert not finished
|
167
|
|
- assert request.write_offset == offset
|
|
176
|
+ if finished or request.write_offset != offset:
|
|
177
|
+ context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
|
|
178
|
+ return response
|
|
179
|
+
|
168
|
180
|
if resource_name is None:
|
169
|
181
|
# First request
|
170
|
182
|
resource_name = request.resource_name
|
171
|
|
- client_digest = _digest_from_resource_name(resource_name)
|
|
183
|
+ client_digest = _digest_from_upload_resource_name(resource_name)
|
|
184
|
+ if client_digest is None:
|
|
185
|
+ context.set_code(grpc.StatusCode.NOT_FOUND)
|
|
186
|
+ return response
|
|
187
|
+
|
172
|
188
|
try:
|
173
|
189
|
_clean_up_cache(self.cas, client_digest.size_bytes)
|
174
|
190
|
except ArtifactTooLargeException as e:
|
... |
... |
@@ -177,14 +193,20 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer): |
177
|
193
|
return response
|
178
|
194
|
elif request.resource_name:
|
179
|
195
|
# If it is set on subsequent calls, it **must** match the value of the first request.
|
180
|
|
- assert request.resource_name == resource_name
|
|
196
|
+ if request.resource_name != resource_name:
|
|
197
|
+ context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
|
|
198
|
+ return response
|
181
|
199
|
out.write(request.data)
|
182
|
200
|
offset += len(request.data)
|
183
|
201
|
if request.finish_write:
|
184
|
|
- assert client_digest.size_bytes == offset
|
|
202
|
+ if client_digest.size_bytes != offset:
|
|
203
|
+ context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
|
|
204
|
+ return response
|
185
|
205
|
out.flush()
|
186
|
206
|
digest = self.cas.add_object(path=out.name)
|
187
|
|
- assert digest.hash == client_digest.hash
|
|
207
|
+ if digest.hash != client_digest.hash:
|
|
208
|
+ context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
|
|
209
|
+ return response
|
188
|
210
|
finished = True
|
189
|
211
|
|
190
|
212
|
assert finished
|
... |
... |
@@ -247,13 +269,48 @@ class _ReferenceStorageServicer(buildstream_pb2_grpc.ReferenceStorageServicer): |
247
|
269
|
return response
|
248
|
270
|
|
249
|
271
|
|
250
|
|
-def _digest_from_resource_name(resource_name):
|
|
272
|
def _digest_from_download_resource_name(resource_name):
    """Parse a ByteStream download resource name into a Digest.

    The accepted form is ``blobs/<hash>/<size_bytes>``.  The bare
    ``<hash>/<size_bytes>`` form is accepted as well, for compatibility
    with non-conforming BuildStream 1.1.x clients.

    Args:
        resource_name (str): The resource name supplied by the client.

    Returns:
        A ``remote_execution_pb2.Digest`` with ``hash`` and ``size_bytes``
        set, or ``None`` if the resource name is malformed (wrong number
        of components, wrong prefix, or a size that is not a non-negative
        integer).
    """
    parts = resource_name.split('/')

    # Accept requests from non-conforming BuildStream 1.1.x clients
    if len(parts) == 2:
        parts.insert(0, 'blobs')

    if len(parts) != 3 or parts[0] != 'blobs':
        return None

    # Validate the size before building the Digest: it comes straight from
    # the client, and a blob cannot have a negative size.
    try:
        size_bytes = int(parts[2])
    except ValueError:
        return None
    if size_bytes < 0:
        return None

    try:
        digest = remote_execution_pb2.Digest()
        digest.hash = parts[1]
        digest.size_bytes = size_bytes
    except ValueError:
        # Presumably raised by the protobuf setters for values they reject
        # (e.g. a size that does not fit the field); treated as malformed,
        # as the original code did.
        return None
    return digest
|
|
289
|
+
|
|
290
|
+
|
|
291
|
def _digest_from_upload_resource_name(resource_name):
    """Parse a ByteStream upload resource name into a Digest.

    The accepted form is ``uploads/<uuid4>/blobs/<hash>/<size_bytes>``,
    optionally followed by further ``/``-separated components, which are
    ignored.  The bare ``<hash>/<size_bytes>`` form is accepted as well,
    for compatibility with non-conforming BuildStream 1.1.x clients.

    Args:
        resource_name (str): The resource name supplied by the client.

    Returns:
        A ``remote_execution_pb2.Digest`` with ``hash`` and ``size_bytes``
        set, or ``None`` if the resource name is malformed (too few
        components, wrong keywords, an upload id that is not a version 4
        UUID, or a size that is not a non-negative integer).
    """
    parts = resource_name.split('/')

    # Accept requests from non-conforming BuildStream 1.1.x clients
    if len(parts) == 2:
        parts = ['uploads', str(uuid.uuid4()), 'blobs'] + parts

    if len(parts) < 5 or parts[0] != 'uploads' or parts[2] != 'blobs':
        return None

    try:
        upload_id = uuid.UUID(hex=parts[1])
        # Validate the size before building the Digest: it comes straight
        # from the client, and a blob cannot have a negative size.
        size_bytes = int(parts[4])
    except ValueError:
        return None

    if upload_id.version != 4 or size_bytes < 0:
        return None

    try:
        digest = remote_execution_pb2.Digest()
        digest.hash = parts[3]
        digest.size_bytes = size_bytes
    except ValueError:
        # Presumably raised by the protobuf setters for values they reject
        # (e.g. a size that does not fit the field); treated as malformed,
        # as the original code did.
        return None
    return digest
|
257
|
314
|
|
258
|
315
|
|
259
|
316
|
def _has_object(cas, digest):
|