1
0
mirror of https://github.com/tomahawk-player/tomahawk.git synced 2025-03-25 02:09:48 +01:00

* Improved collection syncing.

* Improved dupe detection.
* Prevent sources from being stuck syncing.
This commit is contained in:
Christian Muehlhaeuser 2011-11-16 02:35:11 +01:00
parent 7e35961874
commit 27c7b3fc17
14 changed files with 145 additions and 134 deletions

View File

@ -33,10 +33,12 @@ Collection::Collection( const source_ptr& source, const QString& name, QObject*
: QObject( parent )
, m_name( name )
, m_lastmodified( 0 )
, m_isLoaded( false )
, m_changed( false )
, m_source( source )
{
qDebug() << Q_FUNC_INFO << name << source->friendlyName();
connect( source.data(), SIGNAL( synced() ), SLOT( onSynced() ) );
}
@ -211,8 +213,8 @@ Collection::setTracks( const QList<unsigned int>& ids )
{
qDebug() << Q_FUNC_INFO << ids.count() << name();
m_changed = true;
emit tracksAdded( ids );
emit changed();
}
@ -221,8 +223,19 @@ Collection::delTracks( const QList<unsigned int>& ids )
{
qDebug() << Q_FUNC_INFO << ids.count() << name();
m_changed = true;
emit tracksRemoved( ids );
emit changed();
}
void
Collection::onSynced()
{
if ( m_changed )
{
m_changed = false;
emit changed();
}
}

View File

@ -49,8 +49,6 @@ public:
Collection( const source_ptr& source, const QString& name, QObject* parent = 0 );
virtual ~Collection();
virtual void setLoaded() { m_isLoaded = true; }
virtual bool isLoaded() const { return m_isLoaded; }
virtual QString name() const;
virtual void loadPlaylists() { qDebug() << Q_FUNC_INFO; }
@ -107,8 +105,11 @@ protected:
QString m_name;
unsigned int m_lastmodified; // unix time of last change to collection
private slots:
void onSynced();
private:
bool m_isLoaded;
bool m_changed;
source_ptr m_source;
QHash< QString, Tomahawk::playlist_ptr > m_playlists;

View File

@ -69,22 +69,8 @@ DatabaseCollection::loadStations()
connect( cmd, SIGNAL( stationLoaded( Tomahawk::source_ptr, QVariantList ) ),
SLOT( stationCreated( const Tomahawk::source_ptr&, const QVariantList& ) ) );
Database::instance()->enqueue( QSharedPointer<DatabaseCommand>( cmd ) );}
/*void
DatabaseCollection::loadTracks()
{
qDebug() << Q_FUNC_INFO << source()->userName();
setLoaded();
DatabaseCommand_AllTracks* cmd = new DatabaseCommand_AllTracks( source()->collection() );
connect( cmd, SIGNAL( tracks( QList<Tomahawk::query_ptr>, QVariant ) ),
SLOT( setTracks( QList<Tomahawk::query_ptr> ) ) );
Database::instance()->enqueue( QSharedPointer<DatabaseCommand>( cmd ) );
}*/
}
void
@ -143,19 +129,8 @@ DatabaseCollection::stations()
}
/*QList< Tomahawk::query_ptr >
DatabaseCollection::tracks()
{
if ( !isLoaded() )
{
loadTracks();
}
return Collection::tracks();
}*/
void DatabaseCollection::autoPlaylistCreated( const source_ptr& source, const QVariantList& data )
void
DatabaseCollection::autoPlaylistCreated( const source_ptr& source, const QVariantList& data )
{
dynplaylist_ptr p( new DynamicPlaylist( source, //src
data[0].toString(), //current rev
@ -172,7 +147,8 @@ void DatabaseCollection::autoPlaylistCreated( const source_ptr& source, const QV
}
void DatabaseCollection::stationCreated( const source_ptr& source, const QVariantList& data )
void
DatabaseCollection::stationCreated( const source_ptr& source, const QVariantList& data )
{
dynplaylist_ptr p( new DynamicPlaylist( source, //src
data[0].toString(), //current rev

View File

@ -80,12 +80,14 @@ DatabaseCommand::setSource( const Tomahawk::source_ptr& s )
m_source = s;
}
const Tomahawk::source_ptr&
DatabaseCommand::source() const
{
return m_source;
}
DatabaseCommand*
DatabaseCommand::factory( const QVariant& op, const source_ptr& source )
{
@ -139,7 +141,8 @@ DatabaseCommand::factory( const QVariant& op, const source_ptr& source )
cmd->setSource( source );
QJson::QObjectHelper::qvariant2qobject( op.toMap(), cmd );
return cmd;
} else if( name == "createdynamicplaylist" )
}
else if( name == "createdynamicplaylist" )
{
DatabaseCommand_CreateDynamicPlaylist * cmd = new DatabaseCommand_CreateDynamicPlaylist;
cmd->setSource( source );
@ -183,7 +186,7 @@ DatabaseCommand::factory( const QVariant& op, const source_ptr& source )
return cmd;
}
qDebug() << "ERROR in" << Q_FUNC_INFO << name;
qDebug() << "Unknown database command" << name;
// Q_ASSERT( false );
return NULL;
}

View File

@ -157,7 +157,6 @@ DatabaseCommand_Resolve::resolve( DatabaseImpl* lib )
result->setRID( uuid() );
result->setAlbumPos( files_query.value( 14 ).toUInt() );
result->setTrackId( files_query.value( 9 ).toUInt() );
result->setYear( files_query.value( 17 ).toUInt() );
TomahawkSqlQuery attrQuery = lib->newquery();
QVariantMap attr;

View File

@ -212,6 +212,6 @@ DatabaseCommand_SetPlaylistRevision::exec( DatabaseImpl* lib )
else if ( !m_oldrev.isEmpty() )
{
tDebug() << "Not updating current revision, optimistic locking fail";
Q_ASSERT( false );
// Q_ASSERT( false );
}
}

View File

@ -122,7 +122,6 @@ DatabaseWorker::doWork()
try
{
{
// tDebug() << "Executing cmd:" << cmd->guid();
cmd->_exec( m_dbimpl ); // runs actual SQL stuff
if ( cmd->loggable() )
@ -155,7 +154,6 @@ DatabaseWorker::doWork()
if ( !query.exec() )
{
qDebug() << "Failed to set lastop";
throw "Failed to set lastop";
}
}
@ -167,8 +165,7 @@ DatabaseWorker::doWork()
qDebug() << "Committing" << cmd->commandname() << cmd->guid();
if ( !m_dbimpl->database().commit() )
{
qDebug() << "*FAILED TO COMMIT TRANSACTION*";
tDebug() << "FAILED TO COMMIT TRANSACTION*";
throw "commit failed";
}
}

View File

@ -63,7 +63,7 @@ Connection::Connection( Servent* parent )
Connection::~Connection()
{
qDebug() << "DTOR connection (super)" << id() << thread() << m_sock.isNull();
tDebug() << "DTOR connection (super)" << id() << thread() << m_sock.isNull();
if( !m_sock.isNull() )
{
// qDebug() << "deleteLatering sock" << m_sock;
@ -144,13 +144,13 @@ void
Connection::actualShutdown()
{
qDebug() << Q_FUNC_INFO << m_actually_shutting_down << id();
if( m_actually_shutting_down )
if ( m_actually_shutting_down )
{
return;
}
m_actually_shutting_down = true;
if( !m_sock.isNull() && m_sock->isOpen() )
if ( !m_sock.isNull() && m_sock->isOpen() )
{
m_sock->disconnectFromHost();
}
@ -261,7 +261,7 @@ Connection::doSetup()
void
Connection::socketDisconnected()
{
qDebug() << "SOCKET DISCONNECTED" << this->name() << id()
tDebug() << "SOCKET DISCONNECTED" << this->name() << id()
<< "shutdown will happen after incoming queue empties."
<< "bytesavail:" << m_sock->bytesAvailable()
<< "bytesRecvd" << bytesReceived();
@ -423,16 +423,16 @@ Connection::sendMsg_now( msg_ptr msg )
{
//qDebug() << Q_FUNC_INFO << thread() << QThread::currentThread();
Q_ASSERT( QThread::currentThread() == thread() );
Q_ASSERT( this->isRunning() );
// Q_ASSERT( this->isRunning() );
if( m_sock.isNull() || !m_sock->isOpen() || !m_sock->isWritable() )
if ( m_sock.isNull() || !m_sock->isOpen() || !m_sock->isWritable() )
{
qDebug() << "***** Socket problem, whilst in sendMsg(). Cleaning up. *****";
shutdown( false );
return;
}
if( ! msg->write( m_sock.data() ) )
if ( !msg->write( m_sock.data() ) )
{
//qDebug() << "Error writing to socket in sendMsg() *************";
shutdown( false );

View File

@ -39,10 +39,6 @@
#include "sourcelist.h"
#include "utils/logger.h"
// close the dbsync connection after this much inactivity.
// it's automatically reestablished as needed.
#define IDLE_TIMEOUT 300000
using namespace Tomahawk;
@ -52,11 +48,11 @@ DBSyncConnection::DBSyncConnection( Servent* s, const source_ptr& src )
, m_state( UNKNOWN )
{
qDebug() << Q_FUNC_INFO << src->id() << thread();
connect( this, SIGNAL( stateChanged( DBSyncConnection::State, DBSyncConnection::State, QString ) ),
m_source.data(), SLOT( onStateChanged( DBSyncConnection::State, DBSyncConnection::State, QString ) ) );
m_timer.setInterval( IDLE_TIMEOUT );
connect( &m_timer, SIGNAL( timeout() ), SLOT( idleTimeout() ) );
connect( m_source.data(), SIGNAL( commandsFinished() ),
this, SLOT( lastOpApplied() ) );
this->setMsgProcessorModeIn( MsgProcessor::PARSE_JSON | MsgProcessor::UNCOMPRESS_ALL );
@ -67,19 +63,11 @@ DBSyncConnection::DBSyncConnection( Servent* s, const source_ptr& src )
DBSyncConnection::~DBSyncConnection()
{
qDebug() << "DTOR" << Q_FUNC_INFO;
tDebug() << "DTOR" << Q_FUNC_INFO << m_source->friendlyName();
m_state = SHUTDOWN;
}
void
DBSyncConnection::idleTimeout()
{
qDebug() << Q_FUNC_INFO;
shutdown( true );
}
void
DBSyncConnection::changeState( State newstate )
{
@ -96,7 +84,6 @@ DBSyncConnection::changeState( State newstate )
void
DBSyncConnection::setup()
{
// qDebug() << Q_FUNC_INFO;
setId( QString( "DBSyncConnection/%1" ).arg( socket()->peerAddress().toString() ) );
check();
}
@ -105,16 +92,12 @@ DBSyncConnection::setup()
void
DBSyncConnection::trigger()
{
// qDebug() << Q_FUNC_INFO;
// if we're still setting up the connection, do nothing - we sync on first connect anyway:
if ( !isRunning() )
return;
QMetaObject::invokeMethod( this, "sendMsg", Qt::QueuedConnection,
Q_ARG( msg_ptr,
Msg::factory( "{\"method\":\"trigger\"}", Msg::JSON ) )
);
Q_ARG( msg_ptr, Msg::factory( "{\"method\":\"trigger\"}", Msg::JSON ) ) );
}
@ -133,7 +116,6 @@ DBSyncConnection::check()
return;
}
Q_ASSERT( m_cmds.isEmpty() );
m_uscache.clear();
m_us.clear();
@ -154,9 +136,6 @@ DBSyncConnection::check()
{
fetchOpsData( m_source->lastCmdGuid() );
}
// restarts idle countdown
m_timer.start();
}
@ -231,19 +210,16 @@ DBSyncConnection::handleMsg( msg_ptr msg )
if ( msg->is( Msg::DBOP ) )
{
DatabaseCommand* cmd = DatabaseCommand::factory( m, m_source );
if ( !cmd )
if ( cmd )
{
qDebug() << "UNKNOWN DBOP CMD";
return;
QSharedPointer<DatabaseCommand> cmdsp = QSharedPointer<DatabaseCommand>(cmd);
m_source->addCommand( cmdsp );
}
QSharedPointer<DatabaseCommand> cmdsp = QSharedPointer<DatabaseCommand>(cmd);
m_cmds << cmdsp;
if ( !msg->is( Msg::FRAGMENT ) ) // last msg in this batch
{
changeState( SAVING ); // just DB work left to complete
executeCommands();
m_source->executeCommands();
}
return;
}
@ -269,23 +245,11 @@ DBSyncConnection::handleMsg( msg_ptr msg )
void
DBSyncConnection::executeCommands()
DBSyncConnection::lastOpApplied()
{
if ( !m_cmds.isEmpty() )
{
QSharedPointer<DatabaseCommand> cmd = m_cmds.takeFirst();
if ( !cmd->singletonCmd() )
m_source->setLastCmdGuid( cmd->guid() );
connect( cmd.data(), SIGNAL( finished() ), SLOT( executeCommands() ) );
Database::instance()->enqueue( cmd );
}
else
{
changeState( SYNCED );
// check again, until peer responds we have no new ops to process
check();
}
changeState( SYNCED );
// check again, until peer responds we have no new ops to process
check();
}
@ -298,8 +262,8 @@ DBSyncConnection::sendOps()
source_ptr src = SourceList::instance()->getLocal();
DatabaseCommand_loadOps* cmd = new DatabaseCommand_loadOps( src, m_uscache.value( "lastop" ).toString() );
connect( cmd, SIGNAL( done( QString, QString, QList< dbop_ptr > ) ),
SLOT( sendOpsData( QString, QString, QList< dbop_ptr > ) ) );
connect( cmd, SIGNAL( done( QString, QString, QList< dbop_ptr > ) ),
SLOT( sendOpsData( QString, QString, QList< dbop_ptr > ) ) );
Database::instance()->enqueue( QSharedPointer<DatabaseCommand>( cmd ) );
}

View File

@ -70,10 +70,9 @@ private slots:
void fetchOpsData( const QString& sinceguid );
void sendOpsData( QString sinceguid, QString lastguid, QList< dbop_ptr > ops );
void executeCommands();
void lastOpApplied();
void check();
void idleTimeout();
private:
void synced();
@ -82,11 +81,9 @@ private:
Tomahawk::source_ptr m_source;
QVariantMap m_us, m_uscache;
QList< QSharedPointer<DatabaseCommand> > m_cmds;
QString m_lastSentOp;
State m_state;
QTimer m_timer;
};
#endif // DBSYNCCONNECTION_H

View File

@ -263,6 +263,7 @@ Servent::unregisterControlConnection( ControlConnection* conn )
if( c!=conn )
n.append( c );
m_connectedNodes.removeAll( conn->id() );
m_controlconnections = n;
}
@ -330,7 +331,6 @@ Servent::readyRead()
ControlConnection* cc = 0;
bool ok;
// int pport; //FIXME?
QString key, conntype, nodeid, controlid;
QVariantMap m = parser.parse( sock->_msg->payload(), &ok ).toMap();
if( !ok )
@ -338,9 +338,9 @@ Servent::readyRead()
tDebug() << "Invalid JSON on new connection, aborting";
goto closeconnection;
}
conntype = m.value( "conntype" ).toString();
key = m.value( "key" ).toString();
// pport = m.value( "port" ).toInt();
nodeid = m.value( "nodeid" ).toString();
controlid = m.value( "controlid" ).toString();
@ -348,14 +348,25 @@ Servent::readyRead()
if( !nodeid.isEmpty() ) // only control connections send nodeid
{
bool dupe = false;
if ( m_connectedNodes.contains( nodeid ) )
dupe = true;
foreach( ControlConnection* con, m_controlconnections )
{
tDebug() << "known connection:" << con->id() << con->source()->friendlyName();
if( con->id() == nodeid )
{
tLog() << "Duplicate control connection detected, dropping:" << nodeid << conntype;
goto closeconnection;
dupe = true;
break;
}
}
if ( dupe )
{
tLog() << "Duplicate control connection detected, dropping:" << nodeid << conntype;
goto closeconnection;
}
}
foreach( ControlConnection* con, m_controlconnections )
@ -374,11 +385,12 @@ Servent::readyRead()
Connection* conn = claimOffer( cc, nodeid, key, sock->peerAddress() );
if( !conn )
{
tLog() << "claimOffer FAILED, key:" << key;
tLog() << "claimOffer FAILED, key:" << key << nodeid;
goto closeconnection;
}
tDebug( LOGVERBOSE ) << "claimOffer OK:" << key;
tDebug( LOGVERBOSE ) << "claimOffer OK:" << key << nodeid;
m_connectedNodes << nodeid;
if( !nodeid.isEmpty() )
conn->setId( nodeid );

View File

@ -159,6 +159,8 @@ private:
QJson::Parser parser;
QList< ControlConnection* > m_controlconnections; // canonical list of authed peers
QMap< QString, QWeakPointer<Connection> > m_offers;
QStringList m_connectedNodes;
int m_port, m_externalPort;
QHostAddress m_externalAddress;
QString m_externalHostname;

View File

@ -46,6 +46,7 @@ Source::Source( int id, const QString& username )
, m_updateIndexWhenSynced( false )
, m_state( DBSyncConnection::UNKNOWN )
, m_cc( 0 )
, m_commandCount( 0 )
, m_avatar( 0 )
, m_fancyAvatar( 0 )
{
@ -224,8 +225,19 @@ void
Source::scanningFinished( unsigned int files )
{
Q_UNUSED( files );
m_textStatus = QString();
if ( m_updateIndexWhenSynced )
{
m_updateIndexWhenSynced = false;
DatabaseCommand* cmd = new DatabaseCommand_UpdateSearchIndex();
Database::instance()->enqueue( QSharedPointer<DatabaseCommand>( cmd ) );
}
emit stateChanged();
emit synced();
}
@ -251,29 +263,16 @@ Source::onStateChanged( DBSyncConnection::State newstate, DBSyncConnection::Stat
msg = tr( "Parsing" );
break;
}
case DBSyncConnection::SAVING:
case DBSyncConnection::SCANNING:
{
msg = tr( "Saving" );
msg = tr( "Scanning (%L1 tracks)" ).arg( info );
break;
}
case DBSyncConnection::SYNCED:
{
if ( m_updateIndexWhenSynced )
{
m_updateIndexWhenSynced = false;
DatabaseCommand* cmd = new DatabaseCommand_UpdateSearchIndex();
Database::instance()->enqueue( QSharedPointer<DatabaseCommand>( cmd ) );
}
msg = QString();
break;
}
case DBSyncConnection::SCANNING:
{
msg = tr( "Scanning (%L1 tracks)" ).arg( info );
break;
}
default:
msg = QString();
@ -336,6 +335,51 @@ Source::trackTimerFired()
}
void
Source::addCommand( const QSharedPointer<DatabaseCommand>& command )
{
m_cmds << command;
if ( !command->singletonCmd() )
m_lastCmdGuid = command->guid();
m_commandCount = m_cmds.count();
}
void
Source::executeCommands()
{
if ( !m_cmds.isEmpty() )
{
QSharedPointer<DatabaseCommand> cmd = m_cmds.takeFirst();
connect( cmd.data(), SIGNAL( finished() ), SLOT( executeCommands() ) );
Database::instance()->enqueue( cmd );
int percentage = ( float( m_commandCount - m_cmds.count() ) / (float)m_commandCount ) * 100.0;
m_textStatus = tr( "Saving (%1%)" ).arg( percentage );
emit stateChanged();
}
else
{
if ( m_updateIndexWhenSynced )
{
m_updateIndexWhenSynced = false;
DatabaseCommand* cmd = new DatabaseCommand_UpdateSearchIndex();
Database::instance()->enqueue( QSharedPointer<DatabaseCommand>( cmd ) );
}
m_textStatus = QString();
m_state = DBSyncConnection::SYNCED;
emit commandsFinished();
emit stateChanged();
emit synced();
}
}
void
Source::reportSocialAttributesChanged( DatabaseCommand_SocialAction* action )
{
@ -359,11 +403,5 @@ Source::reportSocialAttributesChanged( DatabaseCommand_SocialAction* action )
void
Source::updateIndexWhenSynced()
{
if ( isLocal() )
{
DatabaseCommand* cmd = new DatabaseCommand_UpdateSearchIndex();
Database::instance()->enqueue( QSharedPointer<DatabaseCommand>( cmd ) );
}
else
m_updateIndexWhenSynced = true;
m_updateIndexWhenSynced = true;
}

View File

@ -87,6 +87,8 @@ public:
signals:
void syncedWithDatabase();
void synced();
void online();
void offline();
@ -100,6 +102,7 @@ signals:
void playbackFinished( const Tomahawk::query_ptr& query );
void stateChanged();
void commandsFinished();
void socialAttributesChanged();
@ -112,18 +115,22 @@ public slots:
private slots:
void dbLoaded( unsigned int id, const QString& fname );
QString lastCmdGuid() const { return m_lastCmdGuid; }
void setLastCmdGuid( const QString& guid ) { m_lastCmdGuid = guid; }
void updateIndexWhenSynced();
void setOffline();
void setOnline();
void onStateChanged( DBSyncConnection::State newstate, DBSyncConnection::State oldstate, const QString& info );
void onPlaybackStarted( const Tomahawk::query_ptr& query );
void onPlaybackFinished( const Tomahawk::query_ptr& query );
void trackTimerFired();
void executeCommands();
private:
void addCommand( const QSharedPointer<DatabaseCommand>& command );
void reportSocialAttributesChanged( DatabaseCommand_SocialAction* action );
QList< QSharedPointer<Collection> > m_collections;
@ -144,6 +151,8 @@ private:
QTimer m_currentTrackTimer;
ControlConnection* m_cc;
QList< QSharedPointer<DatabaseCommand> > m_cmds;
int m_commandCount;
QPixmap* m_avatar;
mutable QPixmap* m_fancyAvatar;