Commit e3263be4 authored by Pavel Vainerman's avatar Pavel Vainerman

(LogSession): добавлена "активная" проверка соединения

(защиты от зависающих соединений)
parent e9225e2a
...@@ -14,7 +14,7 @@ ...@@ -14,7 +14,7 @@
Name: libuniset2 Name: libuniset2
Version: 2.2 Version: 2.2
Release: alt31.5 Release: alt31.6
Summary: UniSet - library for building distributed industrial control systems Summary: UniSet - library for building distributed industrial control systems
License: LGPL License: LGPL
...@@ -486,6 +486,9 @@ mv -f %buildroot%python_sitelibdir_noarch/* %buildroot%python_sitelibdir/%oname ...@@ -486,6 +486,9 @@ mv -f %buildroot%python_sitelibdir_noarch/* %buildroot%python_sitelibdir/%oname
# .. # ..
%changelog %changelog
* Mon Apr 11 2016 Pavel Vainerman <pv@altlinux.ru> 2.2-alt31.6
- LogSession: added the connection test
* Sun Apr 10 2016 Pavel Vainerman <pv@altlinux.ru> 2.2-alt31.5 * Sun Apr 10 2016 Pavel Vainerman <pv@altlinux.ru> 2.2-alt31.5
- LogSession: add buffer limit.. (200 messages, ~30kB) - LogSession: add buffer limit.. (200 messages, ~30kB)
......
...@@ -33,7 +33,7 @@ class LogSession ...@@ -33,7 +33,7 @@ class LogSession
{ {
public: public:
LogSession(int sock, std::shared_ptr<DebugStream>& log, timeout_t cmdTimeout = 2000 ); LogSession(int sock, std::shared_ptr<DebugStream>& log, timeout_t cmdTimeout = 2000, timeout_t checkConnectionTime = 10000 );
~LogSession(); ~LogSession();
typedef sigc::slot<void, LogSession*> FinalSlot; typedef sigc::slot<void, LogSession*> FinalSlot;
...@@ -87,11 +87,13 @@ class LogSession ...@@ -87,11 +87,13 @@ class LogSession
size_t readData( unsigned char* buf, int len ); size_t readData( unsigned char* buf, int len );
void cmdProcessing( const std::string& cmdLogName, const LogServerTypes::lsMessage& msg ); void cmdProcessing( const std::string& cmdLogName, const LogServerTypes::lsMessage& msg );
void onCmdTimeout( ev::timer& watcher, int revents ); void onCmdTimeout( ev::timer& watcher, int revents );
void onCheckConnectionTimer( ev::timer& watcher, int revents );
void final(); void final();
void logOnEvent( const std::string& s ); void logOnEvent( const std::string& s );
timeout_t cmdTimeout = { 2000 }; timeout_t cmdTimeout = { 2000 };
float checkConnectionTime = { 10. }; // время на проверку живости соединения..(сек)
// Т.к. сообщений может быть ОЧЕНЬ МНОГО.. сеть медленная // Т.к. сообщений может быть ОЧЕНЬ МНОГО.. сеть медленная
// очередь может не успевать рассасываться, // очередь может не успевать рассасываться,
...@@ -124,6 +126,7 @@ class LogSession ...@@ -124,6 +126,7 @@ class LogSession
ev::io io; ev::io io;
ev::timer cmdTimer; ev::timer cmdTimer;
ev::async asyncEvent; ev::async asyncEvent;
ev::timer checkConnectionTimer;
FinalSlot slFin; FinalSlot slFin;
std::atomic_bool cancelled = { false }; std::atomic_bool cancelled = { false };
......
...@@ -45,12 +45,16 @@ LogSession::~LogSession() ...@@ -45,12 +45,16 @@ LogSession::~LogSession()
if( cmdTimer.is_active() ) if( cmdTimer.is_active() )
cmdTimer.stop(); cmdTimer.stop();
if( checkConnectionTimer.is_active() )
checkConnectionTimer.stop();
if( asyncEvent.is_active() ) if( asyncEvent.is_active() )
asyncEvent.stop(); asyncEvent.stop();
} }
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
LogSession::LogSession( int sfd, std::shared_ptr<DebugStream>& _log, timeout_t _cmdTimeout ): LogSession::LogSession( int sfd, std::shared_ptr<DebugStream>& _log, timeout_t _cmdTimeout, timeout_t _checkConnectionTime ):
cmdTimeout(_cmdTimeout), cmdTimeout(_cmdTimeout),
checkConnectionTime(_checkConnectionTime/1000.),
peername(""), peername(""),
caddr(""), caddr(""),
log(_log) log(_log)
...@@ -86,6 +90,7 @@ LogSession::LogSession( int sfd, std::shared_ptr<DebugStream>& _log, timeout_t _ ...@@ -86,6 +90,7 @@ LogSession::LogSession( int sfd, std::shared_ptr<DebugStream>& _log, timeout_t _
io.set<LogSession, &LogSession::callback>(this); io.set<LogSession, &LogSession::callback>(this);
cmdTimer.set<LogSession, &LogSession::onCmdTimeout>(this); cmdTimer.set<LogSession, &LogSession::onCmdTimeout>(this);
asyncEvent.set<LogSession, &LogSession::event>(this); asyncEvent.set<LogSession, &LogSession::event>(this);
checkConnectionTimer.set<LogSession, &LogSession::onCheckConnectionTimer>(this);
if( log ) if( log )
conn = log->signal_stream_event().connect( sigc::mem_fun(this, &LogSession::logOnEvent) ); conn = log->signal_stream_event().connect( sigc::mem_fun(this, &LogSession::logOnEvent) );
...@@ -144,10 +149,12 @@ void LogSession::run( const ev::loop_ref& loop ) ...@@ -144,10 +149,12 @@ void LogSession::run( const ev::loop_ref& loop )
asyncEvent.set(loop); asyncEvent.set(loop);
cmdTimer.set(loop); cmdTimer.set(loop);
checkConnectionTimer.set(loop);
io.set(loop); io.set(loop);
io.start(sock->getSocket(), ev::READ); io.start(sock->getSocket(), ev::READ);
cmdTimer.start( cmdTimeout / 1000. ); cmdTimer.start( cmdTimeout / 1000. );
checkConnectionTimer.start( checkConnectionTime );
} }
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
void LogSession::terminate() void LogSession::terminate()
...@@ -232,6 +239,7 @@ void LogSession::writeEvent( ev::io& watcher ) ...@@ -232,6 +239,7 @@ void LogSession::writeEvent( ev::io& watcher )
if( logbuf.empty() ) if( logbuf.empty() )
{ {
io.set(EV_NONE); io.set(EV_NONE);
checkConnectionTimer.start( checkConnectionTime ); // restart timer
return; return;
} }
...@@ -277,10 +285,14 @@ void LogSession::writeEvent( ev::io& watcher ) ...@@ -277,10 +285,14 @@ void LogSession::writeEvent( ev::io& watcher )
if( logbuf.empty() ) if( logbuf.empty() )
{ {
io.set(EV_NONE); io.set(EV_NONE);
checkConnectionTimer.start( checkConnectionTime ); // restart timer
return; return;
} }
} }
if( checkConnectionTimer.is_active() )
checkConnectionTimer.stop();
io.set(ev::WRITE); io.set(ev::WRITE);
//io.set(ev::READ | ev::WRITE); //io.set(ev::READ | ev::WRITE);
} }
...@@ -370,6 +382,7 @@ void LogSession::readEvent( ev::io& watcher ) ...@@ -370,6 +382,7 @@ void LogSession::readEvent( ev::io& watcher )
cmdTimer.stop(); cmdTimer.stop();
asyncEvent.start(); asyncEvent.start();
checkConnectionTimer.start( checkConnectionTime ); // restart timer
} }
// -------------------------------------------------------------------------------- // --------------------------------------------------------------------------------
void LogSession::cmdProcessing( const string& cmdLogName, const LogServerTypes::lsMessage& msg ) void LogSession::cmdProcessing( const string& cmdLogName, const LogServerTypes::lsMessage& msg )
...@@ -462,12 +475,45 @@ void LogSession::cmdProcessing( const string& cmdLogName, const LogServerTypes:: ...@@ -462,12 +475,45 @@ void LogSession::cmdProcessing( const string& cmdLogName, const LogServerTypes::
} }
} }
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
void LogSession::onCmdTimeout(ev::timer& watcher, int revents) void LogSession::onCmdTimeout( ev::timer& watcher, int revents )
{ {
if( EV_ERROR & revents )
{
if( mylog.is_crit() )
mylog.crit() << peername << "(onCmdTimeout): EVENT ERROR.." << endl;
return;
}
io.set(ev::WRITE); io.set(ev::WRITE);
asyncEvent.start(); asyncEvent.start();
} }
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
void LogSession::onCheckConnectionTimer( ev::timer& watcher, int revents )
{
if( EV_ERROR & revents )
{
if( mylog.is_crit() )
mylog.crit() << peername << "(onCheckConnectionTimer): EVENT ERROR.." << endl;
return;
}
std::unique_lock<std::mutex> lk(logbuf_mutex);
if( !logbuf.empty() )
{
checkConnectionTimer.start( checkConnectionTime ); // restart timer
return;
}
// если клиент уже отвалился.. то при попытке write.. сессия будет закрыта.
ostringstream err;
err << "..logclient ping message.." << endl;
logbuf.emplace(new UTCPCore::Buffer(std::move(err.str())));
io.set(ev::WRITE);
checkConnectionTimer.start( checkConnectionTime ); // restart timer
}
// -------------------------------------------------------------------------
void LogSession::final() void LogSession::final()
{ {
slFin(this); slFin(this);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment