[ostree] fetcher: Move the SoupSession to a separate thread
- From: Matthew Barnes <mbarnes src gnome org>
- To: commits-list gnome org
- Cc:
- Subject: [ostree] fetcher: Move the SoupSession to a separate thread
- Date: Mon, 14 Dec 2015 17:53:41 +0000 (UTC)
commit 54066420cf1d2f7aed7817c6f94758cc70752fc4
Author: Matthew Barnes <mbarnes redhat com>
Date: Fri Nov 13 15:44:20 2015 -0500
fetcher: Move the SoupSession to a separate thread
Move the SoupSession to a separate thread with its own isolated main
context and main loop. All interaction with the SoupSession occurs
by way of idle sources attached to the session's main context, which
execute on the session's thread.
This should solve the problem of running an asynchronous fetch request
synchronously by pushing a new thread-default main context and iterating
a main loop until the request completes. Prior to this, the new thread-
default main context would interfere with the SoupSession's own async
processing.
src/libostree/ostree-fetcher.c | 647 +++++++++++++++++++++++++++++-----------
1 files changed, 465 insertions(+), 182 deletions(-)
---
diff --git a/src/libostree/ostree-fetcher.c b/src/libostree/ostree-fetcher.c
index c3bacd1..2964a1a 100644
--- a/src/libostree/ostree-fetcher.c
+++ b/src/libostree/ostree-fetcher.c
@@ -41,7 +41,29 @@ typedef enum {
} OstreeFetcherState;
typedef struct {
- OstreeFetcher *self;
+ volatile int ref_count;
+
+ SoupSession *session;
+ GMainContext *main_context;
+ GMainLoop *main_loop;
+
+ int tmpdir_dfd;
+ int max_outstanding;
+
+ /* Queue for libsoup, see bgo#708591 */
+ GQueue pending_queue;
+ GHashTable *outstanding;
+
+ /* Shared across threads; be sure to lock. */
+ GHashTable *output_stream_set; /* set<GOutputStream> */
+ GMutex output_stream_set_lock;
+
+ /* Also protected by output_stream_set_lock. */
+ guint64 total_downloaded;
+} ThreadClosure;
+
+typedef struct {
+ ThreadClosure *thread_closure;
SoupURI *uri;
OstreeFetcherState state;
@@ -60,29 +82,30 @@ typedef struct {
GTask *task;
} OstreeFetcherPendingURI;
+/* Used by session_thread_idle_add() */
+typedef void (*SessionThreadFunc) (ThreadClosure *thread_closure,
+ gpointer data);
+
+/* Used by session_thread_idle_add() */
+typedef struct {
+ ThreadClosure *thread_closure;
+ SessionThreadFunc function;
+ gpointer data;
+ GDestroyNotify notify;
+} IdleClosure;
+
struct OstreeFetcher
{
GObject parent_instance;
OstreeFetcherConfigFlags config_flags;
- int tmpdir_dfd;
+
char *tmpdir_name;
GLnxLockFile tmpdir_lock;
int base_tmpdir_dfd;
- GTlsCertificate *client_cert;
-
- SoupSession *session;
- SoupRequester *requester;
-
- GHashTable *output_stream_set; /* set<GOutputStream> */
-
- guint64 total_downloaded;
-
- /* Queue for libsoup, see bgo#708591 */
- GQueue pending_queue;
- GHashTable *outstanding;
- gint max_outstanding;
+ GThread *session_thread;
+ ThreadClosure *thread_closure;
};
enum {
@@ -92,6 +115,56 @@ enum {
G_DEFINE_TYPE (OstreeFetcher, _ostree_fetcher, G_TYPE_OBJECT)
+static ThreadClosure *
+thread_closure_ref (ThreadClosure *thread_closure)
+{
+ g_return_val_if_fail (thread_closure != NULL, NULL);
+ g_return_val_if_fail (thread_closure->ref_count > 0, NULL);
+
+ g_atomic_int_inc (&thread_closure->ref_count);
+
+ return thread_closure;
+}
+
+static void
+thread_closure_unref (ThreadClosure *thread_closure)
+{
+ g_return_if_fail (thread_closure != NULL);
+ g_return_if_fail (thread_closure->ref_count > 0);
+
+ if (g_atomic_int_dec_and_test (&thread_closure->ref_count))
+ {
+ g_clear_object (&thread_closure->session);
+
+ g_clear_pointer (&thread_closure->main_context, g_main_context_unref);
+ g_clear_pointer (&thread_closure->main_loop, g_main_loop_unref);
+
+ if (thread_closure->tmpdir_dfd != -1)
+ close (thread_closure->tmpdir_dfd);
+
+ while (!g_queue_is_empty (&thread_closure->pending_queue))
+ g_object_unref (g_queue_pop_head (&thread_closure->pending_queue));
+
+ g_clear_pointer (&thread_closure->outstanding, g_hash_table_unref);
+
+ g_clear_pointer (&thread_closure->output_stream_set, g_hash_table_unref);
+ g_mutex_clear (&thread_closure->output_stream_set_lock);
+
+ g_slice_free (ThreadClosure, thread_closure);
+ }
+}
+
+static void
+idle_closure_free (IdleClosure *idle_closure)
+{
+ g_clear_pointer (&idle_closure->thread_closure, thread_closure_unref);
+
+ if (idle_closure->notify != NULL)
+ idle_closure->notify (idle_closure->data);
+
+ g_slice_free (IdleClosure, idle_closure);
+}
+
static int
pending_task_compare (gconstpointer a,
gconstpointer b,
@@ -107,10 +180,11 @@ pending_task_compare (gconstpointer a,
static void
pending_uri_free (OstreeFetcherPendingURI *pending)
{
- g_hash_table_remove (pending->self->outstanding, pending);
+ g_hash_table_remove (pending->thread_closure->outstanding, pending);
+
+ g_clear_pointer (&pending->thread_closure, thread_closure_unref);
soup_uri_free (pending->uri);
- g_clear_object (&pending->self);
g_clear_object (&pending->request);
g_clear_object (&pending->request_body);
g_free (pending->out_tmpfile);
@@ -118,6 +192,250 @@ pending_uri_free (OstreeFetcherPendingURI *pending)
g_free (pending);
}
+static gboolean
+session_thread_idle_dispatch (gpointer data)
+{
+ IdleClosure *idle_closure = data;
+
+ idle_closure->function (idle_closure->thread_closure,
+ idle_closure->data);
+
+ return G_SOURCE_REMOVE;
+}
+
+static void
+session_thread_idle_add (ThreadClosure *thread_closure,
+ SessionThreadFunc function,
+ gpointer data,
+ GDestroyNotify notify)
+{
+ IdleClosure *idle_closure;
+
+ g_return_if_fail (thread_closure != NULL);
+ g_return_if_fail (function != NULL);
+
+ idle_closure = g_slice_new (IdleClosure);
+ idle_closure->thread_closure = thread_closure_ref (thread_closure);
+ idle_closure->function = function;
+ idle_closure->data = data;
+ idle_closure->notify = notify;
+
+ g_main_context_invoke_full (thread_closure->main_context,
+ G_PRIORITY_DEFAULT,
+ session_thread_idle_dispatch,
+ idle_closure, /* takes ownership */
+ (GDestroyNotify) idle_closure_free);
+}
+
+static void
+session_thread_add_logger (ThreadClosure *thread_closure,
+ gpointer data)
+{
+ glnx_unref_object SoupLogger *logger = NULL;
+
+ logger = soup_logger_new (SOUP_LOGGER_LOG_BODY, 500);
+ soup_session_add_feature (thread_closure->session,
+ SOUP_SESSION_FEATURE (logger));
+}
+
+static void
+session_thread_config_flags (ThreadClosure *thread_closure,
+ gpointer data)
+{
+ OstreeFetcherConfigFlags config_flags;
+
+ config_flags = GPOINTER_TO_UINT (data);
+
+ if ((config_flags & OSTREE_FETCHER_FLAGS_TLS_PERMISSIVE) > 0)
+ {
+ g_object_set (thread_closure->session,
+ SOUP_SESSION_SSL_STRICT,
+ FALSE, NULL);
+ }
+}
+
+static void
+session_thread_set_proxy_cb (ThreadClosure *thread_closure,
+ gpointer data)
+{
+ SoupURI *proxy_uri = data;
+
+ g_object_set (thread_closure->session,
+ SOUP_SESSION_PROXY_URI,
+ proxy_uri, NULL);
+}
+
+static void
+session_thread_set_tls_interaction_cb (ThreadClosure *thread_closure,
+ gpointer data)
+{
+ GTlsInteraction *interaction = data;
+
+ g_object_set (thread_closure->session,
+ SOUP_SESSION_TLS_INTERACTION,
+ interaction, NULL);
+}
+
+static void
+session_thread_set_tls_database_cb (ThreadClosure *thread_closure,
+ gpointer data)
+{
+ GTlsDatabase *database = data;
+
+ if (database != NULL)
+ {
+ g_object_set (thread_closure->session,
+ SOUP_SESSION_TLS_DATABASE,
+ database, NULL);
+ }
+ else
+ {
+ g_object_set (thread_closure->session,
+ SOUP_SESSION_SSL_USE_SYSTEM_CA_FILE,
+ TRUE, NULL);
+ }
+}
+
+static void
+on_request_sent (GObject *object, GAsyncResult *result, gpointer user_data);
+
+static void
+session_thread_process_pending_queue (ThreadClosure *thread_closure)
+{
+
+ while (g_queue_peek_head (&thread_closure->pending_queue) != NULL &&
+ g_hash_table_size (thread_closure->outstanding) < thread_closure->max_outstanding)
+ {
+ GTask *task;
+ OstreeFetcherPendingURI *pending;
+ GCancellable *cancellable;
+
+ task = g_queue_pop_head (&thread_closure->pending_queue);
+
+ pending = g_task_get_task_data (task);
+ cancellable = g_task_get_cancellable (task);
+
+ /* pending_uri_free() removes this. */
+ g_hash_table_add (thread_closure->outstanding, pending);
+
+ soup_request_send_async (pending->request,
+ cancellable,
+ on_request_sent,
+ g_object_ref (task));
+
+ g_object_unref (task);
+ }
+}
+
+static void
+session_thread_request_uri (ThreadClosure *thread_closure,
+ gpointer data)
+{
+ GTask *task = G_TASK (data);
+ OstreeFetcherPendingURI *pending;
+ GCancellable *cancellable;
+ GError *local_error = NULL;
+
+ pending = g_task_get_task_data (task);
+ cancellable = g_task_get_cancellable (task);
+
+ pending->request = soup_session_request_uri (thread_closure->session,
+ pending->uri,
+ &local_error);
+
+ if (local_error != NULL)
+ {
+ g_task_return_error (task, local_error);
+ return;
+ }
+
+ if (pending->is_stream)
+ {
+ soup_request_send_async (pending->request,
+ cancellable,
+ on_request_sent,
+ g_object_ref (task));
+ }
+ else
+ {
+ g_autofree char *uristring = soup_uri_to_string (pending->uri, FALSE);
+ g_autofree char *tmpfile = NULL;
+ struct stat stbuf;
+ gboolean exists;
+
+ tmpfile = g_compute_checksum_for_string (G_CHECKSUM_SHA256, uristring, strlen (uristring));
+
+ if (fstatat (thread_closure->tmpdir_dfd, tmpfile, &stbuf, AT_SYMLINK_NOFOLLOW) == 0)
+ exists = TRUE;
+ else
+ {
+ if (errno == ENOENT)
+ exists = FALSE;
+ else
+ {
+ gs_set_error_from_errno (&local_error, errno);
+ g_task_return_error (task, local_error);
+ return;
+ }
+ }
+
+ if (SOUP_IS_REQUEST_HTTP (pending->request))
+ {
+ glnx_unref_object SoupMessage *msg = NULL;
+ msg = soup_request_http_get_message ((SoupRequestHTTP*) pending->request);
+ if (exists && stbuf.st_size > 0)
+ soup_message_headers_set_range (msg->request_headers, stbuf.st_size, -1);
+ }
+ pending->out_tmpfile = tmpfile;
+ tmpfile = NULL; /* Transfer ownership */
+
+ g_queue_insert_sorted (&thread_closure->pending_queue,
+ g_object_ref (task),
+ pending_task_compare, NULL);
+ session_thread_process_pending_queue (thread_closure);
+ }
+}
+
+static gpointer
+ostree_fetcher_session_thread (gpointer data)
+{
+ ThreadClosure *closure = data;
+ gint max_conns;
+
+ /* This becomes the GMainContext that SoupSession schedules async
+ * callbacks and emits signals from. Make it the thread-default
+ * context for this thread before creating the session. */
+ g_main_context_push_thread_default (closure->main_context);
+
+ closure->session = soup_session_async_new_with_options (SOUP_SESSION_USER_AGENT, "ostree ",
+ SOUP_SESSION_SSL_USE_SYSTEM_CA_FILE, TRUE,
+ SOUP_SESSION_USE_THREAD_CONTEXT, TRUE,
+ SOUP_SESSION_ADD_FEATURE_BY_TYPE, SOUP_TYPE_REQUESTER,
+ SOUP_SESSION_TIMEOUT, 60,
+ SOUP_SESSION_IDLE_TIMEOUT, 60,
+ NULL);
+
+ g_object_get (closure->session, "max-conns-per-host", &max_conns, NULL);
+ if (max_conns < 8)
+ {
+ /* We download a lot of small objects in ostree, so this
+ * helps a lot. Also matches what most modern browsers do. */
+ max_conns = 8;
+ g_object_set (closure->session,
+ "max-conns-per-host",
+ max_conns, NULL);
+ }
+ closure->max_outstanding = 3 * max_conns;
+
+ g_main_loop_run (closure->main_loop);
+
+ g_main_context_pop_thread_default (closure->main_context);
+
+ thread_closure_unref (closure);
+
+ return NULL;
+}
+
static void
_ostree_fetcher_set_property (GObject *object,
guint prop_id,
@@ -159,12 +477,12 @@ _ostree_fetcher_get_property (GObject *object,
static void
_ostree_fetcher_finalize (GObject *object)
{
- OstreeFetcher *self;
-
- self = OSTREE_FETCHER (object);
+ OstreeFetcher *self = OSTREE_FETCHER (object);
- if (self->tmpdir_dfd != -1)
- close (self->tmpdir_dfd);
+ /* Terminate the session thread. */
+ g_main_loop_quit (self->thread_closure->main_loop);
+ g_clear_pointer (&self->session_thread, g_thread_unref);
+ g_clear_pointer (&self->thread_closure, thread_closure_unref);
/* Note: We don't remove the tmpdir here, because that would cause
us to not reuse it on resume. This happens because we use two
@@ -174,17 +492,55 @@ _ostree_fetcher_finalize (GObject *object)
g_free (self->tmpdir_name);
glnx_release_lock_file (&self->tmpdir_lock);
- g_clear_object (&self->session);
- g_clear_object (&self->client_cert);
+ G_OBJECT_CLASS (_ostree_fetcher_parent_class)->finalize (object);
+}
- g_hash_table_destroy (self->output_stream_set);
+static void
+_ostree_fetcher_constructed (GObject *object)
+{
+ OstreeFetcher *self = OSTREE_FETCHER (object);
+ g_autoptr(GMainContext) main_context = NULL;
+ const char *http_proxy;
- while (!g_queue_is_empty (&self->pending_queue))
- g_object_unref (g_queue_pop_head (&self->pending_queue));
+ main_context = g_main_context_new ();
- g_hash_table_destroy (self->outstanding);
+ self->thread_closure = g_slice_new0 (ThreadClosure);
+ self->thread_closure->ref_count = 1;
+ self->thread_closure->main_context = g_main_context_ref (main_context);
+ self->thread_closure->main_loop = g_main_loop_new (main_context, FALSE);
+ self->thread_closure->tmpdir_dfd = -1;
- G_OBJECT_CLASS (_ostree_fetcher_parent_class)->finalize (object);
+ self->thread_closure->outstanding = g_hash_table_new (NULL, NULL);
+ self->thread_closure->output_stream_set = g_hash_table_new_full (NULL, NULL,
+ (GDestroyNotify) NULL,
+ (GDestroyNotify) g_object_unref);
+
+ if (g_getenv ("OSTREE_DEBUG_HTTP"))
+ {
+ session_thread_idle_add (self->thread_closure,
+ session_thread_add_logger,
+ NULL, (GDestroyNotify) NULL);
+ }
+
+ if (self->config_flags != 0)
+ {
+ session_thread_idle_add (self->thread_closure,
+ session_thread_config_flags,
+ GUINT_TO_POINTER (self->config_flags),
+ (GDestroyNotify) NULL);
+ }
+
+ http_proxy = g_getenv ("http_proxy");
+ if (http_proxy != NULL)
+ _ostree_fetcher_set_proxy (self, http_proxy);
+
+ /* FIXME Maybe implement GInitableIface and use g_thread_try_new()
+ * so we can try to handle thread creation errors gracefully? */
+ self->session_thread = g_thread_new ("fetcher-session-thread",
+ ostree_fetcher_session_thread,
+ thread_closure_ref (self->thread_closure));
+
+ G_OBJECT_CLASS (_ostree_fetcher_parent_class)->constructed (object);
}
static void
@@ -195,6 +551,7 @@ _ostree_fetcher_class_init (OstreeFetcherClass *klass)
gobject_class->set_property = _ostree_fetcher_set_property;
gobject_class->get_property = _ostree_fetcher_get_property;
gobject_class->finalize = _ostree_fetcher_finalize;
+ gobject_class->constructed = _ostree_fetcher_constructed;
g_object_class_install_property (gobject_class,
PROP_CONFIG_FLAGS,
@@ -211,50 +568,9 @@ _ostree_fetcher_class_init (OstreeFetcherClass *klass)
static void
_ostree_fetcher_init (OstreeFetcher *self)
{
- gint max_conns;
- const char *http_proxy;
GLnxLockFile empty_lockfile = GLNX_LOCK_FILE_INIT;
- g_queue_init (&self->pending_queue);
- self->session = soup_session_async_new_with_options (SOUP_SESSION_USER_AGENT, "ostree ",
- SOUP_SESSION_SSL_USE_SYSTEM_CA_FILE, TRUE,
- SOUP_SESSION_USE_THREAD_CONTEXT, TRUE,
- SOUP_SESSION_ADD_FEATURE_BY_TYPE, SOUP_TYPE_REQUESTER,
- SOUP_SESSION_TIMEOUT, 60,
- SOUP_SESSION_IDLE_TIMEOUT, 60,
- NULL);
-
- http_proxy = g_getenv ("http_proxy");
- if (http_proxy)
- {
- _ostree_fetcher_set_proxy (self, http_proxy);
- }
-
- if (g_getenv ("OSTREE_DEBUG_HTTP"))
- soup_session_add_feature (self->session, (SoupSessionFeature*)soup_logger_new (SOUP_LOGGER_LOG_BODY, 500));
-
- if ((self->config_flags & OSTREE_FETCHER_FLAGS_TLS_PERMISSIVE) > 0)
- g_object_set (self->session, SOUP_SESSION_SSL_STRICT, FALSE, NULL);
-
- self->requester = (SoupRequester *)soup_session_get_feature (self->session, SOUP_TYPE_REQUESTER);
- g_object_get (self->session, "max-conns-per-host", &max_conns, NULL);
- if (max_conns <= 8)
- {
- // We download a lot of small objects in ostree, so this helps a
- // lot. Also matches what most modern browsers do.
- max_conns = 8;
- g_object_set (self->session, "max-conns-per-host", max_conns, NULL);
- }
-
- self->max_outstanding = 3 * max_conns;
-
- self->output_stream_set = g_hash_table_new_full (NULL, NULL, NULL, (GDestroyNotify)g_object_unref);
-
- self->outstanding = g_hash_table_new_full (NULL, NULL, NULL, NULL);
-
- self->tmpdir_dfd = -1;
self->tmpdir_lock = empty_lockfile;
-
}
OstreeFetcher *
@@ -270,13 +586,13 @@ _ostree_fetcher_new (int tmpdir_dfd,
if (!_ostree_repo_allocate_tmpdir (tmpdir_dfd,
"fetcher-",
&self->tmpdir_name,
- &self->tmpdir_dfd,
+ &self->thread_closure->tmpdir_dfd,
&self->tmpdir_lock,
NULL,
cancellable, error))
return NULL;
- self->tmpdir_dfd = tmpdir_dfd;
+ self->base_tmpdir_dfd = tmpdir_dfd;
return self;
}
@@ -284,81 +600,69 @@ _ostree_fetcher_new (int tmpdir_dfd,
int
_ostree_fetcher_get_dfd (OstreeFetcher *fetcher)
{
- return fetcher->tmpdir_dfd;
+ return fetcher->thread_closure->tmpdir_dfd;
}
void
_ostree_fetcher_set_proxy (OstreeFetcher *self,
const char *http_proxy)
{
- SoupURI *proxy_uri = soup_uri_new (http_proxy);
+ SoupURI *proxy_uri;
+
+ g_return_if_fail (OSTREE_IS_FETCHER (self));
+ g_return_if_fail (http_proxy != NULL);
+
+ proxy_uri = soup_uri_new (http_proxy);
+
if (!proxy_uri)
{
g_warning ("Invalid proxy URI '%s'", http_proxy);
}
else
{
- g_object_set (self->session, SOUP_SESSION_PROXY_URI, proxy_uri, NULL);
- soup_uri_free (proxy_uri);
+ session_thread_idle_add (self->thread_closure,
+ session_thread_set_proxy_cb,
+ proxy_uri, /* takes ownership */
+ (GDestroyNotify) soup_uri_free);
}
}
void
-_ostree_fetcher_set_client_cert (OstreeFetcher *fetcher,
- GTlsCertificate *cert)
+_ostree_fetcher_set_client_cert (OstreeFetcher *self,
+ GTlsCertificate *cert)
{
- g_clear_object (&fetcher->client_cert);
- fetcher->client_cert = g_object_ref (cert);
- if (fetcher->client_cert)
- {
+ g_return_if_fail (OSTREE_IS_FETCHER (self));
+ g_return_if_fail (G_IS_TLS_CERTIFICATE (cert));
+
#ifdef HAVE_LIBSOUP_CLIENT_CERTS
- g_autoptr(GTlsInteraction) interaction =
- (GTlsInteraction*)_ostree_tls_cert_interaction_new (fetcher->client_cert);
- g_object_set (fetcher->session, "tls-interaction", interaction, NULL);
+ session_thread_idle_add (self->thread_closure,
+ session_thread_set_tls_interaction_cb,
+ _ostree_tls_cert_interaction_new (cert),
+ (GDestroyNotify) g_object_unref);
#else
- g_warning ("This version of OSTree is compiled without client side certificate support");
+ g_warning ("This version of OSTree is compiled without client side certificate support");
#endif
- }
}
void
_ostree_fetcher_set_tls_database (OstreeFetcher *self,
GTlsDatabase *db)
{
- if (db)
- g_object_set ((GObject*)self->session, "tls-database", db, NULL);
- else
- g_object_set ((GObject*)self->session, "ssl-use-system-ca-file", TRUE, NULL);
-}
+ g_return_if_fail (OSTREE_IS_FETCHER (self));
+ g_return_if_fail (db == NULL || G_IS_TLS_DATABASE (db));
-static void
-on_request_sent (GObject *object, GAsyncResult *result, gpointer user_data);
-
-static void
-ostree_fetcher_process_pending_queue (OstreeFetcher *self)
-{
-
- while (g_queue_peek_head (&self->pending_queue) != NULL &&
- g_hash_table_size (self->outstanding) < self->max_outstanding)
+ if (db != NULL)
{
- GTask *task;
- OstreeFetcherPendingURI *pending;
- GCancellable *cancellable;
-
- task = g_queue_pop_head (&self->pending_queue);
-
- pending = g_task_get_task_data (task);
- cancellable = g_task_get_cancellable (task);
-
- /* pending_uri_free() removes this. */
- g_hash_table_add (self->outstanding, pending);
-
- soup_request_send_async (pending->request,
- cancellable,
- on_request_sent,
- g_object_ref (task));
-
- g_object_unref (task);
+ session_thread_idle_add (self->thread_closure,
+ session_thread_set_tls_database_cb,
+ g_object_ref (db),
+ (GDestroyNotify) g_object_unref);
+ }
+ else
+ {
+ session_thread_idle_add (self->thread_closure,
+ session_thread_set_tls_database_cb,
+ NULL, (GDestroyNotify) NULL);
}
}
@@ -377,11 +681,17 @@ finish_stream (OstreeFetcherPendingURI *pending,
{
if (!g_output_stream_close (pending->out_stream, cancellable, error))
goto out;
- g_hash_table_remove (pending->self->output_stream_set, pending->out_stream);
+
+ g_mutex_lock (&pending->thread_closure->output_stream_set_lock);
+ g_hash_table_remove (pending->thread_closure->output_stream_set,
+ pending->out_stream);
+ g_mutex_unlock (&pending->thread_closure->output_stream_set_lock);
}
pending->state = OSTREE_FETCHER_STATE_COMPLETE;
- if (fstatat (pending->self->tmpdir_dfd, pending->out_tmpfile, &stbuf, AT_SYMLINK_NOFOLLOW) != 0)
+ if (fstatat (pending->thread_closure->tmpdir_dfd,
+ pending->out_tmpfile,
+ &stbuf, AT_SYMLINK_NOFOLLOW) != 0)
{
gs_set_error_from_errno (error, errno);
goto out;
@@ -390,7 +700,7 @@ finish_stream (OstreeFetcherPendingURI *pending,
/* Now that we've finished downloading, continue with other queued
* requests.
*/
- ostree_fetcher_process_pending_queue (pending->self);
+ session_thread_process_pending_queue (pending->thread_closure);
if (stbuf.st_size < pending->content_length)
{
@@ -399,7 +709,9 @@ finish_stream (OstreeFetcherPendingURI *pending,
}
else
{
- pending->self->total_downloaded += stbuf.st_size;
+ g_mutex_lock (&pending->thread_closure->output_stream_set_lock);
+ pending->thread_closure->total_downloaded += stbuf.st_size;
+ g_mutex_unlock (&pending->thread_closure->output_stream_set_lock);
}
ret = TRUE;
@@ -593,14 +905,20 @@ on_request_sent (GObject *object,
else
oflags |= O_TRUNC;
- fd = openat (pending->self->tmpdir_dfd, pending->out_tmpfile, oflags, 0666);
+ fd = openat (pending->thread_closure->tmpdir_dfd,
+ pending->out_tmpfile, oflags, 0666);
if (fd == -1)
{
gs_set_error_from_errno (&local_error, errno);
goto out;
}
pending->out_stream = g_unix_output_stream_new (fd, TRUE);
- g_hash_table_add (pending->self->output_stream_set, g_object_ref (pending->out_stream));
+
+ g_mutex_lock (&pending->thread_closure->output_stream_set_lock);
+ g_hash_table_add (pending->thread_closure->output_stream_set,
+ g_object_ref (pending->out_stream));
+ g_mutex_unlock (&pending->thread_closure->output_stream_set_lock);
+
g_input_stream_read_bytes_async (pending->request_body,
8192, G_PRIORITY_DEFAULT,
cancellable,
@@ -636,13 +954,15 @@ ostree_fetcher_request_uri_internal (OstreeFetcher *self,
gpointer user_data,
gpointer source_tag)
{
- GTask *task;
- OstreeFetcherPendingURI *pending = g_new0 (OstreeFetcherPendingURI, 1);
- GError *local_error = NULL;
+ g_autoptr(GTask) task = NULL;
+ OstreeFetcherPendingURI *pending;
- pending->request = soup_requester_request_uri (self->requester, uri, &local_error);
+ g_return_if_fail (OSTREE_IS_FETCHER (self));
+ g_return_if_fail (uri != NULL);
- pending->self = g_object_ref (self);
+ /* SoupRequest is created in session thread. */
+ pending = g_new0 (OstreeFetcherPendingURI, 1);
+ pending->thread_closure = thread_closure_ref (self->thread_closure);
pending->uri = soup_uri_copy (uri);
pending->max_size = max_size;
pending->is_stream = is_stream;
@@ -654,55 +974,10 @@ ostree_fetcher_request_uri_internal (OstreeFetcher *self,
/* We'll use the GTask priority for our own priority queue. */
g_task_set_priority (task, priority);
- if (is_stream)
- {
- soup_request_send_async (pending->request,
- cancellable,
- on_request_sent,
- g_object_ref (task));
- }
- else
- {
- g_autofree char *uristring = soup_uri_to_string (uri, FALSE);
- g_autofree char *tmpfile = NULL;
- struct stat stbuf;
- gboolean exists;
-
- tmpfile = g_compute_checksum_for_string (G_CHECKSUM_SHA256, uristring, strlen (uristring));
-
- if (fstatat (self->tmpdir_dfd, tmpfile, &stbuf, AT_SYMLINK_NOFOLLOW) == 0)
- exists = TRUE;
- else
- {
- if (errno == ENOENT)
- exists = FALSE;
- else
- {
- gs_set_error_from_errno (&local_error, errno);
- goto out;
- }
- }
-
- if (SOUP_IS_REQUEST_HTTP (pending->request))
- {
- glnx_unref_object SoupMessage *msg = NULL;
- msg = soup_request_http_get_message ((SoupRequestHTTP*) pending->request);
- if (exists && stbuf.st_size > 0)
- soup_message_headers_set_range (msg->request_headers, stbuf.st_size, -1);
- }
- pending->out_tmpfile = tmpfile;
- tmpfile = NULL; /* Transfer ownership */
-
- g_queue_insert_sorted (&self->pending_queue,
- g_object_ref (task),
- pending_task_compare, NULL);
- ostree_fetcher_process_pending_queue (self);
- }
-
- g_assert_no_error (local_error);
-
-out:
- g_object_unref (task);
+ session_thread_idle_add (self->thread_closure,
+ session_thread_request_uri,
+ g_object_ref (task),
+ (GDestroyNotify) g_object_unref);
}
void
@@ -760,11 +1035,17 @@ ostree_fetcher_stream_uri_finish (OstreeFetcher *self,
guint64
_ostree_fetcher_bytes_transferred (OstreeFetcher *self)
{
- guint64 ret = self->total_downloaded;
GHashTableIter hiter;
gpointer key, value;
+ guint64 ret;
+
+ g_return_val_if_fail (OSTREE_IS_FETCHER (self), 0);
+
+ g_mutex_lock (&self->thread_closure->output_stream_set_lock);
- g_hash_table_iter_init (&hiter, self->output_stream_set);
+ ret = self->thread_closure->total_downloaded;
+
+ g_hash_table_iter_init (&hiter, self->thread_closure->output_stream_set);
while (g_hash_table_iter_next (&hiter, &key, &value))
{
GFileOutputStream *stream = key;
@@ -776,7 +1057,9 @@ _ostree_fetcher_bytes_transferred (OstreeFetcher *self)
ret += stbuf.st_size;
}
}
-
+
+ g_mutex_unlock (&self->thread_closure->output_stream_set_lock);
+
return ret;
}
[Date Prev][Date Next] [Thread Prev][Thread Next] [Thread Index] [Date Index] [Author Index]