diff --git a/source/glest_game/network/client_interface.cpp b/source/glest_game/network/client_interface.cpp index a066da94e..0968d7ea2 100644 --- a/source/glest_game/network/client_interface.cpp +++ b/source/glest_game/network/client_interface.cpp @@ -62,6 +62,8 @@ ClientInterface::ClientInterface() : GameNetworkInterface() { this->joinGameInProgress = false; this->joinGameInProgressLaunch = false; + this->quitThread = false; + playerIndex= -1; setGameSettingsReceived(false); gotIntro = false; @@ -78,32 +80,42 @@ ClientInterface::ClientInterface() : GameNetworkInterface() { } void ClientInterface::shutdownNetworkCommandListThread() { - MutexSafeWrapper safeMutex(networkCommandListThreadAccessor,CODE_AT_LINE); if(networkCommandListThread != NULL) { + //printf("START === shutdownNetworkCommandListThread\n"); + time_t elapsed = time(NULL); - this->quit = true; + + this->quitThread = true; networkCommandListThread->signalQuit(); + for(;networkCommandListThread->canShutdown(false) == false && difftime((long int)time(NULL),elapsed) <= 15;) { //sleep(150); } - sleep(0); + + //printf("A === shutdownNetworkCommandListThread\n"); + + //sleep(0); if(networkCommandListThread->canShutdown(true)) { delete networkCommandListThread; networkCommandListThread = NULL; } - } - Mutex *tempMutexPtr = networkCommandListThreadAccessor; - networkCommandListThreadAccessor = NULL; - safeMutex.ReleaseLock(false,true); + //printf("END === shutdownNetworkCommandListThread\n"); + } } ClientInterface::~ClientInterface() { if(SystemFlags::getSystemSettingType(SystemFlags::debugNetwork).enabled) SystemFlags::OutputDebug(SystemFlags::debugNetwork,"In [%s::%s Line: %d] destructor for %p\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__,this); + //printf("START === Client destructor\n"); + + MutexSafeWrapper safeMutex(networkCommandListThreadAccessor,CODE_AT_LINE); + shutdownNetworkCommandListThread(); + //printf("A === Client destructor\n"); + if(clientSocket != NULL && clientSocket->isConnected() == true) { if(SystemFlags::getSystemSettingType(SystemFlags::debugNetwork).enabled) SystemFlags::OutputDebug(SystemFlags::debugNetwork,"In [%s::%s Line: %d]\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__); @@ -118,6 +130,8 @@ ClientInterface::~ClientInterface() { } } + //printf("B === Client destructor\n"); + if(SystemFlags::getSystemSettingType(SystemFlags::debugNetwork).enabled) SystemFlags::OutputDebug(SystemFlags::debugNetwork,"In [%s::%s Line: %d]\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__); close(); if(SystemFlags::getSystemSettingType(SystemFlags::debugNetwork).enabled) SystemFlags::OutputDebug(SystemFlags::debugNetwork,"In [%s::%s Line: %d]\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__); @@ -125,6 +139,14 @@ ClientInterface::~ClientInterface() { delete clientSocket; clientSocket = NULL; + //printf("C === Client destructor\n"); + + Mutex *tempMutexPtr = networkCommandListThreadAccessor; + networkCommandListThreadAccessor = NULL; + safeMutex.ReleaseLock(false,true); + + //printf("END === Client destructor\n"); + if(SystemFlags::getSystemSettingType(SystemFlags::debugNetwork).enabled) SystemFlags::OutputDebug(SystemFlags::debugNetwork,"In [%s::%s Line: %d]\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__); } @@ -716,7 +738,7 @@ void ClientInterface::updateLobby() { void ClientInterface::updateFrame(int *checkFrame) { //printf("#1 ClientInterface::updateFrame\n"); - if(isConnected() == true && quit == false) { + if(isConnected() == true && this->quitThread == false) { //printf("#2 ClientInterface::updateFrame\n"); Chrono chrono; @@ -724,7 +746,7 @@ void ClientInterface::updateFrame(int *checkFrame) { int simulateLag = Config::getInstance().getInt("SimulateClientLag","0"); bool done= false; - while(done == false) { + while(done == false && this->quitThread == false) { //printf("BEFORE Client get networkMessageType\n"); @@ -1003,7 +1025,7 @@ void ClientInterface::simpleTask(BaseThread *callingThread) { //printf("START === Client thread ended\n"); - while(callingThread->getQuitStatus() == false && quit == false) { + while(callingThread->getQuitStatus() == false && this->quitThread == false) { updateFrame(NULL); } @@ -1016,11 +1038,11 @@ bool ClientInterface::getNetworkCommand(int frameCount, int currentCachedPending bool result = false; bool waitForData = false; - if(quit == false) { + if(quit == false && this->quitThread == false) { MutexSafeWrapper safeMutex(networkCommandListThreadAccessor,CODE_AT_LINE); safeMutex.ReleaseLock(true); - for(;quit == false;) { + for(;quit == false && this->quitThread == false;) { safeMutex.Lock(); uint64 copyCachedLastPendingFrameCount = cachedLastPendingFrameCount; if(cachedPendingCommands.find(frameCount) != cachedPendingCommands.end()) { @@ -1071,9 +1093,9 @@ void ClientInterface::updateKeyframe(int frameCount) { if(SystemFlags::getSystemSettingType(SystemFlags::debugPerformance).enabled) chrono.start(); //chrono.start(); - if(quit == false) { - bool testThreaded = Config::getInstance().getBool("ThreadedNetworkClient","true"); - //bool testThreaded = false; + if(quit == false && this->quitThread == false) { + //bool testThreaded = Config::getInstance().getBool("ThreadedNetworkClient","true"); + bool testThreaded = true; if(testThreaded == false) { updateFrame(&frameCount); Commands &frameCmdList = cachedPendingCommands[frameCount]; @@ -1521,7 +1543,7 @@ NetworkMessageType ClientInterface::waitForMessage() NetworkMessageType msg = nmtInvalid; //uint64 waitLoopCount = 0; - while(msg == nmtInvalid) { + while(msg == nmtInvalid && this->quitThread == false) { msg = getNextMessageType(); if(msg == nmtInvalid) { if(chrono.getMillis() % 250 == 0 && isConnected() == false) { diff --git a/source/glest_game/network/client_interface.h b/source/glest_game/network/client_interface.h index 8cdc11647..91590eda9 100644 --- a/source/glest_game/network/client_interface.h +++ b/source/glest_game/network/client_interface.h @@ -73,6 +73,8 @@ private: bool readyForInGameJoin; bool resumeInGameJoin; + bool quitThread; + public: ClientInterface(); virtual ~ClientInterface(); diff --git a/source/glest_game/network/server_interface.cpp b/source/glest_game/network/server_interface.cpp index 546786cdd..8777603a4 100644 --- a/source/glest_game/network/server_interface.cpp +++ b/source/glest_game/network/server_interface.cpp @@ -960,7 +960,7 @@ void ServerInterface::checkForCompletedClients(std::map & mapSlotSigna //printf("===> IN slot %d - About to checkForCompletedClients\n",i); ConnectionSlot* connectionSlot = slots[i]; - if(connectionSlot != NULL && connectionSlot->hasValidSocketId() == true && + if(connectionSlot != NULL && connectionSlot->isConnected() == true && mapSlotSignalledList[i] == true && connectionSlot->getJoinGameInProgress() == false && slotsCompleted.find(i) == slotsCompleted.end()) { diff --git a/source/shared_lib/include/platform/posix/socket.h b/source/shared_lib/include/platform/posix/socket.h index a4569710a..21a8e2db9 100644 --- a/source/shared_lib/include/platform/posix/socket.h +++ b/source/shared_lib/include/platform/posix/socket.h @@ -130,6 +130,8 @@ protected: Mutex *inSocketDestructorSynchAccessor; bool inSocketDestructor; + bool isSocketBlocking; + public: Socket(PLATFORM_SOCKET sock); Socket(); @@ -164,6 +166,7 @@ public: void setBlock(bool block); static void setBlock(bool block, PLATFORM_SOCKET socket); + bool getBlock(); bool isReadable(); bool isWritable(struct timeval *timeVal=NULL); @@ -186,6 +189,17 @@ protected: static void throwException(string str); }; +class SafeSocketBlockToggleWrapper { +protected: + Socket *socket; + bool originallyBlocked; + bool newBlocked; +public: + SafeSocketBlockToggleWrapper(Socket *socket, bool toggle); + ~SafeSocketBlockToggleWrapper(); + void Restore(); +}; + class BroadCastClientSocketThread : public BaseThread { private: diff --git a/source/shared_lib/sources/platform/posix/socket.cpp b/source/shared_lib/sources/platform/posix/socket.cpp index fe7cfcca1..952d6294a 100644 --- a/source/shared_lib/sources/platform/posix/socket.cpp +++ b/source/shared_lib/sources/platform/posix/socket.cpp @@ -795,9 +795,8 @@ Socket::Socket(PLATFORM_SOCKET sock) { dataSynchAccessorRead->setOwnerId(CODE_AT_LINE); dataSynchAccessorWrite->setOwnerId(CODE_AT_LINE); - - this->sock= sock; + this->isSocketBlocking = true; this->connectedIpAddress = ""; } @@ -822,6 +821,8 @@ Socket::Socket() { throwException("Error creating socket"); } + this->isSocketBlocking = true; + #ifdef __APPLE__ int set = 1; setsockopt(sock, SOL_SOCKET, SO_NOSIGPIPE, (void *)&set, sizeof(int)); @@ -1449,12 +1450,38 @@ int Socket::receive(void *data, int dataSize, bool tryReceiveUntilDataSizeMet) { return static_cast(bytesReceived); } +SafeSocketBlockToggleWrapper::SafeSocketBlockToggleWrapper(Socket *socket, bool toggle) { + this->socket = socket; + if(this->socket != NULL) { + this->originallyBlocked = socket->getBlock(); + this->newBlocked = toggle; + + if(this->originallyBlocked != this->newBlocked) { + socket->setBlock(this->newBlocked); + } + } +} + +void SafeSocketBlockToggleWrapper::Restore() { + if(this->socket != NULL) { + if(this->originallyBlocked != this->newBlocked) { + socket->setBlock(this->originallyBlocked); + this->newBlocked = this->originallyBlocked; + } + } +} + +SafeSocketBlockToggleWrapper::~SafeSocketBlockToggleWrapper() { + Restore(); +} + int Socket::peek(void *data, int dataSize,bool mustGetData,int *pLastSocketError) { Chrono chrono; if(SystemFlags::getSystemSettingType(SystemFlags::debugNetwork).enabled) chrono.start(); const int MAX_PEEK_WAIT_SECONDS = 3; + int lastSocketError = 0; int err = 0; if(isSocketValid() == true) { //if(chrono.getMillis() > 1) printf("In [%s::%s Line: %d] action running for msecs: %lld\n",__FILE__,__FUNCTION__,__LINE__,(long long int)chrono.getMillis()); @@ -1472,7 +1499,19 @@ int Socket::peek(void *data, int dataSize,bool mustGetData,int *pLastSocketError //if(chrono.getMillis() > 1) printf("In [%s::%s Line: %d] action running for msecs: %lld\n",__FILE__,__FUNCTION__,__LINE__,(long long int)chrono.getMillis()); if(isSocketValid() == true) { +// Chrono recvTimer(true); + SafeSocketBlockToggleWrapper safeUnblock(this, false); + errno = 0; err = recv(sock, reinterpret_cast(data), dataSize, MSG_PEEK); + lastSocketError = getLastSocketError(); + if(pLastSocketError != NULL) { + *pLastSocketError = lastSocketError; + } + safeUnblock.Restore(); + +// if(recvTimer.getMillis() > 1000 || (err <= 0 && lastSocketError != 0 && lastSocketError != PLATFORM_SOCKET_TRY_AGAIN)) { +// printf("#1 PEEK err = %d lastSocketError = %d ms: %lld\n",err,lastSocketError,(long long int)recvTimer.getMillis()); +// } } safeMutex.ReleaseLock(); @@ -1485,10 +1524,6 @@ int Socket::peek(void *data, int dataSize,bool mustGetData,int *pLastSocketError } //if(chrono.getMillis() > 1) printf("In [%s::%s Line: %d] action running for msecs: %lld\n",__FILE__,__FUNCTION__,__LINE__,(long long int)chrono.getMillis()); - int lastSocketError = getLastSocketError(); - if(pLastSocketError != NULL) { - *pLastSocketError = lastSocketError; - } if(err < 0 && lastSocketError != PLATFORM_SOCKET_TRY_AGAIN) { if(SystemFlags::getSystemSettingType(SystemFlags::debugNetwork).enabled) SystemFlags::OutputDebug(SystemFlags::debugNetwork,"[%s::%s Line: %d] ERROR PEEKING SOCKET DATA error while sending socket data, err = %d, error = %s\n",__FILE__,__FUNCTION__,__LINE__,err,getLastSocketErrorFormattedText().c_str()); disconnectSocket(); @@ -1500,6 +1535,7 @@ int Socket::peek(void *data, int dataSize,bool mustGetData,int *pLastSocketError time_t tStartTimer = time(NULL); while((err < 0 && lastSocketError == PLATFORM_SOCKET_TRY_AGAIN) && + isSocketValid() == true && (difftime((long int)time(NULL),tStartTimer) <= MAX_PEEK_WAIT_SECONDS)) { /* if(isConnected() == false) { @@ -1520,17 +1556,31 @@ int Socket::peek(void *data, int dataSize,bool mustGetData,int *pLastSocketError //MutexSafeWrapper safeMutex(&dataSynchAccessor,CODE_AT_LINE + "_" + intToStr(sock) + "_" + intToStr(dataSize)); MutexSafeWrapper safeMutex(dataSynchAccessorRead,CODE_AT_LINE); + +// Chrono recvTimer(true); + SafeSocketBlockToggleWrapper safeUnblock(this, false); + errno = 0; err = recv(sock, reinterpret_cast(data), dataSize, MSG_PEEK); lastSocketError = getLastSocketError(); if(pLastSocketError != NULL) { *pLastSocketError = lastSocketError; } + safeUnblock.Restore(); + +// if(recvTimer.getMillis() > 1000 || (err <= 0 && lastSocketError != 0 && lastSocketError != PLATFORM_SOCKET_TRY_AGAIN)) { +// printf("#2 PEEK err = %d lastSocketError = %d ms: %lld\n",err,lastSocketError,(long long int)recvTimer.getMillis()); +// } + + //printf("Socket peek delayed checking for sock = %d err = %d\n",sock,err); safeMutex.ReleaseLock(); if(SystemFlags::getSystemSettingType(SystemFlags::debugNetwork).enabled) if(chrono.getMillis() > 1) SystemFlags::OutputDebug(SystemFlags::debugNetwork,"In [%s::%s Line: %d] action running for msecs: %lld\n",__FILE__,__FUNCTION__,__LINE__,(long long int)chrono.getMillis()); if(SystemFlags::getSystemSettingType(SystemFlags::debugNetwork).enabled) SystemFlags::OutputDebug(SystemFlags::debugNetwork,"In [%s::%s Line: %d] #2 EAGAIN during peek, trying again returned: %d\n",__FILE__,__FUNCTION__,__LINE__,err); } + else { + //printf("Socket peek delayed [NOT READABLE] checking for sock = %d err = %d\n",sock,err); + } } if(SystemFlags::getSystemSettingType(SystemFlags::debugNetwork).enabled) if(chrono.getMillis() > 1) SystemFlags::OutputDebug(SystemFlags::debugNetwork,"In [%s::%s Line: %d] action running for msecs: %lld\n",__FILE__,__FUNCTION__,__LINE__,(long long int)chrono.getMillis()); @@ -1541,12 +1591,17 @@ int Socket::peek(void *data, int dataSize,bool mustGetData,int *pLastSocketError //if(chrono.getMillis() > 1) printf("In [%s::%s Line: %d] action running for msecs: %lld\n",__FILE__,__FUNCTION__,__LINE__,(long long int)chrono.getMillis()); - if(err <= 0) { + if(err < 0 || (err == 0 && dataSize != 0) || + ((err == 0 || err == -1) && dataSize == 0 && lastSocketError != 0 && lastSocketError != PLATFORM_SOCKET_TRY_AGAIN)) { + //printf("** #1 Socket peek error for sock = %d err = %d lastSocketError = %d\n",sock,err,lastSocketError); + int iErr = lastSocketError; if(SystemFlags::getSystemSettingType(SystemFlags::debugNetwork).enabled) SystemFlags::OutputDebug(SystemFlags::debugNetwork,"[%s::%s Line: %d] DISCONNECTING SOCKET for socket [%d], err = %d, error = %s\n",__FILE__,__FUNCTION__,__LINE__,socket,err,getLastSocketErrorFormattedText(&iErr).c_str()); //printf("Peek #3 err = %d\n",err); //lastSocketError = getLastSocketError(); if(mustGetData == true || lastSocketError != PLATFORM_SOCKET_TRY_AGAIN) { + //printf("** #2 Socket peek error for sock = %d err = %d lastSocketError = %d mustGetData = %d\n",sock,err,lastSocketError,mustGetData); + int iErr = lastSocketError; disconnectSocket(); @@ -1557,8 +1612,26 @@ int Socket::peek(void *data, int dataSize,bool mustGetData,int *pLastSocketError return static_cast(err); } +bool Socket::getBlock() { + bool blocking = true; + + // don't waste time if the socket is invalid + if(isSocketValid(&sock) == false) { + return blocking; + } + +//#ifndef WIN32 +// int currentFlags = fcntl(sock, F_GETFL); +// blocking = !((currentFlags & O_NONBLOCK) == O_NONBLOCK); +//#else + blocking = this->isSocketBlocking; +//#endif + return blocking; +} + void Socket::setBlock(bool block){ setBlock(block,this->sock); + this->isSocketBlocking = block; } void Socket::setBlock(bool block, PLATFORM_SOCKET socket) { @@ -1691,10 +1764,12 @@ bool Socket::isConnected() { //if the socket is readable it is connected if we can read a byte from it if(isReadable()) { char tmp=0; + int peekDataBytes=1; int lastSocketError=0; - int err = peek(&tmp, 1, false, &lastSocketError); + //int err = peek(&tmp, 1, false, &lastSocketError); + int err = peek(&tmp, peekDataBytes, false, &lastSocketError); //if(err <= 0 && err != PLATFORM_SOCKET_TRY_AGAIN) { - if(err <= 0 && lastSocketError != PLATFORM_SOCKET_TRY_AGAIN) { + if(err <= 0 && lastSocketError != 0 && lastSocketError != PLATFORM_SOCKET_TRY_AGAIN) { if(SystemFlags::getSystemSettingType(SystemFlags::debugNetwork).enabled) SystemFlags::OutputDebug(SystemFlags::debugNetwork,"[%s::%s Line: %d] ERROR Peek failed, err = %d for socket: %d, error = %s, lastSocketError = %d\n",__FILE__,__FUNCTION__,__LINE__,err,sock,getLastSocketErrorFormattedText().c_str(),lastSocketError); if(SystemFlags::VERBOSE_MODE_ENABLED) SystemFlags::OutputDebug(SystemFlags::debugError,"SOCKET DISCONNECTED In [%s::%s Line: %d] ERROR Peek failed, err = %d for socket: %d, error = %s, lastSocketError = %d\n",__FILE__,__FUNCTION__,__LINE__,err,sock,getLastSocketErrorFormattedText().c_str(),lastSocketError); return false;