mirror of
https://github.com/tomahawk-player/tomahawk.git
synced 2025-08-01 03:40:16 +02:00
* Multithreaded read-only DatabaseWorkers.
This commit is contained in:
@@ -1,5 +1,7 @@
|
|||||||
#include "database.h"
|
#include "database.h"
|
||||||
|
|
||||||
|
#define WORKER_THREADS 5
|
||||||
|
|
||||||
Database* Database::s_instance = 0;
|
Database* Database::s_instance = 0;
|
||||||
|
|
||||||
|
|
||||||
@@ -13,12 +15,18 @@ Database::instance()
|
|||||||
Database::Database( const QString& dbname, QObject* parent )
|
Database::Database( const QString& dbname, QObject* parent )
|
||||||
: QObject( parent )
|
: QObject( parent )
|
||||||
, m_impl( new DatabaseImpl( dbname, this ) )
|
, m_impl( new DatabaseImpl( dbname, this ) )
|
||||||
, m_workerRO( new DatabaseWorker( m_impl, this, false ) )
|
|
||||||
, m_workerRW( new DatabaseWorker( m_impl, this, true ) )
|
, m_workerRW( new DatabaseWorker( m_impl, this, true ) )
|
||||||
{
|
{
|
||||||
s_instance = this;
|
s_instance = this;
|
||||||
|
|
||||||
m_workerRO->start();
|
for ( int i = 0; i < WORKER_THREADS; i++ )
|
||||||
|
{
|
||||||
|
DatabaseWorker* worker = new DatabaseWorker( m_impl, this, false );
|
||||||
|
worker->start();
|
||||||
|
|
||||||
|
m_workersRO << worker;
|
||||||
|
}
|
||||||
|
|
||||||
m_workerRW->start();
|
m_workerRW->start();
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -27,8 +35,8 @@ Database::~Database()
|
|||||||
{
|
{
|
||||||
qDebug() << Q_FUNC_INFO;
|
qDebug() << Q_FUNC_INFO;
|
||||||
|
|
||||||
|
qDeleteAll( m_workersRO );
|
||||||
delete m_workerRW;
|
delete m_workerRW;
|
||||||
delete m_workerRO;
|
|
||||||
delete m_impl;
|
delete m_impl;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -46,12 +54,31 @@ Database::enqueue( QSharedPointer<DatabaseCommand> lc )
|
|||||||
if( lc->doesMutates() )
|
if( lc->doesMutates() )
|
||||||
{
|
{
|
||||||
//qDebug() << Q_FUNC_INFO << "RW" << lc->commandname();
|
//qDebug() << Q_FUNC_INFO << "RW" << lc->commandname();
|
||||||
emit newJobRO( lc );
|
m_workerRW->enqueue( lc );
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
//qDebug() << Q_FUNC_INFO << "RO" << lc->commandname();
|
// qDebug() << Q_FUNC_INFO << "RO" << lc->commandname();
|
||||||
emit newJobRW( lc );
|
|
||||||
|
int busyThreads = 0;
|
||||||
|
DatabaseWorker* happyThread = 0;
|
||||||
|
for ( int i = 0; i < m_workersRO.count(); i++ )
|
||||||
|
{
|
||||||
|
DatabaseWorker* worker = m_workersRO.at( i );
|
||||||
|
|
||||||
|
if ( !worker->busy() )
|
||||||
|
{
|
||||||
|
happyThread = worker;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
busyThreads++;
|
||||||
|
|
||||||
|
if ( !happyThread || worker->outstandingJobs() < happyThread->outstandingJobs() )
|
||||||
|
happyThread = worker;
|
||||||
|
}
|
||||||
|
|
||||||
|
qDebug() << "Enqueueing command to thread:" << happyThread << busyThreads << lc->commandname();
|
||||||
|
happyThread->enqueue( lc );
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@@ -45,7 +45,8 @@ public slots:
|
|||||||
|
|
||||||
private:
|
private:
|
||||||
DatabaseImpl* m_impl;
|
DatabaseImpl* m_impl;
|
||||||
DatabaseWorker *m_workerRO, *m_workerRW;
|
DatabaseWorker* m_workerRW;
|
||||||
|
QList<DatabaseWorker*> m_workersRO;
|
||||||
bool m_indexReady;
|
bool m_indexReady;
|
||||||
|
|
||||||
static Database* s_instance;
|
static Database* s_instance;
|
||||||
|
@@ -15,18 +15,6 @@ DatabaseWorker::DatabaseWorker( DatabaseImpl* lib, Database* db, bool mutates )
|
|||||||
, m_outstanding( 0 )
|
, m_outstanding( 0 )
|
||||||
{
|
{
|
||||||
moveToThread( this );
|
moveToThread( this );
|
||||||
if( mutates )
|
|
||||||
{
|
|
||||||
connect( db, SIGNAL( newJobRW(QSharedPointer<DatabaseCommand>) ),
|
|
||||||
SLOT( doWork(QSharedPointer<DatabaseCommand>) ),
|
|
||||||
Qt::QueuedConnection );
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
connect( db, SIGNAL( newJobRO(QSharedPointer<DatabaseCommand>) ),
|
|
||||||
SLOT( doWork(QSharedPointer<DatabaseCommand>) ),
|
|
||||||
Qt::QueuedConnection );
|
|
||||||
}
|
|
||||||
|
|
||||||
qDebug() << "CTOR DatabaseWorker" << this->thread();
|
qDebug() << "CTOR DatabaseWorker" << this->thread();
|
||||||
}
|
}
|
||||||
@@ -50,7 +38,20 @@ DatabaseWorker::run()
|
|||||||
|
|
||||||
|
|
||||||
void
|
void
|
||||||
DatabaseWorker::doWork( QSharedPointer<DatabaseCommand> cmd )
|
DatabaseWorker::enqueue( const QSharedPointer<DatabaseCommand>& cmd )
|
||||||
|
{
|
||||||
|
m_outstanding++;
|
||||||
|
|
||||||
|
QMutexLocker lock( &m_mut );
|
||||||
|
m_commands << cmd;
|
||||||
|
|
||||||
|
if ( m_outstanding == 1 )
|
||||||
|
QTimer::singleShot( 0, this, SLOT( doWork() ) );
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
void
|
||||||
|
DatabaseWorker::doWork()
|
||||||
{
|
{
|
||||||
/*
|
/*
|
||||||
Run the dbcmd. Only inside a transaction if the cmd does mutates.
|
Run the dbcmd. Only inside a transaction if the cmd does mutates.
|
||||||
@@ -59,8 +60,16 @@ DatabaseWorker::doWork( QSharedPointer<DatabaseCommand> cmd )
|
|||||||
log to the database oplog for replication to peers.
|
log to the database oplog for replication to peers.
|
||||||
|
|
||||||
*/
|
*/
|
||||||
|
|
||||||
QTime timer;
|
QTime timer;
|
||||||
timer.start();
|
timer.start();
|
||||||
|
|
||||||
|
QSharedPointer<DatabaseCommand> cmd;
|
||||||
|
{
|
||||||
|
QMutexLocker lock( &m_mut );
|
||||||
|
cmd = m_commands.takeFirst();
|
||||||
|
}
|
||||||
|
|
||||||
if( cmd->doesMutates() )
|
if( cmd->doesMutates() )
|
||||||
{
|
{
|
||||||
bool transok = m_dbimpl->database().transaction();
|
bool transok = m_dbimpl->database().transaction();
|
||||||
@@ -154,6 +163,14 @@ DatabaseWorker::doWork( QSharedPointer<DatabaseCommand> cmd )
|
|||||||
}
|
}
|
||||||
|
|
||||||
cmd->emitFinished();
|
cmd->emitFinished();
|
||||||
|
|
||||||
|
{
|
||||||
|
QMutexLocker lock( &m_mut );
|
||||||
|
m_outstanding--;
|
||||||
|
}
|
||||||
|
|
||||||
|
if ( m_outstanding > 0 )
|
||||||
|
doWork();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@@ -24,13 +24,17 @@ Q_OBJECT
|
|||||||
public:
|
public:
|
||||||
DatabaseWorker( DatabaseImpl*, Database*, bool mutates );
|
DatabaseWorker( DatabaseImpl*, Database*, bool mutates );
|
||||||
~DatabaseWorker();
|
~DatabaseWorker();
|
||||||
//void enqueue( QSharedPointer<DatabaseCommand> );
|
|
||||||
|
void enqueue( const QSharedPointer<DatabaseCommand>& );
|
||||||
|
|
||||||
|
bool busy() const { return m_outstanding > 0; }
|
||||||
|
unsigned int outstandingJobs() const { return m_outstanding; }
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
void run();
|
void run();
|
||||||
|
|
||||||
public slots:
|
private slots:
|
||||||
void doWork( QSharedPointer<DatabaseCommand> );
|
void doWork();
|
||||||
|
|
||||||
private:
|
private:
|
||||||
void logOp( DatabaseCommandLoggable* command );
|
void logOp( DatabaseCommandLoggable* command );
|
||||||
@@ -38,8 +42,10 @@ private:
|
|||||||
QMutex m_mut;
|
QMutex m_mut;
|
||||||
DatabaseImpl* m_dbimpl;
|
DatabaseImpl* m_dbimpl;
|
||||||
QList< QSharedPointer<DatabaseCommand> > m_commands;
|
QList< QSharedPointer<DatabaseCommand> > m_commands;
|
||||||
|
|
||||||
bool m_abort;
|
bool m_abort;
|
||||||
int m_outstanding;
|
int m_outstanding;
|
||||||
|
|
||||||
QJson::Serializer m_serializer;
|
QJson::Serializer m_serializer;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user