diff --git a/libdino/src/service/chat_interaction.vala b/libdino/src/service/chat_interaction.vala index 635a4a73..89a692ce 100644 --- a/libdino/src/service/chat_interaction.vala +++ b/libdino/src/service/chat_interaction.vala @@ -126,7 +126,7 @@ public class ChatInteraction : StreamInteractionModule, Object { private class ReceivedMessageListener : MessageListener { - public string[] after_actions_const = new string[]{ }; + public string[] after_actions_const = new string[]{ "DEDUPLICATE" }; public override string action_group { get { return "OTHER_NODES"; } } public override string[] after_actions { get { return after_actions_const; } } diff --git a/libdino/src/service/counterpart_interaction_manager.vala b/libdino/src/service/counterpart_interaction_manager.vala index 26be0096..b6c1faab 100644 --- a/libdino/src/service/counterpart_interaction_manager.vala +++ b/libdino/src/service/counterpart_interaction_manager.vala @@ -53,30 +53,39 @@ public class CounterpartInteractionManager : StreamInteractionModule, Object { } private void on_chat_marker_received(Account account, Jid jid, string marker, string stanza_id) { - foreach (Conversation conversation in stream_interactor.get_module(ConversationManager.IDENTITY).get_conversations(jid, account)) { + bool own_marker = account.bare_jid.to_string() == jid.bare_jid.to_string(); + if (own_marker) { + Conversation? conversation = stream_interactor.get_module(MessageStorage.IDENTITY).get_conversation_for_stanza_id(account, stanza_id); + if (conversation == null) return; Entities.Message? message = stream_interactor.get_module(MessageStorage.IDENTITY).get_message_by_id(stanza_id, conversation); - if (message != null) { - switch (marker) { - case Xep.ChatMarkers.MARKER_RECEIVED: - received_message_received(account, jid, message); - message.marked = Entities.Message.Marked.RECEIVED; - break; - case Xep.ChatMarkers.MARKER_DISPLAYED: - received_message_displayed(account, jid, message); - Gee.List messages = stream_interactor.get_module(MessageStorage.IDENTITY).get_messages(conversation); - foreach (Entities.Message m in messages) { - if (m.equals(message)) break; - if (m.marked == Entities.Message.Marked.RECEIVED) m.marked = Entities.Message.Marked.READ; - } - message.marked = Entities.Message.Marked.READ; - break; - } - } else { - if (marker_wo_message.has_key(stanza_id) && + if (message == null) return; + conversation.read_up_to = message; + } else { + foreach (Conversation conversation in stream_interactor.get_module(ConversationManager.IDENTITY).get_conversations(jid, account)) { + Entities.Message? message = stream_interactor.get_module(MessageStorage.IDENTITY).get_message_by_id(stanza_id, conversation); + if (message != null) { + switch (marker) { + case Xep.ChatMarkers.MARKER_RECEIVED: + received_message_received(account, jid, message); + message.marked = Entities.Message.Marked.RECEIVED; + break; + case Xep.ChatMarkers.MARKER_DISPLAYED: + received_message_displayed(account, jid, message); + Gee.List messages = stream_interactor.get_module(MessageStorage.IDENTITY).get_messages(conversation); + foreach (Entities.Message m in messages) { + if (m.equals(message)) break; + if (m.marked == Entities.Message.Marked.RECEIVED) m.marked = Entities.Message.Marked.READ; + } + message.marked = Entities.Message.Marked.READ; + break; + } + } else { + if (marker_wo_message.has_key(stanza_id) && marker_wo_message[stanza_id] == Xep.ChatMarkers.MARKER_DISPLAYED && marker == Xep.ChatMarkers.MARKER_RECEIVED) { - return; + return; + } + marker_wo_message[stanza_id] = marker; } - marker_wo_message[stanza_id] = marker; } } } diff --git a/libdino/src/service/message_processor.vala b/libdino/src/service/message_processor.vala index 01050631..9a20cf3f 100644 --- a/libdino/src/service/message_processor.vala +++ b/libdino/src/service/message_processor.vala @@ -13,6 +13,7 @@ public class MessageProcessor : StreamInteractionModule, Object { public signal void build_message_stanza(Entities.Message message, Xmpp.MessageStanza message_stanza, Conversation conversation); public signal void pre_message_send(Entities.Message message, Xmpp.MessageStanza message_stanza, Conversation conversation); public signal void message_sent(Entities.Message message, Conversation conversation); + public signal void history_synced(Account account); public MessageListenerHolder received_pipeline = new MessageListenerHolder(); @@ -66,7 +67,9 @@ public class MessageProcessor : StreamInteractionModule, Object { }); stream_interactor.module_manager.get_module(account, Xmpp.Xep.MessageArchiveManagement.Module.IDENTITY).feature_available.connect( (stream) => { DateTime start_time = account.mam_earliest_synced.to_unix() > 60 ? account.mam_earliest_synced.add_minutes(-1) : account.mam_earliest_synced; - stream.get_module(Xep.MessageArchiveManagement.Module.IDENTITY).query_archive(stream, null, start_time, null); + stream.get_module(Xep.MessageArchiveManagement.Module.IDENTITY).query_archive(stream, null, start_time, null, () => { + history_synced(account); + }); }); } diff --git a/libdino/src/service/message_storage.vala b/libdino/src/service/message_storage.vala index 27d9504d..35e05074 100644 --- a/libdino/src/service/message_storage.vala +++ b/libdino/src/service/message_storage.vala @@ -78,6 +78,16 @@ public class MessageStorage : StreamInteractionModule, Object { return null; } + public Conversation? get_conversation_for_stanza_id(Account account, string stanza_id) { + foreach (Conversation conversation in messages.keys) { + if (!conversation.account.equals(account)) continue; + foreach (Message message in messages[conversation]) { + if (message.stanza_id == stanza_id) return conversation; + } + } + return null; + } + private void init_conversation(Conversation conversation) { if (!messages.has_key(conversation)) { messages[conversation] = new Gee.TreeSet((a, b) => { diff --git a/libdino/src/service/notification_events.vala b/libdino/src/service/notification_events.vala index 5db581ba..13fef3e3 100644 --- a/libdino/src/service/notification_events.vala +++ b/libdino/src/service/notification_events.vala @@ -14,6 +14,9 @@ public class NotificationEvents : StreamInteractionModule, Object { private StreamInteractor stream_interactor; + private HashMap> mam_potential_new = new HashMap>(Account.hash_func, Account.equals_func); + private Gee.List synced_accounts = new ArrayList(); + public static void start(StreamInteractor stream_interactor) { NotificationEvents m = new NotificationEvents(stream_interactor); stream_interactor.add_module(m); @@ -24,9 +27,29 @@ public class NotificationEvents : StreamInteractionModule, Object { stream_interactor.get_module(MessageProcessor.IDENTITY).message_received.connect(on_message_received); stream_interactor.get_module(PresenceManager.IDENTITY).received_subscription_request.connect(on_received_subscription_request); + stream_interactor.get_module(MessageProcessor.IDENTITY).history_synced.connect((account) => { + synced_accounts.add(account); + if (!mam_potential_new.has_key(account)) return; + foreach (Conversation c in mam_potential_new[account].keys) { + Entities.Message m = mam_potential_new[account][c]; + Entities.Message last_message = stream_interactor.get_module(MessageStorage.IDENTITY).get_last_message(c); + if (m.equals(last_message) && !c.read_up_to.equals(m)) { + on_message_received(m, c); + } + } + mam_potential_new[account].clear(); + }); } private void on_message_received(Entities.Message message, Conversation conversation) { + if (!synced_accounts.contains(conversation.account)) { + if (!mam_potential_new.has_key(conversation.account)) { + mam_potential_new[conversation.account] = new HashMap(Conversation.hash_func, Conversation.equals_func); + } + mam_potential_new[conversation.account][conversation] = message; + return; + } + if (!should_notify_message(message, conversation)) return; if (!should_notify_message(message, conversation)) return; if (stream_interactor.get_module(ChatInteraction.IDENTITY).is_active_focus()) return; notify_message(message, conversation); diff --git a/xmpp-vala/src/module/xep/0313_message_archive_management.vala b/xmpp-vala/src/module/xep/0313_message_archive_management.vala index 343a5fbd..00f8f99b 100644 --- a/xmpp-vala/src/module/xep/0313_message_archive_management.vala +++ b/xmpp-vala/src/module/xep/0313_message_archive_management.vala @@ -14,7 +14,8 @@ public class Module : XmppStreamModule { private ReceivedPipelineListener received_pipeline_listener = new ReceivedPipelineListener(); - public void query_archive(XmppStream stream, string? jid, DateTime? start, DateTime? end) { + public delegate void OnFinished(XmppStream stream); + public void query_archive(XmppStream stream, string? jid, DateTime? start, DateTime? end, owned OnFinished? on_finished = null) { if (stream.get_flag(Flag.IDENTITY) == null) return; DataForms.DataForm data_form = new DataForms.DataForm(); @@ -38,7 +39,7 @@ public class Module : XmppStreamModule { } StanzaNode query_node = new StanzaNode.build("query", NS_VER(stream)).add_self_xmlns().put_node(data_form.get_submit_node()); Iq.Stanza iq = new Iq.Stanza.set(query_node); - stream.get_module(Iq.Module.IDENTITY).send_iq(stream, iq, page_through_results); + stream.get_module(Iq.Module.IDENTITY).send_iq(stream, iq, (stream, iq) => { page_through_results(stream, iq, on_finished); }); } public override void attach(XmppStream stream) { @@ -53,10 +54,11 @@ public class Module : XmppStreamModule { public override string get_ns() { return NS_URI; } public override string get_id() { return IDENTITY.id; } - private static void page_through_results(XmppStream stream, Iq.Stanza iq) { + private static void page_through_results(XmppStream stream, Iq.Stanza iq, owned OnFinished? on_finished = null) { string? last = iq.stanza.get_deep_string_content(NS_VER(stream) + ":fin", "http://jabber.org/protocol/rsm" + ":set", "last"); if (last == null) { stream.get_flag(Flag.IDENTITY).cought_up = true; + if (on_finished != null) on_finished(stream); return; } @@ -67,7 +69,7 @@ public class Module : XmppStreamModule { ) ) ); - stream.get_module(Iq.Module.IDENTITY).send_iq(stream, paging_iq, page_through_results); + stream.get_module(Iq.Module.IDENTITY).send_iq(stream, paging_iq, (stream, iq) => { page_through_results(stream, iq, on_finished); }); } private void query_availability(XmppStream stream) {