SoupInputStream / trying out gio



OK, I've written a GInputStream subclass using libsoup to download HTTP
responses. It's not beautiful (because it has to convert between
libsoup's push API and gio's pull API), but it shows that it can work
for gvfs. The biggest problem with it right now is that there's no way
to make libsoup use caller-provided buffers for I/O, so we end up having
to do extra copies (sometimes twice). I can add more API later to fix
that though. (The current code works with plain libsoup 2.2.104, though
not with anything earlier.)

The other problem with it (in terms of code ugliness) is that I had to
use the async libsoup API, because there's no way to implement
GCancellable using the sync libsoup API. (You can cancel an entire HTTP
operation, but you can't say "stop reading now but then I may ask you to
start up again later", which is what gio's cancellation semantics seem
to require.) This ends up working fine though, because SoupSession lets
you set a non-default GMainContext for it to use for I/O, so by doing
that, we can avoid having to recursively enter the main loop when doing
synchronous calls, even though underneath we're using async I/O.


GInputStream was mostly unproblematic to work with. The problems I did
have came when I wanted to implement a new pair of operations on my
subclass (soup_input_stream_send/send_async, which you can use if you
want to examine the status code/response headers before you start
reading the body). GInputStream provides g_input_stream_get_pending()
and g_input_stream_set_pending(), but I ended up needing to duplicate the

      g_set_error (error, G_IO_ERROR, G_IO_ERROR_PENDING,
		   _("Stream has outstanding operation"));

part myself, which would in turn require duplicating the translations of
that message. It would be cleaner if I could just do:

      if (!g_input_stream_set_pending (stream, TRUE, error))
        return FALSE;

(or remove the TRUE and have set_pending/clear_pending, with the latter
not taking a GError **).

A related issue was that with soup_input_stream_send_async, I ended up
needing to have a wrapper callback to clear the pending flag before
calling the real callback, just like GInputStream does for its async
ops. Maybe GSimpleAsyncResult could support that idiom directly, by
providing an "implementation_callback" or whatever in addition to the
caller-provided callback.

Sources attached, plus a demo.

-- Dan
/* soup-input-stream.c, based on gsocketinputstream.c
 *
 * Copyright (C) 2006-2007 Red Hat, Inc.
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General
 * Public License along with this library; if not, write to the
 * Free Software Foundation, Inc., 59 Temple Place, Suite 330,
 * Boston, MA 02111-1307, USA.
 */

#include <config.h>

#include <string.h>

#include <glib.h>
#include <gio/gioerror.h>
#include <gio/gcancellable.h>
#include <gio/gsimpleasyncresult.h>

#include <libsoup/soup.h>

#include "soup-input-stream.h"

G_DEFINE_TYPE (SoupInputStream, soup_input_stream, G_TYPE_INPUT_STREAM);

typedef void (*SoupInputStreamCallback) (GInputStream *);

typedef struct {
  SoupSession *session;
  GMainContext *async_context;
  SoupMessage *msg;
  gboolean got_headers;

  GCancellable *cancellable;
  GSource *cancel_watch;
  SoupInputStreamCallback got_headers_cb;
  SoupInputStreamCallback got_chunk_cb;
  SoupInputStreamCallback finished_cb;
  SoupInputStreamCallback cancelled_cb;

  guchar *leftover_buffer;
  gsize leftover_bufsize, leftover_offset;

  guchar *caller_buffer;
  gsize caller_bufsize, caller_nread;
  GAsyncReadyCallback outstanding_callback;
  GSimpleAsyncResult *result;

} SoupInputStreamPrivate;
#define SOUP_INPUT_STREAM_GET_PRIVATE(o) (G_TYPE_INSTANCE_GET_PRIVATE ((o), SOUP_TYPE_INPUT_STREAM, SoupInputStreamPrivate))


static gssize   soup_input_stream_read         (GInputStream         *stream,
						void                 *buffer,
						gsize                 count,
						GCancellable         *cancellable,
						GError              **error);
static gboolean soup_input_stream_close        (GInputStream         *stream,
						GCancellable         *cancellable,
						GError              **error);
static void     soup_input_stream_read_async   (GInputStream         *stream,
						void                 *buffer,
						gsize                 count,
						int                   io_priority,
						GCancellable         *cancellable,
						GAsyncReadyCallback   callback,
						gpointer              data);
static gssize   soup_input_stream_read_finish  (GInputStream         *stream,
						GAsyncResult         *result,
						GError              **error);
static void     soup_input_stream_close_async  (GInputStream         *stream,
						int                   io_priority,
						GCancellable         *cancellable,
						GAsyncReadyCallback   callback,
						gpointer              data);
static gboolean soup_input_stream_close_finish (GInputStream         *stream,
						GAsyncResult         *result,
						GError              **error);

static void soup_input_stream_got_headers (SoupMessage *msg, gpointer stream);
static void soup_input_stream_got_chunk (SoupMessage *msg, gpointer stream);
static void soup_input_stream_finished (SoupMessage *msg, gpointer stream);

static void
soup_input_stream_finalize (GObject *object)
{
  SoupInputStreamPrivate *priv = SOUP_INPUT_STREAM_GET_PRIVATE (stream);

  g_object_unref (priv->session);

  g_signal_handlers_disconnect_by_func (msg, G_CALLBACK (soup_input_stream_got_headers), stream);
  g_signal_handlers_disconnect_by_func (msg, G_CALLBACK (soup_input_stream_got_chunk), stream);
  g_signal_handlers_disconnect_by_func (msg, G_CALLBACK (soup_input_stream_finished), stream);
  g_object_unref (priv->msg);

  if (priv->leftover_buffer)
    g_free (priv->leftover_buffer);

  if (G_OBJECT_CLASS (soup_input_stream_parent_class)->finalize)
    (*G_OBJECT_CLASS (soup_input_stream_parent_class)->finalize) (object);
}

static void
soup_input_stream_class_init (SoupInputStreamClass *klass)
{
  GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
  GInputStreamClass *stream_class = G_INPUT_STREAM_CLASS (klass);
  
  g_type_class_add_private (klass, sizeof (SoupInputStreamPrivate));
  
  gobject_class->finalize = soup_input_stream_finalize;

  stream_class->read = soup_input_stream_read;
  stream_class->close = soup_input_stream_close;
  stream_class->read_async = soup_input_stream_read_async;
  stream_class->read_finish = soup_input_stream_read_finish;
  stream_class->close_async = soup_input_stream_close_async;
  stream_class->close_finish = soup_input_stream_close_finish;
}

static void
soup_input_stream_init (SoupInputStream *stream)
{
  ;
}

/**
 * soup_input_stream_new:
 * @session: the #SoupSession to use
 * @msg: the #SoupMessage whose response will be streamed
 * 
 * Prepares to send @msg over @session, and returns a #GInputStream
 * that can be used to read the response.
 *
 * @msg may not be sent until the first read call; if you need to look
 * at the status code or response headers before reading the body, you
 * can use soup_input_stream_send() or soup_input_stream_send_async()
 * to force the message to be sent and the response headers read.
 *
 * If @msg gets a non-2xx result, the first read (or send) will return
 * an error with type %SOUP_INPUT_STREAM_HTTP_ERROR.
 *
 * Internally, #SoupInputStream is implemented using asynchronous I/O,
 * so if you are using the synchronous API (eg,
 * g_input_stream_read()), you should create a new #GMainContext and
 * set it as the %SOUP_SESSION_ASYNC_CONTEXT property on @session. (If
 * you don't, then synchronous #GInputStream calls will cause the main
 * loop to be run recursively.) The async #GInputStream API works fine
 * with %SOUP_SESSION_ASYNC_CONTEXT either set or unset.
 *
 * Returns: a new #GInputStream.
 **/
GInputStream *
soup_input_stream_new (SoupSession *session, SoupMessage *msg)
{
  SoupInputStream *stream;
  SoupInputStreamPrivate *priv;

  g_return_val_if_fail (SOUP_IS_SESSION_ASYNC (session), NULL);
  g_return_val_if_fail (SOUP_IS_MESSAGE (msg), NULL);

  stream = g_object_new (SOUP_TYPE_INPUT_STREAM, NULL);
  priv = SOUP_INPUT_STREAM_GET_PRIVATE (stream);

  priv->session = g_object_ref (session);
  priv->async_context = soup_session_get_async_context (session);
  priv->msg = g_object_ref (msg);

  g_signal_connect (msg, "got_headers",
		    G_CALLBACK (soup_input_stream_got_headers), stream);
  g_signal_connect (msg, "got_chunk",
		    G_CALLBACK (soup_input_stream_got_chunk), stream);
  g_signal_connect (msg, "finished",
		    G_CALLBACK (soup_input_stream_finished), stream);

  /* Add an extra ref since soup_session_queue_message steals one */
  g_object_ref (msg);
  soup_session_queue_message (session, msg, NULL, NULL);

  return G_INPUT_STREAM (stream);
}

static void
soup_input_stream_got_headers (SoupMessage *msg, gpointer stream)
{
  SoupInputStreamPrivate *priv = SOUP_INPUT_STREAM_GET_PRIVATE (stream);

  /* If the status is unsuccessful, we just ignore the signal and let
   * libsoup keep going (eventually either it will requeue the request
   * (after handling authentication/redirection), or else the
   * "finished" handler will run).
   */
  if (!SOUP_STATUS_IS_SUCCESSFUL (msg->status_code))
    return;

  priv->got_headers = TRUE;
  if (!priv->caller_buffer)
    {
      /* Not ready to read the body yet */
      soup_message_io_pause (msg);
    }

  if (priv->got_headers_cb)
    priv->got_headers_cb (stream);
}

static void
soup_input_stream_got_chunk (SoupMessage *msg, gpointer stream)
{
  SoupInputStreamPrivate *priv = SOUP_INPUT_STREAM_GET_PRIVATE (stream);
  gchar *chunk = msg->response.body;
  gsize chunk_size = msg->response.length;

  /* We only pay attention to the chunk if it's part of a successful
   * response.
   */
  if (!SOUP_STATUS_IS_SUCCESSFUL (msg->status_code))
    return;

  /* Sanity check */
  if (priv->caller_bufsize == 0 || priv->leftover_bufsize != 0)
    g_warning ("soup_input_stream_got_chunk called again before previous chunk was processed");

  /* Copy what we can into priv->caller_buffer */
  if (priv->caller_bufsize - priv->caller_nread > 0)
    {
      gsize nread = MIN (chunk_size, priv->caller_bufsize - priv->caller_nread);

      memcpy (priv->caller_buffer + priv->caller_nread, chunk, nread);
      priv->caller_nread += nread;
      chunk += nread;
      chunk_size -= nread;
    }

  if (chunk_size > 0)
    {
      /* Copy the rest into priv->leftover_buffer. If there's already
       * some data there, realloc and append. Otherwise just copy.
       */
      if (priv->leftover_bufsize)
	{
	  priv->leftover_buffer = g_realloc (priv->leftover_buffer,
					     priv->leftover_bufsize + chunk_size);
	  memcpy (priv->leftover_buffer + priv->leftover_bufsize,
		  chunk, chunk_size);
	  priv->leftover_bufsize += chunk_size;
	}
      else
	{
	  priv->leftover_bufsize = chunk_size;
	  priv->leftover_buffer = g_memdup (chunk, chunk_size);
	  priv->leftover_offset = 0;
	}
    }

  soup_message_io_pause (msg);
  if (priv->got_chunk_cb)
    priv->got_chunk_cb (stream);
}

static void
soup_input_stream_finished (SoupMessage *msg, gpointer stream)
{
  SoupInputStreamPrivate *priv = SOUP_INPUT_STREAM_GET_PRIVATE (stream);

  if (priv->finished_cb)
    priv->finished_cb (stream);
}

static gboolean
soup_input_stream_cancelled (GIOChannel *chan, GIOCondition condition,
			     gpointer stream)
{
  SoupInputStreamPrivate *priv = SOUP_INPUT_STREAM_GET_PRIVATE (stream);

  priv->cancel_watch = NULL;

  soup_message_io_pause (priv->msg);
  if (priv->cancelled_cb)
    priv->cancelled_cb (stream);

  return FALSE;
}  

static void
soup_input_stream_prepare_for_io (GInputStream *stream,
				  GCancellable *cancellable,
				  guchar       *buffer,
				  gsize         count)
{
  SoupInputStreamPrivate *priv = SOUP_INPUT_STREAM_GET_PRIVATE (stream);
  int cancel_fd;

  priv->cancellable = cancellable;
  cancel_fd = g_cancellable_get_fd (cancellable);
  if (cancel_fd != -1)
    {
      GIOChannel *chan = g_io_channel_unix_new (cancel_fd);
      priv->cancel_watch = soup_add_io_watch (priv->async_context, chan,
					      G_IO_IN | G_IO_ERR | G_IO_HUP,
					      soup_input_stream_cancelled,
					      stream);
      g_io_channel_unref (chan);
    }

  priv->caller_buffer = buffer;
  priv->caller_bufsize = count;
  priv->caller_nread = 0;

  if (priv->msg->status == SOUP_MESSAGE_STATUS_RUNNING)
    soup_message_io_unpause (priv->msg);
}

static void
soup_input_stream_done_io (GInputStream *stream)
{
  SoupInputStreamPrivate *priv = SOUP_INPUT_STREAM_GET_PRIVATE (stream);

  if (priv->cancel_watch)
    {
      g_source_destroy (priv->cancel_watch);
      priv->cancel_watch = NULL;
    }
  priv->cancellable = NULL;

  priv->caller_buffer = NULL;
  priv->caller_bufsize = 0;
}

static gboolean
set_error_if_http_failed (SoupMessage *msg, GError **error)
{
  if (!SOUP_STATUS_IS_SUCCESSFUL (msg->status_code))
    {
      g_set_error (error, SOUP_HTTP_ERROR,
		   msg->status_code, "%s", msg->reason_phrase);
      return TRUE;
    }
  return FALSE;
}

static gsize
read_from_leftover (SoupInputStreamPrivate *priv,
		    gpointer buffer, gsize bufsize)
{
  gsize nread;

  if (priv->leftover_bufsize - priv->leftover_offset <= bufsize)
    {
      nread = priv->leftover_bufsize - priv->leftover_offset;
      memcpy (buffer, priv->leftover_buffer + priv->leftover_offset, nread);

      g_free (priv->leftover_buffer);
      priv->leftover_buffer = NULL;
      priv->leftover_bufsize = priv->leftover_offset = 0;
    }
  else
    {
      nread = bufsize;
      memcpy (buffer, priv->leftover_buffer + priv->leftover_offset, nread);
      priv->leftover_offset += nread;
    }
  return nread;
}

/* This does the work of soup_input_stream_send(), assuming that the
 * GInputStream pending flag has already been set. It is also used by
 * soup_input_stream_send_async() in some circumstances.
 */
static gboolean
soup_input_stream_send_internal (GInputStream  *stream,
				 GCancellable  *cancellable,
				 GError       **error)
{
  SoupInputStreamPrivate *priv = SOUP_INPUT_STREAM_GET_PRIVATE (stream);

  soup_input_stream_prepare_for_io (stream, cancellable, NULL, 0);
  while (priv->msg->status != SOUP_MESSAGE_STATUS_FINISHED &&
	 !priv->got_headers &&
	 !g_cancellable_is_cancelled (cancellable))
    g_main_context_iteration (priv->async_context, TRUE);
  soup_input_stream_done_io (stream);

  if (g_cancellable_set_error_if_cancelled (cancellable, error))
    return FALSE;
  else if (set_error_if_http_failed (priv->msg, error))
    return FALSE;
  return TRUE;
}

/**
 * soup_input_stream_send:
 * @stream: a #SoupInputStream
 * @cancellable: optional #GCancellable object, %NULL to ignore.
 * @error: location to store the error occuring, or %NULL to ignore
 *
 * Synchronously sends the HTTP request associated with @stream, and
 * reads the response headers. Call this after soup_input_stream_new()
 * and before the first g_input_stream_read() if you want to check the
 * HTTP status code before you start reading.
 *
 * Return value: %TRUE if msg has a successful (2xx) status, %FALSE if
 * not.
 **/
gboolean
soup_input_stream_send (GInputStream *stream,
			GCancellable *cancellable,
			GError      **error)
{
  SoupInputStreamPrivate *priv;
  gboolean result;

  g_return_val_if_fail (SOUP_IS_INPUT_STREAM (stream), FALSE);
  priv = SOUP_INPUT_STREAM_GET_PRIVATE (stream);

  if (g_input_stream_has_pending (stream))
    {
      /* FIXME: should get this error message from gio */
      g_set_error (error, G_IO_ERROR, G_IO_ERROR_PENDING,
		   "Stream has outstanding operation");
      return FALSE;
    }

  g_input_stream_set_pending (stream, TRUE);
  result = soup_input_stream_send_internal (stream, cancellable, error);
  g_input_stream_set_pending (stream, FALSE);

  return result;
}

static gssize
soup_input_stream_read (GInputStream *stream,
			void         *buffer,
			gsize         count,
			GCancellable *cancellable,
			GError      **error)
{
  SoupInputStreamPrivate *priv = SOUP_INPUT_STREAM_GET_PRIVATE (stream);

  if (priv->msg->status == SOUP_MESSAGE_STATUS_FINISHED)
    return 0;

  /* If there is data leftover from a previous read, return it. */
  if (priv->leftover_bufsize)
    return read_from_leftover (priv, buffer, count);

  /* No leftover data, accept one chunk from the network */
  soup_input_stream_prepare_for_io (stream, cancellable, buffer, count);
  while (priv->msg->status != SOUP_MESSAGE_STATUS_FINISHED &&
	 priv->caller_nread == 0 &&
	 !g_cancellable_is_cancelled (cancellable))
    g_main_context_iteration (priv->async_context, TRUE);
  soup_input_stream_done_io (stream);

  if (priv->caller_nread > 0)
    return priv->caller_nread;

  if (g_cancellable_set_error_if_cancelled (cancellable, error))
    return -1;
  else if (set_error_if_http_failed (priv->msg, error))
    return -1;
  else
    return 0;
}

static gboolean
soup_input_stream_close (GInputStream *stream,
			 GCancellable *cancellable,
			 GError      **error)
{
  SoupInputStreamPrivate *priv = SOUP_INPUT_STREAM_GET_PRIVATE (stream);

  if (priv->msg->status != SOUP_MESSAGE_STATUS_FINISHED)
    {
      soup_message_set_status (priv->msg, SOUP_STATUS_CANCELLED);
      soup_session_cancel_message (priv->session, priv->msg);
    }

  return TRUE;
}

static void
wrapper_callback (GObject *source_object, GAsyncResult *res,
		  gpointer user_data)
{
  GInputStream *stream = G_INPUT_STREAM (source_object);
  SoupInputStreamPrivate *priv = SOUP_INPUT_STREAM_GET_PRIVATE (stream);

  g_input_stream_set_pending (stream, FALSE);
  if (priv->outstanding_callback)
    (*priv->outstanding_callback) (source_object, res, user_data);
  priv->outstanding_callback = NULL;
  g_object_unref (stream);
}

static void
send_async_thread (GSimpleAsyncResult *res,
		   GObject *object,
		   GCancellable *cancellable)
{
  GError *error = NULL;
  gboolean success;

  success = soup_input_stream_send_internal (G_INPUT_STREAM (object),
					     cancellable, &error);
  g_simple_async_result_set_op_res_gboolean (res, success);
  if (error)
    {
      g_simple_async_result_set_from_error (res, error);
      g_error_free (error);
    }
}

static void
soup_input_stream_send_async_in_thread (GInputStream        *stream,
					int                  io_priority,
					GCancellable        *cancellable,
					GAsyncReadyCallback  callback,
					gpointer             user_data)
{
  GSimpleAsyncResult *res;

  res = g_simple_async_result_new (G_OBJECT (stream), callback, user_data,
				   soup_input_stream_send_async_in_thread);
  g_simple_async_result_run_in_thread (res, send_async_thread,
				       io_priority, cancellable);
  g_object_unref (res);
}

static void
send_async_finished (GInputStream *stream)
{
  SoupInputStreamPrivate *priv = SOUP_INPUT_STREAM_GET_PRIVATE (stream);
  GSimpleAsyncResult *result;

  priv->got_headers_cb = NULL;
  priv->finished_cb = NULL;
  soup_input_stream_done_io (stream);

  result = priv->result;
  priv->result = NULL;
  g_simple_async_result_set_op_res_gboolean (result, SOUP_STATUS_IS_SUCCESSFUL (priv->msg->status_code));
  g_simple_async_result_complete (result);
}

static void
soup_input_stream_send_async_internal (GInputStream        *stream,
				       int                  io_priority,
				       GCancellable        *cancellable,
				       GAsyncReadyCallback  callback,
				       gpointer             user_data)
{
  SoupInputStreamPrivate *priv = SOUP_INPUT_STREAM_GET_PRIVATE (stream);

  g_object_ref (stream);
  priv->outstanding_callback = callback;

  /* If the session uses the default GMainContext, then we can do
   * async I/O directly. But if it has its own main context, it's
   * easier to just run it in another thread.
   */
  if (soup_session_get_async_context (priv->session))
    {
      soup_input_stream_send_async_in_thread (stream, io_priority, cancellable,
					      wrapper_callback, user_data);
      return;
    }

  priv->got_headers_cb = send_async_finished;
  priv->finished_cb = send_async_finished;

  soup_input_stream_prepare_for_io (stream, cancellable, NULL, 0);
  priv->result = g_simple_async_result_new (G_OBJECT (stream),
					    wrapper_callback, user_data,
					    soup_input_stream_send_async);
}

/**
 * soup_input_stream_send_async:
 * @stream: a #SoupInputStream
 * @io_priority: the io priority of the request.
 * @cancellable: optional #GCancellable object, %NULL to ignore.
 * @callback: callback to call when the request is satisfied
 * @user_data: the data to pass to callback function
 *
 * Asynchronously sends the HTTP request associated with @stream, and
 * reads the response headers. Call this after soup_input_stream_new()
 * and before the first g_input_stream_read_async() if you want to
 * check the HTTP status code before you start reading.
 **/
void
soup_input_stream_send_async (GInputStream        *stream,
			      int                  io_priority,
			      GCancellable        *cancellable,
			      GAsyncReadyCallback  callback,
			      gpointer             user_data)
{
  SoupInputStreamPrivate *priv = SOUP_INPUT_STREAM_GET_PRIVATE (stream);

  g_return_if_fail (SOUP_IS_INPUT_STREAM (stream));
  priv = SOUP_INPUT_STREAM_GET_PRIVATE (stream);

  if (g_input_stream_has_pending (stream))
    {
      /* FIXME: should get this error message from gio */
      g_simple_async_report_error_in_idle (G_OBJECT (stream),
					   callback,
					   user_data,
					   G_IO_ERROR, G_IO_ERROR_PENDING,
					   "Stream has outstanding operation");
      return;
    }
  g_input_stream_set_pending (stream, TRUE);
  soup_input_stream_send_async_internal (stream, io_priority, cancellable,
					 callback, user_data);
}

/**
 * soup_input_stream_send_finish:
 * @stream: a #SoupInputStream
 * @result: a #GAsyncResult.
 * @error: a #GError location to store the error occuring, or %NULL to 
 * ignore.
 *
 * Finishes a soup_input_stream_send_async() operation.
 *
 * Return value: %TRUE if the message was sent successfully and
 * received a successful status code, %FALSE if not.
 **/
gboolean
soup_input_stream_send_finish (GInputStream  *stream,
			       GAsyncResult  *result,
			       GError       **error)
{
  GSimpleAsyncResult *simple;

  g_return_val_if_fail (G_IS_SIMPLE_ASYNC_RESULT (result), FALSE);
  simple = G_SIMPLE_ASYNC_RESULT (result);
  g_return_val_if_fail (g_simple_async_result_get_source_tag (simple) == soup_input_stream_send_async, FALSE);

  return g_simple_async_result_get_op_res_gboolean (simple);
}

static void
read_async_done (GInputStream *stream)
{
  SoupInputStreamPrivate *priv = SOUP_INPUT_STREAM_GET_PRIVATE (stream);
  GSimpleAsyncResult *result;
  GError *error = NULL;

  result = priv->result;
  priv->result = NULL;

  if (g_cancellable_set_error_if_cancelled (priv->cancellable, &error) ||
      set_error_if_http_failed (priv->msg, &error))
    {
      g_simple_async_result_set_from_error (result, error);
      g_error_free (error);
    }
  else
    g_simple_async_result_set_op_res_gssize (result, priv->caller_nread);

  priv->got_chunk_cb = NULL;
  priv->finished_cb = NULL;
  priv->cancelled_cb = NULL;
  soup_input_stream_done_io (stream);

  g_simple_async_result_complete (result);
}

static void
soup_input_stream_read_async (GInputStream        *stream,
			      void                *buffer,
			      gsize                count,
			      int                  io_priority,
			      GCancellable        *cancellable,
			      GAsyncReadyCallback  callback,
			      gpointer             user_data)
{
  SoupInputStreamPrivate *priv = SOUP_INPUT_STREAM_GET_PRIVATE (stream);
  GSimpleAsyncResult *result;

  /* If the session uses the default GMainContext, then we can do
   * async I/O directly. But if it has its own main context, we fall
   * back to the async-via-sync-in-another-thread implementation.
   */
  if (soup_session_get_async_context (priv->session))
    {
      G_INPUT_STREAM_CLASS (soup_input_stream_parent_class)->
	read_async (stream, buffer, count, io_priority,
		    cancellable, callback, user_data);
      return;
    }

  result = g_simple_async_result_new (G_OBJECT (stream),
				      callback, user_data,
				      soup_input_stream_read_async);

  if (priv->msg->status == SOUP_MESSAGE_STATUS_FINISHED)
    {
      g_simple_async_result_set_op_res_gssize (result, 0);
      g_simple_async_result_complete_in_idle (result);
      return;
    }

  if (priv->leftover_bufsize)
    {
      gsize nread = read_from_leftover (priv, buffer, count);
      g_simple_async_result_set_op_res_gssize (result, nread);
      g_simple_async_result_complete_in_idle (result);
      return;
    }

  priv->result = result;

  priv->got_chunk_cb = read_async_done;
  priv->finished_cb = read_async_done;
  priv->cancelled_cb = read_async_done;
  soup_input_stream_prepare_for_io (stream, cancellable, buffer, count);
}

static gssize
soup_input_stream_read_finish (GInputStream  *stream,
			       GAsyncResult  *result,
			       GError       **error)
{
  GSimpleAsyncResult *simple;

  g_return_val_if_fail (G_IS_SIMPLE_ASYNC_RESULT (result), -1);
  simple = G_SIMPLE_ASYNC_RESULT (result);
  g_return_val_if_fail (g_simple_async_result_get_source_tag (simple) == soup_input_stream_read_async, -1);

  return g_simple_async_result_get_op_res_gssize (simple);
}

static void
soup_input_stream_close_async (GInputStream       *stream,
			       int                 io_priority,
			       GCancellable       *cancellable,
			       GAsyncReadyCallback callback,
			       gpointer            user_data)
{
  GSimpleAsyncResult *result;
  gboolean success;
  GError *error = NULL;

  result = g_simple_async_result_new (G_OBJECT (stream),
				      callback, user_data,
				      soup_input_stream_close_async);
  success = soup_input_stream_close (stream, cancellable, &error);
  g_simple_async_result_set_op_res_gboolean (result, success);
  if (error)
    {
      g_simple_async_result_set_from_error (result, error);
      g_error_free (error);
    }

  g_simple_async_result_complete_in_idle (result);
}

static gboolean
soup_input_stream_close_finish (GInputStream  *stream,
				GAsyncResult  *result,
				GError       **error)
{
  /* Failures handled in generic close_finish code */
  return TRUE;
}

GQuark
soup_http_error_quark (void)
{
	static GQuark error;
	if (!error)
		error = g_quark_from_static_string ("soup_http_error_quark");
	return error;
}
/* Copyright (C) 2006-2007 Red Hat, Inc.
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General
 * Public License along with this library; if not, write to the
 * Free Software Foundation, Inc., 59 Temple Place, Suite 330,
 * Boston, MA 02111-1307, USA.
 */

#ifndef __SOUP_INPUT_STREAM_H__
#define __SOUP_INPUT_STREAM_H__

#include <gio/ginputstream.h>
#include <libsoup/soup-types.h>

G_BEGIN_DECLS

#define SOUP_TYPE_INPUT_STREAM         (soup_input_stream_get_type ())
#define SOUP_INPUT_STREAM(o)           (G_TYPE_CHECK_INSTANCE_CAST ((o), SOUP_TYPE_INPUT_STREAM, SoupInputStream))
#define SOUP_INPUT_STREAM_CLASS(k)     (G_TYPE_CHECK_CLASS_CAST((k), SOUP_TYPE_INPUT_STREAM, SoupInputStreamClass))
#define SOUP_IS_INPUT_STREAM(o)        (G_TYPE_CHECK_INSTANCE_TYPE ((o), SOUP_TYPE_INPUT_STREAM))
#define SOUP_IS_INPUT_STREAM_CLASS(k)  (G_TYPE_CHECK_CLASS_TYPE ((k), SOUP_TYPE_INPUT_STREAM))
#define SOUP_INPUT_STREAM_GET_CLASS(o) (G_TYPE_INSTANCE_GET_CLASS ((o), SOUP_TYPE_INPUT_STREAM, SoupInputStreamClass))

typedef struct SoupInputStream         SoupInputStream;
typedef struct SoupInputStreamClass    SoupInputStreamClass;

struct SoupInputStream
{
  GInputStream parent;

};

struct SoupInputStreamClass
{
  GInputStreamClass parent_class;

  /* Padding for future expansion */
  void (*_g_reserved1) (void);
  void (*_g_reserved2) (void);
  void (*_g_reserved3) (void);
  void (*_g_reserved4) (void);
  void (*_g_reserved5) (void);
};

GType soup_input_stream_get_type (void) G_GNUC_CONST;

GInputStream *soup_input_stream_new         (SoupSession         *session,
					     SoupMessage         *msg);

gboolean      soup_input_stream_send        (GInputStream        *stream,
					     GCancellable        *cancellable,
					     GError             **error);

void          soup_input_stream_send_async  (GInputStream        *stream,
					     int                  io_priority,
					     GCancellable        *cancellable,
					     GAsyncReadyCallback  callback,
					     gpointer             user_data);
gboolean      soup_input_stream_send_finish (GInputStream        *stream,
					     GAsyncResult        *result,
					     GError             **error);

#define SOUP_HTTP_ERROR soup_http_error_quark()
GQuark soup_http_error_quark (void);

G_END_DECLS

#endif /* __SOUP_INPUT_STREAM_H__ */
/* -*- Mode: C; tab-width: 8; indent-tabs-mode: t; c-basic-offset: 8 -*- */
/*
 * Copyright (C) 2007 Red Hat, Inc.
 */

#include <ctype.h>
#include <errno.h>
#include <fcntl.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/stat.h>
#include <unistd.h>

#include <libsoup/soup.h>
#include "soup-input-stream.h"

GCancellable *cancellable;

typedef struct {
	GInputStream *stream;
	GCancellable *cancellable;
	FILE *out;
	GError **err;

	GMainLoop *loop;

	char *buffer;
	gsize bufsize;
} AsyncGetData;

static gboolean do_async_get (gpointer user_data);

static void
get_async (GInputStream *stream, GCancellable *cancellable,
	   FILE *out, GError **err)
{
	AsyncGetData async_get_data;
	GMainLoop *loop;
	char buffer[1024];

	loop = g_main_loop_new (NULL, FALSE);

	async_get_data.stream = stream;
	async_get_data.cancellable = cancellable;
	async_get_data.out = out;
	async_get_data.err = err;
	async_get_data.loop = loop;
	async_get_data.buffer = buffer;
	async_get_data.bufsize = sizeof (buffer);

	g_idle_add (do_async_get, &async_get_data);
	g_main_loop_run (loop);
	g_main_loop_unref (loop);
	g_main_context_unref (g_main_context_default ());
}
	
static void
get_async_callback (GObject *source_object, GAsyncResult *res,
		    gpointer user_data)
{
	AsyncGetData *agd = user_data;
	gssize nread;

	nread = g_input_stream_read_finish (agd->stream, res, agd->err);
	if (nread <= 0) {
		g_main_loop_quit (agd->loop);
		return;
	}

	fwrite (agd->buffer, 1, nread, agd->out);

	g_timeout_add (g_random_int_range (0, 100), do_async_get, agd);
}

static gboolean
do_async_get (gpointer user_data)
{
	AsyncGetData *agd = user_data;

	g_input_stream_read_async (agd->stream, agd->buffer, agd->bufsize,
				   G_PRIORITY_DEFAULT, agd->cancellable,
				   get_async_callback, agd);
	return FALSE;
}

static void
get_sync (GInputStream *stream, GCancellable *cancellable,
	  FILE *out, GError **err)
{
	char buffer[1024];
	gssize nread;

	while ((nread = g_input_stream_read (stream, buffer, sizeof (buffer),
					     cancellable, err)) > 0)
		fwrite (buffer, 1, nread, out);
}

static void
usage (void)
{
	fprintf (stderr, "Usage: stream-get [-c CAfile] [-p proxy URL] [-d] URL\n");
	exit (1);
}

static void
interrupt (int sig)
{
	/* Exit cleanly, to demonstrate that we can. */
	/* Yes I know it's probably not legal to do this from a signal
	 * handler.
	 */
	if (cancellable)
		g_cancellable_cancel (cancellable);
}

int
main (int argc, char **argv)
{
	SoupSession *session;
	GInputStream *stream;
	GError *err = NULL;
	const char *cafile = NULL, *uri;
	SoupUri *proxy = NULL;
	GMainContext *async_context;
	SoupMessage *msg;
	char *outfile = NULL;
	FILE *out;
	int opt;
	gboolean async = FALSE;

	g_type_init ();
	g_thread_init (NULL);
	signal (SIGINT, interrupt);

	while ((opt = getopt (argc, argv, "ac:o:p:")) != -1) {
		switch (opt) {
		case 'a':
			async = TRUE;
			break;
		case 'c':
			cafile = optarg;
			break;
		case 'o':
			outfile = optarg;
			break;
		case 'p':
			proxy = soup_uri_new (optarg);
			if (!proxy) {
				fprintf (stderr, "Could not parse %s as URI\n",
					 optarg);
				exit (1);
			}
			break;

		case '?':
			usage ();
			break;
		}
	}
	argc -= optind;
	argv += optind;

	if (argc != 1)
		usage ();
	uri = argv[0];

	if (outfile)
		out = fopen (outfile, "w");
	else
		out = stdout;

	async_context = async ? NULL : g_main_context_new ();
	session = soup_session_async_new_with_options (
		SOUP_SESSION_SSL_CA_FILE, cafile,
		SOUP_SESSION_PROXY_URI, proxy,
		SOUP_SESSION_ASYNC_CONTEXT, async_context,
		NULL);
	if (async_context)
		g_main_context_unref (async_context);

	msg = soup_message_new ("GET", uri);
	stream = soup_input_stream_new (session, msg);
	g_object_unref (msg);

	cancellable = g_cancellable_new ();

	if (async)
		get_async (stream, cancellable, out, &err);
	else
		get_sync (stream, cancellable, out, &err);

	if (err) {
		fprintf (stderr, "Read failed: %s\n", err->message);
		g_error_free (err);
		err = NULL;
	}

	g_input_stream_close (stream, cancellable, &err);
	if (err) {
		fprintf (stderr, "Close failed: %s\n", err->message);
		g_error_free (err);
		err = NULL;
	}

	g_object_unref (stream);
	g_object_unref (cancellable);

	soup_session_abort (session);
	g_object_unref (session);
	return err != NULL;
}


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]