diff --git a/libdino/src/service/history_sync.vala b/libdino/src/service/history_sync.vala index c7bfee88..d9ecd41b 100644 --- a/libdino/src/service/history_sync.vala +++ b/libdino/src/service/history_sync.vala @@ -118,7 +118,7 @@ public class Dino.HistorySync { } } - public async void fetch_everything(Account account, Jid mam_server, DateTime until_earliest_time = new DateTime.from_unix_utc(0)) { + public async void fetch_everything(Account account, Jid mam_server, Cancellable? cancellable = null, DateTime until_earliest_time = new DateTime.from_unix_utc(0)) { debug("Fetch everything for %s %s", mam_server.to_string(), until_earliest_time != null ? @"(until $until_earliest_time)" : ""); RowOption latest_row_opt = db.mam_catchup.select() .with(db.mam_catchup.account_id, "=", account.id) @@ -128,7 +128,7 @@ public class Dino.HistorySync { .single().row(); Row? latest_row = latest_row_opt.is_present() ? latest_row_opt.inner : null; - Row? new_row = yield fetch_latest_page(account, mam_server, latest_row, until_earliest_time); + Row? new_row = yield fetch_latest_page(account, mam_server, latest_row, until_earliest_time, cancellable); if (new_row != null) { current_catchup_id[account][mam_server] = new_row[db.mam_catchup.id]; @@ -182,7 +182,7 @@ public class Dino.HistorySync { } // Fetches the latest page (up to previous db row). Extends the previous db row if it was reached, creates a new row otherwise. - public async Row? fetch_latest_page(Account account, Jid mam_server, Row? latest_row, DateTime? until_earliest_time) { + public async Row? fetch_latest_page(Account account, Jid mam_server, Row? latest_row, DateTime? until_earliest_time, Cancellable? cancellable = null) { debug("[%s | %s] Fetching latest page", account.bare_jid.to_string(), mam_server.to_string()); int latest_row_id = -1; @@ -203,7 +203,7 @@ public class Dino.HistorySync { var query_params = new Xmpp.MessageArchiveManagement.V2.MamQueryParams.query_latest(mam_server, latest_message_time, latest_message_id); - PageRequestResult page_result = yield get_mam_page(account, query_params, null); + PageRequestResult page_result = yield get_mam_page(account, query_params, null, cancellable); debug("[%s | %s] Latest page result: %s", account.bare_jid.to_string(), mam_server.to_string(), page_result.page_result.to_string()); if (page_result.page_result == PageResult.Error) { @@ -299,7 +299,7 @@ public class Dino.HistorySync { return null; } - private async void fetch_before_range(Account account, Jid mam_server, Row range, DateTime? until_earliest_time) { + private async void fetch_before_range(Account account, Jid mam_server, Row range, DateTime? until_earliest_time, Cancellable? cancellable = null) { DateTime latest_time = new DateTime.from_unix_utc(range[db.mam_catchup.from_time]); string latest_id = range[db.mam_catchup.from_id]; debug("[%s | %s] Fetching before range < %s, %s", account.bare_jid.to_string(), mam_server.to_string(), latest_time.to_string(), latest_id); @@ -314,18 +314,18 @@ public class Dino.HistorySync { latest_time, latest_id ); } - yield fetch_query(account, query_params, range[db.mam_catchup.id]); + yield fetch_query(account, query_params, range[db.mam_catchup.id], cancellable); } /** * Iteratively fetches all pages returned for a query (until a PageResult other than MorePagesAvailable is returned) * @return The last PageRequestResult result **/ - private async PageRequestResult fetch_query(Account account, Xmpp.MessageArchiveManagement.V2.MamQueryParams query_params, int db_id) { + private async PageRequestResult fetch_query(Account account, Xmpp.MessageArchiveManagement.V2.MamQueryParams query_params, int db_id, Cancellable? cancellable = null) { debug("[%s | %s] Fetch query %s - %s", account.bare_jid.to_string(), query_params.mam_server.to_string(), query_params.start != null ? query_params.start.to_string() : "", query_params.end != null ? query_params.end.to_string() : ""); PageRequestResult? page_result = null; do { - page_result = yield get_mam_page(account, query_params, page_result); + page_result = yield get_mam_page(account, query_params, page_result, cancellable); debug("Page result %s %b", page_result.page_result.to_string(), page_result.stanzas == null); if (page_result.page_result == PageResult.Error || page_result.stanzas == null) return page_result; @@ -360,7 +360,7 @@ public class Dino.HistorySync { /** * prev_page_result: null if this is the first page request **/ - private async PageRequestResult get_mam_page(Account account, Xmpp.MessageArchiveManagement.V2.MamQueryParams query_params, PageRequestResult? prev_page_result) { + private async PageRequestResult get_mam_page(Account account, Xmpp.MessageArchiveManagement.V2.MamQueryParams query_params, PageRequestResult? prev_page_result, Cancellable? cancellable = null) { XmppStream stream = stream_interactor.get_stream(account); Xmpp.MessageArchiveManagement.QueryResult query_result = null; if (prev_page_result == null) { @@ -368,10 +368,10 @@ public class Dino.HistorySync { } else { query_result = yield Xmpp.MessageArchiveManagement.V2.page_through_results(stream, query_params, prev_page_result.query_result); } - return yield process_query_result(account, query_params, query_result); + return yield process_query_result(account, query_params, query_result, cancellable); } - private async PageRequestResult process_query_result(Account account, Xmpp.MessageArchiveManagement.V2.MamQueryParams query_params, Xmpp.MessageArchiveManagement.QueryResult query_result) { + private async PageRequestResult process_query_result(Account account, Xmpp.MessageArchiveManagement.V2.MamQueryParams query_params, Xmpp.MessageArchiveManagement.QueryResult query_result, Cancellable? cancellable = null) { PageResult page_result = PageResult.MorePagesAvailable; if (query_result.malformed || query_result.error) { diff --git a/libdino/src/service/muc_manager.vala b/libdino/src/service/muc_manager.vala index ff6ac941..f95e10f2 100644 --- a/libdino/src/service/muc_manager.vala +++ b/libdino/src/service/muc_manager.vala @@ -109,7 +109,7 @@ public class MucManager : StreamInteractionModule, Object { } else { // Fetch everything up to the last time the user actively joined stream_interactor.get_module(MessageProcessor.IDENTITY).history_sync - .fetch_everything.begin(account, jid.bare_jid, conversation.active_last_changed); + .fetch_everything.begin(account, jid.bare_jid, null, conversation.active_last_changed); } } } else if (res.muc_error != null) { diff --git a/xmpp-vala/src/core/io_xmpp_stream.vala b/xmpp-vala/src/core/io_xmpp_stream.vala index 208e8053..9c58a46b 100644 --- a/xmpp-vala/src/core/io_xmpp_stream.vala +++ b/xmpp-vala/src/core/io_xmpp_stream.vala @@ -1,7 +1,7 @@ using Gee; public interface Xmpp.WriteNodeFunc : Object { - public abstract async void write_stanza(XmppStream stream, StanzaNode node) throws IOError; + public abstract async void write_stanza(XmppStream stream, StanzaNode node, int io_priority = Priority.DEFAULT, Cancellable? cancellable = null) throws IOError; } public abstract class Xmpp.IoXmppStream : XmppStream { @@ -44,22 +44,22 @@ public abstract class Xmpp.IoXmppStream : XmppStream { } [Version (deprecated = true, deprecated_since = "0.1", replacement = "write_async")] - public override void write(StanzaNode node) { - write_async.begin(node, (obj, res) => { + public override void write(StanzaNode node, int io_priority = Priority.DEFAULT) { + write_async.begin(node, io_priority, null, (obj, res) => { try { write_async.end(res); } catch (Error e) { } }); } - public override async void write_async(StanzaNode node) throws IOError { + public override async void write_async(StanzaNode node, int io_priority = Priority.DEFAULT, Cancellable? cancellable = null) throws IOError { if (write_obj != null) { - yield write_obj.write_stanza(this, node); + yield write_obj.write_stanza(this, node, io_priority, cancellable); } else { StanzaWriter? writer = this.writer; if (writer == null) throw new IOError.NOT_CONNECTED("trying to write, but no stream open"); log.node("OUT", node, this); - yield ((!)writer).write_node(node); + yield ((!)writer).write_node(node, io_priority, cancellable); } } diff --git a/xmpp-vala/src/core/stanza_writer.vala b/xmpp-vala/src/core/stanza_writer.vala index 5b926a93..aecf8983 100644 --- a/xmpp-vala/src/core/stanza_writer.vala +++ b/xmpp-vala/src/core/stanza_writer.vala @@ -12,11 +12,11 @@ public class StanzaWriter { this.output = output; } - public async void write_node(StanzaNode node) throws IOError { - yield write_data(node.to_xml().data); + public async void write_node(StanzaNode node, int io_priority = Priority.DEFAULT, Cancellable? cancellable = null) throws IOError { + yield write_data(node.to_xml().data, io_priority, cancellable); } - public async void write_nodes(StanzaNode node1, StanzaNode node2) throws IOError { + public async void write_nodes(StanzaNode node1, StanzaNode node2, int io_priority = Priority.DEFAULT, Cancellable? cancellable = null) throws IOError { var data1 = node1.to_xml().data; var data2 = node2.to_xml().data; @@ -29,21 +29,21 @@ public class StanzaWriter { concat[i++] = datum; } - yield write_data(concat); + yield write_data(concat, io_priority, cancellable); } - public async void write(string s) throws IOError { - yield write_data(s.data); + public async void write(string s, int io_priority = Priority.DEFAULT, Cancellable? cancellable = null) throws IOError { + yield write_data(s.data, io_priority, cancellable); } - private async void write_data(owned uint8[] data) throws IOError { + private async void write_data(owned uint8[] data, int io_priority = Priority.DEFAULT, Cancellable? cancellable = null) throws IOError { if (running) { queue.push_tail(new SourceFuncWrapper(write_data.callback)); yield; } running = true; try { - yield output.write_all_async(data, 0, null, null); + yield output.write_all_async(data, io_priority, cancellable, null); } catch (IOError e) { cancel(); throw e; diff --git a/xmpp-vala/src/core/xmpp_stream.vala b/xmpp-vala/src/core/xmpp_stream.vala index 6370554f..322fb016 100644 --- a/xmpp-vala/src/core/xmpp_stream.vala +++ b/xmpp-vala/src/core/xmpp_stream.vala @@ -37,9 +37,9 @@ public abstract class Xmpp.XmppStream { public abstract async StanzaNode read() throws IOError; [Version (deprecated = true, deprecated_since = "0.1", replacement = "write_async")] - public abstract void write(StanzaNode node); + public abstract void write(StanzaNode node, int io_priority = Priority.DEFAULT); - public abstract async void write_async(StanzaNode node) throws IOError; + public abstract async void write_async(StanzaNode node, int io_priority = Priority.DEFAULT, Cancellable? cancellable = null) throws IOError; public abstract async void setup() throws IOError; diff --git a/xmpp-vala/src/module/iq/module.vala b/xmpp-vala/src/module/iq/module.vala index 17cd3f0d..172aa9b1 100644 --- a/xmpp-vala/src/module/iq/module.vala +++ b/xmpp-vala/src/module/iq/module.vala @@ -12,22 +12,25 @@ namespace Xmpp.Iq { private HashMap responseListeners = new HashMap(); private HashMap> namespaceRegistrants = new HashMap>(); - public async Iq.Stanza send_iq_async(XmppStream stream, Iq.Stanza iq) { + public async Iq.Stanza send_iq_async(XmppStream stream, Iq.Stanza iq, int io_priority = Priority.DEFAULT, Cancellable? cancellable = null) throws IOError { assert(iq.type_ == Iq.Stanza.TYPE_GET || iq.type_ == Iq.Stanza.TYPE_SET); + preprocess_outgoing_iq_set_get(stream, iq); Iq.Stanza? return_stanza = null; - send_iq(stream, iq, (_, result_iq) => { + responseListeners[iq.id] = new ResponseListener((_, result_iq) => { return_stanza = result_iq; Idle.add(send_iq_async.callback); }); + stream.write_async(iq.stanza, io_priority, cancellable); yield; + cancellable.set_error_if_cancelled(); return return_stanza; } public delegate void OnResult(XmppStream stream, Iq.Stanza iq); - public void send_iq(XmppStream stream, Iq.Stanza iq, owned OnResult? listener = null) { + public void send_iq(XmppStream stream, Iq.Stanza iq, owned OnResult? listener = null, int io_priority = Priority.DEFAULT) { preprocess_outgoing_iq_set_get(stream, iq); - stream.write(iq.stanza); + stream.write(iq.stanza, io_priority); if (listener != null) { responseListeners[iq.id] = new ResponseListener((owned) listener); } diff --git a/xmpp-vala/src/module/xep/0198_stream_management.vala b/xmpp-vala/src/module/xep/0198_stream_management.vala index 340c4e6f..68eee8ae 100644 --- a/xmpp-vala/src/module/xep/0198_stream_management.vala +++ b/xmpp-vala/src/module/xep/0198_stream_management.vala @@ -13,32 +13,51 @@ public class Module : XmppStreamNegotiationModule, WriteNodeFunc { public string? session_id { get; set; default=null; } public Gee.List flags = null; private HashMap in_flight_stanzas = new HashMap(); - private Gee.List node_queue = new ArrayList(); + private Gee.Queue node_queue = new PriorityQueue((a, b) => { + return a.io_priority - b.io_priority; + }); private class QueueItem { public StanzaNode node; - public Promise promise; + public int io_priority; + public Cancellable? cancellable; + public Promise promise; - public QueueItem(StanzaNode node, Promise promise) { + public QueueItem(StanzaNode node, int io_priority, Cancellable? cancellable) { this.node = node; - this.promise = promise; + this.io_priority = io_priority; + this.cancellable = cancellable; + this.promise = new Promise(); } } - public async void write_stanza(XmppStream stream, StanzaNode node) throws IOError { - var promise = new Promise(); - - node_queue.add(new QueueItem(node, promise)); - check_queue(stream); - + public async void write_stanza(XmppStream stream, StanzaNode node, int io_priority = Priority.DEFAULT, Cancellable? cancellable = null) throws IOError { + var future = enqueue_stanza(stream, node, io_priority, cancellable); try { - yield promise.future.wait_async(); + yield future.wait_async(); } catch (FutureError e) { - throw new IOError.FAILED("Future returned error %i".printf(e.code)); + if (e is FutureError.ABANDON_PROMISE) { + throw new IOError.FAILED("Future abandoned: %s".printf(e.message)); + } else if (e is FutureError.EXCEPTION) { + if (future.exception is IOError) { + throw (IOError) future.exception; + } else { + throw new IOError.FAILED("Unknown error %s".printf(future.exception.message)); + } + } else { + throw new IOError.FAILED("Unknown future error: %s".printf(e.message)); + } } } - internal async void write_node(XmppStream stream, StanzaNode node) { + private Future enqueue_stanza(XmppStream stream, StanzaNode node, int io_priority, Cancellable? cancellable) { + var queue_item = new QueueItem(node, io_priority, cancellable); + node_queue.offer(queue_item); + check_queue(stream); + return queue_item.promise.future; + } + + internal async void write_node(XmppStream stream, StanzaNode node, int io_priority = Priority.DEFAULT, Cancellable? cancellable = null) { StanzaWriter? writer = ((IoXmppStream)stream).writer; if (writer == null) return; try { @@ -46,22 +65,28 @@ public class Module : XmppStreamNegotiationModule, WriteNodeFunc { if (node.name == "message" || node.name == "iq" || node.name == "presence") { var r_node = new StanzaNode.build("r", NS_URI).add_self_xmlns(); stream.log.node("OUT", r_node, stream); - yield ((!)writer).write_nodes(node, r_node); + yield ((!)writer).write_nodes(node, r_node, io_priority, cancellable); } else { - yield ((!)writer).write_node(node); + yield ((!)writer).write_node(node, io_priority, cancellable); } } catch (IOError e) { } } private void check_queue(XmppStream stream) { while (!node_queue.is_empty && in_flight_stanzas.size < 10) { - QueueItem queue_item = node_queue.remove_at(0); + QueueItem queue_item = node_queue.poll(); + try { + queue_item.cancellable.set_error_if_cancelled(); + } catch (IOError e) { + queue_item.promise.set_exception(e); + continue; + } StanzaNode node = queue_item.node; if (node.name == "message" || node.name == "iq" || node.name == "presence") { in_flight_stanzas[++h_outbound] = queue_item; } - write_node.begin(stream, node); + write_node.begin(stream, node, queue_item.io_priority, queue_item.cancellable); } } diff --git a/xmpp-vala/src/module/xep/0199_ping.vala b/xmpp-vala/src/module/xep/0199_ping.vala index 0b31011f..18fa538c 100644 --- a/xmpp-vala/src/module/xep/0199_ping.vala +++ b/xmpp-vala/src/module/xep/0199_ping.vala @@ -9,7 +9,7 @@ namespace Xmpp.Xep.Ping { public async Iq.Stanza send_ping(XmppStream stream, Jid jid) { StanzaNode ping_node = new StanzaNode.build("ping", NS_URI).add_self_xmlns(); Iq.Stanza iq = new Iq.Stanza.get(ping_node) { to=jid }; - return yield stream.get_module(Iq.Module.IDENTITY).send_iq_async(stream, iq); + return yield stream.get_module(Iq.Module.IDENTITY).send_iq_async(stream, iq, Priority.HIGH); } public override void attach(XmppStream stream) { @@ -23,7 +23,7 @@ namespace Xmpp.Xep.Ping { } public async void on_iq_get(XmppStream stream, Iq.Stanza iq) { - stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.result(iq)); + stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.result(iq), null, Priority.HIGH); } public override string get_ns() { return NS_URI; } diff --git a/xmpp-vala/src/module/xep/0313_2_message_archive_management.vala b/xmpp-vala/src/module/xep/0313_2_message_archive_management.vala index a710a459..3a6d9259 100644 --- a/xmpp-vala/src/module/xep/0313_2_message_archive_management.vala +++ b/xmpp-vala/src/module/xep/0313_2_message_archive_management.vala @@ -61,20 +61,20 @@ namespace Xmpp.MessageArchiveManagement.V2 { return MessageArchiveManagement.create_base_query(stream, MessageArchiveManagement.NS_URI_2, mam_params.query_id, fields); } - public async QueryResult query_archive(XmppStream stream, MamQueryParams mam_params) { + public async QueryResult query_archive(XmppStream stream, MamQueryParams mam_params, Cancellable? cancellable = null) { var query_node = create_base_query(stream, mam_params); if (!mam_params.use_ns2_extended) { query_node.put_node(ResultSetManagement.create_set_rsm_node_before(mam_params.end_id)); } - return yield MessageArchiveManagement.query_archive(stream, MessageArchiveManagement.NS_URI_2, mam_params.mam_server, query_node); + return yield MessageArchiveManagement.query_archive(stream, MessageArchiveManagement.NS_URI_2, mam_params.mam_server, query_node, cancellable); } - public async QueryResult page_through_results(XmppStream stream, MamQueryParams mam_params, QueryResult prev_result) { + public async QueryResult page_through_results(XmppStream stream, MamQueryParams mam_params, QueryResult prev_result, Cancellable? cancellable = null) { var query_node = create_base_query(stream, mam_params); query_node.put_node(ResultSetManagement.create_set_rsm_node_before(prev_result.first)); - return yield MessageArchiveManagement.query_archive(stream, MessageArchiveManagement.NS_URI_2, mam_params.mam_server, query_node); + return yield MessageArchiveManagement.query_archive(stream, MessageArchiveManagement.NS_URI_2, mam_params.mam_server, query_node, cancellable); } } diff --git a/xmpp-vala/src/module/xep/0313_message_archive_management.vala b/xmpp-vala/src/module/xep/0313_message_archive_management.vala index 8c1b3fbc..1caa1bc3 100644 --- a/xmpp-vala/src/module/xep/0313_message_archive_management.vala +++ b/xmpp-vala/src/module/xep/0313_message_archive_management.vala @@ -71,7 +71,7 @@ public class Module : XmppStreamModule { return query_node; } - internal async QueryResult query_archive(XmppStream stream, string ns, Jid? mam_server, StanzaNode query_node) { + internal async QueryResult query_archive(XmppStream stream, string ns, Jid? mam_server, StanzaNode query_node, Cancellable? cancellable = null) { var res = new QueryResult(); if (stream.get_flag(Flag.IDENTITY) == null) { res.error = true; return res; } @@ -79,7 +79,7 @@ public class Module : XmppStreamModule { // Build and send query Iq.Stanza iq = new Iq.Stanza.set(query_node) { to=mam_server }; - Iq.Stanza result_iq = yield stream.get_module(Iq.Module.IDENTITY).send_iq_async(stream, iq); + Iq.Stanza result_iq = yield stream.get_module(Iq.Module.IDENTITY).send_iq_async(stream, iq, Priority.LOW, cancellable); // Parse the response IQ into a QueryResult. StanzaNode? fin_node = result_iq.stanza.get_subnode("fin", ns);