[gnome-online-accounts/wip/rishi/gtask: 3/4] kerberos-identity-manager: Port to GThreadPool
- From: Debarshi Ray <debarshir src gnome org>
- To: commits-list gnome org
- Cc:
- Subject: [gnome-online-accounts/wip/rishi/gtask: 3/4] kerberos-identity-manager: Port to GThreadPool
- Date: Mon, 29 Jul 2019 17:48:02 +0000 (UTC)
commit 8d54812227930f530930b777a9ec5e64d5ce5dc5
Author: Debarshi Ray <debarshir gnome org>
Date: Thu Jul 25 19:21:48 2019 +0200
kerberos-identity-manager: Port to GThreadPool
The destruction sequence of a GoaKerberosIdentityManager instance is
simpler now because the GThreadPool makes some things easier, as
compared to having to directly handle a GAsyncQueue.
As before, there is a object-wide GCancellable and separate
GCancellables for each GAsyncResult-based cancellable action; and there
are two kinds of operations that get inserted into the thread pool -
those that are based on the GAsyncResult idiom, and those that are
internal fire-and-forget style operations. Note that due to the
semantics of the GAsyncResult idiom, a GoaKerberosIdentityManager
instance will not be destroyed while such a call is in flight because
it holds a reference to the instance.
When a GoaKerberosIdentityManager instance gets destroyed, the first
step is to cancel the object-wide GCancellable, and wait for any
ongoing to operation to complete. Then the GThreadPool gets cleared and
destroyed.
https://bugzilla.gnome.org/show_bug.cgi?id=764157
src/goaidentity/goakerberosidentitymanager.c | 265 ++++++++++++---------------
1 file changed, 117 insertions(+), 148 deletions(-)
---
diff --git a/src/goaidentity/goakerberosidentitymanager.c b/src/goaidentity/goakerberosidentitymanager.c
index 721bce89..ea589924 100644
--- a/src/goaidentity/goakerberosidentitymanager.c
+++ b/src/goaidentity/goakerberosidentitymanager.c
@@ -40,8 +40,8 @@ struct _GoaKerberosIdentityManagerPrivate
GHashTable *identities;
GHashTable *expired_identities;
GHashTable *identities_by_realm;
- GAsyncQueue *pending_operations;
GCancellable *scheduler_cancellable;
+ GThreadPool *thread_pool;
krb5_context kerberos_context;
GFileMonitor *credentials_cache_monitor;
@@ -64,13 +64,13 @@ typedef enum
OPERATION_TYPE_LIST,
OPERATION_TYPE_RENEW,
OPERATION_TYPE_SIGN_IN,
- OPERATION_TYPE_SIGN_OUT,
- OPERATION_TYPE_STOP_JOB
+ OPERATION_TYPE_SIGN_OUT
} OperationType;
typedef struct
{
GCancellable *cancellable;
+ GCancellable *scheduler_cancellable;
GoaKerberosIdentityManager *manager;
OperationType type;
GMainContext *context;
@@ -131,6 +131,8 @@ operation_new (GoaKerberosIdentityManager *self,
operation = g_slice_new0 (Operation);
operation->manager = self;
+ operation->scheduler_cancellable = g_object_ref (self->priv->scheduler_cancellable);
+
operation->type = type;
if (cancellable == NULL)
@@ -154,6 +156,7 @@ static void
operation_free (Operation *operation)
{
g_clear_object (&operation->cancellable);
+ g_clear_object (&operation->scheduler_cancellable);
g_clear_pointer (&operation->context, g_main_context_unref);
if (operation->type != OPERATION_TYPE_SIGN_IN &&
@@ -255,7 +258,7 @@ schedule_refresh (GoaKerberosIdentityManager *self)
g_atomic_int_inc (&self->priv->pending_refresh_count);
operation = operation_new (self, NULL, OPERATION_TYPE_REFRESH, NULL);
- g_async_queue_push (self->priv->pending_operations, operation);
+ g_thread_pool_push (self->priv->thread_pool, operation, NULL);
}
static IdentitySignalWork *
@@ -1037,116 +1040,96 @@ wait_for_scheduler_job_to_become_unblocked (GoaKerberosIdentityManager *self)
}
static void
-on_job_cancelled (GCancellable *cancellable,
- GoaKerberosIdentityManager *self)
+on_scheduler_cancellable_cancelled (GCancellable *cancellable,
+ GoaKerberosIdentityManager *self)
{
- Operation *operation;
- operation = operation_new (self, cancellable, OPERATION_TYPE_STOP_JOB, NULL);
- g_async_queue_push (self->priv->pending_operations, operation);
-
stop_blocking_scheduler_job (self);
}
-static gboolean
-on_job_scheduled (GIOSchedulerJob *job,
- GCancellable *cancellable,
- GoaKerberosIdentityManager *self)
+static void
+goa_kerberos_identity_manager_thread_pool_func (gpointer data, gpointer user_data)
{
- GAsyncQueue *pending_operations;
-
- g_assert (cancellable != NULL);
-
- g_cancellable_connect (cancellable, G_CALLBACK (on_job_cancelled), self, NULL);
+ Operation *operation = (Operation *) data;
+ GoaKerberosIdentityManager *self = GOA_KERBEROS_IDENTITY_MANAGER (operation->manager);
+ GError *error;
+ gboolean processed_operation = FALSE;
- /* Take ownership of queue, since we may out live the identity manager */
- pending_operations = g_async_queue_ref (self->priv->pending_operations);
- while (!g_cancellable_is_cancelled (cancellable))
+ error = NULL;
+ if (operation->result != NULL && g_cancellable_set_error_if_cancelled (operation->cancellable, &error))
{
- Operation *operation;
- gboolean processed_operation = FALSE;
- GError *error = NULL;
-
- operation = g_async_queue_pop (pending_operations);
-
- if (operation->result != NULL &&
- g_cancellable_set_error_if_cancelled (operation->cancellable,
- &error))
- {
- g_simple_async_result_take_error (operation->result, error);
- g_simple_async_result_complete_in_idle (operation->result);
- g_clear_object (&operation->result);
- continue;
- }
-
- switch (operation->type)
- {
- case OPERATION_TYPE_STOP_JOB:
- /* do nothing, loop will exit next iteration since cancellable
- * is cancelled
- */
- g_assert (g_cancellable_is_cancelled (cancellable));
- operation_free (operation);
- continue;
- case OPERATION_TYPE_REFRESH:
- processed_operation = refresh_identities (operation->manager, operation);
- break;
- case OPERATION_TYPE_GET_IDENTITY:
- get_identity (operation->manager, operation);
- processed_operation = TRUE;
- break;
- case OPERATION_TYPE_LIST:
- list_identities (operation->manager, operation);
- processed_operation = TRUE;
-
- /* We want to block refreshes (and their associated "added"
- * and "removed" signals) until the caller has had
- * a chance to look at the batch of
- * results we already processed
- */
- g_assert (operation->result != NULL);
-
- g_debug
- ("GoaKerberosIdentityManager: Blocking until identities list processed");
- block_scheduler_job (self);
- g_object_weak_ref (G_OBJECT (operation->result),
- (GWeakNotify) stop_blocking_scheduler_job, self);
- g_debug ("GoaKerberosIdentityManager: Continuing");
- break;
- case OPERATION_TYPE_SIGN_IN:
- sign_in_identity (operation->manager, operation);
- processed_operation = TRUE;
- break;
- case OPERATION_TYPE_SIGN_OUT:
- sign_out_identity (operation->manager, operation);
- processed_operation = TRUE;
- break;
- case OPERATION_TYPE_RENEW:
- renew_identity (operation->manager, operation);
- processed_operation = TRUE;
- break;
- default:
- break;
- }
-
- if (operation->result != NULL)
- {
- g_simple_async_result_complete_in_idle (operation->result);
- g_clear_object (&operation->result);
- }
- operation_free (operation);
+ g_simple_async_result_take_error (operation->result, error);
+ g_simple_async_result_complete_in_idle (operation->result);
+ g_clear_object (&operation->result);
+ goto out;
+ }
- wait_for_scheduler_job_to_become_unblocked (self);
+ if (g_cancellable_is_cancelled (operation->scheduler_cancellable))
+ goto out;
- /* Don't bother saying "Waiting for next operation" if this operation
- * was a no-op, since the debug spew probably already says the message
+ switch (operation->type)
+ {
+ case OPERATION_TYPE_REFRESH:
+ processed_operation = refresh_identities (operation->manager, operation);
+ break;
+
+ case OPERATION_TYPE_GET_IDENTITY:
+ get_identity (operation->manager, operation);
+ processed_operation = TRUE;
+ break;
+
+ case OPERATION_TYPE_LIST:
+ list_identities (operation->manager, operation);
+ processed_operation = TRUE;
+
+ /* We want to block refreshes (and their associated "added"
+ * and "removed" signals) until the caller has had
+ * a chance to look at the batch of
+ * results we already processed
*/
- if (processed_operation)
- g_debug ("GoaKerberosIdentityManager: Waiting for next operation");
+ g_assert (operation->result != NULL);
+
+ g_debug ("GoaKerberosIdentityManager: Blocking until identities list processed");
+ block_scheduler_job (self);
+ g_object_weak_ref (G_OBJECT (operation->result), (GWeakNotify) stop_blocking_scheduler_job, self);
+ g_debug ("GoaKerberosIdentityManager: Continuing");
+ break;
+
+ case OPERATION_TYPE_SIGN_IN:
+ sign_in_identity (operation->manager, operation);
+ processed_operation = TRUE;
+ break;
+
+ case OPERATION_TYPE_SIGN_OUT:
+ sign_out_identity (operation->manager, operation);
+ processed_operation = TRUE;
+ break;
+
+ case OPERATION_TYPE_RENEW:
+ renew_identity (operation->manager, operation);
+ processed_operation = TRUE;
+ break;
+
+ default:
+ break;
}
- g_async_queue_unref (pending_operations);
+ if (operation->result != NULL)
+ {
+ g_simple_async_result_complete_in_idle (operation->result);
+ g_clear_object (&operation->result);
+ }
- return FALSE;
+ wait_for_scheduler_job_to_become_unblocked (self);
+
+ /* Don't bother saying "Waiting for next operation" if this operation
+ * was a no-op, since the debug spew probably already says the message
+ */
+ if (processed_operation)
+ g_debug ("GoaKerberosIdentityManager: Waiting for next operation");
+
+ out:
+ operation_free (operation);
+ return;
}
static void
@@ -1169,7 +1152,7 @@ goa_kerberos_identity_manager_get_identity (GoaIdentityManager *manager,
operation->identifier = g_strdup (identifier);
- g_async_queue_push (self->priv->pending_operations, operation);
+ g_thread_pool_push (self->priv->thread_pool, operation, NULL);
}
static GoaIdentity *
@@ -1207,7 +1190,7 @@ goa_kerberos_identity_manager_list_identities (GoaIdentityManager *manager,
operation = operation_new (self, cancellable, OPERATION_TYPE_LIST, result);
g_object_unref (result);
- g_async_queue_push (self->priv->pending_operations, operation);
+ g_thread_pool_push (self->priv->thread_pool, operation, NULL);
}
static GList *
@@ -1248,7 +1231,7 @@ goa_kerberos_identity_manager_renew_identity (GoaIdentityManager *manager,
operation->identity = g_object_ref (identity);
- g_async_queue_push (self->priv->pending_operations, operation);
+ g_thread_pool_push (self->priv->thread_pool, operation, NULL);
}
static void
@@ -1297,7 +1280,7 @@ goa_kerberos_identity_manager_sign_identity_in (GoaIdentityManager *manager,
g_cond_init (&operation->inquiry_finished_condition);
operation->is_inquiring = FALSE;
- g_async_queue_push (self->priv->pending_operations, operation);
+ g_thread_pool_push (self->priv->thread_pool, operation, NULL);
}
static GoaIdentity *
@@ -1336,7 +1319,7 @@ goa_kerberos_identity_manager_sign_identity_out (GoaIdentityManager *manager,
operation->identity = g_object_ref (identity);
- g_async_queue_push (self->priv->pending_operations, operation);
+ g_thread_pool_push (self->priv->thread_pool, operation, NULL);
}
static void
@@ -1620,45 +1603,53 @@ initable_interface_init (GInitableIface *interface)
static void
goa_kerberos_identity_manager_init (GoaKerberosIdentityManager *self)
{
+ GError *error;
+
self->priv = G_TYPE_INSTANCE_GET_PRIVATE (self,
GOA_TYPE_KERBEROS_IDENTITY_MANAGER,
GoaKerberosIdentityManagerPrivate);
self->priv->identities = g_hash_table_new_full (g_str_hash, g_str_equal, g_free, g_object_unref);
self->priv->expired_identities = g_hash_table_new_full (g_str_hash, g_str_equal, g_free, NULL);
self->priv->identities_by_realm = g_hash_table_new_full (g_str_hash, g_str_equal, g_free, NULL);
- self->priv->pending_operations = g_async_queue_new ();
+
+ error = NULL;
+ self->priv->thread_pool = g_thread_pool_new (goa_kerberos_identity_manager_thread_pool_func,
+ NULL,
+ 1,
+ FALSE,
+ &error);
+ g_assert_no_error (error);
g_mutex_init (&self->priv->scheduler_job_lock);
g_cond_init (&self->priv->scheduler_job_unblocked);
self->priv->scheduler_cancellable = g_cancellable_new ();
- g_io_scheduler_push_job ((GIOSchedulerJobFunc)
- on_job_scheduled,
- self,
- NULL,
- G_PRIORITY_DEFAULT,
- self->priv->scheduler_cancellable);
-
+ g_cancellable_connect (self->priv->scheduler_cancellable,
+ G_CALLBACK (on_scheduler_cancellable_cancelled),
+ self,
+ NULL);
}
static void
-cancel_pending_operations (GoaKerberosIdentityManager *self)
+goa_kerberos_identity_manager_dispose (GObject *object)
{
- Operation *operation;
+ GoaKerberosIdentityManager *self = GOA_KERBEROS_IDENTITY_MANAGER (object);
- operation = g_async_queue_try_pop (self->priv->pending_operations);
- while (operation != NULL)
+ if (self->priv->scheduler_cancellable != NULL)
{
- g_cancellable_cancel (operation->cancellable);
- operation_free (operation);
- operation = g_async_queue_try_pop (self->priv->pending_operations);
+ if (!g_cancellable_is_cancelled (self->priv->scheduler_cancellable))
+ {
+ g_cancellable_cancel (self->priv->scheduler_cancellable);
+ }
+
+ g_clear_object (&self->priv->scheduler_cancellable);
}
-}
-static void
-goa_kerberos_identity_manager_dispose (GObject *object)
-{
- GoaKerberosIdentityManager *self = GOA_KERBEROS_IDENTITY_MANAGER (object);
+ if (self->priv->thread_pool != NULL)
+ {
+ g_thread_pool_free (self->priv->thread_pool, FALSE, TRUE);
+ self->priv->thread_pool = NULL;
+ }
if (self->priv->identities_by_realm != NULL)
{
@@ -1680,28 +1671,6 @@ goa_kerberos_identity_manager_dispose (GObject *object)
stop_watching_credentials_cache (self);
- if (self->priv->pending_operations != NULL)
- cancel_pending_operations (self);
-
- if (self->priv->scheduler_cancellable != NULL)
- {
- if (!g_cancellable_is_cancelled (self->priv->scheduler_cancellable))
- {
- g_cancellable_cancel (self->priv->scheduler_cancellable);
- }
-
- g_clear_object (&self->priv->scheduler_cancellable);
- }
-
- /* Note, other thread may still be holding a local reference to queue
- * while it shuts down from cancelled scheduler_cancellable above
- */
- if (self->priv->pending_operations != NULL)
- {
- g_async_queue_unref (self->priv->pending_operations);
- self->priv->pending_operations = NULL;
- }
-
G_OBJECT_CLASS (goa_kerberos_identity_manager_parent_class)->dispose (object);
}
[
Date Prev][
Date Next] [
Thread Prev][
Thread Next]
[
Thread Index]
[
Date Index]
[
Author Index]