[geary/wip/131-sent-mail: 3/14] Add replay queue notification flush and checkpoint methods



commit ec5446c86e6f75a58458381aadf227c55d31a3d8
Author: Michael Gratton <mike vee net>
Date:   Thu Aug 8 22:55:54 2019 +1000

    Add replay queue notification flush and checkpoint methods
    
    To support more deterministic folder synchronisation, add means to
    flush all pending server notifications and a checkpoint operation to
    be able to determine when they have been processed.

 .../imap-engine/imap-engine-replay-queue.vala      | 68 +++++++++++++++++-----
 1 file changed, 55 insertions(+), 13 deletions(-)
---
diff --git a/src/engine/imap-engine/imap-engine-replay-queue.vala b/src/engine/imap-engine/imap-engine-replay-queue.vala
index 9c6f3fef..ce8dffcd 100644
--- a/src/engine/imap-engine/imap-engine-replay-queue.vala
+++ b/src/engine/imap-engine/imap-engine-replay-queue.vala
@@ -60,6 +60,32 @@ private class Geary.ImapEngine.ReplayQueue : Geary.BaseObject {
         }
     }
 
+
+    private class WaitOperation : ReplayOperation {
+
+        public WaitOperation() {
+            // LOCAL_AND_REMOTE to make sure this operation is flushed
+            // all the way down the pipe
+            base("Wait", LOCAL_AND_REMOTE, OnError.IGNORE_REMOTE);
+        }
+
+        public override async ReplayOperation.Status replay_local_async()
+            throws GLib.Error {
+            return Status.CONTINUE;
+        }
+
+        public override async void replay_remote_async(Imap.FolderSession remote)
+            throws GLib.Error {
+            // Noop
+        }
+
+        public override string describe_state() {
+            return "";
+        }
+
+    }
+
+
     public int local_count { get {
         return local_queue.size;
     } }
@@ -230,23 +256,39 @@ private class Geary.ImapEngine.ReplayQueue : Geary.BaseObject {
         return true;
     }
 
-    private bool on_notification_timeout() {
-        if (notification_queue.size == 0)
-            return false;
-
-        debug("%s: Scheduling %d held server notification operations", owner.to_string(),
-            notification_queue.size);
+    /** Waits until all pending operations have been processed. */
+    public async void checkpoint(GLib.Cancellable? cancellable)
+        throws GLib.Error {
+        WaitOperation wait_op = new WaitOperation();
+        if (schedule(wait_op)) {
+            yield wait_op.wait_for_ready_async(cancellable);
+        } else {
+            debug("Unable to schedule checkpoint op on %s", to_string());
+        }
+    }
 
-        // no new operations in timeout span, add them all to the "real" queue
-        foreach (ReplayOperation notification_op in notification_queue) {
-            if (!schedule(notification_op)) {
-                debug("Unable to schedule notification operation %s on %s", notification_op.to_string(),
-                    to_string());
+    /** Queues any pending server notifications for processing. */
+    public void flush_notifications() {
+        if (this.notification_queue.size > 0) {
+            debug("%s: Scheduling %d held server notification operations",
+                  this.owner.to_string(),
+                  this.notification_queue.size);
+
+            // add all held notification operations to the
+            // "real" queue
+            foreach (ReplayOperation notification_op in this.notification_queue) {
+                if (!schedule(notification_op)) {
+                    debug("Unable to schedule notification operation %s on %s",
+                          notification_op.to_string(),
+                          to_string());
+                }
             }
+            this.notification_queue.clear();
         }
+    }
 
-        notification_queue.clear();
-
+    private bool on_notification_timeout() {
+        flush_notifications();
         return false;
     }
 


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]