[libsoup/giobased: 2/11] Use gio instead of soup_socket_read*
- From: Dan Winship <danw src gnome org>
- To: commits-list gnome org
- Cc:
- Subject: [libsoup/giobased: 2/11] Use gio instead of soup_socket_read*
- Date: Tue, 22 Mar 2011 13:06:48 +0000 (UTC)
commit e4b5ed759235423ee736790f623c7235be77234a
Author: Dan Winship <danw gnome org>
Date: Wed Dec 8 18:19:27 2010 +0100
Use gio instead of soup_socket_read*
libsoup/Makefile.am | 2 +
libsoup/soup-input-stream.c | 228 +++++++++++++++++++++++++++++++++++++++++++
libsoup/soup-input-stream.h | 55 ++++++++++
libsoup/soup-message-io.c | 156 ++++++++++++++++++-----------
4 files changed, 383 insertions(+), 58 deletions(-)
---
diff --git a/libsoup/Makefile.am b/libsoup/Makefile.am
index a26d820..aec7948 100644
--- a/libsoup/Makefile.am
+++ b/libsoup/Makefile.am
@@ -139,6 +139,8 @@ libsoup_2_4_la_SOURCES = \
soup-headers.c \
soup-http-input-stream.h \
soup-http-input-stream.c \
+ soup-input-stream.h \
+ soup-input-stream.c \
soup-logger.c \
soup-message.c \
soup-message-body.c \
diff --git a/libsoup/soup-input-stream.c b/libsoup/soup-input-stream.c
new file mode 100644
index 0000000..3f334df
--- /dev/null
+++ b/libsoup/soup-input-stream.c
@@ -0,0 +1,228 @@
+/* -*- Mode: C; tab-width: 8; indent-tabs-mode: t; c-basic-offset: 8 -*- */
+/*
+ * soup-input-stream.c
+ *
+ * Copyright (C) 2010 Red Hat, Inc.
+ */
+
+#ifdef HAVE_CONFIG_H
+#include <config.h>
+#endif
+
+#include <string.h>
+#include <gio/gio.h>
+
+#include "soup-input-stream.h"
+
+/* Per-instance private state for SoupInputStream. */
+struct _SoupInputStreamPrivate {
+ /* The wrapped stream; assigned in constructed() from the filter's base stream */
+ GInputStream *base_stream;
+ /* Bytes already read from base_stream but not yet returned to the
+  * caller (data following a '\n' seen by the read_line functions);
+  * NULL when nothing is pending.
+  */
+ GByteArray *read_buf;
+};
+
+/* Forward declaration: referenced by the type definition macro below,
+ * defined further down in this file.
+ */
+static void soup_input_stream_pollable_init (GPollableInputStreamInterface *pollable_interface, gpointer interface_data);
+
+/* Defines the SoupInputStream type as a GFilterInputStream subclass that
+ * also implements the GPollableInputStream interface.
+ */
+G_DEFINE_TYPE_WITH_CODE (SoupInputStream, soup_input_stream, G_TYPE_FILTER_INPUT_STREAM,
+ G_IMPLEMENT_INTERFACE (G_TYPE_POLLABLE_INPUT_STREAM,
+ soup_input_stream_pollable_init))
+
+/* Instance initializer: looks up the instance's private data area and
+ * stores the pointer in stream->priv.
+ */
+static void
+soup_input_stream_init (SoupInputStream *stream)
+{
+	SoupInputStreamPrivate *priv;
+
+	priv = G_TYPE_INSTANCE_GET_PRIVATE (stream, SOUP_TYPE_INPUT_STREAM,
+					    SoupInputStreamPrivate);
+	stream->priv = priv;
+}
+
+/* GObject "constructed" handler: stores the filter's base stream in the
+ * private struct.
+ */
+static void
+constructed (GObject *object)
+{
+	SoupInputStream *self = SOUP_INPUT_STREAM (object);
+	GFilterInputStream *filter = G_FILTER_INPUT_STREAM (self);
+
+	self->priv->base_stream = g_filter_input_stream_get_base_stream (filter);
+}
+
+/* GObject "finalize" handler: frees any still-pending buffered bytes,
+ * then chains up to the parent class.
+ */
+static void
+finalize (GObject *object)
+{
+	SoupInputStreamPrivate *priv = SOUP_INPUT_STREAM (object)->priv;
+
+	if (priv->read_buf != NULL)
+		g_byte_array_free (priv->read_buf, TRUE);
+
+	G_OBJECT_CLASS (soup_input_stream_parent_class)->finalize (object);
+}
+
+/* Copies up to @count bytes from the front of the pending buffer into
+ * @buffer and drops them from the pending buffer. When the pending buffer
+ * is fully consumed it is freed and priv->read_buf is reset to NULL.
+ *
+ * The caller must only invoke this while priv->read_buf is non-NULL.
+ *
+ * Returns the number of bytes copied.
+ */
+static gssize
+read_from_buf (SoupInputStream *sstream, gpointer buffer, gsize count)
+{
+	GByteArray *pending = sstream->priv->read_buf;
+	gsize remaining;
+
+	if (count > pending->len)
+		count = pending->len;
+	memcpy (buffer, pending->data, count);
+
+	remaining = pending->len - count;
+	if (remaining == 0) {
+		g_byte_array_free (pending, TRUE);
+		sstream->priv->read_buf = NULL;
+		return count;
+	}
+
+	/* Slide the unread tail down to the start of the array */
+	memmove (pending->data, pending->data + count, remaining);
+	g_byte_array_set_size (pending, remaining);
+
+	return count;
+}
+
+/* GInputStream read implementation. Pending buffered bytes are returned
+ * first; only when none are pending is the base stream read.
+ */
+static gssize
+soup_input_stream_read (GInputStream *stream,
+			void *buffer,
+			gsize count,
+			GCancellable *cancellable,
+			GError **error)
+{
+	SoupInputStream *self = SOUP_INPUT_STREAM (stream);
+
+	if (self->priv->read_buf != NULL)
+		return read_from_buf (self, buffer, count);
+
+	return g_input_stream_read (self->priv->base_stream,
+				    buffer, count,
+				    cancellable, error);
+}
+
+/* GPollableInputStream is_readable implementation: readable whenever
+ * buffered bytes are pending, otherwise whatever the base stream reports.
+ */
+static gboolean
+soup_input_stream_is_readable (GPollableInputStream *stream)
+{
+	SoupInputStreamPrivate *priv = SOUP_INPUT_STREAM (stream)->priv;
+	GPollableInputStream *base;
+
+	if (priv->read_buf != NULL)
+		return TRUE;
+
+	base = G_POLLABLE_INPUT_STREAM (priv->base_stream);
+	return g_pollable_input_stream_is_readable (base);
+}
+
+/* GPollableInputStream create_source implementation. Returns a pollable
+ * source for @stream with one child source: an idle source when buffered
+ * bytes are pending, otherwise the base stream's own source.
+ */
+static GSource *
+soup_input_stream_create_source (GPollableInputStream *stream,
+				 GCancellable *cancellable)
+{
+	SoupInputStreamPrivate *priv = SOUP_INPUT_STREAM (stream)->priv;
+	GSource *child, *source;
+
+	if (priv->read_buf != NULL) {
+		child = g_idle_source_new ();
+	} else {
+		GPollableInputStream *base =
+			G_POLLABLE_INPUT_STREAM (priv->base_stream);
+
+		child = g_pollable_input_stream_create_source (base, cancellable);
+	}
+	g_source_set_dummy_callback (child);
+
+	source = g_pollable_source_new (G_OBJECT (stream));
+	g_source_add_child_source (source, child);
+	/* Drop our reference to the child now that it has been added */
+	g_source_unref (child);
+
+	return source;
+}
+
+/* Class initializer: reserves the private struct and installs the
+ * GObject and GInputStream virtual method overrides.
+ */
+static void
+soup_input_stream_class_init (SoupInputStreamClass *stream_class)
+{
+	GObjectClass *gobject_class = G_OBJECT_CLASS (stream_class);
+	GInputStreamClass *istream_class = G_INPUT_STREAM_CLASS (stream_class);
+
+	g_type_class_add_private (stream_class, sizeof (SoupInputStreamPrivate));
+
+	/* GObject overrides */
+	gobject_class->constructed = constructed;
+	gobject_class->finalize = finalize;
+
+	/* GInputStream overrides */
+	istream_class->read_fn = soup_input_stream_read;
+}
+
+/* GPollableInputStream interface initializer: installs this type's
+ * create_source and is_readable implementations. @interface_data is unused.
+ */
+static void
+soup_input_stream_pollable_init (GPollableInputStreamInterface *pollable_interface,
+				 gpointer interface_data)
+{
+	pollable_interface->create_source = soup_input_stream_create_source;
+	pollable_interface->is_readable = soup_input_stream_is_readable;
+}
+
+/* Creates a new SoupInputStream wrapping @base_stream. The object is
+ * constructed with "close-base-stream" set to FALSE.
+ *
+ * Returns the new stream.
+ */
+GInputStream *
+soup_input_stream_new (GInputStream *base_stream)
+{
+	gpointer stream;
+
+	stream = g_object_new (SOUP_TYPE_INPUT_STREAM,
+			       "base-stream", base_stream,
+			       "close-base-stream", FALSE,
+			       NULL);
+	return stream;
+}
+
+/* Shared implementation of soup_input_stream_read_line() and
+ * soup_input_stream_read_line_nonblocking().
+ *
+ * If bytes are pending from an earlier call, returns them up to and
+ * including the first '\n' (or all of them if there is no '\n'), but never
+ * more than @length bytes; anything not returned stays pending.
+ *
+ * Otherwise performs a single read of up to @length bytes from the base
+ * stream (blocking or non-blocking according to @blocking). If the data
+ * read contains a '\n' that is not its last byte, the bytes after that
+ * '\n' are stashed as pending data and only the first line is reported.
+ *
+ * Returns the number of bytes placed in @buffer, or the base stream's
+ * return value (0 or negative, with @error possibly set) if the read
+ * yielded no data.
+ */
+static gssize
+read_line_internal (SoupInputStream *sstream,
+		    void *buffer,
+		    gsize length,
+		    gboolean blocking,
+		    GCancellable *cancellable,
+		    GError **error)
+{
+	SoupInputStreamPrivate *priv = sstream->priv;
+	guint8 *buf = buffer, *newline;
+	gsize line_len;
+	gssize nread;
+
+	if (priv->read_buf) {
+		GByteArray *read_buf = priv->read_buf;
+		gsize avail = read_buf->len;
+
+		newline = memchr (read_buf->data, '\n', avail);
+		if (newline)
+			avail = newline + 1 - read_buf->data;
+
+		/* The pending data was sized by an earlier call's buffer;
+		 * never copy more than this caller's buffer can hold.
+		 */
+		if (avail > length)
+			avail = length;
+
+		return read_from_buf (sstream, buffer, avail);
+	}
+
+	if (blocking) {
+		nread = g_input_stream_read (priv->base_stream,
+					     buffer, length,
+					     cancellable, error);
+	} else {
+		nread = g_pollable_input_stream_read_nonblocking (
+			G_POLLABLE_INPUT_STREAM (priv->base_stream),
+			buffer, length, cancellable, error);
+	}
+	if (nread <= 0)
+		return nread;
+
+	newline = memchr (buf, '\n', nread);
+	if (!newline || newline == buf + nread - 1)
+		return nread;
+
+	/* Keep everything after the first newline for the next call */
+	line_len = newline + 1 - buf;
+	priv->read_buf = g_byte_array_new ();
+	g_byte_array_append (priv->read_buf, buf + line_len, nread - line_len);
+
+	return line_len;
+}
+
+/* Reads at most one line into @buffer (of size @length), using a blocking
+ * read on the base stream when no data is already pending.
+ *
+ * Returns the number of bytes stored in @buffer (including the '\n' if one
+ * was found), 0 or a negative value as returned by the underlying read,
+ * or -1 if @sstream is not a SoupInputStream.
+ */
+gssize
+soup_input_stream_read_line (SoupInputStream *sstream,
+			     void *buffer,
+			     gsize length,
+			     GCancellable *cancellable,
+			     GError **error)
+{
+	g_return_val_if_fail (SOUP_IS_INPUT_STREAM (sstream), -1);
+
+	return read_line_internal (sstream, buffer, length, TRUE,
+				   cancellable, error);
+}
+
+/* Non-blocking variant of soup_input_stream_read_line(): identical except
+ * that the base stream is read with
+ * g_pollable_input_stream_read_nonblocking().
+ */
+gssize
+soup_input_stream_read_line_nonblocking (SoupInputStream *sstream,
+					 void *buffer,
+					 gsize length,
+					 GCancellable *cancellable,
+					 GError **error)
+{
+	g_return_val_if_fail (SOUP_IS_INPUT_STREAM (sstream), -1);
+
+	return read_line_internal (sstream, buffer, length, FALSE,
+				   cancellable, error);
+}
diff --git a/libsoup/soup-input-stream.h b/libsoup/soup-input-stream.h
new file mode 100644
index 0000000..ac35b07
--- /dev/null
+++ b/libsoup/soup-input-stream.h
@@ -0,0 +1,55 @@
+/* -*- Mode: C; tab-width: 8; indent-tabs-mode: t; c-basic-offset: 8 -*- */
+/*
+ * Copyright (C) 2010 Red Hat, Inc.
+ */
+
+#ifndef SOUP_INPUT_STREAM_H
+#define SOUP_INPUT_STREAM_H 1
+
+#include <libsoup/soup-types.h>
+
+G_BEGIN_DECLS
+
+/* Standard GObject type, cast and type-check macros for SoupInputStream */
+#define SOUP_TYPE_INPUT_STREAM (soup_input_stream_get_type ())
+#define SOUP_INPUT_STREAM(obj) (G_TYPE_CHECK_INSTANCE_CAST ((obj), SOUP_TYPE_INPUT_STREAM, SoupInputStream))
+#define SOUP_INPUT_STREAM_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST ((klass), SOUP_TYPE_INPUT_STREAM, SoupInputStreamClass))
+#define SOUP_IS_INPUT_STREAM(obj) (G_TYPE_CHECK_INSTANCE_TYPE ((obj), SOUP_TYPE_INPUT_STREAM))
+#define SOUP_IS_INPUT_STREAM_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE ((klass), SOUP_TYPE_INPUT_STREAM))
+#define SOUP_INPUT_STREAM_GET_CLASS(obj) (G_TYPE_INSTANCE_GET_CLASS ((obj), SOUP_TYPE_INPUT_STREAM, SoupInputStreamClass))
+
+/* Opaque private data; the struct is defined in soup-input-stream.c */
+typedef struct _SoupInputStreamPrivate SoupInputStreamPrivate;
+
+/* Instance structure: a GFilterInputStream plus a pointer to private data */
+typedef struct {
+ GFilterInputStream parent;
+
+ SoupInputStreamPrivate *priv;
+} SoupInputStream;
+
+/* Class structure: adds no virtual methods of its own, only reserved slots */
+typedef struct {
+ GFilterInputStreamClass parent_class;
+
+ /* Padding for future expansion */
+ void (*_libsoup_reserved1) (void);
+ void (*_libsoup_reserved2) (void);
+ void (*_libsoup_reserved3) (void);
+ void (*_libsoup_reserved4) (void);
+} SoupInputStreamClass;
+
+GType soup_input_stream_get_type (void);
+
+/* Creates a SoupInputStream wrapping @base_stream */
+GInputStream *soup_input_stream_new (GInputStream *base_stream);
+
+/* Reads into @buffer, stopping after the first '\n' if the data obtained
+ * contains one; bytes following that '\n' are kept by the stream and
+ * returned by later reads. Returns the number of bytes stored in @buffer,
+ * or the underlying read's 0/negative result.
+ */
+gssize soup_input_stream_read_line (SoupInputStream *sstream,
+ void *buffer,
+ gsize length,
+ GCancellable *cancellable,
+ GError **error);
+/* Same as soup_input_stream_read_line(), but reads the base stream with
+ * g_pollable_input_stream_read_nonblocking().
+ */
+gssize soup_input_stream_read_line_nonblocking (SoupInputStream *sstream,
+ void *buffer,
+ gsize length,
+ GCancellable *cancellable,
+ GError **error);
+
+G_END_DECLS
+
+#endif /* SOUP_INPUT_STREAM_H */
diff --git a/libsoup/soup-message-io.c b/libsoup/soup-message-io.c
index b5d9840..40f8d54 100644
--- a/libsoup/soup-message-io.c
+++ b/libsoup/soup-message-io.c
@@ -13,6 +13,7 @@
#include <string.h>
#include "soup-connection.h"
+#include "soup-input-stream.h"
#include "soup-message.h"
#include "soup-message-private.h"
#include "soup-message-queue.h"
@@ -73,8 +74,8 @@ typedef struct {
goffset write_length;
goffset written;
- guint read_tag, tls_signal_id;
- GSource *write_source;
+ guint tls_signal_id;
+ GSource *read_source, *write_source;
GSource *unpause_source;
gboolean paused;
@@ -113,6 +114,8 @@ soup_message_io_cleanup (SoupMessage *msg)
g_signal_handler_disconnect (io->sock, io->tls_signal_id);
if (io->sock)
g_object_unref (io->sock);
+ if (io->istream)
+ g_object_unref (io->istream);
if (io->async_context)
g_main_context_unref (io->async_context);
if (io->item)
@@ -139,9 +142,9 @@ soup_message_io_stop (SoupMessage *msg)
if (!io)
return;
- if (io->read_tag) {
- g_signal_handler_disconnect (io->sock, io->read_tag);
- io->read_tag = 0;
+ if (io->read_source) {
+ g_source_destroy (io->read_source);
+ io->read_source = NULL;
}
if (io->write_source) {
g_source_destroy (io->write_source);
@@ -215,6 +218,19 @@ io_error (SoupSocket *sock, SoupMessage *msg, GError *error)
}
static gboolean
+/* Callback for io->read_source (installed via g_source_set_callback() in
+ * the read paths): destroys and clears the source, then resumes reading
+ * by calling io_read(). Always returns FALSE.
+ */
+io_readable (GPollableInputStream *stream, gpointer msg)
+{
+ SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
+ SoupMessageIOData *io = priv->io_data;
+
+ /* Clear the source before io_read() runs, so read_source is NULL by then */
+ g_source_destroy (io->read_source);
+ io->read_source = NULL;
+
+ io_read (io->sock, msg);
+ return FALSE;
+}
+
+
+static gboolean
io_handle_sniffing (SoupMessage *msg, gboolean done_reading)
{
SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
@@ -285,23 +301,44 @@ read_metadata (SoupMessage *msg, gboolean to_blank)
{
SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
SoupMessageIOData *io = priv->io_data;
- SoupSocketIOStatus status;
guchar read_buf[RESPONSE_BLOCK_SIZE];
- gsize nread;
+ gssize nread;
gboolean got_lf;
GError *error = NULL;
+ if (!io->istream) {
+ io_error (io->sock, msg, NULL);
+ return FALSE;
+ }
+
while (1) {
- status = soup_socket_read_until (io->sock, read_buf,
- sizeof (read_buf),
- "\n", 1, &nread, &got_lf,
- io->cancellable, &error);
- switch (status) {
- case SOUP_SOCKET_OK:
- g_byte_array_append (io->read_meta_buf, read_buf, nread);
- break;
+ if (io->non_blocking) {
+ nread = soup_input_stream_read_line_nonblocking (
+ SOUP_INPUT_STREAM (io->istream),
+ read_buf, sizeof (read_buf),
+ io->cancellable, &error);
+ } else {
+ nread = soup_input_stream_read_line (
+ SOUP_INPUT_STREAM (io->istream),
+ read_buf, sizeof (read_buf),
+ io->cancellable, &error);
+ }
- case SOUP_SOCKET_EOF:
+ if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
+ g_error_free (error);
+ io->read_source = g_pollable_input_stream_create_source (
+ G_POLLABLE_INPUT_STREAM (io->istream),
+ NULL);
+ g_source_set_callback (io->read_source, (GSourceFunc) io_readable, msg, NULL);
+ g_source_attach (io->read_source, io->async_context);
+ g_source_unref (io->read_source);
+ return FALSE;
+ }
+
+ if (nread > 0) {
+ g_byte_array_append (io->read_meta_buf, read_buf, nread);
+ got_lf = memchr (read_buf, '\n', nread) != NULL;
+ } else if (nread == 0) {
/* More lame server handling... deal with
* servers that don't send the final chunk.
*/
@@ -310,22 +347,18 @@ read_metadata (SoupMessage *msg, gboolean to_blank)
g_byte_array_append (io->read_meta_buf,
(guchar *)"0\r\n", 3);
got_lf = TRUE;
- break;
} else if (io->read_state == SOUP_MESSAGE_IO_STATE_TRAILERS &&
io->read_meta_buf->len == 0) {
g_byte_array_append (io->read_meta_buf,
(guchar *)"\r\n", 2);
got_lf = TRUE;
- break;
+ } else {
+ io_error (io->sock, msg, NULL);
+ return FALSE;
}
- /* else fall through */
-
- case SOUP_SOCKET_ERROR:
+ } else {
io_error (io->sock, msg, error);
return FALSE;
-
- case SOUP_SOCKET_WOULD_BLOCK:
- return FALSE;
}
if (got_lf) {
@@ -439,14 +472,18 @@ read_body_chunk (SoupMessage *msg)
{
SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
SoupMessageIOData *io = priv->io_data;
- SoupSocketIOStatus status;
guchar *stack_buf = NULL;
gsize len;
gboolean read_to_eof = (io->read_encoding == SOUP_ENCODING_EOF);
- gsize nread;
+ gssize nread;
GError *error = NULL;
SoupBuffer *buffer;
+ if (!io->istream) {
+ io_error (io->sock, msg, NULL);
+ return FALSE;
+ }
+
if (!io_handle_sniffing (msg, FALSE))
return FALSE;
@@ -470,11 +507,18 @@ read_body_chunk (SoupMessage *msg)
else
len = MIN (buffer->length, io->read_length);
- status = soup_socket_read (io->sock,
- (guchar *)buffer->data, len,
- &nread, io->cancellable, &error);
+ if (io->non_blocking) {
+ nread = g_pollable_input_stream_read_nonblocking (
+ G_POLLABLE_INPUT_STREAM (io->istream),
+ (guchar *)buffer->data, len,
+ io->cancellable, &error);
+ } else {
+ nread = g_input_stream_read (io->istream,
+ (guchar *)buffer->data, len,
+ io->cancellable, &error);
+ }
- if (status == SOUP_SOCKET_OK && nread) {
+ if (nread > 0) {
buffer->length = nread;
io->read_length -= nread;
@@ -501,24 +545,28 @@ read_body_chunk (SoupMessage *msg)
}
soup_buffer_free (buffer);
- switch (status) {
- case SOUP_SOCKET_OK:
- break;
- case SOUP_SOCKET_EOF:
+ if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
+ g_error_free (error);
+ io->read_source = g_pollable_input_stream_create_source (
+ G_POLLABLE_INPUT_STREAM (io->istream),
+ NULL);
+ g_source_set_callback (io->read_source, (GSourceFunc) io_readable, msg, NULL);
+ g_source_attach (io->read_source, io->async_context);
+ g_source_unref (io->read_source);
+ return FALSE;
+ }
+
+ if (nread == 0) {
if (io->read_eof_ok) {
io->read_length = 0;
return TRUE;
}
- /* else fall through */
-
- case SOUP_SOCKET_ERROR:
- io_error (io->sock, msg, error);
- return FALSE;
-
- case SOUP_SOCKET_WOULD_BLOCK:
- return FALSE;
+ /* else... */
}
+
+ io_error (io->sock, msg, error);
+ return FALSE;
}
return TRUE;
@@ -1062,9 +1110,9 @@ io_read (SoupSocket *sock, SoupMessage *msg)
case SOUP_MESSAGE_IO_STATE_FINISHING:
- if (io->read_tag) {
- g_signal_handler_disconnect (io->sock, io->read_tag);
- io->read_tag = 0;
+ if (io->read_source) {
+ g_source_destroy (io->read_source);
+ io->read_source = NULL;
}
io->read_state = SOUP_MESSAGE_IO_STATE_DONE;
@@ -1126,7 +1174,7 @@ new_iostate (SoupMessage *msg, SoupSocket *sock, SoupMessageIOMode mode,
io->sock = g_object_ref (sock);
iostream = soup_socket_get_iostream (sock);
if (iostream) {
- io->istream = g_io_stream_get_input_stream (iostream);
+ io->istream = soup_input_stream_new (g_io_stream_get_input_stream (iostream));
io->ostream = g_io_stream_get_output_stream (iostream);
}
g_object_get (io->sock,
@@ -1137,9 +1185,6 @@ new_iostate (SoupMessage *msg, SoupSocket *sock, SoupMessageIOMode mode,
io->read_meta_buf = g_byte_array_new ();
io->write_buf = g_string_new (NULL);
- io->read_tag = g_signal_connect (io->sock, "readable",
- G_CALLBACK (io_read), msg);
-
io->read_state = SOUP_MESSAGE_IO_STATE_NOT_STARTED;
io->write_state = SOUP_MESSAGE_IO_STATE_NOT_STARTED;
@@ -1213,9 +1258,9 @@ soup_message_io_pause (SoupMessage *msg)
g_source_destroy (io->write_source);
io->write_source = NULL;
}
- if (io->read_tag) {
- g_signal_handler_disconnect (io->sock, io->read_tag);
- io->read_tag = 0;
+ if (io->read_source) {
+ g_source_destroy (io->read_source);
+ io->read_source = NULL;
}
if (io->unpause_source) {
@@ -1236,14 +1281,9 @@ io_unpause_internal (gpointer msg)
io->unpause_source = NULL;
io->paused = FALSE;
- if (io->write_source || io->read_tag)
+ if (io->write_source || io->read_source)
return FALSE;
- if (io->read_state != SOUP_MESSAGE_IO_STATE_DONE) {
- io->read_tag = g_signal_connect (io->sock, "readable",
- G_CALLBACK (io_read), msg);
- }
-
if (SOUP_MESSAGE_IO_STATE_ACTIVE (io->write_state))
io_write (io->sock, msg);
else if (SOUP_MESSAGE_IO_STATE_ACTIVE (io->read_state))
[Date Prev][Date Next] [Thread Prev][Thread Next] [Thread Index] [Date Index] [Author Index]