- attempt to improve network performance (use separate mutexes for read / write operations)

This commit is contained in:
Mark Vejvoda
2011-02-15 07:49:40 +00:00
parent 2357e2cfd1
commit 8ec58868dc
4 changed files with 80 additions and 95 deletions

View File

@@ -37,6 +37,7 @@ ConnectionSlotThread::ConnectionSlotThread(int slotIndex) : BaseThread() {
this->slotInterface = NULL; this->slotInterface = NULL;
//this->event = NULL; //this->event = NULL;
eventList.clear(); eventList.clear();
eventList.reserve(100);
} }
ConnectionSlotThread::ConnectionSlotThread(ConnectionSlotCallbackInterface *slotInterface,int slotIndex) : BaseThread() { ConnectionSlotThread::ConnectionSlotThread(ConnectionSlotCallbackInterface *slotInterface,int slotIndex) : BaseThread() {
@@ -482,6 +483,7 @@ void ConnectionSlot::update(bool checkForNewClients,int lockedSlotIndex) {
for(int i = 0; i < networkMessageCommandList.getCommandCount(); ++i) { for(int i = 0; i < networkMessageCommandList.getCommandCount(); ++i) {
vctPendingNetworkCommandList.push_back(*networkMessageCommandList.getCommand(i)); vctPendingNetworkCommandList.push_back(*networkMessageCommandList.getCommand(i));
} }
safeMutexSlot.ReleaseLock();
} }
} }
} }
@@ -880,12 +882,15 @@ string ConnectionSlot::getHumanPlayerName(int index) {
} }
vector<NetworkCommand> ConnectionSlot::getPendingNetworkCommandList(bool clearList) { vector<NetworkCommand> ConnectionSlot::getPendingNetworkCommandList(bool clearList) {
vector<NetworkCommand> ret;
static string mutexOwnerId = string(__FILE__) + string("_") + intToStr(__LINE__); static string mutexOwnerId = string(__FILE__) + string("_") + intToStr(__LINE__);
MutexSafeWrapper safeMutexSlot(&mutexPendingNetworkCommandList,mutexOwnerId); MutexSafeWrapper safeMutexSlot(&mutexPendingNetworkCommandList,mutexOwnerId);
vector<NetworkCommand> ret = vctPendingNetworkCommandList; if(vctPendingNetworkCommandList.size() > 0) {
ret = vctPendingNetworkCommandList;
if(clearList == true) { if(clearList == true) {
vctPendingNetworkCommandList.clear(); vctPendingNetworkCommandList.clear();
} }
}
safeMutexSlot.ReleaseLock(); safeMutexSlot.ReleaseLock();
return ret; return ret;
@@ -894,7 +899,10 @@ vector<NetworkCommand> ConnectionSlot::getPendingNetworkCommandList(bool clearLi
void ConnectionSlot::clearPendingNetworkCommandList() { void ConnectionSlot::clearPendingNetworkCommandList() {
static string mutexOwnerId = string(__FILE__) + string("_") + intToStr(__LINE__); static string mutexOwnerId = string(__FILE__) + string("_") + intToStr(__LINE__);
MutexSafeWrapper safeMutexSlot(&mutexPendingNetworkCommandList,mutexOwnerId); MutexSafeWrapper safeMutexSlot(&mutexPendingNetworkCommandList,mutexOwnerId);
if(vctPendingNetworkCommandList.size() > 0) {
vctPendingNetworkCommandList.clear(); vctPendingNetworkCommandList.clear();
} }
safeMutexSlot.ReleaseLock();
}
}}//end namespace }}//end namespace

View File

@@ -303,24 +303,24 @@ int64 ServerInterface::getNextEventId() {
} }
void ServerInterface::slotUpdateTask(ConnectionSlotEvent *event) { void ServerInterface::slotUpdateTask(ConnectionSlotEvent *event) {
SystemFlags::OutputDebug(SystemFlags::debugNetwork,"In [%s::%s Line: %d]\n",__FILE__,__FUNCTION__,__LINE__); //SystemFlags::OutputDebug(SystemFlags::debugNetwork,"In [%s::%s Line: %d]\n",__FILE__,__FUNCTION__,__LINE__);
if(event != NULL) { if(event != NULL) {
SystemFlags::OutputDebug(SystemFlags::debugNetwork,"In [%s::%s Line: %d] event->eventType = %d\n",__FILE__,__FUNCTION__,__LINE__,event->eventType); //SystemFlags::OutputDebug(SystemFlags::debugNetwork,"In [%s::%s Line: %d] event->eventType = %d\n",__FILE__,__FUNCTION__,__LINE__,event->eventType);
if(event->eventType == eSendSocketData) { if(event->eventType == eSendSocketData) {
SystemFlags::OutputDebug(SystemFlags::debugNetwork,"In [%s::%s] before sendMessage, event->networkMessage = %p\n",__FILE__,__FUNCTION__,event->networkMessage); //SystemFlags::OutputDebug(SystemFlags::debugNetwork,"In [%s::%s] before sendMessage, event->networkMessage = %p\n",__FILE__,__FUNCTION__,event->networkMessage);
event->connectionSlot->sendMessage(event->networkMessage); event->connectionSlot->sendMessage(event->networkMessage);
} }
else if(event->eventType == eReceiveSocketData) { else if(event->eventType == eReceiveSocketData) {
SystemFlags::OutputDebug(SystemFlags::debugNetwork,"In [%s::%s Line: %d]\n",__FILE__,__FUNCTION__,__LINE__); //SystemFlags::OutputDebug(SystemFlags::debugNetwork,"In [%s::%s Line: %d]\n",__FILE__,__FUNCTION__,__LINE__);
updateSlot(event); updateSlot(event);
} }
else { else {
SystemFlags::OutputDebug(SystemFlags::debugNetwork,"In [%s::%s Line: %d]\n",__FILE__,__FUNCTION__,__LINE__); SystemFlags::OutputDebug(SystemFlags::debugNetwork,"In [%s::%s Line: %d]\n",__FILE__,__FUNCTION__,__LINE__);
} }
} }
SystemFlags::OutputDebug(SystemFlags::debugNetwork,"In [%s::%s Line: %d]\n",__FILE__,__FUNCTION__,__LINE__); //SystemFlags::OutputDebug(SystemFlags::debugNetwork,"In [%s::%s Line: %d]\n",__FILE__,__FUNCTION__,__LINE__);
} }
void ServerInterface::updateSlot(ConnectionSlotEvent *event) { void ServerInterface::updateSlot(ConnectionSlotEvent *event) {

View File

@@ -100,7 +100,8 @@ public:
}; };
#endif #endif
class Socket : public SimpleTaskCallbackInterface { //class Socket : public SimpleTaskCallbackInterface {
class Socket {
protected: protected:
#ifdef WIN32 #ifdef WIN32
@@ -112,19 +113,20 @@ protected:
std::string ipAddress; std::string ipAddress;
std::string connectedIpAddress; std::string connectedIpAddress;
SimpleTaskThread *pingThread; //SimpleTaskThread *pingThread;
std::map<string,double> pingCache; std::map<string,double> pingCache;
time_t lastThreadedPing; time_t lastThreadedPing;
Mutex pingThreadAccessor; Mutex pingThreadAccessor;
Mutex dataSynchAccessor; Mutex dataSynchAccessorRead;
Mutex dataSynchAccessorWrite;
public: public:
Socket(PLATFORM_SOCKET sock); Socket(PLATFORM_SOCKET sock);
Socket(); Socket();
virtual ~Socket(); virtual ~Socket();
virtual void simpleTask(BaseThread *callingThread); //virtual void simpleTask(BaseThread *callingThread);
static int getBroadCastPort() { return broadcast_portno; } static int getBroadCastPort() { return broadcast_portno; }
static void setBroadCastPort(int value) { broadcast_portno = value; } static void setBroadCastPort(int value) { broadcast_portno = value; }
@@ -147,7 +149,7 @@ public:
static void setBlock(bool block, PLATFORM_SOCKET socket); static void setBlock(bool block, PLATFORM_SOCKET socket);
bool isReadable(); bool isReadable();
bool isWritable(bool waitOnDelayedResponse); bool isWritable();
bool isConnected(); bool isConnected();
static string getHostName(); static string getHostName();

View File

@@ -752,7 +752,7 @@ bool Socket::isSocketValid(const PLATFORM_SOCKET *validateSocket) {
} }
Socket::Socket(PLATFORM_SOCKET sock) { Socket::Socket(PLATFORM_SOCKET sock) {
this->pingThread = NULL; //this->pingThread = NULL;
this->sock= sock; this->sock= sock;
this->connectedIpAddress = ""; this->connectedIpAddress = "";
} }
@@ -760,7 +760,7 @@ Socket::Socket(PLATFORM_SOCKET sock) {
Socket::Socket() { Socket::Socket() {
SystemFlags::OutputDebug(SystemFlags::debugNetwork,"In [%s::%s Line: %d]\n",__FILE__,__FUNCTION__,__LINE__); SystemFlags::OutputDebug(SystemFlags::debugNetwork,"In [%s::%s Line: %d]\n",__FILE__,__FUNCTION__,__LINE__);
this->pingThread = NULL; //this->pingThread = NULL;
this->connectedIpAddress = ""; this->connectedIpAddress = "";
sock = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); sock = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
@@ -799,6 +799,7 @@ float Socket::getThreadedPingMS(std::string host) {
return result; return result;
} }
/*
void Socket::simpleTask(BaseThread *callingThread) { void Socket::simpleTask(BaseThread *callingThread) {
// update ping times every x seconds // update ping times every x seconds
const int pingFrequencySeconds = 2; const int pingFrequencySeconds = 2;
@@ -816,6 +817,7 @@ void Socket::simpleTask(BaseThread *callingThread) {
safeMutex.ReleaseLock(); safeMutex.ReleaseLock();
} }
} }
*/
Socket::~Socket() Socket::~Socket()
{ {
@@ -825,8 +827,8 @@ Socket::~Socket()
SystemFlags::OutputDebug(SystemFlags::debugNetwork,"In [%s::%s] END closing socket = %d...\n",__FILE__,__FUNCTION__,sock); SystemFlags::OutputDebug(SystemFlags::debugNetwork,"In [%s::%s] END closing socket = %d...\n",__FILE__,__FUNCTION__,sock);
delete pingThread; //delete pingThread;
pingThread = NULL; //pingThread = NULL;
} }
void Socket::disconnectSocket() { void Socket::disconnectSocket() {
@@ -835,7 +837,8 @@ void Socket::disconnectSocket() {
if(isSocketValid() == true) { if(isSocketValid() == true) {
SystemFlags::OutputDebug(SystemFlags::debugNetwork,"In [%s::%s] calling shutdown and close for socket = %d...\n",__FILE__,__FUNCTION__,sock); SystemFlags::OutputDebug(SystemFlags::debugNetwork,"In [%s::%s] calling shutdown and close for socket = %d...\n",__FILE__,__FUNCTION__,sock);
MutexSafeWrapper safeMutex(&dataSynchAccessor,string(__FILE__) + "_" + intToStr(__LINE__)); MutexSafeWrapper safeMutex(&dataSynchAccessorRead,string(__FILE__) + "_" + intToStr(__LINE__));
MutexSafeWrapper safeMutex1(&dataSynchAccessorWrite,string(__FILE__) + "_" + intToStr(__LINE__));
::shutdown(sock,2); ::shutdown(sock,2);
#ifndef WIN32 #ifndef WIN32
::close(sock); ::close(sock);
@@ -844,7 +847,8 @@ void Socket::disconnectSocket() {
::closesocket(sock); ::closesocket(sock);
sock = -1; sock = -1;
#endif #endif
//safeMutex.ReleaseLock(); safeMutex.ReleaseLock();
safeMutex1.ReleaseLock();
} }
SystemFlags::OutputDebug(SystemFlags::debugNetwork,"In [%s::%s] END closing socket = %d...\n",__FILE__,__FUNCTION__,sock); SystemFlags::OutputDebug(SystemFlags::debugNetwork,"In [%s::%s] END closing socket = %d...\n",__FILE__,__FUNCTION__,sock);
@@ -1033,15 +1037,16 @@ int Socket::send(const void *data, int dataSize) {
//SystemFlags::OutputDebug(SystemFlags::debugNetwork,"In [%s::%s Line: %d]\n",__FILE__,__FUNCTION__,__LINE__); //SystemFlags::OutputDebug(SystemFlags::debugNetwork,"In [%s::%s Line: %d]\n",__FILE__,__FUNCTION__,__LINE__);
errno = 0; errno = 0;
MutexSafeWrapper safeMutex(&dataSynchAccessor,string(__FILE__) + "_" + intToStr(__LINE__)); MutexSafeWrapper safeMutex(&dataSynchAccessorWrite,string(__FILE__) + "_" + intToStr(__LINE__));
//SystemFlags::OutputDebug(SystemFlags::debugNetwork,"In [%s::%s Line: %d]\n",__FILE__,__FUNCTION__,__LINE__); //SystemFlags::OutputDebug(SystemFlags::debugNetwork,"In [%s::%s Line: %d]\n",__FILE__,__FUNCTION__,__LINE__);
#ifdef __APPLE__ #ifdef __APPLE__
bytesSent = ::send(sock, (const char *)data, dataSize, SO_NOSIGPIPE); bytesSent = ::send(sock, (const char *)data, dataSize, SO_NOSIGPIPE);
#else #else
bytesSent = ::send(sock, (const char *)data, dataSize, MSG_NOSIGNAL | MSG_DONTWAIT); //bytesSent = ::send(sock, (const char *)data, dataSize, MSG_NOSIGNAL | MSG_DONTWAIT);
bytesSent = ::send(sock, (const char *)data, dataSize, MSG_NOSIGNAL);
#endif #endif
//safeMutex.ReleaseLock(); safeMutex.ReleaseLock();
//SystemFlags::OutputDebug(SystemFlags::debugNetwork,"In [%s::%s Line: %d]\n",__FILE__,__FUNCTION__,__LINE__); //SystemFlags::OutputDebug(SystemFlags::debugNetwork,"In [%s::%s Line: %d]\n",__FILE__,__FUNCTION__,__LINE__);
} }
@@ -1068,17 +1073,18 @@ int Socket::send(const void *data, int dataSize) {
if(isConnected() == true) { if(isConnected() == true) {
SystemFlags::OutputDebug(SystemFlags::debugNetwork,"In [%s::%s Line: %d] attemptCount = %d, sock = %d, dataSize = %d, data = %p\n",__FILE__,__FUNCTION__,__LINE__,attemptCount,sock,dataSize,data); SystemFlags::OutputDebug(SystemFlags::debugNetwork,"In [%s::%s Line: %d] attemptCount = %d, sock = %d, dataSize = %d, data = %p\n",__FILE__,__FUNCTION__,__LINE__,attemptCount,sock,dataSize,data);
MutexSafeWrapper safeMutex(&dataSynchAccessor,string(__FILE__) + "_" + intToStr(__LINE__)); MutexSafeWrapper safeMutex(&dataSynchAccessorWrite,string(__FILE__) + "_" + intToStr(__LINE__));
#ifdef __APPLE__ #ifdef __APPLE__
bytesSent = ::send(sock, (const char *)data, dataSize, SO_NOSIGPIPE); bytesSent = ::send(sock, (const char *)data, dataSize, SO_NOSIGPIPE);
#else #else
bytesSent = ::send(sock, (const char *)data, dataSize, MSG_NOSIGNAL | MSG_DONTWAIT); //bytesSent = ::send(sock, (const char *)data, dataSize, MSG_NOSIGNAL | MSG_DONTWAIT);
bytesSent = ::send(sock, (const char *)data, dataSize, MSG_NOSIGNAL);
#endif #endif
if(bytesSent < 0 && getLastSocketError() != PLATFORM_SOCKET_TRY_AGAIN) { if(bytesSent < 0 && getLastSocketError() != PLATFORM_SOCKET_TRY_AGAIN) {
break; break;
} }
//safeMutex.ReleaseLock(); safeMutex.ReleaseLock();
SystemFlags::OutputDebug(SystemFlags::debugNetwork,"In [%s::%s Line: %d] #2 EAGAIN during send, trying again returned: %d\n",__FILE__,__FUNCTION__,__LINE__,bytesSent); SystemFlags::OutputDebug(SystemFlags::debugNetwork,"In [%s::%s Line: %d] #2 EAGAIN during send, trying again returned: %d\n",__FILE__,__FUNCTION__,__LINE__,bytesSent);
} }
@@ -1109,12 +1115,13 @@ int Socket::send(const void *data, int dataSize) {
if(isConnected() == true) { if(isConnected() == true) {
SystemFlags::OutputDebug(SystemFlags::debugNetwork,"In [%s::%s Line: %d] attemptCount = %d, sock = %d, dataSize = %d, data = %p\n",__FILE__,__FUNCTION__,__LINE__,attemptCount,sock,dataSize,data); SystemFlags::OutputDebug(SystemFlags::debugNetwork,"In [%s::%s Line: %d] attemptCount = %d, sock = %d, dataSize = %d, data = %p\n",__FILE__,__FUNCTION__,__LINE__,attemptCount,sock,dataSize,data);
MutexSafeWrapper safeMutex(&dataSynchAccessor,string(__FILE__) + "_" + intToStr(__LINE__)); MutexSafeWrapper safeMutex(&dataSynchAccessorWrite,string(__FILE__) + "_" + intToStr(__LINE__));
const char *sendBuf = (const char *)data; const char *sendBuf = (const char *)data;
#ifdef __APPLE__ #ifdef __APPLE__
bytesSent = ::send(sock, &sendBuf[totalBytesSent], dataSize - totalBytesSent, SO_NOSIGPIPE); bytesSent = ::send(sock, &sendBuf[totalBytesSent], dataSize - totalBytesSent, SO_NOSIGPIPE);
#else #else
bytesSent = ::send(sock, &sendBuf[totalBytesSent], dataSize - totalBytesSent, MSG_NOSIGNAL | MSG_DONTWAIT); //bytesSent = ::send(sock, &sendBuf[totalBytesSent], dataSize - totalBytesSent, MSG_NOSIGNAL | MSG_DONTWAIT);
bytesSent = ::send(sock, &sendBuf[totalBytesSent], dataSize - totalBytesSent, MSG_NOSIGNAL);
#endif #endif
if(bytesSent > 0) { if(bytesSent > 0) {
totalBytesSent += bytesSent; totalBytesSent += bytesSent;
@@ -1124,7 +1131,7 @@ int Socket::send(const void *data, int dataSize) {
break; break;
} }
//safeMutex.ReleaseLock(); safeMutex.ReleaseLock();
SystemFlags::OutputDebug(SystemFlags::debugNetwork,"In [%s::%s Line: %d] retry send returned: %d\n",__FILE__,__FUNCTION__,__LINE__,bytesSent); SystemFlags::OutputDebug(SystemFlags::debugNetwork,"In [%s::%s Line: %d] retry send returned: %d\n",__FILE__,__FUNCTION__,__LINE__,bytesSent);
} }
@@ -1169,9 +1176,9 @@ int Socket::receive(void *data, int dataSize) {
ssize_t bytesReceived = 0; ssize_t bytesReceived = 0;
if(isSocketValid() == true) { if(isSocketValid() == true) {
MutexSafeWrapper safeMutex(&dataSynchAccessor,string(__FILE__) + "_" + intToStr(__LINE__)); MutexSafeWrapper safeMutex(&dataSynchAccessorRead,string(__FILE__) + "_" + intToStr(__LINE__));
bytesReceived = recv(sock, reinterpret_cast<char*>(data), dataSize, 0); bytesReceived = recv(sock, reinterpret_cast<char*>(data), dataSize, 0);
//safeMutex.ReleaseLock(); safeMutex.ReleaseLock();
} }
if(bytesReceived < 0 && getLastSocketError() != PLATFORM_SOCKET_TRY_AGAIN) { if(bytesReceived < 0 && getLastSocketError() != PLATFORM_SOCKET_TRY_AGAIN) {
SystemFlags::OutputDebug(SystemFlags::debugNetwork,"[%s::%s Line: %d] ERROR READING SOCKET DATA error while sending socket data, bytesSent = %d, error = %s\n",__FILE__,__FUNCTION__,__LINE__,bytesReceived,getLastSocketErrorFormattedText().c_str()); SystemFlags::OutputDebug(SystemFlags::debugNetwork,"[%s::%s Line: %d] ERROR READING SOCKET DATA error while sending socket data, bytesSent = %d, error = %s\n",__FILE__,__FUNCTION__,__LINE__,bytesReceived,getLastSocketErrorFormattedText().c_str());
@@ -1191,9 +1198,9 @@ int Socket::receive(void *data, int dataSize) {
break; break;
} }
else if(Socket::isReadable() == true) { else if(Socket::isReadable() == true) {
MutexSafeWrapper safeMutex(&dataSynchAccessor,string(__FILE__) + "_" + intToStr(__LINE__)); MutexSafeWrapper safeMutex(&dataSynchAccessorRead,string(__FILE__) + "_" + intToStr(__LINE__));
bytesReceived = recv(sock, reinterpret_cast<char*>(data), dataSize, 0); bytesReceived = recv(sock, reinterpret_cast<char*>(data), dataSize, 0);
//safeMutex.ReleaseLock(); safeMutex.ReleaseLock();
SystemFlags::OutputDebug(SystemFlags::debugNetwork,"In [%s::%s Line: %d] #2 EAGAIN during receive, trying again returned: %d\n",__FILE__,__FUNCTION__,__LINE__,bytesReceived); SystemFlags::OutputDebug(SystemFlags::debugNetwork,"In [%s::%s Line: %d] #2 EAGAIN during receive, trying again returned: %d\n",__FILE__,__FUNCTION__,__LINE__,bytesReceived);
} }
@@ -1221,12 +1228,13 @@ int Socket::peek(void *data, int dataSize,bool mustGetData) {
//if(chrono.getMillis() > 1) printf("In [%s::%s Line: %d] action running for msecs: %lld\n",__FILE__,__FUNCTION__,__LINE__,(long long int)chrono.getMillis()); //if(chrono.getMillis() > 1) printf("In [%s::%s Line: %d] action running for msecs: %lld\n",__FILE__,__FUNCTION__,__LINE__,(long long int)chrono.getMillis());
//MutexSafeWrapper safeMutex(&dataSynchAccessor,string(__FILE__) + "_" + intToStr(__LINE__) + "_" + intToStr(sock) + "_" + intToStr(dataSize)); //MutexSafeWrapper safeMutex(&dataSynchAccessor,string(__FILE__) + "_" + intToStr(__LINE__) + "_" + intToStr(sock) + "_" + intToStr(dataSize));
MutexSafeWrapper safeMutex(&dataSynchAccessor,string(__FILE__) + string("_") + intToStr(__LINE__)); static string mutexOwnerId = string(__FILE__) + string("_") + intToStr(__LINE__);
MutexSafeWrapper safeMutex(&dataSynchAccessorRead,mutexOwnerId);
//if(chrono.getMillis() > 1) printf("In [%s::%s Line: %d] action running for msecs: %lld\n",__FILE__,__FUNCTION__,__LINE__,(long long int)chrono.getMillis()); //if(chrono.getMillis() > 1) printf("In [%s::%s Line: %d] action running for msecs: %lld\n",__FILE__,__FUNCTION__,__LINE__,(long long int)chrono.getMillis());
err = recv(sock, reinterpret_cast<char*>(data), dataSize, MSG_PEEK); err = recv(sock, reinterpret_cast<char*>(data), dataSize, MSG_PEEK);
//safeMutex.ReleaseLock(); safeMutex.ReleaseLock();
if(chrono.getMillis() > 1) SystemFlags::OutputDebug(SystemFlags::debugNetwork,"In [%s::%s Line: %d] action running for msecs: %lld\n",__FILE__,__FUNCTION__,__LINE__,(long long int)chrono.getMillis()); if(chrono.getMillis() > 1) SystemFlags::OutputDebug(SystemFlags::debugNetwork,"In [%s::%s Line: %d] action running for msecs: %lld\n",__FILE__,__FUNCTION__,__LINE__,(long long int)chrono.getMillis());
} }
@@ -1261,9 +1269,9 @@ int Socket::peek(void *data, int dataSize,bool mustGetData) {
*/ */
if(Socket::isReadable() == true) { if(Socket::isReadable() == true) {
//MutexSafeWrapper safeMutex(&dataSynchAccessor,string(__FILE__) + "_" + intToStr(__LINE__) + "_" + intToStr(sock) + "_" + intToStr(dataSize)); //MutexSafeWrapper safeMutex(&dataSynchAccessor,string(__FILE__) + "_" + intToStr(__LINE__) + "_" + intToStr(sock) + "_" + intToStr(dataSize));
MutexSafeWrapper safeMutex(&dataSynchAccessor,string(__FILE__) + string("_") + intToStr(__LINE__)); MutexSafeWrapper safeMutex(&dataSynchAccessorRead,string(__FILE__) + string("_") + intToStr(__LINE__));
err = recv(sock, reinterpret_cast<char*>(data), dataSize, MSG_PEEK); err = recv(sock, reinterpret_cast<char*>(data), dataSize, MSG_PEEK);
//safeMutex.ReleaseLock(); safeMutex.ReleaseLock();
if(chrono.getMillis() > 1) SystemFlags::OutputDebug(SystemFlags::debugNetwork,"In [%s::%s Line: %d] action running for msecs: %lld\n",__FILE__,__FUNCTION__,__LINE__,(long long int)chrono.getMillis()); if(chrono.getMillis() > 1) SystemFlags::OutputDebug(SystemFlags::debugNetwork,"In [%s::%s Line: %d] action running for msecs: %lld\n",__FILE__,__FUNCTION__,__LINE__,(long long int)chrono.getMillis());
SystemFlags::OutputDebug(SystemFlags::debugNetwork,"In [%s::%s Line: %d] #2 EAGAIN during peek, trying again returned: %d\n",__FILE__,__FUNCTION__,__LINE__,err); SystemFlags::OutputDebug(SystemFlags::debugNetwork,"In [%s::%s Line: %d] #2 EAGAIN during peek, trying again returned: %d\n",__FILE__,__FUNCTION__,__LINE__,err);
@@ -1329,85 +1337,52 @@ bool Socket::isReadable() {
int i = 0; int i = 0;
{ {
MutexSafeWrapper safeMutex(&dataSynchAccessor,string(__FILE__) + "_" + intToStr(__LINE__)); //MutexSafeWrapper safeMutex(&dataSynchAccessor,string(__FILE__) + "_" + intToStr(__LINE__));
i= select((int)sock + 1, &set, NULL, NULL, &tv); i= select((int)sock + 1, &set, NULL, NULL, &tv);
//safeMutex.ReleaseLock();
} }
if(i < 0) { if(i < 0) {
//if(difftime(time(NULL),lastDebugEvent) >= 1) {
// lastDebugEvent = time(NULL);
//throwException("Error selecting socket");
SystemFlags::OutputDebug(SystemFlags::debugNetwork,"[%s::%s] error while selecting socket data, err = %d, error = %s\n",__FILE__,__FUNCTION__,i,getLastSocketErrorFormattedText().c_str()); SystemFlags::OutputDebug(SystemFlags::debugNetwork,"[%s::%s] error while selecting socket data, err = %d, error = %s\n",__FILE__,__FUNCTION__,i,getLastSocketErrorFormattedText().c_str());
//}
} }
bool result = (i == 1); return (i == 1);
return result;
} }
bool Socket::isWritable(bool waitOnDelayedResponse) { bool Socket::isWritable() {
if(isSocketValid() == false) return false; if(isSocketValid() == false) return false;
struct timeval tv; struct timeval tv;
tv.tv_sec= 0; tv.tv_sec= 0;
tv.tv_usec= 1; //tv.tv_usec= 1;
tv.tv_usec= 0;
fd_set set; fd_set set;
FD_ZERO(&set); FD_ZERO(&set);
FD_SET(sock, &set); FD_SET(sock, &set);
time_t maxElapsedCheck = time(NULL);
const int MAX_CHECK_WAIT_SECONDS = 2;
bool result = false;
do {
if(difftime(time(NULL),maxElapsedCheck) >= MAX_CHECK_WAIT_SECONDS) {
SystemFlags::OutputDebug(SystemFlags::debugNetwork,"[%s::%s Line: %d] max timeout waited during writable check\n",__FILE__,__FUNCTION__,__LINE__);
break;
}
int i = 0; int i = 0;
{ {
MutexSafeWrapper safeMutex(&dataSynchAccessor,string(__FILE__) + "_" + intToStr(__LINE__)); //MutexSafeWrapper safeMutex(&dataSynchAccessor,string(__FILE__) + "_" + intToStr(__LINE__));
i = select((int)sock + 1, NULL, &set, NULL, &tv); i = select((int)sock + 1, NULL, &set, NULL, &tv);
//safeMutex.ReleaseLock();
} }
bool result = false;
if(i < 0 ) { if(i < 0 ) {
//if(difftime(time(NULL),lastDebugEvent) >= 1) {
// lastDebugEvent = time(NULL);
SystemFlags::OutputDebug(SystemFlags::debugNetwork,"[%s::%s Line: %d] error while selecting socket data, err = %d, error = %s\n",__FILE__,__FUNCTION__,__LINE__,i,getLastSocketErrorFormattedText().c_str()); SystemFlags::OutputDebug(SystemFlags::debugNetwork,"[%s::%s Line: %d] error while selecting socket data, err = %d, error = %s\n",__FILE__,__FUNCTION__,__LINE__,i,getLastSocketErrorFormattedText().c_str());
//}
waitOnDelayedResponse = false;
//throwException("Error selecting socket");
} }
else if(i == 0) { else if(i == 0) {
//if(difftime(time(NULL),lastDebugEvent) >= 1) { //SystemFlags::OutputDebug(SystemFlags::debugNetwork,"[%s::%s Line: %d] TIMEOUT while selecting socket data, err = %d, error = %s\n",__FILE__,__FUNCTION__,__LINE__,i,getLastSocketErrorFormattedText().c_str());
// lastDebugEvent = time(NULL);
SystemFlags::OutputDebug(SystemFlags::debugNetwork,"[%s::%s Line: %d] TIMEOUT while selecting socket data, err = %d, error = %s\n",__FILE__,__FUNCTION__,__LINE__,i,getLastSocketErrorFormattedText().c_str());
//}
if(waitOnDelayedResponse == false) {
result = true;
break;
}
} }
else { else {
result = true; result = true;
break;
} }
if(isSocketValid() == false) return false;
} while(waitOnDelayedResponse == true && result == false);
//return (i == 1 && FD_ISSET(sock, &set)); //return (i == 1 && FD_ISSET(sock, &set));
return result; return result;
} }
bool Socket::isConnected() { bool Socket::isConnected() {
if(isSocketValid() == false) return false;
//if the socket is not writable then it is not conencted //if the socket is not writable then it is not conencted
if(isWritable(false) == false) { if(isWritable() == false) {
return false; return false;
} }
//if the socket is readable it is connected if we can read a byte from it //if the socket is readable it is connected if we can read a byte from it
@@ -1551,7 +1526,7 @@ void ClientSocket::connect(const Ip &ip, int port)
FD_SET(sock, &myset); FD_SET(sock, &myset);
{ {
MutexSafeWrapper safeMutex(&dataSynchAccessor,string(__FILE__) + "_" + intToStr(__LINE__)); //MutexSafeWrapper safeMutex(&dataSynchAccessor,string(__FILE__) + "_" + intToStr(__LINE__));
err = select((int)sock + 1, NULL, &myset, NULL, &tv); err = select((int)sock + 1, NULL, &myset, NULL, &tv);
//safeMutex.ReleaseLock(); //safeMutex.ReleaseLock();
} }
@@ -1933,7 +1908,7 @@ Socket *ServerSocket::accept() {
struct sockaddr_in cli_addr; struct sockaddr_in cli_addr;
socklen_t clilen = sizeof(cli_addr); socklen_t clilen = sizeof(cli_addr);
char client_host[100]=""; char client_host[100]="";
MutexSafeWrapper safeMutex(&dataSynchAccessor,string(__FILE__) + "_" + intToStr(__LINE__)); MutexSafeWrapper safeMutex(&dataSynchAccessorRead,string(__FILE__) + "_" + intToStr(__LINE__));
PLATFORM_SOCKET newSock= ::accept(sock, (struct sockaddr *) &cli_addr, &clilen); PLATFORM_SOCKET newSock= ::accept(sock, (struct sockaddr *) &cli_addr, &clilen);
safeMutex.ReleaseLock(); safeMutex.ReleaseLock();