- attempt to fix network stuttering using a new multi-read single write mutex + semaphore class

This commit is contained in:
Mark Vejvoda
2011-11-26 08:14:23 +00:00
parent b8d71ffb0d
commit 6c1c248d62
8 changed files with 390 additions and 85 deletions

View File

@@ -116,13 +116,14 @@ protected:
//SimpleTaskThread *pingThread;
std::map<string,double> pingCache;
time_t lastThreadedPing;
Mutex pingThreadAccessor;
//Mutex pingThreadAccessor;
Mutex dataSynchAccessorRead;
Mutex dataSynchAccessorWrite;
//Mutex dataSynchAccessorRead;
//Mutex dataSynchAccessorWrite;
ReadWriteMutex dataSynchAccessorRWLMutex;
Mutex inSocketDestructorSynchAccessor;
bool inSocketDestructor;
ReadWriteMutex inSocketDestructorSynchAccessor;
//bool inSocketDestructor;
public:
Socket(PLATFORM_SOCKET sock);

View File

@@ -24,6 +24,7 @@
//#include "util.h"
#include <vector>
#include "types.h"
#include "leak_dumper.h"
// =====================================================
@@ -187,9 +188,131 @@ public:
Semaphore(Uint32 initialValue = 0);
~Semaphore();
void signal();
int waitTillSignalled();
int waitTillSignalled(int waitMilliseconds=-1);
uint32 getSemValue();
};
class ReadWriteMutex
{
public:
ReadWriteMutex(int maxReaders = 32);
void LockRead();
void UnLockRead();
void LockWrite();
void UnLockWrite();
int maxReaders();
void setOwnerId(string ownerId) { this->ownerId = ownerId; }
private:
Semaphore semaphore;
Mutex mutex;
int maxReadersCount;
string ownerId;
};
class ReadWriteMutexSafeWrapper {
protected:
ReadWriteMutex *mutex;
string ownerId;
bool isReadLock;
#ifdef DEBUG_PERFORMANCE_MUTEXES
Chrono chrono;
#endif
public:
ReadWriteMutexSafeWrapper(ReadWriteMutex *mutex,bool isReadLock=true, string ownerId="") {
this->mutex = mutex;
this->isReadLock = isReadLock;
this->ownerId = ownerId;
Lock();
}
~ReadWriteMutexSafeWrapper() {
ReleaseLock();
}
void setReadWriteMutex(ReadWriteMutex *mutex,bool isReadLock=true,string ownerId="") {
this->mutex = mutex;
this->isReadLock = isReadLock;
this->ownerId = ownerId;
Lock();
}
bool isValidReadWriteMutex() const {
return(this->mutex != NULL);
}
void Lock() {
if(this->mutex != NULL) {
#ifdef DEBUG_MUTEXES
if(ownerId != "") {
printf("Locking Mutex [%s] refCount: %d\n",ownerId.c_str(),this->mutex->getRefCount());
}
#endif
#ifdef DEBUG_PERFORMANCE_MUTEXES
chrono.start();
#endif
if(this->isReadLock == true) {
this->mutex->LockRead();
}
else {
this->mutex->LockWrite();
}
#ifdef DEBUG_PERFORMANCE_MUTEXES
if(chrono.getMillis() > 5) printf("In [%s::%s Line: %d] MUTEX LOCK took msecs: %lld, this->mutex->getRefCount() = %d ownerId [%s]\n",__FILE__,__FUNCTION__,__LINE__,(long long int)chrono.getMillis(),this->mutex->getRefCount(),ownerId.c_str());
chrono.start();
#endif
#ifdef DEBUG_MUTEXES
if(ownerId != "") {
printf("Locked Mutex [%s] refCount: %d\n",ownerId.c_str(),this->mutex->getRefCount());
}
#endif
}
}
void ReleaseLock(bool keepMutex=false) {
if(this->mutex != NULL) {
#ifdef DEBUG_MUTEXES
if(ownerId != "") {
printf("UnLocking Mutex [%s] refCount: %d\n",ownerId.c_str(),this->mutex->getRefCount());
}
#endif
if(this->isReadLock == true) {
this->mutex->UnLockRead();
}
else {
this->mutex->UnLockWrite();
}
#ifdef DEBUG_PERFORMANCE_MUTEXES
if(chrono.getMillis() > 100) printf("In [%s::%s Line: %d] MUTEX UNLOCKED and held locked for msecs: %lld, this->mutex->getRefCount() = %d ownerId [%s]\n",__FILE__,__FUNCTION__,__LINE__,(long long int)chrono.getMillis(),this->mutex->getRefCount(),ownerId.c_str());
#endif
#ifdef DEBUG_MUTEXES
if(ownerId != "") {
printf("UnLocked Mutex [%s] refCount: %d\n",ownerId.c_str(),this->mutex->getRefCount());
}
#endif
if(keepMutex == false) {
this->mutex = NULL;
}
}
}
};
}}//end namespace
#endif

View File

@@ -780,26 +780,25 @@ bool Socket::isSocketValid(const PLATFORM_SOCKET *validateSocket) {
}
Socket::Socket(PLATFORM_SOCKET sock) {
MutexSafeWrapper safeMutexSocketDestructorFlag(&inSocketDestructorSynchAccessor,CODE_AT_LINE);
ReadWriteMutexSafeWrapper safeMutexSocketDestructorFlag(&inSocketDestructorSynchAccessor,true,CODE_AT_LINE);
inSocketDestructorSynchAccessor.setOwnerId(CODE_AT_LINE);
this->inSocketDestructor = false;
//this->inSocketDestructor = false;
//safeMutexSocketDestructorFlag.ReleaseLock();
//this->pingThread = NULL;
pingThreadAccessor.setOwnerId(CODE_AT_LINE);
dataSynchAccessorRead.setOwnerId(CODE_AT_LINE);
dataSynchAccessorWrite.setOwnerId(CODE_AT_LINE);
//pingThreadAccessor.setOwnerId(CODE_AT_LINE);
//dataSynchAccessorRead.setOwnerId(CODE_AT_LINE);
//dataSynchAccessorWrite.setOwnerId(CODE_AT_LINE);
dataSynchAccessorRWLMutex.setOwnerId(CODE_AT_LINE);
this->sock= sock;
this->connectedIpAddress = "";
}
Socket::Socket() {
MutexSafeWrapper safeMutexSocketDestructorFlag(&inSocketDestructorSynchAccessor,CODE_AT_LINE);
ReadWriteMutexSafeWrapper safeMutexSocketDestructorFlag(&inSocketDestructorSynchAccessor,true,CODE_AT_LINE);
inSocketDestructorSynchAccessor.setOwnerId(CODE_AT_LINE);
this->inSocketDestructor = false;
//this->inSocketDestructor = false;
//safeMutexSocketDestructorFlag.ReleaseLock();
if(SystemFlags::getSystemSettingType(SystemFlags::debugNetwork).enabled) SystemFlags::OutputDebug(SystemFlags::debugNetwork,"In [%s::%s Line: %d]\n",__FILE__,__FUNCTION__,__LINE__);
@@ -865,13 +864,13 @@ void Socket::simpleTask(BaseThread *callingThread) {
*/
Socket::~Socket() {
MutexSafeWrapper safeMutexSocketDestructorFlag(&inSocketDestructorSynchAccessor,CODE_AT_LINE);
if(this->inSocketDestructor == true) {
SystemFlags::OutputDebug(SystemFlags::debugError,"In [%s::%s Line: %d] this->inSocketDestructor == true\n",__FILE__,__FUNCTION__,__LINE__);
return;
}
ReadWriteMutexSafeWrapper safeMutexSocketDestructorFlag(&inSocketDestructorSynchAccessor,false,CODE_AT_LINE);
//if(this->inSocketDestructor == true) {
// SystemFlags::OutputDebug(SystemFlags::debugError,"In [%s::%s Line: %d] this->inSocketDestructor == true\n",__FILE__,__FUNCTION__,__LINE__);
// return;
//}
inSocketDestructorSynchAccessor.setOwnerId(CODE_AT_LINE);
this->inSocketDestructor = true;
//this->inSocketDestructor = true;
if(SystemFlags::getSystemSettingType(SystemFlags::debugNetwork).enabled) SystemFlags::OutputDebug(SystemFlags::debugNetwork,"In [%s::%s] START closing socket = %d...\n",__FILE__,__FUNCTION__,sock);
@@ -880,12 +879,12 @@ Socket::~Socket() {
disconnectSocket();
// Allow other callers with a lock on the mutexes to let them go
for(time_t elapsed = time(NULL);
(dataSynchAccessorRead.getRefCount() > 0 ||
dataSynchAccessorWrite.getRefCount() > 0) &&
difftime(time(NULL),elapsed) <= 5;) {
//for(time_t elapsed = time(NULL);
// (dataSynchAccessorRead.getRefCount() > 0 ||
// dataSynchAccessorWrite.getRefCount() > 0) &&
// difftime(time(NULL),elapsed) <= 5;) {
//sleep(0);
}
//}
if(SystemFlags::getSystemSettingType(SystemFlags::debugNetwork).enabled) SystemFlags::OutputDebug(SystemFlags::debugNetwork,"In [%s::%s] END closing socket = %d...\n",__FILE__,__FUNCTION__,sock);
@@ -900,8 +899,9 @@ void Socket::disconnectSocket() {
if(isSocketValid() == true) {
if(SystemFlags::getSystemSettingType(SystemFlags::debugNetwork).enabled) SystemFlags::OutputDebug(SystemFlags::debugNetwork,"In [%s::%s] calling shutdown and close for socket = %d...\n",__FILE__,__FUNCTION__,sock);
MutexSafeWrapper safeMutex(&dataSynchAccessorRead,CODE_AT_LINE);
MutexSafeWrapper safeMutex1(&dataSynchAccessorWrite,CODE_AT_LINE);
//MutexSafeWrapper safeMutex(&dataSynchAccessorRead,CODE_AT_LINE);
//MutexSafeWrapper safeMutex1(&dataSynchAccessorWrite,CODE_AT_LINE);
ReadWriteMutexSafeWrapper safeMutex(&dataSynchAccessorRWLMutex,false,CODE_AT_LINE);
::shutdown(sock,2);
#ifndef WIN32
::close(sock);
@@ -910,8 +910,8 @@ void Socket::disconnectSocket() {
::closesocket(sock);
sock = -1;
#endif
safeMutex.ReleaseLock();
safeMutex1.ReleaseLock();
//safeMutex.ReleaseLock();
//safeMutex1.ReleaseLock();
}
if(SystemFlags::getSystemSettingType(SystemFlags::debugNetwork).enabled) SystemFlags::OutputDebug(SystemFlags::debugNetwork,"In [%s::%s] END closing socket = %d...\n",__FILE__,__FUNCTION__,sock);
@@ -1142,7 +1142,8 @@ int Socket::send(const void *data, int dataSize) {
// inSocketDestructorSynchAccessor.setOwnerId(CODE_AT_LINE);
// safeMutexSocketDestructorFlag.ReleaseLock();
MutexSafeWrapper safeMutex(&dataSynchAccessorWrite,CODE_AT_LINE);
//MutexSafeWrapper safeMutex(&dataSynchAccessorWrite,CODE_AT_LINE);
ReadWriteMutexSafeWrapper safeMutex(&dataSynchAccessorRWLMutex,false,CODE_AT_LINE);
#ifdef __APPLE__
bytesSent = ::send(sock, (const char *)data, dataSize, SO_NOSIGPIPE);
@@ -1159,7 +1160,7 @@ int Socket::send(const void *data, int dataSize) {
if(bytesSent < 0 && getLastSocketError() != PLATFORM_SOCKET_TRY_AGAIN) {
if(SystemFlags::getSystemSettingType(SystemFlags::debugNetwork).enabled) SystemFlags::OutputDebug(SystemFlags::debugNetwork,"In [%s::%s Line: %d] ERROR WRITING SOCKET DATA, err = %d error = %s\n",__FILE__,__FUNCTION__,__LINE__,bytesSent,getLastSocketErrorFormattedText().c_str());
}
else if(isConnected() == true && bytesSent < 0 && getLastSocketError() == PLATFORM_SOCKET_TRY_AGAIN) {
else if(bytesSent < 0 && getLastSocketError() == PLATFORM_SOCKET_TRY_AGAIN && isConnected() == true) {
if(SystemFlags::getSystemSettingType(SystemFlags::debugNetwork).enabled) SystemFlags::OutputDebug(SystemFlags::debugNetwork,"In [%s::%s Line: %d] #1 EAGAIN during send, trying again...\n",__FILE__,__FUNCTION__,__LINE__);
int attemptCount = 0;
@@ -1180,7 +1181,8 @@ int Socket::send(const void *data, int dataSize) {
// inSocketDestructorSynchAccessor.setOwnerId(CODE_AT_LINE);
// safeMutexSocketDestructorFlag.ReleaseLock();
MutexSafeWrapper safeMutex(&dataSynchAccessorWrite,CODE_AT_LINE);
//MutexSafeWrapper safeMutex(&dataSynchAccessorWrite,CODE_AT_LINE);
ReadWriteMutexSafeWrapper safeMutex(&dataSynchAccessorRWLMutex,false,CODE_AT_LINE);
#ifdef __APPLE__
bytesSent = ::send(sock, (const char *)data, dataSize, SO_NOSIGPIPE);
#else
@@ -1229,7 +1231,8 @@ int Socket::send(const void *data, int dataSize) {
// inSocketDestructorSynchAccessor.setOwnerId(CODE_AT_LINE);
// safeMutexSocketDestructorFlag.ReleaseLock();
MutexSafeWrapper safeMutex(&dataSynchAccessorWrite,CODE_AT_LINE);
//MutexSafeWrapper safeMutex(&dataSynchAccessorWrite,CODE_AT_LINE);
ReadWriteMutexSafeWrapper safeMutex(&dataSynchAccessorRWLMutex,false,CODE_AT_LINE);
const char *sendBuf = (const char *)data;
#ifdef __APPLE__
bytesSent = ::send(sock, &sendBuf[totalBytesSent], dataSize - totalBytesSent, SO_NOSIGPIPE);
@@ -1294,7 +1297,8 @@ int Socket::receive(void *data, int dataSize, bool tryReceiveUntilDataSizeMet) {
// inSocketDestructorSynchAccessor.setOwnerId(CODE_AT_LINE);
// safeMutexSocketDestructorFlag.ReleaseLock();
MutexSafeWrapper safeMutex(&dataSynchAccessorRead,CODE_AT_LINE);
//MutexSafeWrapper safeMutex(&dataSynchAccessorRead,CODE_AT_LINE);
ReadWriteMutexSafeWrapper safeMutex(&dataSynchAccessorRWLMutex,true,CODE_AT_LINE);
bytesReceived = recv(sock, reinterpret_cast<char*>(data), dataSize, 0);
safeMutex.ReleaseLock();
}
@@ -1323,7 +1327,8 @@ int Socket::receive(void *data, int dataSize, bool tryReceiveUntilDataSizeMet) {
// inSocketDestructorSynchAccessor.setOwnerId(CODE_AT_LINE);
// safeMutexSocketDestructorFlag.ReleaseLock();
MutexSafeWrapper safeMutex(&dataSynchAccessorRead,CODE_AT_LINE);
//MutexSafeWrapper safeMutex(&dataSynchAccessorRead,CODE_AT_LINE);
ReadWriteMutexSafeWrapper safeMutex(&dataSynchAccessorRWLMutex,true,CODE_AT_LINE);
bytesReceived = recv(sock, reinterpret_cast<char*>(data), dataSize, 0);
safeMutex.ReleaseLock();
@@ -1382,7 +1387,8 @@ int Socket::peek(void *data, int dataSize,bool mustGetData) {
// safeMutexSocketDestructorFlag.ReleaseLock();
//MutexSafeWrapper safeMutex(&dataSynchAccessor,CODE_AT_LINE + "_" + intToStr(sock) + "_" + intToStr(dataSize));
MutexSafeWrapper safeMutex(&dataSynchAccessorRead,CODE_AT_LINE);
//MutexSafeWrapper safeMutex(&dataSynchAccessorRead,CODE_AT_LINE);
ReadWriteMutexSafeWrapper safeMutex(&dataSynchAccessorRWLMutex,true,CODE_AT_LINE);
//if(chrono.getMillis() > 1) printf("In [%s::%s Line: %d] action running for msecs: %lld\n",__FILE__,__FUNCTION__,__LINE__,(long long int)chrono.getMillis());
@@ -1421,7 +1427,8 @@ int Socket::peek(void *data, int dataSize,bool mustGetData) {
// safeMutexSocketDestructorFlag.ReleaseLock();
//MutexSafeWrapper safeMutex(&dataSynchAccessor,CODE_AT_LINE + "_" + intToStr(sock) + "_" + intToStr(dataSize));
MutexSafeWrapper safeMutex(&dataSynchAccessorRead,CODE_AT_LINE);
//MutexSafeWrapper safeMutex(&dataSynchAccessorRead,CODE_AT_LINE);
ReadWriteMutexSafeWrapper safeMutex(&dataSynchAccessorRWLMutex,true,CODE_AT_LINE);
err = recv(sock, reinterpret_cast<char*>(data), dataSize, MSG_PEEK);
safeMutex.ReleaseLock();
@@ -1716,7 +1723,8 @@ void ClientSocket::connect(const Ip &ip, int port)
FD_SET(sock, &myset);
{
MutexSafeWrapper safeMutex(&dataSynchAccessorRead,CODE_AT_LINE);
//MutexSafeWrapper safeMutex(&dataSynchAccessorRead,CODE_AT_LINE);
ReadWriteMutexSafeWrapper safeMutex(&dataSynchAccessorRWLMutex,true,CODE_AT_LINE);
err = select((int)sock + 1, NULL, &myset, NULL, &tv);
//safeMutex.ReleaseLock();
}
@@ -2132,7 +2140,8 @@ Socket *ServerSocket::accept() {
struct sockaddr_in cli_addr;
socklen_t clilen = sizeof(cli_addr);
char client_host[100]="";
MutexSafeWrapper safeMutex(&dataSynchAccessorRead,CODE_AT_LINE);
//MutexSafeWrapper safeMutex(&dataSynchAccessorRead,CODE_AT_LINE);
ReadWriteMutexSafeWrapper safeMutex(&dataSynchAccessorRWLMutex,true,CODE_AT_LINE);
PLATFORM_SOCKET newSock= ::accept(sock, (struct sockaddr *) &cli_addr, &clilen);
safeMutex.ReleaseLock();

View File

@@ -210,14 +210,59 @@ void Semaphore::signal() {
SDL_SemPost(semaphore);
}
int Semaphore::waitTillSignalled() {
int Semaphore::waitTillSignalled(int waitMilliseconds) {
if(semaphore == NULL) {
char szBuf[1024]="";
snprintf(szBuf,1023,"In [%s::%s Line: %d] semaphore == NULL",__FILE__,__FUNCTION__,__LINE__);
throw runtime_error(szBuf);
}
int semValue = SDL_SemWait(semaphore);
int semValue = 0;
if(waitMilliseconds >= 0) {
semValue = SDL_SemWaitTimeout(semaphore,waitMilliseconds);
}
else {
semValue = SDL_SemWait(semaphore);
}
return semValue;
}
uint32 Semaphore::getSemValue() {
if(semaphore == NULL) {
char szBuf[1024]="";
snprintf(szBuf,1023,"In [%s::%s Line: %d] semaphore == NULL",__FILE__,__FUNCTION__,__LINE__);
throw runtime_error(szBuf);
}
return SDL_SemValue(semaphore);
}
ReadWriteMutex::ReadWriteMutex(int maxReaders) : semaphore(maxReaders) {
this->maxReadersCount = maxReaders;
}
void ReadWriteMutex::LockRead() {
semaphore.waitTillSignalled();
}
void ReadWriteMutex::UnLockRead() {
semaphore.signal();
}
void ReadWriteMutex::LockWrite() {
MutexSafeWrapper safeMutex(&mutex);
uint32 totalLocks = maxReaders();
for (int i = 0; i < totalLocks; ++i) {
semaphore.waitTillSignalled();
}
}
void ReadWriteMutex::UnLockWrite() {
uint32 totalLocks = maxReaders();
for (int i = 0; i < totalLocks; ++i) {
semaphore.signal();
}
}
int ReadWriteMutex::maxReaders() {
//return semaphore.getSemValue();
return this->maxReadersCount;
}
}}//end namespace