Commit fef7e52e authored by Pavel Vainerman's avatar Pavel Vainerman

(LogServer): переделка на использование libev

parent 256ad9f4
...@@ -171,6 +171,7 @@ int main( int argc, char** argv ) ...@@ -171,6 +171,7 @@ int main( int argc, char** argv )
LogServer ls(la); LogServer ls(la);
ls.setMaxSessionCount(msess); ls.setMaxSessionCount(msess);
dlog->addLevel(Debug::ANY); dlog->addLevel(Debug::ANY);
dlog2->addLevel(Debug::ANY); dlog2->addLevel(Debug::ANY);
dlog3->addLevel(Debug::ANY); dlog3->addLevel(Debug::ANY);
...@@ -205,7 +206,6 @@ int main( int argc, char** argv ) ...@@ -205,7 +206,6 @@ int main( int argc, char** argv )
msleep(delay); msleep(delay);
} }
} }
catch( const SystemError& err ) catch( const SystemError& err )
{ {
......
...@@ -2,8 +2,8 @@ ...@@ -2,8 +2,8 @@
START=uniset2-start.sh START=uniset2-start.sh
${START} -f ./smemory-test --confile ./test.xml --dlog-add-levels level1 --localNode LocalhostNode $* ${START} -f ./smemory-test --confile ./test.xml --dlog-add-levels level1 --localNode LocalhostNode \
--sm-log-add-levels any $* --sm-run-logserver
#--ulog-add-levels crit,warn,info #--ulog-add-levels crit,warn,info
#system,level2,level8,level9 #system,level2,level8,level9
// -------------------------------------------------------------------------
#ifndef DefaultEventLoop_H_
#define DefaultEventLoop_H_
// -------------------------------------------------------------------------
#include <ev++.h>
#include <memory>
#include <atomic>
#include <mutex>
#include <condition_variable>
#include <thread>
#include <list>
// -------------------------------------------------------------------------
/*!
 * Marker base class for every object that registers its ev:: watchers
 * with the shared DefaultEventLoop. DefaultEventLoop tracks users as
 * EventWatcher* (see DefaultEventLoop::run()/terminate()), so the
 * destructor must be virtual.
 */
class EventWatcher
{
	public:
		EventWatcher() = default;
		virtual ~EventWatcher() = default;
};
// -------------------------------------------------------------------------
/*!
* \brief The DefaultEventLoop class
* Т.к. libev требует чтобы был только один default_loop и плохо относится к запуску default_loop в разных потоках.
 * то пришлось сделать такую обёртку - singleton для default_loop. При этом подразумевается, что все объекты(классы),
* которые используют ev::xxx должны наследоваться от EventWatcher и не создавать свой default_loop, а использовать этот.
* Иначе будет случаться ошибка: libev: ev_loop recursion during release detected
* Типичное использование:
* \code
* в h-файле:
* class MyClass:
* public EventWatcher
* {
* ...
*
* std::shared_ptr<DefaultEventLoop> evloop;
* ev::io myio;
* ..
* }
*
* в сс-файле:
* где-то у себя, где надо запускать:
*
* ..
* myio.set(...);
* myio.start();
* ..
* evloop = DefaultEventLoop::inst();
* evloop->run( this, thread );
* \endcode
* При этом thread определяет создавать ли отдельный поток и продолжить работу или "застрять"
* на вызове run().
*
*/
class DefaultEventLoop
{
	public:
		~DefaultEventLoop();

		// Access to the process-wide instance (created lazily).
		static std::shared_ptr<DefaultEventLoop> inst();

		// Register watcher `s` and make sure the shared default_loop is
		// running. thread == true: the loop runs in a background thread and
		// run() returns immediately; thread == false: run() blocks until
		// the loop finishes.
		void run( EventWatcher* s, bool thread = true );

		// true until the loop has been cancelled (see terminate()/finish()).
		bool isActive();

		// Unregister watcher `s`; when the last watcher is gone the
		// shared loop is shut down.
		void terminate( EventWatcher* s );

	protected:
		// Protected: instances are obtained only through inst().
		DefaultEventLoop();

		// Body of the background thread: runs the shared ev::default_loop.
		void defaultLoop();

		// Stop the loop, join the thread and drop the singleton.
		void finish();

	private:
		static std::shared_ptr<DefaultEventLoop> _inst; // the singleton instance

		std::mutex m_run_mutex;            // guards lazy creation of thr
		std::mutex m_mutex;                // guards m_notify / m_event
		std::condition_variable m_event;   // signalled when the loop thread exits
		std::atomic_bool m_notify = { false };   // loop-finished flag
		std::atomic_bool cancelled = { false };  // set once on shutdown
		std::shared_ptr<ev::default_loop> evloop; // the shared libev loop
		std::shared_ptr<std::thread> thr;         // thread running defaultLoop()
		std::mutex m_slist_mutex;          // guards slist
		std::list<EventWatcher*> slist;    // registered users of the loop
};
// -------------------------------------------------------------------------
#endif // DefaultEventLoop_H_
// -------------------------------------------------------------------------
...@@ -21,10 +21,14 @@ ...@@ -21,10 +21,14 @@
#include <string> #include <string>
#include <memory> #include <memory>
#include <cc++/socket.h> #include <cc++/socket.h>
#include <ev++.h>
#include "Mutex.h" #include "Mutex.h"
#include "UniXML.h" #include "UniXML.h"
#include "DebugStream.h" #include "DebugStream.h"
#include "ThreadCreator.h" #include "ThreadCreator.h"
#include "UTCPSocket.h"
#include "DefaultEventLoop.h"
// -------------------------------------------------------------------------
class LogSession; class LogSession;
class LogAgregator; class LogAgregator;
class NullLogSession; class NullLogSession;
...@@ -66,7 +70,8 @@ LogReader. Читающих клиентов может быть скольуг ...@@ -66,7 +70,8 @@ LogReader. Читающих клиентов может быть скольуг
Это сделано для "оптимизации передачи" (чтобы не передавать каждый байт) Это сделано для "оптимизации передачи" (чтобы не передавать каждый байт)
*/ */
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
class LogServer class LogServer:
public EventWatcher
{ {
public: public:
...@@ -74,18 +79,11 @@ class LogServer ...@@ -74,18 +79,11 @@ class LogServer
LogServer( std::shared_ptr<LogAgregator> log ); LogServer( std::shared_ptr<LogAgregator> log );
~LogServer(); ~LogServer();
inline void setSessionTimeout( timeout_t msec )
{
sessTimeout = msec;
}
inline void setCmdTimeout( timeout_t msec ) inline void setCmdTimeout( timeout_t msec )
{ {
cmdTimeout = msec; cmdTimeout = msec;
} }
inline void setOutTimeout( timeout_t msec )
{
outTimeout = msec;
}
inline void setSessionLog( Debug::type t ) inline void setSessionLog( Debug::type t )
{ {
sessLogLevel = t; sessLogLevel = t;
...@@ -96,10 +94,11 @@ class LogServer ...@@ -96,10 +94,11 @@ class LogServer
} }
void run( const std::string& addr, ost::tpport_t port, bool thread = true ); void run( const std::string& addr, ost::tpport_t port, bool thread = true );
void terminate();
inline bool isRunning() inline bool isRunning()
{ {
return (thr && thr->isRunning()); return (evloop && evloop->isActive());
} }
void init( const std::string& prefix, xmlNode* cnode = 0 ); void init( const std::string& prefix, xmlNode* cnode = 0 );
...@@ -109,8 +108,9 @@ class LogServer ...@@ -109,8 +108,9 @@ class LogServer
protected: protected:
LogServer(); LogServer();
void work(); void mainLoop( bool thread );
void sessionFinished( std::shared_ptr<LogSession> s ); void ioAccept( ev::io& watcher, int revents );
void sessionFinished( LogSession* s );
private: private:
typedef std::list< std::shared_ptr<LogSession> > SessionList; typedef std::list< std::shared_ptr<LogSession> > SessionList;
...@@ -118,19 +118,22 @@ class LogServer ...@@ -118,19 +118,22 @@ class LogServer
UniSetTypes::uniset_rwmutex mutSList; UniSetTypes::uniset_rwmutex mutSList;
timeout_t timeout; timeout_t timeout;
timeout_t sessTimeout;
timeout_t cmdTimeout; timeout_t cmdTimeout;
timeout_t outTimeout;
Debug::type sessLogLevel; Debug::type sessLogLevel;
int sessMaxCount; size_t sessMaxCount = { 10 };
std::atomic_bool cancelled; std::atomic_bool running = { false };
DebugStream mylog; DebugStream mylog;
std::shared_ptr< ThreadCreator<LogServer> > thr; ev::io io;
std::shared_ptr<UTCPSocket> sock;
std::shared_ptr<DebugStream> elog; // eventlog..
std::string myname = { "LogServer" };
std::string addr;
ost::tpport_t port;
std::shared_ptr<ost::TCPSocket> tcp; std::shared_ptr<DefaultEventLoop> evloop;
std::shared_ptr<DebugStream> elog;
std::shared_ptr<NullLogSession> nullsess;
}; };
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
#endif // LogServer_H_ #endif // LogServer_H_
......
...@@ -19,26 +19,24 @@ ...@@ -19,26 +19,24 @@
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
#include <string> #include <string>
#include <memory> #include <memory>
#include <deque> #include <queue>
#include <cc++/socket.h> #include <cc++/socket.h>
#include <condition_variable> #include <ev++.h>
#include <mutex>
#include "Mutex.h" #include "Mutex.h"
#include "DebugStream.h" #include "DebugStream.h"
#include "LogAgregator.h" #include "LogAgregator.h"
#include "PassiveTimer.h" #include "USocket.h"
#include "UTCPCore.h"
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
/*! Реализация "сессии" для клиентов LogServer. */ /*! Реализация "сессии" для клиентов LogServer. */
class LogSession: class LogSession
public std::enable_shared_from_this<LogSession>,
public ost::TCPSession
{ {
public: public:
LogSession( ost::TCPSocket& server, std::shared_ptr<DebugStream>& log, timeout_t sessTimeout = 10000, timeout_t cmdTimeout = 2000, timeout_t outTimeout = 2000, timeout_t delay = 2000 ); LogSession(int sock, std::shared_ptr<DebugStream>& log, timeout_t cmdTimeout = 2000 );
virtual ~LogSession(); virtual ~LogSession();
typedef sigc::slot<void, std::shared_ptr<LogSession>> FinalSlot; typedef sigc::slot<void, LogSession*> FinalSlot;
void connectFinalSession( FinalSlot sl ); void connectFinalSession( FinalSlot sl );
inline void cancel() inline void cancel()
...@@ -52,83 +50,59 @@ class LogSession: ...@@ -52,83 +50,59 @@ class LogSession:
inline void setSessionLogLevel( Debug::type t ) inline void setSessionLogLevel( Debug::type t )
{ {
slog.level(t); mylog.level(t);
} }
inline void addSessionLogLevel( Debug::type t ) inline void addSessionLogLevel( Debug::type t )
{ {
slog.addLevel(t); mylog.addLevel(t);
} }
inline void delSessionLogLevel( Debug::type t ) inline void delSessionLogLevel( Debug::type t )
{ {
slog.delLevel(t); mylog.delLevel(t);
} }
// запуск обработки входящих запросов
void run();
void terminate();
bool isAcive();
protected: protected:
LogSession( ost::TCPSocket& server ); LogSession( ost::TCPSocket& server );
virtual void run(); void event( ev::async& watcher, int revents );
virtual void final(); void callback( ev::io& watcher, int revents );
void readEvent( ev::io& watcher );
void writeEvent( ev::io& watcher );
size_t readData( unsigned char* buf, int len );
void cmdProcessing( const std::string& cmdLogName, const LogServerTypes::lsMessage& msg );
void onCmdTimeout( ev::timer& watcher, int revents );
void final();
void logOnEvent( const std::string& s ); void logOnEvent( const std::string& s );
void readStream();
// msec
timeout_t sessTimeout = { 10000 };
timeout_t cmdTimeout = { 2000 }; timeout_t cmdTimeout = { 2000 };
timeout_t outTimeout = { 2000 };
timeout_t delayTime = { 2000 };
private: private:
typedef std::deque<std::string> LogBuffer; std::queue<UTCPCore::Buffer*> logbuf;
LogBuffer lbuf; std::mutex logbuf_mutex;
std::string peername = { "" }; std::string peername = { "" };
std::string caddr = { "" }; std::string caddr = { "" };
std::shared_ptr<DebugStream> log; std::shared_ptr<DebugStream> log;
std::shared_ptr<LogAgregator> alog; std::shared_ptr<LogAgregator> alog;
sigc::connection conn; sigc::connection conn;
std::shared_ptr<LogSession> myptr; ev::io io;
std::shared_ptr<USocket> sock;
ev::timer cmdTimer;
ev::async asyncEvent;
std::mutex io_mutex;
// PassiveTimer ptSessionTimeout;
FinalSlot slFin; FinalSlot slFin;
std::atomic_bool cancelled = { false }; std::atomic_bool cancelled = { false };
DebugStream slog; DebugStream mylog;
std::ostringstream sbuf;
std::mutex log_mutex;
std::condition_variable log_event;
std::atomic_bool log_notify = ATOMIC_VAR_INIT(0);
};
// -------------------------------------------------------------------------
/*! Сессия просто заверщающаяся с указанным сообщением */
class NullLogSession:
public ost::Thread
{
public:
NullLogSession( const std::string& _msg );
~NullLogSession();
void add( ost::TCPSocket& server );
void setMessage( const std::string& _msg );
inline void cancel()
{
cancelled = true;
}
protected:
virtual void run();
virtual void final();
private:
std::string msg;
typedef std::list< std::shared_ptr<ost::TCPStream> > TCPStreamList;
TCPStreamList slist;
UniSetTypes::uniset_rwmutex smutex;
std::atomic_bool cancelled;
}; };
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
#endif // LogSession_H_ #endif // LogSession_H_
......
...@@ -3,10 +3,61 @@ ...@@ -3,10 +3,61 @@
#define UTCPCore_H_ #define UTCPCore_H_
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
#include <cc++/thread.h> // ..for timeout_t #include <cc++/thread.h> // ..for timeout_t
#include <string>
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
namespace UTCPCore namespace UTCPCore
{ {
bool setKeepAliveParams( int sock, timeout_t timeout_sec = 5, int conn_keepcnt = 1, int keepintvl = 2 ); bool setKeepAliveParams( int sock, timeout_t timeout_sec = 5, int conn_keepcnt = 1, int keepintvl = 2 );
// -------------------------------------------
// author: https://gist.github.com/koblas/3364414
// ----------------------
// for use with ev::io..
// Buffer class - allow for output buffering such that it can be written out into async pieces
// Owning byte buffer with a read position, for incremental async writes:
// write() as much as the socket accepts, advance `pos`, retry with
// dpos()/nbytes() until nbytes() == 0.
// NOTE(review): the struct owns `data` via raw new[], but copying is not
// suppressed — copying a Buffer would cause a double delete[]. All visible
// users keep Buffer* in queues, so this is only flagged here.
struct Buffer
{
	unsigned char* data = { nullptr }; // owned; stays nullptr when len <= 0
	ssize_t len = { 0 };  // total payload size in bytes
	ssize_t pos = { 0 };  // number of bytes already consumed

	Buffer( const unsigned char* bytes, ssize_t nbytes )
	{
		pos = 0;
		len = nbytes;

		// Empty payload: keep data == nullptr so that the destructor's
		// delete[] stays well-defined (data was left uninitialized before).
		if( len <= 0 )
			return;

		data = new unsigned char[nbytes];
		memcpy(data, bytes, nbytes);
	}

	Buffer( const std::string& s )
	{
		pos = 0;
		len = s.length();

		if( len <= 0 )
			return;

		data = new unsigned char[len];
		memcpy(data, s.data(), len);
	}

	virtual ~Buffer()
	{
		delete [] data; // deleting nullptr is a no-op
	}

	// Pointer to the first not-yet-written byte.
	unsigned char* dpos()
	{
		return data + pos;
	}

	// Number of bytes still to be written.
	ssize_t nbytes()
	{
		return len - pos;
	}
};
} }
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
#endif // UTCPCore_H_ #endif // UTCPCore_H_
......
...@@ -16,6 +16,7 @@ ...@@ -16,6 +16,7 @@
#include "ModbusTCPSession.h" #include "ModbusTCPSession.h"
#include "ThreadCreator.h" #include "ThreadCreator.h"
#include "UTCPSocket.h" #include "UTCPSocket.h"
#include "DefaultEventLoop.h"
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
/*! ModbusTCPServer /*! ModbusTCPServer
* Реализация сервера на основе libev. Подерживается "много" соединений (постоянных). * Реализация сервера на основе libev. Подерживается "много" соединений (постоянных).
...@@ -25,6 +26,7 @@ ...@@ -25,6 +26,7 @@
* т.к.из многих "соединений" будут вызываться одни и теже обработатчики. * т.к.из многих "соединений" будут вызываться одни и теже обработатчики.
*/ */
class ModbusTCPServer: class ModbusTCPServer:
public EventWatcher,
public ModbusServer public ModbusServer
{ {
public: public:
...@@ -144,7 +146,7 @@ class ModbusTCPServer: ...@@ -144,7 +146,7 @@ class ModbusTCPServer:
timeout_t sessTimeout = { 10000 }; // msec timeout_t sessTimeout = { 10000 }; // msec
std::shared_ptr<ev::default_loop> evloop; std::shared_ptr<DefaultEventLoop> evloop;
ev::io io; ev::io io;
std::shared_ptr<UTCPSocket> sock; std::shared_ptr<UTCPSocket> sock;
ev::timer ioTimer; ev::timer ioTimer;
......
...@@ -10,6 +10,7 @@ ...@@ -10,6 +10,7 @@
#include "ModbusServer.h" #include "ModbusServer.h"
#include "PassiveTimer.h" #include "PassiveTimer.h"
#include "USocket.h" #include "USocket.h"
#include "UTCPCore.h"
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
/*! /*!
* \brief The ModbusTCPSession class * \brief The ModbusTCPSession class
...@@ -60,39 +61,6 @@ class ModbusTCPSession: ...@@ -60,39 +61,6 @@ class ModbusTCPSession:
virtual ModbusRTU::mbErrCode realReceive( const std::unordered_set<ModbusRTU::ModbusAddr>& vmbaddr, timeout_t msecTimeout ) override; virtual ModbusRTU::mbErrCode realReceive( const std::unordered_set<ModbusRTU::ModbusAddr>& vmbaddr, timeout_t msecTimeout ) override;
// -------------------------------------------
// author:
// Buffer class - allow for output buffering such that it can be written out into async pieces
struct Buffer
{
unsigned char* data;
ssize_t len;
ssize_t pos;
Buffer( const unsigned char* bytes, ssize_t nbytes )
{
pos = 0;
len = nbytes;
data = new unsigned char[nbytes];
memcpy(data, bytes, nbytes);
}
virtual ~Buffer()
{
delete [] data;
}
unsigned char* dpos()
{
return data + pos;
}
ssize_t nbytes()
{
return len - pos;
}
};
void callback( ev::io& watcher, int revents ); void callback( ev::io& watcher, int revents );
void onTimeout( ev::timer& watcher, int revents ); void onTimeout( ev::timer& watcher, int revents );
virtual void readEvent( ev::io& watcher ); virtual void readEvent( ev::io& watcher );
...@@ -158,7 +126,7 @@ class ModbusTCPSession: ...@@ -158,7 +126,7 @@ class ModbusTCPSession:
ev::io io; ev::io io;
std::shared_ptr<USocket> sock; std::shared_ptr<USocket> sock;
std::queue<Buffer*> qsend; std::queue<UTCPCore::Buffer*> qsend;
ev::timer ioTimeout; ev::timer ioTimeout;
double sessTimeout = { 10.0 }; double sessTimeout = { 10.0 };
......
...@@ -46,7 +46,7 @@ ModbusTCPServer::ModbusTCPServer( ost::InetAddress& ia, int _port ): ...@@ -46,7 +46,7 @@ ModbusTCPServer::ModbusTCPServer( ost::InetAddress& ia, int _port ):
ModbusTCPServer::~ModbusTCPServer() ModbusTCPServer::~ModbusTCPServer()
{ {
if( evloop ) if( evloop )
evloop->break_loop(); evloop->terminate(this);
} }
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
void ModbusTCPServer::setMaxSessions( unsigned int num ) void ModbusTCPServer::setMaxSessions( unsigned int num )
...@@ -118,24 +118,13 @@ void ModbusTCPServer::mainLoop() ...@@ -118,24 +118,13 @@ void ModbusTCPServer::mainLoop()
if( dlog->is_info() ) if( dlog->is_info() )
dlog->info() << myname << "(ModbusTCPServer): run main loop.." << endl; dlog->info() << myname << "(ModbusTCPServer): run main loop.." << endl;
evloop = make_shared<ev::default_loop>();
while( !cancelled )
{ {
try evloop = DefaultEventLoop::inst();
{ evloop->run(this,false);
evloop->run();
}
catch( std::exception& ex )
{
dlog->crit() << myname << "(ModbusTCPServer::mainLoop): " << ex.what() << endl;
}
} }
dlog->info() << myname << "(ModbusTCPServer::mainLoop): MAIN EVENT LOOP EXIT ****************" << endl; if( dlog->is_info() )
dlog->info() << myname << "(ModbusTCPServer): main loop exit.." << endl;
//shutdown(sock, SHUT_RDWR);
//close(sock);
cancelled = true; cancelled = true;
} }
...@@ -153,10 +142,6 @@ void ModbusTCPServer::terminate() ...@@ -153,10 +142,6 @@ void ModbusTCPServer::terminate()
ioTimer.stop(); ioTimer.stop();
io.stop(); io.stop();
if( evloop )
evloop->break_loop();
auto lst(slist); auto lst(slist);
// Копируем сперва себе список сессий.. // Копируем сперва себе список сессий..
...@@ -171,6 +156,9 @@ void ModbusTCPServer::terminate() ...@@ -171,6 +156,9 @@ void ModbusTCPServer::terminate()
} }
catch( std::exception& ex ) {} catch( std::exception& ex ) {}
} }
if( evloop )
evloop->terminate(this);
} }
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
void ModbusTCPServer::sessionFinished( ModbusTCPSession* s ) void ModbusTCPServer::sessionFinished( ModbusTCPSession* s )
......
...@@ -157,7 +157,7 @@ void ModbusTCPSession::writeEvent( ev::io& watcher ) ...@@ -157,7 +157,7 @@ void ModbusTCPSession::writeEvent( ev::io& watcher )
if( qsend.empty() ) if( qsend.empty() )
return; return;
Buffer* buffer = qsend.front(); UTCPCore::Buffer* buffer = qsend.front();
ssize_t ret = write(watcher.fd, buffer->dpos(), buffer->nbytes()); ssize_t ret = write(watcher.fd, buffer->dpos(), buffer->nbytes());
...@@ -252,7 +252,7 @@ void ModbusTCPSession::final() ...@@ -252,7 +252,7 @@ void ModbusTCPSession::final()
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
mbErrCode ModbusTCPSession::sendData( unsigned char* buf, int len ) mbErrCode ModbusTCPSession::sendData( unsigned char* buf, int len )
{ {
qsend.push( new Buffer(buf, len) ); qsend.push( new UTCPCore::Buffer(buf, len) );
return erNoError; return erNoError;
} }
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
......
...@@ -33,6 +33,7 @@ UTCPSocket::UTCPSocket( int sock ): ...@@ -33,6 +33,7 @@ UTCPSocket::UTCPSocket( int sock ):
} }
Socket::state = CONNECTED; Socket::state = CONNECTED;
init();
} }
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
UTCPSocket::UTCPSocket( const std::string& hname, unsigned backlog, unsigned mss ): UTCPSocket::UTCPSocket( const std::string& hname, unsigned backlog, unsigned mss ):
......
...@@ -27,30 +27,9 @@ using namespace UniSetTypes; ...@@ -27,30 +27,9 @@ using namespace UniSetTypes;
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
LogServer::~LogServer() LogServer::~LogServer()
{ {
if( nullsess ) cerr << myname << " ~LogServer() " << endl;
nullsess->cancel(); if( running )
terminate();
{
// uniset_rwmutex_wrlock l(mutSList);
for( const auto& i : slist )
{
if( i->isRunning() )
i->cancel();
}
}
cancelled = true;
if( tcp && !slist.empty() )
tcp->reject();
if( thr )
{
thr->stop();
if( thr->isRunning() )
thr->join();
}
} }
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
LogServer::LogServer( std::shared_ptr<LogAgregator> log ): LogServer::LogServer( std::shared_ptr<LogAgregator> log ):
...@@ -61,38 +40,80 @@ LogServer::LogServer( std::shared_ptr<LogAgregator> log ): ...@@ -61,38 +40,80 @@ LogServer::LogServer( std::shared_ptr<LogAgregator> log ):
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
LogServer::LogServer( std::shared_ptr<DebugStream> log ): LogServer::LogServer( std::shared_ptr<DebugStream> log ):
timeout(TIMEOUT_INF), timeout(TIMEOUT_INF),
sessTimeout(3600000),
cmdTimeout(2000), cmdTimeout(2000),
outTimeout(2000),
sessLogLevel(Debug::NONE), sessLogLevel(Debug::NONE),
sessMaxCount(10), sock(0),
cancelled(false),
thr(0),
tcp(0),
elog(log) elog(log)
{ {
} }
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
LogServer::LogServer(): LogServer::LogServer():
timeout(TIMEOUT_INF), timeout(TIMEOUT_INF),
sessTimeout(3600000),
cmdTimeout(2000), cmdTimeout(2000),
outTimeout(2000),
sessLogLevel(Debug::NONE), sessLogLevel(Debug::NONE),
sessMaxCount(10), sock(0),
cancelled(false),
thr(0),
tcp(0),
elog(nullptr) elog(nullptr)
{ {
} }
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
void LogServer::run( const std::string& addr, ost::tpport_t port, bool thread ) void LogServer::terminate()
{ {
if( !running )
return;
if( mylog.is_info() )
mylog.info() << myname << "(LogServer): terminate..." << endl;
io.stop();
auto lst(slist);
// Копируем сперва себе список сессий..
// т.к при вызове terminate()
// у Session будет вызван сигнал "final"
// который приведёт к вызову sessionFinished()..в котором список будет меняться..
for( const auto& s : lst )
{
try
{
s->terminate();
}
catch( std::exception& ex ) {}
}
running = false;
if( evloop )
{
cerr << myname << ": terminate evloop.." << endl;
evloop->terminate(this);
}
}
// -------------------------------------------------------------------------
void LogServer::run( const std::string& _addr, ost::tpport_t _port, bool thread )
{
addr = _addr;
port = _port;
if( !running )
{
mainLoop( thread );
running = true;
}
}
// -------------------------------------------------------------------------
void LogServer::mainLoop( bool thread )
{
if( running )
{
if( elog->is_crit() )
elog->crit() << myname << "(LogServer::mainLoopt): ALREADY RUNNING.." << endl;
return;
}
try try
{ {
ost::InetAddress iaddr(addr.c_str()); ost::InetAddress iaddr(addr.c_str());
tcp = make_shared<ost::TCPSocket>(iaddr, port); sock = make_shared<UTCPSocket>(iaddr, port);
} }
catch( ost::Socket* socket ) catch( ost::Socket* socket )
{ {
...@@ -108,112 +129,79 @@ void LogServer::run( const std::string& addr, ost::tpport_t port, bool thread ) ...@@ -108,112 +129,79 @@ void LogServer::run( const std::string& addr, ost::tpport_t port, bool thread )
else else
err << "client socket failed" << endl; err << "client socket failed" << endl;
if( mylog.is_crit() )
mylog.crit() << myname << "(LogServer): " << err.str() << endl;
throw SystemError( err.str() ); throw SystemError( err.str() );
} }
if( !thread ) sock->setCompletion(false);
work(); UTCPCore::setKeepAliveParams(sock->getSocket());
else
io.set<LogServer, &LogServer::ioAccept>(this);
io.start(sock->getSocket(), ev::READ);
// скобки специально чтобы пораньшк выйти из "зоны" видимости
{ {
thr = make_shared<ThreadCreator<LogServer>>(this, &LogServer::work); evloop = DefaultEventLoop::inst();
thr->start(); evloop->run( this, thread );
} }
} }
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
void LogServer::work() void LogServer::ioAccept( ev::io& watcher, int revents )
{ {
cancelled = false; if (EV_ERROR & revents)
{
if( elog->is_crit() )
elog->crit() << myname << "(LogServer::ioAccept): invalid event" << endl;
return;
}
while( !cancelled ) if( !running )
{ {
try if( elog->is_crit() )
{ elog->crit() << myname << "(LogServer::ioAccept): terminate work.." << endl;
while( !cancelled && tcp->isPendingConnection(timeout) )
{ sock->reject();
if( cancelled ) break; return;
{
uniset_rwmutex_wrlock l(mutSList);
int sz = slist.size();
if( sz >= sessMaxCount )
{
ostringstream err;
err << "(LOG SERVER): Exceeded the limit on the number of sessions = " << sessMaxCount << endl;
if( !nullsess )
{
ostringstream err;
err << "(LOG SERVER): Exceeded the limit on the number of sessions = " << sessMaxCount << endl;
nullsess = make_shared<NullLogSession>(err.str());
//nullsess->detach();
nullsess->start();
}
else
nullsess->setMessage(err.str());
nullsess->add(*(tcp.get())); // опасно передавать "сырой указатель", теряем контроль
continue;
}
}
if( cancelled ) break;
auto s = make_shared<LogSession>( *(tcp.get()), elog, sessTimeout, cmdTimeout, outTimeout);
s->setSessionLogLevel(sessLogLevel);
{
uniset_rwmutex_wrlock l(mutSList);
slist.push_back(s);
}
s->connectFinalSession( sigc::mem_fun(this, &LogServer::sessionFinished) );
//s->detach();
s->start();
}
}
catch( ost::Socket* socket )
{
ost::tpport_t port;
int errnum = socket->getErrorNumber();
ost::InetAddress saddr = (ost::InetAddress)socket->getPeer(&port);
cerr << "socket error " << saddr.getHostname() << ":" << port << " = " << errnum << endl;
if( errnum == ost::Socket::errBindingFailed )
{
cerr << "bind failed; port busy" << endl;
// ::exit(-1);
}
else
cerr << "client socket failed" << endl;
}
catch( const std::exception& ex )
{
cerr << "catch exception: " << ex.what() << endl;
}
} }
{ {
// uniset_rwmutex_wrlock l(mutSList); uniset_rwmutex_wrlock l(mutSList);
for( const auto& i : slist ) if( slist.size() >= sessMaxCount )
i->disconnect(); {
if( mylog.is_crit() )
mylog.crit() << myname << "(LogServer::ioAccept): session limit(" << sessMaxCount << ")" << endl;
if( nullsess ) sock->reject();
nullsess->cancel(); return;
}
} }
for( const auto& i : slist ) try
{ {
if( i->isRunning() ) auto s = make_shared<LogSession>( watcher.fd, elog, cmdTimeout );
i->ost::Thread::join(); s->setSessionLogLevel(sessLogLevel);
s->connectFinalSession( sigc::mem_fun(this, &LogServer::sessionFinished) );
{
uniset_rwmutex_wrlock l(mutSList);
slist.push_back(s);
}
s->run();
}
catch( const std::exception& ex )
{
mylog.warn() << "(LogServer::ioAccept): catch exception: " << ex.what() << endl;
} }
} }
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
void LogServer::sessionFinished( std::shared_ptr<LogSession> s ) void LogServer::sessionFinished( LogSession* s )
{ {
uniset_rwmutex_wrlock l(mutSList); uniset_rwmutex_wrlock l(mutSList);
for( SessionList::iterator i = slist.begin(); i != slist.end(); ++i ) for( SessionList::iterator i = slist.begin(); i != slist.end(); ++i )
{ {
if( i->get() == s.get() ) if( i->get() == s )
{ {
slist.erase(i); slist.erase(i);
return; return;
...@@ -228,21 +216,14 @@ void LogServer::init( const std::string& prefix, xmlNode* cnode ) ...@@ -228,21 +216,14 @@ void LogServer::init( const std::string& prefix, xmlNode* cnode )
// можем на cnode==0 не проверять, т.е. UniXML::iterator корректно отрабатывает эту ситуацию // можем на cnode==0 не проверять, т.е. UniXML::iterator корректно отрабатывает эту ситуацию
UniXML::iterator it(cnode); UniXML::iterator it(cnode);
timeout_t sessTimeout = conf->getArgPInt("--" + prefix + "-session-timeout", it.getProp("sessTimeout"), 3600000);
timeout_t cmdTimeout = conf->getArgPInt("--" + prefix + "-cmd-timeout", it.getProp("cmdTimeout"), 2000); timeout_t cmdTimeout = conf->getArgPInt("--" + prefix + "-cmd-timeout", it.getProp("cmdTimeout"), 2000);
timeout_t outTimeout = conf->getArgPInt("--" + prefix + "-out-timeout", it.getProp("outTimeout"), 2000);
setSessionTimeout(sessTimeout);
setCmdTimeout(cmdTimeout); setCmdTimeout(cmdTimeout);
setOutTimeout(outTimeout);
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
std::string LogServer::help_print( const std::string& prefix ) std::string LogServer::help_print( const std::string& prefix )
{ {
ostringstream h; ostringstream h;
h << "--" << prefix << "-session-timeout msec - Timeout for session. Default: 10 min." << endl;
h << "--" << prefix << "-cmd-timeout msec - Timeout for wait command. Default: 2000 msec." << endl; h << "--" << prefix << "-cmd-timeout msec - Timeout for wait command. Default: 2000 msec." << endl;
h << "--" << prefix << "-out-timeout msec - Timeout for send to client. Default: 2000 msec." << endl;
return std::move( h.str() ); return std::move( h.str() );
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
#include <algorithm>
#include <chrono>
#include <csignal>
#include <iostream>
#include "DefaultEventLoop.h"
// -------------------------------------------------------------------------
using namespace std;
// -------------------------------------------------------------------------
std::shared_ptr<DefaultEventLoop> DefaultEventLoop::_inst;
// ---------------------------------------------------------------------------
// Return the process-wide instance, creating it on first use.
std::shared_ptr<DefaultEventLoop> DefaultEventLoop::inst()
{
	// Guard lazy initialization: inst() may be called from several threads.
	// A function-local mutex is used because finish() reset()s _inst, so a
	// "magic static" singleton object is not applicable here.
	static std::mutex inst_mutex;
	std::lock_guard<std::mutex> lock(inst_mutex);

	if( _inst == nullptr )
	{
		// std::make_shared is not usable: the constructor is protected.
		_inst = std::shared_ptr<DefaultEventLoop>( new DefaultEventLoop() );
	}

	return _inst;
}
// ---------------------------------------------------------------------------
DefaultEventLoop::~DefaultEventLoop()
{
	// Safety net: if nobody shut the loop down explicitly,
	// do it here before the members are destroyed.
	if( cancelled )
		return;

	cerr << " ~DefaultEventLoop(): not canceled.." << endl;
	finish();
}
// ---------------------------------------------------------------------------
// Register watcher `s` and make sure the loop thread exists.
// thread == false makes the call block until the event loop exits.
void DefaultEventLoop::run( EventWatcher* s, bool thread )
{
	// Once cancelled the loop can not be restarted:
	// new watchers are silently ignored.
	if( cancelled )
		return;

	{
		std::unique_lock<std::mutex> lk(m_run_mutex);

		// Lazily start the single background thread running the shared
		// ev::default_loop (only the first caller creates it).
		if( !thr )
			thr = make_shared<std::thread>( [=] { defaultLoop(); } );
	}

	{
		// Register the watcher so terminate() can later tell when the
		// last user of the shared loop is gone.
		std::unique_lock<std::mutex> lk(m_slist_mutex);
		slist.push_back(s);
	}

	// Blocking mode: wait until defaultLoop() signals completion.
	if( !thread )
	{
		std::unique_lock<std::mutex> lk(m_mutex);

		while( !m_notify )
			m_event.wait(lk);

		// NOTE(review): thr is read here without m_run_mutex — presumably
		// safe because it is only ever assigned once above; verify.
		if( thr->joinable() )
			thr->join();
	}
}
// -------------------------------------------------------------------------
bool DefaultEventLoop::isActive()
{
	// The loop counts as active until finish() sets the flag.
	return cancelled == false;
}
// -------------------------------------------------------------------------
void DefaultEventLoop::terminate( EventWatcher* s )
{
    // Detach watcher `s` from the loop; if it was the last one, shut the
    // whole loop down. (Log message fixed: it previously said "defaultLoop"
    // instead of "terminate".)
    cerr << "(DefaultEventLoop::terminate): terminate.." << endl;

    bool finishNeeded = false;
    {
        std::unique_lock<std::mutex> lk(m_slist_mutex);

        // Erase only the first occurrence (std::list::remove would drop all).
        auto i = std::find(slist.begin(), slist.end(), s);
        if( i != slist.end() )
            slist.erase(i);

        finishNeeded = ( slist.empty() && !cancelled && evloop );
    }

    // Call finish() only after releasing m_slist_mutex: finish() blocks for
    // up to 3 seconds and joins the loop thread, and the original invoked it
    // with the lock still held — deadlock-prone if the loop thread touches
    // the session list while shutting down.
    if( finishNeeded )
        finish();
}
// -------------------------------------------------------------------------
// Stop the event loop and wait (bounded) for the loop thread to exit.
// Sequence: set cancelled -> break the libev loop -> wait up to 3 s for
// defaultLoop() to acknowledge via m_notify -> join -> drop the singleton.
// NOTE(review): log messages say "fini" while the method is finish().
void DefaultEventLoop::finish()
{
    cerr << "(DefaultEventLoop::fini): TERMINATE EVENT LOOP.." << endl;
    cancelled = true;

    // The loop was never started: nothing to break or join.
    if( !evloop )
        return;

    cerr << "(DefaultEventLoop::fini): BREAK EVENT LOOP.." << endl;
    evloop->break_loop(ev::ALL);

    // Bounded wait for defaultLoop() to confirm it left ev::run().
    std::unique_lock<std::mutex> lk(m_mutex);
    m_event.wait_for(lk, std::chrono::seconds(3), [=]()
    {
        return (m_notify == true);
    } );

    if( !m_notify )
    {
        // Original comment (translated): "but how do we interrupt the EVENT LOOP?!!"
        // Last resort: signal the whole process so it terminates.
        cerr << "(DefaultEventLoop::fini): MAIN LOOP NOT BREAKED! kill(SIGTERM)!.." << endl;
        raise(SIGTERM);
    }

    if( thr && thr->joinable() )
    {
        cerr << "(DefaultEventLoop::fini): join.." << endl;
        thr->join();
    }

    cerr << "(DefaultEventLoop::fini): exit..." << endl;

    // Release the singleton so a fresh instance can be created later.
    // NOTE(review): if the caller holds no other shared_ptr, this may
    // destroy *this while finish() is still executing — confirm callers
    // (e.g. inst() users) keep a reference across this call.
    _inst.reset();
}
// -------------------------------------------------------------------------
// Default constructor: intentionally empty. Instances are created only
// through inst() (singleton); members are initialized in the class header.
DefaultEventLoop::DefaultEventLoop()
{
}
// -------------------------------------------------------------------------
// Thread body: creates the single ev::default_loop and re-runs it until
// `cancelled` is set. On exit it raises m_notify (under m_mutex) and wakes
// every waiter in run()/finish().
void DefaultEventLoop::defaultLoop()
{
    cerr << "(DefaultEventLoop::defaultLoop): run main loop.." << endl;

    // Original comment (translated): "braces on purpose so that evloop
    // leaves 'scope' earlier". NOTE(review): evloop is a member shared_ptr,
    // so these braces do NOT actually end its lifetime — confirm intent.
    {
        evloop = make_shared<ev::default_loop>();

        // Keep the loop alive across exceptions thrown by watchers until
        // finish() sets `cancelled` and breaks the loop.
        while( !cancelled )
        {
            try
            {
                evloop->run(0);
            }
            catch( std::exception& ex )
            {
                cerr << "(DefaultEventLoop::defaultLoop): " << ex.what() << endl;
            }
        }

        cerr << "(DefaultEventLoop::defaultLoop): MAIN EVENT LOOP EXIT ****************" << endl;
    }

    // Publish "loop finished" under the lock, then wake all waiters.
    {
        std::unique_lock<std::mutex> lk(m_mutex);
        m_notify = true;
    }

    cancelled = true;
    m_event.notify_all();
}
// -------------------------------------------------------------------------
...@@ -6,7 +6,7 @@ noinst_LTLIBRARIES = libProcesses.la ...@@ -6,7 +6,7 @@ noinst_LTLIBRARIES = libProcesses.la
libProcesses_la_CXXFLAGS = $(SIGC_CFLAGS) libProcesses_la_CXXFLAGS = $(SIGC_CFLAGS)
libProcesses_la_LIBADD = $(SIGC_LIBS) libProcesses_la_LIBADD = $(SIGC_LIBS)
libProcesses_la_SOURCES = IOController_iSK.cc IOController.cc IONotifyController.cc \ libProcesses_la_SOURCES = IOController_iSK.cc IOController.cc IONotifyController.cc \
NCRestorer.cc NCRestorer_XML.cc NCRestorer.cc NCRestorer_XML.cc DefaultEventLoop.cc
local-clean: local-clean:
rm -rf *iSK.cc rm -rf *iSK.cc
......
...@@ -192,11 +192,12 @@ TEST_CASE("MaxSessions", "[LogServer]" ) ...@@ -192,11 +192,12 @@ TEST_CASE("MaxSessions", "[LogServer]" )
la->signal_stream_event().connect( sigc::ptr_fun(la_logOnEvent) ); la->signal_stream_event().connect( sigc::ptr_fun(la_logOnEvent) );
LogServer ls(la); LogServer ls(la);
ls.setCmdTimeout(100);
//ls.setSessionLog(Debug::ANY); //ls.setSessionLog(Debug::ANY);
ls.setMaxSessionCount(1); ls.setMaxSessionCount(1);
ls.run( ip, port, true ); ls.run( ip, port, true );
for( int i = 0; i < 3 && !ls.isRunning(); i++ ) for( int i = 0; i < 4 && !ls.isRunning(); i++ )
msleep(500); msleep(500);
CHECK( ls.isRunning() ); CHECK( ls.isRunning() );
...@@ -229,9 +230,13 @@ TEST_CASE("MaxSessions", "[LogServer]" ) ...@@ -229,9 +230,13 @@ TEST_CASE("MaxSessions", "[LogServer]" )
{ {
uniset_mutex_lock l(r2_mutex); uniset_mutex_lock l(r2_mutex);
/*
// Ищем часть сообщения об ошибке: '(LOG SERVER): Exceeded the limit on the number of sessions = 1' // Ищем часть сообщения об ошибке: '(LOG SERVER): Exceeded the limit on the number of sessions = 1'
size_t pos = msg2.str().find("Exceeded the limit"); size_t pos = msg2.str().find("Exceeded the limit");
REQUIRE( pos != std::string::npos ); REQUIRE( pos != std::string::npos );
*/
// ничего не получили..
REQUIRE( msg2.str() == "" );
} }
g_read_cancel = true; g_read_cancel = true;
......
...@@ -269,6 +269,7 @@ include/HourGlass.h ...@@ -269,6 +269,7 @@ include/HourGlass.h
include/IOController.h include/IOController.h
include/IONotifyController.h include/IONotifyController.h
include/IORFile.h include/IORFile.h
include/DefaultEventLoop.h
include/LogAgregator.h include/LogAgregator.h
include/LogReader.h include/LogReader.h
include/LogServer.h include/LogServer.h
...@@ -387,6 +388,7 @@ src/Processes/IONotifyController.cc ...@@ -387,6 +388,7 @@ src/Processes/IONotifyController.cc
src/Processes/Makefile.am src/Processes/Makefile.am
src/Processes/NCRestorer.cc src/Processes/NCRestorer.cc
src/Processes/NCRestorer_XML.cc src/Processes/NCRestorer_XML.cc
src/Processes/DefaultEventLoop.cc
src/Services/DBServer.cc src/Services/DBServer.cc
src/Services/Makefile.am src/Services/Makefile.am
src/Timers/Makefile.am src/Timers/Makefile.am
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment