1
0
mirror of https://github.com/tomahawk-player/tomahawk.git synced 2025-08-06 22:26:32 +02:00

Pimple Playlist

This commit is contained in:
Uwe L. Korn
2013-07-12 18:14:02 +02:00
parent ce4143e79f
commit 365fb989a4
3 changed files with 234 additions and 116 deletions

View File

@@ -16,7 +16,7 @@
* along with Tomahawk. If not, see <http://www.gnu.org/licenses/>.
*/
#include "Pipeline.h"
#include "Pipeline_p.h"
#include <QMutexLocker>
@@ -41,27 +41,28 @@
using namespace Tomahawk;
Pipeline* Pipeline::s_instance = 0;
Pipeline* PipelinePrivate::s_instance = 0;
Pipeline*
Pipeline::instance()
{
return s_instance;
return PipelinePrivate::s_instance;
}
Pipeline::Pipeline( QObject* parent )
: QObject( parent )
, m_running( false )
, d_ptr( new PipelinePrivate( this ) )
{
s_instance = this;
Q_D( Pipeline );
PipelinePrivate::s_instance = this;
m_maxConcurrentQueries = qBound( DEFAULT_CONCURRENT_QUERIES, QThread::idealThreadCount(), MAX_CONCURRENT_QUERIES );
tDebug() << Q_FUNC_INFO << "Using" << m_maxConcurrentQueries << "threads";
d->maxConcurrentQueries = qBound( DEFAULT_CONCURRENT_QUERIES, QThread::idealThreadCount(), MAX_CONCURRENT_QUERIES );
tDebug() << Q_FUNC_INFO << "Using" << d->maxConcurrentQueries << "threads";
m_temporaryQueryTimer.setInterval( CLEANUP_TIMEOUT );
connect( &m_temporaryQueryTimer, SIGNAL( timeout() ), SLOT( onTemporaryQueryTimer() ) );
d->temporaryQueryTimer.setInterval( CLEANUP_TIMEOUT );
connect( &d->temporaryQueryTimer, SIGNAL( timeout() ), SLOT( onTemporaryQueryTimer() ) );
connect( this, SIGNAL( resolverAdded( Tomahawk::Resolver* ) ),
SourceList::instance(), SLOT( onResolverAdded( Tomahawk::Resolver* ) ) );
@@ -72,15 +73,40 @@ Pipeline::Pipeline( QObject* parent )
Pipeline::~Pipeline()
{
Q_D( Pipeline );
tDebug() << Q_FUNC_INFO;
m_running = false;
d->running = false;
// stop script resolvers
foreach ( QPointer< ExternalResolver > r, m_scriptResolvers )
foreach ( QPointer< ExternalResolver > r, d->scriptResolvers )
if ( !r.isNull() )
r.data()->deleteLater();
m_scriptResolvers.clear();
d->scriptResolvers.clear();
}
bool
Pipeline::isRunning() const
{
Q_D( const Pipeline );
return d->running;
}
unsigned int
Pipeline::pendingQueryCount() const
{
Q_D( const Pipeline );
return d->queries_pending.count();
}
unsigned int
Pipeline::activeQueryCount() const
{
Q_D( const Pipeline );
return d->qidsState.count();
}
@@ -95,8 +121,10 @@ Pipeline::databaseReady()
void
Pipeline::start()
{
tDebug() << Q_FUNC_INFO << "Shunting" << m_queries_pending.size() << "queries!";
m_running = true;
Q_D( Pipeline );
tDebug() << Q_FUNC_INFO << "Shunting" << d->queries_pending.size() << "queries!";
d->running = true;
emit running();
shuntNext();
@@ -106,17 +134,20 @@ Pipeline::start()
void
Pipeline::stop()
{
m_running = false;
Q_D( Pipeline );
d->running = false;
}
void
Pipeline::removeResolver( Resolver* r )
{
QMutexLocker lock( &m_mut );
Q_D( Pipeline );
QMutexLocker lock( &d->mut );
tDebug() << "Removed resolver:" << r->name();
m_resolvers.removeAll( r );
d->resolvers.removeAll( r );
emit resolverRemoved( r );
}
@@ -124,10 +155,11 @@ Pipeline::removeResolver( Resolver* r )
void
Pipeline::addResolver( Resolver* r )
{
QMutexLocker lock( &m_mut );
Q_D( Pipeline );
QMutexLocker lock( &d->mut );
tDebug() << "Adding resolver" << r->name();
m_resolvers.append( r );
d->resolvers.append( r );
emit resolverAdded( r );
}
@@ -135,22 +167,24 @@ Pipeline::addResolver( Resolver* r )
void
Pipeline::addExternalResolverFactory( ResolverFactoryFunc resolverFactory )
{
m_resolverFactories << resolverFactory;
Q_D( Pipeline );
d->resolverFactories << resolverFactory;
}
Tomahawk::ExternalResolver*
Pipeline::addScriptResolver( const QString& path, const QStringList& additionalScriptPaths )
{
Q_D( Pipeline );
ExternalResolver* res = 0;
foreach ( ResolverFactoryFunc factory, m_resolverFactories )
foreach ( ResolverFactoryFunc factory, d->resolverFactories )
{
res = factory( path, additionalScriptPaths );
if ( !res )
continue;
m_scriptResolvers << QPointer< ExternalResolver > ( res );
d->scriptResolvers << QPointer< ExternalResolver > ( res );
break;
}
@@ -162,7 +196,8 @@ Pipeline::addScriptResolver( const QString& path, const QStringList& additionalS
void
Pipeline::stopScriptResolver( const QString& path )
{
foreach ( QPointer< ExternalResolver > res, m_scriptResolvers )
Q_D( Pipeline );
foreach ( QPointer< ExternalResolver > res, d->scriptResolvers )
{
if ( res.data()->filePath() == path )
res.data()->stop();
@@ -173,8 +208,9 @@ Pipeline::stopScriptResolver( const QString& path )
void
Pipeline::removeScriptResolver( const QString& scriptPath )
{
Q_D( Pipeline );
QPointer< ExternalResolver > r;
foreach ( QPointer< ExternalResolver > res, m_scriptResolvers )
foreach ( QPointer< ExternalResolver > res, d->scriptResolvers )
{
if ( res.isNull() )
continue;
@@ -182,7 +218,7 @@ Pipeline::removeScriptResolver( const QString& scriptPath )
if ( res.data()->filePath() == scriptPath )
r = res;
}
m_scriptResolvers.removeAll( r );
d->scriptResolvers.removeAll( r );
if ( !r.isNull() )
{
@@ -192,10 +228,21 @@ Pipeline::removeScriptResolver( const QString& scriptPath )
}
QList<QPointer<ExternalResolver> >
Pipeline::scriptResolvers() const
{
Q_D( const Pipeline );
return d->scriptResolvers;
}
ExternalResolver*
Pipeline::resolverForPath( const QString& scriptPath )
{
foreach ( QPointer< ExternalResolver > res, m_scriptResolvers )
Q_D( Pipeline );
foreach ( QPointer< ExternalResolver > res, d->scriptResolvers )
{
if ( res.data()->filePath() == scriptPath )
return res.data();
@@ -207,40 +254,42 @@ Pipeline::resolverForPath( const QString& scriptPath )
void
Pipeline::resolve( const QList<query_ptr>& qlist, bool prioritized, bool temporaryQuery )
{
Q_D( Pipeline );
{
QMutexLocker lock( &m_mut );
QMutexLocker lock( &d->mut );
int i = 0;
foreach ( const query_ptr& q, qlist )
{
if ( q->resolvingFinished() )
continue;
if ( m_qidsState.contains( q->id() ) )
if ( d->qidsState.contains( q->id() ) )
continue;
if ( m_queries_pending.contains( q ) )
if ( d->queries_pending.contains( q ) )
{
if ( prioritized )
{
m_queries_pending.insert( i++, m_queries_pending.takeAt( m_queries_pending.indexOf( q ) ) );
d->queries_pending.insert( i++, d->queries_pending.takeAt( d->queries_pending.indexOf( q ) ) );
}
continue;
}
if ( !m_qids.contains( q->id() ) )
m_qids.insert( q->id(), q );
if ( !d->qids.contains( q->id() ) )
d->qids.insert( q->id(), q );
if ( prioritized )
m_queries_pending.insert( i++, q );
d->queries_pending.insert( i++, q );
else
m_queries_pending << q;
d->queries_pending << q;
if ( temporaryQuery )
{
m_queries_temporary << q;
d->queries_temporary << q;
if ( m_temporaryQueryTimer.isActive() )
m_temporaryQueryTimer.stop();
m_temporaryQueryTimer.start();
if ( d->temporaryQueryTimer.isActive() )
d->temporaryQueryTimer.stop();
d->temporaryQueryTimer.start();
}
}
}
@@ -252,7 +301,9 @@ Pipeline::resolve( const QList<query_ptr>& qlist, bool prioritized, bool tempora
bool
Pipeline::isResolving( const query_ptr& q ) const
{
return m_qids.contains( q->id() ) && m_qidsState.contains( q->id() );
Q_D( const Pipeline );
return d->qids.contains( q->id() ) && d->qidsState.contains( q->id() );
}
@@ -278,14 +329,15 @@ Pipeline::resolve( QID qid, bool prioritized, bool temporaryQuery )
void
Pipeline::reportResults( QID qid, const QList< result_ptr >& results )
{
if ( !m_running )
Q_D( Pipeline );
if ( !d->running )
return;
if ( !m_qids.contains( qid ) )
if ( !d->qids.contains( qid ) )
{
tDebug() << "Result arrived too late for:" << qid;
return;
}
const query_ptr& q = m_qids.value( qid );
const query_ptr& q = d->qids.value( qid );
Q_ASSERT( !q.isNull() );
if ( q.isNull() )
@@ -302,7 +354,7 @@ Pipeline::reportResults( QID qid, const QList< result_ptr >& results )
httpResults << r;
else
cleanResults << r;
}
}const
ResultUrlChecker* checker = new ResultUrlChecker( q, httpResults );
connect( checker, SIGNAL( done() ), SLOT( onResultUrlCheckerDone() ) );
@@ -322,6 +374,7 @@ Pipeline::reportResults( QID qid, const QList< result_ptr >& results )
void
Pipeline::addResultsToQuery( const query_ptr& query, const QList< result_ptr >& results )
{
Q_D( Pipeline );
// tDebug( LOGVERBOSE ) << Q_FUNC_INFO << query->toString() << results.count();
QList< result_ptr > cleanResults;
@@ -338,11 +391,11 @@ Pipeline::addResultsToQuery( const query_ptr& query, const QList< result_ptr >&
{
query->addResults( cleanResults );
if ( m_queries_temporary.contains( query ) )
if ( d->queries_temporary.contains( query ) )
{
foreach ( const result_ptr& r, cleanResults )
{
m_rids.insert( r->id(), r );
d->rids.insert( r->id(), r );
}
}
}
@@ -373,15 +426,16 @@ Pipeline::onResultUrlCheckerDone()
void
Pipeline::reportAlbums( QID qid, const QList< album_ptr >& albums )
{
if ( !m_running )
Q_D( Pipeline );
if ( !d->running )
return;
if ( !m_qids.contains( qid ) )
if ( !d->qids.contains( qid ) )
{
tDebug() << "Albums arrived too late for:" << qid;
return;
}
const query_ptr& q = m_qids.value( qid );
const query_ptr& q = d->qids.value( qid );
Q_ASSERT( q->isFullTextQuery() );
QList< album_ptr > cleanAlbums;
@@ -402,15 +456,16 @@ Pipeline::reportAlbums( QID qid, const QList< album_ptr >& albums )
void
Pipeline::reportArtists( QID qid, const QList< artist_ptr >& artists )
{
if ( !m_running )
Q_D( Pipeline );
if ( !d->running )
return;
if ( !m_qids.contains( qid ) )
if ( !d->qids.contains( qid ) )
{
tDebug() << "Artists arrived too late for:" << qid;
return;
}
const query_ptr& q = m_qids.value( qid );
const query_ptr& q = d->qids.value( qid );
Q_ASSERT( q->isFullTextQuery() );
QList< artist_ptr > cleanArtists;
@@ -431,31 +486,32 @@ Pipeline::reportArtists( QID qid, const QList< artist_ptr >& artists )
void
Pipeline::shuntNext()
{
if ( !m_running )
Q_D( Pipeline );
if ( !d->running )
return;
unsigned int rc;
query_ptr q;
{
QMutexLocker lock( &m_mut );
QMutexLocker lock( &d->mut );
rc = m_resolvers.count();
if ( m_queries_pending.isEmpty() )
rc = d->resolvers.count();
if ( d->queries_pending.isEmpty() )
{
if ( m_qidsState.isEmpty() )
if ( d->qidsState.isEmpty() )
emit idle();
return;
}
// Check if we are ready to dispatch more queries
if ( m_qidsState.count() >= m_maxConcurrentQueries )
if ( d->qidsState.count() >= d->maxConcurrentQueries )
return;
/*
Since resolvers are async, we now dispatch to the highest weighted ones
and after timeout, dispatch to next highest etc, aborting when solved
*/
q = m_queries_pending.takeFirst();
q = d->queries_pending.takeFirst();
q->setCurrentResolver( 0 );
}
@@ -466,11 +522,12 @@ Pipeline::shuntNext()
void
Pipeline::timeoutShunt( const query_ptr& q )
{
if ( !m_running )
Q_D( Pipeline );
if ( !d->running )
return;
// are we still waiting for a timeout?
if ( m_qidsTimeout.contains( q->id() ) )
if ( d->qidsTimeout.contains( q->id() ) )
{
decQIDState( q );
}
@@ -480,7 +537,8 @@ Pipeline::timeoutShunt( const query_ptr& q )
void
Pipeline::shunt( const query_ptr& q )
{
if ( !m_running )
Q_D( Pipeline );
if ( !d->running )
return;
Resolver* r = 0;
@@ -497,7 +555,7 @@ Pipeline::shunt( const query_ptr& q )
if ( r->timeout() > 0 )
{
m_qidsTimeout.insert( q->id(), true );
d->qidsTimeout.insert( q->id(), true );
new FuncTimeout( r->timeout(), boost::bind( &Pipeline::timeoutShunt, this, q ), this );
}
}
@@ -515,9 +573,10 @@ Pipeline::shunt( const query_ptr& q )
Tomahawk::Resolver*
Pipeline::nextResolver( const Tomahawk::query_ptr& query ) const
{
Q_D( const Pipeline );
Resolver* newResolver = 0;
foreach ( Resolver* r, m_resolvers )
foreach ( Resolver* r, d->resolvers )
{
if ( query->resolvedBy().contains( r ) )
continue;
@@ -539,24 +598,25 @@ Pipeline::nextResolver( const Tomahawk::query_ptr& query ) const
void
Pipeline::setQIDState( const Tomahawk::query_ptr& query, int state )
{
QMutexLocker lock( &m_mut );
Q_D( Pipeline );
QMutexLocker lock( &d->mut );
if ( m_qidsTimeout.contains( query->id() ) )
m_qidsTimeout.remove( query->id() );
if ( d->qidsTimeout.contains( query->id() ) )
d->qidsTimeout.remove( query->id() );
if ( state > 0 )
{
m_qidsState.insert( query->id(), state );
d->qidsState.insert( query->id(), state );
new FuncTimeout( 0, boost::bind( &Pipeline::shunt, this, query ), this );
}
else
{
m_qidsState.remove( query->id() );
d->qidsState.remove( query->id() );
query->onResolvingFinished();
if ( !m_queries_temporary.contains( query ) )
m_qids.remove( query->id() );
if ( !d->queries_temporary.contains( query ) )
d->qids.remove( query->id() );
new FuncTimeout( 0, boost::bind( &Pipeline::shuntNext, this ), this );
}
@@ -566,14 +626,15 @@ Pipeline::setQIDState( const Tomahawk::query_ptr& query, int state )
int
Pipeline::incQIDState( const Tomahawk::query_ptr& query )
{
QMutexLocker lock( &m_mut );
Q_D( Pipeline );
QMutexLocker lock( &d->mut );
int state = 1;
if ( m_qidsState.contains( query->id() ) )
if ( d->qidsState.contains( query->id() ) )
{
state = m_qidsState.value( query->id() ) + 1;
state = d->qidsState.value( query->id() ) + 1;
}
m_qidsState.insert( query->id(), state );
d->qidsState.insert( query->id(), state );
return state;
}
@@ -582,14 +643,15 @@ Pipeline::incQIDState( const Tomahawk::query_ptr& query )
int
Pipeline::decQIDState( const Tomahawk::query_ptr& query )
{
Q_D( Pipeline );
int state = 0;
{
QMutexLocker lock( &m_mut );
QMutexLocker lock( &d->mut );
if ( !m_qidsState.contains( query->id() ) )
if ( !d->qidsState.contains( query->id() ) )
return 0;
state = m_qidsState.value( query->id() ) - 1;
state = d->qidsState.value( query->id() ) - 1;
}
setQIDState( query, state );
@@ -600,18 +662,19 @@ Pipeline::decQIDState( const Tomahawk::query_ptr& query )
void
Pipeline::onTemporaryQueryTimer()
{
Q_D( Pipeline );
tDebug() << Q_FUNC_INFO;
QMutexLocker lock( &m_mut );
m_temporaryQueryTimer.stop();
QMutexLocker lock( &d->mut );
d->temporaryQueryTimer.stop();
for ( int i = m_queries_temporary.count() - 1; i >= 0; i-- )
for ( int i = d->queries_temporary.count() - 1; i >= 0; i-- )
{
query_ptr q = m_queries_temporary.takeAt( i );
query_ptr q = d->queries_temporary.takeAt( i );
m_qids.remove( q->id() );
d->qids.remove( q->id() );
foreach ( const Tomahawk::result_ptr& r, q->results() )
m_rids.remove( r->id() );
d->rids.remove( r->id() );
}
}
@@ -619,12 +682,14 @@ Pipeline::onTemporaryQueryTimer()
query_ptr
Pipeline::query( const QID& qid ) const
{
return m_qids.value( qid );
Q_D( const Pipeline );
return d->qids.value( qid );
}
result_ptr
Pipeline::result( const RID& rid ) const
{
return m_rids.value( rid );
Q_D( const Pipeline );
return d->rids.value( rid );
}

View File

@@ -1,6 +1,7 @@
/* === This file is part of Tomahawk Player - <http://tomahawk-player.org> ===
*
* Copyright 2010-2011, Christian Muehlhaeuser <muesli@tomahawk-player.org>
* Copyright 2013, Uwe L. Korn <uwelk@xhochy.com>
*
* Tomahawk is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
@@ -16,25 +17,24 @@
* along with Tomahawk. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
#ifndef PIPELINE_H
#define PIPELINE_H
#include "DllMacro.h"
#include "Typedefs.h"
#include "Query.h"
#include <QObject>
#include <QList>
#include <QMap>
#include <QMutex>
#include <QStringList>
#include <QTimer>
#include <boost/function.hpp>
#include "DllMacro.h"
namespace Tomahawk
{
class PipelinePrivate;
class Resolver;
class ExternalResolver;
typedef boost::function<Tomahawk::ExternalResolver*( QString, QStringList )> ResolverFactoryFunc;
@@ -49,10 +49,10 @@ public:
explicit Pipeline( QObject* parent = 0 );
virtual ~Pipeline();
bool isRunning() const { return m_running; }
bool isRunning() const;
unsigned int pendingQueryCount() const { return m_queries_pending.count(); }
unsigned int activeQueryCount() const { return m_qidsState.count(); }
unsigned int pendingQueryCount() const;
unsigned int activeQueryCount() const;
void reportResults( QID qid, const QList< result_ptr >& results );
void reportAlbums( QID qid, const QList< album_ptr >& albums );
@@ -62,7 +62,7 @@ public:
Tomahawk::ExternalResolver* addScriptResolver( const QString& scriptPath, const QStringList& additionalScriptPaths = QStringList() );
void stopScriptResolver( const QString& scriptPath );
void removeScriptResolver( const QString& scriptPath );
QList< QPointer< ExternalResolver > > scriptResolvers() const { return m_scriptResolvers; }
QList< QPointer< ExternalResolver > > scriptResolvers() const;
Tomahawk::ExternalResolver* resolverForPath( const QString& scriptPath );
void addResolver( Resolver* r );
@@ -90,6 +90,9 @@ signals:
void resolverAdded( Tomahawk::Resolver* );
void resolverRemoved( Tomahawk::Resolver* );
protected:
QScopedPointer<PipelinePrivate> d_ptr;
private slots:
void timeoutShunt( const query_ptr& q );
void shunt( const query_ptr& q );
@@ -99,35 +102,16 @@ private slots:
void onResultUrlCheckerDone();
private:
Q_DECLARE_PRIVATE( Pipeline )
void addResultsToQuery( const query_ptr& query, const QList< result_ptr >& results );
Tomahawk::Resolver* nextResolver( const Tomahawk::query_ptr& query ) const;
void setQIDState( const Tomahawk::query_ptr& query, int state );
int incQIDState( const Tomahawk::query_ptr& query );
int decQIDState( const Tomahawk::query_ptr& query );
QList< Resolver* > m_resolvers;
QList< QPointer<Tomahawk::ExternalResolver> > m_scriptResolvers;
QList< ResolverFactoryFunc > m_resolverFactories;
QMap< QID, bool > m_qidsTimeout;
QMap< QID, unsigned int > m_qidsState;
QMap< QID, query_ptr > m_qids;
QMap< RID, result_ptr > m_rids;
QMutex m_mut; // for m_qids, m_rids
// store queries here until DB index is loaded, then shunt them all
QList< query_ptr > m_queries_pending;
// store temporary queries here and clean up after timeout threshold
QList< query_ptr > m_queries_temporary;
int m_maxConcurrentQueries;
bool m_running;
QTimer m_temporaryQueryTimer;
static Pipeline* s_instance;
};
}; //ns
} // Tomahawk
#endif // PIPELINE_H

View File

@@ -0,0 +1,69 @@
/* === This file is part of Tomahawk Player - <http://tomahawk-player.org> ===
*
* Copyright 2010-2011, Christian Muehlhaeuser <muesli@tomahawk-player.org>
* Copyright 2013, Uwe L. Korn <uwelk@xhochy.com>
*
* Tomahawk is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Tomahawk is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Tomahawk. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
#ifndef PIPELINE_P_H
#define PIPELINE_P_H
#include "Pipeline.h"
#include <QMutex>
#include <QTimer>
namespace Tomahawk
{
class PipelinePrivate
{
public:
PipelinePrivate( Pipeline* q )
: q_ptr( q )
, running( false )
{
}
Pipeline* q_ptr;
Q_DECLARE_PUBLIC( Pipeline )
private:
QList< Resolver* > resolvers;
QList< QPointer<Tomahawk::ExternalResolver> > scriptResolvers;
QList< ResolverFactoryFunc > resolverFactories;
QMap< QID, bool > qidsTimeout;
QMap< QID, unsigned int > qidsState;
QMap< QID, query_ptr > qids;
QMap< RID, result_ptr > rids;
QMutex mut; // for m_qids, m_rids
// store queries here until DB index is loaded, then shunt them all
QList< query_ptr > queries_pending;
// store temporary queries here and clean up after timeout threshold
QList< query_ptr > queries_temporary;
int maxConcurrentQueries;
bool running;
QTimer temporaryQueryTimer;
static Pipeline* s_instance;
};
} // Tomahawk
#endif // PIPELINE_P_H