[wing/wip/poll-stream] inputstream: rework to implement read_async
- From: Ignacio Casal Quinteiro <icq src gnome org>
- To: commits-list gnome org
- Cc:
- Subject: [wing/wip/poll-stream] inputstream: rework to implement read_async
- Date: Fri, 30 Nov 2018 09:48:16 +0000 (UTC)
commit e934ec3951882e2621598e889252d7051471eb0f
Author: Ignacio Casal Quinteiro <qignacio amazon com>
Date: Fri Nov 30 10:30:04 2018 +0100
inputstream: rework to implement read_async
wing/winginputstream.c | 240 ++++++++++++++++++++++++++-----------------------
1 file changed, 127 insertions(+), 113 deletions(-)
---
diff --git a/wing/winginputstream.c b/wing/winginputstream.c
index 851b71b..5750432 100644
--- a/wing/winginputstream.c
+++ b/wing/winginputstream.c
@@ -52,12 +52,7 @@ enum {
static GParamSpec *props[LAST_PROP];
-static void wing_input_stream_pollable_iface_init (GPollableInputStreamInterface *iface);
-
-G_DEFINE_TYPE_WITH_CODE (WingInputStream, wing_input_stream, G_TYPE_INPUT_STREAM,
- G_ADD_PRIVATE (WingInputStream)
- G_IMPLEMENT_INTERFACE (G_TYPE_POLLABLE_INPUT_STREAM,
wing_input_stream_pollable_iface_init)
- )
+G_DEFINE_TYPE_WITH_PRIVATE (WingInputStream, wing_input_stream, G_TYPE_INPUT_STREAM)
static void
wing_input_stream_finalize (GObject *object)
@@ -126,17 +121,17 @@ wing_input_stream_get_property (GObject *object,
}
static gssize
-read_internal (GInputStream *stream,
- void *buffer,
- gsize count,
- gboolean blocking,
- GCancellable *cancellable,
- GError **error)
+wing_input_stream_read (GInputStream *stream,
+ void *buffer,
+ gsize count,
+ GCancellable *cancellable,
+ GError **error)
{
WingInputStream *wing_stream;
WingInputStreamPrivate *priv;
BOOL res;
DWORD nbytes, nread;
+ OVERLAPPED overlap = { 0, };
gssize retval = -1;
wing_stream = WING_INPUT_STREAM (stream);
@@ -145,64 +140,31 @@ read_internal (GInputStream *stream,
if (g_cancellable_set_error_if_cancelled (cancellable, error))
return -1;
- if (!blocking && g_pollable_input_stream_is_readable (G_POLLABLE_INPUT_STREAM (stream)))
- {
- gboolean result;
-
- result = GetOverlappedResult (priv->overlap.hEvent, &priv->overlap, &nread, FALSE);
- if (!result && GetLastError () == ERROR_IO_INCOMPLETE)
- {
- g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK,
- g_strerror (EAGAIN));
- return -1;
- }
-
- ResetEvent (priv->overlap.hEvent);
-
- retval = nread;
- goto end;
- }
-
if (count > G_MAXINT)
nbytes = G_MAXINT;
else
nbytes = count;
- ResetEvent (priv->overlap.hEvent);
+ overlap.hEvent = CreateEvent (NULL, FALSE, FALSE, NULL);
+ g_return_val_if_fail (overlap.hEvent != NULL, -1);
- res = ReadFile (priv->handle, buffer, nbytes, &nread, &priv->overlap);
+ res = ReadFile (priv->handle, buffer, nbytes, &nread, &overlap);
if (res)
- {
- retval = nread;
- ResetEvent (priv->overlap.hEvent);
- }
+ retval = nread;
else
{
int errsv = GetLastError ();
- if (errsv == ERROR_IO_PENDING)
+ if (errsv == ERROR_IO_PENDING &&
+ wing_overlap_wait_result (priv->handle,
+ &overlap, &nread, cancellable))
{
- if (!blocking)
- {
- g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK,
- g_strerror (EAGAIN));
- goto end;
- }
- else if (blocking && wing_overlap_wait_result (priv->handle,
- &priv->overlap,
- &nread, cancellable))
- {
- retval = nread;
- ResetEvent (priv->overlap.hEvent);
- goto end;
- }
+ retval = nread;
+ goto end;
}
if (g_cancellable_set_error_if_cancelled (cancellable, error))
- {
- ResetEvent (priv->overlap.hEvent);
- goto end;
- }
+ goto end;
errsv = GetLastError ();
if (errsv == ERROR_MORE_DATA)
@@ -212,7 +174,6 @@ read_internal (GInputStream *stream,
* parameter specifies, ReadFile returns FALSE and
* GetLastError returns ERROR_MORE_DATA */
retval = nread;
- ResetEvent (priv->overlap.hEvent);
goto end;
}
else if (errsv == ERROR_HANDLE_EOF ||
@@ -239,19 +200,10 @@ read_internal (GInputStream *stream,
}
end:
+ CloseHandle (overlap.hEvent);
return retval;
}
-static gssize
-wing_input_stream_read (GInputStream *stream,
- void *buffer,
- gsize count,
- GCancellable *cancellable,
- GError **error)
-{
- return read_internal (stream, buffer, count, TRUE, cancellable, error);
-}
-
static gboolean
wing_input_stream_close (GInputStream *stream,
GCancellable *cancellable,
@@ -284,6 +236,114 @@ wing_input_stream_close (GInputStream *stream,
return TRUE;
}
+static gboolean
+read_async_ready (WingInputStream *stream,
+ gpointer user_data)
+{
+ WingInputStreamPrivate *priv;
+ GTask *task = user_data;
+ DWORD nread;
+ gboolean result;
+
+ priv = wing_input_stream_get_instance_private (wing_stream);
+
+ result = GetOverlappedResult (priv->overlap.hEvent, &priv->overlap, &nread, FALSE);
+ if (!result && GetLastError () == ERROR_IO_INCOMPLETE)
+ {
+ /* Try again to wait for the event to get ready */
+ ResetEvent (priv->overlap.hEvent);
+ return G_SOURCE_CONTINUE;
+ }
+
+ ResetEvent (priv->overlap.hEvent);
+
+ g_task_return_int (task, nread);
+
+ return G_SOURCE_REMOVE;
+}
+
+static void
+wing_input_stream_read_async (GInputStream *stream,
+ void *buffer,
+ gsize count,
+ int io_priority,
+ GCancellable *cancellable,
+ GAsyncReadyCallback callback,
+ gpointer user_data)
+{
+ WingInputStream *wing_stream;
+ WingInputStreamPrivate *priv;
+ DWORD nbytes, nread;
+ int errsv;
+ GTask *task;
+ gchar *emsg;
+
+ wing_stream = WING_INPUT_STREAM (stream);
+ priv = wing_input_stream_get_instance_private (wing_stream);
+
+ task = g_task_new (stream, cancellable, callback, user_data);
+
+ if (count > G_MAXINT)
+ nbytes = G_MAXINT;
+ else
+ nbytes = count;
+
+ ResetEvent (priv->overlap.hEvent);
+
+ res = ReadFile (priv->handle, buffer, nbytes, &nread, &priv->overlap);
+ if (res)
+ {
+ ResetEvent (priv->overlap.hEvent);
+ g_task_return_int (task, nread);
+ g_object_unref (task);
+ return;
+ }
+
+ errsv = GetLastError ();
+
+ if (errsv == ERROR_IO_PENDING)
+ {
+ GSource *handle_source;
+
+ handle_source = wing_create_source (priv->overlap.hEvent, G_IO_IN, cancellable);
+ g_task_attach_source (task, handle_source,
+ (GSourceFunc)read_async_ready);
+ g_source_unref (handle_source);
+ return;
+ }
+
+ if (errsv == ERROR_MORE_DATA)
+ {
+ /* If a named pipe is being read in message mode and the
+ * next message is longer than the nNumberOfBytesToRead
+ * parameter specifies, ReadFile returns FALSE and
+ * GetLastError returns ERROR_MORE_DATA */
+ ResetEvent (priv->overlap.hEvent);
+ g_task_return_int (task, nread);
+ return
+ }
+
+ if (errsv == ERROR_HANDLE_EOF ||
+ errsv == ERROR_BROKEN_PIPE)
+ {
+ /* TODO: the other end of a pipe may call the WriteFile
+ * function with nNumberOfBytesToWrite set to zero. In this
+ * case, it's not possible for the caller to know if it's
+ * broken pipe or a read of 0. Perhaps we should add a
+ * is_broken flag for this win32 case.. */
+ g_task_return_int (task, 0);
+ return;
+ }
+
+ emsg = g_win32_error_message (errsv);
+ g_task_report_new_error (stream, callback, user_data,
+ wing_input_stream_read_async,
+ G_IO_ERROR, g_io_error_from_win32_error (errsv),
+ "Error reading from handle: %s",
+ emsg);
+ g_free (emsg);
+}
+
static void
wing_input_stream_class_init (WingInputStreamClass *klass)
{
@@ -296,6 +356,7 @@ wing_input_stream_class_init (WingInputStreamClass *klass)
stream_class->read_fn = wing_input_stream_read;
stream_class->close_fn = wing_input_stream_close;
+ stream_class->read_async = wing_input_stream_read_async;
/**
* WingInputStream:handle:
@@ -338,53 +399,6 @@ wing_input_stream_init (WingInputStream *wing_stream)
g_return_if_fail (priv->overlap.hEvent != INVALID_HANDLE_VALUE);
}
-
-static gboolean
-wing_input_stream_pollable_is_readable (GPollableInputStream *pollable)
-{
- WingInputStream *wing_stream = WING_INPUT_STREAM (pollable);
- WingInputStreamPrivate *priv;
-
- priv = wing_input_stream_get_instance_private (wing_stream);
-
- return WaitForSingleObject (priv->overlap.hEvent, 0) == WAIT_OBJECT_0;
-}
-
-static GSource *
-wing_input_stream_pollable_create_source (GPollableInputStream *pollable,
- GCancellable *cancellable)
-{
- WingInputStream *wing_stream = WING_INPUT_STREAM (pollable);
- WingInputStreamPrivate *priv;
- GSource *handle_source, *pollable_source;
-
- priv = wing_input_stream_get_instance_private (wing_stream);
-
- handle_source = wing_create_source (priv->overlap.hEvent,
- G_IO_IN, cancellable);
- pollable_source = g_pollable_source_new_full (pollable, handle_source, cancellable);
- g_source_unref (handle_source);
-
- return pollable_source;
-}
-
-static gssize
-wing_input_stream_pollable_read_nonblocking (GPollableInputStream *pollable,
- void *buffer,
- gsize count,
- GError **error)
-{
- return read_internal (G_INPUT_STREAM (pollable), buffer, count, FALSE, NULL, error);
-}
-
-static void
-wing_input_stream_pollable_iface_init (GPollableInputStreamInterface *iface)
-{
- iface->is_readable = wing_input_stream_pollable_is_readable;
- iface->create_source = wing_input_stream_pollable_create_source;
- iface->read_nonblocking = wing_input_stream_pollable_read_nonblocking;
-}
-
/**
* wing_input_stream_new:
* @handle: a Win32 file handle
[
Date Prev][
Date Next] [
Thread Prev][
Thread Next]
[
Thread Index]
[
Date Index]
[
Author Index]