1
0
mirror of https://github.com/tomahawk-player/tomahawk.git synced 2025-08-05 21:57:41 +02:00

Tie timeouts to resolvers, makes pipeline more predictable, stable and deterministic

This commit is contained in:
Anton Romanov
2015-05-14 17:29:03 -07:00
parent 7628273d45
commit 6250a19a1f
9 changed files with 52 additions and 65 deletions

View File

@@ -36,6 +36,7 @@
#define MAX_CONCURRENT_QUERIES 16 #define MAX_CONCURRENT_QUERIES 16
#define CLEANUP_TIMEOUT 5 * 60 * 1000 #define CLEANUP_TIMEOUT 5 * 60 * 1000
#define MINSCORE 0.5 #define MINSCORE 0.5
#define DEFAULT_RESOLVER_TIMEOUT 5000 //5 seconds
using namespace Tomahawk; using namespace Tomahawk;
@@ -99,7 +100,7 @@ unsigned int
Pipeline::activeQueryCount() const Pipeline::activeQueryCount() const
{ {
Q_D( const Pipeline ); Q_D( const Pipeline );
return d->qidsState.count(); return d->qidsState.uniqueKeys().count();
} }
@@ -323,7 +324,7 @@ Pipeline::resolve( QID qid, bool prioritized, bool temporaryQuery )
void void
Pipeline::reportResults( QID qid, const QList< result_ptr >& results ) Pipeline::reportResults( QID qid, Tomahawk::Resolver* r, const QList< result_ptr >& results )
{ {
Q_D( Pipeline ); Q_D( Pipeline );
if ( !d->running ) if ( !d->running )
@@ -366,17 +367,17 @@ Pipeline::reportResults( QID qid, const QList< result_ptr >& results )
addResultsToQuery( q, cleanResults ); addResultsToQuery( q, cleanResults );
if ( !httpResults.isEmpty() ) if ( !httpResults.isEmpty() )
{ {
const ResultUrlChecker* checker = new ResultUrlChecker( q, httpResults ); const ResultUrlChecker* checker = new ResultUrlChecker( q, r, httpResults );
connect( checker, SIGNAL( done() ), SLOT( onResultUrlCheckerDone() ) ); connect( checker, SIGNAL( done() ), SLOT( onResultUrlCheckerDone() ) );
} }
else else
{ {
decQIDState( q ); decQIDState( q, r );
} }
/* if ( q->solved() && !q->isFullTextQuery() ) /* if ( q->solved() && !q->isFullTextQuery() )
{ {
setQIDState( q, 0 ); checkQIDState( q, 0 );
return; return;
}*/ }*/
} }
@@ -413,7 +414,7 @@ Pipeline::addResultsToQuery( const query_ptr& query, const QList< result_ptr >&
void void
Pipeline::onResultUrlCheckerDone() Pipeline::onResultUrlCheckerDone( )
{ {
ResultUrlChecker* checker = qobject_cast< ResultUrlChecker* >( sender() ); ResultUrlChecker* checker = qobject_cast< ResultUrlChecker* >( sender() );
if ( !checker ) if ( !checker )
@@ -425,11 +426,11 @@ Pipeline::onResultUrlCheckerDone()
addResultsToQuery( q, checker->validResults() ); addResultsToQuery( q, checker->validResults() );
/* if ( q && !q->isFullTextQuery() ) /* if ( q && !q->isFullTextQuery() )
{ {
setQIDState( q, 0 ); checkQIDState( q, 0 );
return; return;
}*/ }*/
decQIDState( q ); decQIDState( q, reinterpret_cast<Tomahawk::Resolver*>( checker->resolver() ) );
} }
@@ -525,26 +526,22 @@ Pipeline::shuntNext()
q->setCurrentResolver( 0 ); q->setCurrentResolver( 0 );
} }
setQIDState( q, rc ); foreach ( Resolver* r, d->resolvers )
{
incQIDState( q, r );
}
checkQIDState( q );
} }
void void
Pipeline::timeoutShunt( const query_ptr& q ) Pipeline::timeoutShunt( const query_ptr& q, Tomahawk::Resolver* r )
{ {
Q_D( Pipeline ); Q_D( Pipeline );
if ( !d->running ) if ( !d->running )
return; return;
// are we still waiting for a timeout? decQIDState( q, r );
if ( d->qidsTimeout.contains( q->id() ) )
{
if ( --d->qidsTimeout[q->id()] == 0 )
{
d->qidsTimeout.remove( q->id() );
setQIDState( q, 0 );
}
}
} }
@@ -567,16 +564,15 @@ Pipeline::shunt( const query_ptr& q )
r->resolve( q ); r->resolve( q );
emit resolving( q ); emit resolving( q );
if ( r->timeout() > 0 ) auto timeout = r->timeout();
{ if ( timeout == 0 )
d->qidsTimeout[q->id()]++; timeout = DEFAULT_RESOLVER_TIMEOUT;
new FuncTimeout( r->timeout(), std::bind( &Pipeline::timeoutShunt, this, q ), this ); new FuncTimeout( r->timeout(), std::bind( &Pipeline::timeoutShunt, this, q, r ), this );
}
} }
else else
{ {
// we get here if we disable a resolver while a query is resolving // we get here if we disable a resolver while a query is resolving
setQIDState( q, 0 ); decQIDState(q, r);
return; return;
} }
@@ -610,15 +606,15 @@ Pipeline::nextResolver( const Tomahawk::query_ptr& query ) const
void void
Pipeline::setQIDState( const Tomahawk::query_ptr& query, int state ) Pipeline::checkQIDState( const Tomahawk::query_ptr& query )
{ {
Q_D( Pipeline ); Q_D( Pipeline );
QMutexLocker lock( &d->mut ); QMutexLocker lock( &d->mut );
if ( state > 0 ) tDebug() << Q_FUNC_INFO << " " << query->id() << " " << d->qidsState.count( query->id() );
{
d->qidsState.insert( query->id(), state );
if ( d->qidsState.contains( query->id() ) )
{
new FuncTimeout( 0, std::bind( &Pipeline::shunt, this, query ), this ); new FuncTimeout( 0, std::bind( &Pipeline::shunt, this, query ), this );
} }
else else
@@ -634,39 +630,27 @@ Pipeline::setQIDState( const Tomahawk::query_ptr& query, int state )
} }
int void
Pipeline::incQIDState( const Tomahawk::query_ptr& query ) Pipeline::incQIDState( const Tomahawk::query_ptr& query, Tomahawk::Resolver* r )
{ {
Q_D( Pipeline ); Q_D( Pipeline );
QMutexLocker lock( &d->mut ); QMutexLocker lock( &d->mut );
int state = 1; d->qidsState.insert( query->id(), r );
if ( d->qidsState.contains( query->id() ) )
{
state = d->qidsState.value( query->id() ) + 1;
}
d->qidsState.insert( query->id(), state );
return state;
} }
int void
Pipeline::decQIDState( const Tomahawk::query_ptr& query ) Pipeline::decQIDState( const Tomahawk::query_ptr& query, Tomahawk::Resolver* r )
{ {
Q_D( Pipeline ); Q_D( Pipeline );
int state = 0;
{
QMutexLocker lock( &d->mut );
if ( !d->qidsState.contains( query->id() ) ) if ( r )
return 0; d->qidsState.remove( query->id(), r );//Removes all matching pairs
else
d->qidsState.remove( query->id() );//Will clear
state = d->qidsState.value( query->id() ) - 1; checkQIDState( query );
}
setQIDState( query, state );
return state;
} }

View File

@@ -54,7 +54,7 @@ public:
unsigned int pendingQueryCount() const; unsigned int pendingQueryCount() const;
unsigned int activeQueryCount() const; unsigned int activeQueryCount() const;
void reportResults( QID qid, const QList< result_ptr >& results ); void reportResults( QID qid, Tomahawk::Resolver* r, const QList< result_ptr >& results );
void reportAlbums( QID qid, const QList< album_ptr >& albums ); void reportAlbums( QID qid, const QList< album_ptr >& albums );
void reportArtists( QID qid, const QList< artist_ptr >& artists ); void reportArtists( QID qid, const QList< artist_ptr >& artists );
@@ -94,12 +94,12 @@ protected:
QScopedPointer<PipelinePrivate> d_ptr; QScopedPointer<PipelinePrivate> d_ptr;
private slots: private slots:
void timeoutShunt( const query_ptr& q ); void timeoutShunt( const query_ptr& q, Tomahawk::Resolver* r );
void shunt( const query_ptr& q ); void shunt( const query_ptr& q );
void shuntNext(); void shuntNext();
void onTemporaryQueryTimer(); void onTemporaryQueryTimer();
void onResultUrlCheckerDone(); void onResultUrlCheckerDone( );
private: private:
Q_DECLARE_PRIVATE( Pipeline ) Q_DECLARE_PRIVATE( Pipeline )
@@ -107,9 +107,9 @@ private:
void addResultsToQuery( const query_ptr& query, const QList< result_ptr >& results ); void addResultsToQuery( const query_ptr& query, const QList< result_ptr >& results );
Tomahawk::Resolver* nextResolver( const Tomahawk::query_ptr& query ) const; Tomahawk::Resolver* nextResolver( const Tomahawk::query_ptr& query ) const;
void setQIDState( const Tomahawk::query_ptr& query, int state ); void checkQIDState( const Tomahawk::query_ptr& query );
int incQIDState( const Tomahawk::query_ptr& query ); void incQIDState( const Tomahawk::query_ptr& query, Tomahawk::Resolver* );
int decQIDState( const Tomahawk::query_ptr& query ); void decQIDState( const Tomahawk::query_ptr& query, Tomahawk::Resolver* );
}; };
} // Tomahawk } // Tomahawk

View File

@@ -45,8 +45,7 @@ private:
QList< Resolver* > resolvers; QList< Resolver* > resolvers;
QList< QPointer<Tomahawk::ExternalResolver> > scriptResolvers; QList< QPointer<Tomahawk::ExternalResolver> > scriptResolvers;
QList< ResolverFactoryFunc > resolverFactories; QList< ResolverFactoryFunc > resolverFactories;
QMap< QID, unsigned int > qidsTimeout; QMultiMap< QID, Tomahawk::Resolver* > qidsState;
QMap< QID, unsigned int > qidsState;
QMap< QID, query_ptr > qids; QMap< QID, query_ptr > qids;
QMap< RID, result_ptr > rids; QMap< RID, result_ptr > rids;

View File

@@ -644,7 +644,7 @@ Tomahawk::DatabaseImpl::resultFromHint( const Tomahawk::query_ptr& origquery )
const QUrl u = QUrl::fromUserInput( url ); const QUrl u = QUrl::fromUserInput( url );
res->setFriendlySource( u.host() ); res->setFriendlySource( u.host() );
ResultUrlChecker* checker = new ResultUrlChecker( origquery, QList< result_ptr >() << res ); ResultUrlChecker* checker = new ResultUrlChecker( origquery, nullptr, QList< result_ptr >() << res );
QEventLoop loop; QEventLoop loop;
connect( checker, SIGNAL( done() ), &loop, SLOT( quit() ) ); connect( checker, SIGNAL( done() ), &loop, SLOT( quit() ) );
loop.exec(); loop.exec();

View File

@@ -59,7 +59,7 @@ DatabaseResolver::gotResults( const Tomahawk::QID qid, QList< Tomahawk::result_p
foreach ( const Tomahawk::result_ptr& r, results ) foreach ( const Tomahawk::result_ptr& r, results )
r->setResolvedByResolver( this ); r->setResolvedByResolver( this );
Tomahawk::Pipeline::instance()->reportResults( qid, results ); Tomahawk::Pipeline::instance()->reportResults( qid, this, results );
} }

View File

@@ -155,7 +155,7 @@ JSResolverHelper::addTrackResults( const QVariantMap& results )
QString qid = results.value("qid").toString(); QString qid = results.value("qid").toString();
Tomahawk::Pipeline::instance()->reportResults( qid, tracks ); Tomahawk::Pipeline::instance()->reportResults( qid, m_resolver, tracks );
} }

View File

@@ -331,7 +331,7 @@ ScriptResolver::handleMsg( const QByteArray& msg )
results << rp; results << rp;
} }
Tomahawk::Pipeline::instance()->reportResults( qid, results ); Tomahawk::Pipeline::instance()->reportResults( qid, this, results );
} }
else else
{ {

View File

@@ -32,9 +32,11 @@
using namespace Tomahawk; using namespace Tomahawk;
ResultUrlChecker::ResultUrlChecker( const query_ptr& query, const QList< result_ptr >& results ) ResultUrlChecker::ResultUrlChecker( const query_ptr& query, void* r,
const QList< result_ptr >& results )
: QObject( 0 ) : QObject( 0 )
, m_query( query ) , m_query( query )
, m_resolver( r )
, m_results( results ) , m_results( results )
{ {
check(); check();

View File

@@ -33,10 +33,11 @@ class ResultUrlChecker : public QObject
{ {
Q_OBJECT Q_OBJECT
public: public:
ResultUrlChecker( const query_ptr& query, const QList< result_ptr >& results ); ResultUrlChecker( const query_ptr& query, void* r, const QList< result_ptr >& results );
virtual ~ResultUrlChecker(); virtual ~ResultUrlChecker();
query_ptr query() const { return m_query; } query_ptr query() const { return m_query; }
void* resolver() const { return m_resolver; }
QList< result_ptr > results() const { return m_results; } QList< result_ptr > results() const { return m_results; }
QList< result_ptr > validResults() const { return m_validResults; } QList< result_ptr > validResults() const { return m_validResults; }
@@ -49,6 +50,7 @@ private slots:
private: private:
query_ptr m_query; query_ptr m_query;
void* m_resolver;
QList< result_ptr > m_results; QList< result_ptr > m_results;
QList< result_ptr > m_validResults; QList< result_ptr > m_validResults;
QHash< NetworkReply*, Tomahawk::result_ptr > m_replies; QHash< NetworkReply*, Tomahawk::result_ptr > m_replies;