finn pushed to branch finn/78-capabilities-service at BuildGrid / buildgrid
Commits:
- 
aa3828fc
by Finn at 2018-11-23T17:26:33Z
 - 
c083705c
by Finn at 2018-11-23T17:26:33Z
 - 
1b686536
by Finn at 2018-11-23T17:26:33Z
 - 
ccb62707
by Finn at 2018-11-23T17:26:33Z
 - 
fcb793e4
by Finn at 2018-11-23T17:26:33Z
 - 
2fb52a0e
by Finn at 2018-11-23T17:26:33Z
 - 
2a23eabc
by Finn at 2018-11-23T17:26:33Z
 
7 changed files:
- + buildgrid/_app/commands/cmd_capabilities.py
 - buildgrid/server/cas/instance.py
 - buildgrid/server/execution/instance.py
 - buildgrid/utils.py
 - + tests/integration/capabilities_service.py
 - tests/server_instance.py
 - + tests/utils/capabilities.py
 
Changes:
| 1 | 
+# Copyright (C) 2018 Bloomberg LP
 | 
|
| 2 | 
+#
 | 
|
| 3 | 
+# Licensed under the Apache License, Version 2.0 (the "License");
 | 
|
| 4 | 
+# you may not use this file except in compliance with the License.
 | 
|
| 5 | 
+# You may obtain a copy of the License at
 | 
|
| 6 | 
+#
 | 
|
| 7 | 
+#  <http://www.apache.org/licenses/LICENSE-2.0>
 | 
|
| 8 | 
+#
 | 
|
| 9 | 
+# Unless required by applicable law or agreed to in writing, software
 | 
|
| 10 | 
+# distributed under the License is distributed on an "AS IS" BASIS,
 | 
|
| 11 | 
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
|
| 12 | 
+# See the License for the specific language governing permissions and
 | 
|
| 13 | 
+# limitations under the License.
 | 
|
| 14 | 
+  | 
|
| 15 | 
+  | 
|
| 16 | 
+import sys
 | 
|
| 17 | 
+from urllib.parse import urlparse
 | 
|
| 18 | 
+  | 
|
| 19 | 
+import click
 | 
|
| 20 | 
+import grpc
 | 
|
| 21 | 
+  | 
|
| 22 | 
+from buildgrid.client.capabilities import CapabilitiesInterface
 | 
|
| 23 | 
+  | 
|
| 24 | 
+from ..cli import pass_context
 | 
|
| 25 | 
+  | 
|
| 26 | 
+  | 
|
| 27 | 
@click.command(name='capabilities', short_help="Capabilities service.")
@click.option('--remote', type=click.STRING, default='http://localhost:50051', show_default=True,
              help="Remote execution server's URL (port defaults to 50051 if not specified).")
@click.option('--client-key', type=click.Path(exists=True, dir_okay=False), default=None,
              help="Private client key for TLS (PEM-encoded)")
@click.option('--client-cert', type=click.Path(exists=True, dir_okay=False), default=None,
              help="Public client certificate for TLS (PEM-encoded)")
@click.option('--server-cert', type=click.Path(exists=True, dir_okay=False), default=None,
              help="Public server certificate for TLS (PEM-encoded)")
@click.option('--instance-name', type=click.STRING, default='main', show_default=True,
              help="Targeted farm instance name.")
@pass_context
def cli(context, remote, instance_name, client_key, client_cert, server_cert):
    """Query a remote server's Capabilities service and print the response.

    An ``http`` remote is reached over an insecure channel; any other scheme
    uses a TLS channel built by ``context.load_client_credentials`` from the
    given key/certificate paths. Exits with status -1 when no host can be
    parsed from ``remote`` or when TLS credentials are needed but unavailable.
    """
    click.echo("Getting capabilities...")
    url = urlparse(remote)

    # urlparse() only yields a hostname when a scheme is present: for
    # 'localhost:50051' it takes 'localhost' as the scheme and leaves the
    # hostname unset. Reject that instead of dialing 'None:50051'.
    if not url.hostname:
        click.echo("ERROR: could not parse a host from remote URL '{}'.".format(remote), err=True)
        sys.exit(-1)

    remote = '{}:{}'.format(url.hostname, url.port or 50051)

    if url.scheme == 'http':
        channel = grpc.insecure_channel(remote)
    else:
        credentials = context.load_client_credentials(client_key, client_cert, server_cert)
        if not credentials:
            click.echo("ERROR: no TLS keys were specified and no defaults could be found.", err=True)
            sys.exit(-1)

        channel = grpc.secure_channel(remote, credentials)

    interface = CapabilitiesInterface(channel)
    response = interface.get_capabilities(instance_name)
    click.echo(response)
 | 
| ... | ... | @@ -25,6 +25,7 @@ from buildgrid._exceptions import InvalidArgumentError, NotFoundError, OutOfRang | 
| 25 | 25 | 
 from buildgrid._protos.google.bytestream import bytestream_pb2
 | 
| 26 | 26 | 
 from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2 as re_pb2
 | 
| 27 | 27 | 
 from buildgrid.settings import HASH
 | 
| 28 | 
+from buildgrid.utils import get_hash_type
 | 
|
| 28 | 29 | 
 | 
| 29 | 30 | 
 | 
| 30 | 31 | 
 class ContentAddressableStorageInstance:
 | 
| ... | ... | @@ -37,6 +38,19 @@ class ContentAddressableStorageInstance: | 
| 37 | 38 | 
     def register_instance_with_server(self, instance_name, server):
 | 
| 38 | 39 | 
         server.add_cas_instance(self, instance_name)
 | 
| 39 | 40 | 
 | 
| 41 | 
+    def hash_type(self):
 | 
|
| 42 | 
+        return get_hash_type()
 | 
|
| 43 | 
+  | 
|
| 44 | 
+    def max_batch_total_size_bytes(self):
 | 
|
| 45 | 
+        # TODO: link with max size
 | 
|
| 46 | 
+        # Should be added from settings in MR !119
 | 
|
| 47 | 
+        return 2000000
 | 
|
| 48 | 
+  | 
|
| 49 | 
+    def symlink_absolute_path_strategy(self):
 | 
|
| 50 | 
+        # Currently this strategy is hardcoded into BuildGrid
 | 
|
| 51 | 
+        # With no setting to reference
 | 
|
| 52 | 
+        return re_pb2.CacheCapabilities().DISALLOWED
 | 
|
| 53 | 
+  | 
|
| 40 | 54 | 
     def find_missing_blobs(self, blob_digests):
 | 
| 41 | 55 | 
         storage = self._storage
 | 
| 42 | 56 | 
         return re_pb2.FindMissingBlobsResponse(
 | 
| ... | ... | @@ -25,6 +25,7 @@ from buildgrid._exceptions import FailedPreconditionError, InvalidArgumentError | 
| 25 | 25 | 
 from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Action
 | 
| 26 | 26 | 
 | 
| 27 | 27 | 
 from ..job import Job
 | 
| 28 | 
+from ...utils import get_hash_type
 | 
|
| 28 | 29 | 
 | 
| 29 | 30 | 
 | 
| 30 | 31 | 
 class ExecutionInstance:
 | 
| ... | ... | @@ -38,6 +39,9 @@ class ExecutionInstance: | 
| 38 | 39 | 
     def register_instance_with_server(self, instance_name, server):
 | 
| 39 | 40 | 
         server.add_execution_instance(self, instance_name)
 | 
| 40 | 41 | 
 | 
| 42 | 
+    def hash_type(self):
 | 
|
| 43 | 
+        return get_hash_type()
 | 
|
| 44 | 
+  | 
|
| 41 | 45 | 
     def execute(self, action_digest, skip_cache_lookup, message_queue=None):
 | 
| 42 | 46 | 
         """ Sends a job for execution.
 | 
| 43 | 47 | 
         Queues an action and creates an Operation instance to be associated with
 | 
| ... | ... | @@ -30,6 +30,14 @@ def get_hostname(): | 
| 30 | 30 | 
     return socket.gethostname()
 | 
| 31 | 31 | 
 | 
| 32 | 32 | 
 | 
| 33 | 
def get_hash_type():
    """Returns the REAPI ``DigestFunction`` matching the configured hash.

    The name of the hash produced by ``HASH()`` is mapped to the
    corresponding ``remote_execution_pb2`` digest function enum value.

    Returns:
        ``SHA256``, ``SHA1`` or ``MD5`` when ``HASH`` is one of those
        algorithms, otherwise ``UNKNOWN``.
    """
    # hashlib names these algorithms 'sha256', 'sha1' and 'md5'.
    digest_functions = {
        'sha256': remote_execution_pb2.SHA256,
        'sha1': remote_execution_pb2.SHA1,
        'md5': remote_execution_pb2.MD5,
    }
    return digest_functions.get(HASH().name, remote_execution_pb2.UNKNOWN)
 | 
|
| 39 | 
+  | 
|
| 40 | 
+  | 
|
| 33 | 41 | 
 def create_digest(bytes_to_digest):
 | 
| 34 | 42 | 
     """Computes the :obj:`Digest` of a piece of data.
 | 
| 35 | 43 | 
 | 
| 1 | 
+# Copyright (C) 2018 Bloomberg LP
 | 
|
| 2 | 
+#
 | 
|
| 3 | 
+# Licensed under the Apache License, Version 2.0 (the "License");
 | 
|
| 4 | 
+# you may not use this file except in compliance with the License.
 | 
|
| 5 | 
+# You may obtain a copy of the License at
 | 
|
| 6 | 
+#
 | 
|
| 7 | 
+#  <http://www.apache.org/licenses/LICENSE-2.0>
 | 
|
| 8 | 
+#
 | 
|
| 9 | 
+# Unless required by applicable law or agreed to in writing, software
 | 
|
| 10 | 
+# distributed under the License is distributed on an "AS IS" BASIS,
 | 
|
| 11 | 
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
|
| 12 | 
+# See the License for the specific language governing permissions and
 | 
|
| 13 | 
+# limitations under the License.
 | 
|
| 14 | 
+  | 
|
| 15 | 
+# pylint: disable=redefined-outer-name
 | 
|
| 16 | 
+  | 
|
| 17 | 
+  | 
|
| 18 | 
+import grpc
 | 
|
| 19 | 
+import pytest
 | 
|
| 20 | 
+  | 
|
| 21 | 
+from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
 | 
|
| 22 | 
+from buildgrid.client.capabilities import CapabilitiesInterface
 | 
|
| 23 | 
+from buildgrid.server.controller import ExecutionController
 | 
|
| 24 | 
+from buildgrid.server.actioncache.storage import ActionCache
 | 
|
| 25 | 
+from buildgrid.server.cas.instance import ContentAddressableStorageInstance
 | 
|
| 26 | 
+from buildgrid.server.cas.storage.lru_memory_cache import LRUMemoryCache
 | 
|
| 27 | 
+  | 
|
| 28 | 
+from ..utils.utils import run_in_subprocess
 | 
|
| 29 | 
+from ..utils.capabilities import serve_capabilities_service
 | 
|
| 30 | 
+  | 
|
| 31 | 
+  | 
|
| 32 | 
+INSTANCES = ['', 'instance']
 | 
|
| 33 | 
+  | 
|
| 34 | 
+  | 
|
| 35 | 
+# Use subprocess to avoid creation of gRPC threads in main process
 | 
|
| 36 | 
+# See https://github.com/grpc/grpc/blob/master/doc/fork_support.md
 | 
|
| 37 | 
+# Multiprocessing uses pickle which protobufs don't work with
 | 
|
| 38 | 
+# Workaround wrapper to send messages as strings
 | 
|
| 39 | 
class ServerInterface:
    """Client-side test helper that performs Capabilities calls in a subprocess.

    The gRPC call runs in a child process, so no gRPC threads are created in
    the test process. The reply crosses the process boundary as serialized
    bytes and is parsed back into a ``ServerCapabilities`` message here.
    """

    def __init__(self, remote):
        self.__remote = remote

    def get_capabilities(self, instance_name):
        """Fetches and returns the ``ServerCapabilities`` for ``instance_name``."""

        def _fetch_serialized(queue, remote, instance_name):
            channel = grpc.insecure_channel(remote)
            reply = CapabilitiesInterface(channel).get_capabilities(instance_name)
            queue.put(reply.SerializeToString())

        serialized = run_in_subprocess(_fetch_serialized, self.__remote, instance_name)

        capabilities = remote_execution_pb2.ServerCapabilities()
        capabilities.ParseFromString(serialized)
        return capabilities
 | 
|
| 58 | 
+  | 
|
| 59 | 
+  | 
|
| 60 | 
@pytest.mark.parametrize('instance', INSTANCES)
def test_execution_not_available_capabilities(instance):
    """Without an execution instance, execution must be reported as disabled."""
    with serve_capabilities_service([instance]) as server:
        capabilities = ServerInterface(server.remote).get_capabilities(instance)

        assert not capabilities.execution_capabilities.exec_enabled
 | 
|
| 67 | 
+  | 
|
| 68 | 
+  | 
|
| 69 | 
@pytest.mark.parametrize('instance', INSTANCES)
def test_execution_available_capabilities(instance):
    """With an execution instance, execution is enabled and a digest function is set."""
    execution_instance = ExecutionController().execution_instance

    with serve_capabilities_service([instance],
                                    execution_instance=execution_instance) as server:
        capabilities = ServerInterface(server.remote).get_capabilities(instance)
        exec_caps = capabilities.execution_capabilities

        assert exec_caps.exec_enabled
        assert exec_caps.digest_function
 | 
|
| 80 | 
+  | 
|
| 81 | 
+  | 
|
| 82 | 
@pytest.mark.parametrize('instance', INSTANCES)
def test_action_cache_allow_updates_capabilities(instance):
    """An updatable action cache must advertise ``update_enabled``."""
    action_cache = ActionCache(LRUMemoryCache(limit=256), max_cached_refs=256, allow_updates=True)

    with serve_capabilities_service([instance],
                                    action_cache_instance=action_cache) as server:
        cache_caps = ServerInterface(server.remote).get_capabilities(instance).cache_capabilities

        assert cache_caps.action_cache_update_capabilities.update_enabled
 | 
|
| 93 | 
+  | 
|
| 94 | 
+  | 
|
| 95 | 
@pytest.mark.parametrize('instance', INSTANCES)
def test_action_cache_not_allow_updates_capabilities(instance):
    """A read-only action cache must not advertise ``update_enabled``."""
    action_cache = ActionCache(LRUMemoryCache(limit=256), max_cached_refs=256, allow_updates=False)

    with serve_capabilities_service([instance],
                                    action_cache_instance=action_cache) as server:
        cache_caps = ServerInterface(server.remote).get_capabilities(instance).cache_capabilities

        assert not cache_caps.action_cache_update_capabilities.update_enabled
 | 
|
| 106 | 
+  | 
|
| 107 | 
+  | 
|
| 108 | 
@pytest.mark.parametrize('instance', INSTANCES)
def test_cas_capabilities(instance):
    """A CAS instance must populate the cache capabilities it is responsible for."""
    cas = ContentAddressableStorageInstance(None)

    with serve_capabilities_service([instance], cas_instance=cas) as server:
        cache_caps = ServerInterface(server.remote).get_capabilities(instance).cache_capabilities
        digest_functions = cache_caps.digest_function

        assert len(digest_functions) == 1
        assert digest_functions[0]
        assert cache_caps.symlink_absolute_path_strategy
        assert cache_caps.max_batch_total_size_bytes
 | 
| ... | ... | @@ -21,6 +21,7 @@ from buildgrid.server.execution.service import ExecutionService | 
| 21 | 21 | 
 from buildgrid.server.operations.service import OperationsService
 | 
| 22 | 22 | 
 from buildgrid.server.bots.service import BotsService
 | 
| 23 | 23 | 
 from buildgrid.server.referencestorage.service import ReferenceStorageService
 | 
| 24 | 
+from buildgrid.server.capabilities.service import CapabilitiesService
 | 
|
| 24 | 25 | 
 | 
| 25 | 26 | 
 from .utils.utils import run_in_subprocess
 | 
| 26 | 27 | 
 | 
| ... | ... | @@ -87,6 +88,7 @@ def test_create_server(): | 
| 87 | 88 | 
             assert isinstance(server._action_cache_service, ActionCacheService)
 | 
| 88 | 89 | 
             assert isinstance(server._cas_service, ContentAddressableStorageService)
 | 
| 89 | 90 | 
             assert isinstance(server._bytestream_service, ByteStreamService)
 | 
| 91 | 
+            assert isinstance(server._capabilities_service, CapabilitiesService)
 | 
|
| 90 | 92 | 
         except AssertionError:
 | 
| 91 | 93 | 
             queue.put(False)
 | 
| 92 | 94 | 
         else:
 | 
| 1 | 
+# Copyright (C) 2018 Bloomberg LP
 | 
|
| 2 | 
+#
 | 
|
| 3 | 
+# Licensed under the Apache License, Version 2.0 (the "License");
 | 
|
| 4 | 
+# you may not use this file except in compliance with the License.
 | 
|
| 5 | 
+# You may obtain a copy of the License at
 | 
|
| 6 | 
+#
 | 
|
| 7 | 
+#  <http://www.apache.org/licenses/LICENSE-2.0>
 | 
|
| 8 | 
+#
 | 
|
| 9 | 
+# Unless required by applicable law or agreed to in writing, software
 | 
|
| 10 | 
+# distributed under the License is distributed on an "AS IS" BASIS,
 | 
|
| 11 | 
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
|
| 12 | 
+# See the License for the specific language governing permissions and
 | 
|
| 13 | 
+# limitations under the License.
 | 
|
| 14 | 
+  | 
|
| 15 | 
+  | 
|
| 16 | 
+from concurrent import futures
 | 
|
| 17 | 
+from contextlib import contextmanager
 | 
|
| 18 | 
+import multiprocessing
 | 
|
| 19 | 
+import os
 | 
|
| 20 | 
+import signal
 | 
|
| 21 | 
+  | 
|
| 22 | 
+import grpc
 | 
|
| 23 | 
+import pytest_cov
 | 
|
| 24 | 
+  | 
|
| 25 | 
+from buildgrid.server.capabilities.service import CapabilitiesService
 | 
|
| 26 | 
+from buildgrid.server.capabilities.instance import CapabilitiesInstance
 | 
|
| 27 | 
+  | 
|
| 28 | 
+  | 
|
| 29 | 
@contextmanager
def serve_capabilities_service(instances,
                               cas_instance=None,
                               action_cache_instance=None,
                               execution_instance=None):
    """Runs a Capabilities-only server for the duration of a ``with`` block.

    Yields the :class:`Server` handle and always calls its ``quit()`` on
    exit, including when the ``with`` body raises.
    """
    server = Server(instances,
                    cas_instance=cas_instance,
                    action_cache_instance=action_cache_instance,
                    execution_instance=execution_instance)
    try:
        yield server
    finally:
        server.quit()
 | 
|
| 42 | 
+  | 
|
| 43 | 
+  | 
|
| 44 | 
class Server:
    """Runs a gRPC server exposing only the Capabilities service in a child process.

    The child binds an ephemeral ``localhost`` port and sends it back through
    a multiprocessing queue. ``remote`` holds the ``host:port`` string that
    clients should connect to.
    """

    def __init__(self, instances,
                 cas_instance=None,
                 action_cache_instance=None,
                 execution_instance=None):
        self.instances = instances

        self.__queue = multiprocessing.Queue()
        self.__process = multiprocessing.Process(
            target=Server.serve,
            args=(self.__queue, self.instances, cas_instance, action_cache_instance, execution_instance))
        self.__process.start()

        try:
            # Block until the child reports the port it bound.
            self.port = self.__queue.get(timeout=1)
        except Exception:
            # The caller never receives this object, so nobody else could stop
            # the child: terminate it here rather than leaking the process.
            self.quit()
            raise
        self.remote = 'localhost:{}'.format(self.port)

    @staticmethod
    def serve(queue, instances, cas_instance, action_cache_instance, execution_instance):
        """Child-process entry point: serve Capabilities until terminated."""
        pytest_cov.embed.cleanup_on_sigterm()

        # Use max_workers default from Python 3.5+
        max_workers = (os.cpu_count() or 1) * 5
        server = grpc.server(futures.ThreadPoolExecutor(max_workers))
        port = server.add_insecure_port('localhost:0')

        capabilities_service = CapabilitiesService(server)
        for name in instances:
            capabilities_instance = CapabilitiesInstance(cas_instance, action_cache_instance, execution_instance)
            capabilities_service.add_instance(name, capabilities_instance)

        server.start()
        queue.put(port)
        signal.pause()

    def quit(self):
        """Terminates the server subprocess and waits for it to exit.

        Safe to call more than once; later calls do nothing.
        """
        if self.__process:
            self.__process.terminate()
            self.__process.join()
            self.__process = None
 | 
