Raoul Hidalgo Charman pushed to branch master at BuildGrid / buildgrid
Commits:
-
0c8e5a4c
by Martin Blanchard at 2018-12-11T09:22:50Z
-
6ab06a17
by Martin Blanchard at 2018-12-11T09:22:52Z
-
5fb63a15
by Martin Blanchard at 2018-12-11T09:22:52Z
-
6c7ea3b2
by Martin Blanchard at 2018-12-11T09:22:52Z
-
37d52020
by Martin Blanchard at 2018-12-11T09:22:52Z
-
2265a9dc
by Martin Blanchard at 2018-12-11T09:22:52Z
-
53bbd05e
by Martin Blanchard at 2018-12-11T09:22:52Z
5 changed files:
- buildgrid/_app/commands/cmd_server.py
- buildgrid/_app/settings/reference.yml
- buildgrid/server/_monitoring.py
- buildgrid/server/instance.py
- buildgrid/server/scheduler.py
Changes:
... | ... | @@ -26,6 +26,7 @@ import click |
26 | 26 |
|
27 | 27 |
from buildgrid.server._authentication import AuthMetadataMethod, AuthMetadataAlgorithm
|
28 | 28 |
from buildgrid.server.instance import BuildGridServer
|
29 |
+from buildgrid.server._monitoring import MonitoringOutputType, MonitoringOutputFormat
|
|
29 | 30 |
from buildgrid.utils import read_file
|
30 | 31 |
|
31 | 32 |
from ..cli import pass_context, setup_logging
|
... | ... | @@ -97,6 +98,26 @@ def _create_server_from_config(configuration): |
97 | 98 |
click.echo("Error: Configuration, {}.".format(e), err=True)
|
98 | 99 |
sys.exit(-1)
|
99 | 100 |
|
101 |
+ if 'monitoring' in configuration:
|
|
102 |
+ monitoring = configuration['monitoring']
|
|
103 |
+ |
|
104 |
+ try:
|
|
105 |
+ if 'enabled' in monitoring:
|
|
106 |
+ kargs['monitor'] = monitoring['enabled']
|
|
107 |
+ |
|
108 |
+ if 'endpoint-type' in monitoring:
|
|
109 |
+ kargs['mon_endpoint_type'] = MonitoringOutputType(monitoring['endpoint-type'])
|
|
110 |
+ |
|
111 |
+ if 'endpoint-location' in monitoring:
|
|
112 |
+ kargs['mon_endpoint_location'] = monitoring['endpoint-location']
|
|
113 |
+ |
|
114 |
+ if 'serialization-format' in monitoring:
|
|
115 |
+ kargs['mon_serialisation_format'] = MonitoringOutputFormat(monitoring['serialization-format'])
|
|
116 |
+ |
|
117 |
+ except (ValueError, OSError) as e:
|
|
118 |
+ click.echo("Error: Configuration, {}.".format(e), err=True)
|
|
119 |
+ sys.exit(-1)
|
|
120 |
+ |
|
100 | 121 |
server = BuildGridServer(**kargs)
|
101 | 122 |
|
102 | 123 |
for channel in network:
|
... | ... | @@ -103,3 +103,30 @@ instances: |
103 | 103 |
##
|
104 | 104 |
# Whether or not writing to the cache is allowed.
|
105 | 105 |
allow-updates: true
|
106 |
+ |
|
107 |
+##
|
|
108 |
+# Server's internal monitoring configuration.
|
|
109 |
+monitoring:
|
|
110 |
+ ##
|
|
111 |
+ # Whether or not to activate the monitoring subsystem.
|
|
112 |
+ enabled: false
|
|
113 |
+ |
|
114 |
+ ##
|
|
115 |
+ # Type of the monitoring bus endpoint.
|
|
116 |
+ # stdout - Standard output stream.
|
|
117 |
+ # file - On-disk file.
|
|
118 |
+ # socket - UNIX domain socket.
|
|
119 |
+ endpoint-type: socket
|
|
120 |
+ |
|
121 |
+ ##
|
|
122 |
+ # Location for the monitoring bus endpoint. Only
|
|
123 |
+ # necessary for 'file' and 'socket' `endpoint-type`.
|
|
124 |
+ # Full path is expected for 'file', name
|
|
125 |
+ # only for 'socket'.
|
|
126 |
+ endpoint-location: monitoring_bus_socket
|
|
127 |
+ |
|
128 |
+ ##
|
|
129 |
+ # Messages serialisation format.
|
|
130 |
+ # binary - Protobuf binary format.
|
|
131 |
+ # json - JSON format.
|
|
132 |
+ serialization-format: binary
|
... | ... | @@ -20,6 +20,7 @@ import sys |
20 | 20 |
|
21 | 21 |
from google.protobuf import json_format
|
22 | 22 |
|
23 |
+from buildgrid._exceptions import InvalidArgumentError
|
|
23 | 24 |
from buildgrid._protos.buildgrid.v2 import monitoring_pb2
|
24 | 25 |
|
25 | 26 |
|
... | ... | @@ -53,6 +54,7 @@ class MonitoringBus: |
53 | 54 |
self.__output_location = None
|
54 | 55 |
self.__async_output = False
|
55 | 56 |
self.__json_output = False
|
57 |
+ self.__print_output = False
|
|
56 | 58 |
|
57 | 59 |
if endpoint_type == MonitoringOutputType.FILE:
|
58 | 60 |
self.__output_location = endpoint_location
|
... | ... | @@ -61,11 +63,23 @@ class MonitoringBus: |
61 | 63 |
self.__output_location = endpoint_location
|
62 | 64 |
self.__async_output = True
|
63 | 65 |
|
66 |
+ elif endpoint_type == MonitoringOutputType.STDOUT:
|
|
67 |
+ self.__print_output = True
|
|
68 |
+ |
|
69 |
+ else:
|
|
70 |
+ raise InvalidArgumentError("Invalid endpoint output type: [{}]"
|
|
71 |
+ .format(endpoint_type))
|
|
72 |
+ |
|
64 | 73 |
if serialisation_format == MonitoringOutputFormat.JSON:
|
65 | 74 |
self.__json_output = True
|
66 | 75 |
|
67 | 76 |
# --- Public API ---
|
68 | 77 |
|
78 |
+ @property
|
|
79 |
+ def prints_records(self):
|
|
80 |
+ """Whether or not messages are printed to standard output."""
|
|
81 |
+ return self.__print_output
|
|
82 |
+ |
|
69 | 83 |
def start(self):
|
70 | 84 |
"""Starts the monitoring bus worker task."""
|
71 | 85 |
if self.__streaming_task is not None:
|
... | ... | @@ -161,7 +175,7 @@ class MonitoringBus: |
161 | 175 |
|
162 | 176 |
output_file.flush()
|
163 | 177 |
|
164 |
- else:
|
|
178 |
+ elif self.__print_output:
|
|
165 | 179 |
output_writers.append(sys.stdout.buffer)
|
166 | 180 |
|
167 | 181 |
while True:
|
... | ... | @@ -48,8 +48,14 @@ class BuildGridServer: |
48 | 48 |
requisite services.
|
49 | 49 |
"""
|
50 | 50 |
|
51 |
- def __init__(self, max_workers=None, monitor=False, auth_method=AuthMetadataMethod.NONE,
|
|
52 |
- auth_secret=None, auth_algorithm=AuthMetadataAlgorithm.UNSPECIFIED):
|
|
51 |
+ def __init__(self,
|
|
52 |
+ max_workers=None, monitor=False,
|
|
53 |
+ mon_endpoint_type=MonitoringOutputType.STDOUT,
|
|
54 |
+ mon_endpoint_location=None,
|
|
55 |
+ mon_serialisation_format=MonitoringOutputFormat.JSON,
|
|
56 |
+ auth_method=AuthMetadataMethod.NONE,
|
|
57 |
+ auth_secret=None,
|
|
58 |
+ auth_algorithm=AuthMetadataAlgorithm.UNSPECIFIED):
|
|
53 | 59 |
"""Initializes a new :class:`BuildGridServer` instance.
|
54 | 60 |
|
55 | 61 |
Args:
|
... | ... | @@ -116,8 +122,9 @@ class BuildGridServer: |
116 | 122 |
|
117 | 123 |
if self._is_instrumented:
|
118 | 124 |
self.__monitoring_bus = MonitoringBus(
|
119 |
- self.__main_loop, endpoint_type=MonitoringOutputType.STDOUT,
|
|
120 |
- serialisation_format=MonitoringOutputFormat.JSON)
|
|
125 |
+ self.__main_loop, endpoint_type=mon_endpoint_type,
|
|
126 |
+ endpoint_location=mon_endpoint_location,
|
|
127 |
+ serialisation_format=mon_serialisation_format)
|
|
121 | 128 |
|
122 | 129 |
self.__build_monitoring_tasks = []
|
123 | 130 |
|
... | ... | @@ -132,6 +139,9 @@ class BuildGridServer: |
132 | 139 |
root_logger.removeHandler(log_handler)
|
133 | 140 |
root_logger.addHandler(self.__logging_handler)
|
134 | 141 |
|
142 |
+ if self._is_instrumented and self.__monitoring_bus.prints_records:
|
|
143 |
+ self.__print_log_records = False
|
|
144 |
+ |
|
135 | 145 |
# --- Public API ---
|
136 | 146 |
|
137 | 147 |
def start(self):
|
... | ... | @@ -225,6 +235,9 @@ class BuildGridServer: |
225 | 235 |
self._schedulers[instance_name] = instance.scheduler
|
226 | 236 |
self._instances.add(instance_name)
|
227 | 237 |
|
238 |
+ if self._is_instrumented:
|
|
239 |
+ instance.scheduler.activate_monitoring()
|
|
240 |
+ |
|
228 | 241 |
def add_bots_interface(self, instance, instance_name):
|
229 | 242 |
"""Adds a :obj:`BotsInterface` to the service.
|
230 | 243 |
|
... | ... | @@ -48,20 +48,7 @@ class Scheduler: |
48 | 48 |
self._is_instrumented = monitor
|
49 | 49 |
|
50 | 50 |
if self._is_instrumented:
|
51 |
- self.__build_metadata_queues = []
|
|
52 |
- |
|
53 |
- self.__operations_by_stage = {}
|
|
54 |
- self.__leases_by_state = {}
|
|
55 |
- self.__queue_time_average = 0, timedelta()
|
|
56 |
- |
|
57 |
- self.__operations_by_stage[OperationStage.CACHE_CHECK] = set()
|
|
58 |
- self.__operations_by_stage[OperationStage.QUEUED] = set()
|
|
59 |
- self.__operations_by_stage[OperationStage.EXECUTING] = set()
|
|
60 |
- self.__operations_by_stage[OperationStage.COMPLETED] = set()
|
|
61 |
- |
|
62 |
- self.__leases_by_state[LeaseState.PENDING] = set()
|
|
63 |
- self.__leases_by_state[LeaseState.ACTIVE] = set()
|
|
64 |
- self.__leases_by_state[LeaseState.COMPLETED] = set()
|
|
51 |
+ self.activate_monitoring()
|
|
65 | 52 |
|
66 | 53 |
# --- Public API ---
|
67 | 54 |
|
... | ... | @@ -232,6 +219,43 @@ class Scheduler: |
232 | 219 |
def is_instrumented(self):
|
233 | 220 |
return self._is_instrumented
|
234 | 221 |
|
222 |
+ def activate_monitoring(self):
|
|
223 |
+ """Activates jobs monitoring."""
|
|
224 |
+ if self._is_instrumented:
|
|
225 |
+ return
|
|
226 |
+ |
|
227 |
+ self.__build_metadata_queues = []
|
|
228 |
+ |
|
229 |
+ self.__operations_by_stage = {}
|
|
230 |
+ self.__leases_by_state = {}
|
|
231 |
+ self.__queue_time_average = 0, timedelta()
|
|
232 |
+ self.__retries_count = 0
|
|
233 |
+ |
|
234 |
+ self.__operations_by_stage[OperationStage.CACHE_CHECK] = set()
|
|
235 |
+ self.__operations_by_stage[OperationStage.QUEUED] = set()
|
|
236 |
+ self.__operations_by_stage[OperationStage.EXECUTING] = set()
|
|
237 |
+ self.__operations_by_stage[OperationStage.COMPLETED] = set()
|
|
238 |
+ |
|
239 |
+ self.__leases_by_state[LeaseState.PENDING] = set()
|
|
240 |
+ self.__leases_by_state[LeaseState.ACTIVE] = set()
|
|
241 |
+ self.__leases_by_state[LeaseState.COMPLETED] = set()
|
|
242 |
+ |
|
243 |
+ self._is_instrumented = True
|
|
244 |
+ |
|
245 |
+ def deactivate_monitoring(self):
|
|
246 |
+ """Deactivates jobs monitoring."""
|
|
247 |
+ if not self._is_instrumented:
|
|
248 |
+ return
|
|
249 |
+ |
|
250 |
+ self._is_instrumented = False
|
|
251 |
+ |
|
252 |
+ self.__build_metadata_queues = None
|
|
253 |
+ |
|
254 |
+ self.__operations_by_stage = None
|
|
255 |
+ self.__leases_by_state = None
|
|
256 |
+ self.__queue_time_average = None
|
|
257 |
+ self.__retries_count = 0
|
|
258 |
+ |
|
235 | 259 |
def register_build_metadata_watcher(self, message_queue):
|
236 | 260 |
if self.__build_metadata_queues is not None:
|
237 | 261 |
self.__build_metadata_queues.append(message_queue)
|