From d2c3009e1ee527e5d6b990264190370ff50444b2 Mon Sep 17 00:00:00 2001 From: md_5 Date: Fri, 19 Apr 2013 17:44:39 +1000 Subject: [PATCH] Netty diff --git a/pom.xml b/pom.xml index da1a0eb..b8c24af 100644 --- a/pom.xml +++ b/pom.xml @@ -132,6 +132,11 @@ trove4j 3.0.2 + + io.netty + netty-all + 4.0.0.CR1 + diff --git a/src/main/java/net/minecraft/server/DedicatedServer.java b/src/main/java/net/minecraft/server/DedicatedServer.java index bd7e41c..93cd0a9 100644 --- a/src/main/java/net/minecraft/server/DedicatedServer.java +++ b/src/main/java/net/minecraft/server/DedicatedServer.java @@ -34,7 +34,7 @@ public class DedicatedServer extends MinecraftServer implements IMinecraftServer super(options); // CraftBukkit end this.l = new ConsoleLogManager("Minecraft-Server", (String) null, (String) null); // CraftBukkit - null last argument - new ThreadSleepForever(this); + // new ThreadSleepForever(this); // Spigot } protected boolean init() throws java.net.UnknownHostException { // CraftBukkit - throws UnknownHostException @@ -91,10 +91,10 @@ public class DedicatedServer extends MinecraftServer implements IMinecraftServer this.getLogger().info("Generating keypair"); this.a(MinecraftEncryption.b()); - this.getLogger().info("Starting Minecraft server on " + (this.getServerIp().length() == 0 ? "*" : this.getServerIp()) + ":" + this.G()); + this.a((PlayerList) (new DedicatedPlayerList(this))); // CraftBukkit try { - this.r = new DedicatedServerConnection(this, inetaddress, this.G()); + this.r = new org.spigotmc.MultiplexingServerConnection(this); // Spigot } catch (Throwable ioexception) { // CraftBukkit - IOException -> Throwable this.getLogger().warning("**** FAILED TO BIND TO PORT!"); this.getLogger().warning("The exception was: {0}", new Object[] { ioexception.toString()}); @@ -102,8 +102,6 @@ public class DedicatedServer extends MinecraftServer implements IMinecraftServer return false; } - this.a((PlayerList) (new DedicatedPlayerList(this))); // CraftBukkit - if (!this.getOnlineMode()) { this.getLogger().warning("**** SERVER IS RUNNING IN OFFLINE/INSECURE MODE!"); this.getLogger().warning("The server will make no attempt to authenticate usernames. Beware."); diff --git a/src/main/java/net/minecraft/server/INetworkManager.java b/src/main/java/net/minecraft/server/INetworkManager.java new file mode 100644 index 0000000..6fcc5d7 --- /dev/null +++ b/src/main/java/net/minecraft/server/INetworkManager.java @@ -0,0 +1,26 @@ +package net.minecraft.server; + +import java.net.SocketAddress; + +public interface INetworkManager { + + void a(Connection connection); + + void queue(Packet packet); + + void a(); + + void b(); + + SocketAddress getSocketAddress(); + + void d(); + + int e(); + + void a(String s, Object... aobject); + + java.net.Socket getSocket(); // Spigot + + void setSocketAddress(java.net.SocketAddress address); // Spigot +} diff --git a/src/main/java/net/minecraft/server/NetworkManager.java b/src/main/java/net/minecraft/server/NetworkManager.java index 1862863..5a24f2a 100644 --- a/src/main/java/net/minecraft/server/NetworkManager.java +++ b/src/main/java/net/minecraft/server/NetworkManager.java @@ -24,7 +24,7 @@ public class NetworkManager implements INetworkManager { private final Object h = new Object(); private final IConsoleLogManager i; public Socket socket; // CraftBukkit - private -> public - private final SocketAddress k; + private SocketAddress k; // Spigot - remove final private volatile DataInputStream input; private volatile DataOutputStream output; private volatile boolean n = true; @@ -369,4 +369,6 @@ public class NetworkManager implements INetworkManager { static Thread h(NetworkManager networkmanager) { return networkmanager.u; } + + public void setSocketAddress(SocketAddress address) { k = address; } // Spigot } diff --git a/src/main/java/net/minecraft/server/Packet51MapChunk.java b/src/main/java/net/minecraft/server/Packet51MapChunk.java index 84dbb88..617b474 100644 --- a/src/main/java/net/minecraft/server/Packet51MapChunk.java +++ b/src/main/java/net/minecraft/server/Packet51MapChunk.java @@ -42,7 +42,7 @@ public class Packet51MapChunk extends Packet { this.b = chunk.z; this.e = flag; ChunkMap chunkmap = a(chunk, flag, i); - Deflater deflater = new Deflater(-1); + Deflater deflater = new Deflater(4); this.d = chunkmap.c; this.c = chunkmap.b; diff --git a/src/main/java/net/minecraft/server/Packet56MapChunkBulk.java b/src/main/java/net/minecraft/server/Packet56MapChunkBulk.java index 9f8afe3..b1d3a17 100644 --- a/src/main/java/net/minecraft/server/Packet56MapChunkBulk.java +++ b/src/main/java/net/minecraft/server/Packet56MapChunkBulk.java @@ -24,7 +24,7 @@ public class Packet56MapChunkBulk extends Packet { @Override protected Deflater initialValue() { // Don't use higher compression level, slows things down too much - return new Deflater(6); + return new Deflater(4); } }; // CraftBukkit end diff --git a/src/main/java/net/minecraft/server/PendingConnection.java b/src/main/java/net/minecraft/server/PendingConnection.java index eb474f5..71e4739 100644 --- a/src/main/java/net/minecraft/server/PendingConnection.java +++ b/src/main/java/net/minecraft/server/PendingConnection.java @@ -17,7 +17,7 @@ public class PendingConnection extends Connection { private static Random random = new Random(); private byte[] d; private final MinecraftServer server; - public final NetworkManager networkManager; + public final INetworkManager networkManager; public boolean b = false; private int f = 0; private String g = null; @@ -27,10 +27,15 @@ public class PendingConnection extends Connection { private SecretKey k = null; public String hostname = ""; // CraftBukkit - add field + public PendingConnection(MinecraftServer minecraftserver, org.spigotmc.netty.NettyNetworkManager networkManager) { + this.server = minecraftserver; + this.networkManager = networkManager; + } + public PendingConnection(MinecraftServer minecraftserver, Socket socket, String s) throws java.io.IOException { // CraftBukkit - throws IOException this.server = minecraftserver; this.networkManager = new NetworkManager(minecraftserver.getLogger(), socket, s, this, minecraftserver.F().getPrivate()); - this.networkManager.e = 0; + // this.networkManager.e = 0; } // CraftBukkit start @@ -146,7 +151,7 @@ public class PendingConnection extends Connection { // CraftBukkit org.bukkit.event.server.ServerListPingEvent pingEvent = org.bukkit.craftbukkit.event.CraftEventFactory.callServerListPingEvent(this.server.server, getSocket().getInetAddress(), this.server.getMotd(), playerlist.getPlayerCount(), playerlist.getMaxPlayers()); - if (packet254getinfo.a == 1) { + if (true) { // CraftBukkit start - Fix decompile issues, don't create a list from an array Object[] list = new Object[] { 1, 60, this.server.getVersion(), pingEvent.getMotd(), playerlist.getPlayerCount(), pingEvent.getMaxPlayers() }; @@ -173,9 +178,11 @@ public class PendingConnection extends Connection { this.networkManager.queue(new Packet255KickDisconnect(s)); this.networkManager.d(); - if (inetaddress != null && this.server.ae() instanceof DedicatedServerConnection) { - ((DedicatedServerConnection) this.server.ae()).a(inetaddress); + // Spigot start + if (inetaddress != null) { + ((org.spigotmc.MultiplexingServerConnection) this.server.ae()).throttle(inetaddress); } + // Spigot end this.b = true; } catch (Exception exception) { diff --git a/src/main/java/net/minecraft/server/ThreadCommandReader.java b/src/main/java/net/minecraft/server/ThreadCommandReader.java index 489e184..9533b6f 100644 --- a/src/main/java/net/minecraft/server/ThreadCommandReader.java +++ b/src/main/java/net/minecraft/server/ThreadCommandReader.java @@ -11,6 +11,7 @@ class ThreadCommandReader extends Thread { final DedicatedServer server; ThreadCommandReader(DedicatedServer dedicatedserver) { + super("Command Reader Thread"); // Spigot this.server = dedicatedserver; } diff --git a/src/main/java/net/minecraft/server/ThreadLoginVerifier.java b/src/main/java/net/minecraft/server/ThreadLoginVerifier.java index c185f64..abe0b81 100644 --- a/src/main/java/net/minecraft/server/ThreadLoginVerifier.java +++ b/src/main/java/net/minecraft/server/ThreadLoginVerifier.java @@ -21,6 +21,7 @@ class ThreadLoginVerifier extends Thread { CraftServer server; ThreadLoginVerifier(PendingConnection pendingconnection, CraftServer server) { + super("Login Verifier Thread"); this.server = server; // CraftBukkit end this.pendingConnection = pendingconnection; diff --git a/src/main/java/org/bukkit/craftbukkit/CraftServer.java b/src/main/java/org/bukkit/craftbukkit/CraftServer.java index 9f2be37..c7a804b 100644 --- a/src/main/java/org/bukkit/craftbukkit/CraftServer.java +++ b/src/main/java/org/bukkit/craftbukkit/CraftServer.java @@ -129,6 +129,7 @@ import com.avaje.ebean.config.dbplatform.SQLitePlatform; import com.avaje.ebeaninternal.server.lib.sql.TransactionIsolation; import com.google.common.collect.ImmutableList; import com.google.common.collect.MapMaker; +import java.net.InetSocketAddress; import jline.console.ConsoleReader; @@ -1373,4 +1374,20 @@ public final class CraftServer implements Server { public CraftScoreboardManager getScoreboardManager() { return scoreboardManager; } + + // Spigot start + @SuppressWarnings("unchecked") + public java.util.Collection getSecondaryHosts() { + java.util.Collection ret = new java.util.HashSet(); + List listeners = configuration.getList("listeners"); + if (listeners != null) { + for (Object o : listeners) { + + Map sect = (Map) o; + ret.add(new InetSocketAddress((String) sect.get("address"), (Integer) sect.get("port"))); + } + } + return ret; + } + // Spigot end } diff --git a/src/main/java/org/bukkit/craftbukkit/scheduler/CraftScheduler.java b/src/main/java/org/bukkit/craftbukkit/scheduler/CraftScheduler.java index 84dcfcc..a30f217 100644 --- a/src/main/java/org/bukkit/craftbukkit/scheduler/CraftScheduler.java +++ b/src/main/java/org/bukkit/craftbukkit/scheduler/CraftScheduler.java @@ -71,7 +71,7 @@ public class CraftScheduler implements BukkitScheduler { */ private final ConcurrentHashMap runners = new ConcurrentHashMap(); private volatile int currentTick = -1; - private final Executor executor = Executors.newCachedThreadPool(); + private final Executor executor = Executors.newCachedThreadPool(new com.google.common.util.concurrent.ThreadFactoryBuilder().setNameFormat("Craft Scheduler Thread - %1$d").build()); // Spigot private CraftAsyncDebugger debugHead = new CraftAsyncDebugger(-1, null, null) {@Override StringBuilder debugTo(StringBuilder string) {return string;}}; private CraftAsyncDebugger debugTail = debugHead; private static final int RECENT_TICKS; diff --git a/src/main/java/org/spigotmc/MultiplexingServerConnection.java b/src/main/java/org/spigotmc/MultiplexingServerConnection.java new file mode 100644 index 0000000..7dc2533 --- /dev/null +++ b/src/main/java/org/spigotmc/MultiplexingServerConnection.java @@ -0,0 +1,126 @@ +package org.spigotmc; + +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.logging.Level; +import net.minecraft.server.DedicatedServerConnection; +import net.minecraft.server.MinecraftServer; +import net.minecraft.server.PendingConnection; +import net.minecraft.server.ServerConnection; +import org.bukkit.Bukkit; + +public class MultiplexingServerConnection extends ServerConnection { + + private static final boolean NETTY_DISABLED = Boolean.getBoolean("org.spigotmc.netty.disabled"); + private final Collection children = new HashSet(); + private final List pending = Collections.synchronizedList(new ArrayList()); + private final HashMap throttle = new HashMap(); + + public MultiplexingServerConnection(MinecraftServer ms) { + super(ms); + + // Add primary connection + start(ms.server.getIp(), ms.server.getPort()); + // Add all other connections + for (InetSocketAddress address : ms.server.getSecondaryHosts()) { + start(address.getHostString(), address.getPort()); + } + } + + private void start(String ipAddress, int port) { + try { + // Calculate address, can't use isEmpty due to Java 5 + InetAddress socketAddress = (ipAddress.length() == 0) ? null : InetAddress.getByName(ipAddress); + // Say hello to the log + d().getLogger().info("Starting listener #" + children.size() + " on " + (socketAddress == null ? "*" : ipAddress) + ":" + port); + // Start connection: Netty / non Netty + ServerConnection listener = (NETTY_DISABLED) ? new DedicatedServerConnection(d(), socketAddress, port) : new org.spigotmc.netty.NettyServerConnection(d(), socketAddress, port); + // Register with other connections + children.add(listener); + // Gotta catch em all + } catch (Throwable t) { + // Just print some info to the log + t.printStackTrace(); + d().getLogger().warning("**** FAILED TO BIND TO PORT!"); + d().getLogger().warning("The exception was: {0}", t); + d().getLogger().warning("Perhaps a server is already running on that port?"); + } + } + + /** + * close. + */ + @Override + public void a() { + for (ServerConnection child : children) { + child.a(); + } + } + + /** + * Pulse. This method pulses all connections causing them to update. It is + * called from the main server thread a few times a tick. + */ + @Override + public void b() { + super.b(); // pulse PlayerConnections + for (int i = 0; i < pending.size(); ++i) { + PendingConnection connection = pending.get(i); + + try { + connection.c(); + } catch (Exception ex) { + connection.disconnect("Internal server error"); + Bukkit.getServer().getLogger().log(Level.WARNING, "Failed to handle packet: " + ex, ex); + } + + if (connection.b) { + pending.remove(i--); + } + } + } + + /** + * Remove the user from connection throttle. This should fix the server ping + * bugs. + * + * @param address the address to remove + */ + public void a(InetAddress address) { + if (address != null) { + synchronized (throttle) { + throttle.remove(address); + } + } + } + + /** + * Add a connection to the throttle list. + * + * @param address + * @return Whether they must be disconnected + */ + public boolean throttle(InetAddress address) { + long currentTime = System.currentTimeMillis(); + synchronized (throttle) { + Long value = throttle.get(address); + if (value != null && !address.isLoopbackAddress() && currentTime - value < d().server.getConnectionThrottle()) { + throttle.put(address, currentTime); + return true; + } + + throttle.put(address, currentTime); + } + return false; + } + + public void register(PendingConnection conn) { + pending.add(conn); + } +} diff --git a/src/main/java/org/spigotmc/netty/CipherCodec.java b/src/main/java/org/spigotmc/netty/CipherCodec.java new file mode 100644 index 0000000..2dbbf6c --- /dev/null +++ b/src/main/java/org/spigotmc/netty/CipherCodec.java @@ -0,0 +1,67 @@ +package org.spigotmc.netty; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.ByteToByteCodec; +import javax.crypto.Cipher; +import javax.crypto.ShortBufferException; +import net.minecraft.server.Packet252KeyResponse; + +/** + * This class is a complete solution for encrypting and decoding bytes in a + * Netty stream. It takes two {@link Cipher} instances, used for encryption and + * decryption respectively. + */ +public class CipherCodec extends ByteToByteCodec { + + private Cipher encrypt; + private Cipher decrypt; + private Packet252KeyResponse responsePacket; + private ThreadLocal heapInLocal = new EmptyByteThreadLocal(); + private ThreadLocal heapOutLocal = new EmptyByteThreadLocal(); + + private static class EmptyByteThreadLocal extends ThreadLocal { + + @Override + protected byte[] initialValue() { + return new byte[0]; + } + } + + public CipherCodec(Cipher encrypt, Cipher decrypt, Packet252KeyResponse responsePacket) { + this.encrypt = encrypt; + this.decrypt = decrypt; + this.responsePacket = responsePacket; + } + + @Override + public void beforeAdd(ChannelHandlerContext ctx) throws Exception { + ctx.channel().write(responsePacket); + } + + @Override + public void encode(ChannelHandlerContext ctx, ByteBuf in, ByteBuf out) throws Exception { + cipher(in, out, encrypt); + } + + @Override + public void decode(ChannelHandlerContext ctx, ByteBuf in, ByteBuf out) throws Exception { + cipher(in, out, decrypt); + } + + private void cipher(ByteBuf in, ByteBuf out, Cipher cipher) throws ShortBufferException { + byte[] heapIn = heapInLocal.get(); + int readableBytes = in.readableBytes(); + if (heapIn.length < readableBytes) { + heapIn = new byte[readableBytes]; + } + in.readBytes(heapIn, 0, readableBytes); + + byte[] heapOut = heapOutLocal.get(); + int outputSize = cipher.getOutputSize(readableBytes); + if (heapOut.length < outputSize) { + heapOut = new byte[outputSize]; + } + out.writeBytes(heapOut, 0, cipher.update(heapIn, 0, readableBytes, heapOut)); + } +} diff --git a/src/main/java/org/spigotmc/netty/NettyNetworkManager.java b/src/main/java/org/spigotmc/netty/NettyNetworkManager.java new file mode 100644 index 0000000..0e1b1fd --- /dev/null +++ b/src/main/java/org/spigotmc/netty/NettyNetworkManager.java @@ -0,0 +1,253 @@ +package org.spigotmc.netty; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundMessageHandlerAdapter; +import io.netty.channel.socket.SocketChannel; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.net.SocketAddress; +import java.security.PrivateKey; +import java.util.AbstractList; +import java.util.List; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import javax.crypto.Cipher; +import javax.crypto.SecretKey; +import net.minecraft.server.Connection; +import net.minecraft.server.INetworkManager; +import net.minecraft.server.MinecraftServer; +import net.minecraft.server.Packet; +import net.minecraft.server.Packet252KeyResponse; +import net.minecraft.server.PendingConnection; +import net.minecraft.server.PlayerConnection; +import org.spigotmc.MultiplexingServerConnection; + +/** + * This class forms the basis of the Netty integration. It implements + * {@link INetworkManager} and handles all events and inbound messages provided + * by the upstream Netty process. + */ +public class NettyNetworkManager extends ChannelInboundMessageHandlerAdapter implements INetworkManager { + + private static final ExecutorService threadPool = Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("Async Packet Handler - %1$d").build()); + private static final MinecraftServer server = MinecraftServer.getServer(); + private static final PrivateKey key = server.F().getPrivate(); + private static final MultiplexingServerConnection serverConnection = (MultiplexingServerConnection) server.ae(); + /*========================================================================*/ + private final Queue syncPackets = new ConcurrentLinkedQueue(); + private final List highPriorityQueue = new AbstractList() { + @Override + public void add(int index, Packet element) { + // NOP + } + + @Override + public Packet get(int index) { + throw new UnsupportedOperationException(); + } + + @Override + public int size() { + return 0; + } + }; + private volatile boolean connected; + private Channel channel; + private SocketAddress address; + private Connection connection; + private SecretKey secret; + private String dcReason; + private Object[] dcArgs; + private Socket socketAdaptor; + private long writtenBytes; + + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + // Channel and address groundwork first + channel = ctx.channel(); + address = channel.remoteAddress(); + // Check the throttle + if (serverConnection.throttle(((InetSocketAddress) channel.remoteAddress()).getAddress())) { + channel.close(); + } + // Then the socket adaptor + socketAdaptor = NettySocketAdaptor.adapt((SocketChannel) channel); + // Followed by their first handler + connection = new PendingConnection(server, this); + // Finally register the connection + connected = true; + serverConnection.register((PendingConnection) connection); + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + a("disconnect.endOfStream", new Object[0]); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + // TODO: Remove this once we are more stable + // Bukkit.getServer().getLogger().severe("======================= Start Netty Debug Log ======================="); + // Bukkit.getServer().getLogger().log(Level.SEVERE, "Error caught whilst handling " + channel, cause); + // Bukkit.getServer().getLogger().severe("======================= End Netty Debug Log ======================="); + // Disconnect with generic reason + exception + a("disconnect.genericReason", new Object[]{"Internal exception: " + cause}); + } + + @Override + public void messageReceived(ChannelHandlerContext ctx, final Packet msg) throws Exception { + if (connected) { + if (msg instanceof Packet252KeyResponse) { + secret = ((Packet252KeyResponse) msg).a(key); + } + + if (msg.a_()) { + threadPool.submit(new Runnable() { + public void run() { + Packet packet = PacketListener.callReceived(NettyNetworkManager.this, connection, msg); + if (packet != null) { + packet.handle(connection); + } + } + }); + } else { + syncPackets.add(msg); + } + } + } + + public Socket getSocket() { + return socketAdaptor; + } + + /** + * setHandler. Set the {@link NetHandler} used to process received packets. + * + * @param nh the new {@link NetHandler} instance + */ + public void a(Connection nh) { + connection = nh; + } + + /** + * queue. Queue a packet for sending, or in this case send it to be write it + * straight to the channel. + * + * @param packet the packet to queue + */ + public void queue(Packet packet) { + // Only send if channel is still connected + if (connected) { + // Process packet via handler + packet = PacketListener.callQueued(this, connection, packet); + // If handler indicates packet send + if (packet != null) { + highPriorityQueue.add(packet); + + // If needed, check and prepare encryption phase + // We don't send the packet here as it is sent just before the cipher handler has been added to ensure we can safeguard from any race conditions + // Which are caused by the slow first initialization of the cipher SPI + if (packet instanceof Packet252KeyResponse) { + Cipher encrypt = NettyServerConnection.getCipher(Cipher.ENCRYPT_MODE, secret); + Cipher decrypt = NettyServerConnection.getCipher(Cipher.DECRYPT_MODE, secret); + CipherCodec codec = new CipherCodec(encrypt, decrypt, (Packet252KeyResponse) packet); + channel.pipeline().addBefore("decoder", "cipher", codec); + } else { + channel.write(packet); + } + } + } + } + + /** + * wakeThreads. In Vanilla this method will interrupt the network read and + * write threads, thus waking them. + */ + public void a() { + } + + /** + * processPackets. Remove up to 1000 packets from the queue and process + * them. This method should only be called from the main server thread. + */ + public void b() { + for (int i = 1000; !syncPackets.isEmpty() && i >= 0; i--) { + if (connection instanceof PendingConnection ? ((PendingConnection) connection).b : ((PlayerConnection) connection).disconnected) { + syncPackets.clear(); + break; + } + + Packet packet = PacketListener.callReceived(this, connection, syncPackets.poll()); + if (packet != null) { + packet.handle(connection); + } + } + + // Disconnect via the handler - this performs all plugin related cleanup + logging + if (!connected && (dcReason != null || dcArgs != null)) { + connection.a(dcReason, dcArgs); + } + } + + /** + * getSocketAddress. Return the remote address of the connected user. It is + * important that this method returns a value even after disconnect. + * + * @return the remote address of this connection + */ + public SocketAddress getSocketAddress() { + return address; + } + + public void setSocketAddress(SocketAddress address) { + this.address = address; + } + + /** + * close. Close and release all resources associated with this connection. + */ + public void d() { + if (connected) { + connected = false; + channel.close(); + } + } + + /** + * queueSize. Return the number of packets in the low priority queue. In a + * NIO environment this will always be 0. + * + * @return the size of the packet send queue + */ + public int e() { + return 0; + } + + /** + * networkShutdown. Shuts down this connection, storing the reason and + * parameters, used to notify the current {@link Connection}. + * + * @param reason the main disconnect reason + * @param arguments additional disconnect arguments, for example, the + * exception which triggered the disconnect. + */ + public void a(String reason, Object... arguments) { + if (connected) { + dcReason = reason; + dcArgs = arguments; + d(); + } + } + + public long getWrittenBytes() { + return writtenBytes; + } + + public void addWrittenBytes(int written) { + writtenBytes += written; + } +} diff --git a/src/main/java/org/spigotmc/netty/NettyServerConnection.java b/src/main/java/org/spigotmc/netty/NettyServerConnection.java new file mode 100644 index 0000000..e5d24f7 --- /dev/null +++ b/src/main/java/org/spigotmc/netty/NettyServerConnection.java @@ -0,0 +1,90 @@ +package org.spigotmc.netty; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.Channel; +import io.netty.channel.ChannelException; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.timeout.ReadTimeoutHandler; +import java.net.InetAddress; +import java.security.GeneralSecurityException; +import java.security.Key; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.logging.Level; +import javax.crypto.Cipher; +import javax.crypto.spec.IvParameterSpec; +import net.minecraft.server.MinecraftServer; +import net.minecraft.server.PendingConnection; +import net.minecraft.server.ServerConnection; +import org.bukkit.Bukkit; + +/** + * This is the NettyServerConnection class. It implements + * {@link ServerConnection} and is the main interface between the Minecraft + * server and this NIO implementation. It handles starting, stopping and + * processing the Netty backend. + */ +public class NettyServerConnection extends ServerConnection { + + private final ChannelFuture socket; + + + + public NettyServerConnection(MinecraftServer ms, InetAddress host, int port) { + super(ms); + int threads = Integer.getInteger("org.spigotmc.netty.threads", 3); + socket = new ServerBootstrap().channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer() { + @Override + public void initChannel(Channel ch) throws Exception { + try { + ch.config().setOption(ChannelOption.IP_TOS, 0x18); + } catch (ChannelException ex) { + // IP_TOS is not supported (Windows XP / Windows Server 2003) + } + + NettyNetworkManager networkManager = new NettyNetworkManager(); + ch.pipeline() + .addLast("timer", new ReadTimeoutHandler(30)) + .addLast("decoder", new PacketDecoder()) + .addLast("encoder", new PacketEncoder(networkManager)) + .addLast("manager", networkManager); + } + }).group(new NioEventLoopGroup(threads, new ThreadFactoryBuilder().setNameFormat("Netty IO Thread - %1$d").build())).localAddress(host, port).bind(); + MinecraftServer.getServer().getLogger().info("Using Netty NIO with " + threads + " threads for network connections."); + } + + + /** + * Shutdown. This method is called when the server is shutting down and the + * server socket and all clients should be terminated with no further + * action. + */ + @Override + public void a() { + socket.channel().close().syncUninterruptibly(); + } + + /** + * Return a Minecraft compatible cipher instance from the specified key. + * + * @param opMode the mode to initialize the cipher in + * @param key to use as the initial vector + * @return the initialized cipher + */ + public static Cipher getCipher(int opMode, Key key) { + try { + Cipher cip = Cipher.getInstance("AES/CFB8/NoPadding"); + cip.init(opMode, key, new IvParameterSpec(key.getEncoded())); + return cip; + } catch (GeneralSecurityException ex) { + throw new RuntimeException(ex); + } + } +} diff --git a/src/main/java/org/spigotmc/netty/NettySocketAdaptor.java b/src/main/java/org/spigotmc/netty/NettySocketAdaptor.java new file mode 100644 index 0000000..a3b86b8 --- /dev/null +++ b/src/main/java/org/spigotmc/netty/NettySocketAdaptor.java @@ -0,0 +1,248 @@ +package org.spigotmc.netty; + +import io.netty.channel.Channel; +import io.netty.channel.ChannelOption; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.InetAddress; +import java.net.Socket; +import java.net.SocketAddress; +import java.net.SocketException; +import java.nio.channels.SocketChannel; + +/** + * This class wraps a Netty {@link Channel} in a {@link Socket}. It overrides + * all methods in {@link Socket} to ensure that calls are not mistakingly made + * to the unsupported super socket. All operations that can be sanely applied to + * a {@link Channel} are implemented here. Those which cannot will throw an + * {@link UnsupportedOperationException}. + */ +public class NettySocketAdaptor extends Socket { + + private final io.netty.channel.socket.SocketChannel ch; + + private NettySocketAdaptor(io.netty.channel.socket.SocketChannel ch) { + this.ch = ch; + } + + public static NettySocketAdaptor adapt(io.netty.channel.socket.SocketChannel ch) { + return new NettySocketAdaptor(ch); + } + + @Override + public void bind(SocketAddress bindpoint) throws IOException { + ch.bind(bindpoint).syncUninterruptibly(); + } + + @Override + public synchronized void close() throws IOException { + ch.close().syncUninterruptibly(); + } + + @Override + public void connect(SocketAddress endpoint) throws IOException { + ch.connect(endpoint).syncUninterruptibly(); + } + + @Override + public void connect(SocketAddress endpoint, int timeout) throws IOException { + ch.config().setConnectTimeoutMillis(timeout); + ch.connect(endpoint).syncUninterruptibly(); + } + + @Override + public boolean equals(Object obj) { + return obj instanceof NettySocketAdaptor && ch.equals(((NettySocketAdaptor) obj).ch); + } + + @Override + public SocketChannel getChannel() { + throw new UnsupportedOperationException("Operation not supported on Channel wrapper."); + } + + @Override + public InetAddress getInetAddress() { + return ch.remoteAddress().getAddress(); + } + + @Override + public InputStream getInputStream() throws IOException { + throw new UnsupportedOperationException("Operation not supported on Channel wrapper."); + } + + @Override + public boolean getKeepAlive() throws SocketException { + return ch.config().getOption(ChannelOption.SO_KEEPALIVE); + } + + @Override + public InetAddress getLocalAddress() { + return ch.localAddress().getAddress(); + } + + @Override + public int getLocalPort() { + return ch.localAddress().getPort(); + } + + @Override + public SocketAddress getLocalSocketAddress() { + return ch.localAddress(); + } + + @Override + public boolean getOOBInline() throws SocketException { + throw new UnsupportedOperationException("Operation not supported on Channel wrapper."); + } + + @Override + public OutputStream getOutputStream() throws IOException { + throw new UnsupportedOperationException("Operation not supported on Channel wrapper."); + } + + @Override + public int getPort() { + return ch.remoteAddress().getPort(); + } + + @Override + public synchronized int getReceiveBufferSize() throws SocketException { + return ch.config().getOption(ChannelOption.SO_RCVBUF); + } + + @Override + public SocketAddress getRemoteSocketAddress() { + return ch.remoteAddress(); + } + + @Override + public boolean getReuseAddress() throws SocketException { + return ch.config().getOption(ChannelOption.SO_REUSEADDR); + } + + @Override + public synchronized int getSendBufferSize() throws SocketException { + return ch.config().getOption(ChannelOption.SO_SNDBUF); + } + + @Override + public int getSoLinger() throws SocketException { + return ch.config().getOption(ChannelOption.SO_LINGER); + } + + @Override + public synchronized int getSoTimeout() throws SocketException { + throw new UnsupportedOperationException("Operation not supported on Channel wrapper."); + } + + @Override + public boolean getTcpNoDelay() throws SocketException { + return ch.config().getOption(ChannelOption.TCP_NODELAY); + } + + @Override + public int getTrafficClass() throws SocketException { + return ch.config().getOption(ChannelOption.IP_TOS); + } + + @Override + public int hashCode() { + return ch.hashCode(); + } + + @Override + public boolean isBound() { + return ch.localAddress() != null; + } + + @Override + public boolean isClosed() { + return !ch.isOpen(); + } + + @Override + public boolean isConnected() { + return ch.isActive(); + } + + @Override + public boolean isInputShutdown() { + return ch.isInputShutdown(); + } + + @Override + public boolean isOutputShutdown() { + return ch.isOutputShutdown(); + } + + @Override + public void sendUrgentData(int data) throws IOException { + throw new UnsupportedOperationException("Operation not supported on Channel wrapper."); + } + + @Override + public void setKeepAlive(boolean on) throws SocketException { + ch.config().setOption(ChannelOption.SO_KEEPALIVE, on); + } + + @Override + public void setOOBInline(boolean on) throws SocketException { + throw new UnsupportedOperationException("Operation not supported on Channel wrapper."); + } + + @Override + public void setPerformancePreferences(int connectionTime, int latency, int bandwidth) { + throw new UnsupportedOperationException("Operation not supported on Channel wrapper."); + } + + @Override + public synchronized void setReceiveBufferSize(int size) throws SocketException { + ch.config().setOption(ChannelOption.SO_RCVBUF, size); + } + + @Override + public void setReuseAddress(boolean on) throws SocketException { + ch.config().setOption(ChannelOption.SO_REUSEADDR, on); + } + + @Override + public synchronized void setSendBufferSize(int size) throws SocketException { + ch.config().setOption(ChannelOption.SO_SNDBUF, size); + } + + @Override + public void setSoLinger(boolean on, int linger) throws SocketException { + ch.config().setOption(ChannelOption.SO_LINGER, linger); + } + + @Override + public synchronized void setSoTimeout(int timeout) throws SocketException { + throw new UnsupportedOperationException("Operation not supported on Channel wrapper."); + } + + @Override + public void setTcpNoDelay(boolean on) throws SocketException { + ch.config().setOption(ChannelOption.TCP_NODELAY, on); + } + + @Override + public void setTrafficClass(int tc) throws SocketException { + ch.config().setOption(ChannelOption.IP_TOS, tc); + } + + @Override + public void shutdownInput() throws IOException { + throw new UnsupportedOperationException("Operation not supported on Channel wrapper."); + } + + @Override + public void shutdownOutput() throws IOException { + ch.shutdownOutput().syncUninterruptibly(); + } + + @Override + public String toString() { + return ch.toString(); + } +} diff --git a/src/main/java/org/spigotmc/netty/PacketDecoder.java b/src/main/java/org/spigotmc/netty/PacketDecoder.java new file mode 100644 index 0000000..65074d2 --- /dev/null +++ b/src/main/java/org/spigotmc/netty/PacketDecoder.java @@ -0,0 +1,64 @@ +package org.spigotmc.netty; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufInputStream; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.ReplayingDecoder; +import java.io.DataInputStream; +import java.io.EOFException; +import java.io.IOException; +import net.minecraft.server.MinecraftServer; +import net.minecraft.server.Packet; + +/** + * Packet decoding class backed by a reusable {@link DataInputStream} which + * backs the input {@link ByteBuf}. Reads an unsigned byte packet header and + * then decodes the packet accordingly. + */ +public class PacketDecoder extends ReplayingDecoder { + + private DataInputStream input; + private Packet packet; + + public PacketDecoder() { + super(ReadState.HEADER); + } + + @Override + public Packet decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception { + if (input == null) { + input = new DataInputStream(new ByteBufInputStream(in)); + } + + switch (state()) { + case HEADER: + short packetId = in.readUnsignedByte(); + packet = Packet.a(MinecraftServer.getServer().getLogger(), packetId); + if (packet == null) { + throw new IOException("Bad packet id " + packetId); + } + checkpoint(ReadState.DATA); + case DATA: + try { + packet.a(input); + } catch (EOFException ex) { + return null; + } + + checkpoint(ReadState.HEADER); + Packet ret = packet; + packet = null; + + return ret; + default: + throw new IllegalStateException(); + } + } + + @Override + public void freeInboundBuffer(ChannelHandlerContext ctx) throws Exception { + super.freeInboundBuffer(ctx); + input = null; + packet = null; + } +} diff --git a/src/main/java/org/spigotmc/netty/PacketEncoder.java b/src/main/java/org/spigotmc/netty/PacketEncoder.java new file mode 100644 index 0000000..c8832d6 --- /dev/null +++ b/src/main/java/org/spigotmc/netty/PacketEncoder.java @@ -0,0 +1,50 @@ +package org.spigotmc.netty; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufOutputStream; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.MessageToByteEncoder; +import java.io.DataOutputStream; +import net.minecraft.server.Packet; + +/** + * Netty encoder which takes a packet and encodes it, and adds a byte packet id + * header. + */ +public class PacketEncoder extends MessageToByteEncoder { + + private ByteBuf outBuf; + private DataOutputStream dataOut; + private final NettyNetworkManager networkManager; + + public PacketEncoder(NettyNetworkManager networkManager) { + this.networkManager = networkManager; + } + + @Override + public void encode(ChannelHandlerContext ctx, Packet msg, ByteBuf out) throws Exception { + if (outBuf == null) { + outBuf = ctx.alloc().directBuffer(); + } + if (dataOut == null) { + dataOut = new DataOutputStream(new ByteBufOutputStream(outBuf)); + } + + out.writeByte(msg.n()); + msg.a(dataOut); + + networkManager.addWrittenBytes(outBuf.readableBytes()); + out.writeBytes(outBuf); + out.discardSomeReadBytes(); + } + + @Override + public void freeOutboundBuffer(ChannelHandlerContext ctx) throws Exception { + super.freeOutboundBuffer(ctx); + if (outBuf != null) { + outBuf.release(); + outBuf = null; + } + dataOut = null; + } +} diff --git a/src/main/java/org/spigotmc/netty/PacketListener.java b/src/main/java/org/spigotmc/netty/PacketListener.java new file mode 100644 index 0000000..8e3b932 --- /dev/null +++ b/src/main/java/org/spigotmc/netty/PacketListener.java @@ -0,0 +1,100 @@ +package org.spigotmc.netty; + +import com.google.common.base.Preconditions; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import java.util.logging.Level; +import net.minecraft.server.Connection; +import net.minecraft.server.INetworkManager; +import net.minecraft.server.Packet; +import org.bukkit.Bukkit; +import org.bukkit.plugin.Plugin; + +/** + * This class is used for plugins that wish to register to listen to incoming + * and outgoing packets. To use this class, simply create a new instance, + * override the methods you wish to use, and call + * {@link #register(org.spigotmc.netty.PacketListener, org.bukkit.plugin.Plugin)}. + */ +public class PacketListener { + + /** + * A mapping of all registered listeners and their owning plugins. + */ + private static final Map listeners = new HashMap(); + /** + * A baked list of all listeners, for efficiency sake. + */ + private static PacketListener[] baked = new PacketListener[0]; + + /** + * Used to register a handler for receiving notifications of packet + * activity. + * + * @param listener the listener to register + * @param plugin the plugin owning this listener + */ + public static synchronized void register(PacketListener listener, Plugin plugin) { + Preconditions.checkNotNull(listener, "listener"); + Preconditions.checkNotNull(plugin, "plugin"); + Preconditions.checkState(!listeners.containsKey(listener), "listener already registered"); + + int size = listeners.size(); + Preconditions.checkState(baked.length == size); + listeners.put(listener, plugin); + baked = Arrays.copyOf(baked, size + 1); + baked[size] = listener; + } + + static Packet callReceived(INetworkManager networkManager, Connection connection, Packet packet) { + for (PacketListener listener : baked) { + try { + packet = listener.packetReceived(networkManager, connection, packet); + } catch (Throwable t) { + Bukkit.getServer().getLogger().log(Level.SEVERE, "Error whilst firing receive hook for packet", t); + } + } + return packet; + } + + static Packet callQueued(INetworkManager networkManager, Connection connection, Packet packet) { + for (PacketListener listener : baked) { + try { + packet = listener.packetQueued(networkManager, connection, packet); + } catch (Throwable t) { + Bukkit.getServer().getLogger().log(Level.SEVERE, "Error whilst firing queued hook for packet", t); + } + } + return packet; + } + + /** + * Called when a packet has been received and is about to be handled by the + * current {@link Connection}. The returned packet will be the packet passed + * on for handling, or in the case of null being returned, not handled at + * all. + * + * @param networkManager the NetworkManager receiving the packet + * @param connection the connection which will handle the packet + * @param packet the received packet + * @return the packet to be handled, or null to cancel + */ + public Packet packetReceived(INetworkManager networkManager, Connection connection, Packet packet) { + return packet; + } + + /** + * Called when a packet is queued to be sent. The returned packet will be + * the packet sent. In the case of null being returned, the packet will not + * be sent. + * + * @param networkManager the NetworkManager which will send the packet + * @param connection the connection which queued the packet + * @param packet the queue packet + * @return the packet to be sent, or null if the packet will not be sent. + */ + public Packet packetQueued(INetworkManager networkManager, Connection connection, Packet packet) { + return packet; + } +} diff --git a/src/main/java/org/spigotmc/netty/ReadState.java b/src/main/java/org/spigotmc/netty/ReadState.java new file mode 100644 index 0000000..5dc3754 --- /dev/null +++ b/src/main/java/org/spigotmc/netty/ReadState.java @@ -0,0 +1,16 @@ +package org.spigotmc.netty; + +/** + * Stores the state of the packet currently being read. + */ +public enum ReadState { + + /** + * Indicates the byte representing the ID has been read. + */ + HEADER, + /** + * Shows the packet body is being read. + */ + DATA; +} diff --git a/src/main/resources/configurations/bukkit.yml b/src/main/resources/configurations/bukkit.yml index a62ba24..aaf9454 100644 --- a/src/main/resources/configurations/bukkit.yml +++ b/src/main/resources/configurations/bukkit.yml @@ -12,7 +12,9 @@ # Twitter: http://twitter.com/Craftbukkit # Bug tracker: http://leaky.bukkit.org/ - +#listeners: +# - address: 127.0.0.1 +# port: 25577 settings: allow-end: true warn-on-overload: true -- 1.8.2.1