Terminate the Jingle session after the file transfer is complete

This commit is contained in:
hrxi 2019-07-22 21:40:30 +02:00
parent 308d71b703
commit 77ff73a1ca
1 changed files with 97 additions and 17 deletions

View File

@ -122,7 +122,7 @@ public class Module : XmppStreamModule, Iq.Handler {
throw new Error.GENERAL("Couldn't determine own JID");
}
TransportParameters transport_params = transport.create_transport_parameters();
Session session = new Session.initiate_sent(random_uuid(), type, transport_params, receiver_full_jid, content_name);
Session session = new Session.initiate_sent(random_uuid(), type, transport_params, receiver_full_jid, content_name, stream);
StanzaNode content = new StanzaNode.build("content", NS_URI)
.put_attribute("creator", "initiator")
.put_attribute("name", content_name)
@ -177,7 +177,7 @@ public class Module : XmppStreamModule, Iq.Handler {
ContentParameters content_params = content_type.parse_content_parameters(description);
TransportType type = content_type.content_type_transport_type();
Session session = new Session.initiate_received(sid, type, transport_params, iq.from, name);
Session session = new Session.initiate_received(sid, type, transport_params, iq.from, name, stream);
stream.get_flag(Flag.IDENTITY).add_session(session);
stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.result(iq));
@ -317,7 +317,9 @@ public class Session {
// Signals that the session has been accepted by the peer.
public signal void accepted(XmppStream stream);
public Session.initiate_sent(string sid, Type type, TransportParameters transport, Jid peer_full_jid, string content_name) {
XmppStream hack;
public Session.initiate_sent(string sid, Type type, TransportParameters transport, Jid peer_full_jid, string content_name, XmppStream hack) {
this.state = State.INITIATE_SENT;
this.sid = sid;
this.type_ = type;
@ -325,9 +327,10 @@ public class Session {
this.content_name = content_name;
this.transport = transport;
this.connection = new Connection(this);
this.hack = hack;
}
public Session.initiate_received(string sid, Type type, TransportParameters? transport, Jid peer_full_jid, string content_name) {
public Session.initiate_received(string sid, Type type, TransportParameters? transport, Jid peer_full_jid, string content_name, XmppStream hack) {
this.state = State.INITIATE_RECEIVED;
this.sid = sid;
this.type_ = type;
@ -335,6 +338,7 @@ public class Session {
this.content_name = content_name;
this.transport = transport;
this.connection = new Connection(this);
this.hack = hack;
}
public void handle_iq_set(XmppStream stream, string action, StanzaNode jingle, Iq.Stanza iq) throws IqError {
@ -447,12 +451,22 @@ public class Session {
}
public void on_connection_error(IOError error) {
// TODO(hrxi): conjure an XmppStream out of nowhere and terminate the session
// TODO(hrxi): where can we get an XmppStream from?
StanzaNode reason = new StanzaNode.build("reason", NS_URI)
.put_node(new StanzaNode.build("failed-transport", NS_URI))
.put_node(new StanzaNode.build("text", NS_URI)
.put_node(new StanzaNode.text(error.message))
);
terminate(hack, reason, "transport error: $(error.message)");
}
public void on_connection_close() {
StanzaNode reason = new StanzaNode.build("reason", NS_URI)
.put_node(new StanzaNode.build("success", NS_URI));
terminate(hack, reason, "success");
}
public void terminate(XmppStream stream, StanzaNode reason, string? local_reason) {
if (state != State.INITIATE_SENT && state != State.INITIATE_RECEIVED && state != State.ACTIVE) {
// TODO(hrxi): what to do?
if (state == State.ENDED) {
return;
}
if (state == State.ACTIVE) {
@ -525,6 +539,9 @@ public class Connection : IOStream {
private IOStream? inner = null;
private string? error = null;
private bool read_closed = false;
private bool write_closed = false;
private class OnSetInnerCallback {
public SourceFunc callback;
public int io_priority;
@ -578,12 +595,19 @@ public class Connection : IOStream {
strong.on_connection_error(error);
}
}
private void handle_connection_close() {
Session? strong = session;
if (strong != null) {
strong.on_connection_close();
}
}
public async ssize_t read_async(uint8[]? buffer, int io_priority = GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws IOError {
yield wait_and_check_for_errors(io_priority, cancellable);
try {
return yield inner.input_stream.read_async(buffer, io_priority, cancellable);
} catch (IOError e) {
print("read_async error\n");
handle_connection_error(e);
throw e;
}
@ -593,37 +617,93 @@ public class Connection : IOStream {
try {
return yield inner.output_stream.write_async(buffer, io_priority, cancellable);
} catch (IOError e) {
print("write_async error\n");
handle_connection_error(e);
throw e;
}
}
public bool close_read(Cancellable? cancellable = null) throws IOError {
check_for_errors();
if (read_closed) {
return true;
}
close_read_async.begin(GLib.Priority.DEFAULT, cancellable);
return true;
}
public async bool close_read_async(int io_priority = GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws IOError {
yield wait_and_check_for_errors(io_priority, cancellable);
try {
return yield inner.input_stream.close_async(io_priority, cancellable);
} catch (IOError e) {
handle_connection_error(e);
throw e;
if (read_closed) {
return true;
}
read_closed = true;
IOError error = null;
bool result = true;
try {
result = yield inner.input_stream.close_async(io_priority, cancellable);
} catch (IOError e) {
print("input_stream.close_async error\n");
if (error == null) {
error = e;
}
}
try {
result = (yield close_if_both_closed(io_priority, cancellable)) && result;
} catch (IOError e) {
print("close_if_both_closed error\n");
if (error == null) {
error = e;
}
}
if (error != null) {
handle_connection_error(error);
throw error;
}
return result;
}
public bool close_write(Cancellable? cancellable = null) throws IOError {
check_for_errors();
if (write_closed) {
return true;
}
close_write_async.begin(GLib.Priority.DEFAULT, cancellable);
return true;
}
public async bool close_write_async(int io_priority = GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws IOError {
yield wait_and_check_for_errors(io_priority, cancellable);
try {
return yield inner.output_stream.close_async(io_priority, cancellable);
} catch (IOError e) {
handle_connection_error(e);
throw e;
if (write_closed) {
return true;
}
write_closed = true;
IOError error = null;
bool result = true;
try {
result = yield inner.output_stream.close_async(io_priority, cancellable);
} catch (IOError e) {
print("output_stream.close_async error\n");
if (error == null) {
error = e;
}
}
try {
result = (yield close_if_both_closed(io_priority, cancellable)) && result;
} catch (IOError e) {
print("close_if_both_closed error\n");
if (error == null) {
error = e;
}
}
if (error != null) {
handle_connection_error(error);
throw error;
}
return result;
}
private async bool close_if_both_closed(int io_priority, Cancellable? cancellable = null) throws IOError {
if (read_closed && write_closed) {
handle_connection_close();
//return yield inner.close_async(io_priority, cancellable);
}
return true;
}
}