diff --git a/source/shared_lib/include/platform/posix/ircclient.h b/source/shared_lib/include/platform/posix/ircclient.h index 99a5eb3c8..fcd2a2cfa 100644 --- a/source/shared_lib/include/platform/posix/ircclient.h +++ b/source/shared_lib/include/platform/posix/ircclient.h @@ -58,6 +58,8 @@ protected: static const char *globalCacheContainerName; std::vector argv; + + Mutex mutexIRCSession; irc_session_t *ircSession; string execute_cmd_onconnect; @@ -79,6 +81,8 @@ protected: bool wantToLeaveChannel; + int irc_run_session(irc_session_t * session); + public: IRCThread(const std::vector &argv,IRCCallbackInterface *callbackObj); diff --git a/source/shared_lib/include/platform/posix/socket.h b/source/shared_lib/include/platform/posix/socket.h index bc2f6b669..359f1e362 100644 --- a/source/shared_lib/include/platform/posix/socket.h +++ b/source/shared_lib/include/platform/posix/socket.h @@ -50,6 +50,20 @@ using namespace Shared::PlatformCommon; namespace Shared { namespace Platform { +#ifdef WIN32 + + #define PLATFORM_SOCKET_TRY_AGAIN WSAEWOULDBLOCK + #define PLATFORM_SOCKET_INPROGRESS WSAEINPROGRESS + #define PLATFORM_SOCKET_INTERRUPTED WSAEWOULDBLOCK + +#else + + #define PLATFORM_SOCKET_TRY_AGAIN EAGAIN + #define PLATFORM_SOCKET_INPROGRESS EINPROGRESS + #define PLATFORM_SOCKET_INTERRUPTED EINTR + +#endif + // The callback Interface used by the UPNP discovery process class FTPClientValidationInterface { public: @@ -137,6 +151,10 @@ public: Socket(); virtual ~Socket(); + static int getLastSocketError(); + static const char * getLastSocketErrorText(int *errNumber=NULL); + static string getLastSocketErrorFormattedText(int *errNumber=NULL); + static bool disableNagle; static int DEFAULT_SOCKET_SENDBUF_SIZE; static int DEFAULT_SOCKET_RECVBUF_SIZE; diff --git a/source/shared_lib/sources/platform/posix/ircclient.cpp b/source/shared_lib/sources/platform/posix/ircclient.cpp index 8c4bc7d98..897d6823a 100644 --- a/source/shared_lib/sources/platform/posix/ircclient.cpp +++ b/source/shared_lib/sources/platform/posix/ircclient.cpp @@ -13,6 +13,7 @@ #include "ircclient.h" #include "util.h" #include "platform_common.h" +#include "socket.h" #include "cache_manager.h" #if !defined(DISABLE_IRCCLIENT) @@ -475,11 +476,18 @@ IRCThread::IRCThread(const std::vector &argv, IRCCallbackInterface *call void IRCThread::disconnect() { #if !defined(DISABLE_IRCCLIENT) - if(ircSession != NULL) { + + MutexSafeWrapper safeMutex(&mutexIRCSession,string(__FILE__) + "_" + intToStr(__LINE__)); + bool validSession = (ircSession != NULL); + safeMutex.ReleaseLock(); + + if(validSession == true) { setCallbackObj(NULL); if(SystemFlags::VERBOSE_MODE_ENABLED || IRCThread::debugEnabled) printf ("===> IRC: Quitting Channel\n"); + MutexSafeWrapper safeMutex1(&mutexIRCSession,string(__FILE__) + "_" + intToStr(__LINE__)); irc_disconnect(ircSession); + safeMutex1.ReleaseLock(); BaseThread::signalQuit(); hasJoinedChannel = false; @@ -493,11 +501,18 @@ void IRCThread::signalQuit() { if(SystemFlags::VERBOSE_MODE_ENABLED || IRCThread::debugEnabled) printf ("===> IRC: signalQuit [%p]\n",ircSession); #if !defined(DISABLE_IRCCLIENT) - if(ircSession != NULL) { + + MutexSafeWrapper safeMutex(&mutexIRCSession,string(__FILE__) + "_" + intToStr(__LINE__)); + bool validSession = (ircSession != NULL); + safeMutex.ReleaseLock(); + + if(validSession == true) { setCallbackObj(NULL); if(SystemFlags::VERBOSE_MODE_ENABLED || IRCThread::debugEnabled) printf ("===> IRC: Quitting Channel\n"); + MutexSafeWrapper safeMutex1(&mutexIRCSession,string(__FILE__) + "_" + intToStr(__LINE__)); irc_cmd_quit(ircSession, "MG Bot is closing!"); + safeMutex1.ReleaseLock(); BaseThread::signalQuit(); hasJoinedChannel = false; @@ -515,11 +530,17 @@ bool IRCThread::shutdownAndWait() { } void IRCThread::SendIRCCmdMessage(string target, string msg) { - if(ircSession != NULL && hasJoinedChannel == true) { + MutexSafeWrapper safeMutex(&mutexIRCSession,string(__FILE__) + "_" + intToStr(__LINE__)); + bool validSession = (ircSession != NULL); + safeMutex.ReleaseLock(); + + if(validSession == true && hasJoinedChannel == true) { if(SystemFlags::getSystemSettingType(SystemFlags::debugNetwork).enabled) SystemFlags::OutputDebug(SystemFlags::debugNetwork,"In [%s::%s Line: %d] sending IRC command to [%s] cmd [%s]\n",__FILE__,__FUNCTION__,__LINE__,target.c_str(),msg.c_str()); #if !defined(DISABLE_IRCCLIENT) + MutexSafeWrapper safeMutex1(&mutexIRCSession,string(__FILE__) + "_" + intToStr(__LINE__)); int ret = irc_cmd_msg (ircSession, target.c_str(), msg.c_str()); + safeMutex1.ReleaseLock(); if(SystemFlags::getSystemSettingType(SystemFlags::debugNetwork).enabled) SystemFlags::OutputDebug(SystemFlags::debugNetwork,"In [%s::%s Line: %d] sending IRC command to [%s] cmd [%s] ret = %d\n",__FILE__,__FUNCTION__,__LINE__,target.c_str(),msg.c_str(),ret); #endif @@ -528,11 +549,19 @@ void IRCThread::SendIRCCmdMessage(string target, string msg) { std::vector IRCThread::GetIRCConnectedNickList(string target, bool waitForCompletion) { eventDataDone = false; - if(ircSession != NULL && hasJoinedChannel == true) { + + MutexSafeWrapper safeMutexSession(&mutexIRCSession,string(__FILE__) + "_" + intToStr(__LINE__)); + bool validSession = (ircSession != NULL); + safeMutexSession.ReleaseLock(); + + if(validSession == true && hasJoinedChannel == true) { if(SystemFlags::getSystemSettingType(SystemFlags::debugNetwork).enabled) SystemFlags::OutputDebug(SystemFlags::debugNetwork,"In [%s::%s Line: %d] sending IRC nick list command to [%s]\n",__FILE__,__FUNCTION__,__LINE__,target.c_str()); #if !defined(DISABLE_IRCCLIENT) + + MutexSafeWrapper safeMutex1(&mutexIRCSession,string(__FILE__) + "_" + intToStr(__LINE__)); int ret = irc_cmd_names (ircSession, target.c_str()); + safeMutex1.ReleaseLock(); if(SystemFlags::getSystemSettingType(SystemFlags::debugNetwork).enabled) SystemFlags::OutputDebug(SystemFlags::debugNetwork,"In [%s::%s Line: %d] sending IRC nick list command to [%s] ret = %d\n",__FILE__,__FUNCTION__,__LINE__,target.c_str(),ret); @@ -556,10 +585,16 @@ std::vector IRCThread::GetIRCConnectedNickList(string target, bool waitF bool IRCThread::isConnected() { bool ret = false; - if(ircSession != NULL) { + MutexSafeWrapper safeMutex(&mutexIRCSession,string(__FILE__) + "_" + intToStr(__LINE__)); + bool validSession = (ircSession != NULL); + safeMutex.ReleaseLock(); + + if(validSession == true) { #if !defined(DISABLE_IRCCLIENT) + MutexSafeWrapper safeMutex1(&mutexIRCSession,string(__FILE__) + "_" + intToStr(__LINE__)); ret = (irc_is_connected(ircSession) != 0); + safeMutex1.ReleaseLock(); #endif } @@ -600,7 +635,10 @@ void IRCThread::execute() { try { #if !defined(DISABLE_IRCCLIENT) irc_callbacks_t callbacks; - ircSession=NULL; + + MutexSafeWrapper safeMutex(&mutexIRCSession,string(__FILE__) + "_" + intToStr(__LINE__)); + ircSession=NULL; + safeMutex.ReleaseLock(true); if(argv.size() != 5) { if(SystemFlags::VERBOSE_MODE_ENABLED || IRCThread::debugEnabled) printf ("===> IRC Usage: : got params [%ld]\n",(long int)argv.size()); @@ -633,12 +671,16 @@ void IRCThread::execute() { if(this->getQuitStatus() == true) { return; } + safeMutex.Lock(); ircSession = irc_create_session (&callbacks); + bool validSession = (ircSession != NULL); - if(!ircSession) { + if(validSession == false) { + safeMutex.ReleaseLock(); if(SystemFlags::VERBOSE_MODE_ENABLED || IRCThread::debugEnabled) printf ("===> IRC Could not create session\n"); return; } + safeMutex.ReleaseLock(true); // this->execute_cmd_onconnect = ""; // if(argv.size() >= 5) { @@ -655,16 +697,22 @@ void IRCThread::execute() { } this->channel = argv[2]; this->nick = argv[1]; + + safeMutex.Lock(); irc_set_ctx(ircSession, this); + safeMutex.ReleaseLock(true); if(this->getQuitStatus() == true) { return; } + safeMutex.Lock(); if(irc_connect(ircSession, argv[0].c_str(), IRC_SERVER_PORT, 0, this->nick.c_str(), this->username.c_str(), "megaglest")) { + safeMutex.ReleaseLock(); if(SystemFlags::VERBOSE_MODE_ENABLED || IRCThread::debugEnabled) printf ("===> IRC Could not connect: %s\n", irc_strerror (irc_errno(ircSession))); return; } + safeMutex.ReleaseLock(); if(this->getQuitStatus() == true) { return; @@ -677,7 +725,8 @@ void IRCThread::execute() { for(int iAttempts=1; this->getQuitStatus() == false && iAttempts <= 5; ++iAttempts) { - if(irc_run(ircSession)) { + //if(irc_run(ircSession)) { + if(irc_run_session(ircSession)) { if(SystemFlags::VERBOSE_MODE_ENABLED || IRCThread::debugEnabled) printf ("===> IRC Could not run the session: %s\n", irc_strerror (irc_errno(ircSession))); } } @@ -713,6 +762,53 @@ void IRCThread::execute() { delete this; } +int IRCThread::irc_run_session(irc_session_t * session) { +// if ( session->state != LIBIRC_STATE_CONNECTING ) +// { +// session->lasterror = LIBIRC_ERR_STATE; +// return 1; +// } + + if ( isConnected() == false ) { + //session->lasterror = LIBIRC_ERR_STATE; + return 1; + } + + while ( isConnected() == true ) { + struct timeval tv; + fd_set in_set, out_set; + int maxfd = 0; + + tv.tv_usec = 250000; + tv.tv_sec = 0; + + // Init sets + FD_ZERO (&in_set); + FD_ZERO (&out_set); + + MutexSafeWrapper safeMutex(&mutexIRCSession,string(__FILE__) + "_" + intToStr(__LINE__)); + irc_add_select_descriptors (session, &in_set, &out_set, &maxfd); + safeMutex.ReleaseLock(); + + if ( select (maxfd + 1, &in_set, &out_set, 0, &tv) < 0 ) { + int lastSocketError = Socket::getLastSocketError(); + if ( lastSocketError == PLATFORM_SOCKET_INTERRUPTED ) { + continue; + } + + //session->lasterror = LIBIRC_ERR_TERMINATED; + return 1; + } + + MutexSafeWrapper safeMutex1(&mutexIRCSession,string(__FILE__) + "_" + intToStr(__LINE__)); + if ( irc_process_select_descriptors (session, &in_set, &out_set) ) { + return 1; + } + } + + return 0; +} + IRCThread::~IRCThread() { if(SystemFlags::VERBOSE_MODE_ENABLED) printf("In ~IRCThread() ...\n"); diff --git a/source/shared_lib/sources/platform/posix/socket.cpp b/source/shared_lib/sources/platform/posix/socket.cpp index 22ebe94c8..7acb848ed 100644 --- a/source/shared_lib/sources/platform/posix/socket.cpp +++ b/source/shared_lib/sources/platform/posix/socket.cpp @@ -91,9 +91,10 @@ Mutex UPNP_Tools::mutexUPNP; #define socklen_t int #define MAXHOSTNAME 254 - #define PLATFORM_SOCKET_TRY_AGAIN WSAEWOULDBLOCK - #define PLATFORM_SOCKET_INPROGRESS WSAEINPROGRESS - #define PLATFORM_SOCKET_INTERRUPTED WSAEWOULDBLOCK + +//#define PLATFORM_SOCKET_TRY_AGAIN WSAEWOULDBLOCK +//#define PLATFORM_SOCKET_INPROGRESS WSAEINPROGRESS +//#define PLATFORM_SOCKET_INTERRUPTED WSAEWOULDBLOCK typedef SSIZE_T ssize_t; //// Constants ///////////////////////////////////////////////////////// @@ -246,13 +247,13 @@ Mutex UPNP_Tools::mutexUPNP; typedef UINT_PTR SOCKET; #define INVALID_SOCKET (SOCKET)(~0) - #define PLATFORM_SOCKET_TRY_AGAIN EAGAIN - #define PLATFORM_SOCKET_INPROGRESS EINPROGRESS - #define PLATFORM_SOCKET_INTERRUPTED EINTR + //#define PLATFORM_SOCKET_TRY_AGAIN EAGAIN + //#define PLATFORM_SOCKET_INPROGRESS EINPROGRESS + //#define PLATFORM_SOCKET_INTERRUPTED EINTR #endif -int getLastSocketError() { +int Socket::getLastSocketError() { #ifndef WIN32 return errno; #else @@ -260,7 +261,7 @@ int getLastSocketError() { #endif } -const char * getLastSocketErrorText(int *errNumber=NULL) { +const char * Socket::getLastSocketErrorText(int *errNumber) { int errId = (errNumber != NULL ? *errNumber : getLastSocketError()); #ifndef WIN32 return strerror(errId); @@ -269,7 +270,7 @@ const char * getLastSocketErrorText(int *errNumber=NULL) { #endif } -string getLastSocketErrorFormattedText(int *errNumber=NULL) { +string Socket::getLastSocketErrorFormattedText(int *errNumber) { int errId = (errNumber != NULL ? *errNumber : getLastSocketError()); string msg = "(Error: " + intToStr(errId) + " - [" + string(getLastSocketErrorText(&errId)) +"])"; return msg; @@ -2039,7 +2040,7 @@ void BroadCastClientSocketThread::execute() { // Prepare to receive the broadcast. bcfd = socket(AF_INET, SOCK_DGRAM, 0); if( bcfd <= 0 ) { - if(SystemFlags::getSystemSettingType(SystemFlags::debugNetwork).enabled) SystemFlags::OutputDebug(SystemFlags::debugNetwork,"socket failed: %s\n", getLastSocketErrorFormattedText().c_str()); + if(SystemFlags::getSystemSettingType(SystemFlags::debugNetwork).enabled) SystemFlags::OutputDebug(SystemFlags::debugNetwork,"socket failed: %s\n", Socket::getLastSocketErrorFormattedText().c_str()); } else { // Create the address we are receiving on. @@ -2055,7 +2056,7 @@ void BroadCastClientSocketThread::execute() { setsockopt(bcfd, SOL_SOCKET, SO_REUSEADDR, (char *)&val, sizeof(val)); #endif if(::bind( bcfd, (struct sockaddr *)&bcaddr, sizeof(bcaddr) ) < 0 ) { - if(SystemFlags::getSystemSettingType(SystemFlags::debugNetwork).enabled) SystemFlags::OutputDebug(SystemFlags::debugNetwork,"bind failed: %s\n", getLastSocketErrorFormattedText().c_str()); + if(SystemFlags::getSystemSettingType(SystemFlags::debugNetwork).enabled) SystemFlags::OutputDebug(SystemFlags::debugNetwork,"bind failed: %s\n", Socket::getLastSocketErrorFormattedText().c_str()); } else { if(SystemFlags::getSystemSettingType(SystemFlags::debugNetwork).enabled) SystemFlags::OutputDebug(SystemFlags::debugNetwork,"In [%s::%s Line: %d]\n",__FILE__,__FUNCTION__,__LINE__); @@ -2074,7 +2075,7 @@ void BroadCastClientSocketThread::execute() { //printf("Broadcasting client nb = %d buff [%s] gotData = %d\n",nb,buff,gotData); if(gotData == false) { - if(SystemFlags::getSystemSettingType(SystemFlags::debugNetwork).enabled) SystemFlags::OutputDebug(SystemFlags::debugNetwork,"recvfrom failed: %s\n", getLastSocketErrorFormattedText().c_str()); + if(SystemFlags::getSystemSettingType(SystemFlags::debugNetwork).enabled) SystemFlags::OutputDebug(SystemFlags::debugNetwork,"recvfrom failed: %s\n", Socket::getLastSocketErrorFormattedText().c_str()); } else { //string fromIP = inet_ntoa(bcSender.sin_addr); @@ -2871,12 +2872,12 @@ void BroadCastSocketThread::execute() { bcfd[idx] = socket( AF_INET, SOCK_DGRAM, 0 ); if( bcfd[idx] <= 0 ) { - if(SystemFlags::getSystemSettingType(SystemFlags::debugNetwork).enabled) SystemFlags::OutputDebug(SystemFlags::debugNetwork,"Unable to allocate broadcast socket [%s]: %s\n", ipSubnetMaskList[idx].c_str(), getLastSocketErrorFormattedText().c_str()); + if(SystemFlags::getSystemSettingType(SystemFlags::debugNetwork).enabled) SystemFlags::OutputDebug(SystemFlags::debugNetwork,"Unable to allocate broadcast socket [%s]: %s\n", ipSubnetMaskList[idx].c_str(), Socket::getLastSocketErrorFormattedText().c_str()); //exit(-1); } // Mark the socket for broadcast. else if( setsockopt( bcfd[idx], SOL_SOCKET, SO_BROADCAST, (const char *) &one, sizeof( int ) ) < 0 ) { - if(SystemFlags::getSystemSettingType(SystemFlags::debugNetwork).enabled) SystemFlags::OutputDebug(SystemFlags::debugNetwork,"Could not set socket to broadcast [%s]: %s\n", ipSubnetMaskList[idx].c_str(), getLastSocketErrorFormattedText().c_str()); + if(SystemFlags::getSystemSettingType(SystemFlags::debugNetwork).enabled) SystemFlags::OutputDebug(SystemFlags::debugNetwork,"Could not set socket to broadcast [%s]: %s\n", ipSubnetMaskList[idx].c_str(), Socket::getLastSocketErrorFormattedText().c_str()); //exit(-1); } //} @@ -2912,7 +2913,7 @@ void BroadCastSocketThread::execute() { ssize_t send_res = sendto( bcfd[idx], buff, buffMaxSize, 0 , (struct sockaddr *)&bcLocal[idx], sizeof(struct sockaddr_in) ); if( send_res != buffMaxSize ) { - if(SystemFlags::getSystemSettingType(SystemFlags::debugNetwork).enabled) SystemFlags::OutputDebug(SystemFlags::debugNetwork,"Sendto error: %s\n", getLastSocketErrorFormattedText().c_str()); + if(SystemFlags::getSystemSettingType(SystemFlags::debugNetwork).enabled) SystemFlags::OutputDebug(SystemFlags::debugNetwork,"Sendto error: %s\n", Socket::getLastSocketErrorFormattedText().c_str()); } else { if(SystemFlags::getSystemSettingType(SystemFlags::debugNetwork).enabled) SystemFlags::OutputDebug(SystemFlags::debugNetwork,"Broadcasting on port [%d] the message: [%s]\n",Socket::getBroadCastPort(),buff);