finn pushed to branch finn/delete-action-cache at BuildGrid / buildgrid
Commits:
-
ea7e2208
by finn at 2018-08-23T10:56:26Z
5 changed files:
- − buildgrid/server/action_cache.py
- buildgrid/server/execution/action_cache_service.py
- buildgrid/server/execution/execution_service.py
- buildgrid/server/execution/operations_service.py
- buildgrid/server/worker/bots_service.py
Changes:
| 1 |
-# Copyright (C) 2018 Bloomberg LP
|
|
| 2 |
-#
|
|
| 3 |
-# Licensed under the Apache License, Version 2.0 (the "License");
|
|
| 4 |
-# you may not use this file except in compliance with the License.
|
|
| 5 |
-# You may obtain a copy of the License at
|
|
| 6 |
-#
|
|
| 7 |
-# <http://www.apache.org/licenses/LICENSE-2.0>
|
|
| 8 |
-#
|
|
| 9 |
-# Unless required by applicable law or agreed to in writing, software
|
|
| 10 |
-# distributed under the License is distributed on an "AS IS" BASIS,
|
|
| 11 |
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
| 12 |
-# See the License for the specific language governing permissions and
|
|
| 13 |
-# limitations under the License.
|
|
| 14 |
- |
|
| 15 |
- |
|
| 16 |
-"""
|
|
| 17 |
-ActionCache
|
|
| 18 |
-===========
|
|
| 19 |
- |
|
| 20 |
-Implements a simple in-memory action cache.
|
|
| 21 |
- |
|
| 22 |
-The action cache maps Action to their corresponding ActionResult. An
|
|
| 23 |
-ActionResult may be found in cache, for any given Action, if that action has
|
|
| 24 |
-already been executed.
|
|
| 25 |
- |
|
| 26 |
-Note:
|
|
| 27 |
- Action and ActionResult are referenced by their Digest and mapping is stored
|
|
| 28 |
- in-memory.
|
|
| 29 |
-"""
|
|
| 30 |
- |
|
| 31 |
-import collections
|
|
| 32 |
- |
|
| 33 |
-from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2 as re_pb2
|
|
| 34 |
- |
|
| 35 |
- |
|
| 36 |
-class ActionCache:
|
|
| 37 |
- """In-memory Action to ActionResult associative array.
|
|
| 38 |
- """
|
|
| 39 |
- |
|
| 40 |
- def __init__(self, storage, max_cached_actions):
|
|
| 41 |
- """Initialises a new ActionCache instance.
|
|
| 42 |
- |
|
| 43 |
- Args:
|
|
| 44 |
- storage (StorageABC): storage backend instance to be used.
|
|
| 45 |
- max_cached_actions (int): maximum number of entries to cache.
|
|
| 46 |
- """
|
|
| 47 |
- self._storage = storage
|
|
| 48 |
- self._max_cached_actions = max_cached_actions
|
|
| 49 |
- self._digest_map = collections.OrderedDict()
|
|
| 50 |
- |
|
| 51 |
- def get_action_result(self, action_digest):
|
|
| 52 |
- """Retrieves the cached ActionResult for the given Action digest.
|
|
| 53 |
- |
|
| 54 |
- Args:
|
|
| 55 |
- action_digest (Digest): digest of the Action to query.
|
|
| 56 |
- |
|
| 57 |
- Returns:
|
|
| 58 |
- The cached ActionResult matching the given Action digest or None if
|
|
| 59 |
- nothing has been cached yet for that Action.
|
|
| 60 |
- """
|
|
| 61 |
- key = (action_digest.hash, action_digest.size_bytes)
|
|
| 62 |
- if key in self._digest_map:
|
|
| 63 |
- action_result = self._storage.get_message(self._digest_map[key],
|
|
| 64 |
- re_pb2.ActionResult)
|
|
| 65 |
- if action_result is not None:
|
|
| 66 |
- if self._blobs_still_exist(action_result):
|
|
| 67 |
- self._digest_map.move_to_end(key)
|
|
| 68 |
- return action_result
|
|
| 69 |
- del self._digest_map[key]
|
|
| 70 |
- return None
|
|
| 71 |
- |
|
| 72 |
- def put_action_result(self, action_digest, action_result):
|
|
| 73 |
- """Stores an ActionResult in cache for the given Action digest.
|
|
| 74 |
- |
|
| 75 |
- If the cache size limit has been reached, the oldest cache entries will
|
|
| 76 |
- be dropped before insertion so that the cache size never exceeds the
|
|
| 77 |
- maximum numbers of entries allowed.
|
|
| 78 |
- |
|
| 79 |
- Args:
|
|
| 80 |
- action_digest (Digest): digest of the Action to select.
|
|
| 81 |
- action_result (ActionResult): result object to store.
|
|
| 82 |
- """
|
|
| 83 |
- if self._max_cached_actions == 0:
|
|
| 84 |
- return
|
|
| 85 |
- |
|
| 86 |
- while len(self._digest_map) >= self._max_cached_actions:
|
|
| 87 |
- self._digest_map.popitem(last=False)
|
|
| 88 |
- |
|
| 89 |
- key = (action_digest.hash, action_digest.size_bytes)
|
|
| 90 |
- action_result_digest = self._storage.put_message(action_result)
|
|
| 91 |
- self._digest_map[key] = action_result_digest
|
|
| 92 |
- |
|
| 93 |
- def _blobs_still_exist(self, action_result):
|
|
| 94 |
- """Checks CAS for ActionResult output blobs existence.
|
|
| 95 |
- |
|
| 96 |
- Args:
|
|
| 97 |
- action_result (ActionResult): ActionResult to search referenced
|
|
| 98 |
- output blobs for.
|
|
| 99 |
- |
|
| 100 |
- Returns:
|
|
| 101 |
- True if all referenced blobs are present in CAS, False otherwise.
|
|
| 102 |
- """
|
|
| 103 |
- blobs_needed = []
|
|
| 104 |
- |
|
| 105 |
- for output_file in action_result.output_files:
|
|
| 106 |
- blobs_needed.append(output_file.digest)
|
|
| 107 |
- |
|
| 108 |
- for output_directory in action_result.output_directories:
|
|
| 109 |
- blobs_needed.append(output_directory.tree_digest)
|
|
| 110 |
- tree = self._storage.get_message(output_directory.tree_digest,
|
|
| 111 |
- re_pb2.Tree)
|
|
| 112 |
- if tree is None:
|
|
| 113 |
- return False
|
|
| 114 |
- for file_node in tree.root.files:
|
|
| 115 |
- blobs_needed.append(file_node.digest)
|
|
| 116 |
- for child in tree.children:
|
|
| 117 |
- for file_node in child.files:
|
|
| 118 |
- blobs_needed.append(file_node.digest)
|
|
| 119 |
- |
|
| 120 |
- if action_result.stdout_digest.hash and not action_result.stdout_raw:
|
|
| 121 |
- blobs_needed.append(action_result.stdout_digest)
|
|
| 122 |
- if action_result.stderr_digest.hash and not action_result.stderr_raw:
|
|
| 123 |
- blobs_needed.append(action_result.stderr_digest)
|
|
| 124 |
- |
|
| 125 |
- missing = self._storage.missing_blobs(blobs_needed)
|
|
| 126 |
- return len(missing) == 0
|
| ... | ... | @@ -24,6 +24,7 @@ import logging |
| 24 | 24 |
|
| 25 | 25 |
import grpc
|
| 26 | 26 |
|
| 27 |
+from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
|
|
| 27 | 28 |
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2_grpc
|
| 28 | 29 |
|
| 29 | 30 |
from .._exceptions import NotFoundError
|
| ... | ... | @@ -39,13 +40,19 @@ class ActionCacheService(remote_execution_pb2_grpc.ActionCacheServicer): |
| 39 | 40 |
try:
|
| 40 | 41 |
return self._action_cache.get_action_result(request.action_digest)
|
| 41 | 42 |
|
| 42 |
- except NotFoundError:
|
|
| 43 |
+ except NotFoundError as e:
|
|
| 44 |
+ self.logger.error(e)
|
|
| 43 | 45 |
context.set_code(grpc.StatusCode.NOT_FOUND)
|
| 44 | 46 |
|
| 47 |
+ return remote_execution_pb2.ActionResult()
|
|
| 48 |
+ |
|
| 45 | 49 |
def UpdateActionResult(self, request, context):
|
| 46 | 50 |
try:
|
| 47 | 51 |
self._action_cache.update_action_result(request.action_digest, request.action_result)
|
| 48 | 52 |
return request.action_result
|
| 49 | 53 |
|
| 50 |
- except NotImplementedError:
|
|
| 54 |
+ except NotImplementedError as e:
|
|
| 55 |
+ self.logger.error(e)
|
|
| 51 | 56 |
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
|
| 57 |
+ |
|
| 58 |
+ return remote_execution_pb2.ActionResult()
|
| ... | ... | @@ -28,6 +28,8 @@ import grpc |
| 28 | 28 |
|
| 29 | 29 |
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2_grpc
|
| 30 | 30 |
|
| 31 |
+from buildgrid._protos.google.longrunning import operations_pb2
|
|
| 32 |
+ |
|
| 31 | 33 |
from .._exceptions import InvalidArgumentError
|
| 32 | 34 |
|
| 33 | 35 |
|
| ... | ... | @@ -55,11 +57,13 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer): |
| 55 | 57 |
self.logger.error(e)
|
| 56 | 58 |
context.set_details(str(e))
|
| 57 | 59 |
context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
|
| 60 |
+ yield operations_pb2.Operation()
|
|
| 58 | 61 |
|
| 59 | 62 |
except NotImplementedError as e:
|
| 60 | 63 |
self.logger.error(e)
|
| 61 | 64 |
context.set_details(str(e))
|
| 62 | 65 |
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
|
| 66 |
+ yield operations_pb2.Operation()
|
|
| 63 | 67 |
|
| 64 | 68 |
def WaitExecution(self, request, context):
|
| 65 | 69 |
try:
|
| ... | ... | @@ -77,6 +81,7 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer): |
| 77 | 81 |
self.logger.error(e)
|
| 78 | 82 |
context.set_details(str(e))
|
| 79 | 83 |
context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
|
| 84 |
+ yield operations_pb2.Operation()
|
|
| 80 | 85 |
|
| 81 | 86 |
def _remove_client(self, operation_name, message_queue):
|
| 82 | 87 |
self._instance.unregister_message_client(operation_name, message_queue)
|
| ... | ... | @@ -37,6 +37,7 @@ class OperationsService(operations_pb2_grpc.OperationsServicer): |
| 37 | 37 |
def GetOperation(self, request, context):
|
| 38 | 38 |
try:
|
| 39 | 39 |
return self._instance.get_operation(request.name)
|
| 40 |
+ |
|
| 40 | 41 |
except InvalidArgumentError as e:
|
| 41 | 42 |
self.logger.error(e)
|
| 42 | 43 |
context.set_details(str(e))
|
| ... | ... | @@ -52,15 +53,19 @@ class OperationsService(operations_pb2_grpc.OperationsServicer): |
| 52 | 53 |
def DeleteOperation(self, request, context):
|
| 53 | 54 |
try:
|
| 54 | 55 |
return self._instance.delete_operation(request.name)
|
| 56 |
+ |
|
| 55 | 57 |
except InvalidArgumentError as e:
|
| 56 | 58 |
self.logger.error(e)
|
| 57 | 59 |
context.set_details(str(e))
|
| 58 | 60 |
context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
|
| 61 |
+ return operations_pb2.Operation()
|
|
| 59 | 62 |
|
| 60 | 63 |
def CancelOperation(self, request, context):
|
| 61 | 64 |
try:
|
| 62 | 65 |
return self._instance.cancel_operation(request.name)
|
| 66 |
+ |
|
| 63 | 67 |
except NotImplementedError as e:
|
| 64 | 68 |
self.logger.error(e)
|
| 65 | 69 |
context.set_details(str(e))
|
| 66 | 70 |
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
|
| 71 |
+ return operations_pb2.Operation()
|
| ... | ... | @@ -23,6 +23,9 @@ import logging |
| 23 | 23 |
|
| 24 | 24 |
import grpc
|
| 25 | 25 |
|
| 26 |
+from google.protobuf.empty_pb2 import Empty
|
|
| 27 |
+ |
|
| 28 |
+from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2
|
|
| 26 | 29 |
from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2_grpc
|
| 27 | 30 |
|
| 28 | 31 |
from .._exceptions import InvalidArgumentError, OutofSyncError
|
| ... | ... | @@ -43,6 +46,8 @@ class BotsService(bots_pb2_grpc.BotsServicer): |
| 43 | 46 |
context.set_details(str(e))
|
| 44 | 47 |
context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
|
| 45 | 48 |
|
| 49 |
+ return bots_pb2.BotSession()
|
|
| 50 |
+ |
|
| 46 | 51 |
def UpdateBotSession(self, request, context):
|
| 47 | 52 |
try:
|
| 48 | 53 |
return self._instance.update_bot_session(request.name,
|
| ... | ... | @@ -62,5 +67,8 @@ class BotsService(bots_pb2_grpc.BotsServicer): |
| 62 | 67 |
context.set_details(str(e))
|
| 63 | 68 |
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
|
| 64 | 69 |
|
| 70 |
+ return bots_pb2.BotSession()
|
|
| 71 |
+ |
|
| 65 | 72 |
def PostBotEventTemp(self, request, context):
|
| 66 | 73 |
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
|
| 74 |
+ return Empty()
|
