1
0
mirror of https://github.com/tomahawk-player/tomahawk.git synced 2025-03-23 01:09:42 +01:00

* Introducing groupable DatabaseCommands.

This commit is contained in:
Christian Muehlhaeuser 2011-11-16 04:37:29 +01:00
parent 27c7b3fc17
commit e5636a446e
12 changed files with 132 additions and 61 deletions

View File

@ -73,7 +73,15 @@ Database::loadIndex()
void
Database::enqueue( QSharedPointer<DatabaseCommand> lc )
Database::enqueue( const QList< QSharedPointer<DatabaseCommand> >& lc )
{
qDebug() << "Enqueueing" << lc.count() << "commands to rw thread";
m_workerRW->enqueue( lc );
}
void
Database::enqueue( const QSharedPointer<DatabaseCommand>& lc )
{
if ( lc->doesMutates() )
{

View File

@ -66,7 +66,8 @@ signals:
void newJobRW( QSharedPointer<DatabaseCommand> );
public slots:
void enqueue( QSharedPointer<DatabaseCommand> lc );
void enqueue( const QSharedPointer<DatabaseCommand>& lc );
void enqueue( const QList< QSharedPointer<DatabaseCommand> >& lc );
private slots:
void setIsReadyTrue() { m_ready = true; }

View File

@ -70,6 +70,7 @@ public:
const Tomahawk::source_ptr& source() const;
virtual bool loggable() const { return false; }
virtual bool groupable() const { return false; }
virtual bool singletonCmd() const { return false; }
virtual bool localOnly() const { return false; }

View File

@ -24,7 +24,6 @@
#include "album.h"
#include "collection.h"
#include "database/database.h"
#include "databasecommand_collectionstats.h"
#include "databaseimpl.h"
#include "network/dbsyncconnection.h"
#include "network/servent.h"
@ -73,15 +72,7 @@ DatabaseCommand_AddFiles::postCommitHook()
emit notify( m_ids );
if ( source()->isLocal() )
{
Servent::instance()->triggerDBSync();
// Re-calculate local db stats
DatabaseCommand_CollectionStats* cmd = new DatabaseCommand_CollectionStats( SourceList::instance()->getLocal() );
connect( cmd, SIGNAL( done( QVariantMap ) ),
SourceList::instance()->getLocal().data(), SLOT( setStats( QVariantMap ) ), Qt::QueuedConnection );
Database::instance()->enqueue( QSharedPointer<DatabaseCommand>( cmd ) );
}
}

View File

@ -25,7 +25,6 @@
#include "collection.h"
#include "source.h"
#include "database/database.h"
#include "databasecommand_collectionstats.h"
#include "databaseimpl.h"
#include "network/servent.h"
#include "utils/logger.h"
@ -54,7 +53,7 @@ DatabaseCommand_DeleteFiles::postCommitHook()
ids << id.toUInt();
emit notify( ids );
if( source()->isLocal() )
if ( source()->isLocal() )
Servent::instance()->triggerDBSync();
}

View File

@ -64,6 +64,7 @@ public:
virtual void exec( DatabaseImpl* );
virtual bool doesMutates() const { return true; }
virtual bool localOnly() const { return false; }
virtual bool groupable() const { return true; }
virtual void postCommitHook();
QVariantList ids() const { return m_ids; }

View File

@ -68,6 +68,7 @@ public:
virtual bool doesMutates() const { return true; }
virtual bool singletonCmd() const { return ( m_action == Started ); }
virtual bool localOnly() const;
virtual bool groupable() const { return true; }
QString artist() const { return m_artist; }
void setArtist( const QString& s ) { m_artist = s; }

View File

@ -170,6 +170,7 @@ public:
void setTimestamp( const int ts ) { m_timestamp = ts; }
virtual bool doesMutates() const { return true; }
virtual bool groupable() const { return true; }
private:
Tomahawk::result_ptr m_result;

View File

@ -72,16 +72,21 @@ DatabaseWorker::run()
}
void
DatabaseWorker::enqueue( const QList< QSharedPointer<DatabaseCommand> >& cmds )
{
QMutexLocker lock( &m_mut );
m_outstanding += cmds.count();
m_commands << cmds;
if ( m_outstanding == cmds.count() )
QTimer::singleShot( 0, this, SLOT( doWork() ) );
}
void
DatabaseWorker::enqueue( const QSharedPointer<DatabaseCommand>& cmd )
{
if ( QThread::currentThread() != thread() )
{
// qDebug() << Q_FUNC_INFO << "Reinvoking in correct thread.";
QMetaObject::invokeMethod( this, "enqueue", Qt::QueuedConnection, Q_ARG( QSharedPointer<DatabaseCommand>, cmd ) );
return;
}
QMutexLocker lock( &m_mut );
m_outstanding++;
m_commands << cmd;
@ -107,6 +112,7 @@ DatabaseWorker::doWork()
timer.start();
#endif
QList< QSharedPointer<DatabaseCommand> > cmdGroup;
QSharedPointer<DatabaseCommand> cmd;
{
QMutexLocker lock( &m_mut );
@ -119,45 +125,66 @@ DatabaseWorker::doWork()
Q_ASSERT( transok );
Q_UNUSED( transok );
}
unsigned int completed = 0;
try
{
bool finished = false;
{
cmd->_exec( m_dbimpl ); // runs actual SQL stuff
if ( cmd->loggable() )
while ( !finished )
{
// We only save our own ops to the oplog, since incoming ops from peers
// are applied immediately.
//
// Crazy idea: if peers had keypairs and could sign ops/msgs, in theory it
// would be safe to sync ops for friend A from friend B's cache, if he saved them,
// which would mean you could get updates even if a peer was offline.
if ( cmd->source()->isLocal() && !cmd->localOnly() )
completed++;
cmd->_exec( m_dbimpl ); // runs actual SQL stuff
if ( cmd->loggable() )
{
// save to op-log
DatabaseCommandLoggable* command = (DatabaseCommandLoggable*)cmd.data();
logOp( command );
}
else
{
// Make a note of the last guid we applied for this source
// so we can always request just the newer ops in future.
// We only save our own ops to the oplog, since incoming ops from peers
// are applied immediately.
//
if ( !cmd->singletonCmd() )
// Crazy idea: if peers had keypairs and could sign ops/msgs, in theory it
// would be safe to sync ops for friend A from friend B's cache, if he saved them,
// which would mean you could get updates even if a peer was offline.
if ( cmd->source()->isLocal() && !cmd->localOnly() )
{
// qDebug() << "Setting lastop for source" << cmd->source()->id() << "to" << cmd->guid();
TomahawkSqlQuery query = m_dbimpl->newquery();
query.prepare( "UPDATE source SET lastop = ? WHERE id = ?" );
query.addBindValue( cmd->guid() );
query.addBindValue( cmd->source()->id() );
if ( !query.exec() )
// save to op-log
DatabaseCommandLoggable* command = (DatabaseCommandLoggable*)cmd.data();
logOp( command );
}
else
{
// Make a note of the last guid we applied for this source
// so we can always request just the newer ops in future.
//
if ( !cmd->singletonCmd() )
{
throw "Failed to set lastop";
TomahawkSqlQuery query = m_dbimpl->newquery();
query.prepare( "UPDATE source SET lastop = ? WHERE id = ?" );
query.addBindValue( cmd->guid() );
query.addBindValue( cmd->source()->id() );
if ( !query.exec() )
{
throw "Failed to set lastop";
}
}
}
}
cmdGroup << cmd;
if ( cmd->groupable() && !m_commands.isEmpty() )
{
QMutexLocker lock( &m_mut );
if ( m_commands.first()->groupable() )
{
cmd = m_commands.takeFirst();
}
else
{
finished = true;
}
}
else
finished = true;
}
if ( cmd->doesMutates() )
@ -175,7 +202,8 @@ DatabaseWorker::doWork()
tDebug() << "DBCmd Duration:" << duration << "ms, now running postcommit for" << cmd->commandname();
#endif
cmd->postCommit();
foreach ( QSharedPointer<DatabaseCommand> c, cmdGroup )
c->postCommit();
#ifdef DEBUG_TIMING
tDebug() << "Post commit finished in" << timer.elapsed() - duration << "ms for" << cmd->commandname();
@ -192,7 +220,7 @@ DatabaseWorker::doWork()
<< m_dbimpl->database().lastError().driverText()
<< endl;
if( cmd->doesMutates() )
if ( cmd->doesMutates() )
m_dbimpl->database().rollback();
Q_ASSERT( false );
@ -200,17 +228,19 @@ DatabaseWorker::doWork()
catch(...)
{
qDebug() << "Uncaught exception processing dbcmd";
if( cmd->doesMutates() )
if ( cmd->doesMutates() )
m_dbimpl->database().rollback();
Q_ASSERT( false );
throw;
}
cmd->emitFinished();
foreach ( QSharedPointer<DatabaseCommand> c, cmdGroup )
c->emitFinished();
QMutexLocker lock( &m_mut );
if ( --m_outstanding > 0 )
m_outstanding -= completed;
if ( m_outstanding > 0 )
QTimer::singleShot( 0, this, SLOT( doWork() ) );
}

View File

@ -47,6 +47,7 @@ public:
public slots:
void enqueue( const QSharedPointer<DatabaseCommand>& );
void enqueue( const QList< QSharedPointer<DatabaseCommand> >& );
protected:
void run();

View File

@ -24,6 +24,7 @@
#include "network/controlconnection.h"
#include "database/databasecommand_addsource.h"
#include "database/databasecommand_collectionstats.h"
#include "database/databasecommand_sourceoffline.h"
#include "database/databasecommand_updatesearchindex.h"
#include "database/database.h"
@ -116,6 +117,7 @@ Source::friendlyName() const
return m_friendlyname;
}
#ifndef ENABLE_HEADLESS
void
Source::setAvatar( const QPixmap& avatar )
@ -140,6 +142,7 @@ Source::avatar( AvatarStyle style ) const
}
#endif
void
Source::setFriendlyName( const QString& fname )
{
@ -231,9 +234,7 @@ Source::scanningFinished( unsigned int files )
if ( m_updateIndexWhenSynced )
{
m_updateIndexWhenSynced = false;
DatabaseCommand* cmd = new DatabaseCommand_UpdateSearchIndex();
Database::instance()->enqueue( QSharedPointer<DatabaseCommand>( cmd ) );
updateTracks();
}
emit stateChanged();
@ -351,10 +352,28 @@ Source::executeCommands()
{
if ( !m_cmds.isEmpty() )
{
QList< QSharedPointer<DatabaseCommand> > cmdGroup;
QSharedPointer<DatabaseCommand> cmd = m_cmds.takeFirst();
while ( cmd->groupable() )
{
cmdGroup << cmd;
if ( !m_cmds.isEmpty() && m_cmds.first()->groupable() && m_cmds.first()->commandname() == cmd->commandname() )
cmd = m_cmds.takeFirst();
else
break;
}
// return here when the last command finished
connect( cmd.data(), SIGNAL( finished() ), SLOT( executeCommands() ) );
Database::instance()->enqueue( cmd );
if ( cmdGroup.count() )
{
Database::instance()->enqueue( cmdGroup );
}
else
{
Database::instance()->enqueue( cmd );
}
int percentage = ( float( m_commandCount - m_cmds.count() ) / (float)m_commandCount ) * 100.0;
m_textStatus = tr( "Saving (%1%)" ).arg( percentage );
@ -365,9 +384,7 @@ Source::executeCommands()
if ( m_updateIndexWhenSynced )
{
m_updateIndexWhenSynced = false;
DatabaseCommand* cmd = new DatabaseCommand_UpdateSearchIndex();
Database::instance()->enqueue( QSharedPointer<DatabaseCommand>( cmd ) );
updateTracks();
}
m_textStatus = QString();
@ -400,6 +417,26 @@ Source::reportSocialAttributesChanged( DatabaseCommand_SocialAction* action )
}
void
Source::updateTracks()
{
{
DatabaseCommand* cmd = new DatabaseCommand_UpdateSearchIndex();
Database::instance()->enqueue( QSharedPointer<DatabaseCommand>( cmd ) );
}
{
// Re-calculate local db stats
DatabaseCommand_CollectionStats* cmd = new DatabaseCommand_CollectionStats( SourceList::instance()->get( id() ) );
connect( cmd, SIGNAL( done( QVariantMap ) ),
SourceList::instance()->getLocal().data(), SLOT( setStats( QVariantMap ) ), Qt::QueuedConnection );
Database::instance()->enqueue( QSharedPointer<DatabaseCommand>( cmd ) );
connect( cmd, SIGNAL( finished() ), SIGNAL( stateChanged() ) );
}
}
void
Source::updateIndexWhenSynced()
{

View File

@ -130,7 +130,7 @@ private slots:
private:
void addCommand( const QSharedPointer<DatabaseCommand>& command );
void updateTracks();
void reportSocialAttributesChanged( DatabaseCommand_SocialAction* action );
QList< QSharedPointer<Collection> > m_collections;