From a59f728bdd7b81964a460b96b6c56da0d9dfda01 Mon Sep 17 00:00:00 2001 From: fiaxh Date: Sat, 12 Aug 2017 23:14:50 +0200 Subject: [PATCH] Stream Management --- libdino/src/entity/jid.vala | 8 +- libdino/src/service/connection_manager.vala | 2 +- libdino/src/service/module_manager.vala | 14 ++- libdino/src/service/stream_interactor.vala | 3 +- .../http-files/src/upload_stream_module.vala | 3 +- xmpp-vala/CMakeLists.txt | 1 + xmpp-vala/src/core/xmpp_stream.vala | 51 +++++--- xmpp-vala/src/module/bind.vala | 7 +- xmpp-vala/src/module/sasl.vala | 4 +- .../module/xep/0082_date_time_profiles.vala | 21 ++-- .../module/xep/0198_stream_management.vala | 112 ++++++++++++++++++ 11 files changed, 185 insertions(+), 41 deletions(-) create mode 100644 xmpp-vala/src/module/xep/0198_stream_management.vala diff --git a/libdino/src/entity/jid.vala b/libdino/src/entity/jid.vala index 2283b466..42218fb2 100644 --- a/libdino/src/entity/jid.vala +++ b/libdino/src/entity/jid.vala @@ -63,12 +63,12 @@ public class Dino.Entities.Jid : Object { return jid; } - public bool equals_bare(Jid jid) { - return equals_bare_func(this, jid); + public bool equals_bare(Jid? jid) { + return jid != null && equals_bare_func(this, jid); } - public bool equals(Jid jid) { - return equals_func(this, jid); + public bool equals(Jid? jid) { + return jid != null && equals_func(this, jid); } public static new bool equals_bare_func(Jid jid1, Jid jid2) { diff --git a/libdino/src/service/connection_manager.vala b/libdino/src/service/connection_manager.vala index 509964b3..d932d1cc 100644 --- a/libdino/src/service/connection_manager.vala +++ b/libdino/src/service/connection_manager.vala @@ -126,7 +126,7 @@ public class ConnectionManager { Connection connection = new Connection(stream, new DateTime.now_local()); connections[account] = connection; change_connection_state(account, ConnectionState.CONNECTING); - stream.stream_negotiated.connect((stream) => { + stream.attached_modules.connect((stream) => { change_connection_state(account, ConnectionState.CONNECTED); }); stream.get_module(PlainSasl.Module.IDENTITY).received_auth_failure.connect((stream, node) => { diff --git a/libdino/src/service/module_manager.vala b/libdino/src/service/module_manager.vala index b10765f1..437ecaf0 100644 --- a/libdino/src/service/module_manager.vala +++ b/libdino/src/service/module_manager.vala @@ -33,20 +33,30 @@ public class ModuleManager { public ArrayList get_modules(Account account, string? resource = null) { ArrayList modules = new ArrayList(); - modules.add(new Bind.Module(resource == null ? account.resourcepart : resource)); - modules.add(new PlainSasl.Module(account.bare_jid.to_string(), account.password)); lock (module_map) { if (!module_map.has_key(account)) initialize(account); foreach (Core.XmppStreamModule module in module_map[account]) modules.add(module); } + + foreach (Core.XmppStreamModule module in module_map[account]) { + if (module.get_id() == Bind.Module.IDENTITY.id) { + (module as Bind.Module).requested_resource == null ? account.resourcepart : resource; + } else if (module.get_id() == PlainSasl.Module.IDENTITY.id) { + (module as PlainSasl.Module).password = account.password; + } + } return modules; } public void initialize(Account account) { lock(module_map) { module_map[account] = new ArrayList(); + module_map[account].add(new Iq.Module()); module_map[account].add(new Tls.Module()); module_map[account].add(new Xep.SrvRecordsTls.Module()); + module_map[account].add(new PlainSasl.Module(account.bare_jid.to_string(), account.password)); + module_map[account].add(new Xep.StreamManagement.Module()); + module_map[account].add(new Bind.Module(account.resourcepart)); module_map[account].add(new Session.Module()); module_map[account].add(new Roster.Module()); module_map[account].add(new Xep.ServiceDiscovery.Module.with_identity("client", "pc")); diff --git a/libdino/src/service/stream_interactor.vala b/libdino/src/service/stream_interactor.vala index 028d031b..f97eb482 100644 --- a/libdino/src/service/stream_interactor.vala +++ b/libdino/src/service/stream_interactor.vala @@ -10,6 +10,7 @@ public class StreamInteractor { public signal void account_added(Account account); public signal void account_removed(Account account); public signal void stream_negotiated(Account account, Core.XmppStream stream); + public signal void attached_modules(Account account, Core.XmppStream stream); public ModuleManager module_manager; public ConnectionManager connection_manager; @@ -84,4 +85,4 @@ public interface StreamInteractionModule : Object { public abstract string id { get; } } -} \ No newline at end of file +} diff --git a/plugins/http-files/src/upload_stream_module.vala b/plugins/http-files/src/upload_stream_module.vala index cb105cee..573f298a 100644 --- a/plugins/http-files/src/upload_stream_module.vala +++ b/plugins/http-files/src/upload_stream_module.vala @@ -77,8 +77,7 @@ public class UploadStreamModule : XmppStreamModule { public override void attach(XmppStream stream) { Iq.Module.require(stream); ServiceDiscovery.Module.require(stream); - - query_availability(stream); + stream.stream_negotiated.connect(query_availability); } public override void detach(XmppStream stream) { diff --git a/xmpp-vala/CMakeLists.txt b/xmpp-vala/CMakeLists.txt index 2e2287a6..4c4f34d2 100644 --- a/xmpp-vala/CMakeLists.txt +++ b/xmpp-vala/CMakeLists.txt @@ -55,6 +55,7 @@ SOURCES "src/module/xep/0084_user_avatars.vala" "src/module/xep/0085_chat_state_notifications.vala" "src/module/xep/0115_entitiy_capabilities.vala" + "src/module/xep/0198_stream_management.vala" "src/module/xep/0199_ping.vala" "src/module/xep/0184_message_delivery_receipts.vala" "src/module/xep/0203_delayed_delivery.vala" diff --git a/xmpp-vala/src/core/xmpp_stream.vala b/xmpp-vala/src/core/xmpp_stream.vala index a8201a22..a461b2cb 100644 --- a/xmpp-vala/src/core/xmpp_stream.vala +++ b/xmpp-vala/src/core/xmpp_stream.vala @@ -21,11 +21,12 @@ public class XmppStream { private StanzaReader? reader; private StanzaWriter? writer; - private Gee.List flags = new ArrayList(); - private Gee.List modules = new ArrayList(); + public Gee.List flags { get; private set; default=new ArrayList(); } + public Gee.List modules { get; private set; default=new ArrayList(); } private Gee.List connection_providers = new ArrayList(); + public bool negotiation_complete { get; set; default=false; } private bool setup_needed = false; - private bool negotiation_complete = false; + private bool non_negotiation_modules_attached = false; public signal void received_node(XmppStream stream, StanzaNode node); public signal void received_root_node(XmppStream stream, StanzaNode node); @@ -35,6 +36,7 @@ public class XmppStream { public signal void received_iq_stanza(XmppStream stream, StanzaNode node); public signal void received_nonza(XmppStream stream, StanzaNode node); public signal void stream_negotiated(XmppStream stream); + public signal void attached_modules(XmppStream stream); public XmppStream() { register_connection_provider(new StartTlsConnectionProvider()); @@ -42,6 +44,7 @@ public class XmppStream { public void connect(string? remote_name = null) throws IOStreamError { if (remote_name != null) this.remote_name = (!)remote_name; + attach_negotation_modules(); try { int min_priority = -1; ConnectionProvider? best_provider = null; @@ -138,9 +141,7 @@ public class XmppStream { public XmppStream add_module(XmppStreamModule module) { modules.add(module); - if (negotiation_complete || module as XmppStreamNegotiationModule != null) { - module.attach(this); - } + if (negotiation_complete) module.attach(this); return this; } @@ -160,6 +161,16 @@ public class XmppStream { connection_providers.add(connection_provider); } + public bool is_negotiation_active() { + foreach (XmppStreamModule module in modules) { + if (module is XmppStreamNegotiationModule) { + XmppStreamNegotiationModule negotiation_module = (XmppStreamNegotiationModule) module; + if (negotiation_module.negotiation_active(this)) return true; + } + } + return false; + } + private void setup() throws IOStreamError { StanzaNode outs = new StanzaNode.build("stream", "http://etherx.jabber.org/streams") .put_attribute("to", remote_name) @@ -203,10 +214,13 @@ public class XmppStream { received_nonza(this, node); } - if (!negotiation_complete && negotiation_modules_done()) { - negotiation_complete = true; + if (!non_negotiation_modules_attached && negotiation_modules_done()) { attach_non_negotation_modules(); - stream_negotiated(this); + non_negotiation_modules_attached = true; + if (!negotiation_complete) { + stream_negotiated(this); + negotiation_complete = true; + } } } } @@ -214,15 +228,13 @@ public class XmppStream { private bool negotiation_modules_done() throws IOStreamError { if (!setup_needed) { bool mandatory_outstanding = false; - bool negotiation_active = false; foreach (XmppStreamModule module in modules) { - XmppStreamNegotiationModule? negotiation_module = module as XmppStreamNegotiationModule; - if (negotiation_module != null) { - if (((!)negotiation_module).negotiation_active(this)) negotiation_active = true; - if (((!)negotiation_module).mandatory_outstanding(this)) mandatory_outstanding = true; + if (module is XmppStreamNegotiationModule) { + XmppStreamNegotiationModule negotiation_module = (XmppStreamNegotiationModule) module; + if (negotiation_module.mandatory_outstanding(this)) mandatory_outstanding = true; } } - if (!negotiation_active) { + if (!is_negotiation_active()) { if (mandatory_outstanding) { throw new IOStreamError.CONNECT("mandatory-to-negotiate feature not negotiated"); } else { @@ -239,6 +251,15 @@ public class XmppStream { module.attach(this); } } + attached_modules(this); + } + + private void attach_negotation_modules() { + foreach (XmppStreamModule module in modules) { + if (module as XmppStreamNegotiationModule != null) { + module.attach(this); + } + } } private StanzaNode read_root() throws IOStreamError { diff --git a/xmpp-vala/src/module/bind.vala b/xmpp-vala/src/module/bind.vala index 7b08e0f7..a13f9640 100644 --- a/xmpp-vala/src/module/bind.vala +++ b/xmpp-vala/src/module/bind.vala @@ -7,7 +7,7 @@ namespace Xmpp.Bind { public class Module : XmppStreamNegotiationModule { public static ModuleIdentity IDENTITY = new ModuleIdentity(NS_URI, "bind_module"); - private string requested_resource; + public string requested_resource { get; set; } public signal void bound_to_resource(XmppStream stream, string my_jid); @@ -28,15 +28,14 @@ namespace Xmpp.Bind { public void received_features_node(XmppStream stream) { if (stream.is_setup_needed()) return; + if (stream.is_negotiation_active()) return; var bind = stream.features.get_subnode("bind", NS_URI); if (bind != null) { var flag = new Flag(); StanzaNode bind_node = new StanzaNode.build("bind", NS_URI).add_self_xmlns(); bind_node.put_node(new StanzaNode.build("resource", NS_URI).put_node(new StanzaNode.text(requested_resource))); - stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.set(bind_node), (stream, iq) => { - stream.get_module(Module.IDENTITY).iq_response_stanza(stream, iq); - }); + stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.set(bind_node), iq_response_stanza); stream.add_flag(flag); } } diff --git a/xmpp-vala/src/module/sasl.vala b/xmpp-vala/src/module/sasl.vala index ee6a87f7..dee9d3e6 100644 --- a/xmpp-vala/src/module/sasl.vala +++ b/xmpp-vala/src/module/sasl.vala @@ -7,8 +7,8 @@ namespace Xmpp.PlainSasl { public static ModuleIdentity IDENTITY = new ModuleIdentity(NS_URI, "plain_module"); private const string MECHANISM = "PLAIN"; - private string name; - private string password; + public string name { get; set; } + public string password { get; set; } public bool use_full_name = false; public signal void received_auth_failure(XmppStream stream, StanzaNode node); diff --git a/xmpp-vala/src/module/xep/0082_date_time_profiles.vala b/xmpp-vala/src/module/xep/0082_date_time_profiles.vala index b2ce1077..57c7ec4d 100644 --- a/xmpp-vala/src/module/xep/0082_date_time_profiles.vala +++ b/xmpp-vala/src/module/xep/0082_date_time_profiles.vala @@ -1,13 +1,13 @@ namespace Xmpp.Xep.DateTimeProfiles { - public class Module { - public Regex DATETIME_REGEX; +public class Module { + public Regex DATETIME_REGEX; - public Module() { - DATETIME_REGEX = new Regex("""^(\d{4})-(\d{2})-(\d{2})T(\d{2}):(\d{2}):(\d{2})(\.(\d{3}))?(Z|((\+|\-)(\d{2}):(\d{2})))$"""); - } + public Module() { + DATETIME_REGEX = new Regex("""^(\d{4})-(\d{2})-(\d{2})T(\d{2}):(\d{2}):(\d{2})(\.(\d{3}))?(Z|((\+|\-)(\d{2})(:(\d{2}))?))$"""); + } - public DateTime? parse_string(string time_string) { + public DateTime? parse_string(string time_string) { MatchInfo match_info; if (DATETIME_REGEX.match(time_string, RegexMatchFlags.ANCHORED, out match_info)) { int year = int.parse(match_info.fetch(1)); @@ -33,9 +33,10 @@ namespace Xmpp.Xep.DateTimeProfiles { return null; } - public string to_datetime(DateTime time) { - return time.format("%Y-%m-%dT%H:%M:%SZ"); - } +public string to_datetime(DateTime time) { + return time.to_utc().format("%Y-%m-%dT%H:%M:%SZ"); } -} \ No newline at end of file +} + +} diff --git a/xmpp-vala/src/module/xep/0198_stream_management.vala b/xmpp-vala/src/module/xep/0198_stream_management.vala new file mode 100644 index 00000000..03be907c --- /dev/null +++ b/xmpp-vala/src/module/xep/0198_stream_management.vala @@ -0,0 +1,112 @@ +using Xmpp.Core; + +namespace Xmpp.Xep.StreamManagement { + +public const string NS_URI = "urn:xmpp:sm:3"; + +public class Module : XmppStreamNegotiationModule { + public static ModuleIdentity IDENTITY = new ModuleIdentity(NS_URI, "0313_message_archive_management"); + + public int h_inbound { get; private set; default=0; } + public string? session_id { get; set; default=null; } + public Gee.List flags = null; + + public override void attach(XmppStream stream) { + stream.get_module(Bind.Module.IDENTITY).bound_to_resource.connect(check_enable); + stream.received_features_node.connect(check_resume); + + stream.received_nonza.connect(on_received_nonza); + stream.received_message_stanza.connect(on_stanza_received); + stream.received_presence_stanza.connect(on_stanza_received); + stream.received_iq_stanza.connect(on_stanza_received); + } + + public override void detach(XmppStream stream) { } + + public static void require(XmppStream stream) { + if (stream.get_module(IDENTITY) == null) stream.add_module(new PrivateXmlStorage.Module()); + } + + public override bool mandatory_outstanding(XmppStream stream) { return false; } + + public override bool negotiation_active(XmppStream stream) { + return stream.has_flag(Flag.IDENTITY) && !stream.get_flag(Flag.IDENTITY).finished; + } + + public override string get_ns() { return NS_URI; } + public override string get_id() { return IDENTITY.id; } + + private void on_stanza_received(XmppStream stream, StanzaNode node) { + lock (h_inbound) h_inbound++; + } + + private void check_resume(XmppStream stream) { + if (stream_has_sm_feature(stream) && session_id != null) { + Tls.Flag? tls_flag = stream.get_flag(Tls.Flag.IDENTITY); + if (tls_flag != null && tls_flag.finished) { + StanzaNode node = new StanzaNode.build("resume", NS_URI).add_self_xmlns() + .put_attribute("h", h_inbound.to_string()) + .put_attribute("previd", session_id); + stream.write(node); + stream.add_flag(new Flag()); + } + } + } + + private void check_enable(XmppStream stream) { + if (stream_has_sm_feature(stream) && session_id == null) { + StanzaNode node = new StanzaNode.build("enable", NS_URI).add_self_xmlns().put_attribute("resume", "true"); + stream.write(node); + stream.add_flag(new Flag()); + } + } + + private void on_received_nonza(XmppStream stream, StanzaNode node) { + if (node.ns_uri == NS_URI) { + if (node.name == "r") { + send_ack(stream); + } else if (node.name == "a") { + handle_ack(stream, node); + } else if (node.name in new string[]{"enabled", "resumed", "failed"}) { + stream.get_flag(Flag.IDENTITY).finished = true; + + if (node.name == "enabled") { + lock(h_inbound) h_inbound = 0; + session_id = node.get_attribute("id", NS_URI); + flags = stream.flags; + } else if (node.name == "resumed") { + foreach (XmppStreamFlag flag in flags) { + stream.add_flag(flag); + } + stream.negotiation_complete = true; + } else if (node.name == "failed") { + stream.received_features_node(stream); + session_id = null; + } + } + } + } + + private void send_ack(XmppStream stream) { + StanzaNode node = new StanzaNode.build("a", NS_URI).add_self_xmlns().put_attribute("h", h_inbound.to_string()); + stream.write(node); + } + + private void handle_ack(XmppStream stream, StanzaNode node) { + + } + + private bool stream_has_sm_feature(XmppStream stream) { + return stream.features.get_subnode("sm", NS_URI) != null; + } +} + +public class Flag : XmppStreamFlag { + public static FlagIdentity IDENTITY = new FlagIdentity(NS_URI, "stream_management"); + public bool finished = false; + + public override string get_ns() { return NS_URI; } + public override string get_id() { return IDENTITY.id; } +} + +}