diff --git a/src/libtomahawk/Pipeline.cpp b/src/libtomahawk/Pipeline.cpp index 7a8b5f66d..b965700ea 100644 --- a/src/libtomahawk/Pipeline.cpp +++ b/src/libtomahawk/Pipeline.cpp @@ -16,7 +16,7 @@ * along with Tomahawk. If not, see . */ -#include "Pipeline.h" +#include "Pipeline_p.h" #include @@ -41,27 +41,28 @@ using namespace Tomahawk; -Pipeline* Pipeline::s_instance = 0; +Pipeline* PipelinePrivate::s_instance = 0; Pipeline* Pipeline::instance() { - return s_instance; + return PipelinePrivate::s_instance; } Pipeline::Pipeline( QObject* parent ) : QObject( parent ) - , m_running( false ) + , d_ptr( new PipelinePrivate( this ) ) { - s_instance = this; + Q_D( Pipeline ); + PipelinePrivate::s_instance = this; - m_maxConcurrentQueries = qBound( DEFAULT_CONCURRENT_QUERIES, QThread::idealThreadCount(), MAX_CONCURRENT_QUERIES ); - tDebug() << Q_FUNC_INFO << "Using" << m_maxConcurrentQueries << "threads"; + d->maxConcurrentQueries = qBound( DEFAULT_CONCURRENT_QUERIES, QThread::idealThreadCount(), MAX_CONCURRENT_QUERIES ); + tDebug() << Q_FUNC_INFO << "Using" << d->maxConcurrentQueries << "threads"; - m_temporaryQueryTimer.setInterval( CLEANUP_TIMEOUT ); - connect( &m_temporaryQueryTimer, SIGNAL( timeout() ), SLOT( onTemporaryQueryTimer() ) ); + d->temporaryQueryTimer.setInterval( CLEANUP_TIMEOUT ); + connect( &d->temporaryQueryTimer, SIGNAL( timeout() ), SLOT( onTemporaryQueryTimer() ) ); connect( this, SIGNAL( resolverAdded( Tomahawk::Resolver* ) ), SourceList::instance(), SLOT( onResolverAdded( Tomahawk::Resolver* ) ) ); @@ -72,15 +73,40 @@ Pipeline::Pipeline( QObject* parent ) Pipeline::~Pipeline() { + Q_D( Pipeline ); tDebug() << Q_FUNC_INFO; - m_running = false; + d->running = false; // stop script resolvers - foreach ( QPointer< ExternalResolver > r, m_scriptResolvers ) + foreach ( QPointer< ExternalResolver > r, d->scriptResolvers ) if ( !r.isNull() ) r.data()->deleteLater(); - m_scriptResolvers.clear(); + d->scriptResolvers.clear(); +} + + +bool +Pipeline::isRunning() const +{ + Q_D( const Pipeline ); + return d->running; +} + + +unsigned int +Pipeline::pendingQueryCount() const +{ + Q_D( const Pipeline ); + return d->queries_pending.count(); +} + + +unsigned int +Pipeline::activeQueryCount() const +{ + Q_D( const Pipeline ); + return d->qidsState.count(); } @@ -95,8 +121,10 @@ Pipeline::databaseReady() void Pipeline::start() { - tDebug() << Q_FUNC_INFO << "Shunting" << m_queries_pending.size() << "queries!"; - m_running = true; + Q_D( Pipeline ); + + tDebug() << Q_FUNC_INFO << "Shunting" << d->queries_pending.size() << "queries!"; + d->running = true; emit running(); shuntNext(); @@ -106,17 +134,20 @@ Pipeline::start() void Pipeline::stop() { - m_running = false; + Q_D( Pipeline ); + + d->running = false; } void Pipeline::removeResolver( Resolver* r ) { - QMutexLocker lock( &m_mut ); + Q_D( Pipeline ); + QMutexLocker lock( &d->mut ); tDebug() << "Removed resolver:" << r->name(); - m_resolvers.removeAll( r ); + d->resolvers.removeAll( r ); emit resolverRemoved( r ); } @@ -124,10 +155,11 @@ Pipeline::removeResolver( Resolver* r ) void Pipeline::addResolver( Resolver* r ) { - QMutexLocker lock( &m_mut ); + Q_D( Pipeline ); + QMutexLocker lock( &d->mut ); tDebug() << "Adding resolver" << r->name(); - m_resolvers.append( r ); + d->resolvers.append( r ); emit resolverAdded( r ); } @@ -135,22 +167,24 @@ Pipeline::addResolver( Resolver* r ) void Pipeline::addExternalResolverFactory( ResolverFactoryFunc resolverFactory ) { - m_resolverFactories << resolverFactory; + Q_D( Pipeline ); + d->resolverFactories << resolverFactory; } Tomahawk::ExternalResolver* Pipeline::addScriptResolver( const QString& path, const QStringList& additionalScriptPaths ) { + Q_D( Pipeline ); ExternalResolver* res = 0; - foreach ( ResolverFactoryFunc factory, m_resolverFactories ) + foreach ( ResolverFactoryFunc factory, d->resolverFactories ) { res = factory( path, additionalScriptPaths ); if ( !res ) continue; - m_scriptResolvers << QPointer< ExternalResolver > ( res ); + d->scriptResolvers << QPointer< ExternalResolver > ( res ); break; } @@ -162,7 +196,8 @@ Pipeline::addScriptResolver( const QString& path, const QStringList& additionalS void Pipeline::stopScriptResolver( const QString& path ) { - foreach ( QPointer< ExternalResolver > res, m_scriptResolvers ) + Q_D( Pipeline ); + foreach ( QPointer< ExternalResolver > res, d->scriptResolvers ) { if ( res.data()->filePath() == path ) res.data()->stop(); @@ -173,8 +208,9 @@ Pipeline::stopScriptResolver( const QString& path ) void Pipeline::removeScriptResolver( const QString& scriptPath ) { + Q_D( Pipeline ); QPointer< ExternalResolver > r; - foreach ( QPointer< ExternalResolver > res, m_scriptResolvers ) + foreach ( QPointer< ExternalResolver > res, d->scriptResolvers ) { if ( res.isNull() ) continue; @@ -182,7 +218,7 @@ Pipeline::removeScriptResolver( const QString& scriptPath ) if ( res.data()->filePath() == scriptPath ) r = res; } - m_scriptResolvers.removeAll( r ); + d->scriptResolvers.removeAll( r ); if ( !r.isNull() ) { @@ -192,10 +228,21 @@ Pipeline::removeScriptResolver( const QString& scriptPath ) } +QList > +Pipeline::scriptResolvers() const +{ + Q_D( const Pipeline ); + + return d->scriptResolvers; +} + + ExternalResolver* Pipeline::resolverForPath( const QString& scriptPath ) { - foreach ( QPointer< ExternalResolver > res, m_scriptResolvers ) + Q_D( Pipeline ); + + foreach ( QPointer< ExternalResolver > res, d->scriptResolvers ) { if ( res.data()->filePath() == scriptPath ) return res.data(); @@ -207,40 +254,42 @@ Pipeline::resolverForPath( const QString& scriptPath ) void Pipeline::resolve( const QList& qlist, bool prioritized, bool temporaryQuery ) { + Q_D( Pipeline ); + { - QMutexLocker lock( &m_mut ); + QMutexLocker lock( &d->mut ); int i = 0; foreach ( const query_ptr& q, qlist ) { if ( q->resolvingFinished() ) continue; - if ( m_qidsState.contains( q->id() ) ) + if ( d->qidsState.contains( q->id() ) ) continue; - if ( m_queries_pending.contains( q ) ) + if ( d->queries_pending.contains( q ) ) { if ( prioritized ) { - m_queries_pending.insert( i++, m_queries_pending.takeAt( m_queries_pending.indexOf( q ) ) ); + d->queries_pending.insert( i++, d->queries_pending.takeAt( d->queries_pending.indexOf( q ) ) ); } continue; } - if ( !m_qids.contains( q->id() ) ) - m_qids.insert( q->id(), q ); + if ( !d->qids.contains( q->id() ) ) + d->qids.insert( q->id(), q ); if ( prioritized ) - m_queries_pending.insert( i++, q ); + d->queries_pending.insert( i++, q ); else - m_queries_pending << q; + d->queries_pending << q; if ( temporaryQuery ) { - m_queries_temporary << q; + d->queries_temporary << q; - if ( m_temporaryQueryTimer.isActive() ) - m_temporaryQueryTimer.stop(); - m_temporaryQueryTimer.start(); + if ( d->temporaryQueryTimer.isActive() ) + d->temporaryQueryTimer.stop(); + d->temporaryQueryTimer.start(); } } } @@ -252,7 +301,9 @@ Pipeline::resolve( const QList& qlist, bool prioritized, bool tempora bool Pipeline::isResolving( const query_ptr& q ) const { - return m_qids.contains( q->id() ) && m_qidsState.contains( q->id() ); + Q_D( const Pipeline ); + + return d->qids.contains( q->id() ) && d->qidsState.contains( q->id() ); } @@ -278,14 +329,15 @@ Pipeline::resolve( QID qid, bool prioritized, bool temporaryQuery ) void Pipeline::reportResults( QID qid, const QList< result_ptr >& results ) { - if ( !m_running ) + Q_D( Pipeline ); + if ( !d->running ) return; - if ( !m_qids.contains( qid ) ) + if ( !d->qids.contains( qid ) ) { tDebug() << "Result arrived too late for:" << qid; return; } - const query_ptr& q = m_qids.value( qid ); + const query_ptr& q = d->qids.value( qid ); Q_ASSERT( !q.isNull() ); if ( q.isNull() ) @@ -302,7 +354,7 @@ Pipeline::reportResults( QID qid, const QList< result_ptr >& results ) httpResults << r; else cleanResults << r; - } + }const ResultUrlChecker* checker = new ResultUrlChecker( q, httpResults ); connect( checker, SIGNAL( done() ), SLOT( onResultUrlCheckerDone() ) ); @@ -322,6 +374,7 @@ Pipeline::reportResults( QID qid, const QList< result_ptr >& results ) void Pipeline::addResultsToQuery( const query_ptr& query, const QList< result_ptr >& results ) { + Q_D( Pipeline ); // tDebug( LOGVERBOSE ) << Q_FUNC_INFO << query->toString() << results.count(); QList< result_ptr > cleanResults; @@ -338,11 +391,11 @@ Pipeline::addResultsToQuery( const query_ptr& query, const QList< result_ptr >& { query->addResults( cleanResults ); - if ( m_queries_temporary.contains( query ) ) + if ( d->queries_temporary.contains( query ) ) { foreach ( const result_ptr& r, cleanResults ) { - m_rids.insert( r->id(), r ); + d->rids.insert( r->id(), r ); } } } @@ -373,15 +426,16 @@ Pipeline::onResultUrlCheckerDone() void Pipeline::reportAlbums( QID qid, const QList< album_ptr >& albums ) { - if ( !m_running ) + Q_D( Pipeline ); + if ( !d->running ) return; - if ( !m_qids.contains( qid ) ) + if ( !d->qids.contains( qid ) ) { tDebug() << "Albums arrived too late for:" << qid; return; } - const query_ptr& q = m_qids.value( qid ); + const query_ptr& q = d->qids.value( qid ); Q_ASSERT( q->isFullTextQuery() ); QList< album_ptr > cleanAlbums; @@ -402,15 +456,16 @@ Pipeline::reportAlbums( QID qid, const QList< album_ptr >& albums ) void Pipeline::reportArtists( QID qid, const QList< artist_ptr >& artists ) { - if ( !m_running ) + Q_D( Pipeline ); + if ( !d->running ) return; - if ( !m_qids.contains( qid ) ) + if ( !d->qids.contains( qid ) ) { tDebug() << "Artists arrived too late for:" << qid; return; } - const query_ptr& q = m_qids.value( qid ); + const query_ptr& q = d->qids.value( qid ); Q_ASSERT( q->isFullTextQuery() ); QList< artist_ptr > cleanArtists; @@ -431,31 +486,32 @@ Pipeline::reportArtists( QID qid, const QList< artist_ptr >& artists ) void Pipeline::shuntNext() { - if ( !m_running ) + Q_D( Pipeline ); + if ( !d->running ) return; unsigned int rc; query_ptr q; { - QMutexLocker lock( &m_mut ); + QMutexLocker lock( &d->mut ); - rc = m_resolvers.count(); - if ( m_queries_pending.isEmpty() ) + rc = d->resolvers.count(); + if ( d->queries_pending.isEmpty() ) { - if ( m_qidsState.isEmpty() ) + if ( d->qidsState.isEmpty() ) emit idle(); return; } // Check if we are ready to dispatch more queries - if ( m_qidsState.count() >= m_maxConcurrentQueries ) + if ( d->qidsState.count() >= d->maxConcurrentQueries ) return; /* Since resolvers are async, we now dispatch to the highest weighted ones and after timeout, dispatch to next highest etc, aborting when solved */ - q = m_queries_pending.takeFirst(); + q = d->queries_pending.takeFirst(); q->setCurrentResolver( 0 ); } @@ -466,11 +522,12 @@ Pipeline::shuntNext() void Pipeline::timeoutShunt( const query_ptr& q ) { - if ( !m_running ) + Q_D( Pipeline ); + if ( !d->running ) return; // are we still waiting for a timeout? - if ( m_qidsTimeout.contains( q->id() ) ) + if ( d->qidsTimeout.contains( q->id() ) ) { decQIDState( q ); } @@ -480,7 +537,8 @@ Pipeline::timeoutShunt( const query_ptr& q ) void Pipeline::shunt( const query_ptr& q ) { - if ( !m_running ) + Q_D( Pipeline ); + if ( !d->running ) return; Resolver* r = 0; @@ -497,7 +555,7 @@ Pipeline::shunt( const query_ptr& q ) if ( r->timeout() > 0 ) { - m_qidsTimeout.insert( q->id(), true ); + d->qidsTimeout.insert( q->id(), true ); new FuncTimeout( r->timeout(), boost::bind( &Pipeline::timeoutShunt, this, q ), this ); } } @@ -515,9 +573,10 @@ Pipeline::shunt( const query_ptr& q ) Tomahawk::Resolver* Pipeline::nextResolver( const Tomahawk::query_ptr& query ) const { + Q_D( const Pipeline ); Resolver* newResolver = 0; - foreach ( Resolver* r, m_resolvers ) + foreach ( Resolver* r, d->resolvers ) { if ( query->resolvedBy().contains( r ) ) continue; @@ -539,24 +598,25 @@ Pipeline::nextResolver( const Tomahawk::query_ptr& query ) const void Pipeline::setQIDState( const Tomahawk::query_ptr& query, int state ) { - QMutexLocker lock( &m_mut ); + Q_D( Pipeline ); + QMutexLocker lock( &d->mut ); - if ( m_qidsTimeout.contains( query->id() ) ) - m_qidsTimeout.remove( query->id() ); + if ( d->qidsTimeout.contains( query->id() ) ) + d->qidsTimeout.remove( query->id() ); if ( state > 0 ) { - m_qidsState.insert( query->id(), state ); + d->qidsState.insert( query->id(), state ); new FuncTimeout( 0, boost::bind( &Pipeline::shunt, this, query ), this ); } else { - m_qidsState.remove( query->id() ); + d->qidsState.remove( query->id() ); query->onResolvingFinished(); - if ( !m_queries_temporary.contains( query ) ) - m_qids.remove( query->id() ); + if ( !d->queries_temporary.contains( query ) ) + d->qids.remove( query->id() ); new FuncTimeout( 0, boost::bind( &Pipeline::shuntNext, this ), this ); } @@ -566,14 +626,15 @@ Pipeline::setQIDState( const Tomahawk::query_ptr& query, int state ) int Pipeline::incQIDState( const Tomahawk::query_ptr& query ) { - QMutexLocker lock( &m_mut ); + Q_D( Pipeline ); + QMutexLocker lock( &d->mut ); int state = 1; - if ( m_qidsState.contains( query->id() ) ) + if ( d->qidsState.contains( query->id() ) ) { - state = m_qidsState.value( query->id() ) + 1; + state = d->qidsState.value( query->id() ) + 1; } - m_qidsState.insert( query->id(), state ); + d->qidsState.insert( query->id(), state ); return state; } @@ -582,14 +643,15 @@ Pipeline::incQIDState( const Tomahawk::query_ptr& query ) int Pipeline::decQIDState( const Tomahawk::query_ptr& query ) { + Q_D( Pipeline ); int state = 0; { - QMutexLocker lock( &m_mut ); + QMutexLocker lock( &d->mut ); - if ( !m_qidsState.contains( query->id() ) ) + if ( !d->qidsState.contains( query->id() ) ) return 0; - state = m_qidsState.value( query->id() ) - 1; + state = d->qidsState.value( query->id() ) - 1; } setQIDState( query, state ); @@ -600,18 +662,19 @@ Pipeline::decQIDState( const Tomahawk::query_ptr& query ) void Pipeline::onTemporaryQueryTimer() { + Q_D( Pipeline ); tDebug() << Q_FUNC_INFO; - QMutexLocker lock( &m_mut ); - m_temporaryQueryTimer.stop(); + QMutexLocker lock( &d->mut ); + d->temporaryQueryTimer.stop(); - for ( int i = m_queries_temporary.count() - 1; i >= 0; i-- ) + for ( int i = d->queries_temporary.count() - 1; i >= 0; i-- ) { - query_ptr q = m_queries_temporary.takeAt( i ); + query_ptr q = d->queries_temporary.takeAt( i ); - m_qids.remove( q->id() ); + d->qids.remove( q->id() ); foreach ( const Tomahawk::result_ptr& r, q->results() ) - m_rids.remove( r->id() ); + d->rids.remove( r->id() ); } } @@ -619,12 +682,14 @@ Pipeline::onTemporaryQueryTimer() query_ptr Pipeline::query( const QID& qid ) const { - return m_qids.value( qid ); + Q_D( const Pipeline ); + return d->qids.value( qid ); } result_ptr Pipeline::result( const RID& rid ) const { - return m_rids.value( rid ); + Q_D( const Pipeline ); + return d->rids.value( rid ); } diff --git a/src/libtomahawk/Pipeline.h b/src/libtomahawk/Pipeline.h index a5bc15ba1..541d774b7 100644 --- a/src/libtomahawk/Pipeline.h +++ b/src/libtomahawk/Pipeline.h @@ -1,6 +1,7 @@ /* === This file is part of Tomahawk Player - === * * Copyright 2010-2011, Christian Muehlhaeuser + * Copyright 2013, Uwe L. Korn * * Tomahawk is free software: you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by @@ -16,25 +17,24 @@ * along with Tomahawk. If not, see . */ +#pragma once #ifndef PIPELINE_H #define PIPELINE_H +#include "DllMacro.h" #include "Typedefs.h" #include "Query.h" #include #include -#include -#include #include -#include #include -#include "DllMacro.h" - namespace Tomahawk { + +class PipelinePrivate; class Resolver; class ExternalResolver; typedef boost::function ResolverFactoryFunc; @@ -49,10 +49,10 @@ public: explicit Pipeline( QObject* parent = 0 ); virtual ~Pipeline(); - bool isRunning() const { return m_running; } + bool isRunning() const; - unsigned int pendingQueryCount() const { return m_queries_pending.count(); } - unsigned int activeQueryCount() const { return m_qidsState.count(); } + unsigned int pendingQueryCount() const; + unsigned int activeQueryCount() const; void reportResults( QID qid, const QList< result_ptr >& results ); void reportAlbums( QID qid, const QList< album_ptr >& albums ); @@ -62,7 +62,7 @@ public: Tomahawk::ExternalResolver* addScriptResolver( const QString& scriptPath, const QStringList& additionalScriptPaths = QStringList() ); void stopScriptResolver( const QString& scriptPath ); void removeScriptResolver( const QString& scriptPath ); - QList< QPointer< ExternalResolver > > scriptResolvers() const { return m_scriptResolvers; } + QList< QPointer< ExternalResolver > > scriptResolvers() const; Tomahawk::ExternalResolver* resolverForPath( const QString& scriptPath ); void addResolver( Resolver* r ); @@ -90,6 +90,9 @@ signals: void resolverAdded( Tomahawk::Resolver* ); void resolverRemoved( Tomahawk::Resolver* ); +protected: + QScopedPointer d_ptr; + private slots: void timeoutShunt( const query_ptr& q ); void shunt( const query_ptr& q ); @@ -99,35 +102,16 @@ private slots: void onResultUrlCheckerDone(); private: + Q_DECLARE_PRIVATE( Pipeline ) + void addResultsToQuery( const query_ptr& query, const QList< result_ptr >& results ); Tomahawk::Resolver* nextResolver( const Tomahawk::query_ptr& query ) const; void setQIDState( const Tomahawk::query_ptr& query, int state ); int incQIDState( const Tomahawk::query_ptr& query ); int decQIDState( const Tomahawk::query_ptr& query ); - - QList< Resolver* > m_resolvers; - QList< QPointer > m_scriptResolvers; - QList< ResolverFactoryFunc > m_resolverFactories; - QMap< QID, bool > m_qidsTimeout; - QMap< QID, unsigned int > m_qidsState; - QMap< QID, query_ptr > m_qids; - QMap< RID, result_ptr > m_rids; - - QMutex m_mut; // for m_qids, m_rids - - // store queries here until DB index is loaded, then shunt them all - QList< query_ptr > m_queries_pending; - // store temporary queries here and clean up after timeout threshold - QList< query_ptr > m_queries_temporary; - - int m_maxConcurrentQueries; - bool m_running; - QTimer m_temporaryQueryTimer; - - static Pipeline* s_instance; }; -}; //ns +} // Tomahawk #endif // PIPELINE_H diff --git a/src/libtomahawk/Pipeline_p.h b/src/libtomahawk/Pipeline_p.h new file mode 100644 index 000000000..cbc98afb4 --- /dev/null +++ b/src/libtomahawk/Pipeline_p.h @@ -0,0 +1,69 @@ +/* === This file is part of Tomahawk Player - === + * + * Copyright 2010-2011, Christian Muehlhaeuser + * Copyright 2013, Uwe L. Korn + * + * Tomahawk is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Tomahawk is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Tomahawk. If not, see . + */ + +#pragma once +#ifndef PIPELINE_P_H +#define PIPELINE_P_H + +#include "Pipeline.h" + +#include +#include + +namespace Tomahawk +{ + +class PipelinePrivate +{ +public: + PipelinePrivate( Pipeline* q ) + : q_ptr( q ) + , running( false ) + { + } + + Pipeline* q_ptr; + Q_DECLARE_PUBLIC( Pipeline ) + +private: + QList< Resolver* > resolvers; + QList< QPointer > scriptResolvers; + QList< ResolverFactoryFunc > resolverFactories; + QMap< QID, bool > qidsTimeout; + QMap< QID, unsigned int > qidsState; + QMap< QID, query_ptr > qids; + QMap< RID, result_ptr > rids; + + QMutex mut; // for m_qids, m_rids + + // store queries here until DB index is loaded, then shunt them all + QList< query_ptr > queries_pending; + // store temporary queries here and clean up after timeout threshold + QList< query_ptr > queries_temporary; + + int maxConcurrentQueries; + bool running; + QTimer temporaryQueryTimer; + + static Pipeline* s_instance; +}; + +} // Tomahawk + +#endif // PIPELINE_P_H