1
0
mirror of https://github.com/tomahawk-player/tomahawk.git synced 2025-04-21 00:12:06 +02:00

* BufferIODevice & FileTransferConnection now support seeking (needs testing).

This commit is contained in:
Christian Muehlhaeuser 2011-01-10 15:00:33 +01:00
parent 0dd7d1e0e0
commit 2edd93ab3c
5 changed files with 264 additions and 58 deletions

View File

@ -1,11 +1,18 @@
#include <QDebug>
#include "bufferiodevice.h"
#include <QDebug>
#include <QCoreApplication>
#include <QThread>
BufferIODevice::BufferIODevice( unsigned int size, QObject *parent ) :
QIODevice( parent ),
m_size( size ),
m_received( 0 )
// Msgs are framed, this is the size each msg we send containing audio data:
#define BLOCKSIZE 4096
BufferIODevice::BufferIODevice( unsigned int size, QObject *parent )
: QIODevice( parent )
, m_size( size )
, m_received( 0 )
, m_pos( 0 )
{
}
@ -16,7 +23,7 @@ BufferIODevice::open( OpenMode mode )
QMutexLocker lock( &m_mut );
qDebug() << Q_FUNC_INFO;
QIODevice::open( QIODevice::ReadWrite ); // FIXME?
QIODevice::open( QIODevice::ReadOnly | QIODevice::Unbuffered ); // FIXME?
return true;
}
@ -31,6 +38,32 @@ BufferIODevice::close()
}
bool
BufferIODevice::seek( qint64 pos )
{
qDebug() << Q_FUNC_INFO << pos << m_size;
if ( pos >= m_size )
return false;
int block = blockForPos( pos );
if ( isBlockEmpty( block ) )
emit blockRequest( block );
m_pos = pos;
qDebug() << "Finished seeking";
return true;
}
void
BufferIODevice::seeked( int block )
{
qDebug() << Q_FUNC_INFO << block << m_size;
}
void
BufferIODevice::inputComplete( const QString& errmsg )
{
@ -41,62 +74,76 @@ BufferIODevice::inputComplete( const QString& errmsg )
void
BufferIODevice::addData( QByteArray ba )
BufferIODevice::addData( int block, const QByteArray& ba )
{
writeData( ba.data(), ba.length() );
{
QMutexLocker lock( &m_mut );
while ( m_buffer.count() <= block )
m_buffer << QByteArray();
m_buffer.replace( block, ba );
}
// If this was the last block of the transfer, check if we need to fill up gaps
if ( block + 1 == maxBlocks() )
{
if ( nextEmptyBlock() >= 0 )
{
emit blockRequest( nextEmptyBlock() );
}
}
emit bytesWritten( ba.count() );
emit readyRead();
}
qint64
BufferIODevice::bytesAvailable() const
{
QMutexLocker lock( &m_mut );
return m_buffer.length();
return m_size - m_pos;
}
qint64
BufferIODevice::readData( char * data, qint64 maxSize )
BufferIODevice::readData( char* data, qint64 maxSize )
{
//qDebug() << Q_FUNC_INFO << maxSize;
QMutexLocker lock( &m_mut );
// qDebug() << Q_FUNC_INFO << m_pos << maxSize << 1;
qint64 size = maxSize;
if ( m_buffer.length() < maxSize )
size = m_buffer.length();
if ( atEnd() )
return 0;
memcpy( data, m_buffer.data(), size );
m_buffer.remove( 0, size );
QByteArray ba;
ba.append( getData( m_pos, maxSize ) );
m_pos += ba.count();
//qDebug() << "readData ends, bufersize:" << m_buffer.length();
return size;
// qDebug() << Q_FUNC_INFO << maxSize << ba.count() << 2;
memcpy( data, ba.data(), ba.count() );
return ba.count();
}
qint64 BufferIODevice::writeData( const char * data, qint64 maxSize )
qint64 BufferIODevice::writeData( const char* data, qint64 maxSize )
{
{
QMutexLocker lock( &m_mut );
m_buffer.append( data, maxSize );
m_received += maxSize;
}
emit bytesWritten( maxSize );
emit readyRead();
return maxSize;
// call addData instead
Q_ASSERT( false );
return 0;
}
qint64 BufferIODevice::size() const
{
qDebug() << Q_FUNC_INFO << m_size;
return m_size;
}
bool BufferIODevice::atEnd() const
{
QMutexLocker lock( &m_mut );
return ( m_size == m_received && m_buffer.length() == 0 );
// qDebug() << Q_FUNC_INFO << ( m_size <= m_pos );
return ( m_size <= m_pos );
}
@ -104,5 +151,103 @@ void
BufferIODevice::clear()
{
QMutexLocker lock( &m_mut );
m_pos = 0;
m_buffer.clear();
}
unsigned int
BufferIODevice::blockSize()
{
return BLOCKSIZE;
}
int
BufferIODevice::blockForPos( qint64 pos ) const
{
// 0 / 4096 -> block 0
// 4095 / 4096 -> block 0
// 4096 / 4096 -> block 1
return pos / BLOCKSIZE;
}
int
BufferIODevice::offsetForPos( qint64 pos ) const
{
// 0 % 4096 -> offset 0
// 4095 % 4096 -> offset 4095
// 4096 % 4096 -> offset 0
return pos % BLOCKSIZE;
}
int
BufferIODevice::nextEmptyBlock() const
{
int i = 0;
foreach( const QByteArray& ba, m_buffer )
{
if ( ba.isEmpty() )
return i;
i++;
}
if ( i == maxBlocks() )
return -1;
return i;
}
int
BufferIODevice::maxBlocks() const
{
int i = m_size / BLOCKSIZE;
if ( ( m_size % BLOCKSIZE ) > 0 )
i++;
return i;
}
bool
BufferIODevice::isBlockEmpty( int block ) const
{
if ( block >= m_buffer.count() )
return true;
if ( m_buffer.at( block ).isEmpty() )
return true;
}
QByteArray
BufferIODevice::getData( qint64 pos, qint64 size )
{
// qDebug() << Q_FUNC_INFO << pos << size << 1;
QByteArray ba;
int block = blockForPos( pos );
int offset = offsetForPos( pos );
QMutexLocker lock( &m_mut );
while( ba.count() < size )
{
if ( block > maxBlocks() )
break;
if ( isBlockEmpty( block ) )
break;
ba.append( m_buffer.at( block++ ).mid( offset ) );
}
// qDebug() << Q_FUNC_INFO << pos << size << 2;
return ba.left( size );
}

View File

@ -4,35 +4,58 @@
#include <QIODevice>
#include <QMutexLocker>
#include <QDebug>
#include <QFile>
class BufferIODevice : public QIODevice
{
Q_OBJECT
public:
explicit BufferIODevice( unsigned int size = 0, QObject *parent = 0 );
virtual bool open( OpenMode mode );
virtual void close();
virtual bool seek( qint64 pos );
void seeked( int block );
virtual qint64 bytesAvailable() const;
virtual qint64 size() const;
virtual bool atEnd() const;
virtual qint64 pos() const { qDebug() << Q_FUNC_INFO << m_pos; return m_pos; }
void addData( QByteArray ba );
void addData( int block, const QByteArray& ba );
void clear();
OpenMode openMode() const { qDebug() << "openMode"; return QIODevice::ReadWrite; }
OpenMode openMode() const { qDebug() << "openMode"; return QIODevice::ReadOnly | QIODevice::Unbuffered; }
void inputComplete( const QString& errmsg = "" );
virtual bool isSequential() const { return false; }
static unsigned int blockSize();
int maxBlocks() const;
int nextEmptyBlock() const;
bool isBlockEmpty( int block ) const;
signals:
void blockRequest( int block );
protected:
virtual qint64 readData( char * data, qint64 maxSize );
virtual qint64 writeData( const char * data, qint64 maxSize );
virtual qint64 readData( char* data, qint64 maxSize );
virtual qint64 writeData( const char* data, qint64 maxSize );
private:
QByteArray m_buffer;
int blockForPos( qint64 pos ) const;
int offsetForPos( qint64 pos ) const;
QByteArray getData( qint64 pos, qint64 size );
QList<QByteArray> m_buffer;
mutable QMutex m_mut; //const methods need to lock
unsigned int m_size, m_received;
unsigned int m_pos;
};
#endif // BUFFERIODEVICE_H

View File

@ -5,7 +5,7 @@
#include "network/servent.h"
#define PROTOVER "3" // must match remote peer, or we can't talk.
#define PROTOVER "4" // must match remote peer, or we can't talk.
Connection::Connection( Servent* parent )
@ -70,8 +70,8 @@ Connection::handleIncomingQueueEmpty()
if( m_sock->bytesAvailable() == 0 && m_peer_disconnected )
{
qDebug() << "No more data to read, peer disconnected. shutting down connection."
<< "bytesavail" << m_sock->bytesAvailable()
<< "bytesrx" << m_rx_bytes;
<< "bytesavail" << m_sock->bytesAvailable()
<< "bytesrx" << m_rx_bytes;
shutdown();
}
}

View File

@ -10,9 +10,6 @@
#include "database/database.h"
#include "sourcelist.h"
// Msgs are framed, this is the size each msg we send containing audio data:
#define BLOCKSIZE 4096
using namespace Tomahawk;
@ -21,6 +18,7 @@ FileTransferConnection::FileTransferConnection( Servent* s, ControlConnection* c
, m_cc( cc )
, m_fid( fid )
, m_type( RECEIVING )
, m_curBlock( 0 )
, m_badded( 0 )
, m_bsent( 0 )
, m_allok( false )
@ -37,7 +35,8 @@ FileTransferConnection::FileTransferConnection( Servent* s, ControlConnection* c
// if the audioengine closes the iodev (skip/stop/etc) then kill the connection
// immediately to avoid unnecessary network transfer
connect( m_iodev.data(), SIGNAL( aboutToClose() ), this, SLOT( shutdown() ), Qt::QueuedConnection );
connect( m_iodev.data(), SIGNAL( aboutToClose() ), SLOT( shutdown() ), Qt::QueuedConnection );
connect( m_iodev.data(), SIGNAL( blockRequest( int ) ), SLOT( onBlockRequest( int ) ) );
// auto delete when connection closes:
connect( this, SIGNAL( finished() ), SLOT( deleteLater() ), Qt::QueuedConnection );
@ -170,21 +169,41 @@ FileTransferConnection::startSending( const Tomahawk::result_ptr& result )
void
FileTransferConnection::handleMsg( msg_ptr msg )
{
Q_ASSERT( m_type == FileTransferConnection::RECEIVING );
Q_ASSERT( msg->is( Msg::RAW ) );
m_badded += msg->payload().length();
((BufferIODevice*)m_iodev.data())->addData( msg->payload() );
if ( msg->payload().startsWith( "block" ) )
{
int block = QString( msg->payload() ).mid( 5 ).toInt();
m_readdev->seek( block * BufferIODevice::blockSize() );
qDebug() << "Seeked to block:" << block;
QByteArray sm;
sm.append( QString( "doneblock%1" ).arg( block ) );
sendMsg( Msg::factory( sm, Msg::RAW | Msg::FRAGMENT ) );
QTimer::singleShot( 0, this, SLOT( sendSome() ) );
}
else if ( msg->payload().startsWith( "doneblock" ) )
{
int block = QString( msg->payload() ).mid( 9 ).toInt();
((BufferIODevice*)m_iodev.data())->seeked( block );
m_curBlock = block;
qDebug() << "Next block is now:" << block;
}
else if ( msg->payload().startsWith( "data" ) )
{
m_badded += msg->payload().length() - 4;
((BufferIODevice*)m_iodev.data())->addData( m_curBlock++, msg->payload().mid( 4 ) );
}
//qDebug() << Q_FUNC_INFO << "flags" << (int) msg->flags()
// << "payload len" << msg->payload().length()
// << "written to device so far: " << m_badded;
if( !msg->is( Msg::FRAGMENT ) )
if ( ((BufferIODevice*)m_iodev.data())->nextEmptyBlock() < 0 )
{
qDebug() << "*** Got last msg in filetransfer. added" << m_badded
<< "io size" << m_iodev->size();
m_allok = true;
// tell our iodev there is no more data to read, no args meaning a success:
((BufferIODevice*)m_iodev.data())->inputComplete();
@ -193,26 +212,26 @@ FileTransferConnection::handleMsg( msg_ptr msg )
}
Connection* FileTransferConnection::clone()
Connection*
FileTransferConnection::clone()
{
Q_ASSERT( false );
return 0;
}
void FileTransferConnection::sendSome()
void
FileTransferConnection::sendSome()
{
Q_ASSERT( m_type == FileTransferConnection::SENDING );
QByteArray ba = m_readdev->read( BLOCKSIZE );
m_bsent += ba.length();
//qDebug() << "Sending" << ba.length() << "bytes of audiofile";
QByteArray ba = "data";
ba.append( m_readdev->read( BufferIODevice::blockSize() ) );
m_bsent += ba.length() - 4;
if( m_readdev->atEnd() )
{
sendMsg( Msg::factory( ba, Msg::RAW ) );
qDebug() << "Sent all. DONE." << m_bsent;
shutdown( true );
return;
}
else
@ -225,3 +244,18 @@ void FileTransferConnection::sendSome()
// (this is where upload throttling could be implemented)
QTimer::singleShot( 0, this, SLOT( sendSome() ) );
}
void
FileTransferConnection::onBlockRequest( int block )
{
qDebug() << Q_FUNC_INFO << block;
if ( m_curBlock == block )
return;
QByteArray sm;
sm.append( QString( "block%1" ).arg( block ) );
sendMsg( Msg::factory( sm, Msg::RAW | Msg::FRAGMENT ) );
}

View File

@ -54,6 +54,8 @@ private slots:
void sendSome();
void showStats( qint64 tx, qint64 rx );
void onBlockRequest( int pos );
private:
QSharedPointer<QIODevice> m_iodev;
ControlConnection* m_cc;
@ -61,6 +63,8 @@ private:
Type m_type;
QSharedPointer<QIODevice> m_readdev;
int m_curBlock;
int m_badded, m_bsent;
bool m_allok; // got last msg ok, transfer complete?