mirror of
https://github.com/tomahawk-player/tomahawk.git
synced 2025-07-31 03:10:12 +02:00
* Refactored pipeline a bit further.
This commit is contained in:
@@ -26,6 +26,7 @@
|
|||||||
#include "database/database.h"
|
#include "database/database.h"
|
||||||
|
|
||||||
#define DEFAULT_CONCURRENT_QUERIES 4
|
#define DEFAULT_CONCURRENT_QUERIES 4
|
||||||
|
#define MAX_CONCURRENT_QUERIES 16
|
||||||
|
|
||||||
using namespace Tomahawk;
|
using namespace Tomahawk;
|
||||||
|
|
||||||
@@ -45,7 +46,7 @@ Pipeline::Pipeline( QObject* parent )
|
|||||||
{
|
{
|
||||||
s_instance = this;
|
s_instance = this;
|
||||||
|
|
||||||
m_maxConcurrentQueries = qMax( DEFAULT_CONCURRENT_QUERIES, QThread::idealThreadCount() * 2 );
|
m_maxConcurrentQueries = qBound( DEFAULT_CONCURRENT_QUERIES, QThread::idealThreadCount(), MAX_CONCURRENT_QUERIES );
|
||||||
qDebug() << Q_FUNC_INFO << "Using" << m_maxConcurrentQueries << "threads";
|
qDebug() << Q_FUNC_INFO << "Using" << m_maxConcurrentQueries << "threads";
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -92,27 +93,13 @@ Pipeline::removeResolver( Resolver* r )
|
|||||||
|
|
||||||
|
|
||||||
void
|
void
|
||||||
Pipeline::addResolver( Resolver* r, bool sort )
|
Pipeline::addResolver( Resolver* r )
|
||||||
{
|
{
|
||||||
QMutexLocker lock( &m_mut );
|
QMutexLocker lock( &m_mut );
|
||||||
|
|
||||||
m_resolvers.append( r );
|
|
||||||
if( sort )
|
|
||||||
{
|
|
||||||
qSort( m_resolvers.begin(),
|
|
||||||
m_resolvers.end(),
|
|
||||||
Pipeline::resolverSorter );
|
|
||||||
}
|
|
||||||
qDebug() << "Adding resolver" << r->name();
|
qDebug() << "Adding resolver" << r->name();
|
||||||
|
m_resolvers.append( r );
|
||||||
emit resolverAdded( r );
|
emit resolverAdded( r );
|
||||||
|
|
||||||
/* qDebug() << "Current pipeline:";
|
|
||||||
foreach( Resolver * r, m_resolvers )
|
|
||||||
{
|
|
||||||
qDebug() << "* score:" << r->weight()
|
|
||||||
<< "pref:" << r->preference()
|
|
||||||
<< "name:" << r->name();
|
|
||||||
}*/
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@@ -255,7 +242,7 @@ Pipeline::shuntNext()
|
|||||||
and after timeout, dispatch to next highest etc, aborting when solved
|
and after timeout, dispatch to next highest etc, aborting when solved
|
||||||
*/
|
*/
|
||||||
q = m_queries_pending.takeFirst();
|
q = m_queries_pending.takeFirst();
|
||||||
q->setLastPipelineWeight( 101 );
|
q->setCurrentResolver( 0 );
|
||||||
}
|
}
|
||||||
|
|
||||||
setQIDState( q, rc );
|
setQIDState( q, rc );
|
||||||
@@ -289,46 +276,20 @@ Pipeline::shunt( const query_ptr& q )
|
|||||||
if ( !m_running )
|
if ( !m_running )
|
||||||
return;
|
return;
|
||||||
|
|
||||||
// qDebug() << Q_FUNC_INFO << q->solved() << q->toString() << q->id();
|
Resolver* r = 0;
|
||||||
unsigned int lastweight = 0;
|
|
||||||
unsigned int lasttimeout = 0;
|
|
||||||
|
|
||||||
if ( !q->resolvingFinished() )
|
if ( !q->resolvingFinished() )
|
||||||
|
r = nextResolver( q );
|
||||||
|
|
||||||
|
if ( r )
|
||||||
{
|
{
|
||||||
int i = 0;
|
qDebug() << "Dispatching to resolver" << r->name() << q->toString() << q->solved() << q->id();
|
||||||
foreach( Resolver* r, m_resolvers )
|
|
||||||
{
|
|
||||||
i++;
|
|
||||||
if ( r->weight() >= q->lastPipelineWeight() )
|
|
||||||
continue;
|
|
||||||
|
|
||||||
if ( lastweight == 0 )
|
q->setCurrentResolver( r );
|
||||||
{
|
r->resolve( q );
|
||||||
lastweight = r->weight();
|
emit resolving( q );
|
||||||
lasttimeout = r->timeout();
|
|
||||||
//qDebug() << "Shunting into weight" << lastweight << "q:" << q->toString();
|
|
||||||
}
|
|
||||||
if ( lastweight == r->weight() )
|
|
||||||
{
|
|
||||||
// snag the lowest timeout at this weight
|
|
||||||
if ( r->timeout() < lasttimeout )
|
|
||||||
lasttimeout = r->timeout();
|
|
||||||
|
|
||||||
qDebug() << "Dispatching to resolver" << r->name() << q->toString() << q->solved() << q->id();
|
|
||||||
r->resolve( q );
|
|
||||||
emit resolving( q );
|
|
||||||
}
|
|
||||||
else
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if ( lastweight > 0 )
|
|
||||||
{
|
|
||||||
// qDebug() << "Shunting in" << lasttimeout << "ms, q:" << q->toString();
|
|
||||||
q->setLastPipelineWeight( lastweight );
|
|
||||||
m_qidsTimeout.insert( q->id(), true );
|
m_qidsTimeout.insert( q->id(), true );
|
||||||
new FuncTimeout( lasttimeout, boost::bind( &Pipeline::timeoutShunt, this, q ), this );
|
new FuncTimeout( r->timeout(), boost::bind( &Pipeline::timeoutShunt, this, q ), this );
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
@@ -340,13 +301,27 @@ Pipeline::shunt( const query_ptr& q )
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
bool
|
Tomahawk::Resolver*
|
||||||
Pipeline::resolverSorter( const Resolver* left, const Resolver* right )
|
Pipeline::nextResolver( const Tomahawk::query_ptr& query ) const
|
||||||
{
|
{
|
||||||
if( left->weight() == right->weight() ) // TODO dispatch in parallel
|
Resolver* newResolver = 0;
|
||||||
return left;
|
|
||||||
else
|
foreach ( Resolver* r, m_resolvers )
|
||||||
return left->weight() > right->weight();
|
{
|
||||||
|
if ( query->resolvedBy().contains( r ) )
|
||||||
|
continue;
|
||||||
|
|
||||||
|
if ( !newResolver )
|
||||||
|
{
|
||||||
|
newResolver = r;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if ( r->weight() > newResolver->weight() )
|
||||||
|
newResolver = r;
|
||||||
|
}
|
||||||
|
|
||||||
|
return newResolver;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@@ -51,10 +51,7 @@ public:
|
|||||||
|
|
||||||
void reportResults( QID qid, const QList< result_ptr >& results );
|
void reportResults( QID qid, const QList< result_ptr >& results );
|
||||||
|
|
||||||
/// sorter to rank resolver priority
|
void addResolver( Resolver* r );
|
||||||
static bool resolverSorter( const Resolver* left, const Resolver* right );
|
|
||||||
|
|
||||||
void addResolver( Resolver* r, bool sort = true );
|
|
||||||
void removeResolver( Resolver* r );
|
void removeResolver( Resolver* r );
|
||||||
|
|
||||||
query_ptr query( const QID& qid ) const
|
query_ptr query( const QID& qid ) const
|
||||||
@@ -89,6 +86,8 @@ private slots:
|
|||||||
void shuntNext();
|
void shuntNext();
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
Tomahawk::Resolver* nextResolver( const Tomahawk::query_ptr& query ) const;
|
||||||
|
|
||||||
void setQIDState( const Tomahawk::query_ptr& query, int state );
|
void setQIDState( const Tomahawk::query_ptr& query, int state );
|
||||||
int incQIDState( const Tomahawk::query_ptr& query );
|
int incQIDState( const Tomahawk::query_ptr& query );
|
||||||
int decQIDState( const Tomahawk::query_ptr& query );
|
int decQIDState( const Tomahawk::query_ptr& query );
|
||||||
|
Reference in New Issue
Block a user