From 2edd93ab3c2a07086010094c039c89fc1f197377 Mon Sep 17 00:00:00 2001 From: Christian Muehlhaeuser Date: Mon, 10 Jan 2011 15:00:33 +0100 Subject: [PATCH] * BufferIODevice & FileTransferConnection now support seeking (needs testing). --- src/libtomahawk/network/bufferiodevice.cpp | 209 +++++++++++++++--- src/libtomahawk/network/bufferiodevice.h | 33 ++- src/libtomahawk/network/connection.cpp | 6 +- .../network/filetransferconnection.cpp | 70 ++++-- .../network/filetransferconnection.h | 4 + 5 files changed, 264 insertions(+), 58 deletions(-) diff --git a/src/libtomahawk/network/bufferiodevice.cpp b/src/libtomahawk/network/bufferiodevice.cpp index b4e533643..2bea719ef 100644 --- a/src/libtomahawk/network/bufferiodevice.cpp +++ b/src/libtomahawk/network/bufferiodevice.cpp @@ -1,11 +1,18 @@ -#include #include "bufferiodevice.h" +#include +#include +#include -BufferIODevice::BufferIODevice( unsigned int size, QObject *parent ) : - QIODevice( parent ), - m_size( size ), - m_received( 0 ) +// Msgs are framed, this is the size each msg we send containing audio data: +#define BLOCKSIZE 4096 + + +BufferIODevice::BufferIODevice( unsigned int size, QObject *parent ) + : QIODevice( parent ) + , m_size( size ) + , m_received( 0 ) + , m_pos( 0 ) { } @@ -16,7 +23,7 @@ BufferIODevice::open( OpenMode mode ) QMutexLocker lock( &m_mut ); qDebug() << Q_FUNC_INFO; - QIODevice::open( QIODevice::ReadWrite ); // FIXME? + QIODevice::open( QIODevice::ReadOnly | QIODevice::Unbuffered ); // FIXME? return true; } @@ -31,6 +38,32 @@ BufferIODevice::close() } +bool +BufferIODevice::seek( qint64 pos ) +{ + qDebug() << Q_FUNC_INFO << pos << m_size; + + if ( pos >= m_size ) + return false; + + int block = blockForPos( pos ); + if ( isBlockEmpty( block ) ) + emit blockRequest( block ); + + m_pos = pos; + qDebug() << "Finished seeking"; + + return true; +} + + +void +BufferIODevice::seeked( int block ) +{ + qDebug() << Q_FUNC_INFO << block << m_size; +} + + void BufferIODevice::inputComplete( const QString& errmsg ) { @@ -41,62 +74,76 @@ BufferIODevice::inputComplete( const QString& errmsg ) void -BufferIODevice::addData( QByteArray ba ) +BufferIODevice::addData( int block, const QByteArray& ba ) { - writeData( ba.data(), ba.length() ); + { + QMutexLocker lock( &m_mut ); + + while ( m_buffer.count() <= block ) + m_buffer << QByteArray(); + + m_buffer.replace( block, ba ); + } + + // If this was the last block of the transfer, check if we need to fill up gaps + if ( block + 1 == maxBlocks() ) + { + if ( nextEmptyBlock() >= 0 ) + { + emit blockRequest( nextEmptyBlock() ); + } + } + + emit bytesWritten( ba.count() ); + emit readyRead(); } qint64 BufferIODevice::bytesAvailable() const { - QMutexLocker lock( &m_mut ); - return m_buffer.length(); + return m_size - m_pos; } qint64 -BufferIODevice::readData( char * data, qint64 maxSize ) +BufferIODevice::readData( char* data, qint64 maxSize ) { - //qDebug() << Q_FUNC_INFO << maxSize; - QMutexLocker lock( &m_mut ); +// qDebug() << Q_FUNC_INFO << m_pos << maxSize << 1; - qint64 size = maxSize; - if ( m_buffer.length() < maxSize ) - size = m_buffer.length(); + if ( atEnd() ) + return 0; - memcpy( data, m_buffer.data(), size ); - m_buffer.remove( 0, size ); + QByteArray ba; + ba.append( getData( m_pos, maxSize ) ); + m_pos += ba.count(); - //qDebug() << "readData ends, bufersize:" << m_buffer.length(); - return size; +// qDebug() << Q_FUNC_INFO << maxSize << ba.count() << 2; + memcpy( data, ba.data(), ba.count() ); + + return ba.count(); } -qint64 BufferIODevice::writeData( const char * data, qint64 maxSize ) +qint64 BufferIODevice::writeData( const char* data, qint64 maxSize ) { - { - QMutexLocker lock( &m_mut ); - m_buffer.append( data, maxSize ); - m_received += maxSize; - } - - emit bytesWritten( maxSize ); - emit readyRead(); - return maxSize; + // call addData instead + Q_ASSERT( false ); + return 0; } qint64 BufferIODevice::size() const { + qDebug() << Q_FUNC_INFO << m_size; return m_size; } bool BufferIODevice::atEnd() const { - QMutexLocker lock( &m_mut ); - return ( m_size == m_received && m_buffer.length() == 0 ); +// qDebug() << Q_FUNC_INFO << ( m_size <= m_pos ); + return ( m_size <= m_pos ); } @@ -104,5 +151,103 @@ void BufferIODevice::clear() { QMutexLocker lock( &m_mut ); + + m_pos = 0; m_buffer.clear(); } + + +unsigned int +BufferIODevice::blockSize() +{ + return BLOCKSIZE; +} + + +int +BufferIODevice::blockForPos( qint64 pos ) const +{ + // 0 / 4096 -> block 0 + // 4095 / 4096 -> block 0 + // 4096 / 4096 -> block 1 + + return pos / BLOCKSIZE; +} + + +int +BufferIODevice::offsetForPos( qint64 pos ) const +{ + // 0 % 4096 -> offset 0 + // 4095 % 4096 -> offset 4095 + // 4096 % 4096 -> offset 0 + + return pos % BLOCKSIZE; +} + + +int +BufferIODevice::nextEmptyBlock() const +{ + int i = 0; + foreach( const QByteArray& ba, m_buffer ) + { + if ( ba.isEmpty() ) + return i; + + i++; + } + + if ( i == maxBlocks() ) + return -1; + + return i; +} + + +int +BufferIODevice::maxBlocks() const +{ + int i = m_size / BLOCKSIZE; + + if ( ( m_size % BLOCKSIZE ) > 0 ) + i++; + + return i; +} + + +bool +BufferIODevice::isBlockEmpty( int block ) const +{ + if ( block >= m_buffer.count() ) + return true; + + if ( m_buffer.at( block ).isEmpty() ) + return true; +} + + +QByteArray +BufferIODevice::getData( qint64 pos, qint64 size ) +{ +// qDebug() << Q_FUNC_INFO << pos << size << 1; + QByteArray ba; + int block = blockForPos( pos ); + int offset = offsetForPos( pos ); + + QMutexLocker lock( &m_mut ); + while( ba.count() < size ) + { + if ( block > maxBlocks() ) + break; + + if ( isBlockEmpty( block ) ) + break; + + ba.append( m_buffer.at( block++ ).mid( offset ) ); + } + +// qDebug() << Q_FUNC_INFO << pos << size << 2; + return ba.left( size ); +} diff --git a/src/libtomahawk/network/bufferiodevice.h b/src/libtomahawk/network/bufferiodevice.h index 69a661d76..0fca19b94 100644 --- a/src/libtomahawk/network/bufferiodevice.h +++ b/src/libtomahawk/network/bufferiodevice.h @@ -4,35 +4,58 @@ #include #include #include +#include class BufferIODevice : public QIODevice { Q_OBJECT + public: explicit BufferIODevice( unsigned int size = 0, QObject *parent = 0 ); virtual bool open( OpenMode mode ); virtual void close(); + virtual bool seek( qint64 pos ); + void seeked( int block ); + virtual qint64 bytesAvailable() const; virtual qint64 size() const; virtual bool atEnd() const; + virtual qint64 pos() const { qDebug() << Q_FUNC_INFO << m_pos; return m_pos; } - void addData( QByteArray ba ); + void addData( int block, const QByteArray& ba ); void clear(); - OpenMode openMode() const { qDebug() << "openMode"; return QIODevice::ReadWrite; } + OpenMode openMode() const { qDebug() << "openMode"; return QIODevice::ReadOnly | QIODevice::Unbuffered; } void inputComplete( const QString& errmsg = "" ); + virtual bool isSequential() const { return false; } + + static unsigned int blockSize(); + + int maxBlocks() const; + int nextEmptyBlock() const; + bool isBlockEmpty( int block ) const; + +signals: + void blockRequest( int block ); + protected: - virtual qint64 readData( char * data, qint64 maxSize ); - virtual qint64 writeData( const char * data, qint64 maxSize ); + virtual qint64 readData( char* data, qint64 maxSize ); + virtual qint64 writeData( const char* data, qint64 maxSize ); private: - QByteArray m_buffer; + int blockForPos( qint64 pos ) const; + int offsetForPos( qint64 pos ) const; + QByteArray getData( qint64 pos, qint64 size ); + + QList m_buffer; mutable QMutex m_mut; //const methods need to lock unsigned int m_size, m_received; + + unsigned int m_pos; }; #endif // BUFFERIODEVICE_H diff --git a/src/libtomahawk/network/connection.cpp b/src/libtomahawk/network/connection.cpp index 2dc64db52..42c78bd74 100644 --- a/src/libtomahawk/network/connection.cpp +++ b/src/libtomahawk/network/connection.cpp @@ -5,7 +5,7 @@ #include "network/servent.h" -#define PROTOVER "3" // must match remote peer, or we can't talk. +#define PROTOVER "4" // must match remote peer, or we can't talk. Connection::Connection( Servent* parent ) @@ -70,8 +70,8 @@ Connection::handleIncomingQueueEmpty() if( m_sock->bytesAvailable() == 0 && m_peer_disconnected ) { qDebug() << "No more data to read, peer disconnected. shutting down connection." - << "bytesavail" << m_sock->bytesAvailable() - << "bytesrx" << m_rx_bytes; + << "bytesavail" << m_sock->bytesAvailable() + << "bytesrx" << m_rx_bytes; shutdown(); } } diff --git a/src/libtomahawk/network/filetransferconnection.cpp b/src/libtomahawk/network/filetransferconnection.cpp index ea9e9a924..dbfe3570d 100644 --- a/src/libtomahawk/network/filetransferconnection.cpp +++ b/src/libtomahawk/network/filetransferconnection.cpp @@ -10,9 +10,6 @@ #include "database/database.h" #include "sourcelist.h" -// Msgs are framed, this is the size each msg we send containing audio data: -#define BLOCKSIZE 4096 - using namespace Tomahawk; @@ -21,6 +18,7 @@ FileTransferConnection::FileTransferConnection( Servent* s, ControlConnection* c , m_cc( cc ) , m_fid( fid ) , m_type( RECEIVING ) + , m_curBlock( 0 ) , m_badded( 0 ) , m_bsent( 0 ) , m_allok( false ) @@ -37,7 +35,8 @@ FileTransferConnection::FileTransferConnection( Servent* s, ControlConnection* c // if the audioengine closes the iodev (skip/stop/etc) then kill the connection // immediately to avoid unnecessary network transfer - connect( m_iodev.data(), SIGNAL( aboutToClose() ), this, SLOT( shutdown() ), Qt::QueuedConnection ); + connect( m_iodev.data(), SIGNAL( aboutToClose() ), SLOT( shutdown() ), Qt::QueuedConnection ); + connect( m_iodev.data(), SIGNAL( blockRequest( int ) ), SLOT( onBlockRequest( int ) ) ); // auto delete when connection closes: connect( this, SIGNAL( finished() ), SLOT( deleteLater() ), Qt::QueuedConnection ); @@ -170,21 +169,41 @@ FileTransferConnection::startSending( const Tomahawk::result_ptr& result ) void FileTransferConnection::handleMsg( msg_ptr msg ) { - Q_ASSERT( m_type == FileTransferConnection::RECEIVING ); Q_ASSERT( msg->is( Msg::RAW ) ); - m_badded += msg->payload().length(); - ((BufferIODevice*)m_iodev.data())->addData( msg->payload() ); + if ( msg->payload().startsWith( "block" ) ) + { + int block = QString( msg->payload() ).mid( 5 ).toInt(); + m_readdev->seek( block * BufferIODevice::blockSize() ); + + qDebug() << "Seeked to block:" << block; + + QByteArray sm; + sm.append( QString( "doneblock%1" ).arg( block ) ); + + sendMsg( Msg::factory( sm, Msg::RAW | Msg::FRAGMENT ) ); + QTimer::singleShot( 0, this, SLOT( sendSome() ) ); + } + else if ( msg->payload().startsWith( "doneblock" ) ) + { + int block = QString( msg->payload() ).mid( 9 ).toInt(); + ((BufferIODevice*)m_iodev.data())->seeked( block ); + + m_curBlock = block; + qDebug() << "Next block is now:" << block; + } + else if ( msg->payload().startsWith( "data" ) ) + { + m_badded += msg->payload().length() - 4; + ((BufferIODevice*)m_iodev.data())->addData( m_curBlock++, msg->payload().mid( 4 ) ); + } //qDebug() << Q_FUNC_INFO << "flags" << (int) msg->flags() // << "payload len" << msg->payload().length() // << "written to device so far: " << m_badded; - if( !msg->is( Msg::FRAGMENT ) ) + if ( ((BufferIODevice*)m_iodev.data())->nextEmptyBlock() < 0 ) { - qDebug() << "*** Got last msg in filetransfer. added" << m_badded - << "io size" << m_iodev->size(); - m_allok = true; // tell our iodev there is no more data to read, no args meaning a success: ((BufferIODevice*)m_iodev.data())->inputComplete(); @@ -193,26 +212,26 @@ FileTransferConnection::handleMsg( msg_ptr msg ) } -Connection* FileTransferConnection::clone() +Connection* +FileTransferConnection::clone() { Q_ASSERT( false ); return 0; } -void FileTransferConnection::sendSome() +void +FileTransferConnection::sendSome() { Q_ASSERT( m_type == FileTransferConnection::SENDING ); - QByteArray ba = m_readdev->read( BLOCKSIZE ); - m_bsent += ba.length(); - //qDebug() << "Sending" << ba.length() << "bytes of audiofile"; + QByteArray ba = "data"; + ba.append( m_readdev->read( BufferIODevice::blockSize() ) ); + m_bsent += ba.length() - 4; if( m_readdev->atEnd() ) { sendMsg( Msg::factory( ba, Msg::RAW ) ); - qDebug() << "Sent all. DONE." << m_bsent; - shutdown( true ); return; } else @@ -225,3 +244,18 @@ void FileTransferConnection::sendSome() // (this is where upload throttling could be implemented) QTimer::singleShot( 0, this, SLOT( sendSome() ) ); } + + +void +FileTransferConnection::onBlockRequest( int block ) +{ + qDebug() << Q_FUNC_INFO << block; + + if ( m_curBlock == block ) + return; + + QByteArray sm; + sm.append( QString( "block%1" ).arg( block ) ); + + sendMsg( Msg::factory( sm, Msg::RAW | Msg::FRAGMENT ) ); +} diff --git a/src/libtomahawk/network/filetransferconnection.h b/src/libtomahawk/network/filetransferconnection.h index 467eb8d46..d0ec0007e 100644 --- a/src/libtomahawk/network/filetransferconnection.h +++ b/src/libtomahawk/network/filetransferconnection.h @@ -54,6 +54,8 @@ private slots: void sendSome(); void showStats( qint64 tx, qint64 rx ); + void onBlockRequest( int pos ); + private: QSharedPointer m_iodev; ControlConnection* m_cc; @@ -61,6 +63,8 @@ private: Type m_type; QSharedPointer m_readdev; + int m_curBlock; + int m_badded, m_bsent; bool m_allok; // got last msg ok, transfer complete?