mirror of
https://github.com/tomahawk-player/tomahawk.git
synced 2025-04-05 08:32:42 +02:00
Merge pull request #301 from theli-ua/pipeline
Tie timeouts to resolvers, makes pipeline more predictable, stable and deterministic
This commit is contained in:
commit
37436089a9
@ -36,6 +36,7 @@
|
||||
#define MAX_CONCURRENT_QUERIES 16
|
||||
#define CLEANUP_TIMEOUT 5 * 60 * 1000
|
||||
#define MINSCORE 0.5
|
||||
#define DEFAULT_RESOLVER_TIMEOUT 5000 //5 seconds
|
||||
|
||||
using namespace Tomahawk;
|
||||
|
||||
@ -99,7 +100,7 @@ unsigned int
|
||||
Pipeline::activeQueryCount() const
|
||||
{
|
||||
Q_D( const Pipeline );
|
||||
return d->qidsState.count();
|
||||
return d->qidsState.uniqueKeys().count();
|
||||
}
|
||||
|
||||
|
||||
@ -323,7 +324,7 @@ Pipeline::resolve( QID qid, bool prioritized, bool temporaryQuery )
|
||||
|
||||
|
||||
void
|
||||
Pipeline::reportResults( QID qid, const QList< result_ptr >& results )
|
||||
Pipeline::reportResults( QID qid, Tomahawk::Resolver* r, const QList< result_ptr >& results )
|
||||
{
|
||||
Q_D( Pipeline );
|
||||
if ( !d->running )
|
||||
@ -366,17 +367,17 @@ Pipeline::reportResults( QID qid, const QList< result_ptr >& results )
|
||||
addResultsToQuery( q, cleanResults );
|
||||
if ( !httpResults.isEmpty() )
|
||||
{
|
||||
const ResultUrlChecker* checker = new ResultUrlChecker( q, httpResults );
|
||||
const ResultUrlChecker* checker = new ResultUrlChecker( q, r, httpResults );
|
||||
connect( checker, SIGNAL( done() ), SLOT( onResultUrlCheckerDone() ) );
|
||||
}
|
||||
else
|
||||
{
|
||||
decQIDState( q );
|
||||
decQIDState( q, r );
|
||||
}
|
||||
|
||||
/* if ( q->solved() && !q->isFullTextQuery() )
|
||||
{
|
||||
setQIDState( q, 0 );
|
||||
checkQIDState( q, 0 );
|
||||
return;
|
||||
}*/
|
||||
}
|
||||
@ -413,7 +414,7 @@ Pipeline::addResultsToQuery( const query_ptr& query, const QList< result_ptr >&
|
||||
|
||||
|
||||
void
|
||||
Pipeline::onResultUrlCheckerDone()
|
||||
Pipeline::onResultUrlCheckerDone( )
|
||||
{
|
||||
ResultUrlChecker* checker = qobject_cast< ResultUrlChecker* >( sender() );
|
||||
if ( !checker )
|
||||
@ -425,11 +426,11 @@ Pipeline::onResultUrlCheckerDone()
|
||||
addResultsToQuery( q, checker->validResults() );
|
||||
/* if ( q && !q->isFullTextQuery() )
|
||||
{
|
||||
setQIDState( q, 0 );
|
||||
checkQIDState( q, 0 );
|
||||
return;
|
||||
}*/
|
||||
|
||||
decQIDState( q );
|
||||
decQIDState( q, reinterpret_cast<Tomahawk::Resolver*>( checker->userData() ) );
|
||||
}
|
||||
|
||||
|
||||
@ -525,26 +526,22 @@ Pipeline::shuntNext()
|
||||
q->setCurrentResolver( 0 );
|
||||
}
|
||||
|
||||
setQIDState( q, rc );
|
||||
foreach ( Resolver* r, d->resolvers )
|
||||
{
|
||||
incQIDState( q, r );
|
||||
}
|
||||
checkQIDState( q );
|
||||
}
|
||||
|
||||
|
||||
void
|
||||
Pipeline::timeoutShunt( const query_ptr& q )
|
||||
Pipeline::timeoutShunt( const query_ptr& q, Tomahawk::Resolver* r )
|
||||
{
|
||||
Q_D( Pipeline );
|
||||
if ( !d->running )
|
||||
return;
|
||||
|
||||
// are we still waiting for a timeout?
|
||||
if ( d->qidsTimeout.contains( q->id() ) )
|
||||
{
|
||||
if ( --d->qidsTimeout[q->id()] == 0 )
|
||||
{
|
||||
d->qidsTimeout.remove( q->id() );
|
||||
setQIDState( q, 0 );
|
||||
}
|
||||
}
|
||||
decQIDState( q, r );
|
||||
}
|
||||
|
||||
|
||||
@ -567,16 +564,16 @@ Pipeline::shunt( const query_ptr& q )
|
||||
r->resolve( q );
|
||||
emit resolving( q );
|
||||
|
||||
if ( r->timeout() > 0 )
|
||||
{
|
||||
d->qidsTimeout[q->id()]++;
|
||||
new FuncTimeout( r->timeout(), std::bind( &Pipeline::timeoutShunt, this, q ), this );
|
||||
}
|
||||
auto timeout = r->timeout();
|
||||
if ( timeout == 0 )
|
||||
timeout = DEFAULT_RESOLVER_TIMEOUT;
|
||||
|
||||
new FuncTimeout( timeout, std::bind( &Pipeline::timeoutShunt, this, q, r ), this );
|
||||
}
|
||||
else
|
||||
{
|
||||
// we get here if we disable a resolver while a query is resolving
|
||||
setQIDState( q, 0 );
|
||||
// OR we are just out of resolvers while query is still resolving
|
||||
return;
|
||||
}
|
||||
|
||||
@ -610,15 +607,15 @@ Pipeline::nextResolver( const Tomahawk::query_ptr& query ) const
|
||||
|
||||
|
||||
void
|
||||
Pipeline::setQIDState( const Tomahawk::query_ptr& query, int state )
|
||||
Pipeline::checkQIDState( const Tomahawk::query_ptr& query )
|
||||
{
|
||||
Q_D( Pipeline );
|
||||
QMutexLocker lock( &d->mut );
|
||||
|
||||
if ( state > 0 )
|
||||
{
|
||||
d->qidsState.insert( query->id(), state );
|
||||
tDebug() << Q_FUNC_INFO << " " << query->id() << " " << d->qidsState.count( query->id() );
|
||||
|
||||
if ( d->qidsState.contains( query->id() ) )
|
||||
{
|
||||
new FuncTimeout( 0, std::bind( &Pipeline::shunt, this, query ), this );
|
||||
}
|
||||
else
|
||||
@ -634,39 +631,27 @@ Pipeline::setQIDState( const Tomahawk::query_ptr& query, int state )
|
||||
}
|
||||
|
||||
|
||||
int
|
||||
Pipeline::incQIDState( const Tomahawk::query_ptr& query )
|
||||
void
|
||||
Pipeline::incQIDState( const Tomahawk::query_ptr& query, Tomahawk::Resolver* r )
|
||||
{
|
||||
Q_D( Pipeline );
|
||||
QMutexLocker lock( &d->mut );
|
||||
|
||||
int state = 1;
|
||||
if ( d->qidsState.contains( query->id() ) )
|
||||
{
|
||||
state = d->qidsState.value( query->id() ) + 1;
|
||||
}
|
||||
d->qidsState.insert( query->id(), state );
|
||||
|
||||
return state;
|
||||
d->qidsState.insert( query->id(), r );
|
||||
}
|
||||
|
||||
|
||||
int
|
||||
Pipeline::decQIDState( const Tomahawk::query_ptr& query )
|
||||
void
|
||||
Pipeline::decQIDState( const Tomahawk::query_ptr& query, Tomahawk::Resolver* r )
|
||||
{
|
||||
Q_D( Pipeline );
|
||||
int state = 0;
|
||||
{
|
||||
QMutexLocker lock( &d->mut );
|
||||
|
||||
if ( !d->qidsState.contains( query->id() ) )
|
||||
return 0;
|
||||
if ( r )
|
||||
d->qidsState.remove( query->id(), r );//Removes all matching pairs
|
||||
else
|
||||
d->qidsState.remove( query->id() );//Will clear
|
||||
|
||||
state = d->qidsState.value( query->id() ) - 1;
|
||||
}
|
||||
|
||||
setQIDState( query, state );
|
||||
return state;
|
||||
checkQIDState( query );
|
||||
}
|
||||
|
||||
|
||||
|
@ -54,7 +54,7 @@ public:
|
||||
unsigned int pendingQueryCount() const;
|
||||
unsigned int activeQueryCount() const;
|
||||
|
||||
void reportResults( QID qid, const QList< result_ptr >& results );
|
||||
void reportResults( QID qid, Tomahawk::Resolver* r, const QList< result_ptr >& results );
|
||||
void reportAlbums( QID qid, const QList< album_ptr >& albums );
|
||||
void reportArtists( QID qid, const QList< artist_ptr >& artists );
|
||||
|
||||
@ -94,12 +94,12 @@ protected:
|
||||
QScopedPointer<PipelinePrivate> d_ptr;
|
||||
|
||||
private slots:
|
||||
void timeoutShunt( const query_ptr& q );
|
||||
void timeoutShunt( const query_ptr& q, Tomahawk::Resolver* r );
|
||||
void shunt( const query_ptr& q );
|
||||
void shuntNext();
|
||||
|
||||
void onTemporaryQueryTimer();
|
||||
void onResultUrlCheckerDone();
|
||||
void onResultUrlCheckerDone( );
|
||||
|
||||
private:
|
||||
Q_DECLARE_PRIVATE( Pipeline )
|
||||
@ -107,9 +107,9 @@ private:
|
||||
void addResultsToQuery( const query_ptr& query, const QList< result_ptr >& results );
|
||||
Tomahawk::Resolver* nextResolver( const Tomahawk::query_ptr& query ) const;
|
||||
|
||||
void setQIDState( const Tomahawk::query_ptr& query, int state );
|
||||
int incQIDState( const Tomahawk::query_ptr& query );
|
||||
int decQIDState( const Tomahawk::query_ptr& query );
|
||||
void checkQIDState( const Tomahawk::query_ptr& query );
|
||||
void incQIDState( const Tomahawk::query_ptr& query, Tomahawk::Resolver* );
|
||||
void decQIDState( const Tomahawk::query_ptr& query, Tomahawk::Resolver* );
|
||||
};
|
||||
|
||||
} // Tomahawk
|
||||
|
@ -45,8 +45,7 @@ private:
|
||||
QList< Resolver* > resolvers;
|
||||
QList< QPointer<Tomahawk::ExternalResolver> > scriptResolvers;
|
||||
QList< ResolverFactoryFunc > resolverFactories;
|
||||
QMap< QID, unsigned int > qidsTimeout;
|
||||
QMap< QID, unsigned int > qidsState;
|
||||
QMultiMap< QID, Tomahawk::Resolver* > qidsState;
|
||||
QMap< QID, query_ptr > qids;
|
||||
QMap< RID, result_ptr > rids;
|
||||
|
||||
|
@ -644,7 +644,7 @@ Tomahawk::DatabaseImpl::resultFromHint( const Tomahawk::query_ptr& origquery )
|
||||
const QUrl u = QUrl::fromUserInput( url );
|
||||
res->setFriendlySource( u.host() );
|
||||
|
||||
ResultUrlChecker* checker = new ResultUrlChecker( origquery, QList< result_ptr >() << res );
|
||||
ResultUrlChecker* checker = new ResultUrlChecker( origquery, nullptr, QList< result_ptr >() << res );
|
||||
QEventLoop loop;
|
||||
connect( checker, SIGNAL( done() ), &loop, SLOT( quit() ) );
|
||||
loop.exec();
|
||||
|
@ -59,7 +59,7 @@ DatabaseResolver::gotResults( const Tomahawk::QID qid, QList< Tomahawk::result_p
|
||||
foreach ( const Tomahawk::result_ptr& r, results )
|
||||
r->setResolvedByResolver( this );
|
||||
|
||||
Tomahawk::Pipeline::instance()->reportResults( qid, results );
|
||||
Tomahawk::Pipeline::instance()->reportResults( qid, this, results );
|
||||
}
|
||||
|
||||
|
||||
|
@ -155,7 +155,7 @@ JSResolverHelper::addTrackResults( const QVariantMap& results )
|
||||
|
||||
QString qid = results.value("qid").toString();
|
||||
|
||||
Tomahawk::Pipeline::instance()->reportResults( qid, tracks );
|
||||
Tomahawk::Pipeline::instance()->reportResults( qid, m_resolver, tracks );
|
||||
}
|
||||
|
||||
|
||||
|
@ -331,7 +331,7 @@ ScriptResolver::handleMsg( const QByteArray& msg )
|
||||
results << rp;
|
||||
}
|
||||
|
||||
Tomahawk::Pipeline::instance()->reportResults( qid, results );
|
||||
Tomahawk::Pipeline::instance()->reportResults( qid, this, results );
|
||||
}
|
||||
else
|
||||
{
|
||||
|
@ -32,9 +32,11 @@
|
||||
using namespace Tomahawk;
|
||||
|
||||
|
||||
ResultUrlChecker::ResultUrlChecker( const query_ptr& query, const QList< result_ptr >& results )
|
||||
ResultUrlChecker::ResultUrlChecker( const query_ptr& query, QObject* userData,
|
||||
const QList< result_ptr >& results )
|
||||
: QObject( 0 )
|
||||
, m_query( query )
|
||||
, m_userData( userData )
|
||||
, m_results( results )
|
||||
{
|
||||
check();
|
||||
|
@ -33,10 +33,11 @@ class ResultUrlChecker : public QObject
|
||||
{
|
||||
Q_OBJECT
|
||||
public:
|
||||
ResultUrlChecker( const query_ptr& query, const QList< result_ptr >& results );
|
||||
ResultUrlChecker( const query_ptr& query, QObject* userData, const QList< result_ptr >& results );
|
||||
virtual ~ResultUrlChecker();
|
||||
|
||||
query_ptr query() const { return m_query; }
|
||||
QObject* userData() const { return m_userData; }
|
||||
QList< result_ptr > results() const { return m_results; }
|
||||
QList< result_ptr > validResults() const { return m_validResults; }
|
||||
|
||||
@ -49,6 +50,7 @@ private slots:
|
||||
|
||||
private:
|
||||
query_ptr m_query;
|
||||
QObject* m_userData;
|
||||
QList< result_ptr > m_results;
|
||||
QList< result_ptr > m_validResults;
|
||||
QHash< NetworkReply*, Tomahawk::result_ptr > m_replies;
|
||||
|
Loading…
x
Reference in New Issue
Block a user