From 116660b99d312b4d84956541a7fbc351778d4365 Mon Sep 17 00:00:00 2001 From: md_5 Date: Tue, 23 Apr 2013 11:47:32 +1000 Subject: [PATCH] Netty Implement an uber efficient network engine based on the Java NIO framework Netty. This is basically a complete rewrite of the Minecraft network engine with many distinct advantages. First and foremost, there will no longer be the horrid, and redundant case of 2, or even at times, 3 threads per a connection. Instead low level select/epoll based NIO is used. The number of threads used for network reading and writing will scale automatically to the number of cores for use on your server. In most cases this will be around 8 threads for a 4 core server, much better than the up to 1000 threads that could be in use at one time with the old engine. To facilitate asynchronous packet sending or receiving (currently only chat), a thread pool of 16 threads is kept handy. == Plugin incompatibilities As a side effect of this change, plugins which rely on very specific implementation level details within Minecraft are broken. At this point in time, TagAPI and ProtocolLib are affected. If you are a user of ProtocolLib you are advised to update to the latest build, where full support is enabled. If you are a user of TagAPI, support has not yet been added, so you will need to install the updated ProtocolLib so that TagAPI may use its functions. == Stability The code within this commit has been very lightly tested in production (300 players for approximately 24 hours), however it is not guaranteed to be free from all bugs. If you experence weird connection behaviour, reporting the bug and steps to reproduce are advised. You are also free to downgrade to the latest recommend build, which is guaranteed to be stable. == Summary This commit provides a reduction in threads, which gives the CPU / operating system more time to allocate to the main server threads, as well as various other side benefits such as chat thread pooling and a slight reduction in latency. This commit is licensed under the Creative Commons Attribution-ShareAlike 3.0 Unported license. diff --git a/pom.xml b/pom.xml index 274fd43..4022004 100644 --- a/pom.xml +++ b/pom.xml @@ -132,6 +132,16 @@ trove4j 3.0.2 + + io.netty + netty-all + 4.0.0.CR3 + + + org.javassist + javassist + 3.17.1-GA + diff --git a/src/main/java/net/minecraft/server/DedicatedServer.java b/src/main/java/net/minecraft/server/DedicatedServer.java index 10ce69d..e4f1cbe 100644 --- a/src/main/java/net/minecraft/server/DedicatedServer.java +++ b/src/main/java/net/minecraft/server/DedicatedServer.java @@ -96,7 +96,7 @@ public class DedicatedServer extends MinecraftServer implements IMinecraftServer this.getLogger().info("Starting Minecraft server on " + (this.getServerIp().length() == 0 ? "*" : this.getServerIp()) + ":" + this.G()); try { - this.r = new DedicatedServerConnection(this, inetaddress, this.G()); + this.r = new org.spigotmc.MultiplexingServerConnection(this); // Spigot } catch (Throwable ioexception) { // CraftBukkit - IOException -> Throwable this.getLogger().warning("**** FAILED TO BIND TO PORT!"); this.getLogger().warning("The exception was: {0}", new Object[] { ioexception.toString()}); diff --git a/src/main/java/net/minecraft/server/DedicatedServerConnectionThread.java b/src/main/java/net/minecraft/server/DedicatedServerConnectionThread.java index ef7e10d..5f2e42e 100644 --- a/src/main/java/net/minecraft/server/DedicatedServerConnectionThread.java +++ b/src/main/java/net/minecraft/server/DedicatedServerConnectionThread.java @@ -82,7 +82,7 @@ public class DedicatedServerConnectionThread extends Thread { PendingConnection pendingconnection = new PendingConnection(this.e.d(), socket, "Connection #" + this.c++); - this.a(pendingconnection); + ((org.spigotmc.MultiplexingServerConnection) this.e.d().ae()).register(pendingconnection); // Spigot } catch (IOException ioexception) { this.e.d().getLogger().warning("DSCT: " + ioexception.getMessage()); // CraftBukkit } diff --git a/src/main/java/net/minecraft/server/INetworkManager.java b/src/main/java/net/minecraft/server/INetworkManager.java new file mode 100644 index 0000000..6fcc5d7 --- /dev/null +++ b/src/main/java/net/minecraft/server/INetworkManager.java @@ -0,0 +1,26 @@ +package net.minecraft.server; + +import java.net.SocketAddress; + +public interface INetworkManager { + + void a(Connection connection); + + void queue(Packet packet); + + void a(); + + void b(); + + SocketAddress getSocketAddress(); + + void d(); + + int e(); + + void a(String s, Object... aobject); + + java.net.Socket getSocket(); // Spigot + + void setSocketAddress(java.net.SocketAddress address); // Spigot +} diff --git a/src/main/java/net/minecraft/server/NetworkManager.java b/src/main/java/net/minecraft/server/NetworkManager.java index 1862863..5a24f2a 100644 --- a/src/main/java/net/minecraft/server/NetworkManager.java +++ b/src/main/java/net/minecraft/server/NetworkManager.java @@ -24,7 +24,7 @@ public class NetworkManager implements INetworkManager { private final Object h = new Object(); private final IConsoleLogManager i; public Socket socket; // CraftBukkit - private -> public - private final SocketAddress k; + private SocketAddress k; // Spigot - remove final private volatile DataInputStream input; private volatile DataOutputStream output; private volatile boolean n = true; @@ -369,4 +369,6 @@ public class NetworkManager implements INetworkManager { static Thread h(NetworkManager networkmanager) { return networkmanager.u; } + + public void setSocketAddress(SocketAddress address) { k = address; } // Spigot } diff --git a/src/main/java/net/minecraft/server/Packet51MapChunk.java b/src/main/java/net/minecraft/server/Packet51MapChunk.java index 7711629..9cf625d 100644 --- a/src/main/java/net/minecraft/server/Packet51MapChunk.java +++ b/src/main/java/net/minecraft/server/Packet51MapChunk.java @@ -42,7 +42,7 @@ public class Packet51MapChunk extends Packet { this.b = chunk.z; this.e = flag; ChunkMap chunkmap = a(chunk, flag, i); - Deflater deflater = new Deflater(-1); + Deflater deflater = new Deflater(4); this.d = chunkmap.c; this.c = chunkmap.b; diff --git a/src/main/java/net/minecraft/server/Packet56MapChunkBulk.java b/src/main/java/net/minecraft/server/Packet56MapChunkBulk.java index ce28495..fb34d4a 100644 --- a/src/main/java/net/minecraft/server/Packet56MapChunkBulk.java +++ b/src/main/java/net/minecraft/server/Packet56MapChunkBulk.java @@ -24,7 +24,7 @@ public class Packet56MapChunkBulk extends Packet { @Override protected Deflater initialValue() { // Don't use higher compression level, slows things down too much - return new Deflater(6); + return new Deflater(4); // Spigot - use lower compression level still } }; // CraftBukkit end diff --git a/src/main/java/net/minecraft/server/PendingConnection.java b/src/main/java/net/minecraft/server/PendingConnection.java index 17cfacc..a945892 100644 --- a/src/main/java/net/minecraft/server/PendingConnection.java +++ b/src/main/java/net/minecraft/server/PendingConnection.java @@ -17,7 +17,7 @@ public class PendingConnection extends Connection { private static Random random = new Random(); private byte[] d; private final MinecraftServer server; - public final NetworkManager networkManager; + public final INetworkManager networkManager; public boolean b = false; private int f = 0; private String g = null; @@ -27,10 +27,15 @@ public class PendingConnection extends Connection { private SecretKey k = null; public String hostname = ""; // CraftBukkit - add field + public PendingConnection(MinecraftServer minecraftserver, org.spigotmc.netty.NettyNetworkManager networkManager) { + this.server = minecraftserver; + this.networkManager = networkManager; + } + public PendingConnection(MinecraftServer minecraftserver, Socket socket, String s) throws java.io.IOException { // CraftBukkit - throws IOException this.server = minecraftserver; this.networkManager = new NetworkManager(minecraftserver.getLogger(), socket, s, this, minecraftserver.F().getPrivate()); - this.networkManager.e = 0; + // this.networkManager.e = 0; } // CraftBukkit start @@ -146,7 +151,7 @@ public class PendingConnection extends Connection { // CraftBukkit org.bukkit.event.server.ServerListPingEvent pingEvent = org.bukkit.craftbukkit.event.CraftEventFactory.callServerListPingEvent(this.server.server, getSocket().getInetAddress(), this.server.getMotd(), playerlist.getPlayerCount(), playerlist.getMaxPlayers()); - if (packet254getinfo.a == 1) { + if (true) { // CraftBukkit start - Fix decompile issues, don't create a list from an array Object[] list = new Object[] { 1, 61, this.server.getVersion(), pingEvent.getMotd(), playerlist.getPlayerCount(), pingEvent.getMaxPlayers() }; @@ -173,9 +178,11 @@ public class PendingConnection extends Connection { this.networkManager.queue(new Packet255KickDisconnect(s)); this.networkManager.d(); - if (inetaddress != null && this.server.ae() instanceof DedicatedServerConnection) { - ((DedicatedServerConnection) this.server.ae()).a(inetaddress); + // Spigot start + if (inetaddress != null) { + ((org.spigotmc.MultiplexingServerConnection) this.server.ae()).unThrottle(inetaddress); } + // Spigot end this.b = true; } catch (Exception exception) { diff --git a/src/main/java/org/bukkit/craftbukkit/CraftServer.java b/src/main/java/org/bukkit/craftbukkit/CraftServer.java index 010e9c3..fc9170f 100644 --- a/src/main/java/org/bukkit/craftbukkit/CraftServer.java +++ b/src/main/java/org/bukkit/craftbukkit/CraftServer.java @@ -1374,4 +1374,20 @@ public final class CraftServer implements Server { public CraftScoreboardManager getScoreboardManager() { return scoreboardManager; } + + // Spigot start + @SuppressWarnings("unchecked") + public java.util.Collection getSecondaryHosts() { + java.util.Collection ret = new java.util.HashSet(); + List listeners = configuration.getList("listeners"); + if (listeners != null) { + for (Object o : listeners) { + + Map sect = (Map) o; + ret.add(new java.net.InetSocketAddress((String) sect.get("address"), (Integer) sect.get("port"))); + } + } + return ret; + } + // Spigot end } diff --git a/src/main/java/org/spigotmc/MultiplexingServerConnection.java b/src/main/java/org/spigotmc/MultiplexingServerConnection.java new file mode 100644 index 0000000..c8ea80a --- /dev/null +++ b/src/main/java/org/spigotmc/MultiplexingServerConnection.java @@ -0,0 +1,126 @@ +package org.spigotmc; + +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.logging.Level; +import net.minecraft.server.DedicatedServerConnection; +import net.minecraft.server.MinecraftServer; +import net.minecraft.server.PendingConnection; +import net.minecraft.server.ServerConnection; +import org.bukkit.Bukkit; + +public class MultiplexingServerConnection extends ServerConnection { + + private static final boolean NETTY_DISABLED = Boolean.getBoolean("org.spigotmc.netty.disabled"); + private final Collection children = new HashSet(); + private final List pending = Collections.synchronizedList(new ArrayList()); + private final HashMap throttle = new HashMap(); + + public MultiplexingServerConnection(MinecraftServer ms) { + super(ms); + + // Add primary connection + start(ms.server.getIp(), ms.server.getPort()); + // Add all other connections + for (InetSocketAddress address : ms.server.getSecondaryHosts()) { + start(address.getAddress().getHostAddress(), address.getPort()); + } + } + + private void start(String ipAddress, int port) { + try { + // Calculate address, can't use isEmpty due to Java 5 + InetAddress socketAddress = (ipAddress.length() == 0) ? null : InetAddress.getByName(ipAddress); + // Say hello to the log + d().getLogger().info("Starting listener #" + children.size() + " on " + (socketAddress == null ? "*" : ipAddress) + ":" + port); + // Start connection: Netty / non Netty + ServerConnection listener = (NETTY_DISABLED) ? new DedicatedServerConnection(d(), socketAddress, port) : new org.spigotmc.netty.NettyServerConnection(d(), socketAddress, port); + // Register with other connections + children.add(listener); + // Gotta catch em all + } catch (Throwable t) { + // Just print some info to the log + t.printStackTrace(); + d().getLogger().warning("**** FAILED TO BIND TO PORT!"); + d().getLogger().warning("The exception was: {0}", t); + d().getLogger().warning("Perhaps a server is already running on that port?"); + } + } + + /** + * close. + */ + @Override + public void a() { + for (ServerConnection child : children) { + child.a(); + } + } + + /** + * Pulse. This method pulses all connections causing them to update. It is + * called from the main server thread a few times a tick. + */ + @Override + public void b() { + super.b(); // pulse PlayerConnections + for (int i = 0; i < pending.size(); ++i) { + PendingConnection connection = pending.get(i); + + try { + connection.c(); + } catch (Exception ex) { + connection.disconnect("Internal server error"); + Bukkit.getServer().getLogger().log(Level.WARNING, "Failed to handle packet: " + ex, ex); + } + + if (connection.b) { + pending.remove(i--); + } + } + } + + /** + * Remove the user from connection throttle. This should fix the server ping + * bugs. + * + * @param address the address to remove + */ + public void unThrottle(InetAddress address) { + if (address != null) { + synchronized (throttle) { + throttle.remove(address); + } + } + } + + /** + * Add a connection to the throttle list. + * + * @param address + * @return Whether they must be disconnected + */ + public boolean throttle(InetAddress address) { + long currentTime = System.currentTimeMillis(); + synchronized (throttle) { + Long value = throttle.get(address); + if (value != null && !address.isLoopbackAddress() && currentTime - value < d().server.getConnectionThrottle()) { + throttle.put(address, currentTime); + return true; + } + + throttle.put(address, currentTime); + } + return false; + } + + public void register(PendingConnection conn) { + pending.add(conn); + } +} diff --git a/src/main/java/org/spigotmc/netty/CipherBase.java b/src/main/java/org/spigotmc/netty/CipherBase.java new file mode 100644 index 0000000..c75a60f --- /dev/null +++ b/src/main/java/org/spigotmc/netty/CipherBase.java @@ -0,0 +1,54 @@ +package org.spigotmc.netty; + +import io.netty.buffer.ByteBuf; +import javax.crypto.Cipher; +import javax.crypto.ShortBufferException; + +/** + * Class to expose an + * {@link #cipher(io.netty.buffer.ByteBuf, io.netty.buffer.ByteBuf)} method to + * aid in the efficient passing of ByteBuffers through a cipher. + */ +class CipherBase +{ + + private final Cipher cipher; + private ThreadLocal heapInLocal = new EmptyByteThreadLocal(); + private ThreadLocal heapOutLocal = new EmptyByteThreadLocal(); + + private static class EmptyByteThreadLocal extends ThreadLocal + { + + @Override + protected byte[] initialValue() + { + return new byte[ 0 ]; + } + } + + protected CipherBase(Cipher cipher) + { + this.cipher = cipher; + } + + protected void cipher(ByteBuf in, ByteBuf out) throws ShortBufferException + { + byte[] heapIn = heapInLocal.get(); + int readableBytes = in.readableBytes(); + if ( heapIn.length < readableBytes ) + { + heapIn = new byte[ readableBytes ]; + heapInLocal.set( heapIn ); + } + in.readBytes( heapIn, 0, readableBytes ); + + byte[] heapOut = heapOutLocal.get(); + int outputSize = cipher.getOutputSize( readableBytes ); + if ( heapOut.length < outputSize ) + { + heapOut = new byte[ outputSize ]; + heapOutLocal.set( heapOut ); + } + out.writeBytes( heapOut, 0, cipher.update( heapIn, 0, readableBytes, heapOut ) ); + } +} diff --git a/src/main/java/org/spigotmc/netty/CipherDecoder.java b/src/main/java/org/spigotmc/netty/CipherDecoder.java new file mode 100644 index 0000000..98dc3a0 --- /dev/null +++ b/src/main/java/org/spigotmc/netty/CipherDecoder.java @@ -0,0 +1,23 @@ +package org.spigotmc.netty; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.ByteToByteDecoder; +import javax.crypto.Cipher; + +class CipherDecoder extends ByteToByteDecoder +{ + + private final CipherBase cipher; + + public CipherDecoder(Cipher cipher) + { + this.cipher = new CipherBase( cipher ); + } + + @Override + protected void decode(ChannelHandlerContext ctx, ByteBuf in, ByteBuf out) throws Exception + { + cipher.cipher( in, out ); + } +} diff --git a/src/main/java/org/spigotmc/netty/CipherEncoder.java b/src/main/java/org/spigotmc/netty/CipherEncoder.java new file mode 100644 index 0000000..4ff943b --- /dev/null +++ b/src/main/java/org/spigotmc/netty/CipherEncoder.java @@ -0,0 +1,23 @@ +package org.spigotmc.netty; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.ByteToByteEncoder; +import javax.crypto.Cipher; + +class CipherEncoder extends ByteToByteEncoder +{ + + private final CipherBase cipher; + + public CipherEncoder(Cipher cipher) + { + this.cipher = new CipherBase( cipher ); + } + + @Override + protected void encode(ChannelHandlerContext ctx, ByteBuf in, ByteBuf out) throws Exception + { + cipher.cipher( in, out ); + } +} diff --git a/src/main/java/org/spigotmc/netty/NettyNetworkManager.java b/src/main/java/org/spigotmc/netty/NettyNetworkManager.java new file mode 100644 index 0000000..9a70a3a --- /dev/null +++ b/src/main/java/org/spigotmc/netty/NettyNetworkManager.java @@ -0,0 +1,295 @@ +package org.spigotmc.netty; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundMessageHandlerAdapter; +import io.netty.channel.ChannelPromise; +import io.netty.channel.socket.SocketChannel; +import java.net.Socket; +import java.net.SocketAddress; +import java.security.PrivateKey; +import java.util.AbstractList; +import java.util.List; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import javax.crypto.Cipher; +import javax.crypto.SecretKey; +import net.minecraft.server.Connection; +import net.minecraft.server.INetworkManager; +import net.minecraft.server.MinecraftServer; +import net.minecraft.server.Packet; +import net.minecraft.server.Packet252KeyResponse; +import net.minecraft.server.Packet255KickDisconnect; +import net.minecraft.server.PendingConnection; +import net.minecraft.server.PlayerConnection; +import org.spigotmc.MultiplexingServerConnection; + +/** + * This class forms the basis of the Netty integration. It implements + * {@link INetworkManager} and handles all events and inbound messages provided + * by the upstream Netty process. + */ +public class NettyNetworkManager extends ChannelInboundMessageHandlerAdapter implements INetworkManager +{ + + private static final ExecutorService threadPool = Executors.newCachedThreadPool( new ThreadFactoryBuilder().setNameFormat( "Async Packet Handler - %1$d" ).build() ); + private static final MinecraftServer server = MinecraftServer.getServer(); + private static final PrivateKey key = server.F().getPrivate(); + private static final MultiplexingServerConnection serverConnection = (MultiplexingServerConnection) server.ae(); + /*========================================================================*/ + private final Queue syncPackets = new ConcurrentLinkedQueue(); + private final List highPriorityQueue = new AbstractList() + { + @Override + public void add(int index, Packet element) + { + // NOP + } + + @Override + public Packet get(int index) + { + throw new UnsupportedOperationException(); + } + + @Override + public int size() + { + return 0; + } + }; + private volatile boolean connected; + private Channel channel; + private SocketAddress address; + Connection connection; + private SecretKey secret; + private String dcReason; + private Object[] dcArgs; + private Socket socketAdaptor; + private long writtenBytes; + + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception + { + // Channel and address groundwork first + channel = ctx.channel(); + address = channel.remoteAddress(); + // Then the socket adaptor + socketAdaptor = NettySocketAdaptor.adapt( (SocketChannel) channel ); + // Followed by their first handler + connection = new PendingConnection( server, this ); + // Finally register the connection + connected = true; + serverConnection.register( (PendingConnection) connection ); + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception + { + a( "disconnect.endOfStream", new Object[ 0 ] ); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception + { + // TODO: Remove this once we are more stable + // Bukkit.getServer().getLogger().severe("======================= Start Netty Debug Log ======================="); + // Bukkit.getServer().getLogger().log(Level.SEVERE, "Error caught whilst handling " + channel, cause); + // Bukkit.getServer().getLogger().severe("======================= End Netty Debug Log ======================="); + // Disconnect with generic reason + exception + a( "disconnect.genericReason", new Object[] + { + "Internal exception: " + cause + } ); + } + + @Override + public void messageReceived(ChannelHandlerContext ctx, final Packet msg) throws Exception + { + if ( connected ) + { + if ( msg instanceof Packet252KeyResponse ) + { + secret = ( (Packet252KeyResponse) msg ).a( key ); + Cipher decrypt = NettyServerConnection.getCipher( Cipher.DECRYPT_MODE, secret ); + channel.pipeline().addBefore( "decoder", "decrypt", new CipherDecoder( decrypt ) ); + } + + if ( msg.a_() ) + { + threadPool.submit( new Runnable() + { + public void run() + { + Packet packet = PacketListener.callReceived( NettyNetworkManager.this, connection, msg ); + if ( packet != null ) + { + packet.handle( connection ); + } + } + } ); + } else + { + syncPackets.add( msg ); + } + } + } + + public Socket getSocket() + { + return socketAdaptor; + } + + /** + * setHandler. Set the {@link NetHandler} used to process received packets. + * + * @param nh the new {@link NetHandler} instance + */ + public void a(Connection nh) + { + connection = nh; + } + + /** + * queue. Queue a packet for sending, or in this case send it to be write it + * straight to the channel. + * + * @param packet the packet to queue + */ + public void queue(Packet packet) + { + // Only send if channel is still connected + if ( connected ) + { + // Process packet via handler + packet = PacketListener.callQueued( this, connection, packet ); + // If handler indicates packet send + if ( packet != null ) + { + highPriorityQueue.add( packet ); + + ChannelPromise promise; + if ( packet instanceof Packet255KickDisconnect ) + { + promise = channel.voidPromise(); + } else + { + promise = channel.newPromise(); + } + + channel.write( packet, promise ); + if ( packet instanceof Packet252KeyResponse ) + { + Cipher encrypt = NettyServerConnection.getCipher( Cipher.ENCRYPT_MODE, secret ); + channel.pipeline().addBefore( "decoder", "encrypt", new CipherEncoder( encrypt ) ); + } + } + } + } + + /** + * wakeThreads. In Vanilla this method will interrupt the network read and + * write threads, thus waking them. + */ + public void a() + { + } + + /** + * processPackets. Remove up to 1000 packets from the queue and process + * them. This method should only be called from the main server thread. + */ + public void b() + { + for ( int i = 1000; !syncPackets.isEmpty() && i >= 0; i-- ) + { + if ( connection instanceof PendingConnection ? ( (PendingConnection) connection ).b : ( (PlayerConnection) connection ).disconnected ) + { + syncPackets.clear(); + break; + } + + Packet packet = PacketListener.callReceived( this, connection, syncPackets.poll() ); + if ( packet != null ) + { + packet.handle( connection ); + } + } + + // Disconnect via the handler - this performs all plugin related cleanup + logging + if ( !connected && ( dcReason != null || dcArgs != null ) ) + { + connection.a( dcReason, dcArgs ); + } + } + + /** + * getSocketAddress. Return the remote address of the connected user. It is + * important that this method returns a value even after disconnect. + * + * @return the remote address of this connection + */ + public SocketAddress getSocketAddress() + { + return address; + } + + public void setSocketAddress(SocketAddress address) + { + this.address = address; + } + + /** + * close. Close and release all resources associated with this connection. + */ + public void d() + { + if ( connected ) + { + connected = false; + channel.close(); + } + } + + /** + * queueSize. Return the number of packets in the low priority queue. In a + * NIO environment this will always be 0. + * + * @return the size of the packet send queue + */ + public int e() + { + return 0; + } + + /** + * networkShutdown. Shuts down this connection, storing the reason and + * parameters, used to notify the current {@link Connection}. + * + * @param reason the main disconnect reason + * @param arguments additional disconnect arguments, for example, the + * exception which triggered the disconnect. + */ + public void a(String reason, Object... arguments) + { + if ( connected ) + { + dcReason = reason; + dcArgs = arguments; + d(); + } + } + + public long getWrittenBytes() + { + return writtenBytes; + } + + public void addWrittenBytes(int written) + { + writtenBytes += written; + } +} diff --git a/src/main/java/org/spigotmc/netty/NettyServerConnection.java b/src/main/java/org/spigotmc/netty/NettyServerConnection.java new file mode 100644 index 0000000..575db8b --- /dev/null +++ b/src/main/java/org/spigotmc/netty/NettyServerConnection.java @@ -0,0 +1,100 @@ +package org.spigotmc.netty; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.Channel; +import io.netty.channel.ChannelException; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.timeout.ReadTimeoutHandler; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.security.GeneralSecurityException; +import java.security.Key; +import javax.crypto.Cipher; +import javax.crypto.spec.IvParameterSpec; +import net.minecraft.server.MinecraftServer; +import net.minecraft.server.ServerConnection; +import org.spigotmc.MultiplexingServerConnection; + +/** + * This is the NettyServerConnection class. It implements + * {@link ServerConnection} and is the main interface between the Minecraft + * server and this NIO implementation. It handles starting, stopping and + * processing the Netty backend. + */ +public class NettyServerConnection extends ServerConnection +{ + + private final ChannelFuture socket; + + public NettyServerConnection(final MinecraftServer ms, InetAddress host, int port) + { + super( ms ); + int threads = Integer.getInteger( "org.spigotmc.netty.threads", 3 ); + socket = new ServerBootstrap().channel( NioServerSocketChannel.class ).childHandler( new ChannelInitializer() + { + @Override + public void initChannel(Channel ch) throws Exception + { + // Check the throttle + if ( ( (MultiplexingServerConnection) ms.ae() ).throttle( ( (InetSocketAddress) ch.remoteAddress() ).getAddress() ) ) + { + ch.close(); + return; + } + // Set IP_TOS + try + { + ch.config().setOption( ChannelOption.IP_TOS, 0x18 ); + } catch ( ChannelException ex ) + { + // IP_TOS is not supported (Windows XP / Windows Server 2003) + } + + NettyNetworkManager networkManager = new NettyNetworkManager(); + ch.pipeline() + .addLast( "flusher", new OutboundManager( networkManager ) ) + .addLast( "timer", new ReadTimeoutHandler( 30 ) ) + .addLast( "decoder", new PacketDecoder() ) + .addLast( "encoder", new PacketEncoder( networkManager ) ) + .addLast( "manager", networkManager ); + } + } ).childOption( ChannelOption.TCP_NODELAY, false ).group( new NioEventLoopGroup( threads, new ThreadFactoryBuilder().setNameFormat( "Netty IO Thread - %1$d" ).build() ) ).localAddress( host, port ).bind(); + MinecraftServer.getServer().getLogger().info( "Using Netty NIO with " + threads + " threads for network connections." ); + } + + /** + * Shutdown. This method is called when the server is shutting down and the + * server socket and all clients should be terminated with no further + * action. + */ + @Override + public void a() + { + socket.channel().close().syncUninterruptibly(); + } + + /** + * Return a Minecraft compatible cipher instance from the specified key. + * + * @param opMode the mode to initialize the cipher in + * @param key to use as the initial vector + * @return the initialized cipher + */ + public static Cipher getCipher(int opMode, Key key) + { + try + { + Cipher cip = Cipher.getInstance( "AES/CFB8/NoPadding" ); + cip.init( opMode, key, new IvParameterSpec( key.getEncoded() ) ); + return cip; + } catch ( GeneralSecurityException ex ) + { + throw new RuntimeException( ex ); + } + } +} diff --git a/src/main/java/org/spigotmc/netty/NettySocketAdaptor.java b/src/main/java/org/spigotmc/netty/NettySocketAdaptor.java new file mode 100644 index 0000000..5da8a59 --- /dev/null +++ b/src/main/java/org/spigotmc/netty/NettySocketAdaptor.java @@ -0,0 +1,294 @@ +package org.spigotmc.netty; + +import io.netty.channel.Channel; +import io.netty.channel.ChannelOption; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.InetAddress; +import java.net.Socket; +import java.net.SocketAddress; +import java.net.SocketException; +import java.nio.channels.SocketChannel; + +/** + * This class wraps a Netty {@link Channel} in a {@link Socket}. It overrides + * all methods in {@link Socket} to ensure that calls are not mistakingly made + * to the unsupported super socket. All operations that can be sanely applied to + * a {@link Channel} are implemented here. Those which cannot will throw an + * {@link UnsupportedOperationException}. + */ +public class NettySocketAdaptor extends Socket +{ + + private final io.netty.channel.socket.SocketChannel ch; + + private NettySocketAdaptor(io.netty.channel.socket.SocketChannel ch) + { + this.ch = ch; + } + + public static NettySocketAdaptor adapt(io.netty.channel.socket.SocketChannel ch) + { + return new NettySocketAdaptor( ch ); + } + + @Override + public void bind(SocketAddress bindpoint) throws IOException + { + ch.bind( bindpoint ).syncUninterruptibly(); + } + + @Override + public synchronized void close() throws IOException + { + ch.close().syncUninterruptibly(); + } + + @Override + public void connect(SocketAddress endpoint) throws IOException + { + ch.connect( endpoint ).syncUninterruptibly(); + } + + @Override + public void connect(SocketAddress endpoint, int timeout) throws IOException + { + ch.config().setConnectTimeoutMillis( timeout ); + ch.connect( endpoint ).syncUninterruptibly(); + } + + @Override + public boolean equals(Object obj) + { + return obj instanceof NettySocketAdaptor && ch.equals( ( (NettySocketAdaptor) obj ).ch ); + } + + @Override + public SocketChannel getChannel() + { + throw new UnsupportedOperationException( "Operation not supported on Channel wrapper." ); + } + + @Override + public InetAddress getInetAddress() + { + return ch.remoteAddress().getAddress(); + } + + @Override + public InputStream getInputStream() throws IOException + { + throw new UnsupportedOperationException( "Operation not supported on Channel wrapper." ); + } + + @Override + public boolean getKeepAlive() throws SocketException + { + return ch.config().getOption( ChannelOption.SO_KEEPALIVE ); + } + + @Override + public InetAddress getLocalAddress() + { + return ch.localAddress().getAddress(); + } + + @Override + public int getLocalPort() + { + return ch.localAddress().getPort(); + } + + @Override + public SocketAddress getLocalSocketAddress() + { + return ch.localAddress(); + } + + @Override + public boolean getOOBInline() throws SocketException + { + throw new UnsupportedOperationException( "Operation not supported on Channel wrapper." ); + } + + @Override + public OutputStream getOutputStream() throws IOException + { + throw new UnsupportedOperationException( "Operation not supported on Channel wrapper." ); + } + + @Override + public int getPort() + { + return ch.remoteAddress().getPort(); + } + + @Override + public synchronized int getReceiveBufferSize() throws SocketException + { + return ch.config().getOption( ChannelOption.SO_RCVBUF ); + } + + @Override + public SocketAddress getRemoteSocketAddress() + { + return ch.remoteAddress(); + } + + @Override + public boolean getReuseAddress() throws SocketException + { + return ch.config().getOption( ChannelOption.SO_REUSEADDR ); + } + + @Override + public synchronized int getSendBufferSize() throws SocketException + { + return ch.config().getOption( ChannelOption.SO_SNDBUF ); + } + + @Override + public int getSoLinger() throws SocketException + { + return ch.config().getOption( ChannelOption.SO_LINGER ); + } + + @Override + public synchronized int getSoTimeout() throws SocketException + { + throw new UnsupportedOperationException( "Operation not supported on Channel wrapper." ); + } + + @Override + public boolean getTcpNoDelay() throws SocketException + { + return ch.config().getOption( ChannelOption.TCP_NODELAY ); + } + + @Override + public int getTrafficClass() throws SocketException + { + return ch.config().getOption( ChannelOption.IP_TOS ); + } + + @Override + public int hashCode() + { + return ch.hashCode(); + } + + @Override + public boolean isBound() + { + return ch.localAddress() != null; + } + + @Override + public boolean isClosed() + { + return !ch.isOpen(); + } + + @Override + public boolean isConnected() + { + return ch.isActive(); + } + + @Override + public boolean isInputShutdown() + { + return ch.isInputShutdown(); + } + + @Override + public boolean isOutputShutdown() + { + return ch.isOutputShutdown(); + } + + @Override + public void sendUrgentData(int data) throws IOException + { + throw new UnsupportedOperationException( "Operation not supported on Channel wrapper." ); + } + + @Override + public void setKeepAlive(boolean on) throws SocketException + { + ch.config().setOption( ChannelOption.SO_KEEPALIVE, on ); + } + + @Override + public void setOOBInline(boolean on) throws SocketException + { + throw new UnsupportedOperationException( "Operation not supported on Channel wrapper." ); + } + + @Override + public void setPerformancePreferences(int connectionTime, int latency, int bandwidth) + { + throw new UnsupportedOperationException( "Operation not supported on Channel wrapper." ); + } + + @Override + public synchronized void setReceiveBufferSize(int size) throws SocketException + { + ch.config().setOption( ChannelOption.SO_RCVBUF, size ); + } + + @Override + public void setReuseAddress(boolean on) throws SocketException + { + ch.config().setOption( ChannelOption.SO_REUSEADDR, on ); + } + + @Override + public synchronized void setSendBufferSize(int size) throws SocketException + { + ch.config().setOption( ChannelOption.SO_SNDBUF, size ); + } + + @Override + public void setSoLinger(boolean on, int linger) throws SocketException + { + ch.config().setOption( ChannelOption.SO_LINGER, linger ); + } + + @Override + public synchronized void setSoTimeout(int timeout) throws SocketException + { + throw new UnsupportedOperationException( "Operation not supported on Channel wrapper." ); + } + + @Override + public void setTcpNoDelay(boolean on) throws SocketException + { + ch.config().setOption( ChannelOption.TCP_NODELAY, on ); + } + + @Override + public void setTrafficClass(int tc) throws SocketException + { + ch.config().setOption( ChannelOption.IP_TOS, tc ); + } + + @Override + public void shutdownInput() throws IOException + { + throw new UnsupportedOperationException( "Operation not supported on Channel wrapper." ); + } + + @Override + public void shutdownOutput() throws IOException + { + ch.shutdownOutput().syncUninterruptibly(); + } + + @Override + public String toString() + { + return ch.toString(); + } +} diff --git a/src/main/java/org/spigotmc/netty/OutboundManager.java b/src/main/java/org/spigotmc/netty/OutboundManager.java new file mode 100644 index 0000000..3f4c02f --- /dev/null +++ b/src/main/java/org/spigotmc/netty/OutboundManager.java @@ -0,0 +1,29 @@ +package org.spigotmc.netty; + +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelOperationHandlerAdapter; +import io.netty.channel.ChannelPromise; +import net.minecraft.server.PendingConnection; + +class OutboundManager extends ChannelOperationHandlerAdapter +{ + + private static final int FLUSH_TIME = 2; + /*========================================================================*/ + private long lastFlush; + private final NettyNetworkManager manager; + + OutboundManager(NettyNetworkManager manager) + { + this.manager = manager; + } + + public void flush(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception + { + if ( promise == ctx.channel().voidPromise() || manager.connection instanceof PendingConnection || System.currentTimeMillis() - lastFlush > FLUSH_TIME ) + { + lastFlush = System.currentTimeMillis(); + ctx.flush( promise ); + } + } +} diff --git a/src/main/java/org/spigotmc/netty/PacketDecoder.java b/src/main/java/org/spigotmc/netty/PacketDecoder.java new file mode 100644 index 0000000..60b75d0 --- /dev/null +++ b/src/main/java/org/spigotmc/netty/PacketDecoder.java @@ -0,0 +1,68 @@ +package org.spigotmc.netty; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufInputStream; +import io.netty.buffer.MessageBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.ReplayingDecoder; +import java.io.DataInputStream; +import java.io.EOFException; +import java.io.IOException; +import net.minecraft.server.MinecraftServer; +import net.minecraft.server.Packet; + +/** + * Packet decoding class backed by a reusable {@link DataInputStream} which + * backs the input {@link ByteBuf}. Reads an unsigned byte packet header and + * then decodes the packet accordingly. + */ +public class PacketDecoder extends ReplayingDecoder +{ + + private DataInputStream input; + private Packet packet; + + public PacketDecoder() + { + super( ReadState.HEADER ); + } + + @Override + protected void decode(ChannelHandlerContext ctx, ByteBuf in, MessageBuf out) throws Exception + { + if ( input == null ) + { + input = new DataInputStream( new ByteBufInputStream( in ) ); + } + + while ( true ) + { + switch ( state() ) + { + case HEADER: + short packetId = in.readUnsignedByte(); + packet = Packet.a( MinecraftServer.getServer().getLogger(), packetId ); + if ( packet == null ) + { + throw new IOException( "Bad packet id " + packetId ); + } + checkpoint( ReadState.DATA ); + case DATA: + try + { + packet.a( input ); + } catch ( EOFException ex ) + { + return; + } + + checkpoint( ReadState.HEADER ); + out.add( packet ); + packet = null; + break; + default: + throw new IllegalStateException(); + } + } + } +} diff --git a/src/main/java/org/spigotmc/netty/PacketEncoder.java b/src/main/java/org/spigotmc/netty/PacketEncoder.java new file mode 100644 index 0000000..e6a45d3 --- /dev/null +++ b/src/main/java/org/spigotmc/netty/PacketEncoder.java @@ -0,0 +1,55 @@ +package org.spigotmc.netty; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufOutputStream; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.MessageToByteEncoder; +import java.io.DataOutputStream; +import net.minecraft.server.Packet; + +/** + * Netty encoder which takes a packet and encodes it, and adds a byte packet id + * header. + */ +public class PacketEncoder extends MessageToByteEncoder +{ + + private ByteBuf outBuf; + private DataOutputStream dataOut; + private final NettyNetworkManager networkManager; + + public PacketEncoder(NettyNetworkManager networkManager) + { + this.networkManager = networkManager; + } + + @Override + public void encode(ChannelHandlerContext ctx, Packet msg, ByteBuf out) throws Exception + { + if ( outBuf == null ) + { + outBuf = ctx.alloc().buffer(); + } + if ( dataOut == null ) + { + dataOut = new DataOutputStream( new ByteBufOutputStream( outBuf ) ); + } + + out.writeByte( msg.n() ); + msg.a( dataOut ); + + networkManager.addWrittenBytes( outBuf.readableBytes() ); + out.writeBytes( outBuf ); + outBuf.discardSomeReadBytes(); + } + + @Override + public void handlerRemoved(ChannelHandlerContext ctx) throws Exception + { + if ( outBuf != null ) + { + outBuf.release(); + outBuf = null; + } + } +} diff --git a/src/main/java/org/spigotmc/netty/PacketListener.java b/src/main/java/org/spigotmc/netty/PacketListener.java new file mode 100644 index 0000000..965ba12 --- /dev/null +++ b/src/main/java/org/spigotmc/netty/PacketListener.java @@ -0,0 +1,112 @@ +package org.spigotmc.netty; + +import com.google.common.base.Preconditions; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import java.util.logging.Level; +import net.minecraft.server.Connection; +import net.minecraft.server.INetworkManager; +import net.minecraft.server.Packet; +import org.bukkit.Bukkit; +import org.bukkit.plugin.Plugin; + +/** + * This class is used for plugins that wish to register to listen to incoming + * and outgoing packets. To use this class, simply create a new instance, + * override the methods you wish to use, and call + * {@link #register(org.spigotmc.netty.PacketListener, org.bukkit.plugin.Plugin)}. + */ +public class PacketListener +{ + + /** + * A mapping of all registered listeners and their owning plugins. + */ + private static final Map listeners = new HashMap(); + /** + * A baked list of all listeners, for efficiency sake. + */ + private static PacketListener[] baked = new PacketListener[ 0 ]; + + /** + * Used to register a handler for receiving notifications of packet + * activity. + * + * @param listener the listener to register + * @param plugin the plugin owning this listener + */ + public static synchronized void register(PacketListener listener, Plugin plugin) + { + Preconditions.checkNotNull( listener, "listener" ); + Preconditions.checkNotNull( plugin, "plugin" ); + Preconditions.checkState( !listeners.containsKey( listener ), "listener already registered" ); + + int size = listeners.size(); + Preconditions.checkState( baked.length == size ); + listeners.put( listener, plugin ); + baked = Arrays.copyOf( baked, size + 1 ); + baked[size] = listener; + } + + static Packet callReceived(INetworkManager networkManager, Connection connection, Packet packet) + { + for ( PacketListener listener : baked ) + { + try + { + packet = listener.packetReceived( networkManager, connection, packet ); + } catch ( Throwable t ) + { + Bukkit.getServer().getLogger().log( Level.SEVERE, "Error whilst firing receive hook for packet", t ); + } + } + return packet; + } + + static Packet callQueued(INetworkManager networkManager, Connection connection, Packet packet) + { + for ( PacketListener listener : baked ) + { + try + { + packet = listener.packetQueued( networkManager, connection, packet ); + } catch ( Throwable t ) + { + Bukkit.getServer().getLogger().log( Level.SEVERE, "Error whilst firing queued hook for packet", t ); + } + } + return packet; + } + + /** + * Called when a packet has been received and is about to be handled by the + * current {@link Connection}. The returned packet will be the packet passed + * on for handling, or in the case of null being returned, not handled at + * all. + * + * @param networkManager the NetworkManager receiving the packet + * @param connection the connection which will handle the packet + * @param packet the received packet + * @return the packet to be handled, or null to cancel + */ + public Packet packetReceived(INetworkManager networkManager, Connection connection, Packet packet) + { + return packet; + } + + /** + * Called when a packet is queued to be sent. The returned packet will be + * the packet sent. In the case of null being returned, the packet will not + * be sent. + * + * @param networkManager the NetworkManager which will send the packet + * @param connection the connection which queued the packet + * @param packet the queue packet + * @return the packet to be sent, or null if the packet will not be sent. + */ + public Packet packetQueued(INetworkManager networkManager, Connection connection, Packet packet) + { + return packet; + } +} diff --git a/src/main/java/org/spigotmc/netty/ReadState.java b/src/main/java/org/spigotmc/netty/ReadState.java new file mode 100644 index 0000000..d3a9cab --- /dev/null +++ b/src/main/java/org/spigotmc/netty/ReadState.java @@ -0,0 +1,17 @@ +package org.spigotmc.netty; + +/** + * Stores the state of the packet currently being read. + */ +public enum ReadState +{ + + /** + * Indicates the byte representing the ID has been read. + */ + HEADER, + /** + * Shows the packet body is being read. + */ + DATA; +} diff --git a/src/main/resources/configurations/bukkit.yml b/src/main/resources/configurations/bukkit.yml index 67b4fa9..d26e644 100644 --- a/src/main/resources/configurations/bukkit.yml +++ b/src/main/resources/configurations/bukkit.yml @@ -13,6 +13,9 @@ # Bug tracker: http://leaky.bukkit.org/ +#listeners: +# - address: 127.0.0.1 +# port: 25577 settings: allow-end: true warn-on-overload: true -- 1.8.1.2