From 0000000000000000000000000000000000000000 Mon Sep 17 00:00:00 2001 From: Aikar Date: Wed, 6 May 2020 04:53:35 -0400 Subject: [PATCH] Optimize Network Manager and add advanced packet support Adds ability for 1 packet to bundle other packets to follow it adds ability for a packet to delay sending more packets until a state is ready. Removes synchronization from sending packets Removes processing packet queue off of main thread - for the few cases where it is allowed, order is not necessary nor should it even be happening concurrently in first place (handshaking/login/status) Ensures packets sent asynchronously are dispatched on main thread This helps ensure safety for ProtocolLib as packet listeners are commonly accessing world state. This will allow you to schedule a packet to be sent async, but itll be dispatched sync for packet listeners to process. This should solve some deadlock risks Part of this commit was authored by: Spottedleaf diff --git a/src/main/java/net/minecraft/server/NetworkManager.java b/src/main/java/net/minecraft/server/NetworkManager.java index b1dededc15cce686ead74a99bee64c89ac1de22c..b529c8882491bad278570f322e8a13cfc3588e6c 100644 --- a/src/main/java/net/minecraft/server/NetworkManager.java +++ b/src/main/java/net/minecraft/server/NetworkManager.java @@ -64,6 +64,10 @@ public class NetworkManager extends SimpleChannelInboundHandler> { public int protocolVersion; public java.net.InetSocketAddress virtualHost; private static boolean enableExplicitFlush = Boolean.getBoolean("paper.explicit-flush"); + // Optimize network + boolean isPending = true; + boolean queueImmunity = false; + EnumProtocol protocol; // Paper end public NetworkManager(EnumProtocolDirection enumprotocoldirection) { @@ -87,6 +91,7 @@ public class NetworkManager extends SimpleChannelInboundHandler> { } public void setProtocol(EnumProtocol enumprotocol) { + protocol = enumprotocol; // Paper this.channel.attr(NetworkManager.c).set(enumprotocol); this.channel.config().setAutoRead(true); NetworkManager.LOGGER.debug("Enabled auto read"); @@ -158,19 +163,72 @@ public class NetworkManager extends SimpleChannelInboundHandler> { NetworkManager.LOGGER.debug("Set listener of {} to {}", this, packetlistener); this.packetListener = packetlistener; } + // Paper start + private static class InnerUtil { // Attempt to hide these methods from ProtocolLib so it doesn't accidently pick them up. + private static java.util.List buildExtraPackets(Packet packet) { + java.util.List extra = packet.getExtraPackets(); + if (extra == null || extra.isEmpty()) { + return null; + } + java.util.List ret = new java.util.ArrayList<>(1 + extra.size()); + buildExtraPackets0(extra, ret); + return ret; + } + + private static void buildExtraPackets0(java.util.List extraPackets, java.util.List into) { + for (Packet extra : extraPackets) { + into.add(extra); + java.util.List extraExtra = extra.getExtraPackets(); + if (extraExtra != null && !extraExtra.isEmpty()) { + buildExtraPackets0(extraExtra, into); + } + } + } + } + // Paper end public void sendPacket(Packet packet) { this.sendPacket(packet, (GenericFutureListener) null); } + // Paper start + private boolean canSendImmediate(Packet packet) { + return isPending || protocol == EnumProtocol.HANDSHAKING || protocol == EnumProtocol.STATUS || queueImmunity || + packet instanceof PacketPlayOutKeepAlive || + packet instanceof PacketPlayOutChat || + packet instanceof PacketPlayOutTabComplete; + } + // Paper end public void sendPacket(Packet packet, @Nullable GenericFutureListener> genericfuturelistener) { - if (this.isConnected()) { - this.o(); - this.b(packet, genericfuturelistener); - } else { - this.packetQueue.add(new NetworkManager.QueuedPacket(packet, genericfuturelistener)); + // Paper start - handle oversized packets better + boolean connected = this.isConnected(); + if (!connected && !preparing) return; // Do nothing + if (connected && (canSendImmediate(packet) || ( + MCUtil.isMainThread() && packet.isReady() && this.packetQueue.isEmpty() && + (packet.getExtraPackets() == null || packet.getExtraPackets().isEmpty()) + ))) { + this.dispatchPacket(packet, genericfuturelistener); + return; } + // write the packets to the queue, then flush - antixray hooks there already + java.util.List extraPackets = InnerUtil.buildExtraPackets(packet); + boolean hasExtraPackets = extraPackets != null && !extraPackets.isEmpty(); + if (!hasExtraPackets) { + this.packetQueue.add(new NetworkManager.QueuedPacket(packet, genericfuturelistener)); + } else { + java.util.List packets = new java.util.ArrayList<>(1 + extraPackets.size()); + packets.add(new NetworkManager.QueuedPacket(packet, null)); // delay the future listener until the end of the extra packets + for (int i = 0, len = extraPackets.size(); i < len;) { + Packet extra = extraPackets.get(i); + boolean end = ++i == len; + packets.add(new NetworkManager.QueuedPacket(extra, end ? genericfuturelistener : null)); // append listener to the end + } + + this.packetQueue.addAll(packets); // atomic + } + this.sendPacketQueue(); + // Paper end } private void dispatchPacket(Packet packet, @Nullable GenericFutureListener> genericFutureListener) { this.b(packet, genericFutureListener); } // Paper - OBFHELPER @@ -214,21 +272,46 @@ public class NetworkManager extends SimpleChannelInboundHandler> { } - private void sendPacketQueue() { this.o(); } // Paper - OBFHELPER - private void o() { - if (this.channel != null && this.channel.isOpen()) { - Queue queue = this.packetQueue; - + // Paper start - rewrite this to be safer on + private boolean sendPacketQueue() { return this.o(); } // OBFHELPER // void -> boolean + private boolean o() { // void -> boolean + if (!isConnected()) { + return true; + } + if (MCUtil.isMainThread()) { + return processQueue(); + } else if (isPending) { + // Should only happen during login/status stages synchronized (this.packetQueue) { - NetworkManager.QueuedPacket networkmanager_queuedpacket; - - while ((networkmanager_queuedpacket = (NetworkManager.QueuedPacket) this.packetQueue.poll()) != null) { - this.b(networkmanager_queuedpacket.a, networkmanager_queuedpacket.b); - } + return this.processQueue(); + } + } + return false; + } + private boolean processQueue() { + if (this.packetQueue.isEmpty()) return true; + // If we are on main, we are safe here in that nothing else should be processing queue off main anymore + // But if we are not on main due to login/status, the parent is synchronized on packetQueue + java.util.Iterator iterator = this.packetQueue.iterator(); + while (iterator.hasNext()) { + NetworkManager.QueuedPacket queued = iterator.next(); // poll -> peek + + // Fix NPE (Spigot bug caused by handleDisconnection()) + if (queued == null) { + return true; + } + Packet packet = queued.getPacket(); + if (!packet.isReady()) { + return false; + } else { + iterator.remove(); + this.dispatchPacket(packet, queued.getGenericFutureListener()); } } + return true; } + // Paper end public void a() { this.o(); @@ -260,6 +343,7 @@ public class NetworkManager extends SimpleChannelInboundHandler> { public void close(IChatBaseComponent ichatbasecomponent) { // Spigot Start this.preparing = false; + this.packetQueue.clear(); // Paper - just incase its closed before we ever get to the main thread to do this // Spigot End if (this.channel.isOpen()) { this.channel.close(); // We can't wait as this may be called from an event loop. diff --git a/src/main/java/net/minecraft/server/Packet.java b/src/main/java/net/minecraft/server/Packet.java index 2d8e6a2f4a0c3c5d74a647d7164b0028781d3bf5..45142ed9d2440e21dd1ff1a32a12759fea4dbcb4 100644 --- a/src/main/java/net/minecraft/server/Packet.java +++ b/src/main/java/net/minecraft/server/Packet.java @@ -11,6 +11,8 @@ public interface Packet { void a(T t0); // Paper start + default boolean isReady() { return true; } + default java.util.List getExtraPackets() { return null; } default boolean packetTooLarge(NetworkManager manager) { return false; } diff --git a/src/main/java/net/minecraft/server/PlayerList.java b/src/main/java/net/minecraft/server/PlayerList.java index e148940ab3721cff27cf791c159c11b9b94191e4..e917d37382dab70ed9e6b62decf1557c33b26065 100644 --- a/src/main/java/net/minecraft/server/PlayerList.java +++ b/src/main/java/net/minecraft/server/PlayerList.java @@ -143,6 +143,7 @@ public abstract class PlayerList { // CraftBukkit - getType() // Spigot - view distance + networkmanager.queueImmunity = true; // Paper playerconnection.sendPacket(new PacketPlayOutLogin(entityplayer.getId(), entityplayer.playerInteractManager.getGameMode(), WorldData.c(worlddata.getSeed()), worlddata.isHardcore(), worldserver.worldProvider.getDimensionManager().getType(), this.getMaxPlayers(), worlddata.getType(), worldserver.spigotConfig.viewDistance, flag1, !flag)); entityplayer.getBukkitEntity().sendSupportedChannels(); // CraftBukkit playerconnection.sendPacket(new PacketPlayOutCustomPayload(PacketPlayOutCustomPayload.a, (new PacketDataSerializer(Unpooled.buffer())).a(this.getServer().getServerModName()))); @@ -152,6 +153,7 @@ public abstract class PlayerList { playerconnection.sendPacket(new PacketPlayOutRecipeUpdate(this.server.getCraftingManager().b())); playerconnection.sendPacket(new PacketPlayOutTags(this.server.getTagRegistry())); playerconnection.sendPacket(new PacketPlayOutEntityStatus(entityplayer, (byte) (worldserver.getGameRules().getBoolean(GameRules.REDUCED_DEBUG_INFO) ? 22 : 23))); // Paper - fix this rule not being initialized on the client + networkmanager.queueImmunity = false; // Paper this.d(entityplayer); entityplayer.getStatisticManager().c(); entityplayer.B().a(entityplayer); diff --git a/src/main/java/net/minecraft/server/ServerConnection.java b/src/main/java/net/minecraft/server/ServerConnection.java index 37a22ba6f7a2ac54759428d23d5ea9787bb557f7..06cd29bb9a5d6b67f896c129662c3f493238c758 100644 --- a/src/main/java/net/minecraft/server/ServerConnection.java +++ b/src/main/java/net/minecraft/server/ServerConnection.java @@ -45,6 +45,7 @@ public class ServerConnection { NetworkManager manager = null; while ((manager = pending.poll()) != null) { connectedChannels.add(manager); + manager.isPending = false; } } // Paper end