[aravis/wip/emmanuel/wakeup: 2/2] stream: new delete_buffers API
- From: Emmanuel Pacaud <emmanuel src gnome org>
- To: commits-list gnome org
- Cc:
- Subject: [aravis/wip/emmanuel/wakeup: 2/2] stream: new delete_buffers API
- Date: Mon, 4 May 2015 19:04:53 +0000 (UTC)
commit 5f4e27ff6cd709908c06c860021acce654d7b21a
Author: Emmanuel Pacaud <emmanuel gnome org>
Date: Mon May 4 21:01:37 2015 +0200
stream: new delete_buffers API
src/arvgvstream.c | 176 ++++++++++++++++++++++++++++++++--------------------
src/arvstream.c | 38 +++++++++++
src/arvstream.h | 2 +
3 files changed, 148 insertions(+), 68 deletions(-)
---
diff --git a/src/arvgvstream.c b/src/arvgvstream.c
index fbd7c23..114ec7f 100644
--- a/src/arvgvstream.c
+++ b/src/arvgvstream.c
@@ -28,6 +28,7 @@
#include <arvgvstream.h>
#include <arvstreamprivate.h>
#include <arvbufferprivate.h>
+#include <arvwakeupprivate.h>
#include <arvgvsp.h>
#include <arvgvcp.h>
#include <arvdebug.h>
@@ -101,7 +102,10 @@ typedef struct {
guint64 timestamp_tick_frequency;
guint data_size;
- gboolean cancel;
+ ArvWakeup *cancel;
+ GCond cancel_cond;
+ GMutex cancel_mutex;
+ gboolean exit_on_cancel;
guint16 packet_id;
@@ -584,8 +588,8 @@ arv_gv_stream_thread (void *data)
guint32 frame_id;
GTimeVal current_time;
guint64 time_us;
- GPollFD poll_fd;
- size_t read_count;
+ GPollFD poll_fd[2];
+ gssize read_count;
int timeout_ms;
int n_events;
int i;
@@ -601,9 +605,11 @@ arv_gv_stream_thread (void *data)
if (thread_data->callback != NULL)
thread_data->callback (thread_data->user_data, ARV_STREAM_CALLBACK_TYPE_INIT, NULL);
- poll_fd.fd = g_socket_get_fd (thread_data->socket);
- poll_fd.events = G_IO_IN;
- poll_fd.revents = 0;
+ poll_fd[0].fd = g_socket_get_fd (thread_data->socket);
+ poll_fd[0].events = G_IO_IN;
+ poll_fd[0].revents = 0;
+
+ arv_wakeup_get_pollfd (thread_data->cancel, &poll_fd[1]);
packet = g_malloc0 (ARV_GV_STREAM_INCOMING_BUFFER_SIZE);
@@ -613,81 +619,94 @@ arv_gv_stream_thread (void *data)
else
timeout_ms = ARV_GV_STREAM_POLL_TIMEOUT_US / 1000;
- n_events = g_poll (&poll_fd, 1, timeout_ms);
+ n_events = g_poll (poll_fd, 2, timeout_ms);
g_get_current_time (¤t_time);
time_us = current_time.tv_sec * 1000000 + current_time.tv_usec;
if (n_events > 0) {
- thread_data->n_received_packets++;
-
- read_count = g_socket_receive (thread_data->socket, (char *) packet,
- ARV_GV_STREAM_INCOMING_BUFFER_SIZE, NULL, NULL);
+ read_count = g_socket_receive_with_blocking (thread_data->socket, (char *) packet,
+ ARV_GV_STREAM_INCOMING_BUFFER_SIZE,
FALSE, NULL, NULL);
- frame_id = arv_gvsp_packet_get_frame_id (packet);
- packet_id = arv_gvsp_packet_get_packet_id (packet);
+ if (read_count > 0) {
+ thread_data->n_received_packets++;
- if (first_packet) {
- thread_data->last_frame_id = frame_id - 1;
- first_packet = FALSE;
- }
+ frame_id = arv_gvsp_packet_get_frame_id (packet);
+ packet_id = arv_gvsp_packet_get_packet_id (packet);
- frame = _find_frame_data (thread_data, frame_id, packet, packet_id, read_count,
time_us);
-
- if (frame != NULL) {
- ArvGvspPacketType packet_type = arv_gvsp_packet_get_packet_type (packet);
-
- if (packet_type != ARV_GVSP_PACKET_TYPE_OK &&
- packet_type != ARV_GVSP_PACKET_TYPE_RESEND) {
- arv_debug_stream_thread ("[GvStream::stream_thread]"
- " Error packet at dt = %" G_GINT64_FORMAT ",
packet id = %u"
- " frame id = %u",
- time_us - frame->first_packet_time_us,
- packet_id, frame->frame_id);
- arv_gvsp_packet_debug (packet, read_count, ARV_DEBUG_LEVEL_DEBUG);
- frame->error_packet_received = TRUE;
-
- thread_data->n_error_packets++;
- } else {
- /* Check for duplicated packets */
- if (packet_id < frame->n_packets) {
- if (frame->packet_data[packet_id].received)
- thread_data->n_duplicated_packets++;
- else
- frame->packet_data[packet_id].received = TRUE;
- }
+ if (first_packet) {
+ thread_data->last_frame_id = frame_id - 1;
+ first_packet = FALSE;
+ }
- /* Keep track of last packet of a continuous block starting from
packet 0 */
- for (i = frame->last_valid_packet + 1; i < frame->n_packets; i++)
- if (!frame->packet_data[i].received)
- break;
- frame->last_valid_packet = i - 1;
-
- switch (arv_gvsp_packet_get_content_type (packet)) {
- case ARV_GVSP_CONTENT_TYPE_DATA_LEADER:
- _process_data_leader (thread_data, frame, packet,
packet_id);
- break;
- case ARV_GVSP_CONTENT_TYPE_DATA_BLOCK:
- _process_data_block (thread_data, frame, packet,
packet_id,
- read_count);
- break;
- case ARV_GVSP_CONTENT_TYPE_DATA_TRAILER:
- _process_data_trailer (thread_data, frame, packet,
packet_id);
- break;
- default:
- thread_data->n_ignored_packets++;
- break;
+ frame = _find_frame_data (thread_data, frame_id, packet, packet_id,
read_count, time_us);
+
+ if (frame != NULL) {
+ ArvGvspPacketType packet_type = arv_gvsp_packet_get_packet_type
(packet);
+
+ if (packet_type != ARV_GVSP_PACKET_TYPE_OK &&
+ packet_type != ARV_GVSP_PACKET_TYPE_RESEND) {
+ arv_debug_stream_thread ("[GvStream::stream_thread]"
+ " Error packet at dt = %"
G_GINT64_FORMAT ", packet id = %u"
+ " frame id = %u",
+ time_us -
frame->first_packet_time_us,
+ packet_id, frame->frame_id);
+ arv_gvsp_packet_debug (packet, read_count,
ARV_DEBUG_LEVEL_DEBUG);
+ frame->error_packet_received = TRUE;
+
+ thread_data->n_error_packets++;
+ } else {
+ /* Check for duplicated packets */
+ if (packet_id < frame->n_packets) {
+ if (frame->packet_data[packet_id].received)
+ thread_data->n_duplicated_packets++;
+ else
+ frame->packet_data[packet_id].received = TRUE;
+ }
+
+ /* Keep track of last packet of a continuous block starting
from packet 0 */
+ for (i = frame->last_valid_packet + 1; i < frame->n_packets;
i++)
+ if (!frame->packet_data[i].received)
+ break;
+ frame->last_valid_packet = i - 1;
+
+ switch (arv_gvsp_packet_get_content_type (packet)) {
+ case ARV_GVSP_CONTENT_TYPE_DATA_LEADER:
+ _process_data_leader (thread_data, frame,
packet, packet_id);
+ break;
+ case ARV_GVSP_CONTENT_TYPE_DATA_BLOCK:
+ _process_data_block (thread_data, frame,
packet, packet_id,
+ read_count);
+ break;
+ case ARV_GVSP_CONTENT_TYPE_DATA_TRAILER:
+ _process_data_trailer (thread_data, frame,
packet, packet_id);
+ break;
+ default:
+ thread_data->n_ignored_packets++;
+ break;
+ }
+
+ _missing_packet_check (thread_data, frame, packet_id,
time_us);
}
-
- _missing_packet_check (thread_data, frame, packet_id, time_us);
- }
+ } else
+ thread_data->n_ignored_packets++;
} else
- thread_data->n_ignored_packets++;
+ frame = NULL;
+
+ if (read_count < 1 || n_events > 1) {
+ arv_wakeup_acknowledge (thread_data->cancel);
+ }
} else
frame = NULL;
_check_frame_completion (thread_data, time_us, frame);
- } while (!thread_data->cancel);
+
+ if (thread_data->frames == NULL) {
+ g_mutex_lock (&thread_data->cancel_mutex);
+ g_cond_signal (&thread_data->cancel_cond);
+ g_mutex_unlock (&thread_data->cancel_mutex);
+ }
+ } while (!thread_data->exit_on_cancel);
_flush_frames (thread_data);
@@ -699,6 +718,19 @@ arv_gv_stream_thread (void *data)
return NULL;
}
+static void
+_release_buffers (ArvStream *stream)
+{
+ ArvGvStream *gv_stream = ARV_GV_STREAM (stream);
+ ArvGvStreamThreadData *thread_data = gv_stream->priv->thread_data;
+
+ arv_wakeup_signal (thread_data->cancel);
+
+ g_mutex_lock (&thread_data->cancel_mutex);
+ g_cond_wait (&thread_data->cancel_cond, &thread_data->cancel_mutex);
+ g_mutex_unlock (&thread_data->cancel_mutex);
+}
+
/* ArvGvStream implemenation */
guint16
@@ -767,7 +799,10 @@ arv_gv_stream_new (GInetAddress *device_address, guint16 port,
thread_data->frame_retention_us = ARV_GV_STREAM_FRAME_RETENTION_US_DEFAULT;
thread_data->timestamp_tick_frequency = timestamp_tick_frequency;
thread_data->data_size = packet_size - ARV_GVSP_PACKET_PROTOCOL_OVERHEAD;
- thread_data->cancel = FALSE;
+ thread_data->cancel = arv_wakeup_new ();
+ g_mutex_init (&thread_data->cancel_mutex);
+ g_cond_init (&thread_data->cancel_cond);
+ thread_data->exit_on_cancel = FALSE;
thread_data->packet_id = 65300;
thread_data->last_frame_id = 0;
@@ -920,8 +955,12 @@ arv_gv_stream_finalize (GObject *object)
thread_data = gv_stream->priv->thread_data;
- thread_data->cancel = TRUE;
+ thread_data->exit_on_cancel = TRUE;
+ arv_wakeup_signal (thread_data->cancel);
+
g_thread_join (gv_stream->priv->thread);
+
+ g_clear_pointer (&thread_data->cancel, arv_wakeup_free);
g_object_unref (thread_data->device_address);
@@ -988,6 +1027,7 @@ arv_gv_stream_class_init (ArvGvStreamClass *gv_stream_class)
object_class->set_property = arv_gv_stream_set_property;
object_class->get_property = arv_gv_stream_get_property;
+ stream_class->release_buffers = _release_buffers;
stream_class->get_statistics = _get_statistics;
g_object_class_install_property (
diff --git a/src/arvstream.c b/src/arvstream.c
index 2b643eb..e04572f 100644
--- a/src/arvstream.c
+++ b/src/arvstream.c
@@ -216,6 +216,44 @@ arv_stream_get_n_buffers (ArvStream *stream, gint *n_input_buffers, gint *n_outp
}
/**
+ * arv_stream_delete_buffers:
+ * @stream: a #ArvStream
+ *
+ * Free all buffers in input and output queues.
+ *
+ * Since: 0.3.8
+ */
+
+void
+arv_stream_delete_buffers (ArvStream *stream)
+{
+ ArvStreamClass *stream_class;
+ unsigned n_deleted = 0;
+
+ g_return_if_fail (ARV_IS_STREAM (stream));
+
+ g_async_queue_lock (stream->priv->input_queue);
+ while (g_async_queue_length_unlocked (stream->priv->input_queue) > 0) {
+ g_object_unref (g_async_queue_pop_unlocked (stream->priv->input_queue));
+ n_deleted++;
+ }
+ g_async_queue_unlock (stream->priv->input_queue);
+
+ stream_class = ARV_STREAM_GET_CLASS (stream);
+ if (stream_class->release_buffers)
+ stream_class->release_buffers (stream);
+
+ g_async_queue_lock (stream->priv->output_queue);
+ while (g_async_queue_length_unlocked (stream->priv->output_queue) > 0) {
+ g_object_unref (g_async_queue_pop_unlocked (stream->priv->output_queue));
+ n_deleted++;
+ }
+ g_async_queue_unlock (stream->priv->output_queue);
+
+ arv_debug_stream ("[Stream::delete_buffers] Number of deleted buffers = %d", n_deleted);
+}
+
+/**
* arv_stream_get_statistics:
* @stream: a #ArvStream
* @n_completed_buffers: (out) (allow-none): number of complete received buffers
diff --git a/src/arvstream.h b/src/arvstream.h
index 99a2897..6d534e7 100644
--- a/src/arvstream.h
+++ b/src/arvstream.h
@@ -65,6 +65,7 @@ struct _ArvStream {
struct _ArvStreamClass {
GObjectClass parent_class;
+ void (*release_buffers) (ArvStream *stream);
void (*get_statistics) (ArvStream *stream, guint64 *n_completed_buffers,
guint64 *n_failures, guint64 *n_underruns);
@@ -81,6 +82,7 @@ ArvBuffer * arv_stream_timeout_pop_buffer (ArvStream *stream, guint64 timeout
void arv_stream_get_n_buffers (ArvStream *stream,
gint *n_input_buffers,
gint *n_output_buffers);
+void arv_stream_delete_buffers (ArvStream *stream);
void arv_stream_get_statistics (ArvStream *stream,
guint64 *n_completed_buffers,
guint64 *n_failures,
[
Date Prev][
Date Next] [
Thread Prev][
Thread Next]
[
Thread Index]
[
Date Index]
[
Author Index]