- attempt to make connection status more reliable and have less hangs when disconnects occur

This commit is contained in:
Mark Vejvoda
2013-03-08 08:42:08 +00:00
parent 704935c534
commit e74f747fbd
5 changed files with 139 additions and 26 deletions

View File

@@ -62,6 +62,8 @@ ClientInterface::ClientInterface() : GameNetworkInterface() {
this->joinGameInProgress = false;
this->joinGameInProgressLaunch = false;
this->quitThread = false;
playerIndex= -1;
setGameSettingsReceived(false);
gotIntro = false;
@@ -78,32 +80,42 @@ ClientInterface::ClientInterface() : GameNetworkInterface() {
}
void ClientInterface::shutdownNetworkCommandListThread() {
MutexSafeWrapper safeMutex(networkCommandListThreadAccessor,CODE_AT_LINE);
if(networkCommandListThread != NULL) {
//printf("START === shutdownNetworkCommandListThread\n");
time_t elapsed = time(NULL);
this->quit = true;
this->quitThread = true;
networkCommandListThread->signalQuit();
for(;networkCommandListThread->canShutdown(false) == false &&
difftime((long int)time(NULL),elapsed) <= 15;) {
//sleep(150);
}
sleep(0);
//printf("A === shutdownNetworkCommandListThread\n");
//sleep(0);
if(networkCommandListThread->canShutdown(true)) {
delete networkCommandListThread;
networkCommandListThread = NULL;
}
}
Mutex *tempMutexPtr = networkCommandListThreadAccessor;
networkCommandListThreadAccessor = NULL;
safeMutex.ReleaseLock(false,true);
//printf("END === shutdownNetworkCommandListThread\n");
}
}
ClientInterface::~ClientInterface() {
if(SystemFlags::getSystemSettingType(SystemFlags::debugNetwork).enabled) SystemFlags::OutputDebug(SystemFlags::debugNetwork,"In [%s::%s Line: %d] destructor for %p\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__,this);
//printf("START === Client destructor\n");
MutexSafeWrapper safeMutex(networkCommandListThreadAccessor,CODE_AT_LINE);
shutdownNetworkCommandListThread();
//printf("A === Client destructor\n");
if(clientSocket != NULL && clientSocket->isConnected() == true) {
if(SystemFlags::getSystemSettingType(SystemFlags::debugNetwork).enabled) SystemFlags::OutputDebug(SystemFlags::debugNetwork,"In [%s::%s Line: %d]\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__);
@@ -118,6 +130,8 @@ ClientInterface::~ClientInterface() {
}
}
//printf("B === Client destructor\n");
if(SystemFlags::getSystemSettingType(SystemFlags::debugNetwork).enabled) SystemFlags::OutputDebug(SystemFlags::debugNetwork,"In [%s::%s Line: %d]\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__);
close();
if(SystemFlags::getSystemSettingType(SystemFlags::debugNetwork).enabled) SystemFlags::OutputDebug(SystemFlags::debugNetwork,"In [%s::%s Line: %d]\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__);
@@ -125,6 +139,14 @@ ClientInterface::~ClientInterface() {
delete clientSocket;
clientSocket = NULL;
//printf("C === Client destructor\n");
Mutex *tempMutexPtr = networkCommandListThreadAccessor;
networkCommandListThreadAccessor = NULL;
safeMutex.ReleaseLock(false,true);
//printf("END === Client destructor\n");
if(SystemFlags::getSystemSettingType(SystemFlags::debugNetwork).enabled) SystemFlags::OutputDebug(SystemFlags::debugNetwork,"In [%s::%s Line: %d]\n",extractFileFromDirectoryPath(__FILE__).c_str(),__FUNCTION__,__LINE__);
}
@@ -716,7 +738,7 @@ void ClientInterface::updateLobby() {
void ClientInterface::updateFrame(int *checkFrame) {
//printf("#1 ClientInterface::updateFrame\n");
if(isConnected() == true && quit == false) {
if(isConnected() == true && this->quitThread == false) {
//printf("#2 ClientInterface::updateFrame\n");
Chrono chrono;
@@ -724,7 +746,7 @@ void ClientInterface::updateFrame(int *checkFrame) {
int simulateLag = Config::getInstance().getInt("SimulateClientLag","0");
bool done= false;
while(done == false) {
while(done == false && this->quitThread == false) {
//printf("BEFORE Client get networkMessageType\n");
@@ -1003,7 +1025,7 @@ void ClientInterface::simpleTask(BaseThread *callingThread) {
//printf("START === Client thread ended\n");
while(callingThread->getQuitStatus() == false && quit == false) {
while(callingThread->getQuitStatus() == false && this->quitThread == false) {
updateFrame(NULL);
}
@@ -1016,11 +1038,11 @@ bool ClientInterface::getNetworkCommand(int frameCount, int currentCachedPending
bool result = false;
bool waitForData = false;
if(quit == false) {
if(quit == false && this->quitThread == false) {
MutexSafeWrapper safeMutex(networkCommandListThreadAccessor,CODE_AT_LINE);
safeMutex.ReleaseLock(true);
for(;quit == false;) {
for(;quit == false && this->quitThread == false;) {
safeMutex.Lock();
uint64 copyCachedLastPendingFrameCount = cachedLastPendingFrameCount;
if(cachedPendingCommands.find(frameCount) != cachedPendingCommands.end()) {
@@ -1071,9 +1093,9 @@ void ClientInterface::updateKeyframe(int frameCount) {
if(SystemFlags::getSystemSettingType(SystemFlags::debugPerformance).enabled) chrono.start();
//chrono.start();
if(quit == false) {
bool testThreaded = Config::getInstance().getBool("ThreadedNetworkClient","true");
//bool testThreaded = false;
if(quit == false && this->quitThread == false) {
//bool testThreaded = Config::getInstance().getBool("ThreadedNetworkClient","true");
bool testThreaded = true;
if(testThreaded == false) {
updateFrame(&frameCount);
Commands &frameCmdList = cachedPendingCommands[frameCount];
@@ -1521,7 +1543,7 @@ NetworkMessageType ClientInterface::waitForMessage()
NetworkMessageType msg = nmtInvalid;
//uint64 waitLoopCount = 0;
while(msg == nmtInvalid) {
while(msg == nmtInvalid && this->quitThread == false) {
msg = getNextMessageType();
if(msg == nmtInvalid) {
if(chrono.getMillis() % 250 == 0 && isConnected() == false) {

View File

@@ -73,6 +73,8 @@ private:
bool readyForInGameJoin;
bool resumeInGameJoin;
bool quitThread;
public:
ClientInterface();
virtual ~ClientInterface();

View File

@@ -960,7 +960,7 @@ void ServerInterface::checkForCompletedClients(std::map<int,bool> & mapSlotSigna
//printf("===> IN slot %d - About to checkForCompletedClients\n",i);
ConnectionSlot* connectionSlot = slots[i];
if(connectionSlot != NULL && connectionSlot->hasValidSocketId() == true &&
if(connectionSlot != NULL && connectionSlot->isConnected() == true &&
mapSlotSignalledList[i] == true &&
connectionSlot->getJoinGameInProgress() == false &&
slotsCompleted.find(i) == slotsCompleted.end()) {

View File

@@ -130,6 +130,8 @@ protected:
Mutex *inSocketDestructorSynchAccessor;
bool inSocketDestructor;
bool isSocketBlocking;
public:
Socket(PLATFORM_SOCKET sock);
Socket();
@@ -164,6 +166,7 @@ public:
void setBlock(bool block);
static void setBlock(bool block, PLATFORM_SOCKET socket);
bool getBlock();
bool isReadable();
bool isWritable(struct timeval *timeVal=NULL);
@@ -186,6 +189,17 @@ protected:
static void throwException(string str);
};
class SafeSocketBlockToggleWrapper {
protected:
Socket *socket;
bool originallyBlocked;
bool newBlocked;
public:
SafeSocketBlockToggleWrapper(Socket *socket, bool toggle);
~SafeSocketBlockToggleWrapper();
void Restore();
};
class BroadCastClientSocketThread : public BaseThread
{
private:

View File

@@ -795,9 +795,8 @@ Socket::Socket(PLATFORM_SOCKET sock) {
dataSynchAccessorRead->setOwnerId(CODE_AT_LINE);
dataSynchAccessorWrite->setOwnerId(CODE_AT_LINE);
this->sock= sock;
this->isSocketBlocking = true;
this->connectedIpAddress = "";
}
@@ -822,6 +821,8 @@ Socket::Socket() {
throwException("Error creating socket");
}
this->isSocketBlocking = true;
#ifdef __APPLE__
int set = 1;
setsockopt(sock, SOL_SOCKET, SO_NOSIGPIPE, (void *)&set, sizeof(int));
@@ -1449,12 +1450,38 @@ int Socket::receive(void *data, int dataSize, bool tryReceiveUntilDataSizeMet) {
return static_cast<int>(bytesReceived);
}
SafeSocketBlockToggleWrapper::SafeSocketBlockToggleWrapper(Socket *socket, bool toggle) {
this->socket = socket;
if(this->socket != NULL) {
this->originallyBlocked = socket->getBlock();
this->newBlocked = toggle;
if(this->originallyBlocked != this->newBlocked) {
socket->setBlock(this->newBlocked);
}
}
}
void SafeSocketBlockToggleWrapper::Restore() {
if(this->socket != NULL) {
if(this->originallyBlocked != this->newBlocked) {
socket->setBlock(this->originallyBlocked);
this->newBlocked = this->originallyBlocked;
}
}
}
SafeSocketBlockToggleWrapper::~SafeSocketBlockToggleWrapper() {
Restore();
}
int Socket::peek(void *data, int dataSize,bool mustGetData,int *pLastSocketError) {
Chrono chrono;
if(SystemFlags::getSystemSettingType(SystemFlags::debugNetwork).enabled) chrono.start();
const int MAX_PEEK_WAIT_SECONDS = 3;
int lastSocketError = 0;
int err = 0;
if(isSocketValid() == true) {
//if(chrono.getMillis() > 1) printf("In [%s::%s Line: %d] action running for msecs: %lld\n",__FILE__,__FUNCTION__,__LINE__,(long long int)chrono.getMillis());
@@ -1472,7 +1499,19 @@ int Socket::peek(void *data, int dataSize,bool mustGetData,int *pLastSocketError
//if(chrono.getMillis() > 1) printf("In [%s::%s Line: %d] action running for msecs: %lld\n",__FILE__,__FUNCTION__,__LINE__,(long long int)chrono.getMillis());
if(isSocketValid() == true) {
// Chrono recvTimer(true);
SafeSocketBlockToggleWrapper safeUnblock(this, false);
errno = 0;
err = recv(sock, reinterpret_cast<char*>(data), dataSize, MSG_PEEK);
lastSocketError = getLastSocketError();
if(pLastSocketError != NULL) {
*pLastSocketError = lastSocketError;
}
safeUnblock.Restore();
// if(recvTimer.getMillis() > 1000 || (err <= 0 && lastSocketError != 0 && lastSocketError != PLATFORM_SOCKET_TRY_AGAIN)) {
// printf("#1 PEEK err = %d lastSocketError = %d ms: %lld\n",err,lastSocketError,(long long int)recvTimer.getMillis());
// }
}
safeMutex.ReleaseLock();
@@ -1485,10 +1524,6 @@ int Socket::peek(void *data, int dataSize,bool mustGetData,int *pLastSocketError
}
//if(chrono.getMillis() > 1) printf("In [%s::%s Line: %d] action running for msecs: %lld\n",__FILE__,__FUNCTION__,__LINE__,(long long int)chrono.getMillis());
int lastSocketError = getLastSocketError();
if(pLastSocketError != NULL) {
*pLastSocketError = lastSocketError;
}
if(err < 0 && lastSocketError != PLATFORM_SOCKET_TRY_AGAIN) {
if(SystemFlags::getSystemSettingType(SystemFlags::debugNetwork).enabled) SystemFlags::OutputDebug(SystemFlags::debugNetwork,"[%s::%s Line: %d] ERROR PEEKING SOCKET DATA error while sending socket data, err = %d, error = %s\n",__FILE__,__FUNCTION__,__LINE__,err,getLastSocketErrorFormattedText().c_str());
disconnectSocket();
@@ -1500,6 +1535,7 @@ int Socket::peek(void *data, int dataSize,bool mustGetData,int *pLastSocketError
time_t tStartTimer = time(NULL);
while((err < 0 && lastSocketError == PLATFORM_SOCKET_TRY_AGAIN) &&
isSocketValid() == true &&
(difftime((long int)time(NULL),tStartTimer) <= MAX_PEEK_WAIT_SECONDS)) {
/*
if(isConnected() == false) {
@@ -1520,17 +1556,31 @@ int Socket::peek(void *data, int dataSize,bool mustGetData,int *pLastSocketError
//MutexSafeWrapper safeMutex(&dataSynchAccessor,CODE_AT_LINE + "_" + intToStr(sock) + "_" + intToStr(dataSize));
MutexSafeWrapper safeMutex(dataSynchAccessorRead,CODE_AT_LINE);
// Chrono recvTimer(true);
SafeSocketBlockToggleWrapper safeUnblock(this, false);
errno = 0;
err = recv(sock, reinterpret_cast<char*>(data), dataSize, MSG_PEEK);
lastSocketError = getLastSocketError();
if(pLastSocketError != NULL) {
*pLastSocketError = lastSocketError;
}
safeUnblock.Restore();
// if(recvTimer.getMillis() > 1000 || (err <= 0 && lastSocketError != 0 && lastSocketError != PLATFORM_SOCKET_TRY_AGAIN)) {
// printf("#2 PEEK err = %d lastSocketError = %d ms: %lld\n",err,lastSocketError,(long long int)recvTimer.getMillis());
// }
//printf("Socket peek delayed checking for sock = %d err = %d\n",sock,err);
safeMutex.ReleaseLock();
if(SystemFlags::getSystemSettingType(SystemFlags::debugNetwork).enabled) if(chrono.getMillis() > 1) SystemFlags::OutputDebug(SystemFlags::debugNetwork,"In [%s::%s Line: %d] action running for msecs: %lld\n",__FILE__,__FUNCTION__,__LINE__,(long long int)chrono.getMillis());
if(SystemFlags::getSystemSettingType(SystemFlags::debugNetwork).enabled) SystemFlags::OutputDebug(SystemFlags::debugNetwork,"In [%s::%s Line: %d] #2 EAGAIN during peek, trying again returned: %d\n",__FILE__,__FUNCTION__,__LINE__,err);
}
else {
//printf("Socket peek delayed [NOT READABLE] checking for sock = %d err = %d\n",sock,err);
}
}
if(SystemFlags::getSystemSettingType(SystemFlags::debugNetwork).enabled) if(chrono.getMillis() > 1) SystemFlags::OutputDebug(SystemFlags::debugNetwork,"In [%s::%s Line: %d] action running for msecs: %lld\n",__FILE__,__FUNCTION__,__LINE__,(long long int)chrono.getMillis());
@@ -1541,12 +1591,17 @@ int Socket::peek(void *data, int dataSize,bool mustGetData,int *pLastSocketError
//if(chrono.getMillis() > 1) printf("In [%s::%s Line: %d] action running for msecs: %lld\n",__FILE__,__FUNCTION__,__LINE__,(long long int)chrono.getMillis());
if(err <= 0) {
if(err < 0 || (err == 0 && dataSize != 0) ||
((err == 0 || err == -1) && dataSize == 0 && lastSocketError != 0 && lastSocketError != PLATFORM_SOCKET_TRY_AGAIN)) {
//printf("** #1 Socket peek error for sock = %d err = %d lastSocketError = %d\n",sock,err,lastSocketError);
int iErr = lastSocketError;
if(SystemFlags::getSystemSettingType(SystemFlags::debugNetwork).enabled) SystemFlags::OutputDebug(SystemFlags::debugNetwork,"[%s::%s Line: %d] DISCONNECTING SOCKET for socket [%d], err = %d, error = %s\n",__FILE__,__FUNCTION__,__LINE__,socket,err,getLastSocketErrorFormattedText(&iErr).c_str());
//printf("Peek #3 err = %d\n",err);
//lastSocketError = getLastSocketError();
if(mustGetData == true || lastSocketError != PLATFORM_SOCKET_TRY_AGAIN) {
//printf("** #2 Socket peek error for sock = %d err = %d lastSocketError = %d mustGetData = %d\n",sock,err,lastSocketError,mustGetData);
int iErr = lastSocketError;
disconnectSocket();
@@ -1557,8 +1612,26 @@ int Socket::peek(void *data, int dataSize,bool mustGetData,int *pLastSocketError
return static_cast<int>(err);
}
bool Socket::getBlock() {
bool blocking = true;
// don't waste time if the socket is invalid
if(isSocketValid(&sock) == false) {
return blocking;
}
//#ifndef WIN32
// int currentFlags = fcntl(sock, F_GETFL);
// blocking = !((currentFlags & O_NONBLOCK) == O_NONBLOCK);
//#else
blocking = this->isSocketBlocking;
//#endif
return blocking;
}
void Socket::setBlock(bool block){
setBlock(block,this->sock);
this->isSocketBlocking = block;
}
void Socket::setBlock(bool block, PLATFORM_SOCKET socket) {
@@ -1691,10 +1764,12 @@ bool Socket::isConnected() {
//if the socket is readable it is connected if we can read a byte from it
if(isReadable()) {
char tmp=0;
int peekDataBytes=1;
int lastSocketError=0;
int err = peek(&tmp, 1, false, &lastSocketError);
//int err = peek(&tmp, 1, false, &lastSocketError);
int err = peek(&tmp, peekDataBytes, false, &lastSocketError);
//if(err <= 0 && err != PLATFORM_SOCKET_TRY_AGAIN) {
if(err <= 0 && lastSocketError != PLATFORM_SOCKET_TRY_AGAIN) {
if(err <= 0 && lastSocketError != 0 && lastSocketError != PLATFORM_SOCKET_TRY_AGAIN) {
if(SystemFlags::getSystemSettingType(SystemFlags::debugNetwork).enabled) SystemFlags::OutputDebug(SystemFlags::debugNetwork,"[%s::%s Line: %d] ERROR Peek failed, err = %d for socket: %d, error = %s, lastSocketError = %d\n",__FILE__,__FUNCTION__,__LINE__,err,sock,getLastSocketErrorFormattedText().c_str(),lastSocketError);
if(SystemFlags::VERBOSE_MODE_ENABLED) SystemFlags::OutputDebug(SystemFlags::debugError,"SOCKET DISCONNECTED In [%s::%s Line: %d] ERROR Peek failed, err = %d for socket: %d, error = %s, lastSocketError = %d\n",__FILE__,__FUNCTION__,__LINE__,err,sock,getLastSocketErrorFormattedText().c_str(),lastSocketError);
return false;