1
0
mirror of https://github.com/tomahawk-player/tomahawk.git synced 2025-08-06 14:16:32 +02:00

* Prevent shutting down socket before all data has been read.

This commit is contained in:
Christian Muehlhaeuser
2010-10-27 04:37:22 +02:00
parent bafdc244a7
commit 23d7eeeb02
6 changed files with 61 additions and 51 deletions

View File

@@ -1,10 +1,11 @@
#include <QDebug> #include <QDebug>
#include "bufferiodevice.h" #include "bufferiodevice.h"
BufferIODevice::BufferIODevice( unsigned int size, QObject *parent ) : BufferIODevice::BufferIODevice( unsigned int size, QObject *parent ) :
QIODevice( parent ), QIODevice( parent ),
m_size(size), m_size( size ),
m_received(0) m_received( 0 )
{ {
} }
@@ -27,7 +28,6 @@ BufferIODevice::close()
qDebug() << Q_FUNC_INFO; qDebug() << Q_FUNC_INFO;
QIODevice::close(); QIODevice::close();
// TODO ?
} }
@@ -39,6 +39,7 @@ BufferIODevice::inputComplete( const QString& errmsg )
emit readChannelFinished(); emit readChannelFinished();
} }
void void
BufferIODevice::addData( QByteArray ba ) BufferIODevice::addData( QByteArray ba )
{ {
@@ -57,10 +58,8 @@ BufferIODevice::bytesAvailable() const
qint64 qint64
BufferIODevice::readData( char * data, qint64 maxSize ) BufferIODevice::readData( char * data, qint64 maxSize )
{ {
// qDebug() << Q_FUNC_INFO << maxSize; //qDebug() << Q_FUNC_INFO << maxSize;
QMutexLocker lock( &m_mut ); QMutexLocker lock( &m_mut );
// qDebug() << "readData begins, bufersize:" << m_buffer.length();
qint64 size = maxSize; qint64 size = maxSize;
if ( m_buffer.length() < maxSize ) if ( m_buffer.length() < maxSize )
@@ -69,7 +68,7 @@ BufferIODevice::readData( char * data, qint64 maxSize )
memcpy( data, m_buffer.data(), size ); memcpy( data, m_buffer.data(), size );
m_buffer.remove( 0, size ); m_buffer.remove( 0, size );
// qDebug() << "readData ends, bufersize:" << m_buffer.length(); //qDebug() << "readData ends, bufersize:" << m_buffer.length();
return size; return size;
} }
@@ -93,13 +92,14 @@ qint64 BufferIODevice::size() const
return m_size; return m_size;
} }
bool BufferIODevice::atEnd() const bool BufferIODevice::atEnd() const
{ {
QMutexLocker lock( &m_mut ); QMutexLocker lock( &m_mut );
return m_size == m_received && return ( m_size == m_received && m_buffer.length() == 0 );
m_buffer.length() == 0;
} }
void void
BufferIODevice::clear() BufferIODevice::clear()
{ {

View File

@@ -21,7 +21,6 @@ public:
void addData( QByteArray ba ); void addData( QByteArray ba );
void clear(); void clear();
bool isOpen() const { qDebug() << "isOpen"; return true; }
OpenMode openMode() const { qDebug() << "openMode"; return QIODevice::ReadWrite; } OpenMode openMode() const { qDebug() << "openMode"; return QIODevice::ReadWrite; }
void inputComplete( const QString& errmsg = "" ); void inputComplete( const QString& errmsg = "" );

View File

@@ -266,10 +266,17 @@ Connection::socketDisconnected()
void void
Connection::socketDisconnectedError(QAbstractSocket::SocketError e) Connection::socketDisconnectedError(QAbstractSocket::SocketError e)
{ {
qDebug() << "SOCKET ERROR CODE" << e << this->name() << " CALLING Connection::shutdown(false)"; if ( e == QAbstractSocket::RemoteHostClosedError )
return;
qDebug() << "SOCKET ERROR CODE" << e << this->name() << "CALLING Connection::shutdown(false)";
m_peer_disconnected = true; m_peer_disconnected = true;
emit socketErrored(e); emit socketErrored(e);
shutdown(false); emit socketClosed();
shutdown( false );
} }

View File

@@ -65,8 +65,8 @@ public:
qint64 bytesSent() const { return m_tx_bytes; } qint64 bytesSent() const { return m_tx_bytes; }
qint64 bytesReceived() const { return m_rx_bytes; } qint64 bytesReceived() const { return m_rx_bytes; }
void setMsgProcessorModeOut( quint32 m ) { m_msgprocessor_out.setMode(m); } void setMsgProcessorModeOut( quint32 m ) { m_msgprocessor_out.setMode( m ); }
void setMsgProcessorModeIn( quint32 m ) { m_msgprocessor_in.setMode(m); } void setMsgProcessorModeIn( quint32 m ) { m_msgprocessor_in.setMode( m ); }
signals: signals:
void ready(); void ready();
@@ -74,7 +74,7 @@ signals:
void finished(); void finished();
void statsTick( qint64 tx_bytes_sec, qint64 rx_bytes_sec ); void statsTick( qint64 tx_bytes_sec, qint64 rx_bytes_sec );
void socketClosed(); void socketClosed();
void socketErrored(QAbstractSocket::SocketError); void socketErrored( QAbstractSocket::SocketError );
protected: protected:
virtual void setup() = 0; virtual void setup() = 0;
@@ -93,7 +93,7 @@ private slots:
void handleIncomingQueueEmpty(); void handleIncomingQueueEmpty();
void sendMsg_now( msg_ptr ); void sendMsg_now( msg_ptr );
void socketDisconnected(); void socketDisconnected();
void socketDisconnectedError(QAbstractSocket::SocketError); void socketDisconnectedError( QAbstractSocket::SocketError );
void readyRead(); void readyRead();
void doSetup(); void doSetup();
void authCheckTimeout(); void authCheckTimeout();

View File

@@ -16,8 +16,7 @@
using namespace Tomahawk; using namespace Tomahawk;
FileTransferConnection::FileTransferConnection( Servent* s, ControlConnection* cc, FileTransferConnection::FileTransferConnection( Servent* s, ControlConnection* cc, QString fid, unsigned int size )
QString fid, unsigned int size )
: Connection( s ) : Connection( s )
, m_cc( cc ) , m_cc( cc )
, m_fid( fid ) , m_fid( fid )
@@ -27,7 +26,8 @@ FileTransferConnection::FileTransferConnection( Servent* s, ControlConnection* c
, m_allok( false ) , m_allok( false )
{ {
qDebug() << Q_FUNC_INFO; qDebug() << Q_FUNC_INFO;
BufferIODevice * bio = new BufferIODevice(size);
BufferIODevice* bio = new BufferIODevice( size );
m_iodev = QSharedPointer<QIODevice>( bio ); // device audio data gets written to m_iodev = QSharedPointer<QIODevice>( bio ); // device audio data gets written to
m_iodev->open( QIODevice::ReadWrite ); m_iodev->open( QIODevice::ReadWrite );
@@ -41,13 +41,18 @@ FileTransferConnection::FileTransferConnection( Servent* s, ControlConnection* c
connect( this, SIGNAL( finished() ), SLOT( deleteLater() ), Qt::QueuedConnection ); connect( this, SIGNAL( finished() ), SLOT( deleteLater() ), Qt::QueuedConnection );
// don't fuck with our messages at all. No compression, no parsing, nothing: // don't fuck with our messages at all. No compression, no parsing, nothing:
this->setMsgProcessorModeIn( MsgProcessor::NOTHING ); this->setMsgProcessorModeIn ( MsgProcessor::NOTHING );
this->setMsgProcessorModeOut( MsgProcessor::NOTHING ); this->setMsgProcessorModeOut( MsgProcessor::NOTHING );
} }
FileTransferConnection::FileTransferConnection( Servent* s, QString fid ) FileTransferConnection::FileTransferConnection( Servent* s, QString fid )
: Connection(s), m_fid(fid), m_type(SENDING), m_badded(0), m_bsent(0), m_allok( false ) : Connection( s )
, m_fid( fid )
, m_type( SENDING )
, m_badded( 0 )
, m_bsent( 0 )
, m_allok( false )
{ {
APP->servent().registerFileTransferConnection( this ); APP->servent().registerFileTransferConnection( this );
// auto delete when connection closes: // auto delete when connection closes:
@@ -65,7 +70,10 @@ FileTransferConnection::~FileTransferConnection()
// protected, we could expose it: // protected, we could expose it:
//m_iodev->setErrorString("FTConnection providing data went away mid-transfer"); //m_iodev->setErrorString("FTConnection providing data went away mid-transfer");
((BufferIODevice*)m_iodev.data())->inputComplete();
} }
APP->servent().fileTransferFinished( this ); APP->servent().fileTransferFinished( this );
} }
@@ -73,9 +81,9 @@ FileTransferConnection::~FileTransferConnection()
QString QString
FileTransferConnection::id() const FileTransferConnection::id() const
{ {
return QString("FTC[%1 %2]") return QString( "FTC[%1 %2]" )
.arg( m_type == SENDING ? "TX" : "RX" ) .arg( m_type == SENDING ? "TX" : "RX" )
.arg(m_fid); .arg( m_fid );
} }
@@ -85,8 +93,8 @@ FileTransferConnection::showStats( qint64 tx, qint64 rx )
if( tx > 0 || rx > 0 ) if( tx > 0 || rx > 0 )
{ {
qDebug() << id() qDebug() << id()
<< QString("Down: %L1 bytes/sec, ").arg(rx) << QString( "Down: %L1 bytes/sec," ).arg( rx )
<< QString("Up: %L1 bytes/sec").arg(tx); << QString( "Up: %L1 bytes/sec" ).arg( tx );
} }
} }
@@ -95,7 +103,7 @@ void
FileTransferConnection::setup() FileTransferConnection::setup()
{ {
connect( this, SIGNAL( statsTick( qint64, qint64 ) ), SLOT( showStats( qint64, qint64 ) ) ); connect( this, SIGNAL( statsTick( qint64, qint64 ) ), SLOT( showStats( qint64, qint64 ) ) );
if(m_type == RECEIVING) if( m_type == RECEIVING )
{ {
qDebug() << "in RX mode"; qDebug() << "in RX mode";
return; return;
@@ -103,25 +111,25 @@ FileTransferConnection::setup()
qDebug() << "in TX mode, fid:" << m_fid; qDebug() << "in TX mode, fid:" << m_fid;
DatabaseCommand_LoadFile * cmd = new DatabaseCommand_LoadFile(m_fid); DatabaseCommand_LoadFile* cmd = new DatabaseCommand_LoadFile( m_fid );
connect(cmd, SIGNAL(result(QVariantMap)), this, SLOT(startSending(QVariantMap))); connect( cmd, SIGNAL( result( QVariantMap ) ), SLOT( startSending( QVariantMap ) ) );
TomahawkApp::instance()->database()->enqueue(QSharedPointer<DatabaseCommand>(cmd)); TomahawkApp::instance()->database()->enqueue( QSharedPointer<DatabaseCommand>( cmd ) );
} }
void void
FileTransferConnection::startSending( QVariantMap f ) FileTransferConnection::startSending( QVariantMap f )
{ {
Tomahawk::result_ptr result(new Tomahawk::Result(f, collection_ptr())); Tomahawk::result_ptr result( new Tomahawk::Result( f, collection_ptr() ) );
qDebug() << "Starting to transmit" << result->url(); qDebug() << "Starting to transmit" << result->url();
QSharedPointer<QIODevice> io = TomahawkApp::instance()->getIODeviceForUrl(result); QSharedPointer<QIODevice> io = TomahawkApp::instance()->getIODeviceForUrl( result );
if(!io) if( !io )
{ {
qDebug() << "Couldn't read from source:" << result->url(); qDebug() << "Couldn't read from source:" << result->url();
shutdown(); shutdown();
return; return;
} }
m_readdev = QSharedPointer<QIODevice>(io); m_readdev = QSharedPointer<QIODevice>( io );
sendSome(); sendSome();
} }
@@ -129,7 +137,7 @@ FileTransferConnection::startSending( QVariantMap f )
void void
FileTransferConnection::handleMsg( msg_ptr msg ) FileTransferConnection::handleMsg( msg_ptr msg )
{ {
Q_ASSERT(m_type == FileTransferConnection::RECEIVING); Q_ASSERT( m_type == FileTransferConnection::RECEIVING );
Q_ASSERT( msg->is( Msg::RAW ) ); Q_ASSERT( msg->is( Msg::RAW ) );
m_badded += msg->payload().length(); m_badded += msg->payload().length();
@@ -139,13 +147,11 @@ FileTransferConnection::handleMsg( msg_ptr msg )
// << "payload len" << msg->payload().length() // << "payload len" << msg->payload().length()
// << "written to device so far: " << m_badded; // << "written to device so far: " << m_badded;
if( !msg->is( Msg::FRAGMENT ) ) if( !msg->is( Msg::FRAGMENT ) )
{ {
qDebug() << endl qDebug() << "*** Got last msg in filetransfer. added" << m_badded
<< "*** Got last msg in filetransfer. added" << m_badded << "io size" << m_iodev->size();
<< "io size" << m_iodev->size()
<< endl;
m_allok = true; m_allok = true;
// tell our iodev there is no more data to read, no args meaning a success: // tell our iodev there is no more data to read, no args meaning a success:
((BufferIODevice*)m_iodev.data())->inputComplete(); ((BufferIODevice*)m_iodev.data())->inputComplete();
@@ -156,23 +162,24 @@ FileTransferConnection::handleMsg( msg_ptr msg )
Connection* FileTransferConnection::clone() Connection* FileTransferConnection::clone()
{ {
Q_ASSERT(false); return 0; Q_ASSERT( false );
return 0;
} }
void FileTransferConnection::sendSome() void FileTransferConnection::sendSome()
{ {
Q_ASSERT(m_type == FileTransferConnection::SENDING); Q_ASSERT( m_type == FileTransferConnection::SENDING );
QByteArray ba = m_readdev->read(BLOCKSIZE); QByteArray ba = m_readdev->read( BLOCKSIZE );
m_bsent += ba.length(); m_bsent += ba.length();
//qDebug() << "Sending " << ba.length() << " bytes of audiofile"; //qDebug() << "Sending" << ba.length() << "bytes of audiofile";
if( m_readdev->atEnd() ) if( m_readdev->atEnd() )
{ {
sendMsg( Msg::factory( ba, Msg::RAW ) ); sendMsg( Msg::factory( ba, Msg::RAW ) );
qDebug() << "Sent all. DONE. " << m_bsent; qDebug() << "Sent all. DONE." << m_bsent;
shutdown(true); shutdown( true );
return; return;
} }
else else

View File

@@ -32,10 +32,7 @@ public:
void setup(); void setup();
Connection* clone(); Connection* clone();
const QSharedPointer<QIODevice>& iodevice() const QSharedPointer<QIODevice>& iodevice() { return m_iodev; }
{
return m_iodev;
}
Type type() const { return m_type; } Type type() const { return m_type; }
QString fid() const { return m_fid; } QString fid() const { return m_fid; }