1
0
mirror of https://github.com/tomahawk-player/tomahawk.git synced 2025-08-01 03:40:16 +02:00

Properly threadify databaseworker, untested as of yet

This commit is contained in:
Jeff Mitchell
2012-07-09 18:48:25 -04:00
parent 8c8de62271
commit c7002a1364
4 changed files with 83 additions and 33 deletions

View File

@@ -42,7 +42,7 @@ Database::Database( const QString& dbname, QObject* parent )
: QObject( parent ) : QObject( parent )
, m_ready( false ) , m_ready( false )
, m_impl( new DatabaseImpl( dbname ) ) , m_impl( new DatabaseImpl( dbname ) )
, m_workerRW( new DatabaseWorker( this, true ) ) , m_workerRW( new DatabaseWorkerThread( this, true ) )
{ {
s_instance = this; s_instance = this;
@@ -65,8 +65,23 @@ Database::~Database()
{ {
qDebug() << Q_FUNC_INFO; qDebug() << Q_FUNC_INFO;
qDeleteAll( m_workers ); m_workerRW->quit();
foreach ( DatabaseWorkerThread *thread, m_workers )
{
if ( thread->worker() )
thread->quit();
}
m_workerRW->wait( 60000 );
delete m_workerRW; delete m_workerRW;
m_workerRW = 0;
foreach ( DatabaseWorkerThread *thread, m_workers )
{
thread->wait( 60000 );
delete thread;
}
m_workers.clear();
qDeleteAll( m_implHash.values() ); qDeleteAll( m_implHash.values() );
delete m_impl; delete m_impl;
} }
@@ -84,7 +99,8 @@ Database::enqueue( const QList< QSharedPointer<DatabaseCommand> >& lc )
{ {
Q_ASSERT( m_ready ); Q_ASSERT( m_ready );
qDebug() << "Enqueueing" << lc.count() << "commands to rw thread"; qDebug() << "Enqueueing" << lc.count() << "commands to rw thread";
m_workerRW->enqueue( lc ); if ( m_workerRW->worker() )
m_workerRW->worker().data()->enqueue( lc );
} }
@@ -95,7 +111,8 @@ Database::enqueue( const QSharedPointer<DatabaseCommand>& lc )
if ( lc->doesMutates() ) if ( lc->doesMutates() )
{ {
qDebug() << "Enqueueing command to rw thread:" << lc->commandname(); qDebug() << "Enqueueing command to rw thread:" << lc->commandname();
m_workerRW->enqueue( lc ); if ( m_workerRW->worker() )
m_workerRW->worker().data()->enqueue( lc );
} }
else else
{ {
@@ -103,7 +120,7 @@ Database::enqueue( const QSharedPointer<DatabaseCommand>& lc )
// create new thread if < WORKER_THREADS // create new thread if < WORKER_THREADS
if ( m_workers.count() < m_maxConcurrentThreads ) if ( m_workers.count() < m_maxConcurrentThreads )
{ {
DatabaseWorker* worker = new DatabaseWorker( this, false ); DatabaseWorkerThread* worker = new DatabaseWorkerThread( this, false );
worker->start(); worker->start();
m_workers << worker; m_workers << worker;
@@ -111,24 +128,24 @@ Database::enqueue( const QSharedPointer<DatabaseCommand>& lc )
// find thread for commandname with lowest amount of outstanding jobs and enqueue job // find thread for commandname with lowest amount of outstanding jobs and enqueue job
int busyThreads = 0; int busyThreads = 0;
DatabaseWorker* happyThread = 0; QWeakPointer< DatabaseWorker > happyWorker;
for ( int i = 0; i < m_workers.count(); i++ ) for ( int i = 0; i < m_workers.count(); i++ )
{ {
DatabaseWorker* worker = m_workers.at( i ); DatabaseWorkerThread* workerThread = m_workers.at( i );
if ( !worker->busy() ) if ( workerThread->worker() && !workerThread->worker().data()->busy() )
{ {
happyThread = worker; happyWorker = workerThread->worker();
break; break;
} }
busyThreads++; busyThreads++;
if ( !happyThread || worker->outstandingJobs() < happyThread->outstandingJobs() ) if ( !happyWorker || ( workerThread->worker() && workerThread->worker().data()->outstandingJobs() < happyWorker.data()->outstandingJobs() ) )
happyThread = worker; happyWorker = workerThread->worker();
} }
// qDebug() << "Enqueueing command to thread:" << happyThread << busyThreads << lc->commandname(); // qDebug() << "Enqueueing command to thread:" << happyThread << busyThreads << lc->commandname();
happyThread->enqueue( lc ); happyWorker.data()->enqueue( lc );
} }
} }

View File

@@ -30,6 +30,7 @@
#include "DllMacro.h" #include "DllMacro.h"
class DatabaseImpl; class DatabaseImpl;
class DatabaseWorkerThread;
class DatabaseWorker; class DatabaseWorker;
/* /*
@@ -75,8 +76,8 @@ private:
bool m_ready; bool m_ready;
DatabaseImpl* m_impl; DatabaseImpl* m_impl;
DatabaseWorker* m_workerRW; DatabaseWorkerThread* m_workerRW;
QList<DatabaseWorker*> m_workers; QList< DatabaseWorkerThread* > m_workers;
int m_maxConcurrentThreads; int m_maxConcurrentThreads;
QHash< QThread*, DatabaseImpl* > m_implHash; QHash< QThread*, DatabaseImpl* > m_implHash;

View File

@@ -34,17 +34,42 @@
//#define DEBUG_TIMING TRUE //#define DEBUG_TIMING TRUE
#endif #endif
DatabaseWorker::DatabaseWorker( Database* db, bool mutates )
DatabaseWorkerThread::DatabaseWorkerThread( Database* db, bool mutates )
: QThread() : QThread()
, m_db( db ) , m_db( db )
, m_mutates( mutates )
{
Q_UNUSED( db );
Q_UNUSED( mutates );
}
void
DatabaseWorkerThread::run()
{
m_worker = QWeakPointer< DatabaseWorker >( new DatabaseWorker( m_db, m_mutates ) );
exec();
tDebug() << Q_FUNC_INFO << "DatabaseWorker finishing...";
if ( m_worker )
delete m_worker.data();
}
DatabaseWorkerThread::~DatabaseWorkerThread()
{
}
DatabaseWorker::DatabaseWorker( Database* db, bool mutates )
: QObject()
, m_db( db )
, m_outstanding( 0 ) , m_outstanding( 0 )
{ {
Q_UNUSED( db ); Q_UNUSED( db );
Q_UNUSED( mutates ); Q_UNUSED( mutates );
tDebug() << Q_FUNC_INFO << "New db connection with name:" << Database::instance()->impl()->database().connectionName() << "on thread" << this->thread();
moveToThread( this );
qDebug() << "CTOR DatabaseWorker" << this->thread();
} }
@@ -65,15 +90,6 @@ DatabaseWorker::~DatabaseWorker()
} }
void
DatabaseWorker::run()
{
tDebug() << "New db connection with name:" << Database::instance()->impl()->database().connectionName();
exec();
qDebug() << Q_FUNC_INFO << "DatabaseWorker finishing...";
}
void void
DatabaseWorker::enqueue( const QList< QSharedPointer<DatabaseCommand> >& cmds ) DatabaseWorker::enqueue( const QList< QSharedPointer<DatabaseCommand> >& cmds )
{ {

View File

@@ -34,12 +34,12 @@
class Database; class Database;
class DatabaseCommandLoggable; class DatabaseCommandLoggable;
class DatabaseWorker : public QThread class DatabaseWorker : public QObject
{ {
Q_OBJECT Q_OBJECT
public: public:
DatabaseWorker( Database*, bool mutates ); DatabaseWorker( Database* db, bool mutates );
~DatabaseWorker(); ~DatabaseWorker();
bool busy() const { return m_outstanding > 0; } bool busy() const { return m_outstanding > 0; }
@@ -49,9 +49,6 @@ public slots:
void enqueue( const QSharedPointer<DatabaseCommand>& ); void enqueue( const QSharedPointer<DatabaseCommand>& );
void enqueue( const QList< QSharedPointer<DatabaseCommand> >& ); void enqueue( const QList< QSharedPointer<DatabaseCommand> >& );
protected:
void run();
private slots: private slots:
void doWork(); void doWork();
@@ -66,4 +63,23 @@ private:
QJson::Serializer m_serializer; QJson::Serializer m_serializer;
}; };
class DatabaseWorkerThread : public QThread
{
Q_OBJECT
public:
DatabaseWorkerThread( Database* db, bool mutates );
~DatabaseWorkerThread();
QWeakPointer< DatabaseWorker > worker() { return m_worker; }
protected:
void run();
private:
QWeakPointer< DatabaseWorker > m_worker;
Database* m_db;
bool m_mutates;
};
#endif // DATABASEWORKER_H #endif // DATABASEWORKER_H