[ostree/wip/async-pull] [wip] Asynchronous metadata fetch
- From: Colin Walters <walters src gnome org>
- To: commits-list gnome org
- Cc:
- Subject: [ostree/wip/async-pull] [wip] Asynchronous metadata fetch
- Date: Thu, 6 Sep 2012 21:59:40 +0000 (UTC)
commit 177964346c734a79d7824de518631919f0d58d09
Author: Colin Walters <walters verbum org>
Date: Tue Aug 28 09:41:09 2012 -0400
[wip] Asynchronous metadata fetch
src/ostree/ostree-pull.c | 512 +++++++++++++++++++++++++++++++++++-----------
1 files changed, 390 insertions(+), 122 deletions(-)
---
diff --git a/src/ostree/ostree-pull.c b/src/ostree/ostree-pull.c
index 0115b8b..b9ed448 100644
--- a/src/ostree/ostree-pull.c
+++ b/src/ostree/ostree-pull.c
@@ -66,6 +66,7 @@
#include "ostree-fetcher.h"
+
gboolean verbose;
gint opt_packfile_threshold = 66;
gboolean opt_related;
@@ -92,11 +93,17 @@ typedef struct {
GHashTable *file_checksums_to_fetch;
GMainLoop *loop;
+ GCancellable *cancellable;
/* Used in meta fetch phase */
guint outstanding_uri_requests;
guint outstanding_meta_requests;
+ GThread *metadata_scan_thread;
+ GAsyncQueue *metadata_objects_to_scan;
+
+ GHashTable *commit_to_depth; /* Maps commit checksum to depth we should fetch */
+
/* Used in content fetch phase */
guint outstanding_filemeta_requests;
guint outstanding_filecontent_requests;
@@ -115,6 +122,17 @@ suburi_new (SoupURI *base,
const char *first,
...) G_GNUC_NULL_TERMINATED;
+static gboolean scan_one_metadata_object (OtPullData *pull_data,
+ const guchar *csum,
+ OstreeObjectType objtype,
+ GCancellable *cancellable,
+ GError **error);
+static gboolean scan_one_metadata_object_v_name (OtPullData *pull_data,
+ GVariant *object,
+ GCancellable *cancellable,
+ GError **error);
+
+
static SoupURI *
suburi_new (SoupURI *base,
const char *first,
@@ -188,16 +206,9 @@ uri_fetch_update_status (gpointer user_data)
}
static void
-check_outstanding_requests_handle_error (OtPullData *pull_data,
- GError *error)
+throw_async_error (OtPullData *pull_data,
+ GError *error)
{
- if (pull_data->outstanding_uri_requests == 0 &&
- pull_data->outstanding_meta_requests == 0 &&
- pull_data->outstanding_filemeta_requests == 0 &&
- pull_data->outstanding_filecontent_requests == 0 &&
- pull_data->outstanding_checksum_requests == 0 &&
- (pull_data->loose_files == NULL || g_hash_table_size (pull_data->loose_files) == 0))
- g_main_loop_quit (pull_data->loop);
if (error)
{
if (!pull_data->caught_error)
@@ -214,6 +225,20 @@ check_outstanding_requests_handle_error (OtPullData *pull_data,
}
static void
+check_outstanding_requests_handle_error (OtPullData *pull_data,
+                                         GError *error)
+{
+  /* Quit the main loop once no request of any kind is pending, then pass ERROR to throw_async_error(). */
+  gboolean work_pending = (pull_data->outstanding_uri_requests != 0 || pull_data->outstanding_meta_requests != 0
+                           || pull_data->outstanding_filemeta_requests != 0 || pull_data->outstanding_filecontent_requests != 0
+                           || pull_data->outstanding_checksum_requests != 0
+                           || (pull_data->loose_files != NULL && g_hash_table_size (pull_data->loose_files) != 0));
+  if (!work_pending)
+    g_main_loop_quit (pull_data->loop);
+  throw_async_error (pull_data, error);
+}
+
+static void
run_mainloop_monitor_fetcher (OtPullData *pull_data)
{
GSource *update_timeout = NULL;
@@ -726,10 +751,6 @@ fetch_and_store_metadata (OtPullData *pull_data,
goto out;
}
- if (!ostree_repo_load_variant (pull_data->repo, objtype, checksum,
- &ret_variant, error))
- goto out;
-
ret = TRUE;
ot_transfer_out_value (out_variant, &ret_variant);
out:
@@ -741,11 +762,11 @@ fetch_and_store_metadata (OtPullData *pull_data,
}
static gboolean
-fetch_and_store_tree_metadata_recurse (OtPullData *pull_data,
- int depth,
- const char *rev,
- GCancellable *cancellable,
- GError **error)
+scan_dirtree_object (OtPullData *pull_data,
+ int depth,
+ const char *checksum,
+ GCancellable *cancellable,
+ GError **error)
{
gboolean ret = FALSE;
int i, n;
@@ -762,8 +783,8 @@ fetch_and_store_tree_metadata_recurse (OtPullData *pull_data,
goto out;
}
- if (!fetch_and_store_metadata (pull_data, rev, OSTREE_OBJECT_TYPE_DIR_TREE,
- &tree, cancellable, error))
+ if (!ostree_repo_load_variant (pull_data->repo, OSTREE_OBJECT_TYPE_DIR_TREE, checksum,
+ &tree, error))
goto out;
/* PARSE OSTREE_SERIALIZED_TREE_VARIANT */
@@ -792,6 +813,7 @@ fetch_and_store_tree_metadata_recurse (OtPullData *pull_data,
for (i = 0; i < n; i++)
{
const char *dirname;
+ gboolean is_stored;
ot_lvariant GVariant *tree_csum = NULL;
ot_lvariant GVariant *meta_csum = NULL;
ot_lfree char *tmp_checksum = NULL;
@@ -802,15 +824,14 @@ fetch_and_store_tree_metadata_recurse (OtPullData *pull_data,
if (!ot_util_filename_validate (dirname, error))
goto out;
- g_free (tmp_checksum);
- tmp_checksum = ostree_checksum_from_bytes_v (meta_csum);
- if (!fetch_and_store_metadata (pull_data, tmp_checksum, OSTREE_OBJECT_TYPE_DIR_META,
- NULL, cancellable, error))
+ if (!scan_one_metadata_object (pull_data, ostree_checksum_bytes_peek (tree_csum),
+ OSTREE_OBJECT_TYPE_DIR_TREE,
+ cancellable, error))
goto out;
-
- g_free (tmp_checksum);
- tmp_checksum = ostree_checksum_from_bytes_v (tree_csum);
- if (!fetch_and_store_tree_metadata_recurse (pull_data, depth+1, tmp_checksum, cancellable, error))
+
+ if (!scan_one_metadata_object (pull_data, ostree_checksum_bytes_peek (meta_csum),
+ OSTREE_OBJECT_TYPE_DIR_META,
+ cancellable, error))
goto out;
}
@@ -820,94 +841,6 @@ fetch_and_store_tree_metadata_recurse (OtPullData *pull_data,
}
static gboolean
-fetch_and_store_commit_metadata_recurse (OtPullData *pull_data,
- int parent_depth,
- int related_depth,
- const char *rev,
- GCancellable *cancellable,
- GError **error)
-{
- gboolean ret = FALSE;
- ot_lvariant GVariant *commit = NULL;
- ot_lvariant GVariant *related_objects = NULL;
- ot_lvariant GVariant *tree_contents_csum = NULL;
- ot_lvariant GVariant *tree_meta_csum = NULL;
- ot_lfree char *tmp_checksum = NULL;
- GVariantIter *iter = NULL;
-
- if (!fetch_and_store_metadata (pull_data, rev, OSTREE_OBJECT_TYPE_COMMIT,
- &commit, cancellable, error))
- goto out;
-
- /* PARSE OSTREE_SERIALIZED_COMMIT_VARIANT */
- g_variant_get_child (commit, 6, "@ay", &tree_contents_csum);
- g_variant_get_child (commit, 7, "@ay", &tree_meta_csum);
-
- g_free (tmp_checksum);
- tmp_checksum = ostree_checksum_from_bytes_v (tree_meta_csum);
- if (!fetch_and_store_metadata (pull_data, tmp_checksum, OSTREE_OBJECT_TYPE_DIR_META,
- NULL, cancellable, error))
- goto out;
-
- g_free (tmp_checksum);
- tmp_checksum = ostree_checksum_from_bytes_v (tree_contents_csum);
- if (!fetch_and_store_tree_metadata_recurse (pull_data, 0, tmp_checksum,
- cancellable, error))
- goto out;
-
- if (opt_related)
- {
- const char *name;
- ot_lvariant GVariant *csum_v = NULL;
-
- if (parent_depth > OSTREE_MAX_RECURSION
- || related_depth > OSTREE_MAX_RECURSION)
- {
- g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED,
- "Exceeded maximum recursion");
- goto out;
- }
-
- related_objects = g_variant_get_child_value (commit, 2);
- iter = g_variant_iter_new (related_objects);
-
- while (g_variant_iter_loop (iter, "(&s ay)", &name, &csum_v))
- {
- ot_lfree char *checksum = ostree_checksum_from_bytes_v (csum_v);
-
- /* Pass opt_depth here to ensure we aren't fetching parents of related */
- if (!fetch_and_store_commit_metadata_recurse (pull_data, opt_depth,
- related_depth + 1, checksum,
- cancellable, error))
- goto out;
- }
- }
-
- if (parent_depth < opt_depth)
- {
- ot_lvariant GVariant *parent_csum_v = NULL;
-
- parent_csum_v = g_variant_get_child_value (commit, 1);
-
- if (g_variant_n_children (parent_csum_v) > 0)
- {
- ot_lfree char *checksum = ostree_checksum_from_bytes_v (parent_csum_v);
-
- if (!fetch_and_store_commit_metadata_recurse (pull_data, parent_depth + 1,
- 0, checksum,
- cancellable, error))
- goto out;
- }
- }
-
- ret = TRUE;
- out:
- if (iter)
- g_variant_iter_free (iter);
- return ret;
-}
-
-static gboolean
fetch_ref_contents (OtPullData *pull_data,
const char *ref,
char **out_contents,
@@ -1366,6 +1299,330 @@ fetch_content (OtPullData *pull_data,
return ret;
}
+typedef struct { /* One pending metadata fetch, passed from the queue_* helpers to the idle handler and on to the fetch callback */
+ OtPullData *pull_data; /* pull state the request belongs to */
+ GVariant *object; /* serialized object name; the reference held here is dropped in meta_fetch_on_complete() */
+} IdleFetchMetadataObjectData;
+
+static void
+meta_fetch_on_complete (GObject *object,
+                        GAsyncResult *result,
+                        gpointer user_data)
+{
+  /* Fetch-completion callback: stage the downloaded file in the repo, then queue its name for scanning. */
+  IdleFetchMetadataObjectData *fetch_data = user_data;
+  OtPullData *pull_data = fetch_data->pull_data;
+  ot_lobj GFile *temp_path = NULL;
+  ot_lobj GInputStream *input = NULL;
+  const char *checksum;
+  OstreeObjectType objtype;
+  GError *local_error = NULL;
+  GError **error = &local_error;
+
+  temp_path = ostree_fetcher_request_uri_finish ((OstreeFetcher*)object, result, error);
+  if (!temp_path)
+    goto out;
+
+  ostree_object_name_deserialize (fetch_data->object, &checksum, &objtype);
+  input = (GInputStream*)g_file_read (temp_path, pull_data->cancellable, error); /* open the downloaded file, not the NULL stream */
+  if (!input)
+    goto out;
+
+  if (!ostree_repo_stage_object (pull_data->repo, objtype, checksum, input,
+                                 pull_data->cancellable, error))
+    goto out;
+  g_async_queue_push (pull_data->metadata_objects_to_scan,
+                      g_variant_ref (fetch_data->object));
+
+ out:
+  if (temp_path) /* nothing to delete when the fetch itself failed */
+    (void) ot_gfile_unlink (temp_path, NULL, NULL);
+  throw_async_error (pull_data, local_error);
+  g_variant_unref (fetch_data->object);
+  g_free (fetch_data);
+}
+
+static gboolean
+idle_fetch_metadata_object (gpointer data)
+{
+  /* Idle handler: start the asynchronous fetch for one queued metadata object. */
+  IdleFetchMetadataObjectData *fetch_data = data;
+  OtPullData *pull_data = fetch_data->pull_data;
+  ot_lfree char *objpath = NULL;
+  const char *checksum;
+  OstreeObjectType objtype;
+  SoupURI *obj_uri = NULL;
+
+  ostree_object_name_deserialize (fetch_data->object, &checksum, &objtype);
+  objpath = ostree_get_relative_object_path (checksum, objtype);
+  obj_uri = suburi_new (pull_data->base_uri, objpath, NULL);
+
+  /* fetch_data is handed on to meta_fetch_on_complete(), which frees it. */
+  ostree_fetcher_request_uri_async (pull_data->fetcher, obj_uri, pull_data->cancellable,
+                                    meta_fetch_on_complete, fetch_data);
+  soup_uri_free (obj_uri);
+  return FALSE; /* one-shot: do not run this idle source again */
+}
+
+/**
+ * queue_metadata_object_fetch:
+ * @pull_data: pull state; @object: serialized object name (a new reference is taken)
+ * Pass a request to the main thread to fetch a metadata object.
+ */
+static void
+queue_metadata_object_fetch (OtPullData *pull_data,
+                             GVariant *object)
+{
+  IdleFetchMetadataObjectData *fetch_data = g_new (IdleFetchMetadataObjectData, 1); /* g_new() takes a type and a count */
+  fetch_data->pull_data = pull_data;
+  fetch_data->object = g_variant_ref (object);
+  g_idle_add (idle_fetch_metadata_object, fetch_data);
+}
+
+/**
+ * queue_metadata_object_fetch_parsed:
+ * @checksum: checksum string of the object; @objtype: its object type
+ * Like queue_metadata_object_fetch(), but also check to see if
+ * we've already scanned the object. If so, avoid the
+ * fetch request.
+ */
+static void
+queue_metadata_object_fetch_parsed (OtPullData *pull_data,
+                                    const char *checksum,
+                                    OstreeObjectType objtype)
+{
+  IdleFetchMetadataObjectData *fetch_data;
+  ot_lvariant GVariant *serialized = ostree_object_name_serialize (checksum, objtype);
+  /* NOTE(review): scanned_metadata is not among the OtPullData fields this patch adds -- confirm it exists. */
+  if (g_hash_table_lookup (pull_data->scanned_metadata, serialized))
+    return;
+
+  fetch_data = g_new (IdleFetchMetadataObjectData, 1); /* g_new() takes a type and a count */
+  fetch_data->pull_data = pull_data;
+  fetch_data->object = serialized;
+  serialized = NULL; /* Transfer ownership */
+  g_idle_add (idle_fetch_metadata_object, fetch_data);
+}
+
+static gboolean
+scan_commit_object (OtPullData *pull_data,
+                    guint recursion_depth,
+                    const char *checksum,
+                    GCancellable *cancellable,
+                    GError **error)
+{
+  /* Scan a commit already stored in the repo: its root dirtree and dirmeta,
+   * then its parent while history depth remains, and (with opt_related) its
+   * related commits.  Returns FALSE with @error set on failure. */
+  gboolean ret = FALSE;
+  gpointer depth_ptr = NULL;
+  guint avail_depth, child_depth;
+  ot_lvariant GVariant *commit = NULL;
+  ot_lvariant GVariant *related_objects = NULL;
+  ot_lvariant GVariant *tree_contents_csum = NULL;
+  ot_lvariant GVariant *tree_meta_csum = NULL;
+  GVariantIter *iter = NULL;
+
+  if (recursion_depth > OSTREE_MAX_RECURSION)
+    {
+      g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED,
+                   "Exceeded maximum recursion");
+      goto out;
+    }
+
+  if (!ostree_repo_load_variant (pull_data->repo, OSTREE_OBJECT_TYPE_COMMIT, checksum,
+                                 &commit, error))
+    goto out;
+
+  /* PARSE OSTREE_SERIALIZED_COMMIT_VARIANT */
+  g_variant_get_child (commit, 6, "@ay", &tree_contents_csum);
+  g_variant_get_child (commit, 7, "@ay", &tree_meta_csum);
+
+  if (!scan_one_metadata_object (pull_data, ostree_checksum_bytes_peek (tree_contents_csum),
+                                 OSTREE_OBJECT_TYPE_DIR_TREE,
+                                 cancellable, error))
+    goto out;
+
+  if (!scan_one_metadata_object (pull_data, ostree_checksum_bytes_peek (tree_meta_csum),
+                                 OSTREE_OBJECT_TYPE_DIR_META,
+                                 cancellable, error))
+    goto out;
+
+  /* Generations of history still allowed from this commit.  One lookup is
+   * enough: the old loop re-queried the same key and read depth_found
+   * uninitialized when opt_depth was 0.  NOTE(review): commit_to_depth is
+   * also written by ostree_builtin_pull() with no visible lock -- confirm. */
+  if (g_hash_table_lookup_extended (pull_data->commit_to_depth, checksum, NULL,
+                                    &depth_ptr))
+    avail_depth = GPOINTER_TO_UINT (depth_ptr);
+  else
+    avail_depth = opt_depth > 0 ? (guint) opt_depth : 0;
+
+  /* Depth handed down to commits reached from here (parent and related);
+   * clamped so it cannot wrap to a huge unsigned value at depth 0. */
+  child_depth = avail_depth > 0 ? avail_depth - 1 : 0;
+
+  if (avail_depth > 0)
+    {
+      ot_lvariant GVariant *parent_csum_v = NULL;
+
+      parent_csum_v = g_variant_get_child_value (commit, 1);
+
+      if (g_variant_n_children (parent_csum_v) > 0)
+        {
+          g_hash_table_insert (pull_data->commit_to_depth,
+                               ostree_checksum_from_bytes_v (parent_csum_v),
+                               GUINT_TO_POINTER (child_depth));
+          if (!scan_one_metadata_object (pull_data, ostree_checksum_bytes_peek (parent_csum_v),
+                                         OSTREE_OBJECT_TYPE_COMMIT,
+                                         cancellable, error))
+            goto out;
+        }
+    }
+
+  if (opt_related)
+    {
+      const char *name;
+      ot_lvariant GVariant *csum_v = NULL;
+
+      related_objects = g_variant_get_child_value (commit, 2);
+      iter = g_variant_iter_new (related_objects);
+
+      while (g_variant_iter_loop (iter, "(&s ay)", &name, &csum_v))
+        {
+          g_hash_table_insert (pull_data->commit_to_depth,
+                               ostree_checksum_from_bytes_v (csum_v),
+                               GUINT_TO_POINTER (child_depth));
+          if (!scan_one_metadata_object (pull_data, ostree_checksum_bytes_peek (csum_v),
+                                         OSTREE_OBJECT_TYPE_COMMIT,
+                                         cancellable, error))
+            goto out;
+        }
+    }
+
+  ret = TRUE;
+ out:
+  if (iter)
+    g_variant_iter_free (iter);
+  return ret;
+}
+
+static gboolean
+scan_one_metadata_object (OtPullData *pull_data,
+                          const guchar *csum,
+                          OstreeObjectType objtype,
+                          GCancellable *cancellable,
+                          GError **error)
+{
+  /* Scan the metadata object named by raw checksum @csum and @objtype: queue
+   * a fetch if the repo lacks it, otherwise scan what it references. */
+  gboolean ret = FALSE;
+  ot_lvariant GVariant *object = NULL;
+  ot_lfree char *tmp_checksum = NULL;
+  gboolean is_stored;
+
+  tmp_checksum = ostree_checksum_from_bytes (csum);
+  object = ostree_object_name_serialize (tmp_checksum, objtype);
+
+  /* NOTE(review): scanned_metadata is not among the OtPullData fields this
+   * patch adds -- confirm where it is declared and created. */
+  if (g_hash_table_lookup (pull_data->scanned_metadata, object))
+    return TRUE;
+
+  if (!ostree_repo_has_object (pull_data->repo, objtype, tmp_checksum, &is_stored,
+                               cancellable, error))
+    goto out;
+
+  if (!is_stored)
+    {
+      /* Stay unmarked so the post-fetch rescan queued by meta_fetch_on_complete()
+       * is not skipped.  NOTE(review): may queue duplicate fetches. */
+      queue_metadata_object_fetch (pull_data, object);
+      ret = TRUE;
+      goto out;
+    }
+  switch (objtype)
+    {
+    case OSTREE_OBJECT_TYPE_COMMIT:
+      if (!scan_commit_object (pull_data, 0, tmp_checksum, cancellable, error))
+        goto out;
+      break;
+    case OSTREE_OBJECT_TYPE_DIR_META:
+      break;
+    case OSTREE_OBJECT_TYPE_DIR_TREE:
+      /* NOTE(review): depth is always 0; this dispatcher has none to pass on. */
+      if (!scan_dirtree_object (pull_data, 0, tmp_checksum, cancellable, error))
+        goto out;
+      break;
+    case OSTREE_OBJECT_TYPE_FILE:
+      g_assert_not_reached ();
+      break;
+    }
+
+  /* The table is used as a set; any non-NULL value marks the key present. */
+  g_hash_table_insert (pull_data->scanned_metadata, g_variant_ref (object), GINT_TO_POINTER (1));
+  ret = TRUE;
+ out:
+  return ret;
+}
+
+static gboolean
+scan_one_metadata_object_v_name (OtPullData *pull_data,
+                                 GVariant *object,
+                                 GCancellable *cancellable,
+                                 GError **error)
+{
+  /* Unpack a serialized object name and scan it through the raw-checksum entry point. */
+  const char *checksum;
+  ot_lfree guchar *csum = NULL;
+  OstreeObjectType objtype;
+
+  ostree_object_name_deserialize (object, &checksum, &objtype);
+  csum = ostree_checksum_to_bytes (checksum);
+
+  return scan_one_metadata_object (pull_data, csum, objtype, cancellable, error);
+}
+
+/**
+ * scan_metadata_thread_main:
+ * @user_data: the #OtPullData for this pull
+ * This thread starts from the given commit objects put into the
+ * queue, traversing them.
+ *
+ * If we're missing an object from one of
+ * them, we queue a request to the main thread to fetch it. When it's
+ * fetched, we get passed the object back and scan it.
+ */
+static gpointer
+scan_metadata_thread_main (gpointer user_data)
+{
+  OtPullData *pull_data = user_data;
+  GError *local_error = NULL;
+  GError **error = &local_error;
+  ot_lhash GHashTable *scanned_metadata = NULL;
+  /* NOTE(review): this table is never consulted; the scanners use pull_data->scanned_metadata. */
+  scanned_metadata = g_hash_table_new_full (ostree_hash_object_name, g_variant_equal,
+                                            (GDestroyNotify)g_variant_unref, NULL);
+
+  while (TRUE)
+    {
+      ot_lvariant GVariant *item = NULL;
+
+      item = g_async_queue_pop (pull_data->metadata_objects_to_scan);
+      /* NOTE(review): GLib queues reject NULL items, so this exit is presumably unreachable. */
+      if (item == NULL)
+        break;
+
+      if (!scan_one_metadata_object_v_name (pull_data, item, pull_data->cancellable, error))
+        goto out;
+    }
+
+ out:
+  if (local_error)
+    g_idle_add (idle_throw_error, local_error); /* NOTE(review): idle_throw_error is not defined in this patch */
+  return NULL;
+}
+
static gboolean
parse_ref_summary (const char *contents,
GHashTable **out_refs,
@@ -1502,6 +1759,8 @@ ostree_builtin_pull (int argc, char **argv, GFile *repo_path, GError **error)
pull_data->repo = repo;
pull_data->file_checksums_to_fetch = g_hash_table_new_full (g_str_hash, g_str_equal, g_free, NULL);
+ pull_data->commit_to_depth = g_hash_table_new (g_str_hash, g_str_equal, g_free, NULL);
+
if (argc < 2)
{
ot_util_usage_error (context, "REMOTE must be specified", error);
@@ -1526,6 +1785,8 @@ ostree_builtin_pull (int argc, char **argv, GFile *repo_path, GError **error)
goto out;
}
+ pull_data->metadata_objects_to_scan = g_async_queue_new ();
+
requested_refs_to_fetch = g_hash_table_new_full (g_str_hash, g_str_equal, g_free, g_free);
updated_refs = g_hash_table_new_full (g_str_hash, g_str_equal, g_free, g_free);
commits_to_fetch = g_hash_table_new_full (g_str_hash, g_str_equal, g_free, NULL);
@@ -1610,15 +1871,17 @@ ostree_builtin_pull (int argc, char **argv, GFile *repo_path, GError **error)
goto out;
g_print ("Analyzing objects needed...\n");
+
+ g_thread_new ("metadatascan", scan_metadata_thread_main, pull_data);
g_hash_table_iter_init (&hash_iter, commits_to_fetch);
while (g_hash_table_iter_next (&hash_iter, &key, &value))
{
const char *commit = value;
-
- if (!fetch_and_store_commit_metadata_recurse (pull_data, 0, 0, commit,
- cancellable, error))
- goto out;
+
+ g_hash_table_insert (pull_data->commit_to_depth, g_strdup (commit), opt_depth);
+ g_async_queue_push (pull_data->metadata_objects_to_scan,
+ ostree_object_name_serialize (commit, OSTREE_OBJECT_TYPE_COMMIT));
}
g_hash_table_iter_init (&hash_iter, requested_refs_to_fetch);
@@ -1649,8 +1912,9 @@ ostree_builtin_pull (int argc, char **argv, GFile *repo_path, GError **error)
if (!ostree_validate_checksum_string (sha256, error))
goto out;
- if (!fetch_and_store_commit_metadata_recurse (pull_data, 0, 0, sha256, cancellable, error))
- goto out;
+ g_hash_table_insert (pull_data->commit_to_depth, g_strdup (sha256), opt_depth);
+ g_async_queue_push (pull_data->metadata_objects_to_scan,
+ ostree_object_name_serialize (sha256, OSTREE_OBJECT_TYPE_COMMIT));
g_hash_table_insert (updated_refs, g_strdup (ref), g_strdup (sha256));
}
@@ -1698,6 +1962,10 @@ ostree_builtin_pull (int argc, char **argv, GFile *repo_path, GError **error)
g_free (pull_data->remote_name);
if (pull_data->base_uri)
soup_uri_free (pull_data->base_uri);
+ if (pull_data->metadata_scan_thread)
+ g_thread_join (pull_data->metadata_scan_thread);
+ if (pull_data->metadata_objects_to_scan)
+ g_async_queue_unref (pull_data->metadata_objects_to_scan);
g_clear_pointer (&pull_data->file_checksums_to_fetch, (GDestroyNotify) g_hash_table_unref);
g_clear_pointer (&pull_data->cached_meta_pack_indexes, (GDestroyNotify) g_ptr_array_unref);
g_clear_pointer (&pull_data->cached_data_pack_indexes, (GDestroyNotify) g_ptr_array_unref);
[
Date Prev][
Date Next] [
Thread Prev][
Thread Next]
[
Thread Index]
[
Date Index]
[
Author Index]