Fork of Poseidon providing Bukkit #1060 to older Beta versions (b1.0-b1.7.3)
at develop 140 lines 4.7 kB view raw
1package org.bukkit.craftbukkit; 2 3import net.minecraft.server.EntityPlayer; 4import net.minecraft.server.Packet; 5import net.minecraft.server.Packet51MapChunk; 6 7import java.util.HashMap; 8import java.util.concurrent.BlockingQueue; 9import java.util.concurrent.LinkedBlockingQueue; 10import java.util.zip.Deflater; 11 12public final class ChunkCompressionThread implements Runnable { 13 14 private static final ChunkCompressionThread instance = new ChunkCompressionThread(); 15 private static boolean isRunning = false; 16 17 private final int QUEUE_CAPACITY = 1024 * 10; 18 private final HashMap<EntityPlayer, Integer> queueSizePerPlayer = new HashMap<EntityPlayer, Integer>(); 19 private final BlockingQueue<QueuedPacket> packetQueue = new LinkedBlockingQueue<QueuedPacket>(QUEUE_CAPACITY); 20 21 private final int CHUNK_SIZE = 16 * 128 * 16 * 5 / 2; 22 private final int REDUCED_DEFLATE_THRESHOLD = CHUNK_SIZE / 4; 23 private final int DEFLATE_LEVEL_CHUNKS = 6; 24 private final int DEFLATE_LEVEL_PARTS = 1; 25 26 private final Deflater deflater = new Deflater(); 27 private byte[] deflateBuffer = new byte[CHUNK_SIZE + 100]; 28 29 public static void startThread() { 30 if (!isRunning) { 31 isRunning = true; 32 new Thread(instance).start(); 33 } 34 } 35 36 public void run() { 37 while (true) { 38 try { 39 handleQueuedPacket(packetQueue.take()); 40 } catch (InterruptedException ie) { 41 } catch (Exception e) { 42 e.printStackTrace(); 43 } 44 } 45 } 46 47 private void handleQueuedPacket(QueuedPacket queuedPacket) { 48 addToPlayerQueueSize(queuedPacket.player, -1); 49 // Compress the packet if necessary. 50 if (queuedPacket.compress) { 51 handleMapChunk(queuedPacket); 52 } 53 sendToNetworkQueue(queuedPacket); 54 } 55 56 private void handleMapChunk(QueuedPacket queuedPacket) { 57 Packet51MapChunk packet = (Packet51MapChunk) queuedPacket.packet; 58 59 // If 'packet.g' is set then this packet has already been compressed. 60 if (packet.g != null) { 61 return; 62 } 63 64 int dataSize = packet.rawData.length; 65 if (deflateBuffer.length < dataSize + 100) { 66 deflateBuffer = new byte[dataSize + 100]; 67 } 68 69 deflater.reset(); 70 deflater.setLevel(dataSize < REDUCED_DEFLATE_THRESHOLD ? DEFLATE_LEVEL_PARTS : DEFLATE_LEVEL_CHUNKS); 71 deflater.setInput(packet.rawData); 72 deflater.finish(); 73 int size = deflater.deflate(deflateBuffer); 74 if (size == 0) { 75 size = deflater.deflate(deflateBuffer); 76 } 77 78 // copy compressed data to packet 79 packet.g = new byte[size]; 80 packet.h = size; 81 System.arraycopy(deflateBuffer, 0, packet.g, 0, size); 82 } 83 84 private void sendToNetworkQueue(QueuedPacket queuedPacket) { 85 queuedPacket.player.netServerHandler.networkManager.queue(queuedPacket.packet); 86 } 87 88 public static void sendPacket(EntityPlayer player, Packet packet) { 89 if (packet instanceof Packet51MapChunk) { 90 // MapChunk Packets need compressing. 91 instance.addQueuedPacket(new QueuedPacket(player, packet, true)); 92 } else { 93 // Other Packets don't. 94 instance.addQueuedPacket(new QueuedPacket(player, packet, false)); 95 } 96 } 97 98 private void addToPlayerQueueSize(EntityPlayer player, int amount) { 99 synchronized (queueSizePerPlayer) { 100 Integer count = queueSizePerPlayer.get(player); 101 amount += (count == null) ? 0 : count; 102 if (amount == 0) { 103 queueSizePerPlayer.remove(player); 104 } else { 105 queueSizePerPlayer.put(player, amount); 106 } 107 } 108 } 109 110 public static int getPlayerQueueSize(EntityPlayer player) { 111 synchronized (instance.queueSizePerPlayer) { 112 Integer count = instance.queueSizePerPlayer.get(player); 113 return count == null ? 0 : count; 114 } 115 } 116 117 private void addQueuedPacket(QueuedPacket task) { 118 addToPlayerQueueSize(task.player, +1); 119 120 while (true) { 121 try { 122 packetQueue.put(task); 123 return; 124 } catch (InterruptedException e) { 125 } 126 } 127 } 128 129 private static class QueuedPacket { 130 final EntityPlayer player; 131 final Packet packet; 132 final boolean compress; 133 134 QueuedPacket(EntityPlayer player, Packet packet, boolean compress) { 135 this.player = player; 136 this.packet = packet; 137 this.compress = compress; 138 } 139 } 140}