From d092473fe401a5a668e57f054efbd1e84ac6ca59 Mon Sep 17 00:00:00 2001 From: Marvin W Date: Mon, 6 Feb 2023 21:13:33 +0100 Subject: [PATCH] Improve history sync under load --- libdino/src/service/history_sync.vala | 92 +++++++++++++++++++-------- libdino/src/service/muc_manager.vala | 36 +++++++++-- libdino/src/util/weak_map.vala | 13 ++-- xmpp-vala/src/core/xmpp_stream.vala | 2 +- 4 files changed, 102 insertions(+), 41 deletions(-) diff --git a/libdino/src/service/history_sync.vala b/libdino/src/service/history_sync.vala index d9ecd41b..e83b3cb4 100644 --- a/libdino/src/service/history_sync.vala +++ b/libdino/src/service/history_sync.vala @@ -11,6 +11,8 @@ public class Dino.HistorySync { private Database db; public HashMap> current_catchup_id = new HashMap>(Account.hash_func, Account.equals_func); + public WeakMap sync_streams = new WeakMap(Account.hash_func, Account.equals_func); + public HashMap> cancellables = new HashMap>(Account.hash_func, Account.equals_func); public HashMap> mam_times = new HashMap>(); public HashMap hitted_range = new HashMap(); @@ -27,9 +29,11 @@ public class Dino.HistorySync { stream_interactor.account_added.connect(on_account_added); - stream_interactor.connection_manager.stream_opened.connect((account, stream) => { - debug("MAM: [%s] Reset catchup_id", account.bare_jid.to_string()); - current_catchup_id.unset(account); + stream_interactor.stream_negotiated.connect((account, stream) => { + if (current_catchup_id.has_key(account)) { + debug("MAM: [%s] Reset catchup_id", account.bare_jid.to_string()); + current_catchup_id[account].clear(); + } }); } @@ -206,7 +210,7 @@ public class Dino.HistorySync { PageRequestResult page_result = yield get_mam_page(account, query_params, null, cancellable); debug("[%s | %s] Latest page result: %s", account.bare_jid.to_string(), mam_server.to_string(), page_result.page_result.to_string()); - if (page_result.page_result == PageResult.Error) { + if (page_result.page_result == PageResult.Error || page_result.page_result == PageResult.Cancelled) { return null; } @@ -239,7 +243,7 @@ public class Dino.HistorySync { } // Either we need to fetch more pages or this is the first db entry ever - debug("[%s | %s] Creating new db range for latest page", mam_server.to_string(), mam_server.to_string()); + debug("[%s | %s] Creating new db range for latest page", account.bare_jid.to_string(), mam_server.to_string()); string from_id = page_result.query_result.first; string to_id = page_result.query_result.last; @@ -328,7 +332,7 @@ public class Dino.HistorySync { page_result = yield get_mam_page(account, query_params, page_result, cancellable); debug("Page result %s %b", page_result.page_result.to_string(), page_result.stanzas == null); - if (page_result.page_result == PageResult.Error || page_result.stanzas == null) return page_result; + if (page_result.page_result == PageResult.Error || page_result.page_result == PageResult.Cancelled || page_result.stanzas == null) return page_result; string earliest_mam_id = page_result.query_result.first; long earliest_mam_time = (long)mam_times[account][earliest_mam_id].to_unix(); @@ -354,7 +358,8 @@ public class Dino.HistorySync { TargetReached, NoMoreMessages, Duplicate, - Error + Error, + Cancelled } /** @@ -364,9 +369,9 @@ public class Dino.HistorySync { XmppStream stream = stream_interactor.get_stream(account); Xmpp.MessageArchiveManagement.QueryResult query_result = null; if (prev_page_result == null) { - query_result = yield Xmpp.MessageArchiveManagement.V2.query_archive(stream, query_params); + query_result = yield Xmpp.MessageArchiveManagement.V2.query_archive(stream, query_params, cancellable); } else { - query_result = yield Xmpp.MessageArchiveManagement.V2.page_through_results(stream, query_params, prev_page_result.query_result); + query_result = yield Xmpp.MessageArchiveManagement.V2.page_through_results(stream, query_params, prev_page_result.query_result, cancellable); } return yield process_query_result(account, query_params, query_result, cancellable); } @@ -394,6 +399,10 @@ public class Dino.HistorySync { string query_id = query_params.query_id; string? after_id = query_params.start_id; + if (cancellable != null && cancellable.is_cancelled()) { + return new PageRequestResult(PageResult.Cancelled, query_result, stanzas[query_id]); + } + if (stanzas.has_key(query_id) && !stanzas[query_id].is_empty) { // Check it we reached our target (from_id) @@ -402,17 +411,21 @@ public class Dino.HistorySync { if (mam_message_flag != null && mam_message_flag.mam_id != null) { if (after_id != null && mam_message_flag.mam_id == after_id) { // Successfully fetched the whole range - var ret = new PageRequestResult(PageResult.TargetReached, query_result, stanzas[query_id]); - send_messages_back_into_pipeline(account, query_id); - return ret; + yield send_messages_back_into_pipeline(account, query_id, cancellable); + if (cancellable != null && cancellable.is_cancelled()) { + return new PageRequestResult(PageResult.Cancelled, query_result, stanzas[query_id]); + } + return new PageRequestResult(PageResult.TargetReached, query_result, stanzas[query_id]); } } } if (hitted_range.has_key(query_id) && hitted_range[query_id] == -2) { - // Message got filtered out by xmpp-vala, but succesfull range fetch nevertheless - var ret = new PageRequestResult(PageResult.TargetReached, query_result, stanzas[query_id]); - send_messages_back_into_pipeline(account, query_id); - return ret; + // Message got filtered out by xmpp-vala, but succesful range fetch nevertheless + yield send_messages_back_into_pipeline(account, query_id); + if (cancellable != null && cancellable.is_cancelled()) { + return new PageRequestResult(PageResult.Cancelled, query_result, stanzas[query_id]); + } + return new PageRequestResult(PageResult.TargetReached, query_result, stanzas[query_id]); } // Check for duplicates. Go through all messages and build a db query. @@ -444,16 +457,19 @@ public class Dino.HistorySync { } } - var res = new PageRequestResult(page_result, query_result, stanzas.has_key(query_id) ? stanzas[query_id] : null); - send_messages_back_into_pipeline(account, query_id); - return res; + yield send_messages_back_into_pipeline(account, query_id); + if (cancellable != null && cancellable.is_cancelled()) { + page_result = PageResult.Cancelled; + } + return new PageRequestResult(page_result, query_result, stanzas.has_key(query_id) ? stanzas[query_id] : null); } - private void send_messages_back_into_pipeline(Account account, string query_id) { + private async void send_messages_back_into_pipeline(Account account, string query_id, Cancellable? cancellable = null) { if (!stanzas.has_key(query_id)) return; foreach (Xmpp.MessageStanza message in stanzas[query_id]) { - stream_interactor.get_module(MessageProcessor.IDENTITY).run_pipeline_announce.begin(account, message); + if (cancellable != null && cancellable.is_cancelled()) break; + yield stream_interactor.get_module(MessageProcessor.IDENTITY).run_pipeline_announce(account, message); } stanzas.unset(query_id); } @@ -463,14 +479,16 @@ public class Dino.HistorySync { mam_times[account] = new HashMap(); - XmppStream? stream_bak = null; - stream_interactor.module_manager.get_module(account, Xmpp.MessageArchiveManagement.Module.IDENTITY).feature_available.connect( (stream) => { - if (stream == stream_bak) return; + stream_interactor.connection_manager.stream_attached_modules.connect((account, stream) => { + if (!current_catchup_id.has_key(account)) { + current_catchup_id[account] = new HashMap(Jid.hash_func, Jid.equals_func); + } else { + current_catchup_id[account].clear(); + } + }); - current_catchup_id[account] = new HashMap(Jid.hash_func, Jid.equals_func); - stream_bak = stream; - debug("[%s] MAM available", account.bare_jid.to_string()); - fetch_everything.begin(account, account.bare_jid); + stream_interactor.module_manager.get_module(account, Xmpp.MessageArchiveManagement.Module.IDENTITY).feature_available.connect((stream) => { + consider_fetch_everything(account, stream); }); stream_interactor.module_manager.get_module(account, Xmpp.MessageModule.IDENTITY).received_message_unprocessed.connect((stream, message) => { @@ -478,6 +496,24 @@ public class Dino.HistorySync { }); } + private void consider_fetch_everything(Account account, XmppStream stream) { + if (sync_streams.has(account, stream)) return; + + debug("[%s] MAM available", account.bare_jid.to_string()); + sync_streams[account] = stream; + if (!cancellables.has_key(account)) { + cancellables[account] = new HashMap(); + } + if (cancellables[account].has_key(account.bare_jid)) { + cancellables[account][account.bare_jid].cancel(); + } + cancellables[account][account.bare_jid] = new Cancellable(); + fetch_everything.begin(account, account.bare_jid, cancellables[account][account.bare_jid], new DateTime.from_unix_utc(0), (_, res) => { + fetch_everything.end(res); + cancellables[account].unset(account.bare_jid); + }); + } + public static void cleanup_db_ranges(Database db, Account account) { var ranges = new HashMap>(Jid.hash_func, Jid.equals_func); foreach (Row row in db.mam_catchup.select().with(db.mam_catchup.account_id, "=", account.id)) { diff --git a/libdino/src/service/muc_manager.vala b/libdino/src/service/muc_manager.vala index f95e10f2..f796c36f 100644 --- a/libdino/src/service/muc_manager.vala +++ b/libdino/src/service/muc_manager.vala @@ -23,6 +23,7 @@ public class MucManager : StreamInteractionModule, Object { private StreamInteractor stream_interactor; private HashMap> mucs_todo = new HashMap>(Account.hash_func, Account.equals_func); private HashMap> mucs_joining = new HashMap>(Account.hash_func, Account.equals_func); + private HashMap> mucs_sync_cancellables = new HashMap>(Account.hash_func, Account.equals_func); private HashMap enter_errors = new HashMap(Jid.hash_func, Jid.equals_func); private ReceivedMessageListener received_message_listener; private HashMap bookmarks_provider = new HashMap(Account.hash_func, Account.equals_func); @@ -56,7 +57,7 @@ public class MucManager : StreamInteractionModule, Object { } // already_autojoin: Without this flag we'd be retrieving bookmarks (to check for autojoin) from the sender on every join - public async Muc.JoinResult? join(Account account, Jid jid, string? nick, string? password, bool already_autojoin = false) { + public async Muc.JoinResult? join(Account account, Jid jid, string? nick, string? password, bool already_autojoin = false, Cancellable? cancellable = null) { XmppStream? stream = stream_interactor.get_stream(account); if (stream == null) return null; @@ -102,14 +103,22 @@ public class MucManager : StreamInteractionModule, Object { stream_interactor.get_module(ConversationManager.IDENTITY).start_conversation(joined_conversation); if (can_do_mam) { + var history_sync = stream_interactor.get_module(MessageProcessor.IDENTITY).history_sync; if (conversation == null) { // We never joined the conversation before, just fetch the latest MAM page - yield stream_interactor.get_module(MessageProcessor.IDENTITY).history_sync - .fetch_latest_page(account, jid.bare_jid, null, new DateTime.from_unix_utc(0)); + yield history_sync.fetch_latest_page(account, jid.bare_jid, null, new DateTime.from_unix_utc(0), cancellable); } else { // Fetch everything up to the last time the user actively joined - stream_interactor.get_module(MessageProcessor.IDENTITY).history_sync - .fetch_everything.begin(account, jid.bare_jid, null, conversation.active_last_changed); + if (!mucs_sync_cancellables.has_key(account)) { + mucs_sync_cancellables[account] = new HashMap(); + } + if (!mucs_sync_cancellables[account].has_key(jid.bare_jid)) { + mucs_sync_cancellables[account][jid.bare_jid] = new Cancellable(); + history_sync.fetch_everything.begin(account, jid.bare_jid, mucs_sync_cancellables[account][jid.bare_jid], conversation.active_last_changed, (_, res) => { + history_sync.fetch_everything.end(res); + mucs_sync_cancellables[account].unset(jid.bare_jid); + }); + } } } } else if (res.muc_error != null) { @@ -132,6 +141,14 @@ public class MucManager : StreamInteractionModule, Object { Conversation? conversation = stream_interactor.get_module(ConversationManager.IDENTITY).get_conversation(jid, account); if (conversation != null) stream_interactor.get_module(ConversationManager.IDENTITY).close_conversation(conversation); + + cancel_sync(account, jid); + } + + private void cancel_sync(Account account, Jid jid) { + if (mucs_sync_cancellables.has_key(account) && mucs_sync_cancellables[account].has_key(jid.bare_jid) && !mucs_sync_cancellables[account][jid.bare_jid].is_cancelled()) { + mucs_sync_cancellables[account][jid.bare_jid].cancel(); + } } public async DataForms.DataForm? get_config_form(Account account, Jid jid) { @@ -403,6 +420,7 @@ public class MucManager : StreamInteractionModule, Object { private void on_account_added(Account account) { stream_interactor.module_manager.get_module(account, Xep.Muc.Module.IDENTITY).self_removed_from_room.connect( (stream, jid, code) => { + cancel_sync(account, jid); left(account, jid); }); stream_interactor.module_manager.get_module(account, Xep.Muc.Module.IDENTITY).subject_set.connect( (stream, subject, jid) => { @@ -467,6 +485,14 @@ public class MucManager : StreamInteractionModule, Object { } private async void on_stream_negotiated(Account account, XmppStream stream) { + if (mucs_sync_cancellables.has_key(account)) { + foreach (Cancellable cancellable in mucs_sync_cancellables[account].values) { + if (!cancellable.is_cancelled()) { + cancellable.cancel(); + } + } + } + yield initialize_bookmarks_provider(account); Set? conferences = yield bookmarks_provider[account].get_conferences(stream); diff --git a/libdino/src/util/weak_map.vala b/libdino/src/util/weak_map.vala index 0fd9d55d..a7f4bc44 100644 --- a/libdino/src/util/weak_map.vala +++ b/libdino/src/util/weak_map.vala @@ -22,12 +22,11 @@ public class WeakMap : Gee.AbstractMap { hash_map = new HashMap(); notify_map = new HashMap(); } else { - hash_map = new HashMap((v) => { return this.key_hash_func(v); }, - (a, b) => { return this.key_equal_func(a, b); }, - (a, b) => { return this.value_equal_func(a, b); }); - notify_map = new HashMap((v) => { return this.key_hash_func(v); }, - (a, b) => { return this.key_equal_func(a, b); }, - (a, b) => { return this.value_equal_func(a, b); }); + hash_map = new HashMap((v) => { return this.key_hash_func != null ? this.key_hash_func(v) : 0; }, + (a, b) => { return this.key_equal_func != null ? this.key_equal_func(a, b) : a == b; }, + (a, b) => { return this.value_equal_func != null ? this.value_equal_func(a, b) : a == b; }); + notify_map = new HashMap((v) => { return this.key_hash_func != null ? this.key_hash_func(v) : 0; }, + (a, b) => { return this.key_equal_func != null ? this.key_equal_func(a, b) : a == b; }); } } @@ -49,7 +48,7 @@ public class WeakMap : Gee.AbstractMap { } public override bool has(K key, V value) { - assert_not_reached(); + return has_key(key) && (this.value_equal_func != null ? this.value_equal_func(hash_map[key], value) : hash_map[key] == value); } public override bool has_key(K key) { diff --git a/xmpp-vala/src/core/xmpp_stream.vala b/xmpp-vala/src/core/xmpp_stream.vala index 322fb016..42e90bf9 100644 --- a/xmpp-vala/src/core/xmpp_stream.vala +++ b/xmpp-vala/src/core/xmpp_stream.vala @@ -1,6 +1,6 @@ using Gee; -public abstract class Xmpp.XmppStream { +public abstract class Xmpp.XmppStream : Object { public signal void received_node(XmppStream stream, StanzaNode node); public signal void received_root_node(XmppStream stream, StanzaNode node);