1
0
mirror of https://github.com/tomahawk-player/tomahawk.git synced 2025-08-06 22:26:32 +02:00

* Improved Pipeline.

This commit is contained in:
Christian Muehlhaeuser
2011-05-09 00:31:25 +02:00
parent 48d029468d
commit f2eb8a7a75
2 changed files with 59 additions and 27 deletions

View File

@@ -25,7 +25,7 @@
#include "functimeout.h" #include "functimeout.h"
#include "database/database.h" #include "database/database.h"
#define CONCURRENT_QUERIES 8 #define CONCURRENT_QUERIES 4
using namespace Tomahawk; using namespace Tomahawk;
@@ -160,30 +160,31 @@ Pipeline::reportResults( QID qid, const QList< result_ptr >& results )
Q_ASSERT( false ); Q_ASSERT( false );
return; return;
} }
if ( !m_qidsState.contains( qid ) )
{
qDebug() << "reportResults called for unknown QID-state" << qid;
Q_ASSERT( false );
return;
}
} }
const query_ptr& q = m_qids.value( qid ); const query_ptr& q = m_qids.value( qid );
if ( !results.isEmpty() ) if ( !results.isEmpty() )
{ {
//qDebug() << Q_FUNC_INFO << qid; //qDebug() << Q_FUNC_INFO << qid;
//qDebug() << "solved query:" << (qlonglong)q.data() << q->toString();
q->addResults( results ); q->addResults( results );
foreach( const result_ptr& r, q->results() ) foreach( const result_ptr& r, q->results() )
{ {
m_rids.insert( r->id(), r ); m_rids.insert( r->id(), r );
} }
if ( q->solved() ) if ( q->solved() )
{
qDebug() << "FINISHED RESOLVING EARLY" << q->toString();
q->onResolvingFinished(); q->onResolvingFinished();
setQIDState( q, 0 );
if ( m_qidsTimeout.contains( q->id() ) )
m_qidsTimeout.remove( q->id() );
shuntNext();
return;
}
} }
if ( decQIDState( q ) == 0 ) if ( decQIDState( q ) == 0 )
@@ -193,6 +194,10 @@ Pipeline::reportResults( QID qid, const QList< result_ptr >& results )
shuntNext(); shuntNext();
} }
else
{
new FuncTimeout( 500, boost::bind( &Pipeline::timeoutShunt, this, q ), this );
}
} }
@@ -226,11 +231,22 @@ Pipeline::shuntNext()
q->setLastPipelineWeight( 101 ); q->setLastPipelineWeight( 101 );
} }
if ( !q.isNull() ) setQIDState( q, m_resolvers.count() );
new FuncTimeout( 500, boost::bind( &Pipeline::shunt, this, q ), this );
}
void
Pipeline::timeoutShunt( const query_ptr& q )
{
// are we still waiting for a timeout?
if ( m_qidsTimeout.contains( q->id() ) )
{ {
incQIDState( q ); m_qidsTimeout.remove( q->id() );
shunt( q ); // bump into next stage of pipeline (highest weights are 100) shunt( q );
} }
else
qDebug() << Q_FUNC_INFO << q->toString() << "Ignoring timeout";
} }
@@ -241,7 +257,6 @@ Pipeline::shunt( const query_ptr& q )
unsigned int lastweight = 0; unsigned int lastweight = 0;
unsigned int lasttimeout = 0; unsigned int lasttimeout = 0;
int thisResolver = 0; int thisResolver = 0;
bool dispatched = false;
if ( !q->resolvingFinished() ) if ( !q->resolvingFinished() )
{ {
@@ -264,13 +279,7 @@ Pipeline::shunt( const query_ptr& q )
if ( r->timeout() < lasttimeout ) if ( r->timeout() < lasttimeout )
lasttimeout = r->timeout(); lasttimeout = r->timeout();
// resolvers aren't allowed to block in this call:
if ( dispatched )
incQIDState( q );
dispatched = true;
qDebug() << "Dispatching to resolver" << r->name() << q->toString(); qDebug() << "Dispatching to resolver" << r->name() << q->toString();
thisResolver = i; thisResolver = i;
r->resolve( q ); r->resolve( q );
} }
@@ -286,18 +295,17 @@ Pipeline::shunt( const query_ptr& q )
if ( thisResolver < m_resolvers.count() ) if ( thisResolver < m_resolvers.count() )
{ {
qDebug() << "Shunting in" << lasttimeout << "ms, q:" << q->toString(); qDebug() << "Shunting in" << lasttimeout << "ms, q:" << q->toString();
incQIDState( q );
new FuncTimeout( lasttimeout, boost::bind( &Pipeline::shunt, this, q ), this ); m_qidsTimeout.insert( q->id(), true );
new FuncTimeout( lasttimeout, boost::bind( &Pipeline::timeoutShunt, this, q ), this );
} }
} }
else else
{ {
// reached end of pipeline // reached end of pipeline
qDebug() << "Reached end of pipeline for:" << q->toString(); qDebug() << "Reached end of pipeline for:" << q->toString();
setQIDState( q, 0 );
decQIDState( q ); return;
if ( !q->solved() )
q->onResolvingFinished();
} }
shuntNext(); shuntNext();
@@ -314,6 +322,24 @@ Pipeline::resolverSorter( const Resolver* left, const Resolver* right )
} }
void
Pipeline::setQIDState( const Tomahawk::query_ptr& query, int state )
{
QMutexLocker lock( &m_mut );
if ( state )
{
qDebug() << Q_FUNC_INFO << "inserting to qidsstate:" << query->id() << state;
m_qidsState.insert( query->id(), state );
}
else
{
qDebug() << Q_FUNC_INFO << "removing" << query->id() << state;
m_qidsState.remove( query->id() );
}
}
int int
Pipeline::incQIDState( const Tomahawk::query_ptr& query ) Pipeline::incQIDState( const Tomahawk::query_ptr& query )
{ {
@@ -337,6 +363,9 @@ Pipeline::decQIDState( const Tomahawk::query_ptr& query )
{ {
QMutexLocker lock( &m_mut ); QMutexLocker lock( &m_mut );
if ( !m_qidsState.contains( query->id() ) )
return 0;
int state = m_qidsState.value( query->id() ) - 1; int state = m_qidsState.value( query->id() ) - 1;
if ( state ) if ( state )
{ {

View File

@@ -73,17 +73,20 @@ signals:
void idle(); void idle();
private slots: private slots:
void timeoutShunt( const query_ptr& q );
void shunt( const query_ptr& q ); void shunt( const query_ptr& q );
void shuntNext(); void shuntNext();
void indexReady(); void indexReady();
private: private:
void setQIDState( const Tomahawk::query_ptr& query, int state );
int incQIDState( const Tomahawk::query_ptr& query ); int incQIDState( const Tomahawk::query_ptr& query );
int decQIDState( const Tomahawk::query_ptr& query ); int decQIDState( const Tomahawk::query_ptr& query );
QList< Resolver* > m_resolvers; QList< Resolver* > m_resolvers;
QMap< QID, bool > m_qidsTimeout;
QMap< QID, unsigned int > m_qidsState; QMap< QID, unsigned int > m_qidsState;
QMap< QID, query_ptr > m_qids; QMap< QID, query_ptr > m_qids;
QMap< RID, result_ptr > m_rids; QMap< RID, result_ptr > m_rids;