[geary/wip/778276-better-flag-updates: 8/19] Add an operations queue to GenericAccount for server operations.
- From: Michael Gratton <mjog src gnome org>
- To: commits-list gnome org
- Cc:
- Subject: [geary/wip/778276-better-flag-updates: 8/19] Add an operations queue to GenericAccount for server operations.
- Date: Tue, 5 Dec 2017 04:01:57 +0000 (UTC)
commit 4c1ed24abf5b36386e6c0e3fc346fcfcfeb6b4ec
Author: Michael James Gratton <mike vee net>
Date: Thu Nov 23 10:11:30 2017 +1100
Add an operations queue to GenericAccount for server operations.
This generalises the approach used to execute the flag watcher and
background sync, provides a high-level means of managing local and remote
operations, and provides a means of compartmentalising operation-specific
code.
* src/engine/imap-engine/imap-engine-account-operation.vala
(AccountOperation): Abstract base class for classes that implement some
account-specific operation.
* src/engine/imap-engine/imap-engine-account-processor.vala
(AccountProcessor): Class to manage the operation queue and execute
operations.
* src/engine/imap-engine/imap-engine-generic-account.vala
(GenericAccount): Create and manage an instance of AccountProcessor,
add queue_operation method to allow operations to be queued.
po/POTFILES.in | 2 +
src/CMakeLists.txt | 2 +
.../imap-engine/imap-engine-account-operation.vala | 86 +++++++++++
.../imap-engine/imap-engine-account-processor.vala | 88 +++++++++++
.../imap-engine/imap-engine-generic-account.vala | 19 +++
test/CMakeLists.txt | 1 +
.../engine/imap-engine/account-processor-test.vala | 156 ++++++++++++++++++++
test/test-engine.vala | 1 +
8 files changed, 355 insertions(+), 0 deletions(-)
---
diff --git a/po/POTFILES.in b/po/POTFILES.in
index 4f79688..b8a6290 100644
--- a/po/POTFILES.in
+++ b/po/POTFILES.in
@@ -230,6 +230,8 @@ src/engine/imap-engine/gmail/imap-engine-gmail-drafts-folder.vala
src/engine/imap-engine/gmail/imap-engine-gmail-folder.vala
src/engine/imap-engine/gmail/imap-engine-gmail-search-folder.vala
src/engine/imap-engine/gmail/imap-engine-gmail-spam-trash-folder.vala
+src/engine/imap-engine/imap-engine-account-operation.vala
+src/engine/imap-engine/imap-engine-account-processor.vala
src/engine/imap-engine/imap-engine-account-synchronizer.vala
src/engine/imap-engine/imap-engine-batch-operations.vala
src/engine/imap-engine/imap-engine-contact-store.vala
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 5e2ab1a..82372a4 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -190,6 +190,8 @@ engine/imap-db/outbox/smtp-outbox-folder-properties.vala
engine/imap-db/outbox/smtp-outbox-folder-root.vala
engine/imap-engine/imap-engine.vala
+engine/imap-engine/imap-engine-account-operation.vala
+engine/imap-engine/imap-engine-account-processor.vala
engine/imap-engine/imap-engine-account-synchronizer.vala
engine/imap-engine/imap-engine-batch-operations.vala
engine/imap-engine/imap-engine-contact-store.vala
diff --git a/src/engine/imap-engine/imap-engine-account-operation.vala b/src/engine/imap-engine/imap-engine-account-operation.vala
new file mode 100644
index 0000000..3913021
--- /dev/null
+++ b/src/engine/imap-engine/imap-engine-account-operation.vala
@@ -0,0 +1,86 @@
+/*
+ * Copyright 2017 Michael Gratton <mike vee net>
+ *
+ * This software is licensed under the GNU Lesser General Public License
+ * (version 2.1 or later). See the COPYING file in this distribution.
+ */
+
+/**
+ * A unit of work to be executed by {@link GenericAccount}.
+ *
+ * To queue an operation for execution, pass an instance to {@link
+ * GenericAccount.queue_operation} while the account is open. It will
+ * be added to the account's queue and executed asynchronously when
+ * it reaches the front.
+ *
+ * Execution of the operation is managed by {@link
+ * AccountProcessor}. Since the processor will not en-queue duplicate
+ * operations, implementations may override the {@link equal_to}
+ * method to ensure that the same operation is not queued twice.
+ */
+public abstract class Geary.ImapEngine.AccountOperation : Geary.BaseObject {
+
+
+ /**
+ * Fired by the processor when the operation has completed.
+ *
+ * This is fired regardless of whether an error was thrown after
+ * {@link execute} is called. It is always fired after either {@link
+ * succeeded} or {@link failed} is fired.
+ *
+ * Implementations should not fire this themselves, the
+ * processor will do it for them.
+ */
+ public signal void completed();
+
+ /**
+ * Fired by the processor if the operation completes successfully.
+ *
+ * This is fired only after {@link execute} was called and did
+ * not raise an error.
+ *
+ * Implementations should not fire this themselves, the
+ * processor will do it for them.
+ */
+ public signal void succeeded();
+
+ /**
+ * Fired by the processor if the operation throws an error.
+ *
+ * This is fired only after {@link execute} was called and
+ * threw an error. The argument is the error that was thrown.
+ *
+ * Implementations should not fire this themselves, the
+ * processor will do it for them.
+ */
+ public signal void failed(Error err);
+
+
+ /**
+ * Called by the processor to execute this operation.
+ */
+ public abstract async void execute(Cancellable cancellable) throws Error;
+
+ /**
+ * Determines if this operation is equal to another.
+ *
+ * By default, the same instance, or two different instances of
+ * the exact same type, are considered equal. Implementations
+ * should override this if they need a different rule for deciding
+ * when two instances represent the same high-level operation and
+ * so should not both be queued.
+ */
+ public virtual bool equal_to(AccountOperation op) {
+ return (op != null && (this == op || this.get_type() == op.get_type()));
+ }
+
+ /**
+ * Provides a representation of this operation for debugging.
+ *
+ * By default simply returns the name of the class.
+ */
+ public virtual string to_string() {
+ return this.get_type().name();
+ }
+
+}
diff --git a/src/engine/imap-engine/imap-engine-account-processor.vala b/src/engine/imap-engine/imap-engine-account-processor.vala
new file mode 100644
index 0000000..0e6faea
--- /dev/null
+++ b/src/engine/imap-engine/imap-engine-account-processor.vala
@@ -0,0 +1,88 @@
+/*
+ * Copyright 2017 Michael Gratton <mike vee net>
+ *
+ * This software is licensed under the GNU Lesser General Public License
+ * (version 2.1 or later). See the COPYING file in this distribution.
+ */
+
+/**
+ * Queues and asynchronously executes {@link AccountOperation} instances.
+ *
+ * Operations that are equal to any currently executing or currently
+ * in the queue will not be re-queued.
+ *
+ * Errors thrown by an operation are reported via the processor's
+ * {@link operation_error} signal.
+ */
+internal class Geary.ImapEngine.AccountProcessor : Geary.BaseObject {
+
+
+ private static bool op_equal(AccountOperation a, AccountOperation b) {
+ return a.equal_to(b);
+ }
+
+ /** Determines if an operation is currently being executed. */
+ public bool is_executing { get { return this.current_op != null; } }
+
+ /** Returns the number of operations currently waiting in the queue. */
+ public uint waiting { get { return this.queue.size; } }
+
+
+ /** Fired when an error occurs processing an operation. */
+ public signal void operation_error(AccountOperation op, Error error);
+
+
+ private string id; // label used to prefix debug output
+
+ private Nonblocking.Queue<AccountOperation> queue =
+ new Nonblocking.Queue<AccountOperation>.fifo(op_equal);
+
+ private AccountOperation? current_op = null;
+
+ private Cancellable cancellable = new Cancellable();
+
+
+ public AccountProcessor(string id) {
+ this.id = id;
+ this.queue.allow_duplicates = false; // ops equal (per op_equal) to a waiting one are not queued again
+ this.run.begin(); // start the processing loop straight away
+ }
+
+ public void enqueue(AccountOperation op) {
+ if (this.current_op == null || !op.equal_to(this.current_op)) { // skip ops equal to the one executing
+ this.queue.send(op);
+ }
+ }
+
+ public void stop() {
+ this.cancellable.cancel(); // same cancellable is passed to receive() and to execute()
+ this.queue.clear(); // drop everything still waiting
+ }
+
+ private async void run() {
+ while (!this.cancellable.is_cancelled()) {
+ AccountOperation? op = null;
+ try {
+ op = yield this.queue.receive(this.cancellable);
+ } catch (Error err) {
+ // we've been cancelled, so bail out
+ return;
+ }
+
+ if (op != null) {
+ debug("%s: Executing operation: %s", id, op.to_string());
+ this.current_op = op;
+ try {
+ yield op.execute(this.cancellable);
+ op.succeeded();
+ } catch (Error err) {
+ op.failed(err);
+ operation_error(op, err); // also notify listeners on the processor itself
+ }
+ op.completed(); // fired on both the success and the failure path
+ this.current_op = null;
+ }
+ }
+ }
+
+}
diff --git a/src/engine/imap-engine/imap-engine-generic-account.vala b/src/engine/imap-engine/imap-engine-generic-account.vala
index f56faaf..653c356 100644
--- a/src/engine/imap-engine/imap-engine-generic-account.vala
+++ b/src/engine/imap-engine/imap-engine-generic-account.vala
@@ -28,6 +28,7 @@ private abstract class Geary.ImapEngine.GenericAccount : Geary.Account {
private Gee.HashMap<FolderPath, uint> refresh_unseen_timeout_ids
= new Gee.HashMap<FolderPath, uint>();
private Gee.HashSet<Geary.Folder> in_refresh_unseen = new Gee.HashSet<Geary.Folder>();
+ private AccountProcessor? processor;
private AccountSynchronizer sync;
private Cancellable? enumerate_folder_cancellable = null;
private TimeoutManager refresh_folder_timer;
@@ -72,6 +73,19 @@ private abstract class Geary.ImapEngine.GenericAccount : Geary.Account {
compile_special_search_names();
}
+ /**
+ * Queues an operation for execution by this account.
+ *
+ * The operation will be added to the account's {@link
+ * AccountProcessor} and executed asynchronously by it when the
+ * operation reaches the front of the queue.
+ */
+ public void queue_operation(AccountOperation op)
+ throws EngineError {
+ check_open();
+ this.processor.enqueue(op);
+ }
+
protected override void notify_folders_available_unavailable(Gee.List<Geary.Folder>? available,
Gee.List<Geary.Folder>? unavailable) {
base.notify_folders_available_unavailable(available, unavailable);
@@ -141,6 +155,8 @@ private abstract class Geary.ImapEngine.GenericAccount : Geary.Account {
}
private async void internal_open_async(Cancellable? cancellable) throws Error {
+ this.processor = new AccountProcessor(this.to_string());
+
try {
yield local.open_async(information.data_dir, Engine.instance.resource_dir.get_child("sql"),
cancellable);
@@ -195,6 +211,9 @@ private abstract class Geary.ImapEngine.GenericAccount : Geary.Account {
if (!open)
return;
+ // Halt internal tasks early so they stop using local and
+ // remote connections.
+ this.processor.stop();
this.sync.stop();
Cancellable folder_cancellable = this.enumerate_folder_cancellable;
diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt
index 7affa70..8c95e6c 100644
--- a/test/CMakeLists.txt
+++ b/test/CMakeLists.txt
@@ -11,6 +11,7 @@ set(TEST_ENGINE_SRC
engine/imap/command/imap-create-command-test.vala
engine/imap/response/imap-namespace-response-test.vala
engine/imap/transport/imap-deserializer-test.vala
+ engine/imap-engine/account-processor-test.vala
engine/mime-content-type-test.vala
engine/rfc822-mailbox-address-test.vala
engine/rfc822-message-test.vala
diff --git a/test/engine/imap-engine/account-processor-test.vala b/test/engine/imap-engine/account-processor-test.vala
new file mode 100644
index 0000000..7c14219
--- /dev/null
+++ b/test/engine/imap-engine/account-processor-test.vala
@@ -0,0 +1,156 @@
+/*
+ * Copyright 2017 Michael Gratton <mike vee net>
+ *
+ * This software is licensed under the GNU Lesser General Public License
+ * (version 2.1 or later). See the COPYING file in this distribution.
+ */
+
+errordomain AccountProcessorTestError {
+ TEST;
+}
+
+public class Geary.ImapEngine.AccountProcessorTest : Gee.TestCase {
+
+    public class TestOperation : AccountOperation {
+        public bool throw_error = false;
+        public bool wait_for_cancel = false;
+        public bool execute_called = false;
+
+        private Nonblocking.Spinlock spinlock = new Nonblocking.Spinlock();
+
+        public override async void execute(Cancellable cancellable)
+            throws Error {
+            print("Test op/");
+            this.execute_called = true;
+            if (this.wait_for_cancel) {
+                yield this.spinlock.wait_async(cancellable);
+            }
+            if (this.throw_error) {
+                throw new AccountProcessorTestError.TEST("Failed");
+            }
+        }
+    }
+
+    public class OtherOperation : TestOperation {
+    }
+
+    private AccountProcessor? processor = null;
+    private uint succeeded;
+    private uint failed;
+    private uint completed;
+
+    public AccountProcessorTest() {
+        base("Geary.ImapEngine.AccountProcessorTest");
+        add_test("test_success", test_success);
+        add_test("test_failure", test_failure);
+        add_test("test_duplicate", test_duplicate);
+        add_test("test_stop", test_stop);
+    }
+
+    public override void set_up() {
+        // Give every test its own processor: a shared instance would
+        // carry queued operations, connected handlers and a cancelled
+        // state over from earlier tests.
+        if (this.processor != null) {
+            this.processor.stop();
+        }
+        this.processor = new AccountProcessor("processor");
+        this.succeeded = 0;
+        this.failed = 0;
+        this.completed = 0;
+    }
+
+    public void test_success() {
+        TestOperation op = setup_operation(new TestOperation());
+
+        this.processor.enqueue(op);
+        assert(this.processor.waiting == 1);
+
+        execute_all();
+
+        assert(op.execute_called);
+        assert(this.succeeded == 1);
+        assert(this.failed == 0);
+        assert(this.completed == 1);
+    }
+
+    public void test_failure() {
+        TestOperation op = setup_operation(new TestOperation());
+        op.throw_error = true;
+
+        AccountOperation? error_op = null;
+        Error? error = null;
+        this.processor.operation_error.connect((proc, op, err) => {
+                error_op = op;
+                error = err;
+            });
+
+        this.processor.enqueue(op);
+        execute_all();
+
+        assert(this.succeeded == 0);
+        assert(this.failed == 1);
+        assert(this.completed == 1);
+        assert(error_op == op);
+        assert(error is AccountProcessorTestError.TEST);
+    }
+
+    public void test_duplicate() {
+        TestOperation op1 = setup_operation(new TestOperation());
+        TestOperation op2 = setup_operation(new TestOperation());
+        TestOperation op3 = setup_operation(new OtherOperation());
+
+        this.processor.enqueue(op1);
+        this.processor.enqueue(op2);
+        assert(this.processor.waiting == 1);
+
+        this.processor.enqueue(op3);
+        assert(this.processor.waiting == 2);
+    }
+
+    public void test_stop() {
+        TestOperation op1 = setup_operation(new TestOperation());
+        op1.wait_for_cancel = true;
+        TestOperation op2 = setup_operation(new OtherOperation());
+
+        this.processor.enqueue(op1);
+        this.processor.enqueue(op2);
+
+        while (!this.processor.is_executing) {
+            this.main_loop.iteration(true);
+        }
+
+        this.processor.stop();
+
+        while (this.main_loop.pending()) {
+            this.main_loop.iteration(true);
+        }
+
+        assert(!this.processor.is_executing);
+        assert(this.processor.waiting == 0);
+        assert(this.succeeded == 0);
+        assert(this.failed == 1);
+        assert(this.completed == 1);
+    }
+
+    /** Hooks the test's counters up to the operation's signals and returns it. */
+    private TestOperation setup_operation(TestOperation op) {
+        op.succeeded.connect(() => {
+                this.succeeded++;
+            });
+        op.failed.connect(() => {
+                this.failed++;
+            });
+        op.completed.connect(() => {
+                this.completed++;
+            });
+        return op;
+    }
+
+    /** Iterates the main loop until the processor is idle with an empty queue. */
+    private void execute_all() {
+        while (this.processor.is_executing || this.processor.waiting > 0) {
+            this.main_loop.iteration(true);
+        }
+    }
+}
diff --git a/test/test-engine.vala b/test/test-engine.vala
index b30c7a2..31c59db 100644
--- a/test/test-engine.vala
+++ b/test/test-engine.vala
@@ -29,6 +29,7 @@ int main(string[] args) {
engine.add_suite(new Geary.Imap.DeserializerTest().get_suite());
engine.add_suite(new Geary.Imap.CreateCommandTest().get_suite());
engine.add_suite(new Geary.Imap.NamespaceResponseTest().get_suite());
+ engine.add_suite(new Geary.ImapEngine.AccountProcessorTest().get_suite());
engine.add_suite(new Geary.Inet.Test().get_suite());
engine.add_suite(new Geary.JS.Test().get_suite());
engine.add_suite(new Geary.Mime.ContentTypeTest().get_suite());
[Date Prev][Date Next] [Thread Prev][Thread Next] [Thread Index] [Date Index] [Author Index]