[calls] sip: media-pipeline: Keep track of pipeline state
- From: Evangelos Ribeiro Tzaras <devrtz src gnome org>
- To: commits-list gnome org
- Cc:
- Subject: [calls] sip: media-pipeline: Keep track of pipeline state
- Date: Sun, 6 Mar 2022 00:28:32 +0000 (UTC)
commit fe6951c93836c3d455800827d01d7d11838bceaf
Author: Evangelos Ribeiro Tzaras <devrtz fortysixandtwo eu>
Date: Mon Feb 28 11:18:36 2022 +0100
sip: media-pipeline: Keep track of pipeline state
This can be used by the media manager to dispose of pipelines which are done.
plugins/sip/calls-sip-media-pipeline.c | 275 ++++++++++++++++++++++++++++++---
plugins/sip/calls-sip-media-pipeline.h | 48 ++++--
plugins/sip/meson.build | 8 +
3 files changed, 304 insertions(+), 27 deletions(-)
---
diff --git a/plugins/sip/calls-sip-media-pipeline.c b/plugins/sip/calls-sip-media-pipeline.c
index ca9afc61..0d7b762f 100644
--- a/plugins/sip/calls-sip-media-pipeline.c
+++ b/plugins/sip/calls-sip-media-pipeline.c
@@ -24,6 +24,7 @@
#define G_LOG_DOMAIN "CallsSipMediaPipeline"
+#include "calls-media-pipeline-enums.h"
#include "calls-sip-media-pipeline.h"
#include "util.h"
@@ -57,6 +58,39 @@
* Both pipelines are using RTCP.
*/
+
+/* Bit flags identifying the elements of the sender pipeline.
+ * They are used to set/reset bits in the bitmaps which keep track of the
+ * elements being in the playing/paused/stopped state. */
+#define EL_SEND_PIPELINE (1<<0)
+#define EL_SEND_AUDIO_SRC (1<<1)
+#define EL_SEND_RTPBIN (1<<2)
+#define EL_SEND_RTP_SINK (1<<3)
+#define EL_SEND_RTCP_SINK (1<<4)
+#define EL_SEND_RTCP_SRC (1<<5)
+#define EL_SEND_PAYLOADER (1<<6)
+#define EL_SEND_ENCODER (1<<7)
+
+/* The composite masks are fully parenthesized, so that they expand as a single
+ * operand even when used next to operators binding tighter than `|`
+ * (such as `&` or `==`). */
+
+/* Every element of the sender pipeline, including the pipeline itself */
+#define EL_SEND_ALL_RTP (EL_SEND_PIPELINE | EL_SEND_AUDIO_SRC | \
+  EL_SEND_RTPBIN | EL_SEND_RTP_SINK | EL_SEND_RTCP_SRC | EL_SEND_RTCP_SINK | \
+  EL_SEND_PAYLOADER | EL_SEND_ENCODER)
+/* Elements which must be playing before "sending-started" gets emitted;
+ * the pipeline itself and the RTCP elements are not part of this mask */
+#define EL_SEND_SENDING (EL_SEND_AUDIO_SRC | EL_SEND_RTPBIN | EL_SEND_RTP_SINK | \
+  EL_SEND_PAYLOADER | EL_SEND_ENCODER)
+
+/* leave some room for more elements to be added later */
+
+/* Bit flags identifying the elements of the receiver pipeline */
+#define EL_RECV_PIPELINE (1<<16)
+#define EL_RECV_AUDIO_SINK (1<<17)
+#define EL_RECV_RTPBIN (1<<18)
+#define EL_RECV_RTP_SRC (1<<19)
+#define EL_RECV_RTCP_SINK (1<<20)
+#define EL_RECV_RTCP_SRC (1<<21)
+#define EL_RECV_DEPAYLOADER (1<<22)
+#define EL_RECV_DECODER (1<<23)
+
+/* Every element of the receiver pipeline, including the pipeline itself.
+ * Fully parenthesized, so that the mask expands as a single operand even when
+ * used next to operators binding tighter than `|` (such as `&` or `==`). */
+#define EL_RECV_ALL_RTP (EL_RECV_PIPELINE | EL_RECV_AUDIO_SINK | \
+  EL_RECV_RTPBIN | EL_RECV_RTP_SRC | EL_RECV_RTCP_SRC | EL_RECV_RTCP_SINK | \
+  EL_RECV_DEPAYLOADER | EL_RECV_DECODER)
+
+
enum {
PROP_0,
PROP_CODEC,
@@ -66,15 +100,30 @@ enum {
PROP_LPORT_RTCP,
PROP_RPORT_RTCP,
PROP_DEBUG,
+ PROP_STATE,
PROP_LAST_PROP,
};
+
+/* Signals of CallsSipMediaPipeline; N_SIGNALS is the number of signals
+ * and must stay the last entry */
+enum {
+  SENDING_STARTED,
+  N_SIGNALS
+};
+
static GParamSpec *props[PROP_LAST_PROP];
+/* Signal ids indexed by the enum above; g_signal_new() returns a guint,
+ * so use the GLib type instead of the non-standard `uint` */
+static guint signals[N_SIGNALS];
+
struct _CallsSipMediaPipeline {
GObject parent;
MediaCodecInfo *codec;
gboolean debug;
+
+ CallsMediaPipelineState state;
+ uint element_map_playing;
+ uint element_map_paused;
+ uint element_map_stopped;
+ gboolean emitted_sending_signal;
/* Connection details */
char *remote;
@@ -84,8 +133,6 @@ struct _CallsSipMediaPipeline {
gint rport_rtcp;
gint lport_rtcp;
- gboolean is_running;
-
/* Gstreamer Elements (sending) */
GstElement *send_pipeline;
GstElement *audiosrc;
@@ -119,6 +166,56 @@ static void initable_iface_init (GInitableIface *iface);
G_DEFINE_TYPE_WITH_CODE (CallsSipMediaPipeline, calls_sip_media_pipeline, G_TYPE_OBJECT,
G_IMPLEMENT_INTERFACE (G_TYPE_INITABLE, initable_iface_init));
+
+/*
+ * set_state:
+ * @self: A #CallsSipMediaPipeline
+ * @state: The #CallsMediaPipelineState to switch to
+ *
+ * Updates the tracked pipeline state and notifies about the changed "state"
+ * property. Does nothing if @state equals the current state.
+ * Every actual state change also clears the flag guarding the
+ * "sending-started" signal, allowing the signal to be emitted again.
+ */
+static void
+set_state (CallsSipMediaPipeline  *self,
+           CallsMediaPipelineState state)
+{
+  /* Assert on the type check macro: the cast macro CALLS_SIP_MEDIA_PIPELINE ()
+   * merely yields the pointer, so asserting on it would only check for non-NULL */
+  g_assert (CALLS_IS_SIP_MEDIA_PIPELINE (self));
+
+  if (self->state == state)
+    return;
+
+  self->state = state;
+  g_object_notify_by_pspec (G_OBJECT (self), props[PROP_STATE]);
+
+  self->emitted_sending_signal = FALSE;
+}
+
+
+/*
+ * check_element_maps:
+ * @self: A #CallsSipMediaPipeline
+ *
+ * Derives the pipeline state from the element bitmaps. If every element of
+ * both pipelines is playing, paused or stopped, the matching state gets set
+ * and the function returns. Otherwise "sending-started" is emitted once the
+ * sending elements of the sender pipeline are all playing, unless the signal
+ * was already emitted since the last state change.
+ */
+static void
+check_element_maps (CallsSipMediaPipeline *self)
+{
+  const guint all_elements = (EL_SEND_ALL_RTP | EL_RECV_ALL_RTP);
+  const guint sending_elements = (EL_SEND_SENDING);
+  gboolean sender_is_playing;
+
+  g_assert (CALLS_IS_SIP_MEDIA_PIPELINE (self));
+
+  if (self->element_map_playing == all_elements) {
+    g_debug ("All pipeline elements are playing");
+    set_state (self, CALLS_MEDIA_PIPELINE_STATE_PLAYING);
+    return;
+  }
+
+  if (self->element_map_paused == all_elements) {
+    g_debug ("All pipeline elements are paused");
+    set_state (self, CALLS_MEDIA_PIPELINE_STATE_PAUSED);
+    return;
+  }
+
+  if (self->element_map_stopped == all_elements) {
+    g_debug ("All pipeline elements are stopped");
+    set_state (self, CALLS_MEDIA_PIPELINE_STATE_STOPPED);
+    return;
+  }
+
+  /* Mixed element states: only the "sending-started" signal is left to handle */
+  sender_is_playing =
+    (self->element_map_playing & sending_elements) == sending_elements;
+
+  if (self->emitted_sending_signal || !sender_is_playing)
+    return;
+
+  g_debug ("Sender pipeline is sending data to %s RTP/RTCP %d/%d",
+           self->remote, self->rport_rtp, self->rport_rtcp);
+  g_signal_emit (self, signals[SENDING_STARTED], 0);
+  self->emitted_sending_signal = TRUE;
+}
+
+
/* rtpbin adds a pad once the payload is verified */
static void
on_pad_added (GstElement *rtpbin,
@@ -144,7 +241,7 @@ on_bus_message (GstBus *bus,
GstMessage *message,
gpointer data)
{
- CallsSipMediaPipeline *pipeline = CALLS_SIP_MEDIA_PIPELINE (data);
+ CallsSipMediaPipeline *self = CALLS_SIP_MEDIA_PIPELINE (data);
switch (GST_MESSAGE_TYPE (message)) {
case GST_MESSAGE_ERROR:
@@ -169,24 +266,79 @@ on_bus_message (GstBus *bus,
case GST_MESSAGE_EOS:
g_debug ("Received end of stream");
- calls_sip_media_pipeline_stop (pipeline);
+ calls_sip_media_pipeline_stop (self);
break;
case GST_MESSAGE_STATE_CHANGED:
{
GstState oldstate;
GstState newstate;
+ uint element_id = 0;
+ uint unset_element_id;
gst_message_parse_state_changed (message, &oldstate, &newstate, NULL);
+
g_debug ("Element %s has changed state from %s to %s",
GST_OBJECT_NAME (message->src),
gst_element_state_get_name (oldstate),
gst_element_state_get_name (newstate));
+
+ /* Sender pipeline elements */
+ if (message->src == GST_OBJECT (self->send_pipeline))
+ element_id = EL_SEND_PIPELINE;
+ else if (message->src == GST_OBJECT (self->audiosrc))
+ element_id = EL_SEND_AUDIO_SRC;
+ else if (message->src == GST_OBJECT (self->send_rtpbin))
+ element_id = EL_SEND_RTPBIN;
+ else if (message->src == GST_OBJECT (self->rtp_sink))
+ element_id = EL_SEND_RTP_SINK;
+ else if (message->src == GST_OBJECT (self->rtcp_send_sink))
+ element_id = EL_SEND_RTCP_SINK;
+ else if (message->src == GST_OBJECT (self->rtcp_send_src))
+ element_id = EL_SEND_RTCP_SRC;
+ else if (message->src == GST_OBJECT (self->payloader))
+ element_id = EL_SEND_PAYLOADER;
+ else if (message->src == GST_OBJECT (self->encoder))
+ element_id = EL_SEND_ENCODER;
+ /* Receiver pipeline elements */
+ else if (message->src == GST_OBJECT (self->recv_pipeline))
+ element_id = EL_RECV_PIPELINE;
+ else if (message->src == GST_OBJECT (self->audiosink))
+ element_id = EL_RECV_AUDIO_SINK;
+ else if (message->src == GST_OBJECT (self->recv_rtpbin))
+ element_id = EL_RECV_RTPBIN;
+ else if (message->src == GST_OBJECT (self->rtp_src))
+ element_id = EL_RECV_RTP_SRC;
+ else if (message->src == GST_OBJECT (self->rtcp_recv_sink))
+ element_id = EL_RECV_RTCP_SINK;
+ else if (message->src == GST_OBJECT (self->rtcp_recv_src))
+ element_id = EL_RECV_RTCP_SRC;
+ else if (message->src == GST_OBJECT (self->depayloader))
+ element_id = EL_RECV_DEPAYLOADER;
+ else if (message->src == GST_OBJECT (self->decoder))
+ element_id = EL_RECV_DECODER;
+
+ unset_element_id = G_MAXUINT ^ element_id;
+ if (newstate == GST_STATE_PLAYING) {
+ self->element_map_playing |= element_id;
+ self->element_map_paused &= unset_element_id;
+ self->element_map_stopped &= unset_element_id;
+ } else if (newstate == GST_STATE_PAUSED) {
+ self->element_map_paused |= element_id;
+ self->element_map_playing &= unset_element_id;
+ self->element_map_stopped &= unset_element_id;
+ } else if (newstate == GST_STATE_NULL) {
+ self->element_map_stopped |= element_id;
+ self->element_map_playing &= unset_element_id;
+ self->element_map_paused &= unset_element_id;
+ }
+
+ check_element_maps (self);
break;
}
default:
- if (pipeline->debug)
+ if (self->debug)
g_debug ("Got unhandled %s message", GST_MESSAGE_TYPE_NAME (message));
break;
}
@@ -610,6 +762,10 @@ calls_sip_media_pipeline_get_property (GObject *object,
g_value_set_boolean (value, self->debug);
break;
+ case PROP_STATE:
+ g_value_set_enum (value, calls_sip_media_pipeline_get_state (self));
+ break;
+
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
break;
@@ -733,7 +889,21 @@ calls_sip_media_pipeline_class_init (CallsSipMediaPipelineClass *klass)
FALSE,
G_PARAM_READWRITE);
+ props[PROP_STATE] = g_param_spec_enum ("state",
+ "State",
+ "The state of the media pipeline",
+ CALLS_TYPE_MEDIA_PIPELINE_STATE,
+ CALLS_MEDIA_PIPELINE_STATE_UNKNOWN,
+ G_PARAM_READABLE);
+
g_object_class_install_properties (object_class, PROP_LAST_PROP, props);
+
+ signals[SENDING_STARTED] =
+ g_signal_new ("sending-started",
+ G_TYPE_FROM_CLASS (klass),
+ G_SIGNAL_RUN_LAST,
+ 0, NULL, NULL, NULL,
+ G_TYPE_NONE, 0);
}
@@ -752,13 +922,20 @@ pipelines_initable_init (GInitable *initable,
{
CallsSipMediaPipeline *self = CALLS_SIP_MEDIA_PIPELINE (initable);
+ set_state (self, CALLS_MEDIA_PIPELINE_STATE_INITIALIZING);
+
if (!recv_pipeline_init (self, cancellable, error))
- return FALSE;
+ goto err;
if (!send_pipeline_init (self, cancellable, error))
- return FALSE;
+ goto err;
+ set_state (self, CALLS_MEDIA_PIPELINE_STATE_NEED_CODEC);
return TRUE;
+
+ err:
+ set_state (self, CALLS_MEDIA_PIPELINE_STATE_ERROR);
+ return FALSE;
}
@@ -815,17 +992,21 @@ calls_sip_media_pipeline_set_codec (CallsSipMediaPipeline *self,
if (!recv_pipeline_setup_codecs (self, codec, &error)) {
g_warning ("Error trying to setup codec for receive pipeline: %s",
error->message);
+ set_state (self, CALLS_MEDIA_PIPELINE_STATE_ERROR);
return;
}
if (!send_pipeline_setup_codecs (self, codec, &error)) {
g_warning ("Error trying to setup codec for send pipeline: %s",
error->message);
+ set_state (self, CALLS_MEDIA_PIPELINE_STATE_ERROR);
return;
}
self->codec = codec;
g_object_notify_by_pspec (G_OBJECT (self), props[PROP_CODEC]);
+
+ set_state (self, CALLS_MEDIA_PIPELINE_STATE_READY);
}
static void
@@ -867,7 +1048,12 @@ diagnose_ports_in_use (CallsSipMediaPipeline *self)
gboolean same_socket = FALSE;
g_assert (CALLS_IS_SIP_MEDIA_PIPELINE (self));
- g_assert (self->is_running);
+
+ if (self->state != CALLS_MEDIA_PIPELINE_STATE_PLAYING &&
+ self->state != CALLS_MEDIA_PIPELINE_STATE_PAUSED) {
+ g_warning ("Cannot diagnose ports when pipeline is not active");
+ return;
+ }
g_object_get (self->rtp_src, "used-socket", &socket_in, NULL);
g_object_get (self->rtp_sink, "used-socket", &socket_out, NULL);
@@ -897,13 +1083,12 @@ calls_sip_media_pipeline_start (CallsSipMediaPipeline *self)
GSocket *socket;
g_return_if_fail (CALLS_IS_SIP_MEDIA_PIPELINE (self));
- if (!self->codec) {
- g_warning ("Codec not set for this pipeline. Cannot start");
+ if (self->state != CALLS_MEDIA_PIPELINE_STATE_READY) {
+ g_warning ("Cannot start pipeline because it's not ready");
return;
}
g_debug ("Starting media pipeline");
- self->is_running = TRUE;
/* First start the receiver pipeline so that
we may reuse the socket in the sender pipeline */
@@ -924,9 +1109,10 @@ calls_sip_media_pipeline_start (CallsSipMediaPipeline *self)
/* Now start the sender pipeline */
gst_element_set_state (self->send_pipeline, GST_STATE_PLAYING);
+ set_state (self, CALLS_MEDIA_PIPELINE_STATE_PLAY_PENDING);
+
if (self->debug)
diagnose_ports_in_use (self);
-
}
@@ -936,11 +1122,12 @@ calls_sip_media_pipeline_stop (CallsSipMediaPipeline *self)
g_return_if_fail (CALLS_IS_SIP_MEDIA_PIPELINE (self));
g_debug ("Stopping media pipeline");
- self->is_running = FALSE;
/* Stop the pipelines in reverse order (compared to the starting) */
gst_element_set_state (self->send_pipeline, GST_STATE_NULL);
gst_element_set_state (self->recv_pipeline, GST_STATE_NULL);
+
+ set_state (self, CALLS_MEDIA_PIPELINE_STATE_STOP_PENDING);
}
@@ -950,20 +1137,38 @@ calls_sip_media_pipeline_pause (CallsSipMediaPipeline *self,
{
g_return_if_fail (CALLS_IS_SIP_MEDIA_PIPELINE (self));
- if (self->is_running != pause)
+ if (pause &&
+ (self->state == CALLS_MEDIA_PIPELINE_STATE_PAUSED ||
+ self->state == CALLS_MEDIA_PIPELINE_STATE_PAUSE_PENDING))
+ return;
+
+ if (!pause &&
+ (self->state == CALLS_MEDIA_PIPELINE_STATE_PLAYING ||
+ self->state == CALLS_MEDIA_PIPELINE_STATE_PLAY_PENDING))
+ return;
+
+ if (self->state != CALLS_MEDIA_PIPELINE_STATE_PLAYING &&
+ self->state != CALLS_MEDIA_PIPELINE_STATE_PLAY_PENDING &&
+ self->state != CALLS_MEDIA_PIPELINE_STATE_PAUSED &&
+ self->state != CALLS_MEDIA_PIPELINE_STATE_PAUSE_PENDING) {
+ g_warning ("Cannot pause or unpause pipeline because it's not currently active");
return;
+ }
- g_debug ("%s media pipeline", self->is_running ?
+ g_debug ("%s media pipeline", pause ?
"Pausing" :
"Unpausing");
- gst_element_set_state (self->recv_pipeline, self->is_running ?
+
+ gst_element_set_state (self->recv_pipeline, pause ?
GST_STATE_PAUSED :
GST_STATE_PLAYING);
- gst_element_set_state (self->send_pipeline, self->is_running ?
+ gst_element_set_state (self->send_pipeline, pause ?
GST_STATE_PAUSED :
GST_STATE_PLAYING);
- self->is_running = !self->is_running;
+ set_state (self, pause ?
+ CALLS_MEDIA_PIPELINE_STATE_PAUSE_PENDING :
+ CALLS_MEDIA_PIPELINE_STATE_PLAY_PENDING);
}
@@ -992,4 +1197,38 @@ calls_sip_media_pipeline_get_rtcp_port (CallsSipMediaPipeline *self)
return port;
}
+
+/**
+ * calls_sip_media_pipeline_get_state:
+ * @self: A #CallsSipMediaPipeline
+ *
+ * Returns: The currently tracked state of the pipeline, or
+ * %CALLS_MEDIA_PIPELINE_STATE_UNKNOWN if @self is not a #CallsSipMediaPipeline
+ */
+CallsMediaPipelineState
+calls_sip_media_pipeline_get_state (CallsSipMediaPipeline *self)
+{
+  CallsMediaPipelineState current;
+
+  g_return_val_if_fail (CALLS_IS_SIP_MEDIA_PIPELINE (self), CALLS_MEDIA_PIPELINE_STATE_UNKNOWN);
+
+  current = self->state;
+
+  return current;
+}
+
#undef MAKE_ELEMENT
+
+/* Undefine the element bit flags and composite masks which were defined
+ * near the top of this file */
+#undef EL_SEND_PIPELINE
+#undef EL_SEND_AUDIO_SRC
+#undef EL_SEND_RTPBIN
+#undef EL_SEND_RTP_SINK
+#undef EL_SEND_RTCP_SINK
+#undef EL_SEND_RTCP_SRC
+#undef EL_SEND_PAYLOADER
+#undef EL_SEND_ENCODER
+
+#undef EL_SEND_ALL_RTP
+#undef EL_SEND_SENDING
+
+#undef EL_RECV_PIPELINE
+#undef EL_RECV_AUDIO_SINK
+#undef EL_RECV_RTPBIN
+#undef EL_RECV_RTP_SRC
+#undef EL_RECV_RTCP_SINK
+#undef EL_RECV_RTCP_SRC
+#undef EL_RECV_DEPAYLOADER
+#undef EL_RECV_DECODER
+
+#undef EL_RECV_ALL_RTP
+
diff --git a/plugins/sip/calls-sip-media-pipeline.h b/plugins/sip/calls-sip-media-pipeline.h
index 4cc01152..951cc921 100644
--- a/plugins/sip/calls-sip-media-pipeline.h
+++ b/plugins/sip/calls-sip-media-pipeline.h
@@ -30,19 +30,49 @@
G_BEGIN_DECLS
+/**
+ * CallsMediaPipelineState:
+ * @CALLS_MEDIA_PIPELINE_STATE_UNKNOWN: Default state for new pipelines
+ * @CALLS_MEDIA_PIPELINE_STATE_ERROR: Pipeline is in an error state
+ *   (e.g. initializing the pipelines or setting up the codec failed)
+ * @CALLS_MEDIA_PIPELINE_STATE_INITIALIZING: Pipeline is initializing
+ * @CALLS_MEDIA_PIPELINE_STATE_NEED_CODEC: Pipeline was initialized and needs a codec set
+ * @CALLS_MEDIA_PIPELINE_STATE_READY: A codec was set and the pipeline is ready
+ *   to be set into playing state
+ * @CALLS_MEDIA_PIPELINE_STATE_PLAY_PENDING: Request to start (or unpause) pipeline pending
+ * @CALLS_MEDIA_PIPELINE_STATE_PLAYING: Pipeline is currently playing
+ * @CALLS_MEDIA_PIPELINE_STATE_PAUSE_PENDING: Request to pause pipeline pending
+ * @CALLS_MEDIA_PIPELINE_STATE_PAUSED: Pipeline is currently paused
+ * @CALLS_MEDIA_PIPELINE_STATE_STOP_PENDING: Request to stop pipeline pending
+ * @CALLS_MEDIA_PIPELINE_STATE_STOPPED: Pipeline has stopped playing (e.g. received BYE packet)
+ *
+ * The state of a #CallsSipMediaPipeline as exposed by its "state" property
+ * and by calls_sip_media_pipeline_get_state(). The pending states are set as
+ * soon as a start, pause or stop was requested; the playing, paused and
+ * stopped states are set once all tracked elements of both the sender and the
+ * receiver pipeline have reached the respective state.
+ */
+typedef enum {
+ CALLS_MEDIA_PIPELINE_STATE_UNKNOWN = 0,
+ CALLS_MEDIA_PIPELINE_STATE_ERROR,
+ CALLS_MEDIA_PIPELINE_STATE_INITIALIZING,
+ CALLS_MEDIA_PIPELINE_STATE_NEED_CODEC,
+ CALLS_MEDIA_PIPELINE_STATE_READY,
+ CALLS_MEDIA_PIPELINE_STATE_PLAY_PENDING,
+ CALLS_MEDIA_PIPELINE_STATE_PLAYING,
+ CALLS_MEDIA_PIPELINE_STATE_PAUSE_PENDING,
+ CALLS_MEDIA_PIPELINE_STATE_PAUSED,
+ CALLS_MEDIA_PIPELINE_STATE_STOP_PENDING,
+ CALLS_MEDIA_PIPELINE_STATE_STOPPED
+} CallsMediaPipelineState;
+
+
#define CALLS_TYPE_SIP_MEDIA_PIPELINE (calls_sip_media_pipeline_get_type ())
G_DECLARE_FINAL_TYPE (CallsSipMediaPipeline, calls_sip_media_pipeline, CALLS, SIP_MEDIA_PIPELINE, GObject)
-CallsSipMediaPipeline* calls_sip_media_pipeline_new (MediaCodecInfo *codec);
-void calls_sip_media_pipeline_set_codec (CallsSipMediaPipeline *self,
- MediaCodecInfo *info);
-void calls_sip_media_pipeline_start (CallsSipMediaPipeline *self);
-void calls_sip_media_pipeline_stop (CallsSipMediaPipeline *self);
-void calls_sip_media_pipeline_pause (CallsSipMediaPipeline *self,
- gboolean pause);
-int calls_sip_media_pipeline_get_rtp_port (CallsSipMediaPipeline *self);
-int calls_sip_media_pipeline_get_rtcp_port (CallsSipMediaPipeline *self);
+CallsSipMediaPipeline* calls_sip_media_pipeline_new (MediaCodecInfo *codec);
+void calls_sip_media_pipeline_set_codec (CallsSipMediaPipeline *self,
+ MediaCodecInfo *info);
+void calls_sip_media_pipeline_start (CallsSipMediaPipeline *self);
+void calls_sip_media_pipeline_stop (CallsSipMediaPipeline *self);
+void calls_sip_media_pipeline_pause (CallsSipMediaPipeline *self,
+ gboolean pause);
+int calls_sip_media_pipeline_get_rtp_port (CallsSipMediaPipeline *self);
+int calls_sip_media_pipeline_get_rtcp_port (CallsSipMediaPipeline *self);
+CallsMediaPipelineState calls_sip_media_pipeline_get_state (CallsSipMediaPipeline *self);
G_END_DECLS
diff --git a/plugins/sip/meson.build b/plugins/sip/meson.build
index f092d154..4e97c843 100644
--- a/plugins/sip/meson.build
+++ b/plugins/sip/meson.build
@@ -55,6 +55,14 @@ sip_sources = files(
]
)
+# Headers declaring the enums for which GType boilerplate gets generated
+pipeline_enum_headers = [
+ 'calls-sip-media-pipeline.h',
+]
+
+# Generate the 'calls-media-pipeline-enums' sources (included by
+# calls-sip-media-pipeline.c) and build them as part of the SIP plugin
+pipeline_enums = gnome.mkenums_simple('calls-media-pipeline-enums',
+ sources: pipeline_enum_headers)
+sip_sources += pipeline_enums
+
sip_enum_headers = [
'calls-sip-util.h',
]
[Date Prev][Date Next] [Thread Prev][Thread Next] [Thread Index] [Date Index] [Author Index]