diff --git a/src/libtomahawk/pipeline.cpp b/src/libtomahawk/pipeline.cpp index cbd0d47ed..d4a8675bb 100644 --- a/src/libtomahawk/pipeline.cpp +++ b/src/libtomahawk/pipeline.cpp @@ -25,7 +25,7 @@ #include "functimeout.h" #include "database/database.h" -#define CONCURRENT_QUERIES 8 +#define CONCURRENT_QUERIES 4 using namespace Tomahawk; @@ -160,30 +160,31 @@ Pipeline::reportResults( QID qid, const QList< result_ptr >& results ) Q_ASSERT( false ); return; } - - if ( !m_qidsState.contains( qid ) ) - { - qDebug() << "reportResults called for unknown QID-state" << qid; - Q_ASSERT( false ); - return; - } } const query_ptr& q = m_qids.value( qid ); if ( !results.isEmpty() ) { //qDebug() << Q_FUNC_INFO << qid; - //qDebug() << "solved query:" << (qlonglong)q.data() << q->toString(); q->addResults( results ); - foreach( const result_ptr& r, q->results() ) { m_rids.insert( r->id(), r ); } if ( q->solved() ) + { + qDebug() << "FINISHED RESOLVING EARLY" << q->toString(); q->onResolvingFinished(); + + setQIDState( q, 0 ); + if ( m_qidsTimeout.contains( q->id() ) ) + m_qidsTimeout.remove( q->id() ); + + shuntNext(); + return; + } } if ( decQIDState( q ) == 0 ) @@ -193,6 +194,10 @@ Pipeline::reportResults( QID qid, const QList< result_ptr >& results ) shuntNext(); } + else + { + new FuncTimeout( 500, boost::bind( &Pipeline::timeoutShunt, this, q ), this ); + } } @@ -226,11 +231,22 @@ Pipeline::shuntNext() q->setLastPipelineWeight( 101 ); } - if ( !q.isNull() ) + setQIDState( q, m_resolvers.count() ); + new FuncTimeout( 500, boost::bind( &Pipeline::shunt, this, q ), this ); +} + + +void +Pipeline::timeoutShunt( const query_ptr& q ) +{ + // are we still waiting for a timeout? + if ( m_qidsTimeout.contains( q->id() ) ) { - incQIDState( q ); - shunt( q ); // bump into next stage of pipeline (highest weights are 100) + m_qidsTimeout.remove( q->id() ); + shunt( q ); } + else + qDebug() << Q_FUNC_INFO << q->toString() << "Ignoring timeout"; } @@ -241,7 +257,6 @@ Pipeline::shunt( const query_ptr& q ) unsigned int lastweight = 0; unsigned int lasttimeout = 0; int thisResolver = 0; - bool dispatched = false; if ( !q->resolvingFinished() ) { @@ -264,13 +279,7 @@ Pipeline::shunt( const query_ptr& q ) if ( r->timeout() < lasttimeout ) lasttimeout = r->timeout(); - // resolvers aren't allowed to block in this call: - if ( dispatched ) - incQIDState( q ); - - dispatched = true; qDebug() << "Dispatching to resolver" << r->name() << q->toString(); - thisResolver = i; r->resolve( q ); } @@ -286,18 +295,17 @@ Pipeline::shunt( const query_ptr& q ) if ( thisResolver < m_resolvers.count() ) { qDebug() << "Shunting in" << lasttimeout << "ms, q:" << q->toString(); - incQIDState( q ); - new FuncTimeout( lasttimeout, boost::bind( &Pipeline::shunt, this, q ), this ); + + m_qidsTimeout.insert( q->id(), true ); + new FuncTimeout( lasttimeout, boost::bind( &Pipeline::timeoutShunt, this, q ), this ); } } else { // reached end of pipeline qDebug() << "Reached end of pipeline for:" << q->toString(); - - decQIDState( q ); - if ( !q->solved() ) - q->onResolvingFinished(); + setQIDState( q, 0 ); + return; } shuntNext(); @@ -314,6 +322,24 @@ Pipeline::resolverSorter( const Resolver* left, const Resolver* right ) } +void +Pipeline::setQIDState( const Tomahawk::query_ptr& query, int state ) +{ + QMutexLocker lock( &m_mut ); + + if ( state ) + { + qDebug() << Q_FUNC_INFO << "inserting to qidsstate:" << query->id() << state; + m_qidsState.insert( query->id(), state ); + } + else + { + qDebug() << Q_FUNC_INFO << "removing" << query->id() << state; + m_qidsState.remove( query->id() ); + } +} + + int Pipeline::incQIDState( const Tomahawk::query_ptr& query ) { @@ -337,6 +363,9 @@ Pipeline::decQIDState( const Tomahawk::query_ptr& query ) { QMutexLocker lock( &m_mut ); + if ( !m_qidsState.contains( query->id() ) ) + return 0; + int state = m_qidsState.value( query->id() ) - 1; if ( state ) { diff --git a/src/libtomahawk/pipeline.h b/src/libtomahawk/pipeline.h index 2c31bc28b..7f845f2e9 100644 --- a/src/libtomahawk/pipeline.h +++ b/src/libtomahawk/pipeline.h @@ -1,5 +1,5 @@ /* === This file is part of Tomahawk Player - === - * + * * Copyright 2010-2011, Christian Muehlhaeuser * * Tomahawk is free software: you can redistribute it and/or modify @@ -73,17 +73,20 @@ signals: void idle(); private slots: + void timeoutShunt( const query_ptr& q ); void shunt( const query_ptr& q ); void shuntNext(); void indexReady(); private: + void setQIDState( const Tomahawk::query_ptr& query, int state ); int incQIDState( const Tomahawk::query_ptr& query ); int decQIDState( const Tomahawk::query_ptr& query ); QList< Resolver* > m_resolvers; + QMap< QID, bool > m_qidsTimeout; QMap< QID, unsigned int > m_qidsState; QMap< QID, query_ptr > m_qids; QMap< RID, result_ptr > m_rids;