diff --git a/libdino/src/service/message_processor.vala b/libdino/src/service/message_processor.vala index f1526b16..04518e72 100644 --- a/libdino/src/service/message_processor.vala +++ b/libdino/src/service/message_processor.vala @@ -22,7 +22,6 @@ public class MessageProcessor : StreamInteractionModule, Object { private StreamInteractor stream_interactor; private Database db; - private Object lock_send_unsent; private HashMap current_catchup_id = new HashMap(Account.hash_func, Account.equals_func); private HashMap> mam_times = new HashMap>(); public HashMap hitted_range = new HashMap(); @@ -565,39 +564,48 @@ public class MessageProcessor : StreamInteractionModule, Object { } public void send_xmpp_message(Entities.Message message, Conversation conversation, bool delayed = false) { - lock (lock_send_unsent) { - XmppStream stream = stream_interactor.get_stream(conversation.account); - message.marked = Entities.Message.Marked.NONE; - if (stream != null) { - Xmpp.MessageStanza new_message = new Xmpp.MessageStanza(message.stanza_id); - new_message.to = message.counterpart; - new_message.body = message.body; - if (conversation.type_ == Conversation.Type.GROUPCHAT) { - new_message.type_ = Xmpp.MessageStanza.TYPE_GROUPCHAT; - } else { - new_message.type_ = Xmpp.MessageStanza.TYPE_CHAT; - } - build_message_stanza(message, new_message, conversation); - pre_message_send(message, new_message, conversation); - if (message.marked == Entities.Message.Marked.UNSENT || message.marked == Entities.Message.Marked.WONTSEND) return; - if (delayed) { - Xmpp.Xep.DelayedDelivery.Module.set_message_delay(new_message, message.time); - } + XmppStream stream = stream_interactor.get_stream(conversation.account); + message.marked = Entities.Message.Marked.NONE; - // Set an origin ID if a MUC doen't guarantee to keep IDs - if (conversation.type_ == Conversation.Type.GROUPCHAT) { - Xep.Muc.Flag? flag = stream.get_flag(Xep.Muc.Flag.IDENTITY); - if (flag == null) return; - if(!flag.has_room_feature(conversation.counterpart, Xep.Muc.Feature.STABLE_ID)) { - Xep.UniqueStableStanzaIDs.set_origin_id(new_message, message.stanza_id); - } - } + if (stream == null) { + message.marked = Entities.Message.Marked.UNSENT; + return; + } - stream.get_module(Xmpp.MessageModule.IDENTITY).send_message(stream, new_message); - } else { + MessageStanza new_message = new MessageStanza(message.stanza_id); + new_message.to = message.counterpart; + new_message.body = message.body; + if (conversation.type_ == Conversation.Type.GROUPCHAT) { + new_message.type_ = MessageStanza.TYPE_GROUPCHAT; + } else { + new_message.type_ = MessageStanza.TYPE_CHAT; + } + build_message_stanza(message, new_message, conversation); + pre_message_send(message, new_message, conversation); + if (message.marked == Entities.Message.Marked.UNSENT || message.marked == Entities.Message.Marked.WONTSEND) return; + if (delayed) { + DelayedDelivery.Module.set_message_delay(new_message, message.time); + } + + // Set an origin ID if a MUC doen't guarantee to keep IDs + if (conversation.type_ == Conversation.Type.GROUPCHAT) { + Xep.Muc.Flag? flag = stream.get_flag(Xep.Muc.Flag.IDENTITY); + if (flag == null) { message.marked = Entities.Message.Marked.UNSENT; + return; + } + if(!flag.has_room_feature(conversation.counterpart, Xep.Muc.Feature.STABLE_ID)) { + UniqueStableStanzaIDs.set_origin_id(new_message, message.stanza_id); } } + + stream.get_module(MessageModule.IDENTITY).send_message.begin(stream, new_message, (_, res) => { + try { + stream.get_module(MessageModule.IDENTITY).send_message.end(res); + } catch (IOStreamError e) { + message.marked = Entities.Message.Marked.UNSENT; + } + }); } } diff --git a/xmpp-vala/src/module/message/module.vala b/xmpp-vala/src/module/message/module.vala index 5ddbbe1a..5f7d40f0 100644 --- a/xmpp-vala/src/module/message/module.vala +++ b/xmpp-vala/src/module/message/module.vala @@ -14,10 +14,9 @@ namespace Xmpp { public signal void received_message(XmppStream stream, MessageStanza message); public signal void received_message_unprocessed(XmppStream stream, MessageStanza message); - public void send_message(XmppStream stream, MessageStanza message) { - send_pipeline.run.begin(stream, message, (obj, res) => { - stream.write(message.stanza); - }); + public async void send_message(XmppStream stream, MessageStanza message) throws IOStreamError { + yield send_pipeline.run(stream, message); + yield stream.write_async(message.stanza); } public async void received_message_stanza_async(XmppStream stream, StanzaNode node) { diff --git a/xmpp-vala/src/module/xep/0045_muc/module.vala b/xmpp-vala/src/module/xep/0045_muc/module.vala index 79fd2c31..59cb703e 100644 --- a/xmpp-vala/src/module/xep/0045_muc/module.vala +++ b/xmpp-vala/src/module/xep/0045_muc/module.vala @@ -131,7 +131,7 @@ public class Module : XmppStreamModule { message.to = jid; message.type_ = MessageStanza.TYPE_GROUPCHAT; message.stanza.put_node((new StanzaNode.build("subject")).put_node(new StanzaNode.text(subject))); - stream.get_module(MessageModule.IDENTITY).send_message(stream, message); + stream.get_module(MessageModule.IDENTITY).send_message.begin(stream, message); } public void change_nick(XmppStream stream, Jid jid, string new_nick) { @@ -151,7 +151,7 @@ public class Module : XmppStreamModule { StanzaNode invite_node = new StanzaNode.build("x", NS_URI_USER).add_self_xmlns() .put_node(new StanzaNode.build("invite", NS_URI_USER).put_attribute("to", jid.to_string())); message.stanza.put_node(invite_node); - stream.get_module(MessageModule.IDENTITY).send_message(stream, message); + stream.get_module(MessageModule.IDENTITY).send_message.begin(stream, message); } public void kick(XmppStream stream, Jid jid, string nick) { diff --git a/xmpp-vala/src/module/xep/0085_chat_state_notifications.vala b/xmpp-vala/src/module/xep/0085_chat_state_notifications.vala index 7c1c9172..bf7515cb 100644 --- a/xmpp-vala/src/module/xep/0085_chat_state_notifications.vala +++ b/xmpp-vala/src/module/xep/0085_chat_state_notifications.vala @@ -27,7 +27,7 @@ public class Module : XmppStreamModule { MessageProcessingHints.set_message_hint(message, MessageProcessingHints.HINT_NO_STORE); - stream.get_module(MessageModule.IDENTITY).send_message(stream, message); + stream.get_module(MessageModule.IDENTITY).send_message.begin(stream, message); } public override void attach(XmppStream stream) { diff --git a/xmpp-vala/src/module/xep/0184_message_delivery_receipts.vala b/xmpp-vala/src/module/xep/0184_message_delivery_receipts.vala index b51178c7..199cfee8 100644 --- a/xmpp-vala/src/module/xep/0184_message_delivery_receipts.vala +++ b/xmpp-vala/src/module/xep/0184_message_delivery_receipts.vala @@ -12,7 +12,7 @@ namespace Xmpp.Xep.MessageDeliveryReceipts { MessageStanza received_message = new MessageStanza(); received_message.to = from; received_message.stanza.put_node(new StanzaNode.build("received", NS_URI).add_self_xmlns().put_attribute("id", message_id)); - stream.get_module(MessageModule.IDENTITY).send_message(stream, received_message); + stream.get_module(MessageModule.IDENTITY).send_message.begin(stream, received_message); } public static bool requests_receipt(MessageStanza message) { diff --git a/xmpp-vala/src/module/xep/0333_chat_markers.vala b/xmpp-vala/src/module/xep/0333_chat_markers.vala index 257f6967..7f92fbe3 100644 --- a/xmpp-vala/src/module/xep/0333_chat_markers.vala +++ b/xmpp-vala/src/module/xep/0333_chat_markers.vala @@ -21,7 +21,7 @@ public class Module : XmppStreamModule { received_message.to = jid; received_message.type_ = type_; received_message.stanza.put_node(new StanzaNode.build(marker, NS_URI).add_self_xmlns().put_attribute("id", message_id)); - stream.get_module(MessageModule.IDENTITY).send_message(stream, received_message); + stream.get_module(MessageModule.IDENTITY).send_message.begin(stream, received_message); } public static bool requests_marking(MessageStanza message) {