[tracker/wip/carlosg/serialize-api: 36/53] libtracker-sparql: Implement serialize_async/finish in direct connection
- From: Carlos Garnacho <carlosg src gnome org>
- To: commits-list gnome org
- Cc:
- Subject: [tracker/wip/carlosg/serialize-api: 36/53] libtracker-sparql: Implement serialize_async/finish in direct connection
- Date: Mon, 13 Dec 2021 01:10:25 +0000 (UTC)
commit dbc19d2e2b1f06d28bf9a292b51964f8c5055fe4
Author: Carlos Garnacho <carlosg gnome org>
Date: Sun Nov 21 00:02:06 2021 +0100
libtracker-sparql: Implement serialize_async/finish in direct connection
This allows serialization of RDF into a GInputStream; currently only the
TTL (Turtle) format is supported.
src/libtracker-sparql/direct/tracker-direct.c | 163 ++++++++++++++++++++++++--
1 file changed, 151 insertions(+), 12 deletions(-)
---
diff --git a/src/libtracker-sparql/direct/tracker-direct.c b/src/libtracker-sparql/direct/tracker-direct.c
index a4a69d44c..649da93eb 100644
--- a/src/libtracker-sparql/direct/tracker-direct.c
+++ b/src/libtracker-sparql/direct/tracker-direct.c
@@ -29,6 +29,7 @@
#include <libtracker-data/tracker-sparql.h>
#include <libtracker-sparql/tracker-notifier-private.h>
#include <libtracker-sparql/tracker-private.h>
+#include <libtracker-sparql/tracker-serializer.h>
typedef struct _TrackerDirectConnectionPrivate TrackerDirectConnectionPrivate;
@@ -61,6 +62,11 @@ typedef struct {
TrackerResource *resource;
} UpdateResource;
+typedef struct { /* Payload attached to TASK_TYPE_SERIALIZE tasks. */
+ gchar *query; /* Duplicated SPARQL query text; released by serialize_rdf_data_free() */
+ TrackerRdfFormat format; /* Requested RDF output format; translated by convert_format() */
+} SerializeRdf;
+
enum {
PROP_0,
PROP_FLAGS,
@@ -78,6 +84,7 @@ typedef enum {
TASK_TYPE_UPDATE_RESOURCE,
TASK_TYPE_UPDATE_BATCH,
TASK_TYPE_RELEASE_MEMORY,
+ TASK_TYPE_SERIALIZE,
} TaskType;
typedef struct {
@@ -208,6 +215,7 @@ update_thread_func (gpointer data,
switch (task_data->type) {
case TASK_TYPE_QUERY:
+ case TASK_TYPE_SERIALIZE:
g_warning ("Queries don't go through this thread");
break;
case TASK_TYPE_UPDATE:
@@ -246,18 +254,86 @@ update_thread_func (gpointer data,
g_mutex_unlock (&priv->mutex);
}
+static void
+execute_query_in_thread (GTask *task,
+ TaskData *task_data)
+{
+ TrackerSparqlCursor *cursor;
+ GError *error = NULL;
+ /* Handles TASK_TYPE_QUERY: runs the query stored in task_data->data on the
+  * task's source connection, passing along the task's cancellable, and
+  * completes the task with the resulting cursor or with the reported error. */
+ cursor = tracker_sparql_connection_query (TRACKER_SPARQL_CONNECTION (g_task_get_source_object (task)),
+ task_data->data,
+ g_task_get_cancellable (task),
+ &error);
+ if (cursor)
+ g_task_return_pointer (task, cursor, g_object_unref);
+ else
+ g_task_return_error (task, error);
+}
+/* Maps a TrackerRdfFormat value to the matching TrackerSerializerFormat.
+ * Only TRACKER_RDF_FORMAT_TURTLE is handled; any other value reaches
+ * g_assert_not_reached(). */
+static TrackerSerializerFormat
+convert_format (TrackerRdfFormat format)
+{
+ switch (format) {
+ case TRACKER_RDF_FORMAT_TURTLE:
+ return TRACKER_SERIALIZER_FORMAT_TTL;
+ default:
+ g_assert_not_reached ();
+ }
+}
+/* Handles TASK_TYPE_SERIALIZE: builds a TrackerSparql from the query in the
+ * task's SerializeRdf data, executes it, and completes the task with the
+ * GInputStream returned by tracker_serializer_new(), or with an error. */
+static void
+serialize_in_thread (GTask *task,
+ TaskData *task_data)
+{
+ TrackerDirectConnectionPrivate *priv;
+ TrackerDirectConnection *conn;
+ TrackerSparql *query = NULL;
+ TrackerSparqlCursor *cursor = NULL;
+ GInputStream *istream = NULL;
+ SerializeRdf *data = task_data->data;
+ GError *error = NULL;
+
+ conn = g_task_get_source_object (task);
+ priv = tracker_direct_connection_get_instance_private (conn);
+ /* priv->mutex is held from here until just after the "out" label. */
+ g_mutex_lock (&priv->mutex);
+ query = tracker_sparql_new (priv->data_manager, data->query);
+ if (!tracker_sparql_is_serializable (query)) {
+ g_set_error (&error,
+ TRACKER_SPARQL_ERROR,
+ TRACKER_SPARQL_ERROR_PARSE,
+ "Query is not DESCRIBE or CONSTRUCT");
+ goto out;
+ }
+
+ cursor = tracker_sparql_execute_cursor (query, NULL, &error);
+ tracker_direct_connection_update_timestamp (conn);
+ if (!cursor)
+ goto out;
+
+ tracker_sparql_cursor_set_connection (cursor, TRACKER_SPARQL_CONNECTION (conn));
+ istream = tracker_serializer_new (cursor, convert_format (data->format));
+ /* Shared exit: local references are dropped on success and failure alike.
+  * NOTE(review): presumably the serializer keeps its own cursor reference — confirm. */
+ out:
+ g_clear_object (&query);
+ g_clear_object (&cursor);
+ g_mutex_unlock (&priv->mutex);
+ /* istream stays NULL on both early-exit paths above; the task is then completed with error. */
+ if (istream)
+ g_task_return_pointer (task, istream, g_object_unref);
+ else
+ g_task_return_error (task, error);
+}
+
static void
query_thread_pool_func (gpointer data,
gpointer user_data)
{
TrackerDirectConnection *conn = user_data;
TrackerDirectConnectionPrivate *priv;
- TrackerSparqlCursor *cursor;
GTask *task = data;
TaskData *task_data = g_task_get_task_data (task);
- GError *error = NULL;
-
- g_assert (task_data->type == TASK_TYPE_QUERY);
priv = tracker_direct_connection_get_instance_private (conn);
@@ -270,14 +346,16 @@ query_thread_pool_func (gpointer data,
return;
}
- cursor = tracker_sparql_connection_query (TRACKER_SPARQL_CONNECTION (g_task_get_source_object (task)),
- task_data->data,
- g_task_get_cancellable (task),
- &error);
- if (cursor)
- g_task_return_pointer (task, cursor, g_object_unref);
- else
- g_task_return_error (task, error);
+ switch (task_data->type) {
+ case TASK_TYPE_QUERY:
+ execute_query_in_thread (task, task_data);
+ break;
+ case TASK_TYPE_SERIALIZE:
+ serialize_in_thread (task, task_data);
+ break;
+ default:
+ g_assert_not_reached ();
+ }
g_object_unref (task);
}
@@ -1227,6 +1305,65 @@ tracker_direct_connection_lookup_dbus_service (TrackerSparqlConnection *connect
return TRUE;
}
+static SerializeRdf *
+serialize_rdf_data_new (const gchar *query,
+ TrackerRdfFormat format)
+{
+ SerializeRdf *data;
+ /* Allocates a zero-filled SerializeRdf holding its own copy of @query plus
+  * @format; the result is released with serialize_rdf_data_free(). */
+ data = g_new0 (SerializeRdf, 1);
+ data->query = g_strdup (query);
+ data->format = format;
+
+ return data;
+}
+/* Frees a SerializeRdf: first the duplicated query string, then the struct.
+ * Takes a gpointer because it is handed to task_data_query_new() as its third
+ * argument (presumably the destroy callback for the payload — confirm). */
+static void
+serialize_rdf_data_free (gpointer user_data)
+{
+ SerializeRdf *data = user_data;
+
+ g_free (data->query);
+ g_free (data);
+}
+/* serialize_async implementation for the direct connection (installed in
+ * class_init). Wraps @query and @format in a TASK_TYPE_SERIALIZE task and
+ * pushes it onto the connection's select_pool thread pool. */
+static void
+tracker_direct_connection_serialize_async (TrackerSparqlConnection *self,
+ TrackerRdfFormat format,
+ const gchar *query,
+ GCancellable *cancellable,
+ GAsyncReadyCallback callback,
+ gpointer user_data)
+{
+ TrackerDirectConnectionPrivate *priv;
+ TrackerDirectConnection *conn;
+ GError *error = NULL;
+ GTask *task;
+
+ conn = TRACKER_DIRECT_CONNECTION (self);
+ priv = tracker_direct_connection_get_instance_private (conn);
+
+ task = g_task_new (self, cancellable, callback, user_data);
+ g_task_set_task_data (task,
+ task_data_query_new (TASK_TYPE_SERIALIZE,
+ serialize_rdf_data_new (query, format),
+ serialize_rdf_data_free),
+ (GDestroyNotify) task_data_free);
+ /* If the push fails, the task is completed here with the translated error and our reference is dropped. */
+ if (!g_thread_pool_push (priv->select_pool, task, &error)) {
+ g_task_return_error (task, _translate_internal_error (error));
+ g_object_unref (task);
+ }
+}
+/* serialize_finish implementation for the direct connection (installed in
+ * class_init): returns the pointer result stored in the GTask @res, or
+ * propagates the task's error into @error. */
+static GInputStream *
+tracker_direct_connection_serialize_finish (TrackerSparqlConnection *connection,
+ GAsyncResult *res,
+ GError **error)
+{
+ return g_task_propagate_pointer (G_TASK (res), error);
+}
+
static void
tracker_direct_connection_class_init (TrackerDirectConnectionClass *klass)
{
@@ -1262,6 +1399,8 @@ tracker_direct_connection_class_init (TrackerDirectConnectionClass *klass)
sparql_connection_class->update_resource_finish = tracker_direct_connection_update_resource_finish;
sparql_connection_class->create_batch = tracker_direct_connection_create_batch;
sparql_connection_class->lookup_dbus_service = tracker_direct_connection_lookup_dbus_service;
+ sparql_connection_class->serialize_async = tracker_direct_connection_serialize_async;
+ sparql_connection_class->serialize_finish = tracker_direct_connection_serialize_finish;
props[PROP_FLAGS] =
g_param_spec_flags ("flags",
[
Date Prev][
Date Next] [
Thread Prev][
Thread Next]
[
Thread Index]
[
Date Index]
[
Author Index]