Commit c127f9e5 authored by Pavel Vainerman's avatar Pavel Vainerman

(LogServer): добился работы LogServer/LogReader (libpoco)

parent c8296fd1
...@@ -85,7 +85,7 @@ int main( int argc, char** argv ) ...@@ -85,7 +85,7 @@ int main( int argc, char** argv )
string logfilter(""); string logfilter("");
LogServerTypes::Command cmd = LogServerTypes::cmdNOP; LogServerTypes::Command cmd = LogServerTypes::cmdNOP;
int cmdonly = 0; int cmdonly = 0;
timeout_t tout = 0; timeout_t tout = 5000;
timeout_t rdelay = 5000; timeout_t rdelay = 5000;
string logfile(""); string logfile("");
bool logtruncate = false; bool logtruncate = false;
......
...@@ -16,7 +16,6 @@ ...@@ -16,7 +16,6 @@
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
#include <sstream> #include <sstream>
#include <string> #include <string>
#include <cc++/socket.h>
#include "MBTCPPersistentSlave.h" #include "MBTCPPersistentSlave.h"
#include "Configuration.h" #include "Configuration.h"
#include "Debug.h" #include "Debug.h"
......
...@@ -21,18 +21,19 @@ ...@@ -21,18 +21,19 @@
#include <memory> #include <memory>
#include <queue> #include <queue>
#include <ev++.h> #include <ev++.h>
#include "Poco/Net/StreamSocket.h"
#include "Mutex.h" #include "Mutex.h"
#include "DebugStream.h" #include "DebugStream.h"
#include "LogAgregator.h"
#include "USocket.h"
#include "UTCPCore.h" #include "UTCPCore.h"
#include "UTCPStream.h"
#include "LogAgregator.h"
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
/*! Реализация "сессии" для клиентов LogServer. */ /*! Реализация "сессии" для клиентов LogServer. */
class LogSession class LogSession
{ {
public: public:
LogSession(int sock, std::shared_ptr<DebugStream>& log, timeout_t cmdTimeout = 2000, timeout_t checkConnectionTime = 10000 ); LogSession( const Poco::Net::StreamSocket& s, std::shared_ptr<DebugStream>& log, timeout_t cmdTimeout = 2000, timeout_t checkConnectionTime = 10000 );
~LogSession(); ~LogSession();
typedef sigc::slot<void, LogSession*> FinalSlot; typedef sigc::slot<void, LogSession*> FinalSlot;
...@@ -120,7 +121,7 @@ class LogSession ...@@ -120,7 +121,7 @@ class LogSession
std::shared_ptr<LogAgregator> alog; std::shared_ptr<LogAgregator> alog;
sigc::connection conn; sigc::connection conn;
std::shared_ptr<USocket> sock; std::shared_ptr<UTCPStream> sock;
ev::io io; ev::io io;
ev::timer cmdTimer; ev::timer cmdTimer;
......
...@@ -3,18 +3,20 @@ ...@@ -3,18 +3,20 @@
#define UTCPSocket_H_ #define UTCPSocket_H_
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
#include <string> #include <string>
#include <Poco/Net/RawSocket.h> #include <Poco/Net/ServerSocket.h>
#include "PassiveTimer.h" // for timeout_t #include "PassiveTimer.h" // for timeout_t
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
class UTCPSocket: class UTCPSocket:
public Poco::Net::RawSocket public Poco::Net::ServerSocket
{ {
public: public:
UTCPSocket();
// dup and accept...raw socket // dup and accept...raw socket
UTCPSocket( int sock ); UTCPSocket( int sock );
UTCPSocket(const std::string& host, int port); UTCPSocket( const std::string& host, int port );
virtual ~UTCPSocket(); virtual ~UTCPSocket();
...@@ -32,15 +34,6 @@ class UTCPSocket: ...@@ -32,15 +34,6 @@ class UTCPSocket:
int getSocket(); int getSocket();
// --------------------------------------------------------------------
// Пришлось вынести эти функции read/write[Data] в public
// т.к. они сразу "посылают" данные в канал, в отличие от operator<<
// который у TCPStream (или std::iostream?) буферизует их и из-за этого
// не позволяет работать с отправкой коротких сообщений
// --------------------------------------------------------------------
ssize_t writeData( const void* buf, size_t len, timeout_t t = 0 );
ssize_t readData( void* buf, size_t len, char separator = 0, timeout_t t = 0 );
protected: protected:
void init( bool throwflag = false ); void init( bool throwflag = false );
......
...@@ -30,10 +30,13 @@ class UTCPStream: ...@@ -30,10 +30,13 @@ class UTCPStream:
{ {
public: public:
UTCPStream( const Poco::Net::StreamSocket& so );
UTCPStream(); UTCPStream();
virtual ~UTCPStream(); virtual ~UTCPStream();
void create( const std::string& hname, int port, timeout_t tout = 1000 ); void create( const std::string& hname, int port, timeout_t tout_msec = 1000 );
bool isConnected();
// set keepalive params // set keepalive params
// return true if OK // return true if OK
......
...@@ -67,7 +67,7 @@ void ModbusTCPMaster::setChannelTimeout( timeout_t msec ) ...@@ -67,7 +67,7 @@ void ModbusTCPMaster::setChannelTimeout( timeout_t msec )
if( old == msec ) if( old == msec )
return; return;
tcp->setReceiveTimeout(msec); tcp->setReceiveTimeout(msec*1000);
int oldKeepAlive = keepAliveTimeout; int oldKeepAlive = keepAliveTimeout;
keepAliveTimeout = (msec > 1000 ? msec / 1000 : 1); keepAliveTimeout = (msec > 1000 ? msec / 1000 : 1);
...@@ -115,14 +115,14 @@ mbErrCode ModbusTCPMaster::query( ModbusAddr addr, ModbusMessage& msg, ...@@ -115,14 +115,14 @@ mbErrCode ModbusTCPMaster::query( ModbusAddr addr, ModbusMessage& msg,
assert(timeout); assert(timeout);
ptTimeout.setTiming(timeout); ptTimeout.setTiming(timeout);
tcp->setReceiveTimeout(timeout); tcp->setReceiveTimeout(timeout*1000);
msg.makeHead(++nTransaction, crcNoCheckit); msg.makeHead(++nTransaction, crcNoCheckit);
for( size_t i = 0; i < 2; i++ ) for( size_t i = 0; i < 2; i++ )
{ {
//if( tcp->isPending(ost::Socket::pendingOutput, timeout) ) //if( tcp->isPending(ost::Socket::pendingOutput, timeout) )
if( tcp->poll(timeout,Poco::Net::Socket::SELECT_READ ) ) if( tcp->poll(timeout*1000,Poco::Net::Socket::SELECT_READ ) )
{ {
mbErrCode res = send(msg); mbErrCode res = send(msg);
...@@ -168,7 +168,7 @@ mbErrCode ModbusTCPMaster::query( ModbusAddr addr, ModbusMessage& msg, ...@@ -168,7 +168,7 @@ mbErrCode ModbusTCPMaster::query( ModbusAddr addr, ModbusMessage& msg,
//tcp->sync(); //tcp->sync();
if( tcp->poll(timeout,Poco::Net::Socket::SELECT_READ ) ) if( tcp->poll(timeout*1000,Poco::Net::Socket::SELECT_READ ) )
{ {
size_t ret = 0; size_t ret = 0;
...@@ -385,7 +385,7 @@ void ModbusTCPMaster::reconnect() ...@@ -385,7 +385,7 @@ void ModbusTCPMaster::reconnect()
{ {
tcp = make_shared<UTCPStream>(); tcp = make_shared<UTCPStream>();
tcp->create(iaddr, port, 500); tcp->create(iaddr, port, 500);
tcp->setReceiveTimeout(replyTimeOut_ms); tcp->setReceiveTimeout(replyTimeOut_ms*1000);
tcp->setKeepAliveParams((replyTimeOut_ms > 1000 ? replyTimeOut_ms / 1000 : 1)); tcp->setKeepAliveParams((replyTimeOut_ms > 1000 ? replyTimeOut_ms / 1000 : 1));
tcp->setNoDelay(true); tcp->setNoDelay(true);
} }
...@@ -432,8 +432,8 @@ void ModbusTCPMaster::connect( const Poco::Net::SocketAddress& addr, int _port ) ...@@ -432,8 +432,8 @@ void ModbusTCPMaster::connect( const Poco::Net::SocketAddress& addr, int _port )
try try
{ {
tcp = make_shared<UTCPStream>(); tcp = make_shared<UTCPStream>();
tcp->connect(addr,500); // tcp->create(iaddr, port, true, 500); tcp->connect(addr,500);
tcp->setReceiveTimeout(replyTimeOut_ms); // tcp->setTimeout(replyTimeOut_ms); tcp->setReceiveTimeout(replyTimeOut_ms*1000);
tcp->setKeepAlive(true); // tcp->setKeepAliveParams((replyTimeOut_ms > 1000 ? replyTimeOut_ms / 1000 : 1)); tcp->setKeepAlive(true); // tcp->setKeepAliveParams((replyTimeOut_ms > 1000 ? replyTimeOut_ms / 1000 : 1));
tcp->setNoDelay(true); tcp->setNoDelay(true);
} }
...@@ -485,7 +485,6 @@ void ModbusTCPMaster::forceDisconnect() ...@@ -485,7 +485,6 @@ void ModbusTCPMaster::forceDisconnect()
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
bool ModbusTCPMaster::isConnection() const bool ModbusTCPMaster::isConnection() const
{ {
#warning Разобраться с isConnected return tcp && tcp->isConnected();
return (tcp != nullptr); /* && tcp->isConnected(); */
} }
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
...@@ -12,14 +12,16 @@ using namespace std; ...@@ -12,14 +12,16 @@ using namespace std;
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
UTCPSocket::~UTCPSocket() UTCPSocket::~UTCPSocket()
{ {
close(); Poco::Net::ServerSocket::close();
//endSocket(); }
// shutdown(so, SHUT_RDWR); // -------------------------------------------------------------------------
UTCPSocket::UTCPSocket()
{
} }
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
UTCPSocket::UTCPSocket( int sock ): UTCPSocket::UTCPSocket( int sock ):
Poco::Net::RawSocket(sock) Poco::Net::ServerSocket(sock)
{ {
/* /*
struct sockaddr_in client_addr; struct sockaddr_in client_addr;
...@@ -29,53 +31,43 @@ UTCPSocket::UTCPSocket( int sock ): ...@@ -29,53 +31,43 @@ UTCPSocket::UTCPSocket( int sock ):
if( so < 0 ) if( so < 0 )
{ {
endSocket(); endServerServerSocket();
error(errConnectRejected); error(errConnectRejected);
return; return;
} }
Socket::state = CONNECTED; ServerServerSocket::state = CONNECTED;
*/ */
init(); init();
} }
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
UTCPSocket::UTCPSocket( const string& host, int port ): UTCPSocket::UTCPSocket( const string& host, int port ):
Poco::Net::RawSocket(Poco::Net::SocketAddress(host,port),true) Poco::Net::ServerSocket(Poco::Net::SocketAddress(host,port))
{ {
init(); init();
} }
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
bool UTCPSocket::setKeepAliveParams(timeout_t timeout_sec, int keepcnt, int keepintvl ) bool UTCPSocket::setKeepAliveParams(timeout_t timeout_sec, int keepcnt, int keepintvl )
{ {
return UTCPCore::setKeepAliveParams(Poco::Net::RawSocket::sockfd() , timeout_sec, keepcnt, keepintvl); return UTCPCore::setKeepAliveParams(Poco::Net::ServerSocket::sockfd() , timeout_sec, keepcnt, keepintvl);
} }
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
bool UTCPSocket::setNoDelay(bool enable) bool UTCPSocket::setNoDelay(bool enable)
{ {
Poco::Net::RawSocket::setNoDelay(enable); Poco::Net::ServerSocket::setNoDelay(enable);
return ( Poco::Net::RawSocket::getNoDelay() == enable ); return ( Poco::Net::ServerSocket::getNoDelay() == enable );
} }
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
int UTCPSocket::getSocket() int UTCPSocket::getSocket()
{ {
return Poco::Net::RawSocket::sockfd(); return Poco::Net::ServerSocket::sockfd();
} }
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
void UTCPSocket::init( bool throwflag ) void UTCPSocket::init( bool throwflag )
{ {
// setError(throwflag); // setError(throwflag);
Poco::Net::RawSocket::setKeepAlive(true); Poco::Net::ServerSocket::setKeepAlive(true);
Poco::Net::RawSocket::setLinger(true,1); Poco::Net::ServerSocket::setLinger(true,1);
setKeepAliveParams(); setKeepAliveParams();
} }
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
ssize_t UTCPSocket::writeData(const void* buf, size_t len, timeout_t t)
{
return Poco::Net::RawSocket::sendBytes(buf, len);
}
// -------------------------------------------------------------------------
ssize_t UTCPSocket::readData(void* buf, size_t len, char separator, timeout_t t)
{
return Poco::Net::RawSocket::receiveBytes(buf,len);
}
// -------------------------------------------------------------------------
...@@ -31,6 +31,12 @@ UTCPStream::~UTCPStream() ...@@ -31,6 +31,12 @@ UTCPStream::~UTCPStream()
} }
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
UTCPStream::UTCPStream(const Poco::Net::StreamSocket& so):
Poco::Net::StreamSocket(so)
{
}
UTCPStream::UTCPStream() UTCPStream::UTCPStream()
{ {
} }
...@@ -85,12 +91,18 @@ timeout_t UTCPStream::getTimeout() const ...@@ -85,12 +91,18 @@ timeout_t UTCPStream::getTimeout() const
return tm.microseconds(); return tm.microseconds();
} }
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
void UTCPStream::create(const std::string& hname, int port, timeout_t tout ) void UTCPStream::create(const std::string& hname, int port, timeout_t tout_msec )
{ {
Poco::Net::SocketAddress sa(hname,port); Poco::Net::SocketAddress sa(hname,port);
connect(sa, tout); connect(sa,tout_msec*1000);
setKeepAlive(true); setKeepAlive(true);
Poco::Net::StreamSocket::setLinger(true,1); Poco::Net::StreamSocket::setLinger(true,1);
setKeepAliveParams(); setKeepAliveParams();
} }
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
bool UTCPStream::isConnected()
{
//return ( Poco::Net::StreamSocket::sockfd() > 0 );
return ( Poco::Net::StreamSocket::peerAddress().addr() != 0 );
}
// -------------------------------------------------------------------------
...@@ -77,9 +77,32 @@ void LogReader::connect( const std::string& _addr, int _port, timeout_t msec ) ...@@ -77,9 +77,32 @@ void LogReader::connect( const std::string& _addr, int _port, timeout_t msec )
try try
{ {
tcp = make_shared<UTCPStream>(); tcp = make_shared<UTCPStream>();
tcp->create(iaddr, port, 500); tcp->create(iaddr, port, msec );
tcp->setReceiveTimeout(msec); tcp->setReceiveTimeout(inTimeout*1000);
tcp->setSendTimeout(outTimeout*1000);
tcp->setKeepAlive(true); tcp->setKeepAlive(true);
tcp->setBlocking(true);
}
catch( const Poco::TimeoutException& e )
{
if( rlog.debugging(Debug::CRIT) )
{
ostringstream s;
s << "(LogReader): connection " << s.str() << " timeout..";
rlog.crit() << s.str() << std::endl;
}
tcp = 0;
}
catch( const Poco::Net::NetException& e )
{
if( rlog.debugging(Debug::CRIT) )
{
ostringstream s;
s << "(LogReader): connection " << s.str() << " error: " << e.what();
rlog.crit() << s.str() << std::endl;
}
tcp = 0;
} }
catch( const std::exception& e ) catch( const std::exception& e )
{ {
...@@ -113,13 +136,22 @@ void LogReader::disconnect() ...@@ -113,13 +136,22 @@ void LogReader::disconnect()
if( rlog.is_info() ) if( rlog.is_info() )
rlog.info() << iaddr << "(LogReader): disconnect." << endl; rlog.info() << iaddr << "(LogReader): disconnect." << endl;
tcp->shutdown(); try
{
//tcp->shutdown();
tcp->close();
}
catch( const Poco::Net::NetException& e )
{
cerr << "(LogReader): disconnect error: " << e.displayText() << endl;
}
tcp = 0; tcp = 0;
} }
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
bool LogReader::isConnection() const bool LogReader::isConnection() const
{ {
return (tcp != nullptr); /* && tcp->isConnected(); */ return (tcp && tcp->isConnected() );
} }
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
void LogReader::sendCommand(const std::string& _addr, int _port, std::vector<Command>& vcmd, bool cmd_only, bool verbose ) void LogReader::sendCommand(const std::string& _addr, int _port, std::vector<Command>& vcmd, bool cmd_only, bool verbose )
...@@ -192,11 +224,11 @@ void LogReader::sendCommand(const std::string& _addr, int _port, std::vector<Com ...@@ -192,11 +224,11 @@ void LogReader::sendCommand(const std::string& _addr, int _port, std::vector<Com
} }
catch( const Poco::Net::NetException& e ) catch( const Poco::Net::NetException& e )
{ {
cerr << "(LogReader): " << e.displayText() << " (" << _addr << ")" << endl; cerr << "(LogReader): send error: " << e.displayText() << " (" << _addr << ")" << endl;
} }
catch( const std::exception& ex ) catch( const std::exception& ex )
{ {
cerr << "(LogReader): " << ex.what() << endl; cerr << "(LogReader): send error: " << ex.what() << endl;
} }
n--; n--;
...@@ -221,7 +253,7 @@ void LogReader::sendCommand(const std::string& _addr, int _port, std::vector<Com ...@@ -221,7 +253,7 @@ void LogReader::sendCommand(const std::string& _addr, int _port, std::vector<Com
{ {
int a = 2; int a = 2;
while( a > 0 && tcp->poll(reply_timeout,Poco::Net::Socket::SELECT_READ) ) while( a > 0 && tcp->poll(reply_timeout*1000,Poco::Net::Socket::SELECT_READ) )
{ {
int n = tcp->available(); int n = tcp->available();
...@@ -240,11 +272,11 @@ void LogReader::sendCommand(const std::string& _addr, int _port, std::vector<Com ...@@ -240,11 +272,11 @@ void LogReader::sendCommand(const std::string& _addr, int _port, std::vector<Com
} }
catch( const Poco::Net::NetException& e ) catch( const Poco::Net::NetException& e )
{ {
cerr << "(LogReader): " << e.displayText() << " (" << _addr << ")" << endl; cerr << "(LogReader): read error: " << e.displayText() << " (" << _addr << ")" << endl;
} }
catch( const std::exception& ex ) catch( const std::exception& ex )
{ {
cerr << "(LogReader): " << ex.what() << endl; cerr << "(LogReader): read error: " << ex.what() << endl;
} }
if( cmdonly && isConnection() ) if( cmdonly && isConnection() )
...@@ -303,7 +335,7 @@ void LogReader::readlogs( const std::string& _addr, int _port, LogServerTypes::C ...@@ -303,7 +335,7 @@ void LogReader::readlogs( const std::string& _addr, int _port, LogServerTypes::C
send_ok = true; send_ok = true;
} }
while( tcp->poll(inTimeout,Poco::Net::Socket::SELECT_READ) ) while( tcp->poll(inTimeout*1000, Poco::Net::Socket::SELECT_READ) )
{ {
ssize_t n = tcp->available(); ssize_t n = tcp->available();
...@@ -328,7 +360,7 @@ void LogReader::readlogs( const std::string& _addr, int _port, LogServerTypes::C ...@@ -328,7 +360,7 @@ void LogReader::readlogs( const std::string& _addr, int _port, LogServerTypes::C
rcount--; rcount--;
if( rcount != 0 ) if( rcount != 0 )
rlog.warn() << "(LogReader): ...connection timeout..." << endl; rlog.warn() << "(LogReader): ...read timeout..." << endl;
disconnect(); disconnect();
} }
...@@ -356,7 +388,7 @@ void LogReader::logOnEvent( const std::string& s ) ...@@ -356,7 +388,7 @@ void LogReader::logOnEvent( const std::string& s )
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
void LogReader::sendCommand(LogServerTypes::lsMessage& msg, bool verbose ) void LogReader::sendCommand(LogServerTypes::lsMessage& msg, bool verbose )
{ {
if( !tcp ) if( !tcp || !tcp->isConnected() )
{ {
cerr << "(LogReader::sendCommand): tcp=NULL! no connection?!" << endl; cerr << "(LogReader::sendCommand): tcp=NULL! no connection?!" << endl;
return; return;
...@@ -364,7 +396,7 @@ void LogReader::sendCommand(LogServerTypes::lsMessage& msg, bool verbose ) ...@@ -364,7 +396,7 @@ void LogReader::sendCommand(LogServerTypes::lsMessage& msg, bool verbose )
try try
{ {
if( tcp->poll(outTimeout,Poco::Net::Socket::SELECT_WRITE) ) if( tcp->poll(outTimeout*1000,Poco::Net::Socket::SELECT_WRITE) )
{ {
rlog.info() << "(LogReader): ** send command: cmd='" << msg.cmd << "' logname='" << msg.logname << "' data='" << msg.data << "'" << endl; rlog.info() << "(LogReader): ** send command: cmd='" << msg.cmd << "' logname='" << msg.logname << "' data='" << msg.data << "'" << endl;
tcp->sendBytes((unsigned char*)(&msg),sizeof(msg)); tcp->sendBytes((unsigned char*)(&msg),sizeof(msg));
...@@ -374,11 +406,11 @@ void LogReader::sendCommand(LogServerTypes::lsMessage& msg, bool verbose ) ...@@ -374,11 +406,11 @@ void LogReader::sendCommand(LogServerTypes::lsMessage& msg, bool verbose )
} }
catch( const Poco::Net::NetException& e ) catch( const Poco::Net::NetException& e )
{ {
cerr << "(LogReader): " << e.displayText() << endl; // " (" << _addr << ")" << endl; cerr << "(LogReader): send error: " << e.displayText() << endl; // " (" << _addr << ")" << endl;
} }
catch( const std::exception& ex ) catch( const std::exception& ex )
{ {
cerr << "(LogReader): " << ex.what() << endl; cerr << "(LogReader): send error: " << ex.what() << endl;
} }
} }
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
...@@ -85,6 +85,7 @@ void LogServer::evfinish( const ev::loop_ref& loop ) ...@@ -85,6 +85,7 @@ void LogServer::evfinish( const ev::loop_ref& loop )
io.stop(); io.stop();
isrunning = false; isrunning = false;
sock->close();
sock.reset(); sock.reset();
if( mylog.is_info() ) if( mylog.is_info() )
...@@ -181,7 +182,9 @@ void LogServer::ioAccept( ev::io& watcher, int revents ) ...@@ -181,7 +182,9 @@ void LogServer::ioAccept( ev::io& watcher, int revents )
try try
{ {
auto s = make_shared<LogSession>( watcher.fd, elog, cmdTimeout ); Poco::Net::StreamSocket ss = sock->acceptConnection();
auto s = make_shared<LogSession>( ss, elog, cmdTimeout );
s->setSessionLogLevel(sessLogLevel); s->setSessionLogLevel(sessLogLevel);
s->connectFinalSession( sigc::mem_fun(this, &LogServer::sessionFinished) ); s->connectFinalSession( sigc::mem_fun(this, &LogServer::sessionFinished) );
s->signal_logsession_command().connect( sigc::mem_fun(this, &LogServer::onCommand) ); s->signal_logsession_command().connect( sigc::mem_fun(this, &LogServer::onCommand) );
......
...@@ -24,6 +24,7 @@ ...@@ -24,6 +24,7 @@
#include <errno.h> #include <errno.h>
#include <cstring> #include <cstring>
#include <Poco/Net/NetException.h> #include <Poco/Net/NetException.h>
#include <Poco/Exception.h>
#include "Exceptions.h" #include "Exceptions.h"
#include "LogSession.h" #include "LogSession.h"
#include "UniSetTypes.h" #include "UniSetTypes.h"
...@@ -52,7 +53,7 @@ LogSession::~LogSession() ...@@ -52,7 +53,7 @@ LogSession::~LogSession()
asyncEvent.stop(); asyncEvent.stop();
} }
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
LogSession::LogSession( int sfd, std::shared_ptr<DebugStream>& _log, timeout_t _cmdTimeout, timeout_t _checkConnectionTime ): LogSession::LogSession( const Poco::Net::StreamSocket& s, std::shared_ptr<DebugStream>& _log, timeout_t _cmdTimeout, timeout_t _checkConnectionTime ):
cmdTimeout(_cmdTimeout), cmdTimeout(_cmdTimeout),
checkConnectionTime(_checkConnectionTime / 1000.), checkConnectionTime(_checkConnectionTime / 1000.),
peername(""), peername(""),
...@@ -66,8 +67,8 @@ LogSession::LogSession( int sfd, std::shared_ptr<DebugStream>& _log, timeout_t _ ...@@ -66,8 +67,8 @@ LogSession::LogSession( int sfd, std::shared_ptr<DebugStream>& _log, timeout_t _
try try
{ {
sock = make_shared<USocket>(sfd); sock = make_shared<UTCPStream>(s);
sock->setBlocking(false);
Poco::Net::SocketAddress iaddr = sock->peerAddress(); Poco::Net::SocketAddress iaddr = sock->peerAddress();
if( iaddr.host().toString().empty() ) if( iaddr.host().toString().empty() )
...@@ -78,7 +79,7 @@ LogSession::LogSession( int sfd, std::shared_ptr<DebugStream>& _log, timeout_t _ ...@@ -78,7 +79,7 @@ LogSession::LogSession( int sfd, std::shared_ptr<DebugStream>& _log, timeout_t _
if( mylog.is_crit() ) if( mylog.is_crit() )
mylog.crit() << err.str() << endl; mylog.crit() << err.str() << endl;
sock.reset(); sock->close();
throw SystemError(err.str()); throw SystemError(err.str());
} }
...@@ -193,7 +194,7 @@ void LogSession::terminate() ...@@ -193,7 +194,7 @@ void LogSession::terminate()
logbuf.pop(); logbuf.pop();
} }
sock.reset(); // close.. sock->close();
final(); final();
} }
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
...@@ -316,7 +317,9 @@ void LogSession::writeEvent( ev::io& watcher ) ...@@ -316,7 +317,9 @@ void LogSession::writeEvent( ev::io& watcher )
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
size_t LogSession::readData( unsigned char* buf, int len ) size_t LogSession::readData( unsigned char* buf, int len )
{ {
ssize_t res = read( sock->getSocket(), buf, len ); try
{
ssize_t res = sock->receiveBytes(buf, len);
if( res > 0 ) if( res > 0 )
return res; return res;
...@@ -328,6 +331,15 @@ size_t LogSession::readData( unsigned char* buf, int len ) ...@@ -328,6 +331,15 @@ size_t LogSession::readData( unsigned char* buf, int len )
return 0; return 0;
} }
}
catch( Poco::TimeoutException& ex )
{
return 0;
}
catch( Poco::Net::ConnectionResetException &ex )
{
}
mylog.info() << peername << "(readData): client disconnected.." << endl; mylog.info() << peername << "(readData): client disconnected.." << endl;
cancelled = true; cancelled = true;
...@@ -343,7 +355,7 @@ void LogSession::readEvent( ev::io& watcher ) ...@@ -343,7 +355,7 @@ void LogSession::readEvent( ev::io& watcher )
size_t ret = readData( (unsigned char*)(&msg), sizeof(msg) ); size_t ret = readData( (unsigned char*)(&msg), sizeof(msg) );
if( cancelled ) if( ret == 0 || cancelled )
return; return;
if( ret != sizeof(msg) || msg.magic != LogServerTypes::MAGICNUM ) if( ret != sizeof(msg) || msg.magic != LogServerTypes::MAGICNUM )
......
...@@ -130,7 +130,7 @@ void CommonEventLoop::onPrepare() ...@@ -130,7 +130,7 @@ void CommonEventLoop::onPrepare()
} }
catch( std::exception& ex ) catch( std::exception& ex )
{ {
cerr << "(CommonEventLoop::onPrepare): evfinish err: " << ex.what() << endl; cerr << "(CommonEventLoop::onPrepare): evprepare err: " << ex.what() << endl;
} }
wprep = nullptr; wprep = nullptr;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment