[libsoup/carlosgc/thread-safe: 22/32] cache: make SoupCache thread safe




commit 81aadbf31a57a4955547ce6071c7cd0c4b381732
Author: Carlos Garcia Campos <cgarcia igalia com>
Date:   Tue Apr 19 14:15:55 2022 +0200

    cache: make SoupCache thread safe

 libsoup/cache/soup-cache.c |  55 ++++++++++-
 tests/cache-test.c         | 235 +++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 285 insertions(+), 5 deletions(-)
---
diff --git a/libsoup/cache/soup-cache.c b/libsoup/cache/soup-cache.c
index 96771657..bc733a63 100644
--- a/libsoup/cache/soup-cache.c
+++ b/libsoup/cache/soup-cache.c
@@ -123,6 +123,7 @@ typedef struct _SoupCacheEntry {
 
 typedef struct {
        char *cache_dir;
+        GMutex mutex;
        GHashTable *cache;
        guint n_pending;
        SoupSession *session;
@@ -543,8 +544,10 @@ soup_cache_entry_remove (SoupCache *cache, SoupCacheEntry *entry, gboolean purge
        g_assert (!entry->dirty);
        g_assert (g_list_length (priv->lru_start) == g_hash_table_size (priv->cache));
 
-       if (!g_hash_table_remove (priv->cache, GUINT_TO_POINTER (entry->key)))
+       if (!g_hash_table_remove (priv->cache, GUINT_TO_POINTER (entry->key))) {
+                g_mutex_unlock (&priv->mutex);
                return FALSE;
+        }
 
        /* Remove from LRU */
        lru_item = g_list_find (priv->lru_start, entry);
@@ -712,7 +715,9 @@ soup_cache_send_response (SoupCache *cache, SoupMessage *msg)
 
         soup_message_set_metrics_timestamp (msg, SOUP_MESSAGE_METRICS_REQUEST_START);
 
+        g_mutex_lock (&priv->mutex);
        entry = soup_cache_entry_lookup (cache, msg);
+        g_mutex_unlock (&priv->mutex);
        g_return_val_if_fail (entry, NULL);
 
        file = get_file_from_entry (cache, entry);
@@ -815,6 +820,8 @@ istream_caching_finished (SoupCacheInputStream *istream,
        SoupCachePrivate *priv = soup_cache_get_instance_private (cache);
        SoupCacheEntry *entry = helper->entry;
 
+        g_mutex_lock (&priv->mutex);
+
        --priv->n_pending;
 
        entry->dirty = FALSE;
@@ -843,6 +850,7 @@ istream_caching_finished (SoupCacheInputStream *istream,
        }
 
  cleanup:
+        g_mutex_unlock (&priv->mutex);
        g_object_unref (helper->cache);
        g_slice_free (StreamHelper, helper);
 }
@@ -862,6 +870,8 @@ soup_cache_content_processor_wrap_input (SoupContentProcessor *processor,
        StreamHelper *helper;
        time_t request_time, response_time;
 
+        g_mutex_lock (&priv->mutex);
+
        /* First of all, check if we should cache the resource. */
        cacheability = soup_cache_get_cacheability (cache, msg);
        entry = soup_cache_entry_lookup (cache, msg);
@@ -869,6 +879,7 @@ soup_cache_content_processor_wrap_input (SoupContentProcessor *processor,
        if (cacheability & SOUP_CACHE_INVALIDATES) {
                if (entry)
                        soup_cache_entry_remove (cache, entry, TRUE);
+                g_mutex_unlock (&priv->mutex);
                return NULL;
        }
 
@@ -880,15 +891,20 @@ soup_cache_content_processor_wrap_input (SoupContentProcessor *processor,
                 */
                if (entry)
                        soup_cache_update_from_conditional_request (cache, msg);
+                g_mutex_unlock (&priv->mutex);
                return NULL;
        }
 
-       if (!(cacheability & SOUP_CACHE_CACHEABLE))
+       if (!(cacheability & SOUP_CACHE_CACHEABLE)) {
+                g_mutex_unlock (&priv->mutex);
                return NULL;
+        }
 
        /* Check if we are already caching this resource */
-       if (entry && (entry->dirty || entry->being_validated))
+       if (entry && (entry->dirty || entry->being_validated)) {
+                g_mutex_unlock (&priv->mutex);
                return NULL;
+        }
 
        /* Create a new entry, deleting any old one if present */
        if (entry)
@@ -903,12 +919,15 @@ soup_cache_content_processor_wrap_input (SoupContentProcessor *processor,
        /* Do not continue if it can not be stored */
        if (!soup_cache_entry_insert (cache, entry, TRUE)) {
                soup_cache_entry_free (entry);
+                g_mutex_unlock (&priv->mutex);
                return NULL;
        }
 
        entry->cancellable = g_cancellable_new ();
        ++priv->n_pending;
 
+        g_mutex_unlock (&priv->mutex);
+
        helper = g_slice_new (StreamHelper);
        helper->cache = g_object_ref (cache);
        helper->entry = entry;
@@ -949,6 +968,8 @@ soup_cache_init (SoupCache *cache)
        priv->max_size = DEFAULT_MAX_SIZE;
        priv->max_entry_data_size = priv->max_size / MAX_ENTRY_DATA_PERCENTAGE;
        priv->size = 0;
+
+        g_mutex_init (&priv->mutex);
 }
 
 static void
@@ -974,6 +995,8 @@ soup_cache_finalize (GObject *object)
 
        g_list_free (priv->lru_start);
 
+        g_mutex_clear (&priv->mutex);
+
        G_OBJECT_CLASS (soup_cache_parent_class)->finalize (object);
 }
 
@@ -1121,13 +1144,17 @@ soup_cache_has_response (SoupCache *cache, SoupMessage *msg)
        int max_age, max_stale, min_fresh;
        GList *lru_item, *item;
 
+        g_mutex_lock (&priv->mutex);
+
        entry = soup_cache_entry_lookup (cache, msg);
 
        /* 1. The presented Request-URI and that of stored response
         * match
         */
-       if (!entry)
+       if (!entry) {
+                g_mutex_unlock (&priv->mutex);
                return SOUP_CACHE_RESPONSE_STALE;
+        }
 
        /* Increase hit count. Take sorting into account */
        entry->hits++;
@@ -1143,6 +1170,8 @@ soup_cache_has_response (SoupCache *cache, SoupMessage *msg)
                g_list_free (lru_item);
        }
 
+        g_mutex_unlock (&priv->mutex);
+
        if (entry->dirty || entry->being_validated)
                return SOUP_CACHE_RESPONSE_STALE;
 
@@ -1376,6 +1405,8 @@ clear_cache_files (SoupCache *cache)
  * @cache: a #SoupCache
  *
  * Will remove all entries in the @cache plus all the cache files.
+ *
+ * This is not thread-safe and must only be called from the thread that created the #SoupCache.
  */
 void
 soup_cache_clear (SoupCache *cache)
@@ -1398,6 +1429,7 @@ soup_cache_clear (SoupCache *cache)
 SoupMessage *
 soup_cache_generate_conditional_request (SoupCache *cache, SoupMessage *original)
 {
+        SoupCachePrivate *priv = soup_cache_get_instance_private (cache);
        SoupMessage *msg;
        GUri *uri;
        SoupCacheEntry *entry;
@@ -1408,7 +1440,9 @@ soup_cache_generate_conditional_request (SoupCache *cache, SoupMessage *original
        g_return_val_if_fail (SOUP_IS_MESSAGE (original), NULL);
 
        /* Add the validator entries in the header from the cached data */
+        g_mutex_lock (&priv->mutex);
        entry = soup_cache_entry_lookup (cache, original);
+        g_mutex_unlock (&priv->mutex);
        g_return_val_if_fail (entry, NULL);
 
        last_modified = soup_message_headers_get_one_common (entry->headers, SOUP_HEADER_LAST_MODIFIED);
@@ -1453,7 +1487,9 @@ soup_cache_cancel_conditional_request (SoupCache   *cache,
        SoupCachePrivate *priv = soup_cache_get_instance_private (cache);
        SoupCacheEntry *entry;
 
+        g_mutex_lock (&priv->mutex);
        entry = soup_cache_entry_lookup (cache, msg);
+        g_mutex_unlock (&priv->mutex);
        if (entry)
                entry->being_validated = FALSE;
 
@@ -1464,7 +1500,12 @@ void
 soup_cache_update_from_conditional_request (SoupCache   *cache,
                                            SoupMessage *msg)
 {
-       SoupCacheEntry *entry = soup_cache_entry_lookup (cache, msg);
+        SoupCachePrivate *priv = soup_cache_get_instance_private (cache);
+       SoupCacheEntry *entry;
+
+        g_mutex_lock (&priv->mutex);
+        entry = soup_cache_entry_lookup (cache, msg);
+        g_mutex_unlock (&priv->mutex);
        if (!entry)
                return;
 
@@ -1526,6 +1567,8 @@ pack_entry (gpointer data,
  *
  * You must call this before exiting if you want your cache data to
  * persist between sessions.
+ *
+ * This is not thread-safe and must only be called from the thread that created the #SoupCache.
  */
 void
 soup_cache_dump (SoupCache *cache)
@@ -1587,6 +1630,8 @@ insert_cache_file (SoupCache *cache, const char *name, GHashTable *leaked_entrie
  * @cache: a #SoupCache
  *
  * Loads the contents of @cache's index into memory.
+ *
+ * This is not thread-safe and must only be called from the thread that created the #SoupCache.
  */
 void
 soup_cache_load (SoupCache *cache)
diff --git a/tests/cache-test.c b/tests/cache-test.c
index 185def8b..e842005f 100644
--- a/tests/cache-test.c
+++ b/tests/cache-test.c
@@ -4,6 +4,7 @@
  */
 
 #include "test-utils.h"
+#include <glib/gstdio.h>
 
 static void
 server_callback (SoupServer        *server,
@@ -861,6 +862,239 @@ do_metrics_test (gconstpointer data)
         g_free (cache_dir);
 }
 
+typedef struct {
+        SoupSession *session;
+        GUri *uri;
+        gboolean must_revalidate;
+        gboolean is_expired;
+        gboolean hit_network;
+        gboolean validated;
+        GError *error;
+} ThreadTestRequest;
+
+static void
+threads_message_starting (SoupMessage       *msg,
+                          ThreadTestRequest *request)
+{
+        SoupMessageHeaders *request_headers;
+
+        if (request->validated)
+                return;
+
+        request_headers = soup_message_get_request_headers (msg);
+        request->validated = !!soup_message_headers_get_one (request_headers, "If-Modified-Since");
+}
+
+static void
+threads_request_queued (SoupSession       *session,
+                        SoupMessage       *msg,
+                        ThreadTestRequest *request)
+{
+        if (soup_message_get_uri (msg) != request->uri)
+                return;
+
+        g_signal_connect (msg, "starting",
+                          G_CALLBACK (threads_message_starting),
+                          request);
+}
+
+static void
+task_async_function (GTask        *task,
+                     GObject      *source,
+                     gpointer      task_data,
+                     GCancellable *cancellable)
+{
+        GMainContext *context;
+        SoupMessage *msg;
+        SoupMessageHeaders *request_headers;
+        GInputStream *stream;
+        ThreadTestRequest *request = (ThreadTestRequest *)task_data;
+
+        context = g_main_context_new ();
+        g_main_context_push_thread_default (context);
+
+        msg = soup_message_new_from_uri ("GET", request->uri);
+        g_signal_connect (request->session, "request-queued",
+                          G_CALLBACK (threads_request_queued),
+                          request);
+        request_headers = soup_message_get_request_headers (msg);
+        if (request->must_revalidate) {
+                soup_message_headers_append (request_headers,
+                                             "Test-Set-Last-Modified",
+                                             request->is_expired ? "Sat, 02 Jan 2010 00:00:00 GMT" : "Fri, 01 Jan 2010 00:00:00 GMT");
+                soup_message_headers_append (request_headers,
+                                             "Test-Set-Expires", "Sat, 02 Jan 2011 00:00:00 GMT");
+                soup_message_headers_append (request_headers,
+                                             "Test-Set-Cache-Control", "must-revalidate");
+        } else {
+                soup_message_headers_append (request_headers,
+                                             "Test-Set-Expires", "Fri, 01 Jan 2100 00:00:00 GMT");
+        }
+
+        stream = soup_test_request_send (request->session, msg, NULL, 0, &request->error);
+        if (stream) {
+                request->hit_network = is_network_stream (stream);
+                soup_test_request_read_all (stream, NULL, &request->error);
+                g_object_unref (stream);
+        }
+
+        g_signal_handlers_disconnect_by_data (request->session, request);
+
+        g_object_unref (msg);
+
+        /* Continue iterating to ensure the item is unqueued and the connection released */
+        while (g_main_context_pending (context))
+                g_main_context_iteration (context, TRUE);
+
+        /* Cache writes are G_PRIORITY_LOW, so they won't have happened yet */
+        soup_cache_flush ((SoupCache *)soup_session_get_feature (request->session, SOUP_TYPE_CACHE));
+
+        g_task_return_boolean (task, TRUE);
+
+        g_main_context_pop_thread_default (context);
+        g_main_context_unref (context);
+}
+
+static void
+task_finished_cb (SoupSession  *session,
+                  GAsyncResult *result,
+                  guint        *finished_count)
+{
+        g_assert_true (g_task_propagate_boolean (G_TASK (result), NULL));
+        g_atomic_int_inc (finished_count);
+}
+
+static void
+do_threads_test (gconstpointer data)
+{
+        GUri *base_uri = (GUri *)data;
+        SoupSession *session;
+        SoupCache *cache;
+        char *cache_dir;
+        ThreadTestRequest requests[4];
+        guint i;
+        guint finished_count = 0;
+
+        session = soup_test_session_new (NULL);
+
+        cache_dir = g_dir_make_tmp ("cache-test-XXXXXX", NULL);
+        cache = soup_cache_new (cache_dir, SOUP_CACHE_SINGLE_USER);
+        soup_session_add_feature (session, SOUP_SESSION_FEATURE (cache));
+
+        requests[0].session = session;
+        requests[0].uri = g_uri_parse_relative (base_uri, "/1", SOUP_HTTP_URI_FLAGS, NULL);
+        requests[0].must_revalidate = FALSE;
+        requests[0].is_expired = FALSE;
+        requests[1].session = session;
+        requests[1].uri = g_uri_parse_relative (base_uri, "/2", SOUP_HTTP_URI_FLAGS, NULL);
+        requests[1].must_revalidate = TRUE;
+        requests[1].is_expired = FALSE;
+        requests[2].session = session;
+        requests[2].uri = g_uri_parse_relative (base_uri, "/3", SOUP_HTTP_URI_FLAGS, NULL);
+        requests[2].must_revalidate = FALSE;
+        requests[2].is_expired = FALSE;
+        requests[3].session = session;
+        requests[3].uri = g_uri_parse_relative (base_uri, "/4", SOUP_HTTP_URI_FLAGS, NULL);
+        requests[3].must_revalidate = TRUE;
+        requests[3].is_expired = FALSE;
+
+        for (i = 0; i < 4; i++) {
+                GTask *task;
+
+                requests[i].hit_network = FALSE;
+                requests[i].validated = FALSE;
+                requests[i].error = NULL;
+
+                task = g_task_new (NULL, NULL, (GAsyncReadyCallback)task_finished_cb, &finished_count);
+                g_task_set_task_data (task, &requests[i], NULL);
+                g_task_run_in_thread (task, (GTaskThreadFunc)task_async_function);
+                g_object_unref (task);
+        }
+
+        while (g_atomic_int_get (&finished_count) != 4)
+                g_main_context_iteration (NULL, TRUE);
+
+        /* Initial requests hit the network */
+        for (i = 0; i < 4; i++) {
+                g_assert_true (requests[i].hit_network);
+                g_assert_false (requests[i].validated);
+                g_assert_no_error (requests[i].error);
+        }
+
+        finished_count = 0;
+        for (i = 0; i < 4; i++) {
+                GTask *task;
+
+                requests[i].hit_network = FALSE;
+                requests[i].validated = FALSE;
+                requests[i].error = NULL;
+
+                task = g_task_new (NULL, NULL, (GAsyncReadyCallback)task_finished_cb, &finished_count);
+                g_task_set_task_data (task, &requests[i], NULL);
+                g_task_run_in_thread (task, (GTaskThreadFunc)task_async_function);
+                g_object_unref (task);
+        }
+
+        while (g_atomic_int_get (&finished_count) != 4)
+                g_main_context_iteration (NULL, TRUE);
+
+        /* None of the requests hit the network */
+        for (i = 0; i < 4; i++) {
+                g_assert_false (requests[i].hit_network);
+                g_assert_no_error (requests[i].error);
+        }
+
+        /* The ones including must-revalidate are validated */
+        g_assert_false (requests[0].validated);
+        g_assert_true (requests[1].validated);
+        g_assert_false (requests[2].validated);
+        g_assert_true (requests[3].validated);
+
+        /* Try again making the validations fail */
+        requests[1].is_expired = TRUE;
+        requests[3].is_expired = TRUE;
+
+        finished_count = 0;
+        for (i = 0; i < 4; i++) {
+                GTask *task;
+
+                requests[i].hit_network = FALSE;
+                requests[i].validated = FALSE;
+                requests[i].error = NULL;
+
+                task = g_task_new (NULL, NULL, (GAsyncReadyCallback)task_finished_cb, &finished_count);
+                g_task_set_task_data (task, &requests[i], NULL);
+                g_task_run_in_thread (task, (GTaskThreadFunc)task_async_function);
+                g_object_unref (task);
+        }
+
+        while (g_atomic_int_get (&finished_count) != 4)
+                g_main_context_iteration (NULL, TRUE);
+
+        /* None of the requests failed */
+        for (i = 0; i < 4; i++)
+                g_assert_no_error (requests[i].error);
+
+        /* The ones including must-revalidate are validated and hit the network this time */
+        g_assert_false (requests[0].validated);
+        g_assert_false (requests[0].hit_network);
+        g_assert_true (requests[1].validated);
+        g_assert_true (requests[1].hit_network);
+        g_assert_false (requests[2].validated);
+        g_assert_false (requests[2].hit_network);
+        g_assert_true (requests[3].validated);
+        g_assert_true (requests[3].hit_network);
+
+        for (i = 0; i < 4; i++)
+                g_uri_unref (requests[i].uri);
+
+        soup_test_session_abort_unref (session);
+        soup_cache_clear (cache);
+        g_rmdir (cache_dir);
+        g_object_unref (cache);
+        g_free (cache_dir);
+}
+
 int
 main (int argc, char **argv)
 {
@@ -880,6 +1114,7 @@ main (int argc, char **argv)
        g_test_add_data_func ("/cache/headers", base_uri, do_headers_test);
        g_test_add_data_func ("/cache/leaks", base_uri, do_leaks_test);
         g_test_add_data_func ("/cache/metrics", base_uri, do_metrics_test);
+        g_test_add_data_func ("/cache/threads", base_uri, do_threads_test);
 
        ret = g_test_run ();
 


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]