diff --git a/source/glest_game/network/connection_slot.cpp b/source/glest_game/network/connection_slot.cpp index 23e9661ac..d13dbee7d 100644 --- a/source/glest_game/network/connection_slot.cpp +++ b/source/glest_game/network/connection_slot.cpp @@ -32,16 +32,18 @@ namespace Glest{ namespace Game{ // class ConnectionSlotThread // ===================================================== -ConnectionSlotThread::ConnectionSlotThread(int slotIndex) : BaseThread() { - this->slotIndex = slotIndex; +ConnectionSlotThread::ConnectionSlotThread(ConnectionSlot *slot) : BaseThread() { + this->slot = slot; + this->slotIndex = slot->getPlayerIndex(); this->slotInterface = NULL; //this->event = NULL; eventList.clear(); eventList.reserve(100); } -ConnectionSlotThread::ConnectionSlotThread(ConnectionSlotCallbackInterface *slotInterface,int slotIndex) : BaseThread() { - this->slotIndex = slotIndex; +ConnectionSlotThread::ConnectionSlotThread(ConnectionSlotCallbackInterface *slotInterface,ConnectionSlot *slot) : BaseThread() { + this->slot = slot; + this->slotIndex = slot->getPlayerIndex(); this->slotInterface = slotInterface; //this->event = NULL; eventList.clear(); @@ -203,10 +205,12 @@ void ConnectionSlotThread::execute() { //this->slotInterface->slotUpdateTask(&eventCopy); this->slotUpdateTask(&eventCopy); setTaskCompleted(eventCopy.eventId); + this->slot->signalSlotWorkerTaskCompleted(); } } else { safeMutex.ReleaseLock(); + this->slot->signalSlotWorkerTaskCompleted(); } if(getQuitStatus() == true) { @@ -219,12 +223,14 @@ void ConnectionSlotThread::execute() { } catch(const exception &ex) { //setRunningStatus(false); + this->slot->signalSlotWorkerTaskCompleted(); SystemFlags::OutputDebug(SystemFlags::debugError,"In [%s::%s Line: %d] Error [%s]\n",__FILE__,__FUNCTION__,__LINE__,ex.what()); if(SystemFlags::getSystemSettingType(SystemFlags::debugNetwork).enabled) SystemFlags::OutputDebug(SystemFlags::debugNetwork,"In [%s::%s Line: %d]\n",__FILE__,__FUNCTION__,__LINE__); throw runtime_error(ex.what()); } + this->slot->signalSlotWorkerTaskCompleted(); if(SystemFlags::getSystemSettingType(SystemFlags::debugNetwork).enabled) SystemFlags::OutputDebug(SystemFlags::debugNetwork,"In [%s::%s] Line: %d\n",__FILE__,__FUNCTION__,__LINE__); } @@ -250,7 +256,7 @@ ConnectionSlot::ConnectionSlot(ServerInterface* serverInterface, int playerIndex this->setSocket(NULL); this->slotThreadWorker = NULL; - this->slotThreadWorker = new ConnectionSlotThread(this->serverInterface,playerIndex); + this->slotThreadWorker = new ConnectionSlotThread(this->serverInterface,this); this->slotThreadWorker->setUniqueID(__FILE__); this->slotThreadWorker->start(); @@ -286,6 +292,15 @@ ConnectionSlot::~ConnectionSlot() { if(SystemFlags::getSystemSettingType(SystemFlags::debugNetwork).enabled) SystemFlags::OutputDebug(SystemFlags::debugNetwork,"In [%s::%s] END\n",__FILE__,__FUNCTION__); } +void ConnectionSlot::signalSlotWorkerTaskCompleted() { + semSlotWorkerTaskCompleted.signal(); +} + +bool ConnectionSlot::waitSlotWorkerTaskCompleted(int waitMilliseconds) { + int result = semSlotWorkerTaskCompleted.waitTillSignalled(waitMilliseconds); + return (result == 0); +} + void ConnectionSlot::updateSlot(ConnectionSlotEvent *event) { Chrono chrono; if(SystemFlags::getSystemSettingType(SystemFlags::debugPerformance).enabled) chrono.start(); @@ -546,11 +561,13 @@ void ConnectionSlot::update(bool checkForNewClients,int lockedSlotIndex) { lastReceiveCommandListTime = time(NULL); if(SystemFlags::getSystemSettingType(SystemFlags::debugNetwork).enabled) SystemFlags::OutputDebug(SystemFlags::debugNetwork,"In [%s::%s Line: %d] currentFrameCount = %d\n",__FILE__,__FUNCTION__,__LINE__,currentFrameCount); - MutexSafeWrapper safeMutexSlot(&mutexPendingNetworkCommandList,CODE_AT_LINE); - for(int i = 0; i < networkMessageCommandList.getCommandCount(); ++i) { - vctPendingNetworkCommandList.push_back(*networkMessageCommandList.getCommand(i)); + if(networkMessageCommandList.getCommandCount() > 0) { + MutexSafeWrapper safeMutexSlot(&mutexPendingNetworkCommandList,CODE_AT_LINE); + for(int i = 0; i < networkMessageCommandList.getCommandCount(); ++i) { + vctPendingNetworkCommandList.push_back(*networkMessageCommandList.getCommand(i)); + } + safeMutexSlot.ReleaseLock(); } - safeMutexSlot.ReleaseLock(); } else { if(SystemFlags::getSystemSettingType(SystemFlags::debugError).enabled) SystemFlags::OutputDebug(SystemFlags::debugError,"In [%s::%s Line: %d]\nInvalid message type before intro handshake [%d]\nDisconnecting socket for slot: %d [%s].\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__,networkMessageType,this->playerIndex,this->getIpAddress().c_str()); @@ -1067,7 +1084,7 @@ bool ConnectionSlot::updateCompleted(ConnectionSlotEvent *event) { } void ConnectionSlot::sendMessage(const NetworkMessage* networkMessage) { - MutexSafeWrapper safeMutex(&socketSynchAccessor,CODE_AT_LINE); + //MutexSafeWrapper safeMutex(&socketSynchAccessor,CODE_AT_LINE); // Skip text messages not intended for the players preferred language const NetworkMessageText *textMsg = dynamic_cast(networkMessage); @@ -1109,72 +1126,90 @@ void ConnectionSlot::clearPendingNetworkCommandList() { safeMutexSlot.ReleaseLock(); } -bool ConnectionSlot::hasValidSocketId() { - //bool result = (this->getSocket() != NULL && this->getSocket()->getSocketId() > 0); - //return result; - bool result = false; - MutexSafeWrapper safeMutexSlot(&mutexSocket,CODE_AT_LINE); - if(socket != NULL && socket->getSocketId() > 0) { - result = true; - } - return result; - -} +//bool ConnectionSlot::hasValidSocketId() { +// //bool result = (this->getSocket() != NULL && this->getSocket()->getSocketId() > 0); +// //return result; +// bool result = false; +// MutexSafeWrapper safeMutexSlot(&mutexSocket,CODE_AT_LINE); +// if(socket != NULL && socket->getSocketId() > 0) { +// result = true; +// } +// return result; +// +//} bool ConnectionSlot::isConnected() { bool result = false; - MutexSafeWrapper safeMutexSlot(&mutexSocket,CODE_AT_LINE); + //MutexSafeWrapper safeMutexSlot(&mutexSocket,CODE_AT_LINE); + ReadWriteMutexSafeWrapper safeRWL(&socketRWLMutex,true,CODE_AT_LINE); if(socket != NULL && socket->isConnected() == true) { result = true; } + //socketRWLMutex.unlockRead(); return result; } PLATFORM_SOCKET ConnectionSlot::getSocketId() { PLATFORM_SOCKET result = 0; - MutexSafeWrapper safeMutexSlot(&mutexSocket,CODE_AT_LINE); + //MutexSafeWrapper safeMutexSlot(&mutexSocket,CODE_AT_LINE); + ReadWriteMutexSafeWrapper safeRWL(&socketRWLMutex,true,CODE_AT_LINE); if(socket != NULL) { result = socket->getSocketId(); } + //socketRWLMutex.unlockRead(); return result; } pair ConnectionSlot::getSocketInfo() { pair result; - MutexSafeWrapper safeMutexSlot(&mutexSocket,CODE_AT_LINE); + //MutexSafeWrapper safeMutexSlot(&mutexSocket,CODE_AT_LINE); + ReadWriteMutexSafeWrapper safeRWL(&socketRWLMutex,true,CODE_AT_LINE); result.first = (socket != NULL && socket->isConnected()); result.second = socket; + //socketRWLMutex.unlockRead(); return result; } Socket* ConnectionSlot::getSocket(bool mutexLock) { - MutexSafeWrapper safeMutexSlot(NULL,CODE_AT_LINE); + Socket *result = NULL; + //MutexSafeWrapper safeMutexSlot(NULL,CODE_AT_LINE); + ReadWriteMutexSafeWrapper safeRWL(&socketRWLMutex,true,CODE_AT_LINE); if(mutexLock == true) { - safeMutexSlot.setMutex(&mutexSocket,CODE_AT_LINE); + //safeMutexSlot.setMutex(&mutexSocket,CODE_AT_LINE); } - return socket; + result = socket; + //socketRWLMutex.unlockRead(); + + return result; } void ConnectionSlot::setSocket(Socket *newSocket) { - MutexSafeWrapper safeMutexSlot(&mutexSocket,CODE_AT_LINE); + //MutexSafeWrapper safeMutexSlot(&mutexSocket,CODE_AT_LINE); + ReadWriteMutexSafeWrapper safeRWL(&socketRWLMutex,false,CODE_AT_LINE); socket = newSocket; + //socketRWLMutex.unlockWrite(); } void ConnectionSlot::deleteSocket() { - MutexSafeWrapper safeMutexSlot(&mutexSocket,CODE_AT_LINE); + //MutexSafeWrapper safeMutexSlot(&mutexSocket,CODE_AT_LINE); + ReadWriteMutexSafeWrapper safeRWL(&socketRWLMutex,false,CODE_AT_LINE); delete socket; socket = NULL; + //socketRWLMutex.unlockWrite(); } bool ConnectionSlot::hasDataToRead() { bool result = false; - MutexSafeWrapper safeMutexSlot(&mutexSocket,CODE_AT_LINE); + //MutexSafeWrapper safeMutexSlot(&mutexSocket,CODE_AT_LINE); + ReadWriteMutexSafeWrapper safeRWL(&socketRWLMutex,true,CODE_AT_LINE); if(socket != NULL && socket->hasDataToRead() == true) { result = true; } + //socketRWLMutex.unlockRead(); + return result; } diff --git a/source/glest_game/network/connection_slot.h b/source/glest_game/network/connection_slot.h index dfebcf820..0d641a5aa 100644 --- a/source/glest_game/network/connection_slot.h +++ b/source/glest_game/network/connection_slot.h @@ -78,6 +78,7 @@ protected: Mutex triggerIdMutex; vector eventList; int slotIndex; + ConnectionSlot *slot; virtual void setQuitStatus(bool value); virtual void setTaskCompleted(int eventId); @@ -86,8 +87,8 @@ protected: void slotUpdateTask(ConnectionSlotEvent *event); public: - ConnectionSlotThread(int slotIndex); - ConnectionSlotThread(ConnectionSlotCallbackInterface *slotInterface,int slotIndex); + ConnectionSlotThread(ConnectionSlot *slot); + ConnectionSlotThread(ConnectionSlotCallbackInterface *slotInterface,ConnectionSlot *slot); virtual void execute(); void signalUpdate(ConnectionSlotEvent *event); bool isSignalCompleted(ConnectionSlotEvent *event); @@ -106,7 +107,9 @@ class ConnectionSlot: public NetworkInterface { private: ServerInterface* serverInterface; - Mutex mutexSocket; + Semaphore semSlotWorkerTaskCompleted; + + //Mutex mutexSocket; Socket* socket; int playerIndex; string name; @@ -131,10 +134,15 @@ private: int playerStatus; string playerLanguage; + ReadWriteMutex socketRWLMutex; + public: ConnectionSlot(ServerInterface* serverInterface, int playerIndex); ~ConnectionSlot(); + void signalSlotWorkerTaskCompleted(); + bool waitSlotWorkerTaskCompleted(int waitMilliseconds=-1); + void update(bool checkForNewClients,int lockedSlotIndex); void setPlayerIndex(int value) { playerIndex = value; } int getPlayerIndex() const {return playerIndex;} @@ -155,7 +163,7 @@ public: bool getReceivedNetworkGameStatus() const { return receivedNetworkGameStatus; } void setReceivedNetworkGameStatus(bool value) { receivedNetworkGameStatus = value; } - bool hasValidSocketId(); + //bool hasValidSocketId(); virtual bool getConnectHasHandshaked() const { return gotIntro; } std::vector getThreadErrorList() const { return threadErrorList; } void clearThreadErrorList() { threadErrorList.clear(); } diff --git a/source/glest_game/network/server_interface.cpp b/source/glest_game/network/server_interface.cpp index 6cea747cb..55e929869 100644 --- a/source/glest_game/network/server_interface.cpp +++ b/source/glest_game/network/server_interface.cpp @@ -882,6 +882,53 @@ void ServerInterface::checkForLaggingClients(std::map &mapSlotSignalle } } +void ServerInterface::checkForLaggingClients(std::vector &errorMsgList) { + bool lastGlobalLagCheckTimeUpdate = false; + time_t waitForClientsElapsed = time(NULL); + time_t waitForThreadElapsed = time(NULL); + std::map slotsWarnedList; + // Examine all threads for completion of delegation + for(int i= 0; exitServer == false && i < GameConstants::maxPlayers; ++i) { + MutexSafeWrapper safeMutexSlot(&slotAccessorMutexes[i],CODE_AT_LINE_X(i)); + ConnectionSlot* connectionSlot = slots[i]; + + if(connectionSlot != NULL && connectionSlot->isConnected() == true) { + try { + if(gameHasBeenInitiated == true && + difftime(time(NULL),lastGlobalLagCheckTime) >= LAG_CHECK_GRACE_PERIOD) { + + //printf("\n\n\n^^^^^^^^^^^^^^ PART A\n\n\n"); + + // New lag check + std::pair clientLagExceededOrWarned = std::make_pair(false,false); + if( gameHasBeenInitiated == true && connectionSlot != NULL && + connectionSlot->isConnected() == true) { + //printf("\n\n\n^^^^^^^^^^^^^^ PART B\n\n\n"); + + lastGlobalLagCheckTimeUpdate = true; + clientLagExceededOrWarned = clientLagCheck(connectionSlot,slotsWarnedList[i]); + + if(SystemFlags::getSystemSettingType(SystemFlags::debugNetwork).enabled) SystemFlags::OutputDebug(SystemFlags::debugNetwork,"In [%s::%s Line: %d] clientLagExceededOrWarned.first = %d, clientLagExceededOrWarned.second = %d, gameSettings.getNetworkPauseGameForLaggedClients() = %d\n",__FILE__,__FUNCTION__,__LINE__,clientLagExceededOrWarned.first,clientLagExceededOrWarned.second,gameSettings.getNetworkPauseGameForLaggedClients()); + + if(clientLagExceededOrWarned.first == true) { + slotsWarnedList[i] = true; + } + } + } + } + catch(const exception &ex) { + SystemFlags::OutputDebug(SystemFlags::debugError,"In [%s::%s Line: %d] Error [%s]\n",__FILE__,__FUNCTION__,__LINE__,ex.what()); + if(SystemFlags::getSystemSettingType(SystemFlags::debugNetwork).enabled) SystemFlags::OutputDebug(SystemFlags::debugNetwork,"In [%s::%s Line: %d] error detected [%s]\n",__FILE__,__FUNCTION__,__LINE__,ex.what()); + errorMsgList.push_back(ex.what()); + } + } + } + + if(lastGlobalLagCheckTimeUpdate == true) { + lastGlobalLagCheckTime = time(NULL); + } +} + void ServerInterface::executeNetworkCommandsFromClients() { if(gameHasBeenInitiated == true) { for(int i= 0; exitServer == false && i < GameConstants::maxPlayers; ++i) { @@ -948,6 +995,36 @@ void ServerInterface::dispatchPendingChatMessages(std::vector &errorMsg } } +void ServerInterface::waitForSignalledClients(std::map &mapSlotSignalledList,std::vector &errorMsgList) { + for(int i= 0; exitServer == false && i < GameConstants::maxPlayers; ++i) { + if(mapSlotSignalledList[i] == true) { + MutexSafeWrapper safeMutexSlot(&slotAccessorMutexes[i],CODE_AT_LINE_X(i)); + ConnectionSlot* connectionSlot= slots[i]; + safeMutexSlot.ReleaseLock(true); + if(connectionSlot != NULL) { + connectionSlot->waitSlotWorkerTaskCompleted(4000); + + safeMutexSlot.Lock(); + connectionSlot= slots[i]; + if(connectionSlot != NULL) { + std::vector errorList = connectionSlot->getThreadErrorList(); + // Collect any collected errors from threads + if(errorList.empty() == false) { + for(int iErrIdx = 0; iErrIdx < errorList.size(); ++iErrIdx) { + string &sErr = errorList[iErrIdx]; + if(sErr != "") { + errorMsgList.push_back(sErr); + } + } + connectionSlot->clearThreadErrorList(); + } + } + safeMutexSlot.ReleaseLock(); + } + } + } +} + void ServerInterface::update() { Chrono chrono; if(SystemFlags::getSystemSettingType(SystemFlags::debugPerformance).enabled) chrono.start(); @@ -998,14 +1075,18 @@ void ServerInterface::update() { if(SystemFlags::getSystemSettingType(SystemFlags::debugPerformance).enabled && chrono.getMillis() > 0) SystemFlags::OutputDebug(SystemFlags::debugPerformance,"In [%s::%s Line: %d] took %lld msecs\n",__FILE__,__FUNCTION__,__LINE__,chrono.getMillis()); + waitForSignalledClients(mapSlotSignalledList,errorMsgList); + +/* // Step #2 check all connection slot worker threads for completed status checkForCompletedClients(mapSlotSignalledList,errorMsgList, eventList); if(SystemFlags::getSystemSettingType(SystemFlags::debugNetwork).enabled) SystemFlags::OutputDebug(SystemFlags::debugNetwork,"In [%s::%s Line: %d] ============ Step #3\n",__FILE__,__FUNCTION__,__LINE__); if(SystemFlags::getSystemSettingType(SystemFlags::debugPerformance).enabled && chrono.getMillis() > 0) SystemFlags::OutputDebug(SystemFlags::debugPerformance,"In [%s::%s Line: %d] took %lld msecs\n",__FILE__,__FUNCTION__,__LINE__,chrono.getMillis()); - +*/ // Step #3 check clients for any lagging scenarios and try to deal with them - checkForLaggingClients(mapSlotSignalledList, eventList, socketTriggeredList,errorMsgList); + //checkForLaggingClients(mapSlotSignalledList, eventList, socketTriggeredList,errorMsgList); + checkForLaggingClients(errorMsgList); if(SystemFlags::getSystemSettingType(SystemFlags::debugNetwork).enabled) SystemFlags::OutputDebug(SystemFlags::debugNetwork,"In [%s::%s Line: %d] ============ Step #4\n",__FILE__,__FUNCTION__,__LINE__); if(SystemFlags::getSystemSettingType(SystemFlags::debugPerformance).enabled && chrono.getMillis() > 0) SystemFlags::OutputDebug(SystemFlags::debugPerformance,"In [%s::%s Line: %d] took %lld msecs\n",__FILE__,__FUNCTION__,__LINE__,chrono.getMillis()); @@ -1027,9 +1108,9 @@ void ServerInterface::update() { //printf("\nServerInterface::update -- E1\n"); //std::map eventList; - std::map mapSlotSignalledList; - - checkForLaggingClients(mapSlotSignalledList, eventList, socketTriggeredList,errorMsgList); + //std::map mapSlotSignalledList; + //checkForLaggingClients(mapSlotSignalledList, eventList, socketTriggeredList,errorMsgList); + checkForLaggingClients(errorMsgList); } if(SystemFlags::getSystemSettingType(SystemFlags::debugPerformance).enabled && chrono.getMillis() > 0) SystemFlags::OutputDebug(SystemFlags::debugPerformance,"In [%s::%s Line: %d] took %lld msecs\n",__FILE__,__FUNCTION__,__LINE__,chrono.getMillis()); @@ -1038,10 +1119,10 @@ void ServerInterface::update() { difftime(time(NULL),lastGlobalLagCheckTime) >= LAG_CHECK_GRACE_PERIOD) { //printf("\nServerInterface::update -- F\n"); - std::map eventList; - std::map mapSlotSignalledList; - - checkForLaggingClients(mapSlotSignalledList, eventList, socketTriggeredList,errorMsgList); + //std::map eventList; + //std::map mapSlotSignalledList; + //checkForLaggingClients(mapSlotSignalledList, eventList, socketTriggeredList,errorMsgList); + checkForLaggingClients(errorMsgList); } //printf("\nServerInterface::update -- G\n"); diff --git a/source/glest_game/network/server_interface.h b/source/glest_game/network/server_interface.h index b79a2924a..e2db82910 100644 --- a/source/glest_game/network/server_interface.h +++ b/source/glest_game/network/server_interface.h @@ -209,6 +209,9 @@ protected: void checkForLaggingClients(std::map &mapSlotSignalledList, std::map &eventList, std::map &socketTriggeredList,std::vector &errorMsgList); void executeNetworkCommandsFromClients(); void dispatchPendingChatMessages(std::vector &errorMsgList); + + void waitForSignalledClients(std::map &mapSlotSignalledList,std::vector &errorMsgList); + void checkForLaggingClients(std::vector &errorMsgList); }; }}//end namespace diff --git a/source/shared_lib/include/platform/posix/socket.h b/source/shared_lib/include/platform/posix/socket.h index 3a6f58c92..658d5a69b 100644 --- a/source/shared_lib/include/platform/posix/socket.h +++ b/source/shared_lib/include/platform/posix/socket.h @@ -116,13 +116,14 @@ protected: //SimpleTaskThread *pingThread; std::map pingCache; time_t lastThreadedPing; - Mutex pingThreadAccessor; + //Mutex pingThreadAccessor; - Mutex dataSynchAccessorRead; - Mutex dataSynchAccessorWrite; + //Mutex dataSynchAccessorRead; + //Mutex dataSynchAccessorWrite; + ReadWriteMutex dataSynchAccessorRWLMutex; - Mutex inSocketDestructorSynchAccessor; - bool inSocketDestructor; + ReadWriteMutex inSocketDestructorSynchAccessor; + //bool inSocketDestructor; public: Socket(PLATFORM_SOCKET sock); diff --git a/source/shared_lib/include/platform/sdl/thread.h b/source/shared_lib/include/platform/sdl/thread.h index e611cac57..284863d64 100644 --- a/source/shared_lib/include/platform/sdl/thread.h +++ b/source/shared_lib/include/platform/sdl/thread.h @@ -24,6 +24,7 @@ //#include "util.h" #include +#include "types.h" #include "leak_dumper.h" // ===================================================== @@ -187,9 +188,131 @@ public: Semaphore(Uint32 initialValue = 0); ~Semaphore(); void signal(); - int waitTillSignalled(); + int waitTillSignalled(int waitMilliseconds=-1); + + uint32 getSemValue(); }; + +class ReadWriteMutex +{ +public: + ReadWriteMutex(int maxReaders = 32); + + void LockRead(); + void UnLockRead(); + + void LockWrite(); + void UnLockWrite(); + + int maxReaders(); + void setOwnerId(string ownerId) { this->ownerId = ownerId; } + +private: + Semaphore semaphore; + Mutex mutex; + int maxReadersCount; + + string ownerId; +}; + + +class ReadWriteMutexSafeWrapper { +protected: + ReadWriteMutex *mutex; + string ownerId; + bool isReadLock; + +#ifdef DEBUG_PERFORMANCE_MUTEXES + Chrono chrono; +#endif + +public: + + ReadWriteMutexSafeWrapper(ReadWriteMutex *mutex,bool isReadLock=true, string ownerId="") { + this->mutex = mutex; + this->isReadLock = isReadLock; + this->ownerId = ownerId; + Lock(); + } + ~ReadWriteMutexSafeWrapper() { + ReleaseLock(); + } + + void setReadWriteMutex(ReadWriteMutex *mutex,bool isReadLock=true,string ownerId="") { + this->mutex = mutex; + this->isReadLock = isReadLock; + this->ownerId = ownerId; + Lock(); + } + bool isValidReadWriteMutex() const { + return(this->mutex != NULL); + } + + void Lock() { + if(this->mutex != NULL) { + #ifdef DEBUG_MUTEXES + if(ownerId != "") { + printf("Locking Mutex [%s] refCount: %d\n",ownerId.c_str(),this->mutex->getRefCount()); + } + #endif + +#ifdef DEBUG_PERFORMANCE_MUTEXES + chrono.start(); +#endif + + if(this->isReadLock == true) { + this->mutex->LockRead(); + } + else { + this->mutex->LockWrite(); + } + +#ifdef DEBUG_PERFORMANCE_MUTEXES + if(chrono.getMillis() > 5) printf("In [%s::%s Line: %d] MUTEX LOCK took msecs: %lld, this->mutex->getRefCount() = %d ownerId [%s]\n",__FILE__,__FUNCTION__,__LINE__,(long long int)chrono.getMillis(),this->mutex->getRefCount(),ownerId.c_str()); + chrono.start(); +#endif + + #ifdef DEBUG_MUTEXES + if(ownerId != "") { + printf("Locked Mutex [%s] refCount: %d\n",ownerId.c_str(),this->mutex->getRefCount()); + } + #endif + } + } + void ReleaseLock(bool keepMutex=false) { + if(this->mutex != NULL) { + #ifdef DEBUG_MUTEXES + if(ownerId != "") { + printf("UnLocking Mutex [%s] refCount: %d\n",ownerId.c_str(),this->mutex->getRefCount()); + } + #endif + + if(this->isReadLock == true) { + this->mutex->UnLockRead(); + } + else { + this->mutex->UnLockWrite(); + } + +#ifdef DEBUG_PERFORMANCE_MUTEXES + if(chrono.getMillis() > 100) printf("In [%s::%s Line: %d] MUTEX UNLOCKED and held locked for msecs: %lld, this->mutex->getRefCount() = %d ownerId [%s]\n",__FILE__,__FUNCTION__,__LINE__,(long long int)chrono.getMillis(),this->mutex->getRefCount(),ownerId.c_str()); +#endif + + #ifdef DEBUG_MUTEXES + if(ownerId != "") { + printf("UnLocked Mutex [%s] refCount: %d\n",ownerId.c_str(),this->mutex->getRefCount()); + } + #endif + + if(keepMutex == false) { + this->mutex = NULL; + } + } + } +}; + + }}//end namespace #endif diff --git a/source/shared_lib/sources/platform/posix/socket.cpp b/source/shared_lib/sources/platform/posix/socket.cpp index d365ff3f5..1133ee02e 100644 --- a/source/shared_lib/sources/platform/posix/socket.cpp +++ b/source/shared_lib/sources/platform/posix/socket.cpp @@ -780,26 +780,25 @@ bool Socket::isSocketValid(const PLATFORM_SOCKET *validateSocket) { } Socket::Socket(PLATFORM_SOCKET sock) { - MutexSafeWrapper safeMutexSocketDestructorFlag(&inSocketDestructorSynchAccessor,CODE_AT_LINE); + ReadWriteMutexSafeWrapper safeMutexSocketDestructorFlag(&inSocketDestructorSynchAccessor,true,CODE_AT_LINE); inSocketDestructorSynchAccessor.setOwnerId(CODE_AT_LINE); - this->inSocketDestructor = false; + //this->inSocketDestructor = false; //safeMutexSocketDestructorFlag.ReleaseLock(); //this->pingThread = NULL; - pingThreadAccessor.setOwnerId(CODE_AT_LINE); - dataSynchAccessorRead.setOwnerId(CODE_AT_LINE); - dataSynchAccessorWrite.setOwnerId(CODE_AT_LINE); - - + //pingThreadAccessor.setOwnerId(CODE_AT_LINE); + //dataSynchAccessorRead.setOwnerId(CODE_AT_LINE); + //dataSynchAccessorWrite.setOwnerId(CODE_AT_LINE); + dataSynchAccessorRWLMutex.setOwnerId(CODE_AT_LINE); this->sock= sock; this->connectedIpAddress = ""; } Socket::Socket() { - MutexSafeWrapper safeMutexSocketDestructorFlag(&inSocketDestructorSynchAccessor,CODE_AT_LINE); + ReadWriteMutexSafeWrapper safeMutexSocketDestructorFlag(&inSocketDestructorSynchAccessor,true,CODE_AT_LINE); inSocketDestructorSynchAccessor.setOwnerId(CODE_AT_LINE); - this->inSocketDestructor = false; + //this->inSocketDestructor = false; //safeMutexSocketDestructorFlag.ReleaseLock(); if(SystemFlags::getSystemSettingType(SystemFlags::debugNetwork).enabled) SystemFlags::OutputDebug(SystemFlags::debugNetwork,"In [%s::%s Line: %d]\n",__FILE__,__FUNCTION__,__LINE__); @@ -865,13 +864,13 @@ void Socket::simpleTask(BaseThread *callingThread) { */ Socket::~Socket() { - MutexSafeWrapper safeMutexSocketDestructorFlag(&inSocketDestructorSynchAccessor,CODE_AT_LINE); - if(this->inSocketDestructor == true) { - SystemFlags::OutputDebug(SystemFlags::debugError,"In [%s::%s Line: %d] this->inSocketDestructor == true\n",__FILE__,__FUNCTION__,__LINE__); - return; - } + ReadWriteMutexSafeWrapper safeMutexSocketDestructorFlag(&inSocketDestructorSynchAccessor,false,CODE_AT_LINE); + //if(this->inSocketDestructor == true) { + // SystemFlags::OutputDebug(SystemFlags::debugError,"In [%s::%s Line: %d] this->inSocketDestructor == true\n",__FILE__,__FUNCTION__,__LINE__); + // return; + //} inSocketDestructorSynchAccessor.setOwnerId(CODE_AT_LINE); - this->inSocketDestructor = true; + //this->inSocketDestructor = true; if(SystemFlags::getSystemSettingType(SystemFlags::debugNetwork).enabled) SystemFlags::OutputDebug(SystemFlags::debugNetwork,"In [%s::%s] START closing socket = %d...\n",__FILE__,__FUNCTION__,sock); @@ -880,12 +879,12 @@ Socket::~Socket() { disconnectSocket(); // Allow other callers with a lock on the mutexes to let them go - for(time_t elapsed = time(NULL); - (dataSynchAccessorRead.getRefCount() > 0 || - dataSynchAccessorWrite.getRefCount() > 0) && - difftime(time(NULL),elapsed) <= 5;) { + //for(time_t elapsed = time(NULL); + // (dataSynchAccessorRead.getRefCount() > 0 || + // dataSynchAccessorWrite.getRefCount() > 0) && + // difftime(time(NULL),elapsed) <= 5;) { //sleep(0); - } + //} if(SystemFlags::getSystemSettingType(SystemFlags::debugNetwork).enabled) SystemFlags::OutputDebug(SystemFlags::debugNetwork,"In [%s::%s] END closing socket = %d...\n",__FILE__,__FUNCTION__,sock); @@ -900,8 +899,9 @@ void Socket::disconnectSocket() { if(isSocketValid() == true) { if(SystemFlags::getSystemSettingType(SystemFlags::debugNetwork).enabled) SystemFlags::OutputDebug(SystemFlags::debugNetwork,"In [%s::%s] calling shutdown and close for socket = %d...\n",__FILE__,__FUNCTION__,sock); - MutexSafeWrapper safeMutex(&dataSynchAccessorRead,CODE_AT_LINE); - MutexSafeWrapper safeMutex1(&dataSynchAccessorWrite,CODE_AT_LINE); + //MutexSafeWrapper safeMutex(&dataSynchAccessorRead,CODE_AT_LINE); + //MutexSafeWrapper safeMutex1(&dataSynchAccessorWrite,CODE_AT_LINE); + ReadWriteMutexSafeWrapper safeMutex(&dataSynchAccessorRWLMutex,false,CODE_AT_LINE); ::shutdown(sock,2); #ifndef WIN32 ::close(sock); @@ -910,8 +910,8 @@ void Socket::disconnectSocket() { ::closesocket(sock); sock = -1; #endif - safeMutex.ReleaseLock(); - safeMutex1.ReleaseLock(); + //safeMutex.ReleaseLock(); + //safeMutex1.ReleaseLock(); } if(SystemFlags::getSystemSettingType(SystemFlags::debugNetwork).enabled) SystemFlags::OutputDebug(SystemFlags::debugNetwork,"In [%s::%s] END closing socket = %d...\n",__FILE__,__FUNCTION__,sock); @@ -1142,7 +1142,8 @@ int Socket::send(const void *data, int dataSize) { // inSocketDestructorSynchAccessor.setOwnerId(CODE_AT_LINE); // safeMutexSocketDestructorFlag.ReleaseLock(); - MutexSafeWrapper safeMutex(&dataSynchAccessorWrite,CODE_AT_LINE); + //MutexSafeWrapper safeMutex(&dataSynchAccessorWrite,CODE_AT_LINE); + ReadWriteMutexSafeWrapper safeMutex(&dataSynchAccessorRWLMutex,false,CODE_AT_LINE); #ifdef __APPLE__ bytesSent = ::send(sock, (const char *)data, dataSize, SO_NOSIGPIPE); @@ -1159,7 +1160,7 @@ int Socket::send(const void *data, int dataSize) { if(bytesSent < 0 && getLastSocketError() != PLATFORM_SOCKET_TRY_AGAIN) { if(SystemFlags::getSystemSettingType(SystemFlags::debugNetwork).enabled) SystemFlags::OutputDebug(SystemFlags::debugNetwork,"In [%s::%s Line: %d] ERROR WRITING SOCKET DATA, err = %d error = %s\n",__FILE__,__FUNCTION__,__LINE__,bytesSent,getLastSocketErrorFormattedText().c_str()); } - else if(isConnected() == true && bytesSent < 0 && getLastSocketError() == PLATFORM_SOCKET_TRY_AGAIN) { + else if(bytesSent < 0 && getLastSocketError() == PLATFORM_SOCKET_TRY_AGAIN && isConnected() == true) { if(SystemFlags::getSystemSettingType(SystemFlags::debugNetwork).enabled) SystemFlags::OutputDebug(SystemFlags::debugNetwork,"In [%s::%s Line: %d] #1 EAGAIN during send, trying again...\n",__FILE__,__FUNCTION__,__LINE__); int attemptCount = 0; @@ -1180,7 +1181,8 @@ int Socket::send(const void *data, int dataSize) { // inSocketDestructorSynchAccessor.setOwnerId(CODE_AT_LINE); // safeMutexSocketDestructorFlag.ReleaseLock(); - MutexSafeWrapper safeMutex(&dataSynchAccessorWrite,CODE_AT_LINE); + //MutexSafeWrapper safeMutex(&dataSynchAccessorWrite,CODE_AT_LINE); + ReadWriteMutexSafeWrapper safeMutex(&dataSynchAccessorRWLMutex,false,CODE_AT_LINE); #ifdef __APPLE__ bytesSent = ::send(sock, (const char *)data, dataSize, SO_NOSIGPIPE); #else @@ -1229,7 +1231,8 @@ int Socket::send(const void *data, int dataSize) { // inSocketDestructorSynchAccessor.setOwnerId(CODE_AT_LINE); // safeMutexSocketDestructorFlag.ReleaseLock(); - MutexSafeWrapper safeMutex(&dataSynchAccessorWrite,CODE_AT_LINE); + //MutexSafeWrapper safeMutex(&dataSynchAccessorWrite,CODE_AT_LINE); + ReadWriteMutexSafeWrapper safeMutex(&dataSynchAccessorRWLMutex,false,CODE_AT_LINE); const char *sendBuf = (const char *)data; #ifdef __APPLE__ bytesSent = ::send(sock, &sendBuf[totalBytesSent], dataSize - totalBytesSent, SO_NOSIGPIPE); @@ -1294,7 +1297,8 @@ int Socket::receive(void *data, int dataSize, bool tryReceiveUntilDataSizeMet) { // inSocketDestructorSynchAccessor.setOwnerId(CODE_AT_LINE); // safeMutexSocketDestructorFlag.ReleaseLock(); - MutexSafeWrapper safeMutex(&dataSynchAccessorRead,CODE_AT_LINE); + //MutexSafeWrapper safeMutex(&dataSynchAccessorRead,CODE_AT_LINE); + ReadWriteMutexSafeWrapper safeMutex(&dataSynchAccessorRWLMutex,true,CODE_AT_LINE); bytesReceived = recv(sock, reinterpret_cast(data), dataSize, 0); safeMutex.ReleaseLock(); } @@ -1323,7 +1327,8 @@ int Socket::receive(void *data, int dataSize, bool tryReceiveUntilDataSizeMet) { // inSocketDestructorSynchAccessor.setOwnerId(CODE_AT_LINE); // safeMutexSocketDestructorFlag.ReleaseLock(); - MutexSafeWrapper safeMutex(&dataSynchAccessorRead,CODE_AT_LINE); + //MutexSafeWrapper safeMutex(&dataSynchAccessorRead,CODE_AT_LINE); + ReadWriteMutexSafeWrapper safeMutex(&dataSynchAccessorRWLMutex,true,CODE_AT_LINE); bytesReceived = recv(sock, reinterpret_cast(data), dataSize, 0); safeMutex.ReleaseLock(); @@ -1382,7 +1387,8 @@ int Socket::peek(void *data, int dataSize,bool mustGetData) { // safeMutexSocketDestructorFlag.ReleaseLock(); //MutexSafeWrapper safeMutex(&dataSynchAccessor,CODE_AT_LINE + "_" + intToStr(sock) + "_" + intToStr(dataSize)); - MutexSafeWrapper safeMutex(&dataSynchAccessorRead,CODE_AT_LINE); + //MutexSafeWrapper safeMutex(&dataSynchAccessorRead,CODE_AT_LINE); + ReadWriteMutexSafeWrapper safeMutex(&dataSynchAccessorRWLMutex,true,CODE_AT_LINE); //if(chrono.getMillis() > 1) printf("In [%s::%s Line: %d] action running for msecs: %lld\n",__FILE__,__FUNCTION__,__LINE__,(long long int)chrono.getMillis()); @@ -1421,7 +1427,8 @@ int Socket::peek(void *data, int dataSize,bool mustGetData) { // safeMutexSocketDestructorFlag.ReleaseLock(); //MutexSafeWrapper safeMutex(&dataSynchAccessor,CODE_AT_LINE + "_" + intToStr(sock) + "_" + intToStr(dataSize)); - MutexSafeWrapper safeMutex(&dataSynchAccessorRead,CODE_AT_LINE); + //MutexSafeWrapper safeMutex(&dataSynchAccessorRead,CODE_AT_LINE); + ReadWriteMutexSafeWrapper safeMutex(&dataSynchAccessorRWLMutex,true,CODE_AT_LINE); err = recv(sock, reinterpret_cast(data), dataSize, MSG_PEEK); safeMutex.ReleaseLock(); @@ -1716,7 +1723,8 @@ void ClientSocket::connect(const Ip &ip, int port) FD_SET(sock, &myset); { - MutexSafeWrapper safeMutex(&dataSynchAccessorRead,CODE_AT_LINE); + //MutexSafeWrapper safeMutex(&dataSynchAccessorRead,CODE_AT_LINE); + ReadWriteMutexSafeWrapper safeMutex(&dataSynchAccessorRWLMutex,true,CODE_AT_LINE); err = select((int)sock + 1, NULL, &myset, NULL, &tv); //safeMutex.ReleaseLock(); } @@ -2132,7 +2140,8 @@ Socket *ServerSocket::accept() { struct sockaddr_in cli_addr; socklen_t clilen = sizeof(cli_addr); char client_host[100]=""; - MutexSafeWrapper safeMutex(&dataSynchAccessorRead,CODE_AT_LINE); + //MutexSafeWrapper safeMutex(&dataSynchAccessorRead,CODE_AT_LINE); + ReadWriteMutexSafeWrapper safeMutex(&dataSynchAccessorRWLMutex,true,CODE_AT_LINE); PLATFORM_SOCKET newSock= ::accept(sock, (struct sockaddr *) &cli_addr, &clilen); safeMutex.ReleaseLock(); diff --git a/source/shared_lib/sources/platform/sdl/thread.cpp b/source/shared_lib/sources/platform/sdl/thread.cpp index 89a3c3078..64b97cdf4 100644 --- a/source/shared_lib/sources/platform/sdl/thread.cpp +++ b/source/shared_lib/sources/platform/sdl/thread.cpp @@ -210,14 +210,59 @@ void Semaphore::signal() { SDL_SemPost(semaphore); } -int Semaphore::waitTillSignalled() { +int Semaphore::waitTillSignalled(int waitMilliseconds) { if(semaphore == NULL) { char szBuf[1024]=""; snprintf(szBuf,1023,"In [%s::%s Line: %d] semaphore == NULL",__FILE__,__FUNCTION__,__LINE__); throw runtime_error(szBuf); } - int semValue = SDL_SemWait(semaphore); + int semValue = 0; + if(waitMilliseconds >= 0) { + semValue = SDL_SemWaitTimeout(semaphore,waitMilliseconds); + } + else { + semValue = SDL_SemWait(semaphore); + } return semValue; } +uint32 Semaphore::getSemValue() { + if(semaphore == NULL) { + char szBuf[1024]=""; + snprintf(szBuf,1023,"In [%s::%s Line: %d] semaphore == NULL",__FILE__,__FUNCTION__,__LINE__); + throw runtime_error(szBuf); + } + + return SDL_SemValue(semaphore); +} + +ReadWriteMutex::ReadWriteMutex(int maxReaders) : semaphore(maxReaders) { + this->maxReadersCount = maxReaders; +} + +void ReadWriteMutex::LockRead() { + semaphore.waitTillSignalled(); +} +void ReadWriteMutex::UnLockRead() { + semaphore.signal(); +} +void ReadWriteMutex::LockWrite() { + MutexSafeWrapper safeMutex(&mutex); + uint32 totalLocks = maxReaders(); + for (int i = 0; i < totalLocks; ++i) { + semaphore.waitTillSignalled(); + } +} +void ReadWriteMutex::UnLockWrite() { + uint32 totalLocks = maxReaders(); + for (int i = 0; i < totalLocks; ++i) { + semaphore.signal(); + } +} +int ReadWriteMutex::maxReaders() { + //return semaphore.getSemValue(); + return this->maxReadersCount; +} + + }}//end namespace