[gnio] Implement condition check/wait and GSource for win32
- From: Alexander Larsson <alexl src gnome org>
- To: svn-commits-list gnome org
- Subject: [gnio] Implement condition check/wait and GSource for win32
- Date: Tue, 5 May 2009 07:31:13 -0400 (EDT)
commit 75cf6b45f3cdcedd4470b602c99a2145710bb67c
Author: Alexander Larsson <alexl redhat com>
Date: Tue May 5 13:24:18 2009 +0200
Implement condition check/wait and GSource for win32
---
gio/gsocket.c | 534 ++++++++++++++++++++++++++++++++++++++++++++++++++++-----
1 files changed, 494 insertions(+), 40 deletions(-)
diff --git a/gio/gsocket.c b/gio/gsocket.c
index 39f06bc..5150f29 100644
--- a/gio/gsocket.c
+++ b/gio/gsocket.c
@@ -70,14 +70,22 @@ struct _GSocketPrivate
GSocketType type;
char *protocol;
gint fd;
- gboolean blocking;
gint listen_backlog;
- gboolean reuse_address;
- gboolean keepalive;
GError *construct_error;
GSocketAddress *local_address;
GSocketAddress *remote_address;
- gboolean closed;
+ guint blocking : 1;
+ guint reuse_address : 1;
+ guint keepalive : 1;
+ guint closed : 1;
+ guint blocking_mode_unknown : 1;
+#ifdef G_OS_WIN32
+ WSAEVENT event;
+ int current_events;
+ int current_errors;
+ int selected_events;
+ GList *requested_conditions; /* list of requested GIOCondition * */
+#endif
};
static int
@@ -145,6 +153,18 @@ socket_strerror (int err)
#endif
}
+#ifdef G_OS_WIN32
+/* win32_unset_event_mask (socket, mask):
+ * Clears the bits given in mask from both the socket's accumulated
+ * current_events and its current_errors. On non-win32 builds the macro
+ * expands to nothing, so its arguments are discarded there. */
+#define win32_unset_event_mask(_socket, _mask) _win32_unset_event_mask (_socket, _mask)
+static void
+_win32_unset_event_mask (GSocket *socket, int mask)
+{
+ socket->priv->current_events &= ~mask;
+ socket->priv->current_errors &= ~mask;
+}
+#else
+#define win32_unset_event_mask(_socket, _mask)
+#endif
+
static gboolean
check_socket (GSocket *socket,
GError **error)
@@ -272,6 +292,7 @@ g_socket_details_from_fd (GSocket *socket)
#else
/* There doesn't seem to be a way to get this on win32... */
socket->priv->blocking = FALSE;
+ socket->priv->blocking_mode_unknown = TRUE;
#endif
optlen = sizeof bool_val;
@@ -505,6 +526,10 @@ g_socket_finalize (GObject *object)
!socket->priv->closed)
g_socket_close (socket, NULL);
+#ifdef G_OS_WIN32
+ g_assert (socket->priv->requested_conditions == NULL);
+#endif
+
if (G_OBJECT_CLASS (g_socket_parent_class)->finalize)
(*G_OBJECT_CLASS (g_socket_parent_class)->finalize) (object);
}
@@ -636,6 +661,9 @@ g_socket_init (GSocket *socket)
socket->priv->construct_error = NULL;
socket->priv->remote_address = NULL;
socket->priv->local_address = NULL;
+#ifdef G_OS_WIN32
+ socket->priv->event = WSA_INVALID_EVENT;
+#endif
}
GSocket *
@@ -666,7 +694,8 @@ g_socket_set_blocking (GSocket *socket,
blocking = !!blocking;
- if (socket->priv->blocking == blocking)
+ if (socket->priv->blocking == blocking &&
+ !socket->priv->blocking_mode_unknown)
return;
#ifndef G_OS_WIN32
@@ -917,6 +946,7 @@ GSocket *
g_socket_accept (GSocket *socket,
GError **error)
{
+ GSocket *new_socket;
gint ret;
g_return_val_if_fail (G_IS_SOCKET (socket), NULL);
@@ -933,6 +963,8 @@ g_socket_accept (GSocket *socket,
if (errsv == EINTR)
continue;
+ win32_unset_event_mask (socket, FD_ACCEPT);
+
g_set_error (error, G_IO_ERROR,
socket_io_error_from_errno (errsv),
"error accepting connection: %s", socket_strerror (errsv));
@@ -941,7 +973,30 @@ g_socket_accept (GSocket *socket,
break;
}
- return g_socket_new_from_fd (ret);
+ win32_unset_event_mask (socket, FD_ACCEPT);
+
+ new_socket = g_socket_new_from_fd (ret);
+
+#ifdef G_OS_WIN32
+ {
+ gulong arg;
+
+ /* The socket inherits the accepting socket's event mask and event object,
+ we need to remove that */
+ WSAEventSelect (ret, NULL, 0);
+
+ /* It also inherits the blocking mode, but on unix newly accepted
+ sockets are blocking, so disable nonblocking mode to get the same
+ behaviour as on unix. */
+ arg = FALSE;
+ if (ioctlsocket (new_socket->priv->fd, FIONBIO, &arg) == SOCKET_ERROR)
+ g_warning ("Unable to set newly allocated socket to blocking mode");
+ new_socket->priv->blocking = TRUE;
+ new_socket->priv->blocking_mode_unknown = FALSE;
+ }
+#endif
+
+ return new_socket;
}
gboolean
@@ -971,20 +1026,22 @@ g_socket_connect (GSocket *socket,
#ifndef G_OS_WIN32
if (errsv == EINPROGRESS)
#else
- if (errsv == WSAEINPROGRESS)
+ if (errsv == WSAEINPROGRESS)
#endif
- g_set_error (error, G_IO_ERROR, G_IO_ERROR_PENDING,
- "connection in progress");
- else
- g_set_error (error, G_IO_ERROR,
- socket_io_error_from_errno (errsv),
- "error connecting: %s", socket_strerror (errsv));
+ g_set_error (error, G_IO_ERROR, G_IO_ERROR_PENDING,
+ "connection in progress");
+ else
+ g_set_error (error, G_IO_ERROR,
+ socket_io_error_from_errno (errsv),
+ "error connecting: %s", socket_strerror (errsv));
return FALSE;
}
break;
}
+ win32_unset_event_mask (socket, FD_CONNECT);
+
socket->priv->remote_address = g_object_ref (address);
return TRUE;
@@ -1012,11 +1069,16 @@ g_socket_receive (GSocket *socket,
if (errsv == EINTR)
continue;
+ win32_unset_event_mask (socket, FD_READ);
+
g_set_error (error, G_IO_ERROR,
socket_io_error_from_errno (errsv),
"error receiving data: %s", socket_strerror (errsv));
return -1;
}
+
+ win32_unset_event_mask (socket, FD_READ);
+
break;
}
@@ -1045,6 +1107,11 @@ g_socket_send (GSocket *socket,
if (errsv == EINTR)
continue;
+#ifdef WSAEWOULDBLOCK
+ if (errsv == WSAEWOULDBLOCK)
+ win32_unset_event_mask (socket, FD_WRITE);
+#endif
+
g_set_error (error, G_IO_ERROR,
socket_io_error_from_errno (errsv),
"error sending data: %s", socket_strerror (errsv));
@@ -1092,6 +1159,14 @@ g_socket_close (GSocket *socket,
break;
}
+#ifdef G_OS_WIN32
+ if (socket->priv->event != WSA_INVALID_EVENT)
+ {
+ WSACloseEvent (socket->priv->event);
+ socket->priv->event = WSA_INVALID_EVENT;
+ }
+#endif
+
socket->priv->closed = TRUE;
return res != -1;
@@ -1103,14 +1178,320 @@ g_socket_is_closed (GSocket *socket)
return socket->priv->closed;
}
+#ifdef G_OS_WIN32
+/* Broken source, used on errors */
+/* Prepare callback of the broken source: unconditionally returns FALSE
+ * and never writes to *timeout. */
+static gboolean
+broken_prepare (GSource *source,
+ gint *timeout)
+{
+ return FALSE;
+}
+
+/* Check callback of the broken source: unconditionally returns FALSE. */
+static gboolean
+broken_check (GSource *source)
+{
+ return FALSE;
+}
+
+/* Dispatch callback of the broken source: returns TRUE without ever
+ * invoking callback or looking at user_data. */
+static gboolean
+broken_dispatch (GSource *source,
+ GSourceFunc callback,
+ gpointer user_data)
+{
+ return TRUE;
+}
+
+/* Callback table for the placeholder source that winsock_source_new()
+ * hands out when no WSAEVENT could be created. The fourth slot (the one
+ * where winsock_funcs installs winsock_finalize) is left NULL. */
+static GSourceFuncs broken_funcs =
+{
+ broken_prepare,
+ broken_check,
+ broken_dispatch,
+ NULL
+};
+
+/* Translates a GIOCondition into the FD_* network event mask to select:
+ * G_IO_IN maps to FD_READ | FD_ACCEPT,
+ * G_IO_OUT maps to FD_WRITE | FD_CONNECT,
+ * and FD_CLOSE is always included, whatever the condition.
+ * Because of the unconditional FD_CLOSE the result is never 0. */
+static gint
+network_events_for_condition (GIOCondition condition)
+{
+ int event_mask = 0;
+
+ if (condition & G_IO_IN)
+ event_mask |= (FD_READ | FD_ACCEPT);
+ if (condition & G_IO_OUT)
+ event_mask |= (FD_WRITE | FD_CONNECT);
+ event_mask |= FD_CLOSE;
+
+ return event_mask;
+}
+
+/* Creates the socket's WSAEVENT with WSACreateEvent() if the socket does
+ * not have one yet (event == WSA_INVALID_EVENT). The result of
+ * WSACreateEvent() is stored without being checked here; callers that
+ * care compare socket->priv->event against WSA_INVALID_EVENT afterwards. */
+static void
+ensure_event (GSocket *socket)
+{
+ if (socket->priv->event == WSA_INVALID_EVENT)
+ socket->priv->event = WSACreateEvent();
+}
+
+/* Recomputes the union of network events wanted by every entry in
+ * requested_conditions and, if it differs from selected_events, applies
+ * it with WSAEventSelect(). With an empty list the mask is 0 and the
+ * event is deselected by passing NULL.
+ *
+ * On success selected_events is updated and, if the socket was recorded
+ * as blocking, the blocking flag is cleared and "blocking" is notified.
+ * A failing WSAEventSelect() is silently ignored: selected_events keeps
+ * its old value, so a later call computing the same mask tries again. */
+static void
+update_select_events (GSocket *socket)
+{
+ int event_mask;
+ GIOCondition *ptr;
+ GList *l;
+ WSAEVENT event;
+
+ ensure_event (socket);
+
+ event_mask = 0;
+ for (l = socket->priv->requested_conditions; l != NULL; l = l->next)
+ {
+ ptr = l->data;
+ event_mask |= network_events_for_condition (*ptr);
+ }
+
+ if (event_mask != socket->priv->selected_events)
+ {
+ /* If no events selected, disable event so we can unset
+ nonblocking mode */
+
+ if (event_mask == 0)
+ event = NULL;
+ else
+ event = socket->priv->event;
+
+ if (WSAEventSelect (socket->priv->fd, event, event_mask) == 0)
+ {
+ socket->priv->selected_events = event_mask;
+
+ /* This automatically enables nonblocking mode */
+ if (socket->priv->blocking)
+ {
+ socket->priv->blocking = FALSE;
+ g_object_notify (G_OBJECT (socket), "blocking");
+ }
+ }
+ }
+}
+
+/* Registers a watch for *condition on the socket and reselects the
+ * network events accordingly. The pointer itself (not a copy of the
+ * value) is stored in requested_conditions and dereferenced later by
+ * update_select_events(), so the pointed-to storage must stay valid
+ * until remove_condition_watch() is called with the same pointer.
+ * Asserts that the pointer is not already registered. */
+static void
+add_condition_watch (GSocket *socket,
+ GIOCondition *condition)
+{
+ g_assert (g_list_find (socket->priv->requested_conditions, condition) == NULL);
+
+ socket->priv->requested_conditions =
+ g_list_prepend (socket->priv->requested_conditions, condition);
+
+ update_select_events (socket);
+}
+
+/* Unregisters a watch previously added with add_condition_watch() and
+ * reselects the network events for the remaining watches. Entries are
+ * matched by pointer; asserts that the pointer is currently registered. */
+static void
+remove_condition_watch (GSocket *socket,
+ GIOCondition *condition)
+{
+ g_assert (g_list_find (socket->priv->requested_conditions, condition) != NULL);
+
+ socket->priv->requested_conditions =
+ g_list_remove (socket->priv->requested_conditions, condition);
+
+ update_select_events (socket);
+}
+
+/* Folds newly signalled network events into the socket's accumulated
+ * state and returns the resulting GIOCondition.
+ *
+ * If WSAEnumNetworkEvents() succeeds, the reported events are OR-ed into
+ * current_events, and FD_WRITE / FD_CONNECT are additionally recorded in
+ * current_errors when their error code is non-zero. If it fails, the
+ * previously accumulated state is used unchanged. Bits are only ever
+ * added here; they are cleared through win32_unset_event_mask().
+ *
+ * Mapping of the accumulated state to the return value:
+ * FD_READ or FD_ACCEPT -> G_IO_IN
+ * FD_CLOSE, or socket marked closed -> G_IO_HUP
+ * FD_WRITE (only if no HUP) -> G_IO_ERR if a write error was
+ * recorded, otherwise G_IO_OUT
+ * FD_CONNECT (else branch) -> G_IO_HUP | G_IO_ERR if a connect
+ * error was recorded, otherwise G_IO_OUT
+ *
+ * NOTE(review): the else branch is also taken when G_IO_HUP is already
+ * set, so an error-free FD_CONNECT can still add G_IO_OUT next to
+ * G_IO_HUP, which seems to contradict the "never report both" comment
+ * below -- confirm whether that is intended. */
+static GIOCondition
+update_condition (GSocket *socket)
+{
+ WSANETWORKEVENTS events;
+ GIOCondition condition;
+
+ if (WSAEnumNetworkEvents (socket->priv->fd,
+ socket->priv->event,
+ &events) == 0)
+ {
+ socket->priv->current_events |= events.lNetworkEvents;
+ if (events.lNetworkEvents & FD_WRITE &&
+ events.iErrorCode[FD_WRITE_BIT] != 0)
+ socket->priv->current_errors |= FD_WRITE;
+ if (events.lNetworkEvents & FD_CONNECT &&
+ events.iErrorCode[FD_CONNECT_BIT] != 0)
+ socket->priv->current_errors |= FD_CONNECT;
+ }
+
+ condition = 0;
+ if (socket->priv->current_events & (FD_READ | FD_ACCEPT))
+ condition |= G_IO_IN;
+
+ if (socket->priv->current_events & FD_CLOSE ||
+ socket->priv->closed)
+ condition |= G_IO_HUP;
+
+ /* Never report both G_IO_OUT and HUP, these are
+ mutually exclusive (can't write to a closed socket) */
+ if ((condition & G_IO_HUP) == 0 &&
+ socket->priv->current_events & FD_WRITE)
+ {
+ if (socket->priv->current_errors & FD_WRITE)
+ condition |= G_IO_ERR;
+ else
+ condition |= G_IO_OUT;
+ }
+ else
+ {
+ if (socket->priv->current_events & FD_CONNECT)
+ {
+ if (socket->priv->current_errors & FD_CONNECT)
+ condition |= (G_IO_HUP | G_IO_ERR);
+ else
+ condition |= G_IO_OUT;
+ }
+ }
+
+ return condition;
+}
+
+typedef struct { /* GSource created by winsock_source_new() for one socket watch */
+ GSource source; /* first member: the struct is allocated by g_source_new() and cast from GSource * */
+ GPollFD pollfd; /* poll record whose fd is the socket's WSAEVENT handle */
+ GSocket *socket; /* watched socket; a reference is held until finalize */
+ GIOCondition condition; /* requested condition plus G_IO_HUP | G_IO_ERR; its address is the registered watch */
+ GCancellable *cancellable; /* optional; referenced when one was supplied */
+ GPollFD cancel_pollfd; /* poll record for the cancellable, only filled in when one was supplied */
+ GIOCondition result_condition; /* condition stored by prepare/check and handed to the callback in dispatch */
+} GWinsockSource;
+
+/* Prepare callback: refreshes the socket's condition and returns TRUE
+ * when the cancellable has been cancelled or when at least one of the
+ * watched condition bits is currently set. In both TRUE cases the full
+ * current condition is saved in result_condition for dispatch.
+ * *timeout is never written. */
+static gboolean
+winsock_prepare (GSource *source,
+ gint *timeout)
+{
+ GWinsockSource *winsock_source = (GWinsockSource *)source;
+ GIOCondition current_condition;
+
+ current_condition = update_condition (winsock_source->socket);
+
+ if (g_cancellable_is_cancelled (winsock_source->cancellable))
+ {
+ winsock_source->result_condition = current_condition;
+ return TRUE;
+ }
+
+ if ((winsock_source->condition & current_condition) != 0)
+ {
+ winsock_source->result_condition = current_condition;
+ return TRUE;
+ }
+
+ return FALSE;
+}
+
+/* Check callback: performs the same test as winsock_prepare(). It
+ * refreshes the socket's condition and returns TRUE when the cancellable
+ * has been cancelled or a watched condition bit is set, saving the
+ * current condition in result_condition in either case. */
+static gboolean
+winsock_check (GSource *source)
+{
+ GWinsockSource *winsock_source = (GWinsockSource *)source;
+ GIOCondition current_condition;
+
+ current_condition = update_condition (winsock_source->socket);
+
+ if (g_cancellable_is_cancelled (winsock_source->cancellable))
+ {
+ winsock_source->result_condition = current_condition;
+ return TRUE;
+ }
+
+ if ((winsock_source->condition & current_condition) != 0)
+ {
+ winsock_source->result_condition = current_condition;
+ return TRUE;
+ }
+
+ return FALSE;
+}
+
+/* Dispatch callback: treats callback as a GSocketSourceFunc, calls it
+ * with user_data and the condition saved by prepare/check, and returns
+ * whatever the callback returns. */
+static gboolean
+winsock_dispatch (GSource *source,
+ GSourceFunc callback,
+ gpointer user_data)
+{
+ GSocketSourceFunc func = (GSocketSourceFunc)callback;
+ GWinsockSource *winsock_source = (GWinsockSource *)source;
+
+ return (*func) (user_data, winsock_source->result_condition);
+}
+
+/* Finalize callback: unregisters the watch that winsock_source_new()
+ * added for this source's condition field, then drops the references
+ * held on the socket and, if present, on the cancellable. */
+static void
+winsock_finalize (GSource *source)
+{
+ GWinsockSource *winsock_source = (GWinsockSource *)source;
+ GSocket *socket;
+
+ socket = winsock_source->socket;
+
+ remove_condition_watch (socket, &winsock_source->condition);
+ g_object_unref (socket);
+
+ if (winsock_source->cancellable)
+ g_object_unref (winsock_source->cancellable);
+}
+
+/* Callback table passed to g_source_new() by winsock_source_new(). */
+static GSourceFuncs winsock_funcs =
+{
+ winsock_prepare,
+ winsock_check,
+ winsock_dispatch,
+ winsock_finalize
+};
+
+/* Builds the GSource used to watch a socket on win32.
+ *
+ * If the socket's WSAEVENT cannot be created, a warning is logged and a
+ * plain source using broken_funcs is returned instead. Otherwise
+ * G_IO_HUP | G_IO_ERR are added to the requested condition and a
+ * GWinsockSource is created that:
+ * - holds a reference on the socket,
+ * - registers its own condition field as a condition watch (removed
+ * again in winsock_finalize()),
+ * - if a cancellable was given, references it and polls its pollfd,
+ * - polls the socket's event handle for the condition. */
+static GSource *
+winsock_source_new (GSocket *socket,
+ GIOCondition condition,
+ GCancellable *cancellable)
+{
+ GSource *source;
+ GWinsockSource *winsock_source;
+
+ ensure_event (socket);
+
+ if (socket->priv->event == WSA_INVALID_EVENT)
+ {
+ g_warning ("Failed to create WSAEvent");
+ return g_source_new (&broken_funcs, sizeof (GSource));
+ }
+
+ condition |= G_IO_HUP | G_IO_ERR;
+
+ source = g_source_new (&winsock_funcs, sizeof (GWinsockSource));
+ winsock_source = (GWinsockSource *)source;
+
+ winsock_source->socket = g_object_ref (socket);
+ winsock_source->condition = condition;
+ add_condition_watch (socket, &winsock_source->condition);
+
+ if (cancellable)
+ {
+ winsock_source->cancellable = g_object_ref (cancellable);
+ g_cancellable_make_pollfd (cancellable,
+ &winsock_source->cancel_pollfd);
+ g_source_add_poll (source, &winsock_source->cancel_pollfd);
+ }
+
+ winsock_source->pollfd.fd = (gintptr) socket->priv->event;
+ winsock_source->pollfd.events = condition;
+ g_source_add_poll (source, &winsock_source->pollfd);
+
+ return source;
+}
+#endif
+
GSource *
g_socket_create_source (GSocket *socket,
GIOCondition condition,
GCancellable *cancellable)
{
+ GSource *source;
g_return_val_if_fail (G_IS_SOCKET (socket) && (cancellable == NULL || G_IS_CANCELLABLE (cancellable)), NULL);
- return _g_fd_source_new (socket->priv->fd, condition, cancellable);
+#ifdef G_OS_WIN32
+ source = winsock_source_new (socket, condition, cancellable);
+#else
+ source =_g_fd_source_new (socket->priv->fd, condition, cancellable);
+#endif
+ return source;
}
/**
@@ -1133,20 +1514,34 @@ GIOCondition
g_socket_condition_check (GSocket *socket,
GIOCondition condition)
{
- GPollFD poll_fd;
- gint result;
-
if (!check_socket (socket, NULL))
return 0;
- poll_fd.fd = socket->priv->fd;
- poll_fd.events = condition;
+#ifdef G_OS_WIN32
+ {
+ GIOCondition current_condition;
- do
- result = g_poll (&poll_fd, 1, 0);
- while (result == -1 && get_socket_errno () == EINTR);
+ condition |= G_IO_ERR | G_IO_HUP;
- return poll_fd.revents;
+ add_condition_watch (socket, &condition);
+ current_condition = update_condition (socket);
+ remove_condition_watch (socket, &condition);
+ return condition & current_condition;
+ }
+#else
+ {
+ GPollFD poll_fd;
+ gint result;
+ poll_fd.fd = socket->priv->fd;
+ poll_fd.events = condition;
+
+ do
+ result = g_poll (&poll_fd, 1, 0);
+ while (result == -1 && get_socket_errno () == EINTR);
+
+ return poll_fd.revents;
+ }
+#endif
}
/**
@@ -1169,29 +1564,83 @@ g_socket_condition_wait (GSocket *socket,
GCancellable *cancellable,
GError **error)
{
- GPollFD poll_fd[2];
- gint result;
- gint num;
-
if (!check_socket (socket, error))
return FALSE;
- poll_fd[0].fd = socket->priv->fd;
- poll_fd[0].events = condition;
- num = 1;
+ if (g_cancellable_set_error_if_cancelled (cancellable, error))
+ return FALSE;
- if (cancellable)
- {
- g_cancellable_make_pollfd (cancellable, &poll_fd[1]);
- num++;
- }
+#ifdef G_OS_WIN32
+ {
+ GIOCondition current_condition;
+ WSAEVENT events[2];
+ DWORD res;
+ GPollFD cancel_fd;
+ int num_events;
+
+ /* Always check these */
+ condition |= G_IO_ERR | G_IO_HUP;
- do
- result = g_poll (poll_fd, num, -1);
- while (result == -1 && get_socket_errno () == EINTR);
+ add_condition_watch (socket, &condition);
- return cancellable == NULL ||
- !g_cancellable_set_error_if_cancelled (cancellable, error);
+ num_events = 0;
+ events[num_events++] = socket->priv->event;
+
+ if (cancellable)
+ {
+ g_cancellable_make_pollfd (cancellable, &cancel_fd);
+ events[num_events++] = (WSAEVENT)cancel_fd.fd;
+ }
+
+ current_condition = update_condition (socket);
+ while ((condition & current_condition) == 0)
+ {
+ res = WSAWaitForMultipleEvents(num_events, events,
+ FALSE, WSA_INFINITE, FALSE);
+ if (res == WSA_WAIT_FAILED)
+ {
+ int errsv = get_socket_errno ();
+
+ g_set_error (error, G_IO_ERROR,
+ socket_io_error_from_errno (errsv),
+ "waiting for socket condition: %s",
+ socket_strerror (errsv));
+ break;
+ }
+
+ if (g_cancellable_set_error_if_cancelled (cancellable, error))
+ break;
+
+ current_condition = update_condition (socket);
+ }
+ remove_condition_watch (socket, &condition);
+
+ return (condition & current_condition) != 0;
+ }
+#else
+ {
+ GPollFD poll_fd[2];
+ gint result;
+ gint num;
+
+ poll_fd[0].fd = socket->priv->fd;
+ poll_fd[0].events = condition;
+ num = 1;
+
+ if (cancellable)
+ {
+ g_cancellable_make_pollfd (cancellable, &poll_fd[1]);
+ num++;
+ }
+
+ do
+ result = g_poll (poll_fd, num, -1);
+ while (result == -1 && get_socket_errno () == EINTR);
+
+ return cancellable == NULL ||
+ !g_cancellable_set_error_if_cancelled (cancellable, error);
+ }
+ #endif
}
gssize
@@ -1377,6 +1826,9 @@ g_socket_send_message (GSocket *socket,
if (errsv == WSAEINTR)
continue;
+ if (errsv == WSAEWOULDBLOCK)
+ win32_unset_event_mask (socket, FD_WRITE);
+
g_set_error (error, G_IO_ERROR,
socket_io_error_from_errno (errsv),
"WSASendTo: %s", socket_strerror (errsv));
@@ -1665,12 +2117,14 @@ g_socket_receive_message (GSocket *socket,
if (errsv == WSAEINTR)
continue;
+ win32_unset_event_mask (socket, FD_READ);
g_set_error (error, G_IO_ERROR,
socket_io_error_from_errno (errsv),
"WSARecvFrom: %s", socket_strerror (errsv));
return -1;
}
+ win32_unset_event_mask (socket, FD_READ);
break;
}
[
Date Prev][
Date Next] [
Thread Prev][
Thread Next]
[
Thread Index]
[
Date Index]
[
Author Index]