the game where you go into mines and start crafting! but for consoles (forked directly from smartcmd's github)
at main 708 lines 18 kB view raw
1#include "stdafx.h" 2#include "InputOutputStream.h" 3#include "Socket.h" 4#include "Connection.h" 5#include "ThreadName.h" 6#include "compression.h" 7#include "..\Minecraft.Client\PS3\PS3Extras\ShutdownManager.h" 8 9// This should always be enabled, except for debugging use 10#ifndef _DEBUG 11#define CONNECTION_ENABLE_TIMEOUT_DISCONNECT 1 12#endif 13 14int Connection::readThreads = 0; 15int Connection::writeThreads = 0; 16 17int Connection::readSizes[256]; 18int Connection::writeSizes[256]; 19 20 21 22void Connection::_init() 23{ 24// printf("Con:0x%x init\n",this); 25 InitializeCriticalSection(&writeLock); 26 InitializeCriticalSection(&threadCounterLock); 27 InitializeCriticalSection(&incoming_cs); 28 29 running = true; 30 quitting = false; 31 disconnected = false; 32 disconnectReason = DisconnectPacket::eDisconnect_None; 33 noInputTicks = 0; 34 estimatedRemaining = 0; 35 fakeLag = 0; 36 slowWriteDelay = 50; 37 38 saqThreadID = 0; 39 closeThreadID = 0; 40 41 tickCount = 0; 42 43} 44 45// 4J Jev, need to delete the critical section. 46Connection::~Connection() 47{ 48 // 4J Stu - Just to be sure, make sure the read and write threads terminate themselves before the connection object is destroyed 49 running = false; 50 if( dis ) dis->close(); // The input stream needs closed before the readThread, or the readThread 51 // may get stuck whilst blocking waiting on a read 52 readThread->WaitForCompletion(INFINITE); 53 writeThread->WaitForCompletion(INFINITE); 54 55 DeleteCriticalSection(&writeLock); 56 DeleteCriticalSection(&threadCounterLock); 57 DeleteCriticalSection(&incoming_cs); 58 59 delete m_hWakeReadThread; 60 delete m_hWakeWriteThread; 61 62 // These should all have been destroyed in close() but no harm in checking again 63 delete byteArrayDos; 64 byteArrayDos = NULL; 65 delete baos; 66 baos = NULL; 67 if( bufferedDos ) 68 { 69 bufferedDos->deleteChildStream(); 70 delete bufferedDos; 71 bufferedDos = NULL; 72 } 73 delete dis; 74 dis = NULL; 75} 76 77Connection::Connection(Socket *socket, const wstring& id, PacketListener *packetListener) // throws IOException 78{ 79 _init(); 80 81 this->socket = socket; 82 83 address = socket->getRemoteSocketAddress(); 84 85 this->packetListener = packetListener; 86 87 //try { 88 socket->setSoTimeout(30000); 89 socket->setTrafficClass(IPTOS_THROUGHPUT | IPTOS_LOWDELAY); 90 91 /* 4J JEV no catch 92 } catch (SocketException e) { 93 // catching this exception because it (apparently?) causes problems 94 // on OSX Tiger 95 System.err.println(e.getMessage()); 96 }*/ 97 98 dis = new DataInputStream(socket->getInputStream(packetListener->isServerPacketListener())); 99 100 sos = socket->getOutputStream(packetListener->isServerPacketListener()); 101 bufferedDos = new DataOutputStream(new BufferedOutputStream(sos, SEND_BUFFER_SIZE)); 102 baos = new ByteArrayOutputStream( SEND_BUFFER_SIZE ); 103 byteArrayDos = new DataOutputStream(baos); 104 105 m_hWakeReadThread = new C4JThread::Event; 106 m_hWakeWriteThread = new C4JThread::Event; 107 108 const char *szId = wstringtofilename(id); 109 char readThreadName[256]; 110 char writeThreadName[256]; 111 sprintf(readThreadName,"%s read\n",szId); 112 sprintf(writeThreadName,"%s write\n",szId); 113 114 readThread = new C4JThread(runRead, (void*)this, readThreadName, READ_STACK_SIZE); 115 writeThread = new C4JThread(runWrite, this, writeThreadName, WRITE_STACK_SIZE); 116 readThread->SetProcessor(CPU_CORE_CONNECTIONS); 117 writeThread->SetProcessor(CPU_CORE_CONNECTIONS ); 118#ifdef __ORBIS__ 119 readThread->SetPriority(THREAD_PRIORITY_BELOW_NORMAL); // On Orbis, this core is also used for Matching 2, and that priority of that seems to be always at default no matter what we set it to. Prioritise this below Matching 2. 120 writeThread->SetPriority(THREAD_PRIORITY_BELOW_NORMAL); // On Orbis, this core is also used for Matching 2, and that priority of that seems to be always at default no matter what we set it to. Prioritise this below Matching 2. 121#endif 122 123 readThread->Run(); 124 writeThread->Run(); 125 126 127 /* 4J JEV, java: 128 new Thread(wstring(id).append(L" read thread")) { 129 130 }; 131 132 writeThread = new Thread(id + " write thread") { 133 public void run() { 134 135 }; 136 137 readThread->start(); 138 writeThread->start(); 139 */ 140} 141 142 143void Connection::setListener(PacketListener *packetListener) 144{ 145 this->packetListener = packetListener; 146} 147 148void Connection::send(shared_ptr<Packet> packet) 149{ 150 if (quitting) return; 151 152 MemSect(15); 153 // 4J Jev, synchronized (&writeLock) 154 EnterCriticalSection(&writeLock); 155 156 estimatedRemaining += packet->getEstimatedSize() + 1; 157 if (packet->shouldDelay) 158 { 159 // 4J We have delayed it enough by putting it in the slow queue, so don't delay when we actually send it 160 packet->shouldDelay = false; 161 outgoing_slow.push(packet); 162 } 163 else 164 { 165 outgoing.push(packet); 166 } 167 168 // 4J Jev, end synchronized. 169 LeaveCriticalSection(&writeLock); 170 MemSect(0); 171} 172 173 174void Connection::queueSend(shared_ptr<Packet> packet) 175{ 176 if (quitting) return; 177 EnterCriticalSection(&writeLock); 178 estimatedRemaining += packet->getEstimatedSize() + 1; 179 outgoing_slow.push(packet); 180 LeaveCriticalSection(&writeLock); 181} 182 183bool Connection::writeTick() 184{ 185 bool didSomething = false; 186 187 // 4J Stu - If the connection is closed and the output stream has been deleted 188 if(bufferedDos==NULL || byteArrayDos==NULL) 189 return didSomething; 190 191 // try { 192 if (!outgoing.empty() && (fakeLag == 0 || System::currentTimeMillis() - outgoing.front()->createTime >= fakeLag)) 193 { 194 shared_ptr<Packet> packet; 195 196 EnterCriticalSection(&writeLock); 197 198 packet = outgoing.front(); 199 outgoing.pop(); 200 estimatedRemaining -= packet->getEstimatedSize() + 1; 201 202 LeaveCriticalSection(&writeLock); 203 204 Packet::writePacket(packet, bufferedDos); 205 206 207#ifndef _CONTENT_PACKAGE 208 // 4J Added for debugging 209 int playerId = 0; 210 if( !socket->isLocal() ) 211 { 212 Socket *socket = getSocket(); 213 if( socket ) 214 { 215 INetworkPlayer *player = socket->getPlayer(); 216 if( player ) 217 { 218 playerId = player->GetSmallId(); 219 } 220 } 221 Packet::recordOutgoingPacket(packet,playerId); 222 } 223#endif 224 225 // 4J Stu - Changed this so that rather than writing to the network stream through a buffered stream we want to: 226 // a) Only push whole "game" packets to QNet, rather than amalgamated chunks of data that may include many packets, and partial packets 227 // b) To be able to change the priority and queue of a packet if required 228 //sos->writeWithFlags( baos->buf, 0, baos->size(), 0 ); 229 //baos->reset(); 230 231 writeSizes[packet->getId()] += packet->getEstimatedSize() + 1; 232 didSomething = true; 233 } 234 235 if ((slowWriteDelay-- <= 0) && !outgoing_slow.empty() && (fakeLag == 0 || System::currentTimeMillis() - outgoing_slow.front()->createTime >= fakeLag)) 236 { 237 shared_ptr<Packet> packet; 238 239 //synchronized (writeLock) { 240 241 EnterCriticalSection(&writeLock); 242 243 packet = outgoing_slow.front(); 244 outgoing_slow.pop(); 245 estimatedRemaining -= packet->getEstimatedSize() + 1; 246 247 LeaveCriticalSection(&writeLock); 248 249 // If the shouldDelay flag is still set at this point then we want to write it to QNet as a single packet with priority flags 250 // Otherwise just buffer the packet with other outgoing packets as the java game did 251 if(packet->shouldDelay) 252 { 253 Packet::writePacket(packet, byteArrayDos); 254 255 // 4J Stu - Changed this so that rather than writing to the network stream through a buffered stream we want to: 256 // a) Only push whole "game" packets to QNet, rather than amalgamated chunks of data that may include many packets, and partial packets 257 // b) To be able to change the priority and queue of a packet if required 258#ifdef _XBOX 259 int flags = QNET_SENDDATA_LOW_PRIORITY | QNET_SENDDATA_SECONDARY; 260#else 261 int flags = NON_QNET_SENDDATA_ACK_REQUIRED; 262#endif 263 sos->writeWithFlags( baos->buf, 0, baos->size(), flags ); 264 baos->reset(); 265 } 266 else 267 { 268 Packet::writePacket(packet, bufferedDos); 269 } 270 271#ifndef _CONTENT_PACKAGE 272 // 4J Added for debugging 273 if( !socket->isLocal() ) 274 { 275 int playerId = 0; 276 if( !socket->isLocal() ) 277 { 278 Socket *socket = getSocket(); 279 if( socket ) 280 { 281 INetworkPlayer *player = socket->getPlayer(); 282 if( player ) 283 { 284 playerId = player->GetSmallId(); 285 } 286 } 287 Packet::recordOutgoingPacket(packet,playerId); 288 } 289 } 290#endif 291 292 writeSizes[packet->getId()] += packet->getEstimatedSize() + 1; 293 slowWriteDelay = 0; 294 didSomething = true; 295 } 296 /* 4J JEV, removed try/catch 297 } catch (Exception e) { 298 if (!disconnected) handleException(e); 299 return false; 300 } */ 301 302 return didSomething; 303} 304 305 306void Connection::flush() 307{ 308 // TODO 4J Stu - How to interrupt threads? Or do we need to change the multithreaded functions a bit more 309 //readThread.interrupt(); 310 //writeThread.interrupt(); 311 m_hWakeReadThread->Set(); 312 m_hWakeWriteThread->Set(); 313} 314 315 316bool Connection::readTick() 317{ 318 bool didSomething = false; 319 320 // 4J Stu - If the connection has closed and the input stream has been deleted 321 if(dis==NULL) 322 return didSomething; 323 324 //try { 325 326 shared_ptr<Packet> packet = Packet::readPacket(dis, packetListener->isServerPacketListener()); 327 328 if (packet != NULL) 329 { 330 readSizes[packet->getId()] += packet->getEstimatedSize() + 1; 331 EnterCriticalSection(&incoming_cs); 332 if(!quitting) 333 { 334 incoming.push(packet); 335 } 336 LeaveCriticalSection(&incoming_cs); 337 didSomething = true; 338 } 339 else 340 { 341// printf("Con:0x%x readTick close EOS\n",this); 342 343 // 4J Stu - Remove this line 344 // Fix for #10410 - UI: If the player is removed from a splitscreened host�s game, the next game that player joins will produce a message stating that the host has left. 345 //close(DisconnectPacket::eDisconnect_EndOfStream); 346 } 347 348 349 /* 4J JEV, removed try/catch 350 } catch (Exception e) { 351 if (!disconnected) handleException(e); 352 return false; 353 } */ 354 355 return didSomething; 356} 357 358 359/* 4J JEV, removed try/catch 360void handleException(Exception e) 361{ 362e.printStackTrace(); 363close("disconnect.genericReason", "Internal exception: " + e.toString()); 364}*/ 365 366 367void Connection::close(DisconnectPacket::eDisconnectReason reason, ...) 368{ 369// printf("Con:0x%x close\n",this); 370 if (!running) return; 371// printf("Con:0x%x close doing something\n",this); 372 disconnected = true; 373 374 va_list input; 375 va_start( input, reason ); 376 377 disconnectReason = reason;//va_arg( input, const wstring ); 378 379 vector<void *> objs = vector<void *>(); 380 void *i = NULL; 381 while (i != NULL) 382 { 383 i = va_arg( input, void* ); 384 objs.push_back(i); 385 } 386 387 if( objs.size() ) 388 { 389 disconnectReasonObjects = &objs[0]; 390 } 391 else 392 { 393 disconnectReasonObjects = NULL; 394 } 395 396 // int count = 0, sum = 0, i = first; 397 // va_list marker; 398 // 399 // va_start( marker, first ); 400 // while( i != -1 ) 401 // { 402 // sum += i; 403 // count++; 404 // i = va_arg( marker, int); 405 // } 406 // va_end( marker ); 407 // return( sum ? (sum / count) : 0 ); 408 409 410// CreateThread(NULL, 0, runClose, this, 0, &closeThreadID); 411 412 running = false; 413 414 if( dis ) dis->close(); // The input stream needs closed before the readThread, or the readThread 415 // may get stuck whilst blocking waiting on a read 416 417 // Make sure that the read & write threads are dead before we go and kill the streams that they depend on 418 readThread->WaitForCompletion(INFINITE); 419 writeThread->WaitForCompletion(INFINITE); 420 421 delete dis; 422 dis = NULL; 423 if( bufferedDos ) 424 { 425 bufferedDos->close(); 426 bufferedDos->deleteChildStream(); 427 delete bufferedDos; 428 bufferedDos = NULL; 429 } 430 if( byteArrayDos ) 431 { 432 byteArrayDos->close(); 433 delete byteArrayDos; 434 byteArrayDos = NULL; 435 } 436 if( socket ) 437 { 438 socket->close(packetListener->isServerPacketListener()); 439 socket = NULL; 440 } 441} 442 443void Connection::tick() 444{ 445 if (estimatedRemaining > 1 * 1024 * 1024) 446 { 447 close(DisconnectPacket::eDisconnect_Overflow); 448 } 449 EnterCriticalSection(&incoming_cs); 450 bool empty = incoming.empty(); 451 LeaveCriticalSection(&incoming_cs); 452 if (empty) 453 { 454#if CONNECTION_ENABLE_TIMEOUT_DISCONNECT 455 if (noInputTicks++ == MAX_TICKS_WITHOUT_INPUT) 456 { 457 close(DisconnectPacket::eDisconnect_TimeOut); 458 } 459#endif 460 } 461 // 4J Stu - Moved this a bit later in the function to stop the race condition of Disconnect packets not being processed when local client leaves 462 //else if( socket && socket->isClosing() ) 463 //{ 464 // close(DisconnectPacket::eDisconnect_Closed); 465 //} 466 else 467 { 468 noInputTicks = 0; 469 470 } 471 472 // 4J Added - Send a KeepAlivePacket every now and then to ensure that our read and write threads don't timeout 473 tickCount++; 474 if (tickCount % 20 == 0) 475 { 476 send( shared_ptr<KeepAlivePacket>( new KeepAlivePacket() ) ); 477 } 478 479 // 4J Stu - 1.8.2 changed from 100 to 1000 480 int max = 1000; 481 482 // 4J-PB - NEEDS CHANGED!!! 483 // If we can call connection.close from within a packet->handle, then we can lockup because the loop below has locked incoming_cs, and the connection.close will flag the read and write threads for the connection to close. 484 // they are running on other threads, and will try to lock incoming_cs 485 // We got this with a pre-login packet of a player who wasn't allowed to play due to parental controls, so was kicked out 486 // This has been changed to use a eAppAction_ExitPlayerPreLogin which will run in the main loop, so the connection will not be ticked at that point 487 488 489 EnterCriticalSection(&incoming_cs); 490 // 4J Stu - If disconnected, then we shouldn't process incoming packets 491 std::vector< shared_ptr<Packet> > packetsToHandle; 492 while (!disconnected && !g_NetworkManager.IsLeavingGame() && g_NetworkManager.IsInSession() && !incoming.empty() && max-- >= 0) 493 { 494 shared_ptr<Packet> packet = incoming.front(); 495 packetsToHandle.push_back(packet); 496 incoming.pop(); 497 } 498 LeaveCriticalSection(&incoming_cs); 499 500 // MGH - moved the packet handling outside of the incoming_cs block, as it was locking up sometimes when disconnecting 501 for(int i=0; i<packetsToHandle.size();i++) 502 { 503 PIXBeginNamedEvent(0,"Handling packet %d\n",packetsToHandle[i]->getId()); 504 packetsToHandle[i]->handle(packetListener); 505 PIXEndNamedEvent(); 506 } 507 flush(); 508 509 // 4J Stu - Moved this a bit later in the function to stop the race condition of Disconnect packets not being processed when local client leaves 510 if( socket && socket->isClosing() ) 511 { 512 close(DisconnectPacket::eDisconnect_Closed); 513 } 514 515 // 4J - split the following condition (used to be disconnect && iscoming.empty()) so we can wrap the access in a critical section 516 if (disconnected) 517 { 518 EnterCriticalSection(&incoming_cs); 519 bool empty = incoming.empty(); 520 LeaveCriticalSection(&incoming_cs); 521 if( empty ) 522 { 523 packetListener->onDisconnect(disconnectReason, disconnectReasonObjects); 524 disconnected = false; // 4J added - don't keep sending this every tick 525 } 526 } 527} 528 529SocketAddress *Connection::getRemoteAddress() 530{ 531 return (SocketAddress *) address; 532} 533 534void Connection::sendAndQuit() 535{ 536 if (quitting) 537 { 538 return; 539 } 540// printf("Con:0x%x send & quit\n",this); 541 flush(); 542 quitting = true; 543 // TODO 4J Stu - How to interrupt threads? Or do we need to change the multithreaded functions a bit more 544 //readThread.interrupt(); 545 546#if 1 547 // 4J - this used to be in a thread but not sure why, and is causing trouble for us if we kill the connection 548 // whilst the thread is still expecting to be able to send a packet a couple of seconds after starting it 549 if (running) 550 { 551 // 4J TODO writeThread.interrupt(); 552 close(DisconnectPacket::eDisconnect_Closed); 553 } 554#else 555 CreateThread(NULL, 0, runSendAndQuit, this, 0, &saqThreadID); 556#endif 557} 558 559int Connection::countDelayedPackets() 560{ 561 return (int)outgoing_slow.size(); 562} 563 564 565int Connection::runRead(void* lpParam) 566{ 567 ShutdownManager::HasStarted(ShutdownManager::eConnectionReadThreads); 568 Connection *con = (Connection *)lpParam; 569 570 if (con == NULL) 571 { 572#ifdef __PS3__ 573 ShutdownManager::HasFinished(ShutdownManager::eConnectionReadThreads); 574#endif 575 return 0; 576 } 577 578 Compression::UseDefaultThreadStorage(); 579 580 CRITICAL_SECTION *cs = &con->threadCounterLock; 581 582 EnterCriticalSection(cs); 583 con->readThreads++; 584 LeaveCriticalSection(cs); 585 586 //try { 587 588 MemSect(19); 589 while (con->running && !con->quitting && ShutdownManager::ShouldRun(ShutdownManager::eConnectionReadThreads)) 590 { 591 while (con->readTick()) 592 ; 593 594 // try { 595 //Sleep(100L); 596 // TODO - 4J Stu - 1.8.2 changes these sleeps to 2L, but not sure whether we should do that as well 597 con->m_hWakeReadThread->WaitForSignal(100L); 598 } 599 MemSect(0); 600 601 /* 4J JEV, removed try/catch 602 } catch (InterruptedException e) { 603 } 604 } 605 } finally { 606 synchronized (threadCounterLock) { 607 readThreads--; 608 } 609 } */ 610 611 ShutdownManager::HasFinished(ShutdownManager::eConnectionReadThreads); 612 return 0; 613} 614 615int Connection::runWrite(void* lpParam) 616{ 617 ShutdownManager::HasStarted(ShutdownManager::eConnectionWriteThreads); 618 Connection *con = dynamic_cast<Connection *>((Connection *) lpParam); 619 620 if (con == NULL) 621 { 622 ShutdownManager::HasFinished(ShutdownManager::eConnectionWriteThreads); 623 return 0; 624 } 625 626 Compression::UseDefaultThreadStorage(); 627 628 CRITICAL_SECTION *cs = &con->threadCounterLock; 629 630 EnterCriticalSection(cs); 631 con->writeThreads++; 632 LeaveCriticalSection(cs); 633 634 // 4J Stu - Adding this to force us to run through the writeTick at least once after the event is fired 635 // Otherwise there is a race between the calling thread setting the running flag and this loop checking the condition 636 DWORD waitResult = WAIT_TIMEOUT; 637 638 while ((con->running || waitResult == WAIT_OBJECT_0 ) && ShutdownManager::ShouldRun(ShutdownManager::eConnectionWriteThreads)) 639 { 640 while (con->writeTick()) 641 ; 642 643 //Sleep(100L); 644 // TODO - 4J Stu - 1.8.2 changes these sleeps to 2L, but not sure whether we should do that as well 645 waitResult = con->m_hWakeWriteThread->WaitForSignal(100L); 646 647 if (con->bufferedDos != NULL) con->bufferedDos->flush(); 648 //if (con->byteArrayDos != NULL) con->byteArrayDos->flush(); 649 } 650 651 652 // 4J was in a finally block. 653 EnterCriticalSection(cs); 654 con->writeThreads--; 655 LeaveCriticalSection(cs); 656 657 ShutdownManager::HasFinished(ShutdownManager::eConnectionWriteThreads); 658 return 0; 659} 660 661int Connection::runClose(void* lpParam) 662{ 663 Connection *con = dynamic_cast<Connection *>((Connection *) lpParam); 664 665 if (con == NULL) return 0; 666 667 //try { 668 669 Sleep(2000); 670 if (con->running) 671 { 672 // 4J TODO writeThread.interrupt(); 673 con->close(DisconnectPacket::eDisconnect_Closed); 674 } 675 676 /* 4J Jev, removed try/catch 677 } catch (Exception e) { 678 e.printStackTrace(); 679 } */ 680 681 return 1; 682} 683 684int Connection::runSendAndQuit(void* lpParam) 685{ 686 Connection *con = dynamic_cast<Connection *>((Connection *) lpParam); 687// printf("Con:0x%x runSendAndQuit\n",con); 688 689 if (con == NULL) return 0; 690 691 //try { 692 693 Sleep(2000); 694 if (con->running) 695 { 696 // 4J TODO writeThread.interrupt(); 697 con->close(DisconnectPacket::eDisconnect_Closed); 698// printf("Con:0x%x runSendAndQuit close\n",con); 699 } 700 701// printf("Con:0x%x runSendAndQuit end\n",con); 702 /* 4J Jev, removed try/catch 703 } catch (Exception e) { 704 e.printStackTrace(); 705 } */ 706 707 return 0; 708}