From a314d8b638203482c933d045ae725af1f953df29 Mon Sep 17 00:00:00 2001 From: "Uwe L. Korn" Date: Sun, 16 Jun 2013 11:43:32 +0200 Subject: [PATCH] Finish Connection pimpeling --- src/libtomahawk/network/Connection.cpp | 117 ++++++++++-------- src/libtomahawk/network/Connection.h | 6 +- src/libtomahawk/network/Connection_p.h | 6 +- src/libtomahawk/network/ControlConnection.cpp | 12 +- 4 files changed, 76 insertions(+), 65 deletions(-) diff --git a/src/libtomahawk/network/Connection.cpp b/src/libtomahawk/network/Connection.cpp index 791922aa0..4e9cc4140 100644 --- a/src/libtomahawk/network/Connection.cpp +++ b/src/libtomahawk/network/Connection.cpp @@ -33,11 +33,9 @@ Connection::Connection( Servent* parent ) : QObject() - , m_sock( 0 ) - , m_servent( parent ) - , d_ptr( new ConnectionPrivate( this ) ) + , d_ptr( new ConnectionPrivate( this, parent ) ) { - moveToThread( m_servent->thread() ); + moveToThread( parent->thread() ); tDebug( LOGVERBOSE ) << "CTOR Connection (super)" << thread(); connect( &d_func()->msgprocessor_out, SIGNAL( ready( msg_ptr ) ), @@ -53,13 +51,14 @@ Connection::Connection( Servent* parent ) Connection::~Connection() { - tDebug( LOGVERBOSE ) << "DTOR connection (super)" << id() << thread() << m_sock.isNull(); - if ( !m_sock.isNull() ) + Q_D( Connection ); + tDebug( LOGVERBOSE ) << "DTOR connection (super)" << id() << thread() << d->sock.isNull(); + if ( !d->sock.isNull() ) { - m_sock->deleteLater(); + d->sock->deleteLater(); } - delete d_func()->statstimer; + delete d->statstimer; delete d_ptr; } @@ -67,16 +66,17 @@ Connection::~Connection() void Connection::handleIncomingQueueEmpty() { + Q_D( Connection ); //qDebug() << Q_FUNC_INFO << "bavail" << m_sock->bytesAvailable() // << "isopen" << m_sock->isOpen() // << "m_peer_disconnected" << m_peer_disconnected // << "bytes rx" << bytesReceived(); - if ( !m_sock.isNull() && m_sock->bytesAvailable() == 0 && d_func()->peer_disconnected ) + if ( !d->sock.isNull() && d->sock->bytesAvailable() == 0 && d->peer_disconnected ) { tDebug( LOGVERBOSE ) << "No more data to read, peer disconnected. shutting down connection." - << "bytesavail" << m_sock->bytesAvailable() - << "bytesrx" << d_func()->rx_bytes; + << "bytesavail" << d->sock->bytesAvailable() + << "bytesrx" << d->rx_bytes; shutdown(); } } @@ -112,9 +112,11 @@ Connection::firstMessage() const } const QPointer& -Connection::socket() +Connection::socket() const { - return m_sock; + Q_D( const Connection ); + + return d->sock; } void @@ -136,7 +138,9 @@ Connection::outbound() const Servent* Connection::servent() const { - return m_servent; + Q_D( const Connection ); + + return d->servent; } int @@ -187,16 +191,17 @@ Connection::shutdown( bool waitUntilSentAll ) void Connection::actualShutdown() { - tDebug( LOGVERBOSE ) << Q_FUNC_INFO << d_func()->actually_shutting_down << id(); - if ( d_func()->actually_shutting_down ) + Q_D( Connection ); + tDebug( LOGVERBOSE ) << Q_FUNC_INFO << d->actually_shutting_down << id(); + if ( d->actually_shutting_down ) { return; } - d_func()->actually_shutting_down = true; + d->actually_shutting_down = true; - if ( !m_sock.isNull() && m_sock->isOpen() ) + if ( !d->sock.isNull() && d->sock->isOpen() ) { - m_sock->disconnectFromHost(); + d->sock->disconnectFromHost(); } // qDebug() << "EMITTING finished()"; @@ -255,7 +260,9 @@ Connection::isReady() const bool Connection::isRunning() const { - return m_sock != 0; + Q_D( const Connection ); + + return d->sock != 0; } qint64 @@ -293,15 +300,15 @@ void Connection::start( QTcpSocket* sock ) { Q_D( Connection ); - Q_ASSERT( m_sock.isNull() ); + Q_ASSERT( d->sock.isNull() ); Q_ASSERT( sock ); Q_ASSERT( sock->isValid() ); - m_sock = sock; + d->sock = sock; if ( d->name.isEmpty() ) { - d->name = QString( "peer[%1]" ).arg( m_sock->peerAddress().toString() ); + d->name = QString( "peer[%1]" ).arg( d->sock->peerAddress().toString() ); } QTimer::singleShot( 0, this, SLOT( checkACL() ) ); @@ -396,32 +403,32 @@ Connection::doSetup() HINT: export QT_FATAL_WARNINGS=1 helps to catch these kind of errors. */ - if ( QThread::currentThread() != m_servent->thread() ) + if ( QThread::currentThread() != d->servent->thread() ) { // Connections should always be in the same thread as the servent. - moveToThread( m_servent->thread() ); + moveToThread( d->servent->thread() ); } //stats timer calculates BW used by this connection - d_func()->statstimer = new QTimer; - d_func()->statstimer->moveToThread( this->thread() ); - d_func()->statstimer->setInterval( 1000 ); - connect( d_func()->statstimer, SIGNAL( timeout() ), SLOT( calcStats() ) ); - d_func()->statstimer->start(); - d_func()->statstimer_mark.start(); + d->statstimer = new QTimer; + d->statstimer->moveToThread( this->thread() ); + d->statstimer->setInterval( 1000 ); + connect( d->statstimer, SIGNAL( timeout() ), SLOT( calcStats() ) ); + d->statstimer->start(); + d->statstimer_mark.start(); - m_sock->moveToThread( thread() ); + d->sock->moveToThread( thread() ); - connect( m_sock.data(), SIGNAL( bytesWritten( qint64 ) ), + connect( d->sock.data(), SIGNAL( bytesWritten( qint64 ) ), SLOT( bytesWritten( qint64 ) ), Qt::QueuedConnection ); - connect( m_sock.data(), SIGNAL( disconnected() ), + connect( d->sock.data(), SIGNAL( disconnected() ), SLOT( socketDisconnected() ), Qt::QueuedConnection ); - connect( m_sock.data(), SIGNAL( error( QAbstractSocket::SocketError ) ), + connect( d->sock.data(), SIGNAL( error( QAbstractSocket::SocketError ) ), SLOT( socketDisconnectedError( QAbstractSocket::SocketError ) ), Qt::QueuedConnection ); - connect( m_sock.data(), SIGNAL( readyRead() ), + connect( d->sock.data(), SIGNAL( readyRead() ), SLOT( readyRead() ), Qt::QueuedConnection ); // if connection not authed/setup fast enough, kill it: @@ -446,20 +453,22 @@ Connection::doSetup() void Connection::socketDisconnected() { + Q_D( Connection ); + qint64 bytesAvailable = 0; - if ( !m_sock.isNull() ) + if ( !d->sock.isNull() ) { - bytesAvailable = m_sock->bytesAvailable(); + bytesAvailable = d->sock->bytesAvailable(); } tDebug( LOGVERBOSE ) << "SOCKET DISCONNECTED" << this->name() << id() << "shutdown will happen after incoming queue empties." << "bytesavail:" << bytesAvailable << "bytesRecvd" << bytesReceived(); - d_func()->peer_disconnected = true; + d->peer_disconnected = true; emit socketClosed(); - if ( d_func()->msgprocessor_in.length() == 0 && bytesAvailable == 0 ) + if ( d->msgprocessor_in.length() == 0 && bytesAvailable == 0 ) { handleIncomingQueueEmpty(); actualShutdown(); @@ -522,11 +531,11 @@ Connection::readyRead() if ( d->msg.isNull() ) { - if ( m_sock->bytesAvailable() < Msg::headerSize() ) + if ( d->sock->bytesAvailable() < Msg::headerSize() ) return; char msgheader[ Msg::headerSize() ]; - if ( m_sock->read( (char*) &msgheader, Msg::headerSize() ) != Msg::headerSize() ) + if ( d->sock->read( (char*) &msgheader, Msg::headerSize() ) != Msg::headerSize() ) { tDebug() << "Failed reading msg header"; this->markAsFailed(); @@ -537,10 +546,10 @@ Connection::readyRead() d->rx_bytes += Msg::headerSize(); } - if ( m_sock->bytesAvailable() < d->msg->length() ) + if ( d->sock->bytesAvailable() < d->msg->length() ) return; - QByteArray ba = m_sock->read( d->msg->length() ); + QByteArray ba = d->sock->read( d->msg->length() ); if ( ba.length() != (qint32)d->msg->length() ) { tDebug() << "Failed to read full msg payload"; @@ -553,7 +562,7 @@ Connection::readyRead() handleReadMsg(); // process m_msg and clear() it // since there is no explicit threading, use the event loop to schedule this: - if ( m_sock->bytesAvailable() ) + if ( d->sock->bytesAvailable() ) { QTimer::singleShot( 0, this, SLOT( readyRead() ) ); } @@ -634,17 +643,18 @@ Connection::sendMsg( msg_ptr msg ) void Connection::sendMsg_now( msg_ptr msg ) { + Q_D( Connection ); Q_ASSERT( QThread::currentThread() == thread() ); // Q_ASSERT( this->isRunning() ); - if ( m_sock.isNull() || !m_sock->isOpen() || !m_sock->isWritable() ) + if ( d->sock.isNull() || !d->sock->isOpen() || !d->sock->isWritable() ) { tDebug() << "***** Socket problem, whilst in sendMsg(). Cleaning up. *****"; shutdown( false ); return; } - if ( !msg->write( m_sock.data() ) ) + if ( !msg->write( d->sock.data() ) ) { //qDebug() << "Error writing to socket in sendMsg() *************"; shutdown( false ); @@ -666,13 +676,14 @@ Connection::bytesWritten( qint64 i ) void Connection::calcStats() { - int elapsed = d_func()->statstimer_mark.restart(); // ms since last calc + Q_D( Connection ); + int elapsed = d->statstimer_mark.restart(); // ms since last calc - d_func()->stats_tx_bytes_per_sec = (float)1000 * ( (d_func()->tx_bytes - d_func()->tx_bytes_last) / (float)elapsed ); - d_func()->stats_rx_bytes_per_sec = (float)1000 * ( (d_func()->rx_bytes - d_func()->rx_bytes_last) / (float)elapsed ); + d->stats_tx_bytes_per_sec = (float)1000 * ( (d->tx_bytes - d->tx_bytes_last) / (float)elapsed ); + d->stats_rx_bytes_per_sec = (float)1000 * ( (d->rx_bytes - d->rx_bytes_last) / (float)elapsed ); - d_func()->rx_bytes_last = d_func()->rx_bytes; - d_func()->tx_bytes_last = d_func()->tx_bytes; + d->rx_bytes_last = d->rx_bytes; + d->tx_bytes_last = d->tx_bytes; - emit statsTick( d_func()->stats_tx_bytes_per_sec, d_func()->stats_rx_bytes_per_sec ); + emit statsTick( d->stats_tx_bytes_per_sec, d->stats_rx_bytes_per_sec ); } diff --git a/src/libtomahawk/network/Connection.h b/src/libtomahawk/network/Connection.h index e3c7b1436..4d3e695b3 100644 --- a/src/libtomahawk/network/Connection.h +++ b/src/libtomahawk/network/Connection.h @@ -53,7 +53,7 @@ public: void setFirstMessage( msg_ptr m ); msg_ptr firstMessage() const; - const QPointer& socket(); + const QPointer& socket() const; void setOutbound( bool o ); bool outbound() const; @@ -120,10 +120,6 @@ private slots: void bytesWritten( qint64 ); void calcStats(); -protected: - QPointer m_sock; - Servent* m_servent; - private: Q_DECLARE_PRIVATE( Connection ) ConnectionPrivate* d_ptr; diff --git a/src/libtomahawk/network/Connection_p.h b/src/libtomahawk/network/Connection_p.h index 83435ac87..6e0d519fa 100644 --- a/src/libtomahawk/network/Connection_p.h +++ b/src/libtomahawk/network/Connection_p.h @@ -32,8 +32,10 @@ class ConnectionPrivate { public: - ConnectionPrivate( Connection* q ) + ConnectionPrivate( Connection* q, Servent* _servent ) : q_ptr ( q ) + , servent( _servent ) + , sock( 0 ) , do_shutdown( false ) , actually_shutting_down( false ) , peer_disconnected( false ) @@ -55,6 +57,8 @@ public: Q_DECLARE_PUBLIC ( Connection ) private: + Servent* servent; + QPointer sock; QHostAddress peerIpAddress; bool do_shutdown; bool actually_shutting_down; diff --git a/src/libtomahawk/network/ControlConnection.cpp b/src/libtomahawk/network/ControlConnection.cpp index b6e93bba5..19bd38a60 100644 --- a/src/libtomahawk/network/ControlConnection.cpp +++ b/src/libtomahawk/network/ControlConnection.cpp @@ -65,7 +65,7 @@ ControlConnection::~ControlConnection() } delete m_pingtimer; - m_servent->unregisterControlConnection( this ); + servent()->unregisterControlConnection( this ); if ( m_dbsyncconn ) m_dbsyncconn->deleteLater(); } @@ -112,7 +112,7 @@ ControlConnection::setup() QString friendlyName = name(); - tDebug() << "Detected name:" << name() << friendlyName << m_sock->peerAddress(); + tDebug() << "Detected name:" << name() << friendlyName; // setup source and remote collection for this peer m_source = SourceList::instance()->get( id(), friendlyName, true ); @@ -182,18 +182,18 @@ ControlConnection::setupDbSyncConnection( bool ondemand ) if ( !m_dbconnkey.isEmpty() ) { qDebug() << "Connecting to DBSync offer from peer..."; - m_dbsyncconn = new DBSyncConnection( m_servent, m_source ); + m_dbsyncconn = new DBSyncConnection( servent(), m_source ); - m_servent->createParallelConnection( this, m_dbsyncconn, m_dbconnkey ); + servent()->createParallelConnection( this, m_dbsyncconn, m_dbconnkey ); m_dbconnkey.clear(); } else if ( !outbound() || ondemand ) // only one end makes the offer { qDebug() << "Offering a DBSync key to peer..."; - m_dbsyncconn = new DBSyncConnection( m_servent, m_source ); + m_dbsyncconn = new DBSyncConnection( servent(), m_source ); QString key = uuid(); - m_servent->registerOffer( key, m_dbsyncconn ); + servent()->registerOffer( key, m_dbsyncconn ); QVariantMap m; m.insert( "method", "dbsync-offer" ); m.insert( "key", key );