From b593aa05efe667db934c818543ac01a6f0b3e7a4 Mon Sep 17 00:00:00 2001 From: Marvin W Date: Tue, 9 Nov 2021 22:06:47 +0100 Subject: [PATCH] RTP: Encode with device --- plugins/rtp/src/device.vala | 335 ++++++++++++++++++++++++++++-------- plugins/rtp/src/stream.vala | 258 ++++++++++++++++----------- 2 files changed, 416 insertions(+), 177 deletions(-) diff --git a/plugins/rtp/src/device.vala b/plugins/rtp/src/device.vala index e25271b1..3546ab94 100644 --- a/plugins/rtp/src/device.vala +++ b/plugins/rtp/src/device.vala @@ -1,5 +1,9 @@ +using Xmpp.Xep.JingleRtp; +using Gee; + public class Dino.Plugins.Rtp.Device : MediaDevice, Object { public Plugin plugin { get; private set; } + public CodecUtil codec_util { get { return plugin.codec_util; } } public Gst.Device device { get; private set; } private string device_name; @@ -17,28 +21,45 @@ public class Dino.Plugins.Rtp.Device : MediaDevice, Object { return plugin.pipe; }} public string? media { get { - if (device.device_class.has_prefix("Audio/")) { + if (device.has_classes("Audio")) { return "audio"; - } else if (device.device_class.has_prefix("Video/")) { + } else if (device.has_classes("Video")) { return "video"; } else { return null; } }} public bool is_source { get { - return device.device_class.has_suffix("/Source"); + return device.has_classes("Source"); }} public bool is_sink { get { - return device.device_class.has_suffix("/Sink"); + return device.has_classes("Sink"); }} private Gst.Element element; private Gst.Element tee; private Gst.Element dsp; - private Gst.Element mixer; + private Gst.Base.Aggregator mixer; private Gst.Element filter; - private Gst.Element rate; - private int links = 0; + private int links; + + // Codecs + private Gee.Map codecs = new HashMap(PayloadType.hash_func, PayloadType.equals_func); + private Gee.Map codec_tees = new HashMap(PayloadType.hash_func, PayloadType.equals_func); + private Gee.Map> payloaders = new HashMap>(PayloadType.hash_func, PayloadType.equals_func); + private Gee.Map> payloader_tees = new HashMap>(PayloadType.hash_func, PayloadType.equals_func); + private Gee.Map> payloader_links = new HashMap>(PayloadType.hash_func, PayloadType.equals_func); + private Gee.Map> codec_bitrates = new HashMap>(PayloadType.hash_func, PayloadType.equals_func); + + private class CodecBitrate { + public uint bitrate; + public int64 timestamp; + + public CodecBitrate(uint bitrate) { + this.bitrate = bitrate; + this.timestamp = get_monotonic_time(); + } + } public Device(Plugin plugin, Gst.Device device) { this.plugin = plugin; @@ -57,25 +78,154 @@ public class Dino.Plugins.Rtp.Device : MediaDevice, Object { } public Gst.Element? link_sink() { + if (!is_sink) return null; if (element == null) create(); links++; - if (mixer != null) return mixer; - if (is_sink && media == "audio") return filter; + if (mixer != null) { + Gst.Element rate = Gst.ElementFactory.make("audiorate", @"$(id)_rate_$(Random.next_int())"); + pipe.add(rate); + rate.link(mixer); + return rate; + } + if (media == "audio") return filter; return element; } - public Gst.Element? link_source() { + public Gst.Element? link_source(PayloadType? payload_type = null, uint ssrc = Random.next_int(), int seqnum_offset = -1) { + if (!is_source) return null; if (element == null) create(); links++; + if (payload_type != null && tee != null) { + bool new_codec = false; + string? codec = CodecUtil.get_codec_from_payload(media, payload_type); + if (!codecs.has_key(payload_type)) { + codecs[payload_type] = codec_util.get_encode_bin_without_payloader(media, payload_type, @"$(id)_$(codec)_encoder"); + pipe.add(codecs[payload_type]); + new_codec = true; + } + if (!codec_tees.has_key(payload_type)) { + codec_tees[payload_type] = Gst.ElementFactory.make("tee", @"$(id)_$(codec)_tee"); + codec_tees[payload_type].@set("allow-not-linked", true); + pipe.add(codec_tees[payload_type]); + codecs[payload_type].link(codec_tees[payload_type]); + } + if (!payloaders.has_key(payload_type)) { + payloaders[payload_type] = new HashMap(); + } + if (!payloaders[payload_type].has_key(ssrc)) { + payloaders[payload_type][ssrc] = codec_util.get_payloader_bin(media, payload_type, @"$(id)_$(codec)_$(ssrc)"); + var payload = (Gst.RTP.BasePayload) ((Gst.Bin) payloaders[payload_type][ssrc]).get_by_name(@"$(id)_$(codec)_$(ssrc)_rtp_pay"); + payload.ssrc = ssrc; + payload.seqnum_offset = seqnum_offset; + pipe.add(payloaders[payload_type][ssrc]); + codec_tees[payload_type].link(payloaders[payload_type][ssrc]); + } + if (!payloader_tees.has_key(payload_type)) { + payloader_tees[payload_type] = new HashMap(); + } + if (!payloader_tees[payload_type].has_key(ssrc)) { + payloader_tees[payload_type][ssrc] = Gst.ElementFactory.make("tee", @"$(id)_$(codec)_$(ssrc)_tee"); + payloader_tees[payload_type][ssrc].@set("allow-not-linked", true); + pipe.add(payloader_tees[payload_type][ssrc]); + payloaders[payload_type][ssrc].link(payloader_tees[payload_type][ssrc]); + } + if (!payloader_links.has_key(payload_type)) { + payloader_links[payload_type] = new HashMap(); + } + if (!payloader_links[payload_type].has_key(ssrc)) { + payloader_links[payload_type][ssrc] = 1; + } else { + payloader_links[payload_type][ssrc] = payloader_links[payload_type][ssrc] + 1; + } + if (new_codec) { + tee.link(codecs[payload_type]); + } + return payloader_tees[payload_type][ssrc]; + } if (tee != null) return tee; return element; } - public void unlink() { + public void update_bitrate(PayloadType payload_type, uint bitrate) { + if (codecs.has_key(payload_type)) { + lock(codec_bitrates); + if (!codec_bitrates.has_key(payload_type)) { + codec_bitrates[payload_type] = new ArrayList(); + } + codec_bitrates[payload_type].add(new CodecBitrate(bitrate)); + var remove = new ArrayList(); + foreach (CodecBitrate rate in codec_bitrates[payload_type]) { + if (rate.timestamp < get_monotonic_time() - 5000000L) { + remove.add(rate); + continue; + } + if (rate.bitrate < bitrate) { + bitrate = rate.bitrate; + } + } + codec_bitrates[payload_type].remove_all(remove); + codec_util.update_bitrate(media, payload_type, codecs[payload_type], bitrate); + unlock(codec_bitrates); + } + } + + public void unlink(Gst.Element? link = null) { if (links <= 0) { critical("Link count below zero."); return; } + if (link != null && is_source && tee != null) { + PayloadType payload_type = payloader_tees.first_match((entry) => entry.value.any_match((entry) => entry.value == link)).key; + uint ssrc = payloader_tees[payload_type].first_match((entry) => entry.value == link).key; + payloader_links[payload_type][ssrc] = payloader_links[payload_type][ssrc] - 1; + if (payloader_links[payload_type][ssrc] == 0) { + plugin.pause(); + + codec_tees[payload_type].unlink(payloaders[payload_type][ssrc]); + payloaders[payload_type][ssrc].set_locked_state(true); + payloaders[payload_type][ssrc].set_state(Gst.State.NULL); + payloaders[payload_type][ssrc].unlink(payloader_tees[payload_type][ssrc]); + pipe.remove(payloaders[payload_type][ssrc]); + payloaders[payload_type].unset(ssrc); + payloader_tees[payload_type][ssrc].set_locked_state(true); + payloader_tees[payload_type][ssrc].set_state(Gst.State.NULL); + pipe.remove(payloader_tees[payload_type][ssrc]); + payloader_tees[payload_type].unset(ssrc); + + payloader_links[payload_type].unset(ssrc); + plugin.unpause(); + } + if (payloader_links[payload_type].size == 0) { + plugin.pause(); + + tee.unlink(codecs[payload_type]); + codecs[payload_type].set_locked_state(true); + codecs[payload_type].set_state(Gst.State.NULL); + codecs[payload_type].unlink(codec_tees[payload_type]); + pipe.remove(codecs[payload_type]); + codecs.unset(payload_type); + codec_tees[payload_type].set_locked_state(true); + codec_tees[payload_type].set_state(Gst.State.NULL); + pipe.remove(codec_tees[payload_type]); + codec_tees.unset(payload_type); + + payloaders.unset(payload_type); + payloader_tees.unset(payload_type); + payloader_links.unset(payload_type); + plugin.unpause(); + } + } + if (link != null && is_sink && mixer != null) { + plugin.pause(); + link.set_locked_state(true); + Gst.Base.AggregatorPad mixer_sink_pad = (Gst.Base.AggregatorPad) link.get_static_pad("src").get_peer(); + link.get_static_pad("src").unlink(mixer_sink_pad); + mixer_sink_pad.set_active(false); + link.set_state(Gst.State.NULL); + pipe.remove(link); + mixer.release_request_pad(mixer_sink_pad); + plugin.unpause(); + } links--; if (links == 0) { destroy(); @@ -150,15 +300,59 @@ public class Dino.Plugins.Rtp.Device : MediaDevice, Object { return target; } + private static Gst.PadProbeReturn log_probe(Gst.Pad pad, Gst.PadProbeInfo info) { + if ((info.type & Gst.PadProbeType.EVENT_DOWNSTREAM) > 0) { + debug("%s.%s probed downstream event %s", pad.get_parent_element().name, pad.name, info.get_event().type.get_name()); + } + if ((info.type & Gst.PadProbeType.EVENT_UPSTREAM) > 0) { + var event = info.get_event(); + if (event.type == Gst.EventType.RECONFIGURE) return Gst.PadProbeReturn.DROP; + if (event.type == Gst.EventType.QOS) { + Gst.QOSType qos_type; + double proportion; + Gst.ClockTimeDiff diff; + Gst.ClockTime timestamp; + event.parse_qos(out qos_type, out proportion, out diff, out timestamp); + debug("%s.%s probed qos event: type: %s, proportion: %f, diff: %lli, timestamp: %llu", pad.get_parent_element().name, pad.name, @"$qos_type", proportion, diff, timestamp); + } else { + debug("%s.%s probed upstream event %s", pad.get_parent_element().name, pad.name, event.type.get_name()); + } + } + if ((info.type & Gst.PadProbeType.QUERY_DOWNSTREAM) > 0) { + debug("%s.%s probed downstream query %s", pad.get_parent_element().name, pad.name, info.get_query().type.get_name()); + } + if ((info.type & Gst.PadProbeType.QUERY_UPSTREAM) > 0) { + debug("%s.%s probed upstream query %s", pad.get_parent_element().name, pad.name, info.get_query().type.get_name()); + } + if ((info.type & Gst.PadProbeType.BUFFER) > 0) { + uint id = pad.get_data("no_buffer_probe_timeout"); + if (id != 0) { + Source.remove(id); + } + string name = @"$(pad.get_parent_element().name).$(pad.name)"; + id = Timeout.add_seconds(1, () => { + debug("%s probed no buffer for 1 second", name); + return Source.REMOVE; + }); + pad.set_data("no_buffer_probe_timeout", id); + } + return Gst.PadProbeReturn.PASS; + } + private void create() { debug("Creating device %s", id); plugin.pause(); element = device.create_element(id); + if (is_sink) { + element.@set("async", false); + element.@set("sync", false); + } pipe.add(element); if (is_source) { element.@set("do-timestamp", true); filter = Gst.ElementFactory.make("capsfilter", @"caps_filter_$id"); filter.@set("caps", get_best_caps()); + filter.get_static_pad("src").add_probe(Gst.PadProbeType.BLOCK, log_probe); pipe.add(filter); element.link(filter); #if WITH_VOICE_PROCESSOR @@ -174,22 +368,18 @@ public class Dino.Plugins.Rtp.Device : MediaDevice, Object { pipe.add(tee); (dsp ?? filter).link(tee); } - if (is_sink) { - element.@set("async", false); - element.@set("sync", false); - } if (is_sink && media == "audio") { - filter = Gst.ElementFactory.make("capsfilter", @"caps_filter_$id"); - filter.@set("caps", get_best_caps()); - pipe.add(filter); - if (plugin.echoprobe != null) { - rate = Gst.ElementFactory.make("audiorate", @"rate_$id"); - rate.@set("tolerance", 100000000); - pipe.add(rate); - filter.link(rate); - rate.link(plugin.echoprobe); + mixer = (Gst.Base.Aggregator) Gst.ElementFactory.make("audiomixer", @"mixer_$id"); + pipe.add(mixer); + mixer.link(pipe); + if (plugin.echoprobe != null && !plugin.echoprobe.get_static_pad("src").is_linked()) { + mixer.link(plugin.echoprobe); plugin.echoprobe.link(element); } else { + filter = Gst.ElementFactory.make("capsfilter", @"caps_filter_$id"); + filter.@set("caps", get_best_caps()); + pipe.add(filter); + mixer.link(filter); filter.link(element); } } @@ -197,38 +387,25 @@ public class Dino.Plugins.Rtp.Device : MediaDevice, Object { } private void destroy() { - if (mixer != null) { - if (is_sink && media == "audio" && plugin.echoprobe != null) { - plugin.echoprobe.unlink(mixer); + if (is_sink) { + if (mixer != null) { + int linked_sink_pads = 0; + mixer.foreach_sink_pad((_, pad) => { + if (pad.is_linked()) linked_sink_pads++; + return true; + }); + if (linked_sink_pads > 0) { + warning("%s-mixer still has %i sink pads while being destroyed", id, linked_sink_pads); + } + mixer.unlink(plugin.echoprobe ?? element); } - int linked_sink_pads = 0; - mixer.foreach_sink_pad((_, pad) => { - if (pad.is_linked()) linked_sink_pads++; - return true; - }); - if (linked_sink_pads > 0) { - warning("%s-mixer still has %i sink pads while being destroyed", id, linked_sink_pads); - } - mixer.set_locked_state(true); - mixer.set_state(Gst.State.NULL); - mixer.unlink(element); - pipe.remove(mixer); - mixer = null; - } else if (is_sink && media == "audio") { if (filter != null) { filter.set_locked_state(true); filter.set_state(Gst.State.NULL); - filter.unlink(rate ?? ((Gst.Element)plugin.echoprobe) ?? element); + filter.unlink(element); pipe.remove(filter); filter = null; } - if (rate != null) { - rate.set_locked_state(true); - rate.set_state(Gst.State.NULL); - rate.unlink(plugin.echoprobe); - pipe.remove(rate); - rate = null; - } if (plugin.echoprobe != null) { plugin.echoprobe.unlink(element); } @@ -239,34 +416,42 @@ public class Dino.Plugins.Rtp.Device : MediaDevice, Object { else if (is_source) element.unlink(tee); pipe.remove(element); element = null; - if (filter != null) { - filter.set_locked_state(true); - filter.set_state(Gst.State.NULL); - filter.unlink(dsp ?? tee); - pipe.remove(filter); - filter = null; + if (mixer != null) { + mixer.set_locked_state(true); + mixer.set_state(Gst.State.NULL); + pipe.remove(mixer); + mixer = null; } - if (dsp != null) { - dsp.set_locked_state(true); - dsp.set_state(Gst.State.NULL); - dsp.unlink(tee); - pipe.remove(dsp); - dsp = null; - } - if (tee != null) { - int linked_src_pads = 0; - tee.foreach_src_pad((_, pad) => { - if (pad.is_linked()) linked_src_pads++; - return true; - }); - if (linked_src_pads != 0) { - warning("%s-tee still has %d src pads while being destroyed", id, linked_src_pads); + if (is_source) { + if (filter != null) { + filter.set_locked_state(true); + filter.set_state(Gst.State.NULL); + filter.unlink(dsp ?? tee); + pipe.remove(filter); + filter = null; + } + if (dsp != null) { + dsp.set_locked_state(true); + dsp.set_state(Gst.State.NULL); + dsp.unlink(tee); + pipe.remove(dsp); + dsp = null; + } + if (tee != null) { + int linked_src_pads = 0; + tee.foreach_src_pad((_, pad) => { + if (pad.is_linked()) linked_src_pads++; + return true; + }); + if (linked_src_pads != 0) { + warning("%s-tee still has %d src pads while being destroyed", id, linked_src_pads); + } + tee.set_locked_state(true); + tee.set_state(Gst.State.NULL); + pipe.remove(tee); + tee = null; } - tee.set_locked_state(true); - tee.set_state(Gst.State.NULL); - pipe.remove(tee); - tee = null; } debug("Destroyed device %s", id); } -} \ No newline at end of file +} diff --git a/plugins/rtp/src/stream.vala b/plugins/rtp/src/stream.vala index bd8a279f..86f52ee7 100644 --- a/plugins/rtp/src/stream.vala +++ b/plugins/rtp/src/stream.vala @@ -18,22 +18,19 @@ public class Dino.Plugins.Rtp.Stream : Xmpp.Xep.JingleRtp.Stream { private Gst.App.Sink send_rtcp; private Gst.App.Src recv_rtp; private Gst.App.Src recv_rtcp; - private Gst.Element encode; - private Gst.RTP.BasePayload encode_pay; private Gst.Element decode; private Gst.RTP.BaseDepayload decode_depay; private Gst.Element input; + private Gst.Pad input_pad; private Gst.Element output; private Gst.Element session; private Device _input_device; public Device input_device { get { return _input_device; } set { if (!paused) { - if (this._input_device != null) { - this._input_device.unlink(); - this._input_device = null; - } - set_input(value != null ? value.link_source() : null); + var input = this.input; + set_input(value != null ? value.link_source(payload_type, our_ssrc, next_seqnum_offset) : null); + if (this._input_device != null) this._input_device.unlink(input); } this._input_device = value; }} @@ -47,7 +44,9 @@ public class Dino.Plugins.Rtp.Stream : Xmpp.Xep.JingleRtp.Stream { public bool created { get; private set; default = false; } public bool paused { get; private set; default = false; } private bool push_recv_data = false; - private string participant_ssrc = null; + private uint our_ssrc = Random.next_int(); + private int next_seqnum_offset = -1; + private uint32 participant_ssrc = 0; private Gst.Pad recv_rtcp_sink_pad; private Gst.Pad recv_rtp_sink_pad; @@ -93,7 +92,10 @@ public class Dino.Plugins.Rtp.Stream : Xmpp.Xep.JingleRtp.Stream { send_rtp.caps = CodecUtil.get_caps(media, payload_type, false); send_rtp.emit_signals = true; send_rtp.sync = false; + send_rtp.drop = true; + send_rtp.wait_on_eos = false; send_rtp.new_sample.connect(on_new_sample); + send_rtp.connect("signal::eos", on_eos_static, this); pipe.add(send_rtp); send_rtcp = Gst.ElementFactory.make("appsink", @"rtcp_sink_$rtpid") as Gst.App.Sink; @@ -101,7 +103,10 @@ public class Dino.Plugins.Rtp.Stream : Xmpp.Xep.JingleRtp.Stream { send_rtcp.caps = new Gst.Caps.empty_simple("application/x-rtcp"); send_rtcp.emit_signals = true; send_rtcp.sync = false; + send_rtcp.drop = true; + send_rtcp.wait_on_eos = false; send_rtcp.new_sample.connect(on_new_sample); + send_rtcp.connect("signal::eos", on_eos_static, this); pipe.add(send_rtcp); recv_rtp = Gst.ElementFactory.make("appsrc", @"rtp_src_$rtpid") as Gst.App.Src; @@ -125,18 +130,15 @@ public class Dino.Plugins.Rtp.Stream : Xmpp.Xep.JingleRtp.Stream { recv_rtcp.get_static_pad("src").link(recv_rtcp_sink_pad); // Connect input - encode = codec_util.get_encode_bin(media, payload_type, @"encode_$rtpid"); - encode_pay = (Gst.RTP.BasePayload)((Gst.Bin)encode).get_by_name(@"encode_$(rtpid)_rtp_pay"); - pipe.add(encode); send_rtp_sink_pad = rtpbin.get_request_pad(@"send_rtp_sink_$rtpid"); - encode.get_static_pad("src").link(send_rtp_sink_pad); if (input != null) { - input.link(encode); + input_pad = input.get_request_pad(@"src_$rtpid"); + input_pad.link(send_rtp_sink_pad); } // Connect output decode = codec_util.get_decode_bin(media, payload_type, @"decode_$rtpid"); - decode_depay = (Gst.RTP.BaseDepayload)((Gst.Bin)encode).get_by_name(@"decode_$(rtpid)_rtp_depay"); + decode_depay = (Gst.RTP.BaseDepayload)((Gst.Bin)decode).get_by_name(@"decode_$(rtpid)_rtp_depay"); pipe.add(decode); if (output != null) { decode.link(output); @@ -159,8 +161,8 @@ public class Dino.Plugins.Rtp.Stream : Xmpp.Xep.JingleRtp.Stream { } Timeout.add(1000, () => remb_adjust()); } - if (media == "video") { - codec_util.update_bitrate(media, payload_type, encode, 256); + if (input_device != null && media == "video") { + input_device.update_bitrate(payload_type, 256); } } @@ -185,11 +187,14 @@ public class Dino.Plugins.Rtp.Stream : Xmpp.Xep.JingleRtp.Stream { warning("No source-stats for session %u", rtpid); return Source.REMOVE; } + + if (input_device == null) return Source.CONTINUE; + foreach (Value value in source_stats.values) { unowned Gst.Structure source_stat = (Gst.Structure) value.get_boxed(); - uint ssrc; + uint32 ssrc; if (!source_stat.get_uint("ssrc", out ssrc)) continue; - if (ssrc.to_string() == participant_ssrc) { + if (ssrc == participant_ssrc) { int packets_lost; uint64 packets_received, octets_received; source_stat.get_int("packets-lost", out packets_lost); @@ -218,10 +223,10 @@ public class Dino.Plugins.Rtp.Stream : Xmpp.Xep.JingleRtp.Stream { 1, 0, 0, 0, 0, 0, 0, 0 }; - data[4] = (uint8)((encode_pay.ssrc >> 24) & 0xff); - data[5] = (uint8)((encode_pay.ssrc >> 16) & 0xff); - data[6] = (uint8)((encode_pay.ssrc >> 8) & 0xff); - data[7] = (uint8)(encode_pay.ssrc & 0xff); + data[4] = (uint8)((our_ssrc >> 24) & 0xff); + data[5] = (uint8)((our_ssrc >> 16) & 0xff); + data[6] = (uint8)((our_ssrc >> 8) & 0xff); + data[7] = (uint8)(our_ssrc & 0xff); uint8 br_exp = 0; uint32 br_mant = remb * 1000; uint8 bits = (uint8)Math.log2(br_mant); @@ -243,7 +248,7 @@ public class Dino.Plugins.Rtp.Stream : Xmpp.Xep.JingleRtp.Stream { } private static void on_feedback_rtcp(Gst.Element session, uint type, uint fbtype, uint sender_ssrc, uint media_ssrc, Gst.Buffer? fci, Stream self) { - if (type == 206 && fbtype == 15 && fci != null && sender_ssrc.to_string() == self.participant_ssrc) { + if (self.input_device != null && self.media == "video" && type == 206 && fbtype == 15 && fci != null && sender_ssrc == self.participant_ssrc) { // https://tools.ietf.org/html/draft-alvestrand-rmcat-remb-03 uint8[] data; fci.extract_dup(0, fci.get_size(), out data); @@ -251,7 +256,7 @@ public class Dino.Plugins.Rtp.Stream : Xmpp.Xep.JingleRtp.Stream { uint8 br_exp = data[5] >> 2; uint32 br_mant = (((uint32)data[5] & 0x3) << 16) + ((uint32)data[6] << 8) + (uint32)data[7]; uint bitrate = (br_mant << br_exp) / 1000; - self.codec_util.update_bitrate(self.media, self.payload_type, self.encode, bitrate * 8); + self.input_device.update_bitrate(self.payload_type, bitrate * 8); } } @@ -267,20 +272,30 @@ public class Dino.Plugins.Rtp.Stream : Xmpp.Xep.JingleRtp.Stream { debug("Sink is null"); return Gst.FlowReturn.EOS; } + if (sink != send_rtp && sink != send_rtcp) { + warning("unknown sample"); + return Gst.FlowReturn.NOT_SUPPORTED; + } Gst.Sample sample = sink.pull_sample(); Gst.Buffer buffer = sample.get_buffer(); uint8[] data; buffer.extract_dup(0, buffer.get_size(), out data); prepare_local_crypto(); if (sink == send_rtp) { + Gst.RTP.Buffer rtp_buffer; + if (Gst.RTP.Buffer.map(buffer, Gst.MapFlags.READ, out rtp_buffer)) { + if (our_ssrc != rtp_buffer.get_ssrc()) { + warning("Sending buffer with SSRC %u when our ssrc is %u", rtp_buffer.get_ssrc(), our_ssrc); + } + next_seqnum_offset = rtp_buffer.get_seq() + 1; + rtp_buffer.unmap(); + } if (crypto_session.has_encrypt) { data = crypto_session.encrypt_rtp(data); } on_send_rtp_data(new Bytes.take((owned) data)); } else if (sink == send_rtcp) { encrypt_and_send_rtcp((owned) data); - } else { - warning("unknown sample"); } return Gst.FlowReturn.OK; } @@ -300,41 +315,59 @@ public class Dino.Plugins.Rtp.Stream : Xmpp.Xep.JingleRtp.Stream { return Gst.PadProbeReturn.DROP; } - public override void destroy() { - // Stop network communication - push_recv_data = false; - recv_rtp.end_of_stream(); - recv_rtcp.end_of_stream(); - send_rtp.new_sample.disconnect(on_new_sample); - send_rtcp.new_sample.disconnect(on_new_sample); - - // Disconnect input device - if (input != null) { - input.unlink(encode); - input = null; - } - if (this._input_device != null) { - if (!paused) this._input_device.unlink(); - this._input_device = null; + private static void on_eos_static(Gst.App.Sink sink, Stream self) { + debug("EOS on %s", sink.name); + if (sink == self.send_rtp) { + Idle.add(() => { self.on_send_rtp_eos(); return Source.REMOVE; }); + } else if (sink == self.send_rtcp) { + Idle.add(() => { self.on_send_rtcp_eos(); return Source.REMOVE; }); } + } - // Disconnect encode - encode.set_locked_state(true); - encode.set_state(Gst.State.NULL); - encode.get_static_pad("src").unlink(send_rtp_sink_pad); - pipe.remove(encode); - encode = null; - encode_pay = null; - - // Disconnect RTP sending + private void on_send_rtp_eos() { if (send_rtp_src_pad != null) { - send_rtp_src_pad.add_probe(Gst.PadProbeType.BLOCK, drop_probe); send_rtp_src_pad.unlink(send_rtp.get_static_pad("sink")); + send_rtp_src_pad = null; } send_rtp.set_locked_state(true); send_rtp.set_state(Gst.State.NULL); pipe.remove(send_rtp); send_rtp = null; + debug("Stopped sending RTP for %u", rtpid); + } + + private void on_send_rtcp_eos() { + send_rtcp.set_locked_state(true); + send_rtcp.set_state(Gst.State.NULL); + pipe.remove(send_rtcp); + send_rtcp = null; + debug("Stopped sending RTCP for %u", rtpid); + } + + public override void destroy() { + // Stop network communication + push_recv_data = false; + if (recv_rtp != null) recv_rtp.end_of_stream(); + if (recv_rtcp != null) recv_rtcp.end_of_stream(); + if (send_rtp != null) send_rtp.new_sample.disconnect(on_new_sample); + if (send_rtcp != null) send_rtcp.new_sample.disconnect(on_new_sample); + + // Disconnect input device + if (input != null) { + input_pad.unlink(send_rtp_sink_pad); + input.release_request_pad(input_pad); + input_pad = null; + } + if (this._input_device != null) { + if (!paused) this._input_device.unlink(input); + this._input_device = null; + this.input = null; + } + + // Inject EOS + if (send_rtp_sink_pad != null) { + send_rtp_sink_pad.send_event(new Gst.Event.eos()); + } // Disconnect decode if (recv_rtp_src_pad != null) { @@ -342,57 +375,63 @@ public class Dino.Plugins.Rtp.Stream : Xmpp.Xep.JingleRtp.Stream { recv_rtp_src_pad.unlink(decode.get_static_pad("sink")); } - // Disconnect RTP receiving - recv_rtp.set_locked_state(true); - recv_rtp.set_state(Gst.State.NULL); - recv_rtp.get_static_pad("src").unlink(recv_rtp_sink_pad); - pipe.remove(recv_rtp); - recv_rtp = null; - // Disconnect output if (output != null) { + decode.get_static_pad("src").add_probe(Gst.PadProbeType.BLOCK, drop_probe); decode.unlink(output); } - decode.set_locked_state(true); - decode.set_state(Gst.State.NULL); - pipe.remove(decode); - decode = null; - decode_depay = null; - output = null; // Disconnect output device if (this._output_device != null) { - this._output_device.unlink(); + this._output_device.unlink(output); this._output_device = null; } + output = null; - // Disconnect RTCP receiving - recv_rtcp.get_static_pad("src").unlink(recv_rtcp_sink_pad); - recv_rtcp.set_locked_state(true); - recv_rtcp.set_state(Gst.State.NULL); - pipe.remove(recv_rtcp); - recv_rtcp = null; + // Destroy decode + if (decode != null) { + decode.set_locked_state(true); + decode.set_state(Gst.State.NULL); + pipe.remove(decode); + decode = null; + decode_depay = null; + } - // Disconnect RTCP sending - send_rtcp_src_pad.unlink(send_rtcp.get_static_pad("sink")); - send_rtcp.set_locked_state(true); - send_rtcp.set_state(Gst.State.NULL); - pipe.remove(send_rtcp); - send_rtcp = null; + // Disconnect and remove RTP input + if (recv_rtp != null) { + recv_rtp.get_static_pad("src").unlink(recv_rtp_sink_pad); + recv_rtp.set_locked_state(true); + recv_rtp.set_state(Gst.State.NULL); + pipe.remove(recv_rtp); + recv_rtp = null; + } + + // Disconnect and remove RTCP input + if (recv_rtcp != null) { + recv_rtcp.get_static_pad("src").unlink(recv_rtcp_sink_pad); + recv_rtcp.set_locked_state(true); + recv_rtcp.set_state(Gst.State.NULL); + pipe.remove(recv_rtcp); + recv_rtcp = null; + } // Release rtp pads - rtpbin.release_request_pad(send_rtp_sink_pad); - send_rtp_sink_pad = null; - rtpbin.release_request_pad(recv_rtp_sink_pad); - recv_rtp_sink_pad = null; - rtpbin.release_request_pad(recv_rtcp_sink_pad); - recv_rtcp_sink_pad = null; - rtpbin.release_request_pad(send_rtcp_src_pad); - send_rtcp_src_pad = null; - send_rtp_src_pad = null; - recv_rtp_src_pad = null; - - session = null; + if (send_rtp_sink_pad != null) { + rtpbin.release_request_pad(send_rtp_sink_pad); + send_rtp_sink_pad = null; + } + if (recv_rtp_sink_pad != null) { + rtpbin.release_request_pad(recv_rtp_sink_pad); + recv_rtp_sink_pad = null; + } + if (send_rtcp_src_pad != null) { + rtpbin.release_request_pad(send_rtcp_src_pad); + send_rtcp_src_pad = null; + } + if (recv_rtcp_sink_pad != null) { + rtpbin.release_request_pad(recv_rtcp_sink_pad); + recv_rtcp_sink_pad = null; + } } private void prepare_remote_crypto() { @@ -502,10 +541,10 @@ public class Dino.Plugins.Rtp.Stream : Xmpp.Xep.JingleRtp.Stream { debug("RTCP is ready, resending rtcp: %s", rtp_sent.to_string()); } - public void on_ssrc_pad_added(string ssrc, Gst.Pad pad) { - debug("New ssrc %s with pad %s", ssrc, pad.name); - if (participant_ssrc != null && participant_ssrc != ssrc) { - warning("Got second ssrc on stream (old: %s, new: %s), ignoring", participant_ssrc, ssrc); + public void on_ssrc_pad_added(uint32 ssrc, Gst.Pad pad) { + debug("New ssrc %u with pad %s", ssrc, pad.name); + if (participant_ssrc != 0 && participant_ssrc != ssrc) { + warning("Got second ssrc on stream (old: %u, new: %u), ignoring", participant_ssrc, ssrc); return; } participant_ssrc = ssrc; @@ -534,7 +573,9 @@ public class Dino.Plugins.Rtp.Stream : Xmpp.Xep.JingleRtp.Stream { private void set_input_and_pause(Gst.Element? input, bool paused) { if (created && this.input != null) { - this.input.unlink(encode); + this.input_pad.unlink(send_rtp_sink_pad); + this.input.release_request_pad(this.input_pad); + this.input_pad = null; this.input = null; } @@ -543,28 +584,41 @@ public class Dino.Plugins.Rtp.Stream : Xmpp.Xep.JingleRtp.Stream { if (created && sending && !paused && input != null) { plugin.pause(); - input.link(encode); + input_pad = input.get_request_pad(@"src_$rtpid"); + input_pad.link(send_rtp_sink_pad); plugin.unpause(); } } public void pause() { if (paused) return; + var input = this.input; set_input_and_pause(null, true); - if (input_device != null) input_device.unlink(); + if (input != null && input_device != null) input_device.unlink(input); } public void unpause() { if (!paused) return; - set_input_and_pause(input_device != null ? input_device.link_source() : null, false); + set_input_and_pause(input_device != null ? input_device.link_source(payload_type, our_ssrc, next_seqnum_offset) : null, false); + } + + public uint get_participant_ssrc(Xmpp.Jid participant) { + if (participant.equals(content.session.peer_full_jid)) { + return participant_ssrc; + } + return 0; } ulong block_probe_handler_id = 0; - public virtual void add_output(Gst.Element element) { + public virtual void add_output(Gst.Element element, Xmpp.Jid? participant = null) { if (output != null) { critical("add_output() invoked more than once"); return; } + if (participant != null) { + critical("add_output() invoked with participant when not supported"); + return; + } this.output = element; if (created) { plugin.pause(); @@ -586,7 +640,7 @@ public class Dino.Plugins.Rtp.Stream : Xmpp.Xep.JingleRtp.Stream { decode.unlink(element); } if (this._output_device != null) { - this._output_device.unlink(); + this._output_device.unlink(element); this._output_device = null; } this.output = null; @@ -657,7 +711,7 @@ public class Dino.Plugins.Rtp.VideoStream : Stream { disconnect(video_orientation_changed_handler); } - public override void add_output(Gst.Element element) { + public override void add_output(Gst.Element element, Xmpp.Jid? participant) { if (element == output_tee || element == rotate) { base.add_output(element); return; @@ -678,4 +732,4 @@ public class Dino.Plugins.Rtp.VideoStream : Stream { output_tee.unlink(element); } } -} \ No newline at end of file +}