[libsoup/carlosgc/async-context] session: remove async-context property
- From: Carlos Garcia Campos <carlosgc src gnome org>
- To: commits-list gnome org
- Cc:
- Subject: [libsoup/carlosgc/async-context] session: remove async-context property
- Date: Thu, 25 Feb 2021 15:00:43 +0000 (UTC)
commit 69dc2314e881487cbf8147f6c48b9e8d2e258358
Author: Carlos Garcia Campos <cgarcia igalia com>
Date: Thu Feb 25 11:45:33 2021 +0100
session: remove async-context property
Stop supporting multiple contexts, the session is expected to be used
always in the thread from which it was created.
libsoup/auth/soup-auth-manager.c | 33 +----
libsoup/server/soup-server-io.c | 14 ++-
libsoup/soup-logger.c | 23 ----
libsoup/soup-message-io-data.c | 2 -
libsoup/soup-message-io-data.h | 1 -
libsoup/soup-message-io.c | 9 +-
libsoup/soup-message-queue.c | 31 +----
libsoup/soup-message-queue.h | 2 -
libsoup/soup-session.c | 175 +++++++-------------------
libsoup/websocket/soup-websocket-connection.c | 13 +-
tests/context-test.c | 139 ++++++--------------
11 files changed, 102 insertions(+), 340 deletions(-)
---
diff --git a/libsoup/auth/soup-auth-manager.c b/libsoup/auth/soup-auth-manager.c
index 4561936c..2ab924f3 100644
--- a/libsoup/auth/soup-auth-manager.c
+++ b/libsoup/auth/soup-auth-manager.c
@@ -73,7 +73,6 @@ typedef struct {
GPtrArray *auth_types;
gboolean auto_ntlm;
- GMutex lock;
SoupAuth *proxy_auth;
GHashTable *auth_hosts;
} SoupAuthManagerPrivate;
@@ -104,7 +103,6 @@ soup_auth_manager_init (SoupAuthManager *manager)
soup_uri_host_equal,
NULL,
(GDestroyNotify)soup_auth_host_free);
- g_mutex_init (&priv->lock);
}
static void
@@ -118,8 +116,6 @@ soup_auth_manager_finalize (GObject *object)
g_clear_object (&priv->proxy_auth);
- g_mutex_clear (&priv->lock);
-
G_OBJECT_CLASS (soup_auth_manager_parent_class)->finalize (object);
}
@@ -638,8 +634,6 @@ auth_got_headers (SoupMessage *msg, gpointer manager)
SoupAuth *auth, *prior_auth;
gboolean prior_auth_failed = FALSE;
- g_mutex_lock (&priv->lock);
-
/* See if we used auth last time */
prior_auth = soup_message_get_auth (msg);
if (prior_auth && check_auth (msg, prior_auth)) {
@@ -648,10 +642,8 @@ auth_got_headers (SoupMessage *msg, gpointer manager)
prior_auth_failed = TRUE;
} else {
auth = create_auth (priv, msg);
- if (!auth) {
- g_mutex_unlock (&priv->lock);
+ if (!auth)
return;
- }
}
if (!soup_message_query_flags (msg, SOUP_MESSAGE_DO_NOT_USE_AUTH_CACHE)) {
@@ -668,7 +660,6 @@ auth_got_headers (SoupMessage *msg, gpointer manager)
prior_auth_failed, FALSE, TRUE);
soup_message_set_auth (msg, auth);
g_object_unref (auth);
- g_mutex_unlock (&priv->lock);
}
static void
@@ -677,7 +668,6 @@ auth_got_body (SoupMessage *msg, gpointer manager)
SoupAuthManagerPrivate *priv = soup_auth_manager_get_instance_private (manager);
SoupAuth *auth;
- g_mutex_lock (&priv->lock);
auth = lookup_auth (priv, msg);
if (auth && soup_auth_is_ready (auth, msg)) {
if (SOUP_IS_CONNECTION_AUTH (auth))
@@ -691,7 +681,6 @@ auth_got_body (SoupMessage *msg, gpointer manager)
soup_session_requeue_message (priv->session, msg);
}
- g_mutex_unlock (&priv->lock);
}
static void
@@ -701,8 +690,6 @@ proxy_auth_got_headers (SoupMessage *msg, gpointer manager)
SoupAuth *auth = NULL, *prior_auth;
gboolean prior_auth_failed = FALSE;
- g_mutex_lock (&priv->lock);
-
/* See if we used auth last time */
prior_auth = soup_message_get_proxy_auth (msg);
if (prior_auth && check_auth (msg, prior_auth)) {
@@ -715,10 +702,9 @@ proxy_auth_got_headers (SoupMessage *msg, gpointer manager)
if (!auth) {
auth = create_auth (priv, msg);
- if (!auth) {
- g_mutex_unlock (&priv->lock);
+ if (!auth)
return;
- }
+
if (!soup_message_query_flags (msg, SOUP_MESSAGE_DO_NOT_USE_AUTH_CACHE))
priv->proxy_auth = g_object_ref (auth);
}
@@ -728,7 +714,6 @@ proxy_auth_got_headers (SoupMessage *msg, gpointer manager)
prior_auth_failed, TRUE, TRUE);
soup_message_set_proxy_auth (msg, auth);
g_object_unref (auth);
- g_mutex_unlock (&priv->lock);
}
static void
@@ -737,8 +722,6 @@ proxy_auth_got_body (SoupMessage *msg, gpointer manager)
SoupAuthManagerPrivate *priv = soup_auth_manager_get_instance_private (manager);
SoupAuth *auth;
- g_mutex_lock (&priv->lock);
-
auth = lookup_proxy_auth (priv, msg);
if (auth && soup_auth_is_ready (auth, msg)) {
/* When not using cached credentials, update the Authorization header
@@ -748,8 +731,6 @@ proxy_auth_got_body (SoupMessage *msg, gpointer manager)
update_authorization_header (msg, auth, TRUE);
soup_session_requeue_message (priv->session, msg);
}
-
- g_mutex_unlock (&priv->lock);
}
static void
@@ -761,8 +742,6 @@ auth_msg_starting (SoupMessage *msg, gpointer manager)
if (soup_message_query_flags (msg, SOUP_MESSAGE_DO_NOT_USE_AUTH_CACHE))
return;
- g_mutex_lock (&priv->lock);
-
if (soup_message_get_method (msg) != SOUP_METHOD_CONNECT) {
auth = lookup_auth (priv, msg);
if (auth) {
@@ -782,8 +761,6 @@ auth_msg_starting (SoupMessage *msg, gpointer manager)
}
soup_message_set_proxy_auth (msg, auth);
update_authorization_header (msg, auth, TRUE);
-
- g_mutex_unlock (&priv->lock);
}
static void
@@ -840,9 +817,7 @@ soup_auth_manager_use_auth (SoupAuthManager *manager,
{
SoupAuthManagerPrivate *priv = soup_auth_manager_get_instance_private (manager);
- g_mutex_lock (&priv->lock);
record_auth_for_uri (priv, uri, auth, FALSE);
- g_mutex_unlock (&priv->lock);
}
/**
@@ -859,9 +834,7 @@ soup_auth_manager_clear_cached_credentials (SoupAuthManager *manager)
g_return_if_fail (SOUP_IS_AUTH_MANAGER (manager));
- g_mutex_lock (&priv->lock);
g_hash_table_remove_all (priv->auth_hosts);
- g_mutex_unlock (&priv->lock);
}
static void
diff --git a/libsoup/server/soup-server-io.c b/libsoup/server/soup-server-io.c
index 7d2831cd..9359293a 100644
--- a/libsoup/server/soup-server-io.c
+++ b/libsoup/server/soup-server-io.c
@@ -26,6 +26,8 @@ struct _SoupServerMessageIOData {
goffset write_body_offset;
GSource *unpause_source;
+
+ GMainContext *async_context;
};
#define RESPONSE_BLOCK_SIZE 8192
@@ -45,6 +47,7 @@ soup_server_message_io_data_free (SoupServerMessageIOData *io)
io->unpause_source = NULL;
}
+ g_clear_pointer (&io->async_context, g_main_context_unref);
g_clear_pointer (&io->write_chunk, g_bytes_unref);
g_slice_free (SoupServerMessageIOData, io);
@@ -488,11 +491,11 @@ io_write (SoupServerMessage *msg,
g_clear_object (&io->body_ostream);
} else {
io->async_wait = g_cancellable_new ();
- g_main_context_push_thread_default (io->async_context);
+ g_main_context_push_thread_default (server_io->async_context);
g_output_stream_close_async (io->body_ostream,
G_PRIORITY_DEFAULT, NULL,
closed_async, g_object_ref (msg));
- g_main_context_pop_thread_default (io->async_context);
+ g_main_context_pop_thread_default (server_io->async_context);
}
}
@@ -850,7 +853,7 @@ io_run (SoupServerMessage *msg)
io->io_source = soup_message_io_data_get_source (io, G_OBJECT (msg), NULL,
(SoupMessageIOSourceFunc)io_run_ready,
NULL);
- g_source_attach (io->io_source, io->async_context);
+ g_source_attach (io->io_source, server_io->async_context);
} else if (soup_server_message_get_io_data (msg) == server_io) {
soup_server_message_set_status (msg, SOUP_STATUS_INTERNAL_SERVER_ERROR, error ?
error->message : NULL);
soup_server_message_io_finished (msg);
@@ -875,7 +878,6 @@ soup_server_message_read_request (SoupServerMessage *msg,
io->base.iostream = g_object_ref (soup_socket_get_iostream (sock));
io->base.istream = SOUP_FILTER_INPUT_STREAM (g_io_stream_get_input_stream (io->base.iostream));
io->base.ostream = g_io_stream_get_output_stream (io->base.iostream);
- io->base.async_context = g_main_context_ref_thread_default ();
io->base.read_header_buf = g_byte_array_new ();
io->base.write_buf = g_string_new (NULL);
@@ -883,6 +885,8 @@ soup_server_message_read_request (SoupServerMessage *msg,
io->base.read_state = SOUP_MESSAGE_IO_STATE_HEADERS;
io->base.write_state = SOUP_MESSAGE_IO_STATE_NOT_STARTED;
+ io->async_context = g_main_context_ref_thread_default ();
+
soup_server_message_set_io_data (msg, io);
io_run (msg);
@@ -928,7 +932,7 @@ soup_server_message_io_unpause (SoupServerMessage *msg)
g_return_if_fail (io != NULL);
if (!io->unpause_source) {
- io->unpause_source = soup_add_completion_reffed (io->base.async_context,
+ io->unpause_source = soup_add_completion_reffed (io->async_context,
io_unpause_internal, msg, NULL);
}
}
diff --git a/libsoup/soup-logger.c b/libsoup/soup-logger.c
index 29a089a1..72afb012 100644
--- a/libsoup/soup-logger.c
+++ b/libsoup/soup-logger.c
@@ -97,11 +97,6 @@ struct _SoupLogger {
};
typedef struct {
- /* We use a mutex so that if requests are being run in
- * multiple threads, we don't mix up the output.
- */
- GMutex lock;
-
GQuark tag;
GHashTable *ids;
@@ -142,7 +137,6 @@ soup_logger_init (SoupLogger *logger)
SoupLoggerPrivate *priv = soup_logger_get_instance_private (logger);
char *id;
- g_mutex_init (&priv->lock);
id = g_strdup_printf ("SoupLogger-%p", logger);
priv->tag = g_quark_from_string (id);
g_free (id);
@@ -164,8 +158,6 @@ soup_logger_finalize (GObject *object)
if (priv->printer_dnotify)
priv->printer_dnotify (priv->printer_data);
- g_mutex_clear (&priv->lock);
-
G_OBJECT_CLASS (soup_logger_parent_class)->finalize (object);
}
@@ -572,23 +564,15 @@ static void
finished (SoupMessage *msg, gpointer user_data)
{
SoupLogger *logger = user_data;
- SoupLoggerPrivate *priv = soup_logger_get_instance_private (logger);
-
- g_mutex_lock (&priv->lock);
print_response (logger, msg);
soup_logger_print (logger, SOUP_LOGGER_LOG_MINIMAL, ' ', "\n");
-
- g_mutex_unlock (&priv->lock);
}
static void
got_informational (SoupMessage *msg, gpointer user_data)
{
SoupLogger *logger = user_data;
- SoupLoggerPrivate *priv = soup_logger_get_instance_private (logger);
-
- g_mutex_lock (&priv->lock);
g_signal_handlers_disconnect_by_func (msg, finished, logger);
print_response (logger, msg);
@@ -599,23 +583,16 @@ got_informational (SoupMessage *msg, gpointer user_data)
"[Now sending request body...]");
soup_logger_print (logger, SOUP_LOGGER_LOG_MINIMAL, ' ', "\n");
}
-
- g_mutex_unlock (&priv->lock);
}
static void
got_body (SoupMessage *msg, gpointer user_data)
{
SoupLogger *logger = user_data;
- SoupLoggerPrivate *priv = soup_logger_get_instance_private (logger);
-
- g_mutex_lock (&priv->lock);
g_signal_handlers_disconnect_by_func (msg, finished, logger);
print_response (logger, msg);
soup_logger_print (logger, SOUP_LOGGER_LOG_MINIMAL, ' ', "\n");
-
- g_mutex_unlock (&priv->lock);
}
static void
diff --git a/libsoup/soup-message-io-data.c b/libsoup/soup-message-io-data.c
index 9fd76e5b..e9b16e9f 100644
--- a/libsoup/soup-message-io-data.c
+++ b/libsoup/soup-message-io-data.c
@@ -34,8 +34,6 @@ soup_message_io_data_cleanup (SoupMessageIOData *io)
g_object_unref (io->body_istream);
if (io->body_ostream)
g_object_unref (io->body_ostream);
- if (io->async_context)
- g_main_context_unref (io->async_context);
g_byte_array_free (io->read_header_buf, TRUE);
diff --git a/libsoup/soup-message-io-data.h b/libsoup/soup-message-io-data.h
index 0476a425..0e3eb77b 100644
--- a/libsoup/soup-message-io-data.h
+++ b/libsoup/soup-message-io-data.h
@@ -47,7 +47,6 @@ typedef struct {
GInputStream *body_istream;
GOutputStream *ostream;
GOutputStream *body_ostream;
- GMainContext *async_context;
SoupMessageIOState read_state;
SoupEncoding read_encoding;
diff --git a/libsoup/soup-message-io.c b/libsoup/soup-message-io.c
index 1957bc9b..6503892c 100644
--- a/libsoup/soup-message-io.c
+++ b/libsoup/soup-message-io.c
@@ -392,7 +392,6 @@ io_write (SoupMessage *msg, gboolean blocking,
break;
} else {
io->async_wait = g_cancellable_new ();
- g_main_context_push_thread_default (io->async_context);
g_output_stream_splice_async (io->body_ostream,
soup_message_get_request_body_stream (msg),
G_OUTPUT_STREAM_SPLICE_CLOSE_SOURCE,
@@ -400,7 +399,6 @@ io_write (SoupMessage *msg, gboolean blocking,
cancellable,
(GAsyncReadyCallback)request_body_stream_wrote_cb,
g_object_ref (msg));
- g_main_context_pop_thread_default (io->async_context);
return FALSE;
}
} else
@@ -415,12 +413,10 @@ io_write (SoupMessage *msg, gboolean blocking,
g_clear_object (&io->body_ostream);
} else {
io->async_wait = g_cancellable_new ();
- g_main_context_push_thread_default (io->async_context);
g_output_stream_close_async (io->body_ostream,
soup_client_message_io_data_get_priority
(client_io),
cancellable,
closed_async, g_object_ref (msg));
- g_main_context_pop_thread_default (io->async_context);
}
}
@@ -809,7 +805,7 @@ soup_message_io_run (SoupMessage *msg,
NULL);
g_source_set_priority (io->io_source,
soup_client_message_io_data_get_priority (client_io));
- g_source_attach (io->io_source, io->async_context);
+ g_source_attach (io->io_source, g_main_context_get_thread_default ());
} else {
if (soup_message_get_io_data (msg) == client_io)
soup_message_io_finish (msg, error);
@@ -882,7 +878,7 @@ io_run_until_read_async (SoupMessage *msg,
(SoupMessageIOSourceFunc)io_run_until_read_ready,
task);
g_source_set_priority (io->io_source, g_task_get_priority (task));
- g_source_attach (io->io_source, io->async_context);
+ g_source_attach (io->io_source, g_main_context_get_thread_default ());
return;
}
@@ -980,7 +976,6 @@ soup_message_send_request (SoupMessageQueueItem *item,
io->base.iostream = g_object_ref (soup_connection_get_iostream (io->item->conn));
io->base.istream = SOUP_FILTER_INPUT_STREAM (g_io_stream_get_input_stream (io->base.iostream));
io->base.ostream = g_io_stream_get_output_stream (io->base.iostream);
- io->base.async_context = g_main_context_ref_thread_default ();
io->base.read_header_buf = g_byte_array_new ();
io->base.write_buf = g_string_new (NULL);
diff --git a/libsoup/soup-message-queue.c b/libsoup/soup-message-queue.c
index f1422b19..5e04d306 100644
--- a/libsoup/soup-message-queue.c
+++ b/libsoup/soup-message-queue.c
@@ -29,7 +29,6 @@
struct _SoupMessageQueue {
SoupSession *session;
- GMutex mutex;
SoupMessageQueueItem *head, *tail;
};
@@ -40,7 +39,7 @@ soup_message_queue_new (SoupSession *session)
queue = g_slice_new0 (SoupMessageQueue);
queue->session = session;
- g_mutex_init (&queue->mutex);
+
return queue;
}
@@ -49,7 +48,6 @@ soup_message_queue_destroy (SoupMessageQueue *queue)
{
g_return_if_fail (queue->head == NULL);
- g_mutex_clear (&queue->mutex);
g_slice_free (SoupMessageQueue, queue);
}
@@ -84,9 +82,6 @@ soup_message_queue_append (SoupMessageQueue *queue,
item = g_slice_new0 (SoupMessageQueueItem);
item->session = g_object_ref (queue->session);
- item->async_context = g_main_context_get_thread_default ();
- if (item->async_context)
- g_main_context_ref (item->async_context);
item->queue = queue;
item->msg = g_object_ref (msg);
item->callback = callback;
@@ -103,7 +98,6 @@ soup_message_queue_append (SoupMessageQueue *queue,
*/
item->ref_count = 1;
- g_mutex_lock (&queue->mutex);
if (queue->head) {
SoupMessageQueueItem *it = queue->head;
@@ -129,7 +123,6 @@ soup_message_queue_append (SoupMessageQueue *queue,
} else
queue->head = queue->tail = item;
- g_mutex_unlock (&queue->mutex);
return item;
}
@@ -142,9 +135,7 @@ soup_message_queue_append (SoupMessageQueue *queue,
void
soup_message_queue_item_ref (SoupMessageQueueItem *item)
{
- g_mutex_lock (&item->queue->mutex);
item->ref_count++;
- g_mutex_unlock (&item->queue->mutex);
}
/**
@@ -158,15 +149,11 @@ soup_message_queue_item_ref (SoupMessageQueueItem *item)
void
soup_message_queue_item_unref (SoupMessageQueueItem *item)
{
- g_mutex_lock (&item->queue->mutex);
-
/* Decrement the ref_count; if it's still non-zero OR if the
* item is still in the queue, then return.
*/
- if (--item->ref_count || !item->removed) {
- g_mutex_unlock (&item->queue->mutex);
+ if (--item->ref_count || !item->removed)
return;
- }
g_warn_if_fail (item->conn == NULL);
@@ -180,8 +167,6 @@ soup_message_queue_item_unref (SoupMessageQueueItem *item)
else
item->queue->tail = item->prev;
- g_mutex_unlock (&item->queue->mutex);
-
/* And free it */
g_signal_handlers_disconnect_by_func (item->msg,
queue_message_restarted, item);
@@ -190,7 +175,6 @@ soup_message_queue_item_unref (SoupMessageQueueItem *item)
g_object_unref (item->cancellable);
g_clear_error (&item->error);
g_clear_object (&item->task);
- g_clear_pointer (&item->async_context, g_main_context_unref);
g_slice_free (SoupMessageQueueItem, item);
}
@@ -210,8 +194,6 @@ soup_message_queue_lookup (SoupMessageQueue *queue, SoupMessage *msg)
{
SoupMessageQueueItem *item;
- g_mutex_lock (&queue->mutex);
-
item = queue->tail;
while (item && (item->removed || item->msg != msg))
item = item->prev;
@@ -219,7 +201,6 @@ soup_message_queue_lookup (SoupMessageQueue *queue, SoupMessage *msg)
if (item)
item->ref_count++;
- g_mutex_unlock (&queue->mutex);
return item;
}
@@ -240,8 +221,6 @@ soup_message_queue_first (SoupMessageQueue *queue)
{
SoupMessageQueueItem *item;
- g_mutex_lock (&queue->mutex);
-
item = queue->head;
while (item && item->removed)
item = item->next;
@@ -249,7 +228,6 @@ soup_message_queue_first (SoupMessageQueue *queue)
if (item)
item->ref_count++;
- g_mutex_unlock (&queue->mutex);
return item;
}
@@ -270,15 +248,12 @@ soup_message_queue_next (SoupMessageQueue *queue, SoupMessageQueueItem *item)
{
SoupMessageQueueItem *next;
- g_mutex_lock (&queue->mutex);
-
next = item->next;
while (next && next->removed)
next = next->next;
if (next)
next->ref_count++;
- g_mutex_unlock (&queue->mutex);
soup_message_queue_item_unref (item);
return next;
}
@@ -296,7 +271,5 @@ soup_message_queue_remove (SoupMessageQueue *queue, SoupMessageQueueItem *item)
{
g_return_if_fail (!item->removed);
- g_mutex_lock (&queue->mutex);
item->removed = TRUE;
- g_mutex_unlock (&queue->mutex);
}
diff --git a/libsoup/soup-message-queue.h b/libsoup/soup-message-queue.h
index e030adaf..c9cab022 100644
--- a/libsoup/soup-message-queue.h
+++ b/libsoup/soup-message-queue.h
@@ -39,7 +39,6 @@ struct _SoupMessageQueueItem {
SoupMessage *msg;
SoupSessionCallback callback;
gpointer callback_data;
- GMainContext *async_context;
GCancellable *cancellable;
GError *error;
@@ -50,7 +49,6 @@ struct _SoupMessageQueueItem {
guint paused : 1;
guint io_started : 1;
guint async : 1;
- guint async_pending : 1;
guint connect_only : 1;
guint priority : 3;
guint resend_count : 5;
diff --git a/libsoup/soup-session.c b/libsoup/soup-session.c
index 663d8345..77c1f81c 100644
--- a/libsoup/soup-session.c
+++ b/libsoup/soup-session.c
@@ -106,6 +106,7 @@ typedef struct {
SoupSocketProperties *socket_props;
SoupMessageQueue *queue;
+ GSource *queue_source;
char *user_agent;
char *accept_language;
@@ -118,19 +119,6 @@ typedef struct {
GHashTable *conns; /* SoupConnection -> SoupSessionHost */
guint num_conns;
guint max_conns, max_conns_per_host;
-
- /* Must hold the conn_lock before potentially creating a new
- * SoupSessionHost, adding/removing a connection,
- * disconnecting a connection, moving a connection from
- * IDLE to IN_USE, or when updating socket properties.
- * Must not emit signals or destroy objects while holding it.
- * The conn_cond is signaled when it may be possible for
- * a previously-blocked message to continue.
- */
- GMutex conn_lock;
- GCond conn_cond;
-
- GMainContext *async_context;
} SoupSessionPrivate;
static void free_host (SoupSessionHost *host);
@@ -176,7 +164,6 @@ enum {
PROP_MAX_CONNS,
PROP_MAX_CONNS_PER_HOST,
PROP_TLS_DATABASE,
- PROP_ASYNC_CONTEXT,
PROP_TIMEOUT,
PROP_USER_AGENT,
PROP_ACCEPT_LANGUAGE,
@@ -216,18 +203,46 @@ enum {
*/
G_DEFINE_QUARK (soup-session-error-quark, soup_session_error)
+typedef struct {
+ GSource source;
+ SoupSession* session;
+} SoupMessageQueueSource;
+
+static gboolean
+queue_dispatch (GSource *source,
+ GSourceFunc callback,
+ gpointer user_data)
+{
+ SoupSession *session = ((SoupMessageQueueSource *)source)->session;
+
+ g_source_set_ready_time (source, -1);
+ async_run_queue (session);
+ return G_SOURCE_CONTINUE;
+}
+
+static GSourceFuncs queue_source_funcs = {
+ NULL, //queue_prepare,
+ NULL, //queue_check,
+ queue_dispatch,
+ NULL, NULL, NULL
+};
+
static void
soup_session_init (SoupSession *session)
{
SoupSessionPrivate *priv = soup_session_get_instance_private (session);
SoupAuthManager *auth_manager;
+ SoupMessageQueueSource *source;
priv->queue = soup_message_queue_new (session);
- priv->async_context = g_main_context_ref_thread_default ();
+ priv->queue_source = g_source_new (&queue_source_funcs, sizeof (SoupMessageQueueSource));
+ source = (SoupMessageQueueSource *)priv->queue_source;
+ source->session = session;
+ g_source_set_name (priv->queue_source, "SoupMessageQueue");
+ g_source_set_can_recurse (priv->queue_source, TRUE);
+ g_source_attach (priv->queue_source, g_main_context_get_thread_default ());
priv->io_timeout = priv->idle_timeout = 60;
- g_mutex_init (&priv->conn_lock);
- g_cond_init (&priv->conn_cond);
priv->http_hosts = g_hash_table_new_full (soup_host_uri_hash,
soup_host_uri_equal,
NULL, (GDestroyNotify)free_host);
@@ -272,6 +287,8 @@ soup_session_dispose (GObject *object)
while (priv->features)
soup_session_remove_feature (session, priv->features->data);
+ g_source_destroy (priv->queue_source);
+
G_OBJECT_CLASS (soup_session_parent_class)->dispose (object);
}
@@ -282,9 +299,8 @@ soup_session_finalize (GObject *object)
SoupSessionPrivate *priv = soup_session_get_instance_private (session);
soup_message_queue_destroy (priv->queue);
+ g_source_unref (priv->queue_source);
- g_mutex_clear (&priv->conn_lock);
- g_cond_clear (&priv->conn_cond);
g_hash_table_destroy (priv->http_hosts);
g_hash_table_destroy (priv->https_hosts);
g_hash_table_destroy (priv->conns);
@@ -295,7 +311,6 @@ soup_session_finalize (GObject *object)
g_clear_object (&priv->tlsdb);
g_clear_object (&priv->tls_interaction);
- g_clear_pointer (&priv->async_context, g_main_context_unref);
g_clear_object (&priv->local_addr);
g_hash_table_destroy (priv->features_cache);
@@ -307,7 +322,6 @@ soup_session_finalize (GObject *object)
G_OBJECT_CLASS (soup_session_parent_class)->finalize (object);
}
-/* requires conn_lock */
static void
ensure_socket_props (SoupSession *session)
{
@@ -331,13 +345,12 @@ socket_props_changed (SoupSession *session)
{
SoupSessionPrivate *priv = soup_session_get_instance_private (session);
- g_mutex_lock (&priv->conn_lock);
- if (priv->socket_props) {
- soup_socket_properties_unref (priv->socket_props);
- priv->socket_props = NULL;
- ensure_socket_props (session);
- }
- g_mutex_unlock (&priv->conn_lock);
+ if (!priv->socket_props)
+ return;
+
+ soup_socket_properties_unref (priv->socket_props);
+ priv->socket_props = NULL;
+ ensure_socket_props (session);
}
static void
@@ -996,7 +1009,6 @@ soup_session_host_new (SoupSession *session, GUri *uri)
return host;
}
-/* Requires conn_lock to be locked */
static SoupSessionHost *
get_host_for_uri (SoupSession *session, GUri *uri)
{
@@ -1030,7 +1042,6 @@ get_host_for_uri (SoupSession *session, GUri *uri)
return host;
}
-/* Requires conn_lock to be locked */
static SoupSessionHost *
get_host_for_message (SoupSession *session, SoupMessage *msg)
{
@@ -1266,10 +1277,8 @@ soup_session_append_queue_item (SoupSession *session,
item = soup_message_queue_append (priv->queue, msg, cancellable, callback, user_data);
item->async = async;
- g_mutex_lock (&priv->conn_lock);
host = get_host_for_message (session, item->msg);
host->num_messages++;
- g_mutex_unlock (&priv->conn_lock);
if (!soup_message_query_flags (msg, SOUP_MESSAGE_NO_REDIRECT)) {
soup_message_add_header_handler (
@@ -1362,7 +1371,6 @@ soup_session_cleanup_connections (SoupSession *session,
gpointer conn, host;
SoupConnectionState state;
- g_mutex_lock (&priv->conn_lock);
g_hash_table_iter_init (&iter, priv->conns);
while (g_hash_table_iter_next (&iter, &conn, &host)) {
state = soup_connection_get_state (conn);
@@ -1373,7 +1381,6 @@ soup_session_cleanup_connections (SoupSession *session,
drop_connection (session, host, conn);
}
}
- g_mutex_unlock (&priv->conn_lock);
if (!conns)
return FALSE;
@@ -1395,15 +1402,8 @@ free_unused_host (gpointer user_data)
SoupSessionPrivate *priv = soup_session_get_instance_private (host->session);
GUri *uri = host->uri;
- g_mutex_lock (&priv->conn_lock);
-
- /* In a multithreaded session, a connection might have been
- * added while we were waiting for conn_lock.
- */
- if (host->connections) {
- g_mutex_unlock (&priv->conn_lock);
+ if (host->connections)
return FALSE;
- }
/* This will free the host in addition to removing it from the
* hash table
@@ -1412,7 +1412,6 @@ free_unused_host (gpointer user_data)
g_hash_table_remove (priv->https_hosts, uri);
else
g_hash_table_remove (priv->http_hosts, uri);
- g_mutex_unlock (&priv->conn_lock);
return FALSE;
}
@@ -1422,10 +1421,6 @@ drop_connection (SoupSession *session, SoupSessionHost *host, SoupConnection *co
{
SoupSessionPrivate *priv = soup_session_get_instance_private (session);
- /* Note: caller must hold conn_lock, and must remove @conn
- * from priv->conns itself.
- */
-
if (host) {
host->connections = g_slist_remove (host->connections, conn);
host->num_conns--;
@@ -1436,7 +1431,7 @@ drop_connection (SoupSession *session, SoupSessionHost *host, SoupConnection *co
*/
if (host->num_conns == 0) {
g_assert (host->keep_alive_src == NULL);
- host->keep_alive_src = soup_add_timeout (priv->async_context,
+ host->keep_alive_src = soup_add_timeout (g_main_context_get_thread_default (),
HOST_KEEP_ALIVE,
free_unused_host,
host);
@@ -1457,15 +1452,11 @@ connection_disconnected (SoupConnection *conn, gpointer user_data)
SoupSessionPrivate *priv = soup_session_get_instance_private (session);
SoupSessionHost *host;
- g_mutex_lock (&priv->conn_lock);
-
host = g_hash_table_lookup (priv->conns, conn);
if (host)
g_hash_table_remove (priv->conns, conn);
drop_connection (session, host, conn);
- g_mutex_unlock (&priv->conn_lock);
-
soup_session_kick_queue (session);
}
@@ -1501,11 +1492,8 @@ soup_session_unqueue_item (SoupSession *session,
soup_message_queue_remove (priv->queue, item);
- g_mutex_lock (&priv->conn_lock);
host = get_host_for_message (session, item->msg);
host->num_messages--;
- g_cond_broadcast (&priv->conn_cond);
- g_mutex_unlock (&priv->conn_lock);
/* g_signal_handlers_disconnect_by_func doesn't work if you
* have a metamarshal, meaning it doesn't work with
@@ -1699,7 +1687,6 @@ connect_async_complete (GObject *object,
soup_message_queue_item_unref (item);
}
-/* requires conn_lock */
static SoupConnection *
get_connection_for_host (SoupSession *session,
SoupMessageQueueItem *item,
@@ -1780,7 +1767,6 @@ static gboolean
get_connection (SoupMessageQueueItem *item, gboolean *should_cleanup)
{
SoupSession *session = item->session;
- SoupSessionPrivate *priv = soup_session_get_instance_private (session);
SoupSessionHost *host;
SoupConnection *conn = NULL;
gboolean my_should_cleanup = FALSE;
@@ -1793,7 +1779,6 @@ get_connection (SoupMessageQueueItem *item, gboolean *should_cleanup)
(!soup_message_query_flags (item->msg, SOUP_MESSAGE_IDEMPOTENT) &&
!SOUP_METHOD_IS_IDEMPOTENT (soup_message_get_method (item->msg)));
- g_mutex_lock (&priv->conn_lock);
host = get_host_for_message (session, item->msg);
while (TRUE) {
conn = get_connection_for_host (session, item, host,
@@ -1803,17 +1788,11 @@ get_connection (SoupMessageQueueItem *item, gboolean *should_cleanup)
break;
if (my_should_cleanup) {
- g_mutex_unlock (&priv->conn_lock);
soup_session_cleanup_connections (session, TRUE);
- g_mutex_lock (&priv->conn_lock);
-
my_should_cleanup = FALSE;
continue;
}
-
- g_cond_wait (&priv->conn_cond, &priv->conn_lock);
}
- g_mutex_unlock (&priv->conn_lock);
if (!conn) {
if (should_cleanup)
@@ -1950,11 +1929,9 @@ async_run_queue (SoupSession *session)
if (soup_message_get_method (msg) == SOUP_METHOD_CONNECT)
continue;
- if (!item->async ||
- item->async_context != g_main_context_get_thread_default ())
+ if (!item->async)
continue;
- item->async_pending = FALSE;
soup_session_process_queue_item (session, item, &should_cleanup, TRUE);
}
@@ -1972,30 +1949,6 @@ async_run_queue (SoupSession *session)
g_object_unref (session);
}
-static gboolean
-idle_run_queue (gpointer user_data)
-{
- GWeakRef *wref = user_data;
- SoupSession *session;
-
- session = g_weak_ref_get (wref);
- if (!session)
- return FALSE;
-
- async_run_queue (session);
- g_object_unref (session);
- return FALSE;
-}
-
-static void
-idle_run_queue_dnotify (gpointer user_data)
-{
- GWeakRef *wref = user_data;
-
- g_weak_ref_clear (wref);
- g_slice_free (GWeakRef, wref);
-}
-
/**
* soup_session_requeue_message:
* @session: a #SoupSession
@@ -2049,42 +2002,8 @@ static void
soup_session_kick_queue (SoupSession *session)
{
SoupSessionPrivate *priv = soup_session_get_instance_private (session);
- SoupMessageQueueItem *item;
- GHashTable *async_pending;
- gboolean have_sync_items = FALSE;
-
- if (priv->disposed)
- return;
- async_pending = g_hash_table_new (NULL, NULL);
- for (item = soup_message_queue_first (priv->queue);
- item;
- item = soup_message_queue_next (priv->queue, item)) {
- if (item->async) {
- GMainContext *context = item->async_context;
-
- if (!g_hash_table_contains (async_pending, context)) {
- if (!item->async_pending) {
- GWeakRef *wref = g_slice_new (GWeakRef);
- GSource *source;
-
- g_weak_ref_init (wref, session);
- source = soup_add_completion_reffed (context, idle_run_queue, wref,
idle_run_queue_dnotify);
- g_source_unref (source);
- }
- g_hash_table_add (async_pending, context);
- }
- item->async_pending = TRUE;
- } else
- have_sync_items = TRUE;
- }
- g_hash_table_unref (async_pending);
-
- if (have_sync_items) {
- g_mutex_lock (&priv->conn_lock);
- g_cond_broadcast (&priv->conn_cond);
- g_mutex_unlock (&priv->conn_lock);
- }
+ g_source_set_ready_time (priv->queue_source, 0);
}
/**
@@ -2166,7 +2085,6 @@ soup_session_abort (SoupSession *session)
}
/* Close all idle connections */
- g_mutex_lock (&priv->conn_lock);
conns = NULL;
g_hash_table_iter_init (&iter, priv->conns);
while (g_hash_table_iter_next (&iter, &conn, &host)) {
@@ -2180,7 +2098,6 @@ soup_session_abort (SoupSession *session)
drop_connection (session, host, conn);
}
}
- g_mutex_unlock (&priv->conn_lock);
for (c = conns; c; c = c->next) {
soup_connection_disconnect (c->data);
@@ -3863,11 +3780,9 @@ steal_connection (SoupSession *session,
conn = g_object_ref (item->conn);
soup_session_set_item_connection (session, item, NULL);
- g_mutex_lock (&priv->conn_lock);
host = get_host_for_message (session, item->msg);
g_hash_table_remove (priv->conns, conn);
drop_connection (session, host, conn);
- g_mutex_unlock (&priv->conn_lock);
stream = soup_connection_steal_iostream (conn);
if (!item->connect_only)
diff --git a/libsoup/websocket/soup-websocket-connection.c b/libsoup/websocket/soup-websocket-connection.c
index 19b5cd1a..76b929a6 100644
--- a/libsoup/websocket/soup-websocket-connection.c
+++ b/libsoup/websocket/soup-websocket-connection.c
@@ -131,8 +131,6 @@ typedef struct {
gboolean dirty_close;
GSource *close_timeout;
- GMainContext *main_context;
-
gboolean io_closing;
gboolean io_closed;
@@ -266,7 +264,6 @@ soup_websocket_connection_init (SoupWebsocketConnection *self)
priv->incoming = g_byte_array_sized_new (1024);
g_queue_init (&priv->outgoing);
- priv->main_context = g_main_context_ref_thread_default ();
}
static void
@@ -307,7 +304,7 @@ soup_websocket_connection_start_input_source (SoupWebsocketConnection *self)
priv->input_source = g_pollable_input_stream_create_source (priv->input, NULL);
g_source_set_callback (priv->input_source, (GSourceFunc)on_web_socket_input, self, NULL);
- g_source_attach (priv->input_source, priv->main_context);
+ g_source_attach (priv->input_source, g_main_context_get_thread_default ());
}
static void
@@ -333,7 +330,7 @@ soup_websocket_connection_start_output_source (SoupWebsocketConnection *self)
priv->output_source = g_pollable_output_stream_create_source (priv->output, NULL);
g_source_set_callback (priv->output_source, (GSourceFunc)on_web_socket_output, self, NULL);
- g_source_attach (priv->output_source, priv->main_context);
+ g_source_attach (priv->output_source, g_main_context_get_thread_default ());
}
static void
@@ -445,7 +442,7 @@ close_io_after_timeout (SoupWebsocketConnection *self)
g_debug ("waiting %d seconds for peer to close io", timeout);
priv->close_timeout = g_timeout_source_new_seconds (timeout);
g_source_set_callback (priv->close_timeout, on_timeout_close_io, self, NULL);
- g_source_attach (priv->close_timeout, priv->main_context);
+ g_source_attach (priv->close_timeout, g_main_context_get_thread_default ());
}
static void
@@ -1441,8 +1438,6 @@ soup_websocket_connection_finalize (GObject *object)
g_free (priv->peer_close_data);
- g_main_context_unref (priv->main_context);
-
if (priv->incoming)
g_byte_array_free (priv->incoming, TRUE);
while (!g_queue_is_empty (&priv->outgoing))
@@ -2169,7 +2164,7 @@ soup_websocket_connection_set_keepalive_interval (SoupWebsocketConnection *self,
if (interval > 0) {
priv->keepalive_timeout = g_timeout_source_new_seconds (interval);
g_source_set_callback (priv->keepalive_timeout, on_queue_ping, self, NULL);
- g_source_attach (priv->keepalive_timeout, priv->main_context);
+ g_source_attach (priv->keepalive_timeout, g_main_context_get_thread_default ());
}
}
}
diff --git a/tests/context-test.c b/tests/context-test.c
index 38a303b7..c43fc890 100644
--- a/tests/context-test.c
+++ b/tests/context-test.c
@@ -128,7 +128,19 @@ idle_start_test1_thread (gpointer user_data)
}
static void
-test1_finished (SoupMessage *msg,
+message_send_cb (SoupSession *session,
+ GAsyncResult *result,
+ GMainContext *async_context)
+{
+ GInputStream *stream;
+
+ g_assert_true (async_context == g_main_context_get_thread_default ());
+ stream = soup_session_send_finish (session, result, NULL);
+ g_clear_object (&stream);
+}
+
+static void
+message_finished (SoupMessage *msg,
GMainLoop *loop)
{
g_main_loop_quit (loop);
@@ -165,8 +177,10 @@ test1_thread (gpointer user_data)
debug_printf (1, " queue_message\n");
msg = soup_message_new ("GET", uri);
loop = g_main_loop_new (async_context, FALSE);
- g_signal_connect (msg, "finished", G_CALLBACK (test1_finished), loop);
- soup_session_send_async (session, msg, G_PRIORITY_DEFAULT, NULL, NULL, NULL);
+ g_signal_connect (msg, "finished", G_CALLBACK (message_finished), loop);
+ soup_session_send_async (session, msg, G_PRIORITY_DEFAULT, NULL,
+ (GAsyncReadyCallback)message_send_cb,
+ async_context);
g_main_loop_run (loop);
/* We need one more iteration, because SoupMessage::finished is emitted
* right before the message is unqueued.
@@ -203,9 +217,8 @@ do_test2 (void)
SoupSession *session;
char *uri;
SoupMessage *msg;
-
- g_test_skip ("FIXME");
- return;
+ GInputStream *stream;
+ GMainLoop *loop;
idle = g_idle_add_full (G_PRIORITY_HIGH, idle_test2_fail, NULL, NULL);
@@ -218,7 +231,24 @@ do_test2 (void)
debug_printf (1, " send_message\n");
msg = soup_message_new ("GET", uri);
-// soup_session_send_message (session, msg);
+ stream = soup_session_send (session, msg, NULL, NULL);
+ soup_test_assert_message_status (msg, SOUP_STATUS_OK);
+ g_object_unref (stream);
+ g_object_unref (msg);
+
+ debug_printf (1, " queue_message\n");
+ msg = soup_message_new ("GET", uri);
+ loop = g_main_loop_new (async_context, FALSE);
+ g_signal_connect (msg, "finished", G_CALLBACK (message_finished), loop);
+ soup_session_send_async (session, msg, G_PRIORITY_DEFAULT, NULL,
+ (GAsyncReadyCallback)message_send_cb,
+ async_context);
+ g_main_loop_run (loop);
+ /* We need one more iteration, because SoupMessage::finished is emitted
+ * right before the message is unqueued.
+ */
+ g_main_context_iteration (async_context, TRUE);
+ g_main_loop_unref (loop);
soup_test_assert_message_status (msg, SOUP_STATUS_OK);
g_object_unref (msg);
@@ -230,98 +260,6 @@ do_test2 (void)
g_main_context_pop_thread_default (async_context);
}
-#if 0
-static void
-request_started (SoupMessage *msg, gpointer user_data)
-{
- g_object_set_data (G_OBJECT (msg), "started", GUINT_TO_POINTER (TRUE));
-}
-
-static void
-msg1_got_headers (SoupMessage *msg, gpointer user_data)
-{
- GMainLoop *loop = user_data;
-
- g_main_loop_quit (loop);
-}
-
-static void
-multi_msg_finished (SoupSession *session, SoupMessage *msg, gpointer user_data)
-{
- GMainLoop *loop = user_data;
-
- g_object_set_data (G_OBJECT (msg), "finished", GUINT_TO_POINTER (TRUE));
- g_main_loop_quit (loop);
-}
-
-static void
-do_multicontext_test (void)
-{
- SoupSession *session;
- SoupMessage *msg1, *msg2;
- GMainContext *context1, *context2;
- GMainLoop *loop1, *loop2;
-
- session = soup_test_session_new (NULL);
-
- context1 = g_main_context_new ();
- loop1 = g_main_loop_new (context1, FALSE);
- context2 = g_main_context_new ();
- loop2 = g_main_loop_new (context2, FALSE);
-
- g_main_context_push_thread_default (context1);
- msg1 = soup_message_new ("GET", base_uri);
- g_object_ref (msg1);
- g_signal_connect (msg1, "starting", G_CALLBACK (request_started), NULL);
- soup_session_queue_message (session, msg1, multi_msg_finished, loop1);
- g_signal_connect (msg1, "got-headers",
- G_CALLBACK (msg1_got_headers), loop1);
- g_object_set_data (G_OBJECT (msg1), "session", session);
- g_main_context_pop_thread_default (context1);
-
- g_main_context_push_thread_default (context2);
- msg2 = soup_message_new ("GET", base_uri);
- g_object_ref (msg2);
- g_signal_connect (msg2, "starting", G_CALLBACK (request_started), NULL);
- soup_session_queue_message (session, msg2, multi_msg_finished, loop2);
- g_main_context_pop_thread_default (context2);
-
- g_main_context_push_thread_default (context1);
- g_main_loop_run (loop1);
- g_main_context_pop_thread_default (context1);
-
- if (!g_object_get_data (G_OBJECT (msg1), "started"))
- soup_test_assert (FALSE, "msg1 not started");
- if (g_object_get_data (G_OBJECT (msg2), "started"))
- soup_test_assert (FALSE, "msg2 started while loop1 was running");
-
- g_main_context_push_thread_default (context2);
- g_main_loop_run (loop2);
- g_main_context_pop_thread_default (context2);
-
- if (g_object_get_data (G_OBJECT (msg1), "finished"))
- soup_test_assert (FALSE, "msg1 finished while loop2 was running");
- if (!g_object_get_data (G_OBJECT (msg2), "finished"))
- soup_test_assert (FALSE, "msg2 not finished");
-
- g_main_context_push_thread_default (context1);
- g_main_loop_run (loop1);
- g_main_context_pop_thread_default (context1);
-
- if (!g_object_get_data (G_OBJECT (msg1), "finished"))
- soup_test_assert (FALSE, "msg1 not finished");
-
- g_object_unref (msg1);
- g_object_unref (msg2);
-
- soup_test_session_abort_unref (session);
-
- g_main_loop_unref (loop1);
- g_main_loop_unref (loop2);
- g_main_context_unref (context1);
- g_main_context_unref (context2);
-}
-#endif
int
main (int argc, char **argv)
{
@@ -339,9 +277,6 @@ main (int argc, char **argv)
g_test_add_func ("/context/blocking/thread-default", do_test1);
g_test_add_func ("/context/nested/thread-default", do_test2);
-#if 0
- g_test_add_func ("/context/multiple", do_multicontext_test);
-#endif
ret = g_test_run ();
[
Date Prev][
Date Next] [
Thread Prev][
Thread Next]
[
Thread Index]
[
Date Index]
[
Author Index]