1
0
mirror of https://github.com/tomahawk-player/tomahawk.git synced 2025-08-12 00:54:20 +02:00

Finish Connection pimpeling

This commit is contained in:
Uwe L. Korn
2013-06-16 11:43:32 +02:00
parent 0041a9f850
commit a314d8b638
4 changed files with 76 additions and 65 deletions

View File

@@ -33,11 +33,9 @@
Connection::Connection( Servent* parent ) Connection::Connection( Servent* parent )
: QObject() : QObject()
, m_sock( 0 ) , d_ptr( new ConnectionPrivate( this, parent ) )
, m_servent( parent )
, d_ptr( new ConnectionPrivate( this ) )
{ {
moveToThread( m_servent->thread() ); moveToThread( parent->thread() );
tDebug( LOGVERBOSE ) << "CTOR Connection (super)" << thread(); tDebug( LOGVERBOSE ) << "CTOR Connection (super)" << thread();
connect( &d_func()->msgprocessor_out, SIGNAL( ready( msg_ptr ) ), connect( &d_func()->msgprocessor_out, SIGNAL( ready( msg_ptr ) ),
@@ -53,13 +51,14 @@ Connection::Connection( Servent* parent )
Connection::~Connection() Connection::~Connection()
{ {
tDebug( LOGVERBOSE ) << "DTOR connection (super)" << id() << thread() << m_sock.isNull(); Q_D( Connection );
if ( !m_sock.isNull() ) tDebug( LOGVERBOSE ) << "DTOR connection (super)" << id() << thread() << d->sock.isNull();
if ( !d->sock.isNull() )
{ {
m_sock->deleteLater(); d->sock->deleteLater();
} }
delete d_func()->statstimer; delete d->statstimer;
delete d_ptr; delete d_ptr;
} }
@@ -67,16 +66,17 @@ Connection::~Connection()
void void
Connection::handleIncomingQueueEmpty() Connection::handleIncomingQueueEmpty()
{ {
Q_D( Connection );
//qDebug() << Q_FUNC_INFO << "bavail" << m_sock->bytesAvailable() //qDebug() << Q_FUNC_INFO << "bavail" << m_sock->bytesAvailable()
// << "isopen" << m_sock->isOpen() // << "isopen" << m_sock->isOpen()
// << "m_peer_disconnected" << m_peer_disconnected // << "m_peer_disconnected" << m_peer_disconnected
// << "bytes rx" << bytesReceived(); // << "bytes rx" << bytesReceived();
if ( !m_sock.isNull() && m_sock->bytesAvailable() == 0 && d_func()->peer_disconnected ) if ( !d->sock.isNull() && d->sock->bytesAvailable() == 0 && d->peer_disconnected )
{ {
tDebug( LOGVERBOSE ) << "No more data to read, peer disconnected. shutting down connection." tDebug( LOGVERBOSE ) << "No more data to read, peer disconnected. shutting down connection."
<< "bytesavail" << m_sock->bytesAvailable() << "bytesavail" << d->sock->bytesAvailable()
<< "bytesrx" << d_func()->rx_bytes; << "bytesrx" << d->rx_bytes;
shutdown(); shutdown();
} }
} }
@@ -112,9 +112,11 @@ Connection::firstMessage() const
} }
const QPointer<QTcpSocket>& const QPointer<QTcpSocket>&
Connection::socket() Connection::socket() const
{ {
return m_sock; Q_D( const Connection );
return d->sock;
} }
void void
@@ -136,7 +138,9 @@ Connection::outbound() const
Servent* Servent*
Connection::servent() const Connection::servent() const
{ {
return m_servent; Q_D( const Connection );
return d->servent;
} }
int int
@@ -187,16 +191,17 @@ Connection::shutdown( bool waitUntilSentAll )
void void
Connection::actualShutdown() Connection::actualShutdown()
{ {
tDebug( LOGVERBOSE ) << Q_FUNC_INFO << d_func()->actually_shutting_down << id(); Q_D( Connection );
if ( d_func()->actually_shutting_down ) tDebug( LOGVERBOSE ) << Q_FUNC_INFO << d->actually_shutting_down << id();
if ( d->actually_shutting_down )
{ {
return; return;
} }
d_func()->actually_shutting_down = true; d->actually_shutting_down = true;
if ( !m_sock.isNull() && m_sock->isOpen() ) if ( !d->sock.isNull() && d->sock->isOpen() )
{ {
m_sock->disconnectFromHost(); d->sock->disconnectFromHost();
} }
// qDebug() << "EMITTING finished()"; // qDebug() << "EMITTING finished()";
@@ -255,7 +260,9 @@ Connection::isReady() const
bool bool
Connection::isRunning() const Connection::isRunning() const
{ {
return m_sock != 0; Q_D( const Connection );
return d->sock != 0;
} }
qint64 qint64
@@ -293,15 +300,15 @@ void
Connection::start( QTcpSocket* sock ) Connection::start( QTcpSocket* sock )
{ {
Q_D( Connection ); Q_D( Connection );
Q_ASSERT( m_sock.isNull() ); Q_ASSERT( d->sock.isNull() );
Q_ASSERT( sock ); Q_ASSERT( sock );
Q_ASSERT( sock->isValid() ); Q_ASSERT( sock->isValid() );
m_sock = sock; d->sock = sock;
if ( d->name.isEmpty() ) if ( d->name.isEmpty() )
{ {
d->name = QString( "peer[%1]" ).arg( m_sock->peerAddress().toString() ); d->name = QString( "peer[%1]" ).arg( d->sock->peerAddress().toString() );
} }
QTimer::singleShot( 0, this, SLOT( checkACL() ) ); QTimer::singleShot( 0, this, SLOT( checkACL() ) );
@@ -396,32 +403,32 @@ Connection::doSetup()
HINT: export QT_FATAL_WARNINGS=1 helps to catch these kind of errors. HINT: export QT_FATAL_WARNINGS=1 helps to catch these kind of errors.
*/ */
if ( QThread::currentThread() != m_servent->thread() ) if ( QThread::currentThread() != d->servent->thread() )
{ {
// Connections should always be in the same thread as the servent. // Connections should always be in the same thread as the servent.
moveToThread( m_servent->thread() ); moveToThread( d->servent->thread() );
} }
//stats timer calculates BW used by this connection //stats timer calculates BW used by this connection
d_func()->statstimer = new QTimer; d->statstimer = new QTimer;
d_func()->statstimer->moveToThread( this->thread() ); d->statstimer->moveToThread( this->thread() );
d_func()->statstimer->setInterval( 1000 ); d->statstimer->setInterval( 1000 );
connect( d_func()->statstimer, SIGNAL( timeout() ), SLOT( calcStats() ) ); connect( d->statstimer, SIGNAL( timeout() ), SLOT( calcStats() ) );
d_func()->statstimer->start(); d->statstimer->start();
d_func()->statstimer_mark.start(); d->statstimer_mark.start();
m_sock->moveToThread( thread() ); d->sock->moveToThread( thread() );
connect( m_sock.data(), SIGNAL( bytesWritten( qint64 ) ), connect( d->sock.data(), SIGNAL( bytesWritten( qint64 ) ),
SLOT( bytesWritten( qint64 ) ), Qt::QueuedConnection ); SLOT( bytesWritten( qint64 ) ), Qt::QueuedConnection );
connect( m_sock.data(), SIGNAL( disconnected() ), connect( d->sock.data(), SIGNAL( disconnected() ),
SLOT( socketDisconnected() ), Qt::QueuedConnection ); SLOT( socketDisconnected() ), Qt::QueuedConnection );
connect( m_sock.data(), SIGNAL( error( QAbstractSocket::SocketError ) ), connect( d->sock.data(), SIGNAL( error( QAbstractSocket::SocketError ) ),
SLOT( socketDisconnectedError( QAbstractSocket::SocketError ) ), Qt::QueuedConnection ); SLOT( socketDisconnectedError( QAbstractSocket::SocketError ) ), Qt::QueuedConnection );
connect( m_sock.data(), SIGNAL( readyRead() ), connect( d->sock.data(), SIGNAL( readyRead() ),
SLOT( readyRead() ), Qt::QueuedConnection ); SLOT( readyRead() ), Qt::QueuedConnection );
// if connection not authed/setup fast enough, kill it: // if connection not authed/setup fast enough, kill it:
@@ -446,20 +453,22 @@ Connection::doSetup()
void void
Connection::socketDisconnected() Connection::socketDisconnected()
{ {
Q_D( Connection );
qint64 bytesAvailable = 0; qint64 bytesAvailable = 0;
if ( !m_sock.isNull() ) if ( !d->sock.isNull() )
{ {
bytesAvailable = m_sock->bytesAvailable(); bytesAvailable = d->sock->bytesAvailable();
} }
tDebug( LOGVERBOSE ) << "SOCKET DISCONNECTED" << this->name() << id() tDebug( LOGVERBOSE ) << "SOCKET DISCONNECTED" << this->name() << id()
<< "shutdown will happen after incoming queue empties." << "shutdown will happen after incoming queue empties."
<< "bytesavail:" << bytesAvailable << "bytesavail:" << bytesAvailable
<< "bytesRecvd" << bytesReceived(); << "bytesRecvd" << bytesReceived();
d_func()->peer_disconnected = true; d->peer_disconnected = true;
emit socketClosed(); emit socketClosed();
if ( d_func()->msgprocessor_in.length() == 0 && bytesAvailable == 0 ) if ( d->msgprocessor_in.length() == 0 && bytesAvailable == 0 )
{ {
handleIncomingQueueEmpty(); handleIncomingQueueEmpty();
actualShutdown(); actualShutdown();
@@ -522,11 +531,11 @@ Connection::readyRead()
if ( d->msg.isNull() ) if ( d->msg.isNull() )
{ {
if ( m_sock->bytesAvailable() < Msg::headerSize() ) if ( d->sock->bytesAvailable() < Msg::headerSize() )
return; return;
char msgheader[ Msg::headerSize() ]; char msgheader[ Msg::headerSize() ];
if ( m_sock->read( (char*) &msgheader, Msg::headerSize() ) != Msg::headerSize() ) if ( d->sock->read( (char*) &msgheader, Msg::headerSize() ) != Msg::headerSize() )
{ {
tDebug() << "Failed reading msg header"; tDebug() << "Failed reading msg header";
this->markAsFailed(); this->markAsFailed();
@@ -537,10 +546,10 @@ Connection::readyRead()
d->rx_bytes += Msg::headerSize(); d->rx_bytes += Msg::headerSize();
} }
if ( m_sock->bytesAvailable() < d->msg->length() ) if ( d->sock->bytesAvailable() < d->msg->length() )
return; return;
QByteArray ba = m_sock->read( d->msg->length() ); QByteArray ba = d->sock->read( d->msg->length() );
if ( ba.length() != (qint32)d->msg->length() ) if ( ba.length() != (qint32)d->msg->length() )
{ {
tDebug() << "Failed to read full msg payload"; tDebug() << "Failed to read full msg payload";
@@ -553,7 +562,7 @@ Connection::readyRead()
handleReadMsg(); // process m_msg and clear() it handleReadMsg(); // process m_msg and clear() it
// since there is no explicit threading, use the event loop to schedule this: // since there is no explicit threading, use the event loop to schedule this:
if ( m_sock->bytesAvailable() ) if ( d->sock->bytesAvailable() )
{ {
QTimer::singleShot( 0, this, SLOT( readyRead() ) ); QTimer::singleShot( 0, this, SLOT( readyRead() ) );
} }
@@ -634,17 +643,18 @@ Connection::sendMsg( msg_ptr msg )
void void
Connection::sendMsg_now( msg_ptr msg ) Connection::sendMsg_now( msg_ptr msg )
{ {
Q_D( Connection );
Q_ASSERT( QThread::currentThread() == thread() ); Q_ASSERT( QThread::currentThread() == thread() );
// Q_ASSERT( this->isRunning() ); // Q_ASSERT( this->isRunning() );
if ( m_sock.isNull() || !m_sock->isOpen() || !m_sock->isWritable() ) if ( d->sock.isNull() || !d->sock->isOpen() || !d->sock->isWritable() )
{ {
tDebug() << "***** Socket problem, whilst in sendMsg(). Cleaning up. *****"; tDebug() << "***** Socket problem, whilst in sendMsg(). Cleaning up. *****";
shutdown( false ); shutdown( false );
return; return;
} }
if ( !msg->write( m_sock.data() ) ) if ( !msg->write( d->sock.data() ) )
{ {
//qDebug() << "Error writing to socket in sendMsg() *************"; //qDebug() << "Error writing to socket in sendMsg() *************";
shutdown( false ); shutdown( false );
@@ -666,13 +676,14 @@ Connection::bytesWritten( qint64 i )
void void
Connection::calcStats() Connection::calcStats()
{ {
int elapsed = d_func()->statstimer_mark.restart(); // ms since last calc Q_D( Connection );
int elapsed = d->statstimer_mark.restart(); // ms since last calc
d_func()->stats_tx_bytes_per_sec = (float)1000 * ( (d_func()->tx_bytes - d_func()->tx_bytes_last) / (float)elapsed ); d->stats_tx_bytes_per_sec = (float)1000 * ( (d->tx_bytes - d->tx_bytes_last) / (float)elapsed );
d_func()->stats_rx_bytes_per_sec = (float)1000 * ( (d_func()->rx_bytes - d_func()->rx_bytes_last) / (float)elapsed ); d->stats_rx_bytes_per_sec = (float)1000 * ( (d->rx_bytes - d->rx_bytes_last) / (float)elapsed );
d_func()->rx_bytes_last = d_func()->rx_bytes; d->rx_bytes_last = d->rx_bytes;
d_func()->tx_bytes_last = d_func()->tx_bytes; d->tx_bytes_last = d->tx_bytes;
emit statsTick( d_func()->stats_tx_bytes_per_sec, d_func()->stats_rx_bytes_per_sec ); emit statsTick( d->stats_tx_bytes_per_sec, d->stats_rx_bytes_per_sec );
} }

View File

@@ -53,7 +53,7 @@ public:
void setFirstMessage( msg_ptr m ); void setFirstMessage( msg_ptr m );
msg_ptr firstMessage() const; msg_ptr firstMessage() const;
const QPointer<QTcpSocket>& socket(); const QPointer<QTcpSocket>& socket() const;
void setOutbound( bool o ); void setOutbound( bool o );
bool outbound() const; bool outbound() const;
@@ -120,10 +120,6 @@ private slots:
void bytesWritten( qint64 ); void bytesWritten( qint64 );
void calcStats(); void calcStats();
protected:
QPointer<QTcpSocket> m_sock;
Servent* m_servent;
private: private:
Q_DECLARE_PRIVATE( Connection ) Q_DECLARE_PRIVATE( Connection )
ConnectionPrivate* d_ptr; ConnectionPrivate* d_ptr;

View File

@@ -32,8 +32,10 @@
class ConnectionPrivate class ConnectionPrivate
{ {
public: public:
ConnectionPrivate( Connection* q ) ConnectionPrivate( Connection* q, Servent* _servent )
: q_ptr ( q ) : q_ptr ( q )
, servent( _servent )
, sock( 0 )
, do_shutdown( false ) , do_shutdown( false )
, actually_shutting_down( false ) , actually_shutting_down( false )
, peer_disconnected( false ) , peer_disconnected( false )
@@ -55,6 +57,8 @@ public:
Q_DECLARE_PUBLIC ( Connection ) Q_DECLARE_PUBLIC ( Connection )
private: private:
Servent* servent;
QPointer<QTcpSocket> sock;
QHostAddress peerIpAddress; QHostAddress peerIpAddress;
bool do_shutdown; bool do_shutdown;
bool actually_shutting_down; bool actually_shutting_down;

View File

@@ -65,7 +65,7 @@ ControlConnection::~ControlConnection()
} }
delete m_pingtimer; delete m_pingtimer;
m_servent->unregisterControlConnection( this ); servent()->unregisterControlConnection( this );
if ( m_dbsyncconn ) if ( m_dbsyncconn )
m_dbsyncconn->deleteLater(); m_dbsyncconn->deleteLater();
} }
@@ -112,7 +112,7 @@ ControlConnection::setup()
QString friendlyName = name(); QString friendlyName = name();
tDebug() << "Detected name:" << name() << friendlyName << m_sock->peerAddress(); tDebug() << "Detected name:" << name() << friendlyName;
// setup source and remote collection for this peer // setup source and remote collection for this peer
m_source = SourceList::instance()->get( id(), friendlyName, true ); m_source = SourceList::instance()->get( id(), friendlyName, true );
@@ -182,18 +182,18 @@ ControlConnection::setupDbSyncConnection( bool ondemand )
if ( !m_dbconnkey.isEmpty() ) if ( !m_dbconnkey.isEmpty() )
{ {
qDebug() << "Connecting to DBSync offer from peer..."; qDebug() << "Connecting to DBSync offer from peer...";
m_dbsyncconn = new DBSyncConnection( m_servent, m_source ); m_dbsyncconn = new DBSyncConnection( servent(), m_source );
m_servent->createParallelConnection( this, m_dbsyncconn, m_dbconnkey ); servent()->createParallelConnection( this, m_dbsyncconn, m_dbconnkey );
m_dbconnkey.clear(); m_dbconnkey.clear();
} }
else if ( !outbound() || ondemand ) // only one end makes the offer else if ( !outbound() || ondemand ) // only one end makes the offer
{ {
qDebug() << "Offering a DBSync key to peer..."; qDebug() << "Offering a DBSync key to peer...";
m_dbsyncconn = new DBSyncConnection( m_servent, m_source ); m_dbsyncconn = new DBSyncConnection( servent(), m_source );
QString key = uuid(); QString key = uuid();
m_servent->registerOffer( key, m_dbsyncconn ); servent()->registerOffer( key, m_dbsyncconn );
QVariantMap m; QVariantMap m;
m.insert( "method", "dbsync-offer" ); m.insert( "method", "dbsync-offer" );
m.insert( "key", key ); m.insert( "key", key );