[pygobject/benzea/gio-asyncio] tests: Add tests for awaitable return of async routines
- From: Benjamin Berg <bberg src gnome org>
- To: commits-list gnome org
- Cc:
- Subject: [pygobject/benzea/gio-asyncio] tests: Add tests for awaitable return of async routines
- Date: Fri, 26 Nov 2021 13:09:07 +0000 (UTC)
commit 027b335f334884284ad9ad68b77edda5a1b324fd
Author: Benjamin Berg <bberg redhat com>
Date: Fri Nov 26 13:31:38 2021 +0100
tests: Add tests for awaitable return of async routines
tests/test_async.py | 199 ++++++++++++++++++++++++++++++++++++++++++++++++++++
1 file changed, 199 insertions(+)
---
diff --git a/tests/test_async.py b/tests/test_async.py
new file mode 100644
index 00000000..af96c61b
--- /dev/null
+++ b/tests/test_async.py
@@ -0,0 +1,199 @@
+# -*- Mode: Python; py-indent-offset: 4 -*-
+# vim: tabstop=4 shiftwidth=4 expandtab
+
+import os
+import unittest
+import warnings
+
+import pytest
+
+import gi
+import gi.events
+import asyncio
+import threading
+from gi.repository import GLib, Gio
+from gi.events import GLibEventLoopPolicy
+
+from .helper import ignore_gi_deprecation_warnings
+
+class TestAsync(unittest.TestCase):
+
+ def setUp(self):
+ policy = GLibEventLoopPolicy()
+ asyncio.set_event_loop_policy(policy)
+ self.loop = policy.get_event_loop()
+ self.addCleanup(self.loop.close)
+
+ def test_async_enumerate(self):
+ f = Gio.file_new_for_path("./")
+
+ called = False
+
+ def cb():
+ nonlocal called
+ called = True
+
+ async def run():
+ nonlocal called, self
+
+ self.loop.call_soon(cb)
+
+ iter_info = []
+ for info in await f.enumerate_children_async("standard::*", 0, GLib.PRIORITY_DEFAULT):
+ iter_info.append(info.get_name())
+
+ # The await runs the mainloop and cb is called.
+ self.assertEqual(called, True)
+
+ next_info = []
+ enumerator = f.enumerate_children("standard::*", 0, None)
+ while True:
+ info = enumerator.next_file(None)
+ if info is None:
+ break
+ next_info.append(info.get_name())
+
+ self.assertEqual(iter_info, next_info)
+
+ self.loop.run_until_complete(run())
+
+ def test_async_cancellation(self):
+ """Cancellation raises G_IO_ERROR_CANCELLED"""
+
+ f = Gio.file_new_for_path("./")
+
+ async def run():
+ nonlocal self
+
+ # cancellable created implicitly
+ res = f.enumerate_children_async("standard::*", 0, GLib.PRIORITY_DEFAULT)
+ res.cancel()
+ with self.assertRaisesRegex(GLib.GError, "Operation was cancelled"):
+ await res
+
+ # cancellable passed explicitly
+ cancel = Gio.Cancellable()
+ res = f.enumerate_children_async("standard::*", 0, GLib.PRIORITY_DEFAULT, cancel)
+ self.assertEqual(res.cancellable, cancel)
+ cancel.cancel()
+ with self.assertRaisesRegex(GLib.GError, "Operation was cancelled"):
+ await res
+
+ self.loop.run_until_complete(run())
+
+ def test_not_completed(self):
+ """Querying an uncompleted task raises exceptions"""
+
+ f = Gio.file_new_for_path("./")
+
+ async def run():
+ nonlocal self
+
+ # cancellable created implicitly
+ res = f.enumerate_children_async("standard::*", 0, GLib.PRIORITY_DEFAULT)
+ with self.assertRaises(asyncio.InvalidStateError):
+ res.result()
+
+ with self.assertRaises(asyncio.InvalidStateError):
+ res.exception()
+
+ # And, await it
+ await res
+
+ self.loop.run_until_complete(run())
+
+ def test_async_cancel_completed(self):
+ """Cancelling a completed task just cancels the cancellable"""
+
+ f = Gio.file_new_for_path("./")
+
+ async def run():
+ nonlocal self
+
+ res = f.enumerate_children_async("standard::*", 0, GLib.PRIORITY_DEFAULT)
+ await res
+ assert res.cancellable.is_cancelled() == False
+ res.cancel()
+ assert res.cancellable.is_cancelled() == True
+
+ self.loop.run_until_complete(run())
+
+ def test_async_completed_add_cb(self):
+ """Adding a done cb to a completed future queues it with call_soon"""
+
+ f = Gio.file_new_for_path("./")
+
+ called = False
+
+ def cb():
+ nonlocal called
+ called = True
+
+ async def run():
+ nonlocal called, self
+
+ res = f.enumerate_children_async("standard::*", 0, GLib.PRIORITY_DEFAULT)
+ await res
+ self.loop.call_soon(cb)
+
+ # Python await is smart and does not iterate the EventLoop
+ await res
+ assert called == False
+
+ # So create a new future and wait on that
+ fut = asyncio.Future()
+ def done_cb(res):
+ nonlocal fut
+ fut.set_result(res.result())
+ res.add_done_callback(done_cb)
+ await fut
+ assert called == True
+
+ self.loop.run_until_complete(run())
+
+ def test_deleting_failed_logs(self):
+ f = Gio.file_new_for_path("./")
+
+ async def run():
+ nonlocal self
+
+ res = f.enumerate_children_async("standard::*", 0, GLib.PRIORITY_DEFAULT)
+ res.cancel()
+
+ exc = None
+ msg = None
+
+ def handler(loop, context):
+ nonlocal exc, msg
+ msg = context['message']
+ exc = context['exception']
+
+ self.loop.set_exception_handler(handler)
+ self.loop.run_until_complete(run())
+
+ self.assertRegex(msg, ".*exception was never retrieved")
+ self.assertIsInstance(exc, GLib.GError)
+
+ @pytest.mark.xfail(reason="BEHAVIOUR: Async routines need a running EventLoop for the current
GMainContext")
+ def test_no_running_loop(self):
+ f = Gio.file_new_for_path("./")
+
+ res = f.enumerate_children_async("standard::*", 0, GLib.PRIORITY_DEFAULT)
+ self.assertEqual(res._loop, self.loop)
+
+ @pytest.mark.xfail(reason="BEHAVIOUR: Async routines need a running EventLoop for the current
GMainContext")
+ def test_wrong_default_context(self):
+ f = Gio.file_new_for_path("./")
+
+ async def run():
+ nonlocal self
+
+ ctx = GLib.MainContext.new()
+ GLib.MainContext.push_thread_default(ctx)
+ self.addCleanup(GLib.MainContext.pop_thread_default, ctx)
+
+ with self.assertRaises(RuntimeError):
+ await f.enumerate_children_async("standard::*", 0, GLib.PRIORITY_DEFAULT)
+
+ self.loop.run_until_complete(run())
+
[Date Prev][Date Next] [Thread Prev][Thread Next] [Thread Index] [Date Index] [Author Index]