Jim MacArthur pushed to branch jmac/fix-test-hangs-2 at BuildStream / buildstream
Commits:
- 5bf4ce54 by Jim MacArthur at 2018-11-22T15:48:07Z
3 changed files:
Changes:
| ... | ... | @@ -25,6 +25,17 @@ def message_handler(message, context): |
| 25 | 25 |
pass
|
| 26 | 26 |
|
| 27 | 27 |
|
| 28 |
+# Since parent processes wait for queue events, we need
|
|
| 29 |
+# to put something on it if the called process raises an
|
|
| 30 |
+# exception.
|
|
| 31 |
+def _queue_wrapper(target, queue, *args):
|
|
| 32 |
+ try:
|
|
| 33 |
+ target(*args, queue=queue)
|
|
| 34 |
+ except Exception as e:
|
|
| 35 |
+ queue.put(str(e))
|
|
| 36 |
+ raise
|
|
| 37 |
+ |
|
| 38 |
+ |
|
| 28 | 39 |
def tree_maker(cas, tree, directory):
|
| 29 | 40 |
if tree.root.ByteSize() == 0:
|
| 30 | 41 |
tree.root.CopyFrom(directory)
|
| ... | ... | @@ -97,9 +108,9 @@ def test_pull(cli, tmpdir, datafiles): |
| 97 | 108 |
queue = multiprocessing.Queue()
|
| 98 | 109 |
# Use subprocess to avoid creation of gRPC threads in main BuildStream process
|
| 99 | 110 |
# See https://github.com/grpc/grpc/blob/master/doc/fork_support.md for details
|
| 100 |
- process = multiprocessing.Process(target=_test_pull,
|
|
| 101 |
- args=(user_config_file, project_dir, artifact_dir,
|
|
| 102 |
- 'target.bst', element_key, queue))
|
|
| 111 |
+ process = multiprocessing.Process(target=_queue_wrapper,
|
|
| 112 |
+ args=(_test_pull, queue, user_config_file, project_dir,
|
|
| 113 |
+ artifact_dir, 'target.bst', element_key))
|
|
| 103 | 114 |
|
| 104 | 115 |
try:
|
| 105 | 116 |
# Keep SIGINT blocked in the child process
|
| ... | ... | @@ -205,9 +216,9 @@ def test_pull_tree(cli, tmpdir, datafiles): |
| 205 | 216 |
queue = multiprocessing.Queue()
|
| 206 | 217 |
# Use subprocess to avoid creation of gRPC threads in main BuildStream process
|
| 207 | 218 |
# See https://github.com/grpc/grpc/blob/master/doc/fork_support.md for details
|
| 208 |
- process = multiprocessing.Process(target=_test_push_tree,
|
|
| 209 |
- args=(user_config_file, project_dir, artifact_dir,
|
|
| 210 |
- artifact_digest, queue))
|
|
| 219 |
+ process = multiprocessing.Process(target=_queue_wrapper,
|
|
| 220 |
+ args=(_test_push_tree, queue, user_config_file, project_dir,
|
|
| 221 |
+ artifact_dir, artifact_digest))
|
|
| 211 | 222 |
|
| 212 | 223 |
try:
|
| 213 | 224 |
# Keep SIGINT blocked in the child process
|
| ... | ... | @@ -233,9 +244,9 @@ def test_pull_tree(cli, tmpdir, datafiles): |
| 233 | 244 |
|
| 234 | 245 |
queue = multiprocessing.Queue()
|
| 235 | 246 |
# Use subprocess to avoid creation of gRPC threads in main BuildStream process
|
| 236 |
- process = multiprocessing.Process(target=_test_pull_tree,
|
|
| 237 |
- args=(user_config_file, project_dir, artifact_dir,
|
|
| 238 |
- tree_digest, queue))
|
|
| 247 |
+ process = multiprocessing.Process(target=_queue_wrapper,
|
|
| 248 |
+ args=(_test_pull_tree, queue, user_config_file, project_dir,
|
|
| 249 |
+ artifact_dir, tree_digest))
|
|
| 239 | 250 |
|
| 240 | 251 |
try:
|
| 241 | 252 |
# Keep SIGINT blocked in the child process
|
| ... | ... | @@ -26,6 +26,17 @@ def message_handler(message, context): |
| 26 | 26 |
pass
|
| 27 | 27 |
|
| 28 | 28 |
|
| 29 |
+# Since parent processes wait for queue events, we need
|
|
| 30 |
+# to put something on it if the called process raises an
|
|
| 31 |
+# exception.
|
|
| 32 |
+def _queue_wrapper(target, queue, *args):
|
|
| 33 |
+ try:
|
|
| 34 |
+ target(*args, queue=queue)
|
|
| 35 |
+ except Exception as e:
|
|
| 36 |
+ queue.put(str(e))
|
|
| 37 |
+ raise
|
|
| 38 |
+ |
|
| 39 |
+ |
|
| 29 | 40 |
@pytest.mark.datafiles(DATA_DIR)
|
| 30 | 41 |
def test_push(cli, tmpdir, datafiles):
|
| 31 | 42 |
project_dir = str(datafiles)
|
| ... | ... | @@ -76,9 +87,9 @@ def test_push(cli, tmpdir, datafiles): |
| 76 | 87 |
queue = multiprocessing.Queue()
|
| 77 | 88 |
# Use subprocess to avoid creation of gRPC threads in main BuildStream process
|
| 78 | 89 |
# See https://github.com/grpc/grpc/blob/master/doc/fork_support.md for details
|
| 79 |
- process = multiprocessing.Process(target=_test_push,
|
|
| 80 |
- args=(user_config_file, project_dir, artifact_dir,
|
|
| 81 |
- 'target.bst', element_key, queue))
|
|
| 90 |
+ process = multiprocessing.Process(target=_queue_wrapper,
|
|
| 91 |
+ args=(_test_push, queue, user_config_file, project_dir,
|
|
| 92 |
+ artifact_dir, 'target.bst', element_key))
|
|
| 82 | 93 |
|
| 83 | 94 |
try:
|
| 84 | 95 |
# Keep SIGINT blocked in the child process
|
| ... | ... | @@ -185,9 +196,9 @@ def test_push_directory(cli, tmpdir, datafiles): |
| 185 | 196 |
queue = multiprocessing.Queue()
|
| 186 | 197 |
# Use subprocess to avoid creation of gRPC threads in main BuildStream process
|
| 187 | 198 |
# See https://github.com/grpc/grpc/blob/master/doc/fork_support.md for details
|
| 188 |
- process = multiprocessing.Process(target=_test_push_directory,
|
|
| 189 |
- args=(user_config_file, project_dir, artifact_dir,
|
|
| 190 |
- artifact_digest, queue))
|
|
| 199 |
+ process = multiprocessing.Process(target=_queue_wrapper,
|
|
| 200 |
+ args=(_test_push_directory, queue, user_config_file,
|
|
| 201 |
+ project_dir, artifact_dir, artifact_digest))
|
|
| 191 | 202 |
|
| 192 | 203 |
try:
|
| 193 | 204 |
# Keep SIGINT blocked in the child process
|
| ... | ... | @@ -260,8 +271,9 @@ def test_push_message(cli, tmpdir, datafiles): |
| 260 | 271 |
queue = multiprocessing.Queue()
|
| 261 | 272 |
# Use subprocess to avoid creation of gRPC threads in main BuildStream process
|
| 262 | 273 |
# See https://github.com/grpc/grpc/blob/master/doc/fork_support.md for details
|
| 263 |
- process = multiprocessing.Process(target=_test_push_message,
|
|
| 264 |
- args=(user_config_file, project_dir, artifact_dir, queue))
|
|
| 274 |
+ process = multiprocessing.Process(target=_queue_wrapper,
|
|
| 275 |
+ args=(_test_push_message, queue, user_config_file,
|
|
| 276 |
+ project_dir, artifact_dir))
|
|
| 265 | 277 |
|
| 266 | 278 |
try:
|
| 267 | 279 |
# Keep SIGINT blocked in the child process
|
| ... | ... | @@ -67,22 +67,26 @@ class ArtifactShare(): |
| 67 | 67 |
def run(self, q):
|
| 68 | 68 |
pytest_cov.embed.cleanup_on_sigterm()
|
| 69 | 69 |
|
| 70 |
- # Optionally mock statvfs
|
|
| 71 |
- if self.total_space:
|
|
| 72 |
- if self.free_space is None:
|
|
| 73 |
- self.free_space = self.total_space
|
|
| 74 |
- os.statvfs = self._mock_statvfs
|
|
| 70 |
+ try:
|
|
| 71 |
+ # Optionally mock statvfs
|
|
| 72 |
+ if self.total_space:
|
|
| 73 |
+ if self.free_space is None:
|
|
| 74 |
+ self.free_space = self.total_space
|
|
| 75 |
+ os.statvfs = self._mock_statvfs
|
|
| 75 | 76 |
|
| 76 |
- server = create_server(self.repodir, enable_push=True)
|
|
| 77 |
- port = server.add_insecure_port('localhost:0')
|
|
| 77 |
+ server = create_server(self.repodir, enable_push=True)
|
|
| 78 |
+ port = server.add_insecure_port('localhost:0')
|
|
| 78 | 79 |
|
| 79 |
- server.start()
|
|
| 80 |
+ server.start()
|
|
| 80 | 81 |
|
| 81 |
- # Send port to parent
|
|
| 82 |
- q.put(port)
|
|
| 82 |
+ # Send port to parent
|
|
| 83 |
+ q.put(port)
|
|
| 83 | 84 |
|
| 84 |
- # Sleep until termination by signal
|
|
| 85 |
- signal.pause()
|
|
| 85 |
+ # Sleep until termination by signal
|
|
| 86 |
+ signal.pause()
|
|
| 87 |
+ except Exception as e:
|
|
| 88 |
+ q.put(None)
|
|
| 89 |
+ raise
|
|
| 86 | 90 |
|
| 87 | 91 |
# has_object():
|
| 88 | 92 |
#
|
