[libsoup] SoupSession: allow creating a "plain" SoupSession for use with new APIs



commit 98d7c7372d276eacdcf86f82b7a8a5114a83fe60
Author: Dan Winship <danw gnome org>
Date:   Sun May 27 11:42:45 2012 -0400

    SoupSession: allow creating a "plain" SoupSession for use with new APIs
    
    In gio-based APIs, async vs sync is a function-level distinction, not
    a class-level distinction. Merge most of the existing SoupSessionAsync
    and SoupSessionSync code up into SoupSession, and make SoupSession
    non-abstract, so that you can create a SoupSession and then use either
    sync or async SoupRequest-based APIs on it. (The traditional APIs
    still require one of the traditional subclasses, although the code
    reorg does affect them in some ways, such as making SoupSessionAsync
    more thread-safe.)

 libsoup/soup-message-queue.h   |    3 +-
 libsoup/soup-session-async.c   |  565 +--------------------
 libsoup/soup-session-private.h |   27 +-
 libsoup/soup-session-sync.c    |  388 +--------------
 libsoup/soup-session.c         | 1109 ++++++++++++++++++++++++++++++++++------
 tests/requester-test.c         |   63 ++-
 6 files changed, 1024 insertions(+), 1131 deletions(-)
---
diff --git a/libsoup/soup-message-queue.h b/libsoup/soup-message-queue.h
index d3341bd..dd61924 100644
--- a/libsoup/soup-message-queue.h
+++ b/libsoup/soup-message-queue.h
@@ -46,7 +46,8 @@ struct _SoupMessageQueueItem {
 	guint paused            : 1;
 	guint new_api           : 1;
 	guint io_started        : 1;
-	guint redirection_count : 29;
+	guint async             : 1;
+	guint redirection_count : 28;
 
 	SoupMessageQueueItemState state;
 
diff --git a/libsoup/soup-session-async.c b/libsoup/soup-session-async.c
index 99edf32..a24f4ba 100644
--- a/libsoup/soup-session-async.c
+++ b/libsoup/soup-session-async.c
@@ -27,48 +27,13 @@
  * single-threaded programs.
  **/
 
-static void run_queue (SoupSessionAsync *sa);
-static void do_idle_run_queue (SoupSession *session);
-
-static void send_request_running   (SoupSession *session, SoupMessageQueueItem *item);
-static void send_request_restarted (SoupSession *session, SoupMessageQueueItem *item);
-static void send_request_finished  (SoupSession *session, SoupMessageQueueItem *item);
-
 G_DEFINE_TYPE (SoupSessionAsync, soup_session_async, SOUP_TYPE_SESSION)
 
-typedef struct {
-	SoupSessionAsync *sa;
-	GSList *sources;
-	gboolean disposed;
-
-} SoupSessionAsyncPrivate;
-#define SOUP_SESSION_ASYNC_GET_PRIVATE(o) (G_TYPE_INSTANCE_GET_PRIVATE ((o), SOUP_TYPE_SESSION_ASYNC, SoupSessionAsyncPrivate))
-
 static void
 soup_session_async_init (SoupSessionAsync *sa)
 {
-	SoupSessionAsyncPrivate *priv = SOUP_SESSION_ASYNC_GET_PRIVATE (sa);
-
-	priv->sa = sa;
 }
 
-static void
-soup_session_async_dispose (GObject *object)
-{
-	SoupSessionAsyncPrivate *priv = SOUP_SESSION_ASYNC_GET_PRIVATE (object);
-	GSList *iter;
-
-	priv->disposed = TRUE;
-	for (iter = priv->sources; iter; iter = iter->next) {
-		g_source_destroy (iter->data);
-		g_source_unref (iter->data);
-	}
-	g_clear_pointer (&priv->sources, g_slist_free);
-
-	G_OBJECT_CLASS (soup_session_async_parent_class)->dispose (object);
-}
-
-
 /**
  * soup_session_async_new:
  *
@@ -106,274 +71,15 @@ soup_session_async_new_with_options (const char *optname1, ...)
 }
 
 static void
-message_completed (SoupMessage *msg, gpointer user_data)
-{
-	SoupMessageQueueItem *item = user_data;
-
-	do_idle_run_queue (item->session);
-
-	if (item->state != SOUP_MESSAGE_RESTARTING)
-		item->state = SOUP_MESSAGE_FINISHING;
-}
-
-static void
-ssl_tunnel_completed (SoupConnection *conn, guint status, gpointer user_data)
-{
-	SoupMessageQueueItem *tunnel_item = user_data;
-	SoupMessageQueueItem *item = tunnel_item->related;
-	SoupSession *session = item->session;
-
-	soup_message_finished (tunnel_item->msg);
-	soup_message_queue_item_unref (tunnel_item);
-
-	if (!SOUP_STATUS_IS_SUCCESSFUL (status)) {
-		soup_session_set_item_connection (session, item, NULL);
-		soup_message_set_status (item->msg, status);
-	}
-
-	item->state = SOUP_MESSAGE_READY;
-	do_idle_run_queue (session);
-	soup_message_queue_item_unref (item);
-}
-
-static void
-tunnel_message_completed (SoupMessage *tunnel_msg, gpointer user_data)
-{
-	SoupMessageQueueItem *tunnel_item = user_data;
-	SoupSession *session = tunnel_item->session;
-	SoupMessageQueueItem *item = tunnel_item->related;
-
-	if (tunnel_item->state == SOUP_MESSAGE_RESTARTING) {
-		soup_message_restarted (tunnel_msg);
-		if (tunnel_item->conn) {
-			tunnel_item->state = SOUP_MESSAGE_RUNNING;
-			soup_session_send_queue_item (session, tunnel_item,
-						      tunnel_message_completed);
-			return;
-		}
-
-		soup_message_set_status (tunnel_msg, SOUP_STATUS_TRY_AGAIN);
-	}
-
-	tunnel_item->state = SOUP_MESSAGE_FINISHED;
-	soup_session_unqueue_item (session, tunnel_item);
-
-	if (SOUP_STATUS_IS_SUCCESSFUL (tunnel_msg->status_code)) {
-		soup_connection_start_ssl_async (item->conn, item->cancellable,
-						 ssl_tunnel_completed, tunnel_item);
-	} else {
-		ssl_tunnel_completed (item->conn, tunnel_msg->status_code,
-				      tunnel_item);
-	}
-}
-
-static void
-got_connection (SoupConnection *conn, guint status, gpointer user_data)
-{
-	SoupMessageQueueItem *item = user_data;
-	SoupSession *session = item->session;
-
-	if (status != SOUP_STATUS_OK) {
-		if (item->state == SOUP_MESSAGE_CONNECTING) {
-			soup_session_set_item_status (session, item, status);
-			soup_session_set_item_connection (session, item, NULL);
-			item->state = SOUP_MESSAGE_READY;
-		}
-	} else
-		item->state = SOUP_MESSAGE_CONNECTED;
-
-	run_queue ((SoupSessionAsync *)session);
-	soup_message_queue_item_unref (item);
-}
-
-static void
-process_queue_item (SoupMessageQueueItem *item,
-		    gboolean             *should_prune,
-		    gboolean              loop)
-{
-	SoupSession *session = item->session;
-
-	if (item->async_context != soup_session_get_async_context (session))
-		return;
-
-	do {
-		if (item->paused)
-			return;
-
-		switch (item->state) {
-		case SOUP_MESSAGE_STARTING:
-			if (!soup_session_get_connection (session, item, should_prune))
-				return;
-
-			if (soup_connection_get_state (item->conn) != SOUP_CONNECTION_NEW) {
-				item->state = SOUP_MESSAGE_READY;
-				break;
-			}
-
-			item->state = SOUP_MESSAGE_CONNECTING;
-			soup_message_queue_item_ref (item);
-			soup_connection_connect_async (item->conn, item->cancellable,
-						       got_connection, item);
-			return;
-
-		case SOUP_MESSAGE_CONNECTED:
-			if (soup_connection_is_tunnelled (item->conn)) {
-				SoupMessageQueueItem *tunnel_item;
-
-				soup_message_queue_item_ref (item);
-
-				item->state = SOUP_MESSAGE_TUNNELING;
-
-				tunnel_item = soup_session_make_connect_message (session, item->conn);
-				tunnel_item->related = item;
-				soup_session_send_queue_item (session, tunnel_item, tunnel_message_completed);
-				return;
-			}
-
-			item->state = SOUP_MESSAGE_READY;
-			break;
-
-		case SOUP_MESSAGE_READY:
-			soup_message_set_https_status (item->msg, item->conn);
-			if (item->msg->status_code) {
-				if (item->msg->status_code == SOUP_STATUS_TRY_AGAIN) {
-					soup_message_cleanup_response (item->msg);
-					item->state = SOUP_MESSAGE_STARTING;
-				} else
-					item->state = SOUP_MESSAGE_FINISHING;
-				break;
-			}
-
-			item->state = SOUP_MESSAGE_RUNNING;
-			soup_session_send_queue_item (session, item, message_completed);
-			if (item->new_api)
-				send_request_running (session, item);
-			break;
-
-		case SOUP_MESSAGE_RESTARTING:
-			item->state = SOUP_MESSAGE_STARTING;
-			soup_message_restarted (item->msg);
-			if (item->new_api)
-				send_request_restarted (session, item);
-			break;
-
-		case SOUP_MESSAGE_FINISHING:
-			item->state = SOUP_MESSAGE_FINISHED;
-			soup_message_finished (item->msg);
-			if (item->state != SOUP_MESSAGE_FINISHED) {
-				g_return_if_fail (!item->new_api);
-				break;
-			}
-
-			soup_message_queue_item_ref (item);
-			soup_session_unqueue_item (session, item);
-			if (item->callback)
-				item->callback (session, item->msg, item->callback_data);
-			else if (item->new_api)
-				send_request_finished (session, item);
-
-			soup_message_queue_item_unref (item);
-			return;
-
-		default:
-			/* Nothing to do with this message in any
-			 * other state.
-			 */
-			return;
-		}
-	} while (loop && item->state != SOUP_MESSAGE_FINISHED);
-}
-
-static void
-run_queue (SoupSessionAsync *sa)
-{
-	SoupSession *session = SOUP_SESSION (sa);
-	SoupMessageQueue *queue = soup_session_get_queue (session);
-	SoupMessageQueueItem *item;
-	SoupMessage *msg;
-	gboolean try_pruning = TRUE, should_prune = FALSE;
-
-	g_object_ref (session);
-	soup_session_cleanup_connections (session, FALSE);
-
- try_again:
-	for (item = soup_message_queue_first (queue);
-	     item;
-	     item = soup_message_queue_next (queue, item)) {
-		msg = item->msg;
-
-		/* CONNECT messages are handled specially */
-		if (msg->method != SOUP_METHOD_CONNECT)
-			process_queue_item (item, &should_prune, TRUE);
-	}
-
-	if (try_pruning && should_prune) {
-		/* There is at least one message in the queue that
-		 * could be sent if we pruned an idle connection from
-		 * some other server.
-		 */
-		if (soup_session_cleanup_connections (session, TRUE)) {
-			try_pruning = should_prune = FALSE;
-			goto try_again;
-		}
-	}
-
-	g_object_unref (session);
-}
-
-static gboolean
-idle_run_queue (gpointer user_data)
-{
-	SoupSessionAsyncPrivate *priv = user_data;
-	GSource *source;
-
-	if (priv->disposed)
-		return FALSE;
-
-	source = g_main_current_source ();
-	priv->sources = g_slist_remove (priv->sources, source);
-
-	/* Ensure that the source is destroyed before running the queue */
-	g_source_destroy (source);
-	g_source_unref (source);
-
-	run_queue (priv->sa);
-	return FALSE;
-}
-
-static void
-do_idle_run_queue (SoupSession *session)
-{
-	SoupSessionAsyncPrivate *priv = SOUP_SESSION_ASYNC_GET_PRIVATE (session);
-	GMainContext *async_context = soup_session_get_async_context (session);
-	GSource *source;
-
-	if (priv->disposed)
-		return;
-
-	/* We use priv rather than session as the source data, because
-	 * other parts of libsoup (or the calling app) may have sources
-	 * using the session as the source data.
-	 */
-
-	source = g_main_context_find_source_by_user_data (async_context, priv);
-	if (source)
-		return;
-
-	source = soup_add_completion_reffed (async_context, idle_run_queue, priv);
-	priv->sources = g_slist_prepend (priv->sources, source);
-}
-
-static void
 soup_session_async_queue_message (SoupSession *session, SoupMessage *req,
 				  SoupSessionCallback callback, gpointer user_data)
 {
 	SoupMessageQueueItem *item;
 
-	item = soup_session_append_queue_item (session, req, callback, user_data);
+	item = soup_session_append_queue_item (session, req, TRUE, FALSE,
+					       callback, user_data);
+	soup_session_kick_queue (session);
 	soup_message_queue_item_unref (item);
-
-	do_idle_run_queue (session);
 }
 
 static guint
@@ -383,10 +89,9 @@ soup_session_async_send_message (SoupSession *session, SoupMessage *req)
 	GMainContext *async_context =
 		soup_session_get_async_context (session);
 
-	soup_session_async_queue_message (session, req, NULL, NULL);
-
-	item = soup_message_queue_lookup (soup_session_get_queue (session), req);
-	g_return_val_if_fail (item != NULL, SOUP_STATUS_MALFORMED);
+	item = soup_session_append_queue_item (session, req, TRUE, FALSE,
+					       NULL, NULL);
+	soup_session_kick_queue (session);
 
 	while (item->state != SOUP_MESSAGE_FINISHED)
 		g_main_context_iteration (async_context, TRUE);
@@ -402,7 +107,6 @@ soup_session_async_cancel_message (SoupSession *session, SoupMessage *msg,
 {
 	SoupMessageQueue *queue;
 	SoupMessageQueueItem *item;
-	gboolean dummy;
 
 	SOUP_SESSION_CLASS (soup_session_async_parent_class)->
 		cancel_message (session, msg, status_code);
@@ -424,7 +128,7 @@ soup_session_async_cancel_message (SoupSession *session, SoupMessage *msg,
 		item->state = SOUP_MESSAGE_FINISHING;
 
 	if (item->state != SOUP_MESSAGE_FINISHED)
-		process_queue_item (item, &dummy, FALSE);
+		soup_session_process_queue_item (session, item, NULL, FALSE);
 
 	soup_message_queue_item_unref (item);
 }
@@ -463,268 +167,13 @@ soup_session_async_auth_required (SoupSession *session, SoupMessage *msg,
 }
 
 static void
-soup_session_async_kick (SoupSession *session)
-{
-	do_idle_run_queue (session);
-}
-
-
-static void
-send_request_return_result (SoupMessageQueueItem *item,
-			    gpointer stream, GError *error)
-{
-	GTask *task;
-
-	task = item->task;
-	item->task = NULL;
-
-	if (item->io_source) {
-		g_source_destroy (item->io_source);
-		g_clear_pointer (&item->io_source, g_source_unref);
-	}
-
-	if (error)
-		g_task_return_error (task, error);
-	else if (SOUP_STATUS_IS_TRANSPORT_ERROR (item->msg->status_code)) {
-		if (stream)
-			g_object_unref (stream);
-		g_task_return_new_error (task, SOUP_HTTP_ERROR,
-					 item->msg->status_code,
-					 "%s",
-					 item->msg->reason_phrase);
-	} else
-		g_task_return_pointer (task, stream, g_object_unref);
-	g_object_unref (task);
-}
-
-static void
-send_request_restarted (SoupSession *session, SoupMessageQueueItem *item)
-{
-	/* We won't be needing this, then. */
-	g_object_set_data (G_OBJECT (item->msg), "SoupSessionAsync:ostream", NULL);
-	item->io_started = FALSE;
-}
-
-static void
-send_request_finished (SoupSession *session, SoupMessageQueueItem *item)
-{
-	GMemoryOutputStream *mostream;
-	GInputStream *istream = NULL;
-	GError *error = NULL;
-
-	if (!item->task) {
-		/* Something else already took care of it. */
-		return;
-	}
-
-	mostream = g_object_get_data (G_OBJECT (item->task), "SoupSessionAsync:ostream");
-	if (mostream) {
-		gpointer data;
-		gssize size;
-
-		/* We thought it would be requeued, but it wasn't, so
-		 * return the original body.
-		 */
-		size = g_memory_output_stream_get_data_size (mostream);
-		data = size ? g_memory_output_stream_steal_data (mostream) : g_strdup ("");
-		istream = g_memory_input_stream_new_from_data (data, size, g_free);
-	} else if (item->io_started) {
-		/* The message finished before becoming readable. This
-		 * will happen, eg, if it's cancelled from got-headers.
-		 * Do nothing; the op will complete via read_ready_cb()
-		 * after we return;
-		 */
-		return;
-	} else {
-		/* The message finished before even being started;
-		 * probably a tunnel connect failure.
-		 */
-		istream = g_memory_input_stream_new ();
-	}
-
-	send_request_return_result (item, istream, error);
-}
-
-static void
-send_async_spliced (GObject *source, GAsyncResult *result, gpointer user_data)
-{
-	SoupMessageQueueItem *item = user_data;
-	GInputStream *istream = g_object_get_data (source, "istream");
-	GError *error = NULL;
-
-	/* It should be safe to call the sync close() method here since
-	 * the message body has already been written.
-	 */
-	g_input_stream_close (istream, NULL, NULL);
-	g_object_unref (istream);
-
-	/* If the message was cancelled, it will be completed via other means */
-	if (g_cancellable_is_cancelled (item->cancellable) ||
-	    !item->task) {
-		soup_message_queue_item_unref (item);
-		return;
-	}
-
-	if (g_output_stream_splice_finish (G_OUTPUT_STREAM (source),
-					   result, &error) == -1) {
-		send_request_return_result (item, NULL, error);
-		soup_message_queue_item_unref (item);
-		return;
-	}
-
-	/* Otherwise either restarted or finished will eventually be called. */
-	do_idle_run_queue (item->session);
-	soup_message_queue_item_unref (item);
-}
-
-static void
-send_async_maybe_complete (SoupMessageQueueItem *item,
-			   GInputStream         *stream)
-{
-	if (item->msg->status_code == SOUP_STATUS_UNAUTHORIZED ||
-	    item->msg->status_code == SOUP_STATUS_PROXY_UNAUTHORIZED ||
-	    soup_session_would_redirect (item->session, item->msg)) {
-		GOutputStream *ostream;
-
-		/* Message may be requeued, so gather the current message body... */
-		ostream = g_memory_output_stream_new (NULL, 0, g_realloc, g_free);
-		g_object_set_data_full (G_OBJECT (item->task), "SoupSessionAsync:ostream",
-					ostream, g_object_unref);
-
-		g_object_set_data (G_OBJECT (ostream), "istream", stream);
-
-		/* Give the splice op its own ref on item */
-		soup_message_queue_item_ref (item);
-		g_output_stream_splice_async (ostream, stream,
-					      /* We can't use CLOSE_SOURCE because it
-					       * might get closed in the wrong thread.
-					       */
-					      G_OUTPUT_STREAM_SPLICE_CLOSE_TARGET,
-					      G_PRIORITY_DEFAULT,
-					      item->cancellable,
-					      send_async_spliced, item);
-		return;
-	}
-
-	send_request_return_result (item, stream, NULL);
-}
-
-static void try_run_until_read (SoupMessageQueueItem *item);
-
-static gboolean
-read_ready_cb (SoupMessage *msg, gpointer user_data)
-{
-	SoupMessageQueueItem *item = user_data;
-
-	g_clear_pointer (&item->io_source, g_source_unref);
-	try_run_until_read (item);
-	return FALSE;
-}
-
-static void
-try_run_until_read (SoupMessageQueueItem *item)
-{
-	GError *error = NULL;
-	GInputStream *stream = NULL;
-
-	if (soup_message_io_run_until_read (item->msg, item->cancellable, &error))
-		stream = soup_message_io_get_response_istream (item->msg, &error);
-	if (stream) {
-		send_async_maybe_complete (item, stream);
-		return;
-	}
-
-	if (g_error_matches (error, SOUP_HTTP_ERROR, SOUP_STATUS_TRY_AGAIN)) {
-		item->state = SOUP_MESSAGE_RESTARTING;
-		soup_message_io_finished (item->msg);
-		g_error_free (error);
-		return;
-	}
-
-	if (!g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
-		if (item->state != SOUP_MESSAGE_FINISHED) {
-			gboolean dummy;
-
-			if (soup_message_io_in_progress (item->msg))
-				soup_message_io_finished (item->msg);
-			item->state = SOUP_MESSAGE_FINISHING;
-			process_queue_item (item, &dummy, FALSE);
-		}
-		send_request_return_result (item, NULL, error);
-		return;
-	}
-
-	g_clear_error (&error);
-	item->io_source = soup_message_io_get_source (item->msg, item->cancellable,
-						      read_ready_cb, item);
-	g_source_attach (item->io_source, soup_session_get_async_context (item->session));
-}
-
-static void
-send_request_running (SoupSession *session, SoupMessageQueueItem *item)
-{
-	item->io_started = TRUE;
-	try_run_until_read (item);
-}
-
-void
-soup_session_send_request_async (SoupSession         *session,
-				 SoupMessage         *msg,
-				 GCancellable        *cancellable,
-				 GAsyncReadyCallback  callback,
-				 gpointer             user_data)
-{
-	SoupMessageQueueItem *item;
-	gboolean use_thread_context;
-
-	g_return_if_fail (SOUP_IS_SESSION_ASYNC (session));
-
-	g_object_get (G_OBJECT (session),
-		      SOUP_SESSION_USE_THREAD_CONTEXT, &use_thread_context,
-		      NULL);
-	g_return_if_fail (use_thread_context);
-
-	soup_session_async_queue_message (session, msg, NULL, NULL);
-
-	item = soup_message_queue_lookup (soup_session_get_queue (session), msg);
-	g_return_if_fail (item != NULL);
-
-	item->new_api = TRUE;
-	item->task = g_task_new (session, cancellable, callback, user_data);
-	g_task_set_task_data (item->task, item, (GDestroyNotify) soup_message_queue_item_unref);
-
-	if (cancellable) {
-		g_object_unref (item->cancellable);
-		item->cancellable = g_object_ref (cancellable);
-	}
-}
-
-GInputStream *
-soup_session_send_request_finish (SoupSession   *session,
-				  GAsyncResult  *result,
-				  GError       **error)
-{
-	g_return_val_if_fail (SOUP_IS_SESSION_ASYNC (session), NULL);
-	g_return_val_if_fail (g_task_is_valid (result, session), NULL);
-
-	return g_task_propagate_pointer (G_TASK (result), error);
-}
-
-static void
 soup_session_async_class_init (SoupSessionAsyncClass *soup_session_async_class)
 {
 	SoupSessionClass *session_class = SOUP_SESSION_CLASS (soup_session_async_class);
-	GObjectClass *object_class = G_OBJECT_CLASS (session_class);
-
-	g_type_class_add_private (soup_session_async_class,
-				  sizeof (SoupSessionAsyncPrivate));
 
 	/* virtual method override */
 	session_class->queue_message = soup_session_async_queue_message;
 	session_class->send_message = soup_session_async_send_message;
 	session_class->cancel_message = soup_session_async_cancel_message;
 	session_class->auth_required = soup_session_async_auth_required;
-	session_class->kick = soup_session_async_kick;
-
-	object_class->dispose = soup_session_async_dispose;
 }
diff --git a/libsoup/soup-session-private.h b/libsoup/soup-session-private.h
index 297faf5..dc4d300 100644
--- a/libsoup/soup-session-private.h
+++ b/libsoup/soup-session-private.h
@@ -17,26 +17,12 @@ SoupMessageQueue     *soup_session_get_queue            (SoupSession          *s
 
 SoupMessageQueueItem *soup_session_append_queue_item    (SoupSession          *session,
 							 SoupMessage          *msg,
+							 gboolean              async,
+							 gboolean              new_api,
 							 SoupSessionCallback   callback,
 							 gpointer              user_data);
-SoupMessageQueueItem *soup_session_make_connect_message (SoupSession          *session,
-							 SoupConnection       *conn);
-gboolean              soup_session_get_connection       (SoupSession          *session,
-							 SoupMessageQueueItem *item,
-							 gboolean             *try_pruning);
-gboolean              soup_session_cleanup_connections  (SoupSession          *session,
-							 gboolean              prune_idle);
-void                  soup_session_send_queue_item      (SoupSession          *session,
-							 SoupMessageQueueItem *item,
-							 SoupMessageCompletionFn completion_cb);
-void                  soup_session_unqueue_item         (SoupSession          *session,
-							 SoupMessageQueueItem *item);
-void                  soup_session_set_item_connection  (SoupSession          *session,
-							 SoupMessageQueueItem *item,
-							 SoupConnection       *conn);
-void                  soup_session_set_item_status      (SoupSession          *session,
-							 SoupMessageQueueItem *item,
-							 guint                 status_code);
+
+void                  soup_session_kick_queue           (SoupSession          *session);
 
 GInputStream         *soup_session_send_request         (SoupSession          *session,
 							 SoupMessage          *msg,
@@ -52,6 +38,11 @@ GInputStream         *soup_session_send_request_finish  (SoupSession          *s
 							 GAsyncResult         *result,
 							 GError              **error);
 
+void                  soup_session_process_queue_item   (SoupSession          *session,
+							 SoupMessageQueueItem *item,
+							 gboolean             *should_prune,
+							 gboolean              loop);
+
 G_END_DECLS
 
 #endif /* SOUP_SESSION_PRIVATE_H */
diff --git a/libsoup/soup-session-sync.c b/libsoup/soup-session-sync.c
index 43d0a49..cbd2460 100644
--- a/libsoup/soup-session-sync.c
+++ b/libsoup/soup-session-sync.c
@@ -42,35 +42,13 @@
  * handler callbacks, until I/O is complete.
  **/
 
-typedef struct {
-	GMutex lock;
-	GCond cond;
-} SoupSessionSyncPrivate;
-#define SOUP_SESSION_SYNC_GET_PRIVATE(o) (G_TYPE_INSTANCE_GET_PRIVATE ((o), SOUP_TYPE_SESSION_SYNC, SoupSessionSyncPrivate))
-
 G_DEFINE_TYPE (SoupSessionSync, soup_session_sync, SOUP_TYPE_SESSION)
 
 static void
 soup_session_sync_init (SoupSessionSync *ss)
 {
-	SoupSessionSyncPrivate *priv = SOUP_SESSION_SYNC_GET_PRIVATE (ss);
-
-	g_mutex_init (&priv->lock);
-	g_cond_init (&priv->cond);
-}
-
-static void
-soup_session_sync_finalize (GObject *object)
-{
-	SoupSessionSyncPrivate *priv = SOUP_SESSION_SYNC_GET_PRIVATE (object);
-
-	g_mutex_clear (&priv->lock);
-	g_cond_clear (&priv->cond);
-
-	G_OBJECT_CLASS (soup_session_sync_parent_class)->finalize (object);
 }
 
-
 /**
  * soup_session_sync_new:
  *
@@ -107,181 +85,6 @@ soup_session_sync_new_with_options (const char *optname1, ...)
 	return session;
 }
 
-static guint
-tunnel_connect (SoupSession *session, SoupMessageQueueItem *related)
-{
-	SoupConnection *conn = related->conn;
-	SoupMessageQueueItem *item;
-	guint status;
-
-	g_object_ref (conn);
-
-	item = soup_session_make_connect_message (session, conn);
-	do {
-		soup_session_send_queue_item (session, item, NULL);
-		status = item->msg->status_code;
-		if (item->state == SOUP_MESSAGE_RESTARTING &&
-		    soup_message_io_in_progress (item->msg)) {
-			soup_message_restarted (item->msg);
-			item->state = SOUP_MESSAGE_RUNNING;
-		} else {
-			if (item->state == SOUP_MESSAGE_RESTARTING)
-				status = SOUP_STATUS_TRY_AGAIN;
-			item->state = SOUP_MESSAGE_FINISHED;
-			soup_message_finished (item->msg);
-		}
-	} while (item->state == SOUP_MESSAGE_STARTING);
-	soup_session_unqueue_item (session, item);
-	soup_message_queue_item_unref (item);
-
-	if (SOUP_STATUS_IS_SUCCESSFUL (status)) {
-		if (!soup_connection_start_ssl_sync (conn, related->cancellable))
-			status = SOUP_STATUS_SSL_FAILED;
-		soup_message_set_https_status (related->msg, conn);
-	}
-
-	g_object_unref (conn);
-	return status;
-}
-
-static void
-get_connection (SoupMessageQueueItem *item)
-{
-	SoupSession *session = item->session;
-	SoupMessage *msg = item->msg;
-	gboolean try_pruning = FALSE;
-	guint status;
-
-try_again:
-	soup_session_cleanup_connections (session, FALSE);
-
-	if (!soup_session_get_connection (session, item, &try_pruning)) {
-		if (!try_pruning)
-			return;
-		soup_session_cleanup_connections (session, TRUE);
-		if (!soup_session_get_connection (session, item, &try_pruning))
-			return;
-		try_pruning = FALSE;
-	}
-
-	if (soup_connection_get_state (item->conn) == SOUP_CONNECTION_IDLE) {
-		item->state = SOUP_MESSAGE_READY;
-		return;
-	}
-
-	if (soup_connection_get_state (item->conn) == SOUP_CONNECTION_NEW) {
-		status = soup_connection_connect_sync (item->conn, item->cancellable);
-		if (status == SOUP_STATUS_TRY_AGAIN) {
-			soup_session_set_item_connection (session, item, NULL);
-			goto try_again;
-		}
-
-		soup_message_set_https_status (msg, item->conn);
-
-		if (!SOUP_STATUS_IS_SUCCESSFUL (status)) {
-			if (!msg->status_code)
-				soup_session_set_item_status (session, item, status);
-			item->state = SOUP_MESSAGE_FINISHING;
-			soup_session_set_item_connection (session, item, NULL);
-			return;
-		}
-	}
-
-	if (soup_connection_is_tunnelled (item->conn)) {
-		status = tunnel_connect (session, item);
-		if (!SOUP_STATUS_IS_SUCCESSFUL (status)) {
-			soup_session_set_item_connection (session, item, NULL);
-			if (status == SOUP_STATUS_TRY_AGAIN)
-				goto try_again;
-			soup_session_set_item_status (session, item, status);
-			item->state = SOUP_MESSAGE_FINISHING;
-			return;
-		}
-	}
-
-	item->state = SOUP_MESSAGE_READY;
-}
-
-static void process_queue_item (SoupMessageQueueItem *item);
-
-static void
-new_api_message_completed (SoupMessage *msg, gpointer user_data)
-{
-	SoupMessageQueueItem *item = user_data;
-
-	if (item->state != SOUP_MESSAGE_RESTARTING) {
-		item->state = SOUP_MESSAGE_FINISHING;
-		process_queue_item (item);
-	}
-}
-
-static void
-process_queue_item (SoupMessageQueueItem *item)
-{
-	SoupSession *session = item->session;
-	SoupSessionSyncPrivate *priv = SOUP_SESSION_SYNC_GET_PRIVATE (session);
-
-	soup_message_queue_item_ref (item);
-
-	do {
-		if (item->paused) {
-			g_mutex_lock (&priv->lock);
-			while (item->paused)
-				g_cond_wait (&priv->cond, &priv->lock);
-			g_mutex_unlock (&priv->lock);
-		}
-
-		switch (item->state) {
-		case SOUP_MESSAGE_STARTING:
-			g_mutex_lock (&priv->lock);
-			do {
-				get_connection (item);
-				if (item->state == SOUP_MESSAGE_STARTING)
-					g_cond_wait (&priv->cond, &priv->lock);
-			} while (item->state == SOUP_MESSAGE_STARTING);
-			g_mutex_unlock (&priv->lock);
-			break;
-
-		case SOUP_MESSAGE_READY:
-			item->state = SOUP_MESSAGE_RUNNING;
-
-			if (item->new_api) {
-				soup_session_send_queue_item (item->session, item, new_api_message_completed);
-				goto out;
-			}
-
-			soup_session_send_queue_item (item->session, item, NULL);
-			if (item->state != SOUP_MESSAGE_RESTARTING)
-				item->state = SOUP_MESSAGE_FINISHING;
-			break;
-
-		case SOUP_MESSAGE_RUNNING:
-			g_warn_if_fail (item->new_api);
-			item->state = SOUP_MESSAGE_FINISHING;
-			break;
-
-		case SOUP_MESSAGE_RESTARTING:
-			item->state = SOUP_MESSAGE_STARTING;
-			soup_message_restarted (item->msg);
-			break;
-
-		case SOUP_MESSAGE_FINISHING:
-			item->state = SOUP_MESSAGE_FINISHED;
-			soup_message_finished (item->msg);
-			soup_session_unqueue_item (session, item);
-			break;
-
-		default:
-			g_warn_if_reached ();
-			item->state = SOUP_MESSAGE_FINISHING;
-			break;
-		}
-	} while (item->state != SOUP_MESSAGE_FINISHED);
-
- out:
-	soup_message_queue_item_unref (item);
-}
-
 static gboolean
 queue_message_callback (gpointer data)
 {
@@ -297,7 +100,7 @@ queue_message_thread (gpointer data)
 {
 	SoupMessageQueueItem *item = data;
 
-	process_queue_item (item);
+	soup_session_process_queue_item (item->session, item, NULL, TRUE);
 	if (item->callback) {
 		soup_add_completion (soup_session_get_async_context (item->session),
 				     queue_message_callback, item);
@@ -314,7 +117,8 @@ soup_session_sync_queue_message (SoupSession *session, SoupMessage *msg,
 	SoupMessageQueueItem *item;
 	GThread *thread;
 
-	item = soup_session_append_queue_item (session, msg, callback, user_data);
+	item = soup_session_append_queue_item (session, msg, FALSE, FALSE,
+					       callback, user_data);
 	thread = g_thread_new ("SoupSessionSync:queue_message",
 			       queue_message_thread, item);
 	g_thread_unref (thread);
@@ -326,25 +130,15 @@ soup_session_sync_send_message (SoupSession *session, SoupMessage *msg)
 	SoupMessageQueueItem *item;
 	guint status;
 
-	item = soup_session_append_queue_item (session, msg, NULL, NULL);
-	process_queue_item (item);
+	item = soup_session_append_queue_item (session, msg, FALSE, FALSE,
+					       NULL, NULL);
+	soup_session_process_queue_item (session, item, NULL, TRUE);
 	status = msg->status_code;
 	soup_message_queue_item_unref (item);
 	return status;
 }
 
 static void
-soup_session_sync_cancel_message (SoupSession *session, SoupMessage *msg, guint status_code)
-{
-	SoupSessionSyncPrivate *priv = SOUP_SESSION_SYNC_GET_PRIVATE (session);
-
-	g_mutex_lock (&priv->lock);
-	SOUP_SESSION_CLASS (soup_session_sync_parent_class)->cancel_message (session, msg, status_code);
-	g_cond_broadcast (&priv->cond);
-	g_mutex_unlock (&priv->lock);
-}
-
-static void
 soup_session_sync_auth_required (SoupSession *session, SoupMessage *msg,
 				 SoupAuth *auth, gboolean retrying)
 {
@@ -363,182 +157,12 @@ soup_session_sync_auth_required (SoupSession *session, SoupMessage *msg,
 }
 
 static void
-soup_session_sync_flush_queue (SoupSession *session)
-{
-	SoupSessionSyncPrivate *priv = SOUP_SESSION_SYNC_GET_PRIVATE (session);
-	SoupMessageQueue *queue;
-	SoupMessageQueueItem *item;
-	GHashTable *current;
-	gboolean done = FALSE;
-
-	/* Record the current contents of the queue */
-	current = g_hash_table_new (NULL, NULL);
-	queue = soup_session_get_queue (session);
-	for (item = soup_message_queue_first (queue);
-	     item;
-	     item = soup_message_queue_next (queue, item))
-		g_hash_table_insert (current, item, item);
-
-	/* Cancel everything */
-	SOUP_SESSION_CLASS (soup_session_sync_parent_class)->flush_queue (session);
-
-	/* Wait until all of the items in @current have been removed
-	 * from the queue. (This is not the same as "wait for the
-	 * queue to be empty", because the app may queue new requests
-	 * in response to the cancellation of the old ones. We don't
-	 * try to cancel those requests as well, since we'd likely
-	 * just end up looping forever.)
-	 */
-	g_mutex_lock (&priv->lock);
-	do {
-		done = TRUE;
-		for (item = soup_message_queue_first (queue);
-		     item;
-		     item = soup_message_queue_next (queue, item)) {
-			if (g_hash_table_lookup (current, item))
-				done = FALSE;
-		}
-
-		if (!done)
-			g_cond_wait (&priv->cond, &priv->lock);
-	} while (!done);
-	g_mutex_unlock (&priv->lock);
-
-	g_hash_table_destroy (current);
-}
-
-static void
-soup_session_sync_kick (SoupSession *session)
-{
-	SoupSessionSyncPrivate *priv = SOUP_SESSION_SYNC_GET_PRIVATE (session);
-
-	g_cond_broadcast (&priv->cond);
-}
-
-static void
 soup_session_sync_class_init (SoupSessionSyncClass *session_sync_class)
 {
-	GObjectClass *object_class = G_OBJECT_CLASS (session_sync_class);
 	SoupSessionClass *session_class = SOUP_SESSION_CLASS (session_sync_class);
 
-	g_type_class_add_private (session_sync_class, sizeof (SoupSessionSyncPrivate));
-
 	/* virtual method override */
 	session_class->queue_message = soup_session_sync_queue_message;
 	session_class->send_message = soup_session_sync_send_message;
-	session_class->cancel_message = soup_session_sync_cancel_message;
 	session_class->auth_required = soup_session_sync_auth_required;
-	session_class->flush_queue = soup_session_sync_flush_queue;
-	session_class->kick = soup_session_sync_kick;
-
-	object_class->finalize = soup_session_sync_finalize;
-}
-
-
-GInputStream *
-soup_session_send_request (SoupSession   *session,
-			   SoupMessage   *msg,
-			   GCancellable  *cancellable,
-			   GError       **error)
-{
-	SoupMessageQueueItem *item;
-	GInputStream *stream = NULL;
-	GOutputStream *ostream;
-	GMemoryOutputStream *mostream;
-	gssize size;
-	GError *my_error = NULL;
-
-	g_return_val_if_fail (SOUP_IS_SESSION_SYNC (session), NULL);
-
-	item = soup_session_append_queue_item (session, msg, NULL, NULL);
-
-	item->new_api = TRUE;
-	if (cancellable) {
-		g_object_unref (item->cancellable);
-		item->cancellable = g_object_ref (cancellable);
-	}
-
-	while (!stream) {
-		/* Get a connection, etc */
-		process_queue_item (item);
-		if (item->state != SOUP_MESSAGE_RUNNING)
-			break;
-
-		/* Send request, read headers */
-		if (!soup_message_io_run_until_read (msg, item->cancellable, &my_error)) {
-			if (g_error_matches (my_error, SOUP_HTTP_ERROR, SOUP_STATUS_TRY_AGAIN)) {
-				item->state = SOUP_MESSAGE_RESTARTING;
-				soup_message_io_finished (item->msg);
-				g_clear_error (&my_error);
-				continue;
-			} else
-				break;
-		}
-
-		stream = soup_message_io_get_response_istream (msg, &my_error);
-		if (!stream)
-			break;
-
-		/* Break if the message doesn't look likely-to-be-requeued */
-		if (msg->status_code != SOUP_STATUS_UNAUTHORIZED &&
-		    msg->status_code != SOUP_STATUS_PROXY_UNAUTHORIZED &&
-		    !soup_session_would_redirect (session, msg))
-			break;
-
-		/* Gather the current message body... */
-		ostream = g_memory_output_stream_new (NULL, 0, g_realloc, g_free);
-		if (g_output_stream_splice (ostream, stream,
-					    G_OUTPUT_STREAM_SPLICE_CLOSE_SOURCE |
-					    G_OUTPUT_STREAM_SPLICE_CLOSE_TARGET,
-					    item->cancellable, &my_error) == -1) {
-			g_object_unref (stream);
-			g_object_unref (ostream);
-			stream = NULL;
-			break;
-		}
-		g_object_unref (stream);
-		stream = NULL;
-
-		/* If the message was requeued, loop */
-		if (item->state == SOUP_MESSAGE_RESTARTING) {
-			g_object_unref (ostream);
-			continue;
-		}
-
-		/* Not requeued, so return the original body */
-		mostream = G_MEMORY_OUTPUT_STREAM (ostream);
-		size = g_memory_output_stream_get_data_size (mostream);
-		stream = g_memory_input_stream_new ();
-		if (size) {
-			g_memory_input_stream_add_data (G_MEMORY_INPUT_STREAM (stream),
-							g_memory_output_stream_steal_data (mostream),
-							size, g_free);
-		}
-		g_object_unref (ostream);
-	}
-
-	if (my_error)
-		g_propagate_error (error, my_error);
-	else if (SOUP_STATUS_IS_TRANSPORT_ERROR (msg->status_code)) {
-		if (stream) {
-			g_object_unref (stream);
-			stream = NULL;
-		}
-		g_set_error_literal (error, SOUP_HTTP_ERROR, msg->status_code,
-				     msg->reason_phrase);
-	} else if (!stream)
-		stream = g_memory_input_stream_new ();
-
-	if (!stream) {
-		if (soup_message_io_in_progress (msg))
-			soup_message_io_finished (msg);
-		else if (item->state != SOUP_MESSAGE_FINISHED)
-			item->state = SOUP_MESSAGE_FINISHING;
-
-		if (item->state != SOUP_MESSAGE_FINISHED)
-			process_queue_item (item);
-	}
-
-	soup_message_queue_item_unref (item);
-	return stream;
 }
diff --git a/libsoup/soup-session.c b/libsoup/soup-session.c
index 3eecd06..f5d93d4 100644
--- a/libsoup/soup-session.c
+++ b/libsoup/soup-session.c
@@ -77,6 +77,8 @@ static guint soup_host_uri_hash (gconstpointer key);
 static gboolean soup_host_uri_equal (gconstpointer v1, gconstpointer v2);
 
 typedef struct {
+	SoupSession *session;
+
 	GTlsDatabase *tlsdb;
 	char *ssl_ca_file;
 	gboolean ssl_strict;
@@ -100,12 +102,15 @@ typedef struct {
 	 * SoupSessionHost, adding/removing a connection,
 	 * disconnecting a connection, or moving a connection from
 	 * IDLE to IN_USE. Must not emit signals or destroy objects
-	 * while holding it.
+	 * while holding it. conn_cond is signaled when it may be
+	 * possible for a previously-blocked message to continue.
 	 */
 	GMutex conn_lock;
+	GCond conn_cond;
 
 	GMainContext *async_context;
 	gboolean use_thread_context;
+	GSList *run_queue_sources;
 
 	GResolver *resolver;
 
@@ -126,6 +131,10 @@ static void auth_manager_authenticate (SoupAuthManager *manager,
 				       SoupMessage *msg, SoupAuth *auth,
 				       gboolean retrying, gpointer user_data);
 
+static void async_run_queue (SoupSession *session);
+
+static void async_send_request_running (SoupSession *session, SoupMessageQueueItem *item);
+
 #define SOUP_SESSION_MAX_CONNS_DEFAULT 10
 #define SOUP_SESSION_MAX_CONNS_PER_HOST_DEFAULT 2
 
@@ -133,9 +142,9 @@ static void auth_manager_authenticate (SoupAuthManager *manager,
 
 #define SOUP_SESSION_USER_AGENT_BASE "libsoup/" PACKAGE_VERSION
 
-G_DEFINE_ABSTRACT_TYPE_WITH_CODE (SoupSession, soup_session, G_TYPE_OBJECT,
-				  soup_init ();
-				  )
+G_DEFINE_TYPE_WITH_CODE (SoupSession, soup_session, G_TYPE_OBJECT,
+			 soup_init ();
+			 )
 
 enum {
 	REQUEST_QUEUED,
@@ -182,9 +191,12 @@ soup_session_init (SoupSession *session)
 	SoupSessionPrivate *priv = SOUP_SESSION_GET_PRIVATE (session);
 	SoupAuthManager *auth_manager;
 
+	priv->session = session;
+
 	priv->queue = soup_message_queue_new (session);
 
 	g_mutex_init (&priv->conn_lock);
+	g_cond_init (&priv->conn_cond);
 	priv->http_hosts = g_hash_table_new_full (soup_host_uri_hash,
 						  soup_host_uri_equal,
 						  NULL, (GDestroyNotify)free_host);
@@ -225,6 +237,15 @@ soup_session_dispose (GObject *object)
 {
 	SoupSession *session = SOUP_SESSION (object);
 	SoupSessionPrivate *priv = SOUP_SESSION_GET_PRIVATE (session);
+	GSList *iter;
+
+	priv->disposed = TRUE;
+
+	for (iter = priv->run_queue_sources; iter; iter = iter->next) {
+		g_source_destroy (iter->data);
+		g_source_unref (iter->data);
+	}
+	g_clear_pointer (&priv->run_queue_sources, g_slist_free);
 
 	priv->disposed = TRUE;
 	soup_session_abort (session);
@@ -245,6 +266,7 @@ soup_session_finalize (GObject *object)
 	soup_message_queue_destroy (priv->queue);
 
 	g_mutex_clear (&priv->conn_lock);
+	g_cond_clear (&priv->conn_cond);
 	g_hash_table_destroy (priv->http_hosts);
 	g_hash_table_destroy (priv->https_hosts);
 	g_hash_table_destroy (priv->conns);
@@ -826,10 +848,7 @@ get_host_for_uri (SoupSession *session, SoupURI *uri)
 	return host;
 }
 
-/* Note: get_host_for_message doesn't lock the conn_lock. The caller
- * must do it itself if there's a chance the host doesn't already
- * exist.
- */
+/* Requires conn_lock to be locked */
 static SoupSessionHost *
 get_host_for_message (SoupSession *session, SoupMessage *msg)
 {
@@ -1031,6 +1050,36 @@ redirect_handler (SoupMessage *msg, gpointer user_data)
 }
 
 static void
+proxy_connection_event (SoupConnection      *conn,
+			GSocketClientEvent   event,
+			GIOStream           *connection,
+			gpointer             user_data)
+{
+	SoupMessageQueueItem *item = user_data;
+
+	soup_message_network_event (item->msg, event, connection);
+}
+
+static void
+soup_session_set_item_connection (SoupSession          *session,
+				  SoupMessageQueueItem *item,
+				  SoupConnection       *conn)
+{
+	if (item->conn) {
+		g_signal_handlers_disconnect_by_func (item->conn, proxy_connection_event, item);
+		g_object_unref (item->conn);
+	}
+
+	item->conn = conn;
+
+	if (item->conn) {
+		g_object_ref (item->conn);
+		g_signal_connect (item->conn, "event",
+				  G_CALLBACK (proxy_connection_event), item);
+	}
+}
+
+static void
 message_restarted (SoupMessage *msg, gpointer user_data)
 {
 	SoupMessageQueueItem *item = user_data;
@@ -1048,6 +1097,7 @@ message_restarted (SoupMessage *msg, gpointer user_data)
 
 SoupMessageQueueItem *
 soup_session_append_queue_item (SoupSession *session, SoupMessage *msg,
+				gboolean async, gboolean new_api,
 				SoupSessionCallback callback, gpointer user_data)
 {
 	SoupSessionPrivate *priv = SOUP_SESSION_GET_PRIVATE (session);
@@ -1057,6 +1107,8 @@ soup_session_append_queue_item (SoupSession *session, SoupMessage *msg,
 	soup_message_cleanup_response (msg);
 
 	item = soup_message_queue_append (priv->queue, msg, callback, user_data);
+	item->async = async;
+	item->new_api = new_api;
 
 	g_mutex_lock (&priv->conn_lock);
 	host = get_host_for_message (session, item->msg);
@@ -1077,7 +1129,7 @@ soup_session_append_queue_item (SoupSession *session, SoupMessage *msg,
 	return item;
 }
 
-void
+static void
 soup_session_send_queue_item (SoupSession *session,
 			      SoupMessageQueueItem *item,
 			      SoupMessageCompletionFn completion_cb)
@@ -1116,9 +1168,9 @@ soup_session_send_queue_item (SoupSession *session,
 		soup_connection_send_request (item->conn, item, completion_cb, item);
 }
 
-gboolean
+static gboolean
 soup_session_cleanup_connections (SoupSession *session,
-				  gboolean     prune_idle)
+				  gboolean     cleanup_idle)
 {
 	SoupSessionPrivate *priv = SOUP_SESSION_GET_PRIVATE (session);
 	GSList *conns = NULL, *c;
@@ -1131,7 +1183,7 @@ soup_session_cleanup_connections (SoupSession *session,
 	while (g_hash_table_iter_next (&iter, &conn, &host)) {
 		state = soup_connection_get_state (conn);
 		if (state == SOUP_CONNECTION_REMOTE_DISCONNECTED ||
-		    (prune_idle && state == SOUP_CONNECTION_IDLE)) {
+		    (cleanup_idle && state == SOUP_CONNECTION_IDLE)) {
 			conns = g_slist_prepend (conns, g_object_ref (conn));
 			g_hash_table_iter_remove (&iter);
 			drop_connection (session, host, conn);
@@ -1233,7 +1285,7 @@ connection_disconnected (SoupConnection *conn, gpointer user_data)
 
 	g_mutex_unlock (&priv->conn_lock);
 
-	SOUP_SESSION_GET_CLASS (session)->kick (session);
+	soup_session_kick_queue (session);
 }
 
 static void
@@ -1243,88 +1295,281 @@ connection_state_changed (GObject *object, GParamSpec *param, gpointer user_data
 	SoupConnection *conn = SOUP_CONNECTION (object);
 
 	if (soup_connection_get_state (conn) == SOUP_CONNECTION_IDLE)
-		SOUP_SESSION_GET_CLASS (session)->kick (session);
+		soup_session_kick_queue (session);
 }
 
-SoupMessageQueueItem *
-soup_session_make_connect_message (SoupSession    *session,
-				   SoupConnection *conn)
+SoupMessageQueue *
+soup_session_get_queue (SoupSession *session)
+{
+	SoupSessionPrivate *priv = SOUP_SESSION_GET_PRIVATE (session);
+
+	return priv->queue;
+}
+
+static void
+soup_session_unqueue_item (SoupSession          *session,
+			   SoupMessageQueueItem *item)
+{
+	SoupSessionPrivate *priv = SOUP_SESSION_GET_PRIVATE (session);
+	SoupSessionHost *host;
+
+	if (item->conn) {
+		if (item->msg->method != SOUP_METHOD_CONNECT ||
+		    !SOUP_STATUS_IS_SUCCESSFUL (item->msg->status_code))
+			soup_connection_set_state (item->conn, SOUP_CONNECTION_IDLE);
+		soup_session_set_item_connection (session, item, NULL);
+	}
+
+	if (item->state != SOUP_MESSAGE_FINISHED) {
+		g_warning ("finished an item with state %d", item->state);
+		return;
+	}
+
+	soup_message_queue_remove (priv->queue, item);
+
+	g_mutex_lock (&priv->conn_lock);
+	host = get_host_for_message (session, item->msg);
+	host->num_messages--;
+	g_mutex_unlock (&priv->conn_lock);
+
+	/* g_signal_handlers_disconnect_by_func doesn't work if you
+	 * have a metamarshal, meaning it doesn't work with
+	 * soup_message_add_header_handler()
+	 */
+	g_signal_handlers_disconnect_matched (item->msg, G_SIGNAL_MATCH_DATA,
+					      0, 0, NULL, NULL, item);
+	g_signal_emit (session, signals[REQUEST_UNQUEUED], 0, item->msg);
+	soup_message_queue_item_unref (item);
+}
+
+static void
+soup_session_set_item_status (SoupSession          *session,
+			      SoupMessageQueueItem *item,
+			      guint                 status_code)
+{
+	SoupURI *uri;
+	char *msg;
+
+	switch (status_code) {
+	case SOUP_STATUS_CANT_RESOLVE:
+	case SOUP_STATUS_CANT_CONNECT:
+		uri = soup_message_get_uri (item->msg);
+		msg = g_strdup_printf ("%s (%s)",
+				       soup_status_get_phrase (status_code),
+				       uri->host);
+		soup_message_set_status_full (item->msg, status_code, msg);
+		g_free (msg);
+		break;
+
+	case SOUP_STATUS_CANT_RESOLVE_PROXY:
+	case SOUP_STATUS_CANT_CONNECT_PROXY:
+		if (item->proxy_uri && item->proxy_uri->host) {
+			msg = g_strdup_printf ("%s (%s)",
+					       soup_status_get_phrase (status_code),
+					       item->proxy_uri->host);
+			soup_message_set_status_full (item->msg, status_code, msg);
+			g_free (msg);
+			break;
+		}
+		soup_message_set_status (item->msg, status_code);
+		break;
+
+	case SOUP_STATUS_SSL_FAILED:
+		if (!g_tls_backend_supports_tls (g_tls_backend_get_default ())) {
+			soup_message_set_status_full (item->msg, status_code,
+						      "TLS/SSL support not available; install glib-networking");
+		} else
+			soup_message_set_status (item->msg, status_code);
+		break;
+
+	default:
+		soup_message_set_status (item->msg, status_code);
+		break;
+	}
+}
+
+
+static void
+message_completed (SoupMessage *msg, gpointer user_data)
+{
+	SoupMessageQueueItem *item = user_data;
+
+	if (item->async)
+		soup_session_kick_queue (item->session);
+
+	if (item->state != SOUP_MESSAGE_RESTARTING) {
+		item->state = SOUP_MESSAGE_FINISHING;
+
+		if (item->new_api && !item->async)
+			soup_session_process_queue_item (item->session, item, NULL, TRUE);
+	}
+}
+
+static void
+tunnel_complete (SoupConnection *conn, guint status, gpointer user_data)
+{
+	SoupMessageQueueItem *tunnel_item = user_data;
+	SoupMessageQueueItem *item = tunnel_item->related;
+	SoupSession *session = tunnel_item->session;
+
+	soup_message_finished (tunnel_item->msg);
+	soup_message_queue_item_unref (tunnel_item);
+
+	if (item->msg->status_code)
+		item->state = SOUP_MESSAGE_FINISHING;
+	else
+		soup_message_set_https_status (item->msg, item->conn);
+
+	if (!SOUP_STATUS_IS_SUCCESSFUL (status)) {
+		soup_session_set_item_connection (session, item, NULL);
+		soup_session_set_item_status (session, item, status);
+	}
+
+	item->state = SOUP_MESSAGE_READY;
+	if (item->async)
+		soup_session_kick_queue (session);
+	soup_message_queue_item_unref (item);
+}
+
+static void
+tunnel_message_completed (SoupMessage *msg, gpointer user_data)
+{
+	SoupMessageQueueItem *tunnel_item = user_data;
+	SoupMessageQueueItem *item = tunnel_item->related;
+	SoupSession *session = tunnel_item->session;
+	guint status;
+
+	if (tunnel_item->state == SOUP_MESSAGE_RESTARTING) {
+		soup_message_restarted (msg);
+		if (tunnel_item->conn) {
+			tunnel_item->state = SOUP_MESSAGE_RUNNING;
+			soup_session_send_queue_item (session, tunnel_item,
+						      tunnel_message_completed);
+			return;
+		}
+
+		soup_message_set_status (msg, SOUP_STATUS_TRY_AGAIN);
+	}
+
+	tunnel_item->state = SOUP_MESSAGE_FINISHED;
+	soup_session_unqueue_item (session, tunnel_item);
+
+	status = tunnel_item->msg->status_code;
+	if (!SOUP_STATUS_IS_SUCCESSFUL (status)) {
+		tunnel_complete (item->conn, status, tunnel_item);
+		return;
+	}
+
+	if (tunnel_item->async) {
+		soup_connection_start_ssl_async (item->conn, item->cancellable,
+						 tunnel_complete, tunnel_item);
+	} else {
+		status = soup_connection_start_ssl_sync (item->conn, item->cancellable);
+		tunnel_complete (item->conn, status, tunnel_item);
+	}
+}
+
+static void
+tunnel_connect (SoupMessageQueueItem *item)
 {
+	SoupSession *session = item->session;
+	SoupMessageQueueItem *tunnel_item;
 	SoupURI *uri;
 	SoupMessage *msg;
-	SoupMessageQueueItem *item;
 
-	uri = soup_connection_get_remote_uri (conn);
+	item->state = SOUP_MESSAGE_TUNNELING;
+
+	uri = soup_connection_get_remote_uri (item->conn);
 	msg = soup_message_new_from_uri (SOUP_METHOD_CONNECT, uri);
 	soup_message_set_flags (msg, SOUP_MESSAGE_NO_REDIRECT);
 
-	item = soup_session_append_queue_item (session, msg, NULL, NULL);
-	soup_session_set_item_connection (session, item, conn);
+	tunnel_item = soup_session_append_queue_item (session, msg,
+						      item->async, FALSE,
+						      NULL, NULL);
 	g_object_unref (msg);
-	item->state = SOUP_MESSAGE_RUNNING;
+	tunnel_item->related = item;
+	soup_message_queue_item_ref (item);
+	soup_session_set_item_connection (session, tunnel_item, item->conn);
+	tunnel_item->state = SOUP_MESSAGE_RUNNING;
 
-	g_signal_emit (session, signals[TUNNELING], 0, conn);
-	return item;
+	g_signal_emit (session, signals[TUNNELING], 0, tunnel_item->conn);
+
+	soup_session_send_queue_item (session, tunnel_item,
+				      tunnel_message_completed);
 }
 
-gboolean
-soup_session_get_connection (SoupSession *session,
-			     SoupMessageQueueItem *item,
-			     gboolean *try_pruning)
+static void
+got_connection (SoupConnection *conn, guint status, gpointer user_data)
+{
+	SoupMessageQueueItem *item = user_data;
+	SoupSession *session = item->session;
+
+	if (status != SOUP_STATUS_OK) {
+		if (item->state == SOUP_MESSAGE_CONNECTING) {
+			soup_session_set_item_status (session, item, status);
+			soup_session_set_item_connection (session, item, NULL);
+			item->state = SOUP_MESSAGE_READY;
+		}
+	} else
+		item->state = SOUP_MESSAGE_CONNECTED;
+
+	if (item->async) {
+		if (item->state == SOUP_MESSAGE_CONNECTED ||
+		    item->state == SOUP_MESSAGE_READY)
+			async_run_queue (item->session);
+		else
+			soup_session_kick_queue (item->session);
+
+		soup_message_queue_item_unref (item);
+	}
+}
+
+/* requires conn_lock */
+static SoupConnection *
+get_connection_for_host (SoupSession *session,
+			 SoupMessageQueueItem *item,
+			 SoupSessionHost *host,
+			 gboolean need_new_connection,
+			 gboolean *try_cleanup)
 {
 	SoupSessionPrivate *priv = SOUP_SESSION_GET_PRIVATE (session);
 	SoupConnection *conn;
-	SoupSessionHost *host;
 	GSList *conns;
 	int num_pending = 0;
-	gboolean need_new_connection;
 
 	if (priv->disposed)
 		return FALSE;
 
 	if (item->conn) {
 		g_return_val_if_fail (soup_connection_get_state (item->conn) != SOUP_CONNECTION_DISCONNECTED, FALSE);
-		return TRUE;
+		return item->conn;
 	}
 
-	need_new_connection =
-		(soup_message_get_flags (item->msg) & SOUP_MESSAGE_NEW_CONNECTION) ||
-		(!(soup_message_get_flags (item->msg) & SOUP_MESSAGE_IDEMPOTENT) &&
-		 !SOUP_METHOD_IS_IDEMPOTENT (item->msg->method));
-
-	g_mutex_lock (&priv->conn_lock);
-
-	host = get_host_for_message (session, item->msg);
 	for (conns = host->connections; conns; conns = conns->next) {
-		if (!need_new_connection && soup_connection_get_state (conns->data) == SOUP_CONNECTION_IDLE) {
-			soup_connection_set_state (conns->data, SOUP_CONNECTION_IN_USE);
-			g_mutex_unlock (&priv->conn_lock);
-			soup_session_set_item_connection (session, item, conns->data);
-			soup_message_set_https_status (item->msg, item->conn);
-			return TRUE;
-		} else if (soup_connection_get_state (conns->data) == SOUP_CONNECTION_CONNECTING)
+		conn = conns->data;
+
+		if (!need_new_connection && soup_connection_get_state (conn) == SOUP_CONNECTION_IDLE) {
+			soup_connection_set_state (conn, SOUP_CONNECTION_IN_USE);
+			return conn;
+		} else if (soup_connection_get_state (conn) == SOUP_CONNECTION_CONNECTING)
 			num_pending++;
 	}
 
 	/* Limit the number of pending connections; num_messages / 2
 	 * is somewhat arbitrary...
 	 */
-	if (num_pending > host->num_messages / 2) {
-		g_mutex_unlock (&priv->conn_lock);
-		return FALSE;
-	}
+	if (num_pending > host->num_messages / 2)
+		return NULL;
 
 	if (host->num_conns >= priv->max_conns_per_host) {
 		if (need_new_connection)
-			*try_pruning = TRUE;
-		g_mutex_unlock (&priv->conn_lock);
-		return FALSE;
+			*try_cleanup = TRUE;
+		return NULL;
 	}
 
 	if (priv->num_conns >= priv->max_conns) {
-		*try_pruning = TRUE;
-		g_mutex_unlock (&priv->conn_lock);
-		return FALSE;
+		*try_cleanup = TRUE;
+		return NULL;
 	}
 
 	conn = g_object_new (
@@ -1347,6 +1592,10 @@ soup_session_get_connection (SoupSession *session,
 			  G_CALLBACK (connection_state_changed),
 			  session);
 
+	/* This is a debugging-related signal, and so can ignore the
+	 * usual rule about not emitting signals while holding
+	 * conn_lock.
+	 */
 	g_signal_emit (session, signals[CONNECTION_CREATED], 0, conn);
 
 	g_hash_table_insert (priv->conns, conn, host);
@@ -1361,129 +1610,220 @@ soup_session_get_connection (SoupSession *session,
 		host->keep_alive_src = NULL;
 	}
 
-	g_mutex_unlock (&priv->conn_lock);
-	soup_session_set_item_connection (session, item, conn);
-	return TRUE;
+	return conn;
 }
 
-SoupMessageQueue *
-soup_session_get_queue (SoupSession *session)
+static gboolean
+get_connection (SoupMessageQueueItem *item, gboolean *should_cleanup)
 {
+	SoupSession *session = item->session;
 	SoupSessionPrivate *priv = SOUP_SESSION_GET_PRIVATE (session);
+	SoupSessionHost *host;
+	SoupConnection *conn = NULL;
+	gboolean my_should_cleanup = FALSE;
+	gboolean need_new_connection;
 
-	return priv->queue;
+	need_new_connection =
+		(soup_message_get_flags (item->msg) & SOUP_MESSAGE_NEW_CONNECTION) ||
+		(!(soup_message_get_flags (item->msg) & SOUP_MESSAGE_IDEMPOTENT) &&
+		 !SOUP_METHOD_IS_IDEMPOTENT (item->msg->method));
+
+	g_mutex_lock (&priv->conn_lock);
+	host = get_host_for_message (session, item->msg);
+	while (TRUE) {
+		conn = get_connection_for_host (session, item, host,
+						need_new_connection,
+						&my_should_cleanup);
+		if (conn || item->async)
+			break;
+
+		if (my_should_cleanup) {
+			g_mutex_unlock (&priv->conn_lock);
+			soup_session_cleanup_connections (session, TRUE);
+			g_mutex_lock (&priv->conn_lock);
+
+			my_should_cleanup = FALSE;
+			continue;
+		}
+
+		g_cond_wait (&priv->conn_cond, &priv->conn_lock);
+	}
+	g_mutex_unlock (&priv->conn_lock);
+
+	if (!conn) {
+		if (should_cleanup)
+			*should_cleanup = my_should_cleanup;
+		return FALSE;
+	}
+
+	soup_session_set_item_connection (session, item, conn);
+	soup_message_set_https_status (item->msg, item->conn);
+
+	if (soup_connection_get_state (item->conn) != SOUP_CONNECTION_NEW) {
+		item->state = SOUP_MESSAGE_READY;
+		return TRUE;
+	}
+
+	item->state = SOUP_MESSAGE_CONNECTING;
+
+	if (item->async) {
+		soup_message_queue_item_ref (item);
+		soup_connection_connect_async (item->conn, item->cancellable,
+					       got_connection, item);
+		return FALSE;
+	} else {
+		guint status;
+
+		status = soup_connection_connect_sync (item->conn, item->cancellable);
+		got_connection (item->conn, status, item);
+
+		return TRUE;
+	}
 }
 
 void
-soup_session_unqueue_item (SoupSession          *session,
-			   SoupMessageQueueItem *item)
+soup_session_process_queue_item (SoupSession          *session,
+				 SoupMessageQueueItem *item,
+				 gboolean             *should_cleanup,
+				 gboolean              loop)
 {
-	SoupSessionPrivate *priv = SOUP_SESSION_GET_PRIVATE (session);
-	SoupSessionHost *host;
+	g_assert (item->session == session);
 
-	if (item->conn) {
-		if (item->msg->method != SOUP_METHOD_CONNECT ||
-		    !SOUP_STATUS_IS_SUCCESSFUL (item->msg->status_code))
-			soup_connection_set_state (item->conn, SOUP_CONNECTION_IDLE);
-		soup_session_set_item_connection (session, item, NULL);
-	}
+	do {
+		if (item->paused)
+			return;
 
-	if (item->state != SOUP_MESSAGE_FINISHED) {
-		g_warning ("finished an item with state %d", item->state);
-		return;
-	}
+		switch (item->state) {
+		case SOUP_MESSAGE_STARTING:
+			if (!get_connection (item, should_cleanup))
+				return;
+			break;
 
-	soup_message_queue_remove (priv->queue, item);
+		case SOUP_MESSAGE_CONNECTED:
+			if (soup_connection_is_tunnelled (item->conn))
+				tunnel_connect (item);
+			else
+				item->state = SOUP_MESSAGE_READY;
+			break;
 
-	g_mutex_lock (&priv->conn_lock);
-	host = get_host_for_message (session, item->msg);
-	host->num_messages--;
-	g_mutex_unlock (&priv->conn_lock);
+		case SOUP_MESSAGE_READY:
+			soup_message_set_https_status (item->msg, item->conn);
+			if (item->msg->status_code) {
+				if (item->msg->status_code == SOUP_STATUS_TRY_AGAIN) {
+					soup_message_cleanup_response (item->msg);
+					item->state = SOUP_MESSAGE_STARTING;
+				} else
+					item->state = SOUP_MESSAGE_FINISHING;
+				break;
+			}
 
-	/* g_signal_handlers_disconnect_by_func doesn't work if you
-	 * have a metamarshal, meaning it doesn't work with
-	 * soup_message_add_header_handler()
-	 */
-	g_signal_handlers_disconnect_matched (item->msg, G_SIGNAL_MATCH_DATA,
-					      0, 0, NULL, NULL, item);
-	g_signal_emit (session, signals[REQUEST_UNQUEUED], 0, item->msg);
-	soup_message_queue_item_unref (item);
+			item->state = SOUP_MESSAGE_RUNNING;
+
+			soup_session_send_queue_item (session, item, message_completed);
+
+			if (item->new_api) {
+				if (item->async)
+					async_send_request_running (session, item);
+				return;
+			}
+			break;
+
+		case SOUP_MESSAGE_RUNNING:
+			if (item->async)
+				return;
+
+			g_warn_if_fail (item->new_api);
+			item->state = SOUP_MESSAGE_FINISHING;
+			break;
+
+		case SOUP_MESSAGE_RESTARTING:
+			item->state = SOUP_MESSAGE_STARTING;
+			soup_message_restarted (item->msg);
+			break;
+
+		case SOUP_MESSAGE_FINISHING:
+			item->state = SOUP_MESSAGE_FINISHED;
+			soup_message_finished (item->msg);
+			if (item->state != SOUP_MESSAGE_FINISHED) {
+				g_return_if_fail (!item->new_api);
+				break;
+			}
+
+			soup_session_unqueue_item (session, item);
+			if (item->async && item->callback)
+				item->callback (session, item->msg, item->callback_data);
+			return;
+
+		default:
+			/* Nothing to do with this message in any
+			 * other state.
+			 */
+			g_warn_if_fail (item->async);
+			return;
+		}
+	} while (loop && item->state != SOUP_MESSAGE_FINISHED);
 }
 
 static void
-proxy_connection_event (SoupConnection      *conn,
-			GSocketClientEvent   event,
-			GIOStream           *connection,
-			gpointer             user_data)
+async_run_queue (SoupSession *session)
 {
-	SoupMessageQueueItem *item = user_data;
+	SoupSessionPrivate *priv = SOUP_SESSION_GET_PRIVATE (session);
+	SoupMessageQueueItem *item;
+	SoupMessage *msg;
+	gboolean try_cleanup = TRUE, should_cleanup = FALSE;
 
-	soup_message_network_event (item->msg, event, connection);
-}
+	g_object_ref (session);
+	soup_session_cleanup_connections (session, FALSE);
 
-void
-soup_session_set_item_connection (SoupSession          *session,
-				  SoupMessageQueueItem *item,
-				  SoupConnection       *conn)
-{
-	if (item->conn) {
-		g_signal_handlers_disconnect_by_func (item->conn, proxy_connection_event, item);
-		g_object_unref (item->conn);
-	}
+ try_again:
+	for (item = soup_message_queue_first (priv->queue);
+	     item;
+	     item = soup_message_queue_next (priv->queue, item)) {
+		msg = item->msg;
 
-	item->conn = conn;
+		/* CONNECT messages are handled specially */
+		if (msg->method == SOUP_METHOD_CONNECT)
+			continue;
 
-	if (item->conn) {
-		g_object_ref (item->conn);
-		g_signal_connect (item->conn, "event",
-				  G_CALLBACK (proxy_connection_event), item);
+		if (item->async_context != soup_session_get_async_context (session))
+			continue;
+
+		soup_session_process_queue_item (session, item, &should_cleanup, TRUE);
+	}
+
+	if (try_cleanup && should_cleanup) {
+		/* There is at least one message in the queue that
+		 * could be sent if we cleanupd an idle connection from
+		 * some other server.
+		 */
+		if (soup_session_cleanup_connections (session, TRUE)) {
+			try_cleanup = should_cleanup = FALSE;
+			goto try_again;
+		}
 	}
+
+	g_object_unref (session);
 }
 
-void
-soup_session_set_item_status (SoupSession          *session,
-			      SoupMessageQueueItem *item,
-			      guint                 status_code)
+static gboolean
+idle_run_queue (gpointer user_data)
 {
-	SoupURI *uri;
-	char *msg;
+	SoupSessionPrivate *priv = user_data;
+	GSource *source;
 
-	switch (status_code) {
-	case SOUP_STATUS_CANT_RESOLVE:
-	case SOUP_STATUS_CANT_CONNECT:
-		uri = soup_message_get_uri (item->msg);
-		msg = g_strdup_printf ("%s (%s)",
-				       soup_status_get_phrase (status_code),
-				       uri->host);
-		soup_message_set_status_full (item->msg, status_code, msg);
-		g_free (msg);
-		break;
+	if (priv->disposed)
+		return FALSE;
 
-	case SOUP_STATUS_CANT_RESOLVE_PROXY:
-	case SOUP_STATUS_CANT_CONNECT_PROXY:
-		if (item->proxy_uri && item->proxy_uri->host) {
-			msg = g_strdup_printf ("%s (%s)",
-					       soup_status_get_phrase (status_code),
-					       item->proxy_uri->host);
-			soup_message_set_status_full (item->msg, status_code, msg);
-			g_free (msg);
-			break;
-		}
-		soup_message_set_status (item->msg, status_code);
-		break;
+	source = g_main_current_source ();
+	priv->run_queue_sources = g_slist_remove (priv->run_queue_sources, source);
 
-	case SOUP_STATUS_SSL_FAILED:
-		if (!g_tls_backend_supports_tls (g_tls_backend_get_default ())) {
-			soup_message_set_status_full (item->msg, status_code,
-						      "TLS/SSL support not available; install glib-networking");
-		} else
-			soup_message_set_status (item->msg, status_code);
-		break;
+	/* Ensure that the source is destroyed before running the queue */
+	g_source_destroy (source);
+	g_source_unref (source);
 
-	default:
-		soup_message_set_status (item->msg, status_code);
-		break;
-	}
+	g_assert (priv->session);
+	async_run_queue (priv->session);
+	return FALSE;
 }
 
 /**
@@ -1517,7 +1857,7 @@ void
 soup_session_queue_message (SoupSession *session, SoupMessage *msg,
 			    SoupSessionCallback callback, gpointer user_data)
 {
-	g_return_if_fail (SOUP_IS_SESSION (session));
+	g_return_if_fail (SOUP_IS_SESSION_ASYNC (session) || SOUP_IS_SESSION_SYNC (session));
 	g_return_if_fail (SOUP_IS_MESSAGE (msg));
 
 	SOUP_SESSION_GET_CLASS (session)->queue_message (session, msg,
@@ -1557,7 +1897,6 @@ soup_session_requeue_message (SoupSession *session, SoupMessage *msg)
 	SOUP_SESSION_GET_CLASS (session)->requeue_message (session, msg);
 }
 
-
 /**
  * soup_session_send_message:
  * @session: a #SoupSession
@@ -1574,7 +1913,7 @@ soup_session_requeue_message (SoupSession *session, SoupMessage *msg)
 guint
 soup_session_send_message (SoupSession *session, SoupMessage *msg)
 {
-	g_return_val_if_fail (SOUP_IS_SESSION (session), SOUP_STATUS_MALFORMED);
+	g_return_val_if_fail (SOUP_IS_SESSION_ASYNC (session) || SOUP_IS_SESSION_SYNC (session), SOUP_STATUS_MALFORMED);
 	g_return_val_if_fail (SOUP_IS_MESSAGE (msg), SOUP_STATUS_MALFORMED);
 
 	return SOUP_SESSION_GET_CLASS (session)->send_message (session, msg);
@@ -1609,6 +1948,48 @@ soup_session_pause_message (SoupSession *session,
 	soup_message_queue_item_unref (item);
 }
 
+static void
+soup_session_real_kick_queue (SoupSession *session)
+{
+	SoupSessionPrivate *priv = SOUP_SESSION_GET_PRIVATE (session);
+	SoupMessageQueueItem *item;
+	gboolean have_sync_items = FALSE;
+
+	if (priv->disposed)
+		return;
+
+	for (item = soup_message_queue_first (priv->queue);
+	     item;
+	     item = soup_message_queue_next (priv->queue, item)) {
+		if (item->async) {
+			GSource *source;
+
+			/* We use priv rather than session as the
+			 * source data, because other parts of libsoup
+			 * (or the calling app) may have sources using
+			 * the session as the source data.
+			 */
+			source = g_main_context_find_source_by_user_data (item->async_context, priv);
+			if (!source) {
+				source = soup_add_completion_reffed (item->async_context,
+								     idle_run_queue, priv);
+				priv->run_queue_sources = g_slist_prepend (priv->run_queue_sources,
+									   source);
+			}
+		} else
+			have_sync_items = TRUE;
+	}
+
+	if (have_sync_items)
+		g_cond_broadcast (&priv->conn_cond);
+}
+
+void
+soup_session_kick_queue (SoupSession *session)
+{
+	SOUP_SESSION_GET_CLASS (session)->kick (session);
+}
+
 /**
  * soup_session_unpause_message:
  * @session: a #SoupSession
@@ -1640,7 +2021,7 @@ soup_session_unpause_message (SoupSession *session,
 		soup_message_io_unpause (msg);
 	soup_message_queue_item_unref (item);
 
-	SOUP_SESSION_GET_CLASS (session)->kick (session);
+	soup_session_kick_queue (session);
 }
 
 
@@ -1657,6 +2038,7 @@ soup_session_real_cancel_message (SoupSession *session, SoupMessage *msg, guint
 	soup_message_set_status (msg, status_code);
 	g_cancellable_cancel (item->cancellable);
 
+	soup_session_kick_queue (item->session);
 	soup_message_queue_item_unref (item);
 }
 
@@ -1716,13 +2098,52 @@ soup_session_real_flush_queue (SoupSession *session)
 {
 	SoupSessionPrivate *priv = SOUP_SESSION_GET_PRIVATE (session);
 	SoupMessageQueueItem *item;
+	GHashTable *current = NULL;
+	gboolean done = FALSE;
+
+	if (SOUP_IS_SESSION_SYNC (session)) {
+		/* Record the current contents of the queue */
+		current = g_hash_table_new (NULL, NULL);
+		for (item = soup_message_queue_first (priv->queue);
+		     item;
+		     item = soup_message_queue_next (priv->queue, item))
+			g_hash_table_insert (current, item, item);
+	}
 
+	/* Cancel everything */
 	for (item = soup_message_queue_first (priv->queue);
 	     item;
 	     item = soup_message_queue_next (priv->queue, item)) {
 		soup_session_cancel_message (session, item->msg,
 					     SOUP_STATUS_CANCELLED);
 	}
+
+	if (SOUP_IS_SESSION_SYNC (session)) {
+		/* Wait until all of the items in @current have been
+		 * removed from the queue. (This is not the same as
+		 * "wait for the queue to be empty", because the app
+		 * may queue new requests in response to the
+		 * cancellation of the old ones. We don't try to
+		 * cancel those requests as well, since we'd likely
+		 * just end up looping forever.)
+		 */
+		g_mutex_lock (&priv->conn_lock);
+		do {
+			done = TRUE;
+			for (item = soup_message_queue_first (priv->queue);
+			     item;
+			     item = soup_message_queue_next (priv->queue, item)) {
+				if (g_hash_table_lookup (current, item))
+					done = FALSE;
+			}
+
+			if (!done)
+				g_cond_wait (&priv->conn_cond, &priv->conn_lock);
+		} while (!done);
+		g_mutex_unlock (&priv->conn_lock);
+
+		g_hash_table_destroy (current);
+	}
 }
 
 /**
@@ -2107,6 +2528,7 @@ soup_session_class_init (SoupSessionClass *session_class)
 	session_class->cancel_message = soup_session_real_cancel_message;
 	session_class->auth_required = soup_session_real_auth_required;
 	session_class->flush_queue = soup_session_real_flush_queue;
+	session_class->kick = soup_session_real_kick_queue;
 
 	/* virtual method override */
 	object_class->dispose = soup_session_dispose;
@@ -2808,3 +3230,384 @@ soup_session_class_init (SoupSessionClass *session_class)
 				    G_TYPE_STRV,
 				    G_PARAM_READWRITE));
 }
+
+
+/* send_request_async */
+
+static void
+async_send_request_return_result (SoupMessageQueueItem *item,
+				  gpointer stream, GError *error)
+{
+	GTask *task;
+
+	g_return_if_fail (item->task != NULL);
+
+	g_signal_handlers_disconnect_matched (item->msg, G_SIGNAL_MATCH_DATA,
+					      0, 0, NULL, NULL, item);
+
+	task = item->task;
+	item->task = NULL;
+
+	if (item->io_source) {
+		g_source_destroy (item->io_source);
+		g_clear_pointer (&item->io_source, g_source_unref);
+	}
+
+	if (error)
+		g_task_return_error (task, error);
+	else if (SOUP_STATUS_IS_TRANSPORT_ERROR (item->msg->status_code)) {
+		if (stream)
+			g_object_unref (stream);
+		g_task_return_new_error (task, SOUP_HTTP_ERROR,
+					 item->msg->status_code,
+					 "%s",
+					 item->msg->reason_phrase);
+	} else
+		g_task_return_pointer (task, stream, g_object_unref);
+	g_object_unref (task);
+}
+
+static void
+async_send_request_restarted (SoupMessage *msg, gpointer user_data)
+{
+	SoupMessageQueueItem *item = user_data;
+
+	/* We won't be needing this, then. */
+	g_object_set_data (G_OBJECT (item->msg), "SoupSession:ostream", NULL);
+	item->io_started = FALSE;
+}
+
+static void
+async_send_request_finished (SoupMessage *msg, gpointer user_data)
+{
+	SoupMessageQueueItem *item = user_data;
+	GMemoryOutputStream *mostream;
+	GInputStream *istream = NULL;
+	GError *error = NULL;
+
+	if (!item->task) {
+		/* Something else already took care of it. */
+		return;
+	}
+
+	mostream = g_object_get_data (G_OBJECT (item->task), "SoupSession:ostream");
+	if (mostream) {
+		gpointer data;
+		gssize size;
+
+		/* We thought it would be requeued, but it wasn't, so
+		 * return the original body.
+		 */
+		size = g_memory_output_stream_get_data_size (mostream);
+		data = size ? g_memory_output_stream_steal_data (mostream) : g_strdup ("");
+		istream = g_memory_input_stream_new_from_data (data, size, g_free);
+	} else if (item->io_started) {
+		/* The message finished before becoming readable. This
+		 * will happen, eg, if it's cancelled from got-headers.
+		 * Do nothing; the op will complete via read_ready_cb()
+		 * after we return;
+		 */
+		return;
+	} else {
+		/* The message finished before even being started;
+		 * probably a tunnel connect failure.
+		 */
+		istream = g_memory_input_stream_new ();
+	}
+
+	async_send_request_return_result (item, istream, error);
+}
+
+static void
+send_async_spliced (GObject *source, GAsyncResult *result, gpointer user_data)
+{
+	SoupMessageQueueItem *item = user_data;
+	GInputStream *istream = g_object_get_data (source, "istream");
+	GError *error = NULL;
+
+	/* It should be safe to call the sync close() method here since
+	 * the message body has already been written.
+	 */
+	g_input_stream_close (istream, NULL, NULL);
+	g_object_unref (istream);
+
+	/* If the message was cancelled, it will be completed via other means */
+	if (g_cancellable_is_cancelled (item->cancellable) ||
+	    !item->task) {
+		soup_message_queue_item_unref (item);
+		return;
+	}
+
+	if (g_output_stream_splice_finish (G_OUTPUT_STREAM (source),
+					   result, &error) == -1) {
+		async_send_request_return_result (item, NULL, error);
+		soup_message_queue_item_unref (item);
+		return;
+	}
+
+	/* Otherwise either restarted or finished will eventually be called. */
+	soup_session_kick_queue (item->session);
+	soup_message_queue_item_unref (item);
+}
+
+static void
+send_async_maybe_complete (SoupMessageQueueItem *item,
+			   GInputStream         *stream)
+{
+	if (item->msg->status_code == SOUP_STATUS_UNAUTHORIZED ||
+	    item->msg->status_code == SOUP_STATUS_PROXY_UNAUTHORIZED ||
+	    soup_session_would_redirect (item->session, item->msg)) {
+		GOutputStream *ostream;
+
+		/* Message may be requeued, so gather the current message body... */
+		ostream = g_memory_output_stream_new (NULL, 0, g_realloc, g_free);
+		g_object_set_data_full (G_OBJECT (item->task), "SoupSession:ostream",
+					ostream, g_object_unref);
+
+		g_object_set_data (G_OBJECT (ostream), "istream", stream);
+
+		/* Give the splice op its own ref on item */
+		soup_message_queue_item_ref (item);
+		g_output_stream_splice_async (ostream, stream,
+					      /* We can't use CLOSE_SOURCE because it
+					       * might get closed in the wrong thread.
+					       */
+					      G_OUTPUT_STREAM_SPLICE_CLOSE_TARGET,
+					      G_PRIORITY_DEFAULT,
+					      item->cancellable,
+					      send_async_spliced, item);
+		return;
+	}
+
+	async_send_request_return_result (item, stream, NULL);
+}
+
+static void try_run_until_read (SoupMessageQueueItem *item);
+
+static gboolean
+read_ready_cb (SoupMessage *msg, gpointer user_data)
+{
+	SoupMessageQueueItem *item = user_data;
+
+	g_clear_pointer (&item->io_source, g_source_unref);
+	try_run_until_read (item);
+	return FALSE;
+}
+
+static void
+try_run_until_read (SoupMessageQueueItem *item)
+{
+	GError *error = NULL;
+	GInputStream *stream = NULL;
+
+	if (soup_message_io_run_until_read (item->msg, item->cancellable, &error))
+		stream = soup_message_io_get_response_istream (item->msg, &error);
+	if (stream) {
+		send_async_maybe_complete (item, stream);
+		return;
+	}
+
+	if (g_error_matches (error, SOUP_HTTP_ERROR, SOUP_STATUS_TRY_AGAIN)) {
+		item->state = SOUP_MESSAGE_RESTARTING;
+		soup_message_io_finished (item->msg);
+		g_error_free (error);
+		return;
+	}
+
+	if (!g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
+		if (item->state != SOUP_MESSAGE_FINISHED) {
+			if (soup_message_io_in_progress (item->msg))
+				soup_message_io_finished (item->msg);
+			item->state = SOUP_MESSAGE_FINISHING;
+			soup_session_process_queue_item (item->session, item, NULL, FALSE);
+		}
+		async_send_request_return_result (item, NULL, error);
+		return;
+	}
+
+	g_clear_error (&error);
+	item->io_source = soup_message_io_get_source (item->msg, item->cancellable,
+						      read_ready_cb, item);
+	g_source_attach (item->io_source, soup_session_get_async_context (item->session));
+}
+
+static void
+async_send_request_running (SoupSession *session, SoupMessageQueueItem *item)
+{
+	item->io_started = TRUE;
+	try_run_until_read (item);
+}
+
+void
+soup_session_send_request_async (SoupSession         *session,
+				 SoupMessage         *msg,
+				 GCancellable        *cancellable,
+				 GAsyncReadyCallback  callback,
+				 gpointer             user_data)
+{
+	SoupMessageQueueItem *item;
+	gboolean use_thread_context;
+
+	g_return_if_fail (SOUP_IS_SESSION (session));
+	g_return_if_fail (!SOUP_IS_SESSION_SYNC (session));
+
+	g_object_get (G_OBJECT (session),
+		      SOUP_SESSION_USE_THREAD_CONTEXT, &use_thread_context,
+		      NULL);
+	g_return_if_fail (use_thread_context);
+
+	item = soup_session_append_queue_item (session, msg, TRUE, TRUE,
+					       NULL, NULL);
+	g_signal_connect (msg, "restarted",
+			  G_CALLBACK (async_send_request_restarted), item);
+	g_signal_connect (msg, "finished",
+			  G_CALLBACK (async_send_request_finished), item);
+
+	item->new_api = TRUE;
+	item->task = g_task_new (session, cancellable, callback, user_data);
+	g_task_set_task_data (item->task, item, (GDestroyNotify) soup_message_queue_item_unref);
+
+	if (cancellable) {
+		g_object_unref (item->cancellable);
+		item->cancellable = g_object_ref (cancellable);
+	}
+
+	soup_session_kick_queue (session);
+}
+
+GInputStream *
+soup_session_send_request_finish (SoupSession   *session,
+				  GAsyncResult  *result,
+				  GError       **error)
+{
+	GTask *task;
+
+	g_return_val_if_fail (SOUP_IS_SESSION (session), NULL);
+	g_return_val_if_fail (!SOUP_IS_SESSION_SYNC (session), NULL);
+	g_return_val_if_fail (g_task_is_valid (result, session), NULL);
+
+	task = G_TASK (result);
+	if (g_task_had_error (task)) {
+		SoupMessageQueueItem *item = g_task_get_task_data (task);
+
+		if (soup_message_io_in_progress (item->msg))
+			soup_message_io_finished (item->msg);
+		else if (item->state != SOUP_MESSAGE_FINISHED)
+			item->state = SOUP_MESSAGE_FINISHING;
+
+		if (item->state != SOUP_MESSAGE_FINISHED)
+			soup_session_process_queue_item (session, item, NULL, FALSE);
+	}
+
+	return g_task_propagate_pointer (task, error);
+}
+
+GInputStream *
+soup_session_send_request (SoupSession   *session,
+			   SoupMessage   *msg,
+			   GCancellable  *cancellable,
+			   GError       **error)
+{
+	SoupMessageQueueItem *item;
+	GInputStream *stream = NULL;
+	GOutputStream *ostream;
+	GMemoryOutputStream *mostream;
+	gssize size;
+	GError *my_error = NULL;
+
+	g_return_val_if_fail (SOUP_IS_SESSION (session), NULL);
+	g_return_val_if_fail (!SOUP_IS_SESSION_ASYNC (session), NULL);
+
+	item = soup_session_append_queue_item (session, msg, FALSE, TRUE,
+					       NULL, NULL);
+
+	item->new_api = TRUE;
+	if (cancellable) {
+		g_object_unref (item->cancellable);
+		item->cancellable = g_object_ref (cancellable);
+	}
+
+	while (!stream) {
+		/* Get a connection, etc */
+		soup_session_process_queue_item (session, item, NULL, TRUE);
+		if (item->state != SOUP_MESSAGE_RUNNING)
+			break;
+
+		/* Send request, read headers */
+		if (!soup_message_io_run_until_read (msg, item->cancellable, &my_error)) {
+			if (g_error_matches (my_error, SOUP_HTTP_ERROR, SOUP_STATUS_TRY_AGAIN)) {
+				item->state = SOUP_MESSAGE_RESTARTING;
+				soup_message_io_finished (item->msg);
+				g_clear_error (&my_error);
+				continue;
+			} else
+				break;
+		}
+
+		stream = soup_message_io_get_response_istream (msg, &my_error);
+		if (!stream)
+			break;
+
+		/* Break if the message doesn't look likely-to-be-requeued */
+		if (msg->status_code != SOUP_STATUS_UNAUTHORIZED &&
+		    msg->status_code != SOUP_STATUS_PROXY_UNAUTHORIZED &&
+		    !soup_session_would_redirect (session, msg))
+			break;
+
+		/* Gather the current message body... */
+		ostream = g_memory_output_stream_new (NULL, 0, g_realloc, g_free);
+		if (g_output_stream_splice (ostream, stream,
+					    G_OUTPUT_STREAM_SPLICE_CLOSE_SOURCE |
+					    G_OUTPUT_STREAM_SPLICE_CLOSE_TARGET,
+					    item->cancellable, &my_error) == -1) {
+			g_object_unref (stream);
+			g_object_unref (ostream);
+			stream = NULL;
+			break;
+		}
+		g_object_unref (stream);
+		stream = NULL;
+
+		/* If the message was requeued, loop */
+		if (item->state == SOUP_MESSAGE_RESTARTING) {
+			g_object_unref (ostream);
+			continue;
+		}
+
+		/* Not requeued, so return the original body */
+		mostream = G_MEMORY_OUTPUT_STREAM (ostream);
+		size = g_memory_output_stream_get_data_size (mostream);
+		stream = g_memory_input_stream_new ();
+		if (size) {
+			g_memory_input_stream_add_data (G_MEMORY_INPUT_STREAM (stream),
+							g_memory_output_stream_steal_data (mostream),
+							size, g_free);
+		}
+		g_object_unref (ostream);
+	}
+
+	if (my_error)
+		g_propagate_error (error, my_error);
+	else if (SOUP_STATUS_IS_TRANSPORT_ERROR (msg->status_code)) {
+		if (stream) {
+			g_object_unref (stream);
+			stream = NULL;
+		}
+		g_set_error_literal (error, SOUP_HTTP_ERROR, msg->status_code,
+				     msg->reason_phrase);
+	} else if (!stream)
+		stream = g_memory_input_stream_new ();
+
+	if (!stream) {
+		if (soup_message_io_in_progress (msg))
+			soup_message_io_finished (msg);
+		else if (item->state != SOUP_MESSAGE_FINISHED)
+			item->state = SOUP_MESSAGE_FINISHING;
+
+		if (item->state != SOUP_MESSAGE_FINISHED)
+			soup_session_process_queue_item (session, item, NULL, TRUE);
+	}
+
+	soup_message_queue_item_unref (item);
+	return stream;
+}
diff --git a/tests/requester-test.c b/tests/requester-test.c
index bc45e9f..a437937 100644
--- a/tests/requester-test.c
+++ b/tests/requester-test.c
@@ -372,21 +372,22 @@ do_test_for_thread_and_context (SoupSession *session, const char *base_uri)
 }
 
 static void
-do_simple_test (const char *uri)
+do_simple_test (const char *uri, gboolean plain_session)
 {
 	SoupSession *session;
 
-	debug_printf (1, "Simple streaming test\n");
+	debug_printf (1, "Simple streaming test with %s\n",
+		      plain_session ? "SoupSession" : "SoupSessionAsync");
 
-	session = soup_test_session_new (SOUP_TYPE_SESSION_ASYNC,
+	session = soup_test_session_new (plain_session ? SOUP_TYPE_SESSION : SOUP_TYPE_SESSION_ASYNC,
 					 SOUP_SESSION_USE_THREAD_CONTEXT, TRUE,
 					 NULL);
 	do_test_for_thread_and_context (session, uri);
 	soup_test_session_abort_unref (session);
 }
 
-static gpointer
-do_test_with_context (const char *uri)
+static void
+do_test_with_context_and_type (const char *uri, gboolean plain_session)
 {
 	GMainContext *async_context;
 	SoupSession *session;
@@ -394,7 +395,7 @@ do_test_with_context (const char *uri)
 	async_context = g_main_context_new ();
 	g_main_context_push_thread_default (async_context);
 
-	session = soup_test_session_new (SOUP_TYPE_SESSION_ASYNC,
+	session = soup_test_session_new (plain_session ? SOUP_TYPE_SESSION : SOUP_TYPE_SESSION_ASYNC,
 					 SOUP_SESSION_ASYNC_CONTEXT, async_context,
 					 SOUP_SESSION_USE_THREAD_CONTEXT, TRUE,
 					 NULL);
@@ -404,25 +405,43 @@ do_test_with_context (const char *uri)
 
 	g_main_context_pop_thread_default (async_context);
 	g_main_context_unref (async_context);
+}
+
+static gpointer
+do_test_with_context (gpointer uri)
+{
+	do_test_with_context_and_type (uri, FALSE);
+	return NULL;
+}
+
+static gpointer
+do_plain_test_with_context (gpointer uri)
+{
+	do_test_with_context_and_type (uri, TRUE);
 	return NULL;
 }
 
 static void
-do_context_test (const char *uri)
+do_context_test (const char *uri, gboolean plain_session)
 {
-	debug_printf (1, "Streaming with a non-default-context\n");
-	do_test_with_context (uri);
+	debug_printf (1, "Streaming with a non-default-context with %s\n",
+		      plain_session ? "SoupSession" : "SoupSessionAsync");
+	if (plain_session)
+		do_plain_test_with_context ((gpointer)uri);
+	else
+		do_test_with_context ((gpointer)uri);
 }
 
 static void
-do_thread_test (const char *uri)
+do_thread_test (const char *uri, gboolean plain_session)
 {
 	GThread *thread;
 
-	debug_printf (1, "Streaming in another thread\n");
+	debug_printf (1, "Streaming in another thread with %s\n",
+		      plain_session ? "SoupSession" : "SoupSessionAsync");
 
 	thread = g_thread_new ("do_test_with_context",
-			       (GThreadFunc)do_test_with_context,
+			       plain_session ? do_plain_test_with_context : do_test_with_context,
 			       (gpointer)uri);
 	g_thread_join (thread);
 }
@@ -542,16 +561,17 @@ do_sync_request (SoupSession *session, SoupRequest *request,
 }
 
 static void
-do_sync_test (const char *uri_string)
+do_sync_test (const char *uri_string, gboolean plain_session)
 {
 	SoupSession *session;
 	SoupRequester *requester;
 	SoupRequest *request;
 	SoupURI *uri;
 
-	debug_printf (1, "Sync streaming\n");
+	debug_printf (1, "Sync streaming with %s\n",
+		      plain_session ? "SoupSession" : "SoupSessionSync");
 
-	session = soup_test_session_new (SOUP_TYPE_SESSION_SYNC, NULL);
+	session = soup_test_session_new (plain_session ? SOUP_TYPE_SESSION : SOUP_TYPE_SESSION_SYNC, NULL);
 	requester = soup_requester_new ();
 	soup_session_add_feature (session, SOUP_SESSION_FEATURE (requester));
 	g_object_unref (requester);
@@ -614,10 +634,15 @@ main (int argc, char **argv)
 
 	uri = g_strdup_printf ("http://127.0.0.1:%u/foo";, soup_server_get_port (server));
 
-	do_simple_test (uri);
-	do_thread_test (uri);
-	do_context_test (uri);
-	do_sync_test (uri);
+	do_simple_test (uri, FALSE);
+	do_thread_test (uri, FALSE);
+	do_context_test (uri, FALSE);
+	do_sync_test (uri, FALSE);
+
+	do_simple_test (uri, TRUE);
+	do_thread_test (uri, TRUE);
+	do_context_test (uri, TRUE);
+	do_sync_test (uri, TRUE);
 
 	g_free (uri);
 	soup_buffer_free (response);



[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]