[tracker/gdbus-porting-rebased: 39/65] libtracker-client: Reimplement / move around the old FD passing code



commit fc4227401b1df7bba87473b2b1761d9d3b4d5797
Author: Philip Van Hoof <philip codeminded be>
Date:   Wed Jan 5 12:03:42 2011 +0100

    libtracker-client: Reimplement / move around the old FD passing code

 src/libtracker-client/Makefile.am |    2 +-
 src/libtracker-client/tracker.c   |  351 ++++++++++++++++++++++++++++++++++--
 2 files changed, 332 insertions(+), 21 deletions(-)
---
diff --git a/src/libtracker-client/Makefile.am b/src/libtracker-client/Makefile.am
index 6b267ee..012f122 100644
--- a/src/libtracker-client/Makefile.am
+++ b/src/libtracker-client/Makefile.am
@@ -22,7 +22,7 @@ libtracker_clientincludedir = $(includedir)/tracker-$(TRACKER_API_VERSION)/libtr
 
 libtracker_client_@TRACKER_API_VERSION@_la_SOURCES =   \
 	tracker-sparql-builder.vala                    \
-	tracker.c
+	tracker.c                                      
 
 libtracker_client_@TRACKER_API_VERSION@_la_LIBADD =    \
 	$(top_builddir)/src/libtracker-common/libtracker-common.la \
diff --git a/src/libtracker-client/tracker.c b/src/libtracker-client/tracker.c
index 9f8f739..d0c6403 100644
--- a/src/libtracker-client/tracker.c
+++ b/src/libtracker-client/tracker.c
@@ -36,6 +36,8 @@
 #include <gio/gunixinputstream.h>
 #include <gio/gunixoutputstream.h>
 
+#include <dbus/dbus-glib-bindings.h>
+
 #include <libtracker-common/tracker-dbus.h>
 
 #include "tracker.h"
@@ -113,6 +115,22 @@
  * Simple search API.
  **/
 
+/*
+ * TrackerClientSendAndSpliceCallback:
+ * @buffer: the data spliced from the pipe, or NULL when an error is reported
+ * @buffer_size: size of @buffer, or -1 when an error is reported
+ * @variable_names: NULL-terminated array of variable names, or NULL when
+ *                  none were requested or an error is reported
+ * @error: NULL on success; otherwise the error, which the callback is
+ *         expected to free
+ * @user_data: the pointer given when the asynchronous request was started
+ *
+ * Completion callback used by send_and_splice_async_callback().
+ *
+ * @variable_names is only valid for the duration of the call: the invoker
+ * releases the array with g_free() and unrefs the D-Bus reply the strings
+ * were read from right after the callback returns.
+ */
+typedef void (*TrackerClientSendAndSpliceCallback) (void     *buffer,
+                                                    gssize    buffer_size,
+                                                    GStrv     variable_names,
+                                                    GError   *error,
+                                                    gpointer  user_data);
+
+/*
+ * State carried from tracker_client_dbus_send_and_splice_async() to
+ * send_and_splice_async_callback(). Created by send_and_splice_data_new()
+ * and released by send_and_splice_data_free().
+ */
+typedef struct {
+	/* Stream wrapping the file descriptor the results are read from */
+	GInputStream *unix_input_stream;
+	/* Buffered wrapper around unix_input_stream; source of the splice */
+	GInputStream *buffered_input_stream;
+	/* Memory output stream that accumulates the spliced data */
+	GOutputStream *output_stream;
+	/* Pending D-Bus call whose reply is collected once the splice ends */
+	DBusPendingCall *call;
+	/* Function invoked with the outcome, and the pointer passed to it */
+	TrackerClientSendAndSpliceCallback callback;
+	gpointer user_data;
+	/* Whether variable names are extracted from the reply */
+	gboolean expect_variable_names;
+} SendAndSpliceData;
+
 typedef struct {
 	DBusGConnection *connection;
 	DBusGProxy *proxy_statistics;
@@ -234,6 +252,304 @@ G_DEFINE_TYPE(TrackerClient, tracker_client, G_TYPE_OBJECT)
 
 #define TRACKER_CLIENT_GET_PRIVATE(obj) (G_TYPE_INSTANCE_GET_PRIVATE ((obj), TRACKER_TYPE_CLIENT, TrackerClientPrivate))
 
+
+/*
+ * dbus_send_and_splice_get_variable_names:
+ * @message: D-Bus reply whose first argument is an array of strings
+ * @copy_strings: TRUE to g_strdup() every string, FALSE to borrow them
+ *
+ * Collects the elements of the array found in the first argument of
+ * @message into a NULL-terminated vector.
+ *
+ * With @copy_strings set to FALSE the vector only borrows the strings from
+ * @message: they stay valid as long as @message does, and the vector must
+ * then be released with g_free() rather than g_strfreev().
+ *
+ * Returns: a newly allocated NULL-terminated vector. It is empty when
+ * @message has no arguments or its first argument is not an array.
+ */
+static GStrv
+dbus_send_and_splice_get_variable_names (DBusMessage *message,
+                                         gboolean     copy_strings)
+{
+	GPtrArray *found;
+	DBusMessageIter iter, arr;
+
+	found = g_ptr_array_new ();
+
+	/* dbus_message_iter_init() returns FALSE for a message without
+	 * arguments, and recursing into anything but a container is invalid,
+	 * so only descend when there really is an array to read. */
+	if (dbus_message_iter_init (message, &iter) &&
+	    dbus_message_iter_get_arg_type (&iter) == DBUS_TYPE_ARRAY) {
+		dbus_message_iter_recurse (&iter, &arr);
+
+		while (dbus_message_iter_get_arg_type (&arr) != DBUS_TYPE_INVALID) {
+			gchar *str;
+
+			dbus_message_iter_get_basic (&arr, &str);
+			g_ptr_array_add (found, copy_strings ? g_strdup (str) : str);
+			dbus_message_iter_next (&arr);
+		}
+	}
+
+	/* NULL terminator, so the result can be used as a GStrv */
+	g_ptr_array_add (found, NULL);
+
+	return (GStrv) g_ptr_array_free (found, FALSE);
+}
+
+/*
+ * tracker_client_dbus_send_and_splice:
+ * @connection: D-Bus connection used to send @message
+ * @message: the method call to send; always unreffed by this function once
+ *           the argument checks have passed
+ * @fd: read end of the pipe the results are streamed through
+ * @cancellable: optional GCancellable for the splice, or NULL
+ * @dest_buffer: return location for the received data
+ * @dest_buffer_size: optional return location for the size of the data
+ * @variable_names: optional return location for a copied, NULL-terminated
+ *                  vector of variable names taken from the reply
+ * @error: return location for an error
+ *
+ * Sends @message, synchronously splices everything readable from @fd into
+ * a memory buffer and then blocks until the D-Bus reply has arrived.
+ *
+ * /!\ BIG FAT WARNING /!\
+ * The message must be destroyed for this function to succeed, so pass a
+ * message with a refcount of 1 (and say goodbye to it, 'cause you'll never
+ * see it again).
+ *
+ * Once the call has been queued, @fd is wrapped in a GUnixInputStream that
+ * is asked to close it. On the earlier failure returns @fd has not been
+ * wrapped yet and is not closed here.
+ *
+ * Returns: TRUE when both the splice and the D-Bus call succeeded, in which
+ * case the output locations are filled in. FALSE otherwise, with @error set
+ * (except when an argument check failed) and any received data freed.
+ */
+static gboolean
+tracker_client_dbus_send_and_splice (DBusConnection  *connection,
+                                     DBusMessage     *message,
+                                     int              fd,
+                                     GCancellable    *cancellable,
+                                     void           **dest_buffer,
+                                     gssize          *dest_buffer_size,
+                                     GStrv           *variable_names,
+                                     GError         **error)
+{
+	DBusPendingCall *call = NULL;
+	DBusMessage *reply = NULL;
+	GInputStream *unix_input_stream;
+	GInputStream *buffered_input_stream;
+	GOutputStream *output_stream;
+	GError *inner_error = NULL;
+	gboolean ret_value = FALSE;
+
+	g_return_val_if_fail (connection != NULL, FALSE);
+	g_return_val_if_fail (message != NULL, FALSE);
+	g_return_val_if_fail (fd > 0, FALSE);
+	g_return_val_if_fail (dest_buffer != NULL, FALSE);
+
+	if (!dbus_connection_send_with_reply (connection,
+	                                      message,
+	                                      &call,
+	                                      -1)) {
+		/* The send failed outright (out of memory); do not rely on what,
+		 * if anything, was stored in call. */
+		call = NULL;
+	}
+	dbus_message_unref (message);
+
+	if (!call) {
+		g_set_error (error,
+		             TRACKER_DBUS_ERROR,
+		             TRACKER_DBUS_ERROR_UNSUPPORTED,
+		             "FD passing unsupported or connection disconnected");
+		return FALSE;
+	}
+
+	unix_input_stream = g_unix_input_stream_new (fd, TRUE);
+	buffered_input_stream = g_buffered_input_stream_new_sized (unix_input_stream,
+	                                                           TRACKER_DBUS_PIPE_BUFFER_SIZE);
+	/* No destroy function is given, so the data outlives the stream and
+	 * has to be handed over or freed explicitly below. */
+	output_stream = g_memory_output_stream_new (NULL, 0, g_realloc, NULL);
+
+	g_output_stream_splice (output_stream,
+	                        buffered_input_stream,
+	                        G_OUTPUT_STREAM_SPLICE_CLOSE_SOURCE |
+	                        G_OUTPUT_STREAM_SPLICE_CLOSE_TARGET,
+	                        cancellable,
+	                        &inner_error);
+
+	if (G_LIKELY (!inner_error)) {
+		/* Wait for any current d-bus call to finish */
+		dbus_pending_call_block (call);
+
+		/* Check we didn't get an error */
+		reply = dbus_pending_call_steal_reply (call);
+
+		if (G_UNLIKELY (dbus_message_get_type (reply) == DBUS_MESSAGE_TYPE_ERROR)) {
+			DBusError dbus_error;
+
+			dbus_error_init (&dbus_error);
+			dbus_set_error_from_message (&dbus_error, reply);
+			dbus_set_g_error (error, &dbus_error);
+			dbus_error_free (&dbus_error);
+
+			/* If any error happened, we're not passing any received data, so we
+			 * need to free it */
+			g_free (g_memory_output_stream_get_data (G_MEMORY_OUTPUT_STREAM (output_stream)));
+		} else {
+			*dest_buffer = g_memory_output_stream_get_data (G_MEMORY_OUTPUT_STREAM (output_stream));
+
+			if (dest_buffer_size) {
+				*dest_buffer_size = g_memory_output_stream_get_data_size (G_MEMORY_OUTPUT_STREAM (output_stream));
+			}
+
+			if (variable_names) {
+				/* Copied, because the reply is unreffed before returning */
+				*variable_names = dbus_send_and_splice_get_variable_names (reply, TRUE);
+			}
+
+			ret_value = TRUE;
+		}
+	} else {
+		g_set_error (error,
+		             TRACKER_DBUS_ERROR,
+		             TRACKER_DBUS_ERROR_BROKEN_PIPE,
+		             "Couldn't get results from server");
+		g_error_free (inner_error);
+		/* If any error happened, we're not passing any received data, so we
+		 * need to free it */
+		g_free (g_memory_output_stream_get_data (G_MEMORY_OUTPUT_STREAM (output_stream)));
+	}
+
+	g_object_unref (output_stream);
+	g_object_unref (buffered_input_stream);
+	g_object_unref (unix_input_stream);
+
+	if (reply) {
+		dbus_message_unref (reply);
+	}
+
+	dbus_pending_call_unref (call);
+
+	return ret_value;
+}
+
+/*
+ * send_and_splice_data_new:
+ *
+ * Allocates a zeroed SendAndSpliceData from the slice allocator and stores
+ * the given streams, pending call, callback and flag in it. The pointers
+ * are stored as passed in; no extra reference is taken on any of them.
+ *
+ * Returns: the new structure, to be released with
+ * send_and_splice_data_free().
+ */
+static SendAndSpliceData *
+send_and_splice_data_new (GInputStream                       *unix_input_stream,
+                          GInputStream                       *buffered_input_stream,
+                          GOutputStream                      *output_stream,
+                          gboolean                            expect_variable_names,
+                          DBusPendingCall                    *call,
+                          TrackerClientSendAndSpliceCallback  callback,
+                          gpointer                            user_data)
+{
+	SendAndSpliceData *ssd = g_slice_new0 (SendAndSpliceData);
+
+	/* What to report to, and how to interpret the reply */
+	ssd->callback = callback;
+	ssd->user_data = user_data;
+	ssd->expect_variable_names = expect_variable_names;
+
+	/* The D-Bus side of the request */
+	ssd->call = call;
+
+	/* The pipe side of the request */
+	ssd->output_stream = output_stream;
+	ssd->buffered_input_stream = buffered_input_stream;
+	ssd->unix_input_stream = unix_input_stream;
+
+	return ssd;
+}
+
+/*
+ * send_and_splice_data_free:
+ * @data: structure created by send_and_splice_data_new()
+ *
+ * Drops one reference on each of the three streams and on the pending
+ * D-Bus call held by @data, then returns @data to the slice allocator.
+ * The callback and user_data members are left untouched.
+ */
+static void
+send_and_splice_data_free (SendAndSpliceData *data)
+{
+	g_object_unref (data->unix_input_stream);
+	g_object_unref (data->buffered_input_stream);
+	g_object_unref (data->output_stream);
+	dbus_pending_call_unref (data->call);
+	g_slice_free (SendAndSpliceData, data);
+}
+
+/*
+ * send_and_splice_async_callback:
+ * @source: object the asynchronous splice was started on (unused)
+ * @result: result of the asynchronous splice
+ * @user_data: the SendAndSpliceData of the request
+ *
+ * GAsyncReadyCallback run when the splice started by
+ * tracker_client_dbus_send_and_splice_async() has finished. It collects
+ * the D-Bus reply, reports the outcome through the stored callback and
+ * finally releases the request data.
+ *
+ * On both error paths the received data is freed and the callback gets
+ * (NULL, -1, NULL, error). The GError is deliberately not freed here: to be
+ * aligned with the behavior of dbus-glib, disposing of it is left to the
+ * callback itself.
+ */
+static void
+send_and_splice_async_callback (GObject      *source,
+                                GAsyncResult *result,
+                                gpointer      user_data)
+{
+	SendAndSpliceData *data = user_data;
+	GMemoryOutputStream *memory_stream;
+	DBusMessage *reply = NULL;
+	GError *error = NULL;
+	GStrv v_names = NULL;
+	DBusError dbus_error;
+	gpointer received;
+	gssize received_size;
+
+	g_output_stream_splice_finish (data->output_stream,
+	                               result,
+	                               &error);
+
+	memory_stream = G_MEMORY_OUTPUT_STREAM (data->output_stream);
+
+	if (G_UNLIKELY (error != NULL)) {
+		/* The splice failed: no data is passed on, so free it */
+		g_free (g_memory_output_stream_get_data (memory_stream));
+
+		(* data->callback) (NULL, -1, NULL, error, data->user_data);
+		goto out;
+	}
+
+	/* The pipe is drained; now wait for the D-Bus reply itself */
+	dbus_pending_call_block (data->call);
+	reply = dbus_pending_call_steal_reply (data->call);
+
+	if (G_UNLIKELY (dbus_message_get_type (reply) == DBUS_MESSAGE_TYPE_ERROR)) {
+		/* The service answered with an error: no data is passed on,
+		 * so free it */
+		g_free (g_memory_output_stream_get_data (memory_stream));
+
+		dbus_error_init (&dbus_error);
+		dbus_set_error_from_message (&dbus_error, reply);
+		dbus_set_g_error (&error, &dbus_error);
+		dbus_error_free (&dbus_error);
+
+		(* data->callback) (NULL, -1, NULL, error, data->user_data);
+		goto out;
+	}
+
+	if (data->expect_variable_names) {
+		/* Not copied: the strings are borrowed from the reply */
+		v_names = dbus_send_and_splice_get_variable_names (reply, FALSE);
+	}
+
+	dbus_pending_call_cancel (data->call);
+
+	received = g_memory_output_stream_get_data (memory_stream);
+	received_size = g_memory_output_stream_get_data_size (memory_stream);
+
+	(* data->callback) (received,
+	                    received_size,
+	                    v_names,
+	                    NULL,
+	                    data->user_data);
+
+	/* Only the array is ours; the strings it points to were not copied,
+	 * so g_strfreev() must not be used here */
+	g_free (v_names);
+
+out:
+	if (reply) {
+		dbus_message_unref (reply);
+	}
+
+	send_and_splice_data_free (data);
+}
+
+/*
+ * tracker_client_dbus_send_and_splice_async:
+ * @connection: D-Bus connection used to send @message
+ * @message: the method call to send; always unreffed by this function once
+ *           the argument checks have passed
+ * @fd: read end of the pipe the results are streamed through
+ * @expect_variable_names: whether variable names should be extracted from
+ *                         the reply and passed to @callback
+ * @cancellable: optional GCancellable for the splice, or NULL
+ * @callback: function invoked with the outcome
+ * @user_data: pointer passed to @callback
+ *
+ * Asynchronous counterpart of tracker_client_dbus_send_and_splice(): sends
+ * @message and starts splicing @fd into a memory buffer. The outcome is
+ * delivered to @callback from send_and_splice_async_callback().
+ *
+ * Returns: TRUE when the splice was started. FALSE when an argument check
+ * failed or the call could not be queued; in that case a critical is
+ * logged, @callback is never invoked and @fd is not closed here.
+ */
+static gboolean
+tracker_client_dbus_send_and_splice_async (DBusConnection                     *connection,
+                                           DBusMessage                        *message,
+                                           int                                 fd,
+                                           gboolean                            expect_variable_names,
+                                           GCancellable                       *cancellable,
+                                           TrackerClientSendAndSpliceCallback  callback,
+                                           gpointer                            user_data)
+{
+	DBusPendingCall *call = NULL;
+	GInputStream *unix_input_stream;
+	GInputStream *buffered_input_stream;
+	GOutputStream *output_stream;
+	SendAndSpliceData *data;
+
+	g_return_val_if_fail (connection != NULL, FALSE);
+	g_return_val_if_fail (message != NULL, FALSE);
+	g_return_val_if_fail (fd > 0, FALSE);
+	g_return_val_if_fail (callback != NULL, FALSE);
+
+	if (!dbus_connection_send_with_reply (connection,
+	                                      message,
+	                                      &call,
+	                                      -1)) {
+		/* The send failed outright (out of memory); do not rely on what,
+		 * if anything, was stored in call. */
+		call = NULL;
+	}
+	dbus_message_unref (message);
+
+	if (!call) {
+		g_critical ("FD passing unsupported or connection disconnected");
+		return FALSE;
+	}
+
+	unix_input_stream = g_unix_input_stream_new (fd, TRUE);
+	buffered_input_stream = g_buffered_input_stream_new_sized (unix_input_stream,
+	                                                           TRACKER_DBUS_PIPE_BUFFER_SIZE);
+	/* No destroy function is given, so the data outlives the stream; the
+	 * completion callback either hands it on or frees it. */
+	output_stream = g_memory_output_stream_new (NULL, 0, g_realloc, NULL);
+
+	/* The streams and the pending call are released by
+	 * send_and_splice_data_free() once the completion callback has run. */
+	data = send_and_splice_data_new (unix_input_stream,
+	                                 buffered_input_stream,
+	                                 output_stream,
+	                                 expect_variable_names,
+	                                 call,
+	                                 callback,
+	                                 user_data);
+
+	g_output_stream_splice_async (output_stream,
+	                              buffered_input_stream,
+	                              G_OUTPUT_STREAM_SPLICE_CLOSE_SOURCE |
+	                              G_OUTPUT_STREAM_SPLICE_CLOSE_TARGET,
+	                              0,
+	                              cancellable,
+	                              send_and_splice_async_callback,
+	                              data);
+
+	return TRUE;
+}
+
 /* This ID is shared between both fast and slow pending call hash
  * tables and is guaranteed to be unique.
  */
@@ -1742,17 +2058,15 @@ tracker_resources_sparql_query_iterate (TrackerClient  *client,
 
 	iterator = g_slice_new0 (TrackerResultIterator);
 
-#if 0
-	tracker_dbus_send_and_splice (connection,
-	                              message,
-	                              pipefd[0],
-	                              NULL,
-	                              (void **) &iterator->buffer,
-	                              &iterator->buffer_size,
-	                              NULL /* Not interested in variable_names */,
-	                              &inner_error);
+	tracker_client_dbus_send_and_splice (connection,
+	                                     message,
+	                                     pipefd[0],
+	                                     NULL,
+	                                     (void **) &iterator->buffer,
+	                                     &iterator->buffer_size,
+	                                     NULL /* Not interested in variable_names */,
+	                                     &inner_error);
 	/* message is destroyed by tracker_dbus_send_and_splice */
-#endif
 
 	if (G_UNLIKELY (inner_error)) {
 		g_propagate_error (error, inner_error);
@@ -2223,16 +2537,13 @@ tracker_resources_sparql_query_iterate_async (TrackerClient         *client,
 	                           user_data);
 	fad->iterator_callback = callback;
 
-#if 0
-	tracker_dbus_send_and_splice_async (connection,
-	                                    message,
-	                                    pipefd[0],
-	                                    TRUE,
-	                                    cancellable,
-	                                    callback_iterator,
-	                                    fad);
-
-#endif
+	tracker_client_dbus_send_and_splice_async (connection,
+	                                           message,
+	                                           pipefd[0],
+	                                           TRUE,
+	                                           cancellable,
+	                                           callback_iterator,
+	                                           fad);
 
 	return fad->request_id;
 }



[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]