Fork of Poseidon providing Bukkit #1060 to older Beta versions (b1.0-b1.7.3)
1package org.bukkit.craftbukkit;
2
3import net.minecraft.server.EntityPlayer;
4import net.minecraft.server.Packet;
5import net.minecraft.server.Packet51MapChunk;
6
7import java.util.HashMap;
8import java.util.concurrent.BlockingQueue;
9import java.util.concurrent.LinkedBlockingQueue;
10import java.util.zip.Deflater;
11
12public final class ChunkCompressionThread implements Runnable {
13
14 private static final ChunkCompressionThread instance = new ChunkCompressionThread();
15 private static boolean isRunning = false;
16
17 private final int QUEUE_CAPACITY = 1024 * 10;
18 private final HashMap<EntityPlayer, Integer> queueSizePerPlayer = new HashMap<EntityPlayer, Integer>();
19 private final BlockingQueue<QueuedPacket> packetQueue = new LinkedBlockingQueue<QueuedPacket>(QUEUE_CAPACITY);
20
21 private final int CHUNK_SIZE = 16 * 128 * 16 * 5 / 2;
22 private final int REDUCED_DEFLATE_THRESHOLD = CHUNK_SIZE / 4;
23 private final int DEFLATE_LEVEL_CHUNKS = 6;
24 private final int DEFLATE_LEVEL_PARTS = 1;
25
26 private final Deflater deflater = new Deflater();
27 private byte[] deflateBuffer = new byte[CHUNK_SIZE + 100];
28
29 public static void startThread() {
30 if (!isRunning) {
31 isRunning = true;
32 new Thread(instance).start();
33 }
34 }
35
36 public void run() {
37 while (true) {
38 try {
39 handleQueuedPacket(packetQueue.take());
40 } catch (InterruptedException ie) {
41 } catch (Exception e) {
42 e.printStackTrace();
43 }
44 }
45 }
46
47 private void handleQueuedPacket(QueuedPacket queuedPacket) {
48 addToPlayerQueueSize(queuedPacket.player, -1);
49 // Compress the packet if necessary.
50 if (queuedPacket.compress) {
51 handleMapChunk(queuedPacket);
52 }
53 sendToNetworkQueue(queuedPacket);
54 }
55
56 private void handleMapChunk(QueuedPacket queuedPacket) {
57 Packet51MapChunk packet = (Packet51MapChunk) queuedPacket.packet;
58
59 // If 'packet.g' is set then this packet has already been compressed.
60 if (packet.g != null) {
61 return;
62 }
63
64 int dataSize = packet.rawData.length;
65 if (deflateBuffer.length < dataSize + 100) {
66 deflateBuffer = new byte[dataSize + 100];
67 }
68
69 deflater.reset();
70 deflater.setLevel(dataSize < REDUCED_DEFLATE_THRESHOLD ? DEFLATE_LEVEL_PARTS : DEFLATE_LEVEL_CHUNKS);
71 deflater.setInput(packet.rawData);
72 deflater.finish();
73 int size = deflater.deflate(deflateBuffer);
74 if (size == 0) {
75 size = deflater.deflate(deflateBuffer);
76 }
77
78 // copy compressed data to packet
79 packet.g = new byte[size];
80 packet.h = size;
81 System.arraycopy(deflateBuffer, 0, packet.g, 0, size);
82 }
83
84 private void sendToNetworkQueue(QueuedPacket queuedPacket) {
85 queuedPacket.player.netServerHandler.networkManager.queue(queuedPacket.packet);
86 }
87
88 public static void sendPacket(EntityPlayer player, Packet packet) {
89 if (packet instanceof Packet51MapChunk) {
90 // MapChunk Packets need compressing.
91 instance.addQueuedPacket(new QueuedPacket(player, packet, true));
92 } else {
93 // Other Packets don't.
94 instance.addQueuedPacket(new QueuedPacket(player, packet, false));
95 }
96 }
97
98 private void addToPlayerQueueSize(EntityPlayer player, int amount) {
99 synchronized (queueSizePerPlayer) {
100 Integer count = queueSizePerPlayer.get(player);
101 amount += (count == null) ? 0 : count;
102 if (amount == 0) {
103 queueSizePerPlayer.remove(player);
104 } else {
105 queueSizePerPlayer.put(player, amount);
106 }
107 }
108 }
109
110 public static int getPlayerQueueSize(EntityPlayer player) {
111 synchronized (instance.queueSizePerPlayer) {
112 Integer count = instance.queueSizePerPlayer.get(player);
113 return count == null ? 0 : count;
114 }
115 }
116
117 private void addQueuedPacket(QueuedPacket task) {
118 addToPlayerQueueSize(task.player, +1);
119
120 while (true) {
121 try {
122 packetQueue.put(task);
123 return;
124 } catch (InterruptedException e) {
125 }
126 }
127 }
128
129 private static class QueuedPacket {
130 final EntityPlayer player;
131 final Packet packet;
132 final boolean compress;
133
134 QueuedPacket(EntityPlayer player, Packet packet, boolean compress) {
135 this.player = player;
136 this.packet = packet;
137 this.compress = compress;
138 }
139 }
140}