[tracker/gdbus-porting-rebased: 39/65] libtracker-client: Reimplement / move around the old FD passing code
- From: Philip Van Hoof <pvanhoof src gnome org>
- To: commits-list gnome org
- Cc:
- Subject: [tracker/gdbus-porting-rebased: 39/65] libtracker-client: Reimplement / move around the old FD passing code
- Date: Wed, 12 Jan 2011 11:19:03 +0000 (UTC)
commit fc4227401b1df7bba87473b2b1761d9d3b4d5797
Author: Philip Van Hoof <philip codeminded be>
Date: Wed Jan 5 12:03:42 2011 +0100
libtracker-client: Reimplement / move around the old FD passing code
src/libtracker-client/Makefile.am | 2 +-
src/libtracker-client/tracker.c | 351 ++++++++++++++++++++++++++++++++++--
2 files changed, 332 insertions(+), 21 deletions(-)
---
diff --git a/src/libtracker-client/Makefile.am b/src/libtracker-client/Makefile.am
index 6b267ee..012f122 100644
--- a/src/libtracker-client/Makefile.am
+++ b/src/libtracker-client/Makefile.am
@@ -22,7 +22,7 @@ libtracker_clientincludedir = $(includedir)/tracker-$(TRACKER_API_VERSION)/libtr
libtracker_client_ TRACKER_API_VERSION@_la_SOURCES = \
tracker-sparql-builder.vala \
- tracker.c
+ tracker.c
libtracker_client_ TRACKER_API_VERSION@_la_LIBADD = \
$(top_builddir)/src/libtracker-common/libtracker-common.la \
diff --git a/src/libtracker-client/tracker.c b/src/libtracker-client/tracker.c
index 9f8f739..d0c6403 100644
--- a/src/libtracker-client/tracker.c
+++ b/src/libtracker-client/tracker.c
@@ -36,6 +36,8 @@
#include <gio/gunixinputstream.h>
#include <gio/gunixoutputstream.h>
+#include <dbus/dbus-glib-bindings.h>
+
#include <libtracker-common/tracker-dbus.h>
#include "tracker.h"
@@ -113,6 +115,22 @@
* Simple search API.
**/
+typedef void (*TrackerClientSendAndSpliceCallback) (void *buffer,
+ gssize buffer_size,
+ GStrv variable_names,
+ GError *error,
+ gpointer user_data);
+
+typedef struct {
+ GInputStream *unix_input_stream;
+ GInputStream *buffered_input_stream;
+ GOutputStream *output_stream;
+ DBusPendingCall *call;
+ TrackerClientSendAndSpliceCallback callback;
+ gpointer user_data;
+ gboolean expect_variable_names;
+} SendAndSpliceData;
+
typedef struct {
DBusGConnection *connection;
DBusGProxy *proxy_statistics;
@@ -234,6 +252,304 @@ G_DEFINE_TYPE(TrackerClient, tracker_client, G_TYPE_OBJECT)
#define TRACKER_CLIENT_GET_PRIVATE(obj) (G_TYPE_INSTANCE_GET_PRIVATE ((obj), TRACKER_TYPE_CLIENT, TrackerClientPrivate))
+
+static GStrv
+dbus_send_and_splice_get_variable_names (DBusMessage *message,
+ gboolean copy_strings)
+{
+ GPtrArray *found;
+ DBusMessageIter iter, arr;
+
+ dbus_message_iter_init (message, &iter);
+ dbus_message_iter_recurse (&iter, &arr);
+
+ found = g_ptr_array_new ();
+
+ while (dbus_message_iter_get_arg_type (&arr) != DBUS_TYPE_INVALID) {
+ gchar *str;
+
+ dbus_message_iter_get_basic (&arr, &str);
+ g_ptr_array_add (found, copy_strings ? g_strdup (str) : str);
+ dbus_message_iter_next (&arr);
+ }
+
+ g_ptr_array_add (found, NULL);
+
+ return (GStrv) g_ptr_array_free (found, FALSE);
+}
+
+/*
+ * /!\ BIG FAT WARNING /!\
+ * The message must be destroyed for this function to succeed, so pass a
+ * message with a refcount of 1 (and say goodbye to it, 'cause you'll never
+ * see it again
+ */
+static gboolean
+tracker_client_dbus_send_and_splice (DBusConnection *connection,
+ DBusMessage *message,
+ int fd,
+ GCancellable *cancellable,
+ void **dest_buffer,
+ gssize *dest_buffer_size,
+ GStrv *variable_names,
+ GError **error)
+{
+ DBusPendingCall *call;
+ DBusMessage *reply = NULL;
+ GInputStream *unix_input_stream;
+ GInputStream *buffered_input_stream;
+ GOutputStream *output_stream;
+ GError *inner_error = NULL;
+ gboolean ret_value = FALSE;
+
+ g_return_val_if_fail (connection != NULL, FALSE);
+ g_return_val_if_fail (message != NULL, FALSE);
+ g_return_val_if_fail (fd > 0, FALSE);
+ g_return_val_if_fail (dest_buffer != NULL, FALSE);
+
+ dbus_connection_send_with_reply (connection,
+ message,
+ &call,
+ -1);
+ dbus_message_unref (message);
+
+ if (!call) {
+ g_set_error (error,
+ TRACKER_DBUS_ERROR,
+ TRACKER_DBUS_ERROR_UNSUPPORTED,
+ "FD passing unsupported or connection disconnected");
+ return FALSE;
+ }
+
+ unix_input_stream = g_unix_input_stream_new (fd, TRUE);
+ buffered_input_stream = g_buffered_input_stream_new_sized (unix_input_stream,
+ TRACKER_DBUS_PIPE_BUFFER_SIZE);
+ output_stream = g_memory_output_stream_new (NULL, 0, g_realloc, NULL);
+
+ g_output_stream_splice (output_stream,
+ buffered_input_stream,
+ G_OUTPUT_STREAM_SPLICE_CLOSE_SOURCE |
+ G_OUTPUT_STREAM_SPLICE_CLOSE_TARGET,
+ cancellable,
+ &inner_error);
+
+ if (G_LIKELY (!inner_error)) {
+ /* Wait for any current d-bus call to finish */
+ dbus_pending_call_block (call);
+
+ /* Check we didn't get an error */
+ reply = dbus_pending_call_steal_reply (call);
+
+ if (G_UNLIKELY (dbus_message_get_type (reply) == DBUS_MESSAGE_TYPE_ERROR)) {
+ DBusError dbus_error;
+
+ dbus_error_init (&dbus_error);
+ dbus_set_error_from_message (&dbus_error, reply);
+ dbus_set_g_error (error, &dbus_error);
+ dbus_error_free (&dbus_error);
+
+ /* If any error happened, we're not passing any received data, so we
+ * need to free it */
+ g_free (g_memory_output_stream_get_data (G_MEMORY_OUTPUT_STREAM (output_stream)));
+ } else {
+ *dest_buffer = g_memory_output_stream_get_data (G_MEMORY_OUTPUT_STREAM (output_stream));
+
+ if (dest_buffer_size) {
+ *dest_buffer_size = g_memory_output_stream_get_data_size (G_MEMORY_OUTPUT_STREAM (output_stream));
+ }
+
+ if (variable_names) {
+ *variable_names = dbus_send_and_splice_get_variable_names (reply, TRUE);
+ }
+
+ ret_value = TRUE;
+ }
+ } else {
+ g_set_error (error,
+ TRACKER_DBUS_ERROR,
+ TRACKER_DBUS_ERROR_BROKEN_PIPE,
+ "Couldn't get results from server");
+ g_error_free (inner_error);
+ /* If any error happened, we're not passing any received data, so we
+ * need to free it */
+ g_free (g_memory_output_stream_get_data (G_MEMORY_OUTPUT_STREAM (output_stream)));
+ }
+
+ g_object_unref (output_stream);
+ g_object_unref (buffered_input_stream);
+ g_object_unref (unix_input_stream);
+
+ if (reply) {
+ dbus_message_unref (reply);
+ }
+
+ dbus_pending_call_unref (call);
+
+ return ret_value;
+}
+
+static SendAndSpliceData *
+send_and_splice_data_new (GInputStream *unix_input_stream,
+ GInputStream *buffered_input_stream,
+ GOutputStream *output_stream,
+ gboolean expect_variable_names,
+ DBusPendingCall *call,
+ TrackerClientSendAndSpliceCallback callback,
+ gpointer user_data)
+{
+ SendAndSpliceData *data;
+
+ data = g_slice_new0 (SendAndSpliceData);
+ data->unix_input_stream = unix_input_stream;
+ data->buffered_input_stream = buffered_input_stream;
+ data->output_stream = output_stream;
+ data->call = call;
+ data->callback = callback;
+ data->user_data = user_data;
+ data->expect_variable_names = expect_variable_names;
+
+ return data;
+}
+
+static void
+send_and_splice_data_free (SendAndSpliceData *data)
+{
+ g_object_unref (data->unix_input_stream);
+ g_object_unref (data->buffered_input_stream);
+ g_object_unref (data->output_stream);
+ dbus_pending_call_unref (data->call);
+ g_slice_free (SendAndSpliceData, data);
+}
+
+static void
+send_and_splice_async_callback (GObject *source,
+ GAsyncResult *result,
+ gpointer user_data)
+{
+ SendAndSpliceData *data = user_data;
+ DBusMessage *reply = NULL;
+ GError *error = NULL;
+
+ g_output_stream_splice_finish (data->output_stream,
+ result,
+ &error);
+
+ if (G_LIKELY (!error)) {
+ dbus_pending_call_block (data->call);
+ reply = dbus_pending_call_steal_reply (data->call);
+
+ if (G_UNLIKELY (dbus_message_get_type (reply) == DBUS_MESSAGE_TYPE_ERROR)) {
+ DBusError dbus_error;
+
+ /* If any error happened, we're not passing any received data, so we
+ * need to free it */
+ g_free (g_memory_output_stream_get_data (G_MEMORY_OUTPUT_STREAM (data->output_stream)));
+
+ dbus_error_init (&dbus_error);
+ dbus_set_error_from_message (&dbus_error, reply);
+ dbus_set_g_error (&error, &dbus_error);
+ dbus_error_free (&dbus_error);
+
+ (* data->callback) (NULL, -1, NULL, error, data->user_data);
+
+ /* Note: GError should be freed by callback. We do this to be aligned
+ * with the behavior of dbus-glib, where if an error happens, the
+ * GError passed to the callback is supposed to be disposed by the
+ * callback itself. */
+ } else {
+ GStrv v_names = NULL;
+
+ if (data->expect_variable_names) {
+ v_names = dbus_send_and_splice_get_variable_names (reply, FALSE);
+ }
+
+ dbus_pending_call_cancel (data->call);
+
+ (* data->callback) (g_memory_output_stream_get_data (G_MEMORY_OUTPUT_STREAM (data->output_stream)),
+ g_memory_output_stream_get_data_size (G_MEMORY_OUTPUT_STREAM (data->output_stream)),
+ v_names,
+ NULL,
+ data->user_data);
+
+ /* Don't use g_strfreev here, see above */
+ g_free (v_names);
+ }
+ } else {
+ /* If any error happened, we're not passing any received data, so we
+ * need to free it */
+ g_free (g_memory_output_stream_get_data (G_MEMORY_OUTPUT_STREAM (data->output_stream)));
+
+ (* data->callback) (NULL, -1, NULL, error, data->user_data);
+
+ /* Note: GError should be freed by callback. We do this to be aligned
+ * with the behavior of dbus-glib, where if an error happens, the
+ * GError passed to the callback is supposed to be disposed by the
+ * callback itself. */
+ }
+
+ if (reply) {
+ dbus_message_unref (reply);
+ }
+
+ send_and_splice_data_free (data);
+}
+
+static gboolean
+tracker_client_dbus_send_and_splice_async (DBusConnection *connection,
+ DBusMessage *message,
+ int fd,
+ gboolean expect_variable_names,
+ GCancellable *cancellable,
+ TrackerClientSendAndSpliceCallback callback,
+ gpointer user_data)
+{
+ DBusPendingCall *call;
+ GInputStream *unix_input_stream;
+ GInputStream *buffered_input_stream;
+ GOutputStream *output_stream;
+ SendAndSpliceData *data;
+
+ g_return_val_if_fail (connection != NULL, FALSE);
+ g_return_val_if_fail (message != NULL, FALSE);
+ g_return_val_if_fail (fd > 0, FALSE);
+ g_return_val_if_fail (callback != NULL, FALSE);
+
+ dbus_connection_send_with_reply (connection,
+ message,
+ &call,
+ -1);
+ dbus_message_unref (message);
+
+ if (!call) {
+ g_critical ("FD passing unsupported or connection disconnected");
+ return FALSE;
+ }
+
+ unix_input_stream = g_unix_input_stream_new (fd, TRUE);
+ buffered_input_stream = g_buffered_input_stream_new_sized (unix_input_stream,
+ TRACKER_DBUS_PIPE_BUFFER_SIZE);
+ output_stream = g_memory_output_stream_new (NULL, 0, g_realloc, NULL);
+
+ data = send_and_splice_data_new (unix_input_stream,
+ buffered_input_stream,
+ output_stream,
+ expect_variable_names,
+ call,
+ callback,
+ user_data);
+
+ g_output_stream_splice_async (output_stream,
+ buffered_input_stream,
+ G_OUTPUT_STREAM_SPLICE_CLOSE_SOURCE |
+ G_OUTPUT_STREAM_SPLICE_CLOSE_TARGET,
+ 0,
+ cancellable,
+ send_and_splice_async_callback,
+ data);
+
+ return TRUE;
+}
+
/* This ID is shared between both fast and slow pending call hash
* tables and is guaranteed to be unique.
*/
@@ -1742,17 +2058,15 @@ tracker_resources_sparql_query_iterate (TrackerClient *client,
iterator = g_slice_new0 (TrackerResultIterator);
-#if 0
- tracker_dbus_send_and_splice (connection,
- message,
- pipefd[0],
- NULL,
- (void **) &iterator->buffer,
- &iterator->buffer_size,
- NULL /* Not interested in variable_names */,
- &inner_error);
+ tracker_client_dbus_send_and_splice (connection,
+ message,
+ pipefd[0],
+ NULL,
+ (void **) &iterator->buffer,
+ &iterator->buffer_size,
+ NULL /* Not interested in variable_names */,
+ &inner_error);
/* message is destroyed by tracker_dbus_send_and_splice */
-#endif
if (G_UNLIKELY (inner_error)) {
g_propagate_error (error, inner_error);
@@ -2223,16 +2537,13 @@ tracker_resources_sparql_query_iterate_async (TrackerClient *client,
user_data);
fad->iterator_callback = callback;
-#if 0
- tracker_dbus_send_and_splice_async (connection,
- message,
- pipefd[0],
- TRUE,
- cancellable,
- callback_iterator,
- fad);
-
-#endif
+ tracker_client_dbus_send_and_splice_async (connection,
+ message,
+ pipefd[0],
+ TRUE,
+ cancellable,
+ callback_iterator,
+ fad);
return fad->request_id;
}
[
Date Prev][
Date Next] [
Thread Prev][
Thread Next]
[
Thread Index]
[
Date Index]
[
Author Index]