[rhythmbox/rhythmdb-barrier: 6/7] rhythmdb: wait for changes to be processed before committing
- From: Jonathan Matthew <jmatthew src gnome org>
- To: commits-list gnome org
- Cc: 
- Subject: [rhythmbox/rhythmdb-barrier: 6/7] rhythmdb: wait for changes to be processed before committing
- Date: Sat,  3 Apr 2021 03:56:09 +0000 (UTC)
commit ed9fc277d2dc1ad593873567f5c59c76125d9b1c
Author: Jonathan Matthew <jonathan d14n org>
Date:   Fri Mar 26 11:56:09 2021 +1000
    rhythmdb: wait for changes to be processed before committing
    
    When committing from a worker thread, push an event onto the event queue
    and wait for it to be processed before proceeding, ensuring that changes
    made are actually processed by the main thread first.
    
    Fixes: #1844, #1782
 rhythmdb/rhythmdb-private.h |  7 ++++++-
 rhythmdb/rhythmdb.c         | 37 ++++++++++++++++++++++++++++++++++++-
 2 files changed, 42 insertions(+), 2 deletions(-)
---
diff --git a/rhythmdb/rhythmdb-private.h b/rhythmdb/rhythmdb-private.h
index 579f785c2..585b63234 100644
--- a/rhythmdb/rhythmdb-private.h
+++ b/rhythmdb/rhythmdb-private.h
@@ -181,6 +181,10 @@ struct _RhythmDBPrivate
        GList *deleted_entries_to_emit;
        GHashTable *changed_entries_to_emit;
 
+       GList *barriers_done;
+       GMutex barrier_mutex;
+       GCond barrier_condition;
+
        gboolean can_save;
        gboolean saving;
        gboolean dirty;
@@ -206,7 +210,8 @@ typedef struct
                RHYTHMDB_EVENT_THREAD_EXITED,
                RHYTHMDB_EVENT_DB_SAVED,
                RHYTHMDB_EVENT_QUERY_COMPLETE,
-               RHYTHMDB_EVENT_ENTRY_SET
+               RHYTHMDB_EVENT_ENTRY_SET,
+               RHYTHMDB_EVENT_BARRIER
        } type;
        RBRefString *uri;
        RBRefString *real_uri; /* Target of a symlink, if any */
diff --git a/rhythmdb/rhythmdb.c b/rhythmdb/rhythmdb.c
index fa93ecc92..61832bc6c 100644
--- a/rhythmdb/rhythmdb.c
+++ b/rhythmdb/rhythmdb.c
@@ -1001,6 +1001,8 @@ rhythmdb_event_free (RhythmDB *db,
        case RHYTHMDB_EVENT_METADATA_CACHE:
                free_cached_metadata(&result->cached_metadata);
                break;
+       case RHYTHMDB_EVENT_BARRIER:
+               break;
        }
        if (result->error)
                g_error_free (result->error);
@@ -1547,6 +1549,28 @@ rhythmdb_commit_internal (RhythmDB *db,
                          gboolean sync_changes,
                          GThread *thread)
 {
+       /*
+        * during normal operation, if committing from a worker thread,
+        * wait for changes made on the thread to be processed by the main thread.
+        * this avoids races and ensures the signals emitted are correct.
+        */
+       if (db->priv->action_thread_running && !rb_is_main_thread ()) {
+               RhythmDBEvent *event;
+
+               event = g_slice_new0 (RhythmDBEvent);
+               event->db = db;
+               event->type = RHYTHMDB_EVENT_BARRIER;
+
+               g_mutex_lock (&db->priv->barrier_mutex);
+               rhythmdb_push_event (db, event);
+               while (g_list_find (db->priv->barriers_done, event) == NULL)
+                       g_cond_wait (&db->priv->barrier_condition, &db->priv->barrier_mutex);
+               db->priv->barriers_done = g_list_remove (db->priv->barriers_done, event);
+               g_mutex_unlock (&db->priv->barrier_mutex);
+
+               rhythmdb_event_free (db, event);
+       }
+
        g_mutex_lock (&db->priv->change_mutex);
 
        if (sync_changes) {
@@ -2626,7 +2650,8 @@ rhythmdb_process_one_event (RhythmDBEvent *event, RhythmDB *db)
            ((event->type == RHYTHMDB_EVENT_STAT)
             || (event->type == RHYTHMDB_EVENT_METADATA_LOAD)
             || (event->type == RHYTHMDB_EVENT_METADATA_CACHE)
-            || (event->type == RHYTHMDB_EVENT_ENTRY_SET))) {
+            || (event->type == RHYTHMDB_EVENT_ENTRY_SET)
+            || (event->type == RHYTHMDB_EVENT_BARRIER))) {
                rb_debug ("Database is read-only, delaying event processing");
                g_async_queue_push (db->priv->delayed_write_queue, event);
                return;
@@ -2674,6 +2699,16 @@ rhythmdb_process_one_event (RhythmDBEvent *event, RhythmDB *db)
                rb_debug ("processing RHYTHMDB_EVENT_QUERY_COMPLETE");
                rhythmdb_read_leave (db);
                break;
+       case RHYTHMDB_EVENT_BARRIER:
+               rb_debug ("processing RHYTHMDB_EVENT_BARRIER");
+               g_mutex_lock (&db->priv->barrier_mutex);
+               db->priv->barriers_done = g_list_prepend (db->priv->barriers_done, event);
+               g_cond_broadcast (&db->priv->barrier_condition);
+               g_mutex_unlock (&db->priv->barrier_mutex);
+
+               /* freed by the thread waiting on the barrier */
+               free = FALSE;
+               break;
        }
        if (free)
                rhythmdb_event_free (db, event);
[
Date Prev][
Date Next]   [
Thread Prev][
Thread Next]   
[
Thread Index]
[
Date Index]
[
Author Index]