Commit eb476a0e authored by Pavel Vainerman's avatar Pavel Vainerman

(libev): Разобрался с работой циклов в многопоточной среде..

parent 6de90468
...@@ -277,8 +277,9 @@ void SharedMemory::sysCommand( const SystemMessage* sm ) ...@@ -277,8 +277,9 @@ void SharedMemory::sysCommand( const SystemMessage* sm )
bool SharedMemory::deactivateObject() bool SharedMemory::deactivateObject()
{ {
workready = false; workready = false;
// if( logserv && logserv->isRunning() )
// logserv->terminate(); if( logserv && logserv->isRunning() )
logserv->terminate();
return IONotifyController::deactivateObject(); return IONotifyController::deactivateObject();
} }
...@@ -329,7 +330,6 @@ CORBA::Boolean SharedMemory::exist() ...@@ -329,7 +330,6 @@ CORBA::Boolean SharedMemory::exist()
// ------------------------------------------------------------------------------------------ // ------------------------------------------------------------------------------------------
void SharedMemory::sigterm( int signo ) void SharedMemory::sigterm( int signo )
{ {
cerr << myname << "************* SIGTERM...." << endl;
workready = false; workready = false;
if( signo == SIGTERM && wdt ) if( signo == SIGTERM && wdt )
wdt->stop(); wdt->stop();
......
...@@ -74,8 +74,8 @@ UNetReceiver::UNetReceiver( const std::string& s_host, const ost::tpport_t port, ...@@ -74,8 +74,8 @@ UNetReceiver::UNetReceiver( const std::string& s_host, const ost::tpport_t port,
try try
{ {
addr = s_host.c_str(); addr = s_host.c_str();
udp = make_shared<UDPDuplexU>(addr, port); udp = make_shared<UDPReceiveU>(addr, port);
udp->setReceiveCompletion(false); // делаем неблокирующее чтение (нужно для libev) udp->setCompletion(false); // делаем неблокирующее чтение (нужно для libev)
} }
catch( const std::exception& e ) catch( const std::exception& e )
{ {
...@@ -92,8 +92,6 @@ UNetReceiver::UNetReceiver( const std::string& s_host, const ost::tpport_t port, ...@@ -92,8 +92,6 @@ UNetReceiver::UNetReceiver( const std::string& s_host, const ost::tpport_t port,
throw SystemError(s.str()); throw SystemError(s.str());
} }
//r_thr = make_shared< ThreadCreator<UNetReceiver> >(this, &UNetReceiver::receive);
//u_thr = make_shared< ThreadCreator<UNetReceiver> >(this, &UNetReceiver::update);
evReceive.set<UNetReceiver, &UNetReceiver::callback>(this); evReceive.set<UNetReceiver, &UNetReceiver::callback>(this);
evUpdate.set<UNetReceiver, &UNetReceiver::updateEvent>(this); evUpdate.set<UNetReceiver, &UNetReceiver::updateEvent>(this);
...@@ -103,6 +101,8 @@ UNetReceiver::UNetReceiver( const std::string& s_host, const ost::tpport_t port, ...@@ -103,6 +101,8 @@ UNetReceiver::UNetReceiver( const std::string& s_host, const ost::tpport_t port,
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
UNetReceiver::~UNetReceiver() UNetReceiver::~UNetReceiver()
{ {
evReceive.stop();
evUpdate.stop();
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
void UNetReceiver::setReceiveTimeout( timeout_t msec ) void UNetReceiver::setReceiveTimeout( timeout_t msec )
...@@ -178,20 +178,31 @@ void UNetReceiver::resetTimeout() ...@@ -178,20 +178,31 @@ void UNetReceiver::resetTimeout()
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
void UNetReceiver::start() void UNetReceiver::start()
{ {
unetinfo << myname << ":... start... " << endl;
if( !activated ) if( !activated )
{ {
activated = true; activated = true;
//u_thr->start(); evrun(true);
//r_thr->start();
evReceive.start(udp->getReceiveSocket(),ev::READ);
evUpdate.start( updateTime );
evloop = DefaultEventLoop::inst();
evloop->run( this, true );
} }
else else
forceUpdate(); forceUpdate();
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
void UNetReceiver::evprepare()
{
evReceive.set(loop);
evReceive.start(udp->getSocket(),ev::READ);
evUpdate.set(loop);
evUpdate.start( updateTime );
}
// -----------------------------------------------------------------------------
void UNetReceiver::evfinish()
{
evReceive.stop();
evUpdate.stop();
}
// -----------------------------------------------------------------------------
void UNetReceiver::forceUpdate() void UNetReceiver::forceUpdate()
{ {
uniset_rwmutex_wrlock l(packMutex); uniset_rwmutex_wrlock l(packMutex);
...@@ -380,8 +391,6 @@ void UNetReceiver::readEvent( ev::io& watcher ) ...@@ -380,8 +391,6 @@ void UNetReceiver::readEvent( ev::io& watcher )
if( !activated ) if( !activated )
return; return;
cerr << "******** readEvent..." << endl;
bool tout = false; bool tout = false;
try try
{ {
...@@ -469,19 +478,32 @@ void UNetReceiver::updateEvent(ev::periodic& tm, int revents ) ...@@ -469,19 +478,32 @@ void UNetReceiver::updateEvent(ev::periodic& tm, int revents )
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
void UNetReceiver::stop() void UNetReceiver::stop()
{ {
unetinfo << myname << ": stop.." << endl;
activated = false; activated = false;
evReceive.stop(); evstop();
evUpdate.stop();
if( evloop )
evloop->terminate(this);
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
bool UNetReceiver::receive() bool UNetReceiver::receive()
{ {
if( !udp->isInputReady(recvTimeout) ) // if( !udp->isInputReady(recvTimeout) )
// return false;
//udp->UDPReceive::receive((char*)(r_buf.data), sizeof(r_buf.data));
//ssize_t ret = ::recv(udp->getSocket(),r_buf.data,sizeof(r_buf.data),0);
ssize_t ret = udp->receive(r_buf.data,sizeof(r_buf.data));
if( ret < 0 )
{
unetcrit << myname << "(receive): recv err(" << errno << "): " << strerror(errno) << endl;
return false; return false;
}
size_t ret = udp->UDPReceive::receive((char*)(r_buf.data), sizeof(r_buf.data)); if( ret == 0 )
{
unetwarn << myname << "(receive): disconnected?!... recv 0 byte.." << endl;
return false;
}
size_t sz = UniSetUDP::UDPMessage::getMessage(pack, r_buf); size_t sz = UniSetUDP::UDPMessage::getMessage(pack, r_buf);
......
...@@ -31,7 +31,7 @@ ...@@ -31,7 +31,7 @@
#include "SMInterface.h" #include "SMInterface.h"
#include "SharedMemory.h" #include "SharedMemory.h"
#include "UDPPacket.h" #include "UDPPacket.h"
#include "DefaultEventLoop.h" #include "EventLoopServer.h"
#include "UDPCore.h" #include "UDPCore.h"
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
/* Основная идея: сделать проверку очерёдности пакетов, но при этом использовать UDP. /* Основная идея: сделать проверку очерёдности пакетов, но при этом использовать UDP.
...@@ -78,7 +78,7 @@ ...@@ -78,7 +78,7 @@
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
class UNetReceiver: class UNetReceiver:
public std::enable_shared_from_this<UNetReceiver>, public std::enable_shared_from_this<UNetReceiver>,
public EventWatcher public EventLoopServer
{ {
public: public:
UNetReceiver( const std::string& host, const ost::tpport_t port, const std::shared_ptr<SMInterface>& smi ); UNetReceiver( const std::string& host, const ost::tpport_t port, const std::shared_ptr<SMInterface>& smi );
...@@ -161,6 +161,8 @@ class UNetReceiver: ...@@ -161,6 +161,8 @@ class UNetReceiver:
void callback( ev::io& watcher, int revents ); void callback( ev::io& watcher, int revents );
void readEvent( ev::io& watcher ); void readEvent( ev::io& watcher );
void updateEvent( ev::periodic& watcher, int revents ); void updateEvent( ev::periodic& watcher, int revents );
virtual void evfinish() override;
virtual void evprepare() override;
void initIterators(); void initIterators();
...@@ -184,13 +186,12 @@ class UNetReceiver: ...@@ -184,13 +186,12 @@ class UNetReceiver:
timeout_t recvpause = { 10 }; /*!< пауза меджду приёмами пакетов, [мсек] */ timeout_t recvpause = { 10 }; /*!< пауза меджду приёмами пакетов, [мсек] */
timeout_t updatepause = { 100 }; /*!< переодичность обновления данных в SM, [мсек] */ timeout_t updatepause = { 100 }; /*!< переодичность обновления данных в SM, [мсек] */
std::shared_ptr<UDPDuplexU> udp; std::shared_ptr<UDPReceiveU> udp;
ost::IPV4Address addr; ost::IPV4Address addr;
ost::tpport_t port = { 0 }; ost::tpport_t port = { 0 };
std::string myname; std::string myname;
ev::io evReceive; ev::io evReceive;
ev::periodic evUpdate; ev::periodic evUpdate;
std::shared_ptr<DefaultEventLoop> evloop;
double updateTime = { 0.0 }; double updateTime = { 0.0 };
UniSetTypes::uniset_rwmutex pollMutex; UniSetTypes::uniset_rwmutex pollMutex;
......
...@@ -3,6 +3,6 @@ ...@@ -3,6 +3,6 @@
uniset2-start.sh -f ./uniset2-unetexchange --unet-name UNetExchange --unet-run-logserver \ uniset2-start.sh -f ./uniset2-unetexchange --unet-name UNetExchange --unet-run-logserver \
--confile test.xml --smemory-id SharedMemory \ --confile test.xml --smemory-id SharedMemory \
--unet-filter-field rs --unet-filter-value 2 --unet-maxdifferense 40 --unet-sendpause 1000 \ --unet-filter-field rs --unet-filter-value 2 --unet-maxdifferense 40 --unet-sendpause 1000 \
--dlog-add-levels info,crit,warn --unet-log-add-levels info,crit,warn --dlog-add-levels info,crit,warn --unet-log-add-levels info,crit,warn $*
#--unet-nodes-confnode specnet #--unet-nodes-confnode specnet
// -------------------------------------------------------------------------
#ifndef DefaultEventLoop_H_
#define DefaultEventLoop_H_
// -------------------------------------------------------------------------
#include <ev++.h>
#include <memory>
#include <atomic>
#include <mutex>
#include <condition_variable>
#include <thread>
#include <list>
// -------------------------------------------------------------------------
class EventWatcher
{
public:
EventWatcher() {}
~EventWatcher() {}
};
// -------------------------------------------------------------------------
/*!
* \brief The DefaultEventLoop class
* Т.к. libev требует чтобы был только один default_loop и плохо относится к запуску default_loop в разных потоках.
* то пришлось сделать такую обёртку - sigleton для default_loop. При этом подразумевается, что все объекты(классы),
* которые используют ev::xxx должны наследоваться от EventWatcher и не создавать свой default_loop, а использовать этот.
* Иначе будет случаться ошибка: libev: ev_loop recursion during release detected
* Типичное использование:
* \code
* в h-файле:
* class MyClass:
* public EventWatcher
* {
* ...
*
* std::shared_ptr<DefaultEventLoop> evloop;
* ev::io myio;
* ..
* }
*
* в сс-файле:
* где-то у себя, где надо запускать:
*
* ..
* myio.set(...);
* myio.start();
* ..
* evloop = DefaultEventLoop::inst();
* evloop->run( this, thread );
* \endcode
* При этом thread определяет создавать ли отдельный поток и продолжить работу или "застрять"
* на вызове run().
*
*/
class DefaultEventLoop
{
public:
~DefaultEventLoop();
static std::shared_ptr<DefaultEventLoop> inst();
void run( EventWatcher* s, bool thread = true );
bool isActive();
void terminate( EventWatcher* s );
protected:
DefaultEventLoop();
void defaultLoop();
void finish();
private:
static std::shared_ptr<DefaultEventLoop> _inst;
std::mutex m_run_mutex;
std::mutex m_mutex;
std::condition_variable m_event;
std::atomic_bool m_notify = { false };
std::atomic_bool cancelled = { false };
std::shared_ptr<ev::default_loop> evloop;
std::shared_ptr<std::thread> thr;
std::mutex m_slist_mutex;
std::list<EventWatcher*> slist;
};
// -------------------------------------------------------------------------
#endif // DefaultEventLoop_H_
// -------------------------------------------------------------------------
// -------------------------------------------------------------------------
#ifndef EventLoopServer_H_
#define EventLoopServer_H_
// -------------------------------------------------------------------------
#include <ev++.h>
#include <atomic>
#include <thread>
// -------------------------------------------------------------------------
/*!
* \brief The EventLoopServer class
* Реализация общей части всех процессов использующих libev.
* Содержит свой (динамический) eventloop;
*/
class EventLoopServer
{
public:
EventLoopServer();
virtual ~EventLoopServer();
bool evIsActive();
protected:
// действия при завершении
// завершение своих ev::xxx.stop()
virtual void evfinish(){}
// подготовка перед запуском loop
// запусу своих ev::xxx.start()
virtual void evprepare(){}
// Управление потоком событий
void evrun( bool thread = true );
void evstop();
ev::dynamic_loop loop;
private:
void onStop();
void defaultLoop();
std::atomic_bool cancelled = { false };
std::atomic_bool isrunning = { false };
ev::async evterm;
std::shared_ptr<std::thread> thr;
};
// -------------------------------------------------------------------------
#endif // EventLoopServer_H_
// -------------------------------------------------------------------------
...@@ -27,7 +27,7 @@ ...@@ -27,7 +27,7 @@
#include "DebugStream.h" #include "DebugStream.h"
#include "ThreadCreator.h" #include "ThreadCreator.h"
#include "UTCPSocket.h" #include "UTCPSocket.h"
#include "DefaultEventLoop.h" #include "EventLoopServer.h"
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
class LogSession; class LogSession;
class LogAgregator; class LogAgregator;
...@@ -71,7 +71,7 @@ LogReader. Читающих клиентов может быть скольуг ...@@ -71,7 +71,7 @@ LogReader. Читающих клиентов может быть скольуг
*/ */
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
class LogServer: class LogServer:
public EventWatcher public EventLoopServer
{ {
public: public:
...@@ -98,7 +98,7 @@ class LogServer: ...@@ -98,7 +98,7 @@ class LogServer:
inline bool isRunning() inline bool isRunning()
{ {
return (evloop && evloop->isActive()); return evIsActive();
} }
void init( const std::string& prefix, xmlNode* cnode = 0 ); void init( const std::string& prefix, xmlNode* cnode = 0 );
...@@ -108,7 +108,9 @@ class LogServer: ...@@ -108,7 +108,9 @@ class LogServer:
protected: protected:
LogServer(); LogServer();
void mainLoop( bool thread ); virtual void evprepare() override;
virtual void evfinish() override;
void ioAccept( ev::io& watcher, int revents ); void ioAccept( ev::io& watcher, int revents );
void sessionFinished( LogSession* s ); void sessionFinished( LogSession* s );
...@@ -122,7 +124,6 @@ class LogServer: ...@@ -122,7 +124,6 @@ class LogServer:
Debug::type sessLogLevel; Debug::type sessLogLevel;
size_t sessMaxCount = { 10 }; size_t sessMaxCount = { 10 };
std::atomic_bool running = { false };
DebugStream mylog; DebugStream mylog;
ev::io io; ev::io io;
...@@ -132,8 +133,6 @@ class LogServer: ...@@ -132,8 +133,6 @@ class LogServer:
std::string myname = { "LogServer" }; std::string myname = { "LogServer" };
std::string addr; std::string addr;
ost::tpport_t port; ost::tpport_t port;
std::shared_ptr<DefaultEventLoop> evloop;
}; };
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
#endif // LogServer_H_ #endif // LogServer_H_
......
...@@ -62,7 +62,7 @@ class LogSession ...@@ -62,7 +62,7 @@ class LogSession
} }
// запуск обработки входящих запросов // запуск обработки входящих запросов
void run(); void run( ev::loop_ref& loop );
void terminate(); void terminate();
bool isAcive(); bool isAcive();
...@@ -93,8 +93,9 @@ class LogSession ...@@ -93,8 +93,9 @@ class LogSession
std::shared_ptr<LogAgregator> alog; std::shared_ptr<LogAgregator> alog;
sigc::connection conn; sigc::connection conn;
ev::io io;
std::shared_ptr<USocket> sock; std::shared_ptr<USocket> sock;
ev::io io;
ev::timer cmdTimer; ev::timer cmdTimer;
ev::async asyncEvent; ev::async asyncEvent;
std::mutex io_mutex; std::mutex io_mutex;
......
...@@ -4,18 +4,38 @@ ...@@ -4,18 +4,38 @@
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
#include <cc++/socket.h> #include <cc++/socket.h>
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
// обёртка над ost::UDPReceive, чтобы достучаться до "сырого сокета" // различные классы-обёртки, чтобы достучаться до "сырого сокета" и других функций
// для дальнейшего использования с libev.. // необходимых при использовании с libev..
// -------------------------------------------------------------------------
class UDPSocketU:
public ost::UDPSocket
{
public:
UDPSocketU( const ost::IPV4Address &bind, ost::tpport_t port):
ost::UDPSocket(bind,port)
{}
virtual ~UDPSocketU(){}
inline SOCKET getSocket(){ return ost::UDPSocket::so; }
};
// -------------------------------------------------------------------------
class UDPReceiveU: class UDPReceiveU:
public ost::UDPReceive public ost::UDPReceive
{ {
public: public:
SOCKET getSocket(){ return ost::UDPReceive::so; } UDPReceiveU( const ost::IPV4Address &bind, ost::tpport_t port):
ost::UDPReceive(bind,port)
{}
virtual ~UDPReceiveU(){}
inline SOCKET getSocket(){ return ost::UDPReceive::so; }
inline void setCompletion( bool set ){ ost::UDPReceive::setCompletion(set); }
}; };
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
// обёртка над ost::UDPReceive, чтобы достучаться до "сырого сокета"
// для дальнейшего использования с libev..
class UDPDuplexU: class UDPDuplexU:
public ost::UDPDuplex public ost::UDPDuplex
{ {
...@@ -25,6 +45,8 @@ class UDPDuplexU: ...@@ -25,6 +45,8 @@ class UDPDuplexU:
ost::UDPDuplex(bind,port) ost::UDPDuplex(bind,port)
{} {}
virtual ~UDPDuplexU(){}
SOCKET getReceiveSocket(){ return ost::UDPReceive::so; } SOCKET getReceiveSocket(){ return ost::UDPReceive::so; }
void setReceiveCompletion( bool set ){ ost::UDPReceive::setCompletion(set); } void setReceiveCompletion( bool set ){ ost::UDPReceive::setCompletion(set); }
}; };
......
...@@ -14,9 +14,8 @@ ...@@ -14,9 +14,8 @@
#include "ModbusTypes.h" #include "ModbusTypes.h"
#include "ModbusServer.h" #include "ModbusServer.h"
#include "ModbusTCPSession.h" #include "ModbusTCPSession.h"
#include "ThreadCreator.h"
#include "UTCPSocket.h" #include "UTCPSocket.h"
#include "DefaultEventLoop.h" #include "EventLoopServer.h"
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
/*! ModbusTCPServer /*! ModbusTCPServer
* Реализация сервера на основе libev. Подерживается "много" соединений (постоянных). * Реализация сервера на основе libev. Подерживается "много" соединений (постоянных).
...@@ -26,7 +25,7 @@ ...@@ -26,7 +25,7 @@
* т.к.из многих "соединений" будут вызываться одни и теже обработатчики. * т.к.из многих "соединений" будут вызываться одни и теже обработатчики.
*/ */
class ModbusTCPServer: class ModbusTCPServer:
public EventWatcher, public EventLoopServer,
public ModbusServer public ModbusServer
{ {
public: public:
...@@ -38,6 +37,8 @@ class ModbusTCPServer: ...@@ -38,6 +37,8 @@ class ModbusTCPServer:
*/ */
void run( const std::unordered_set<ModbusRTU::ModbusAddr>& vmbaddr, bool thread = false ); void run( const std::unordered_set<ModbusRTU::ModbusAddr>& vmbaddr, bool thread = false );
virtual bool isActive() override;
void setMaxSessions( size_t num ); void setMaxSessions( size_t num );
inline size_t getMaxSessions() inline size_t getMaxSessions()
{ {
...@@ -91,10 +92,10 @@ class ModbusTCPServer: ...@@ -91,10 +92,10 @@ class ModbusTCPServer:
// ------------------------------------------------- // -------------------------------------------------
// Таймер. // Таймер.
// Т.к. mainLoop() "бесконечный", то сделана возможность // Т.к. eventLoop() "бесконечный", то сделана возможность
// подключиться к сигналу "таймера", например для обновления статистики по сессиям // подключиться к сигналу "таймера", например для обновления статистики по сессиям
// \warning Следует иметь ввиду, что обработчик сигнала, должен быть максимально коротким // \warning Следует иметь ввиду, что обработчик сигнала, должен быть максимально коротким
// т.к. на это время останавливается работа основного потока (mainLoop) // т.к. на это время останавливается работа основного потока (eventLoop)
// ------------------------------------------------- // -------------------------------------------------
typedef sigc::signal<void> TimerSignal; typedef sigc::signal<void> TimerSignal;
TimerSignal signal_timer(); TimerSignal signal_timer();
...@@ -110,8 +111,8 @@ class ModbusTCPServer: ...@@ -110,8 +111,8 @@ class ModbusTCPServer:
// функция receive пока не поддерживается... // функция receive пока не поддерживается...
virtual ModbusRTU::mbErrCode realReceive( const std::unordered_set<ModbusRTU::ModbusAddr>& vaddr, timeout_t msecTimeout ) override; virtual ModbusRTU::mbErrCode realReceive( const std::unordered_set<ModbusRTU::ModbusAddr>& vaddr, timeout_t msecTimeout ) override;
virtual void mainLoop(); virtual void evprepare() override;
void finish(); virtual void evfinish() override;
virtual void ioAccept(ev::io& watcher, int revents); virtual void ioAccept(ev::io& watcher, int revents);
void onTimer( ev::timer& t, int revents ); void onTimer( ev::timer& t, int revents );
...@@ -148,13 +149,11 @@ class ModbusTCPServer: ...@@ -148,13 +149,11 @@ class ModbusTCPServer:
timeout_t sessTimeout = { 10000 }; // msec timeout_t sessTimeout = { 10000 }; // msec
std::shared_ptr<DefaultEventLoop> evloop;
ev::io io; ev::io io;
std::shared_ptr<UTCPSocket> sock;
ev::timer ioTimer; ev::timer ioTimer;
std::shared_ptr<UTCPSocket> sock;
const std::unordered_set<ModbusRTU::ModbusAddr>* vmbaddr; const std::unordered_set<ModbusRTU::ModbusAddr>* vmbaddr;
std::shared_ptr< ThreadCreator<ModbusTCPServer> > thrMainLoop;
TimerSignal m_timer_signal; TimerSignal m_timer_signal;
timeout_t tmTime_msec = { TIMEOUT_INF }; // время по умолчанию для таймера (TimerSignal) timeout_t tmTime_msec = { TIMEOUT_INF }; // время по умолчанию для таймера (TimerSignal)
...@@ -164,8 +163,6 @@ class ModbusTCPServer: ...@@ -164,8 +163,6 @@ class ModbusTCPServer:
// транслирование сигналов от Sessions.. // транслирование сигналов от Sessions..
void postReceiveEvent( ModbusRTU::mbErrCode res ); void postReceiveEvent( ModbusRTU::mbErrCode res );
ModbusRTU::mbErrCode preReceiveEvent( const std::unordered_set<ModbusRTU::ModbusAddr> vaddr, timeout_t tout ); ModbusRTU::mbErrCode preReceiveEvent( const std::unordered_set<ModbusRTU::ModbusAddr> vaddr, timeout_t tout );
std::atomic_bool cancelled;
}; };
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
#endif // ModbusTCPServer_H_ #endif // ModbusTCPServer_H_
......
...@@ -53,7 +53,7 @@ class ModbusTCPSession: ...@@ -53,7 +53,7 @@ class ModbusTCPSession:
void setSessionTimeout( double t ); void setSessionTimeout( double t );
// запуск обработки входящих запросов // запуск обработки входящих запросов
void run(); void run( ev::loop_ref& loop );
virtual bool isActive() override; virtual bool isActive() override;
...@@ -125,9 +125,10 @@ class ModbusTCPSession: ...@@ -125,9 +125,10 @@ class ModbusTCPSession:
ModbusRTU::ModbusMessage buf; ModbusRTU::ModbusMessage buf;
ev::io io; ev::io io;
ev::timer ioTimeout;
std::shared_ptr<USocket> sock; std::shared_ptr<USocket> sock;
std::queue<UTCPCore::Buffer*> qsend; std::queue<UTCPCore::Buffer*> qsend;
ev::timer ioTimeout;
double sessTimeout = { 10.0 }; double sessTimeout = { 10.0 };
bool ignoreAddr = { false }; bool ignoreAddr = { false };
......
...@@ -31,8 +31,7 @@ ModbusTCPServer::ModbusTCPServer( ost::InetAddress& ia, int _port ): ...@@ -31,8 +31,7 @@ ModbusTCPServer::ModbusTCPServer( ost::InetAddress& ia, int _port ):
ignoreAddr(false), ignoreAddr(false),
maxSessions(10), maxSessions(10),
sessCount(0), sessCount(0),
sessTimeout(10000), sessTimeout(10000)
cancelled(false)
{ {
setCRCNoCheckit(true); setCRCNoCheckit(true);
{ {
...@@ -40,13 +39,14 @@ ModbusTCPServer::ModbusTCPServer( ost::InetAddress& ia, int _port ): ...@@ -40,13 +39,14 @@ ModbusTCPServer::ModbusTCPServer( ost::InetAddress& ia, int _port ):
s << iaddr << ":" << port; s << iaddr << ":" << port;
myname = s.str(); myname = s.str();
} }
io.set<ModbusTCPServer, &ModbusTCPServer::ioAccept>(this);
ioTimer.set<ModbusTCPServer, &ModbusTCPServer::onTimer>(this);
} }
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
ModbusTCPServer::~ModbusTCPServer() ModbusTCPServer::~ModbusTCPServer()
{ {
if( cancelled )
finish();
} }
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
void ModbusTCPServer::setMaxSessions( unsigned int num ) void ModbusTCPServer::setMaxSessions( unsigned int num )
...@@ -80,18 +80,15 @@ void ModbusTCPServer::setSessionTimeout( timeout_t msec ) ...@@ -80,18 +80,15 @@ void ModbusTCPServer::setSessionTimeout( timeout_t msec )
void ModbusTCPServer::run( const std::unordered_set<ModbusAddr>& _vmbaddr, bool thread ) void ModbusTCPServer::run( const std::unordered_set<ModbusAddr>& _vmbaddr, bool thread )
{ {
vmbaddr = &_vmbaddr; vmbaddr = &_vmbaddr;
evrun(thread);
if( !thread )
{
mainLoop();
return;
}
thrMainLoop = make_shared< ThreadCreator<ModbusTCPServer> >(this, &ModbusTCPServer::mainLoop);
thrMainLoop->start();
} }
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
void ModbusTCPServer::mainLoop() bool ModbusTCPServer::isActive()
{
return evIsActive();
}
// -------------------------------------------------------------------------
void ModbusTCPServer::evprepare()
{ {
try try
{ {
...@@ -107,40 +104,25 @@ void ModbusTCPServer::mainLoop() ...@@ -107,40 +104,25 @@ void ModbusTCPServer::mainLoop()
sock->setCompletion(false); sock->setCompletion(false);
io.set<ModbusTCPServer, &ModbusTCPServer::ioAccept>(this); io.set(loop);
io.start(sock->getSocket(), ev::READ); io.start(sock->getSocket(), ev::READ);
ioTimer.set<ModbusTCPServer, &ModbusTCPServer::onTimer>(this); ioTimer.set(loop);
if( tmTime != TIMEOUT_INF ) if( tmTime_msec != TIMEOUT_INF )
ioTimer.start(tmTime); ioTimer.start(tmTime);
if( dlog->is_info() )
dlog->info() << myname << "(ModbusTCPServer): run main loop.." << endl;
{
evloop = DefaultEventLoop::inst();
evloop->run(this, false);
}
if( dlog->is_info() )
dlog->info() << myname << "(ModbusTCPServer): main loop exit.." << endl;
cancelled = true;
} }
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
void ModbusTCPServer::terminate() void ModbusTCPServer::terminate()
{ {
finish(); evstop();
} }
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
void ModbusTCPServer::finish() void ModbusTCPServer::evfinish()
{ {
if( cancelled ) if( !io.is_active() )
return; return;
cancelled = true;
if( dlog->is_info() ) if( dlog->is_info() )
dlog->info() << myname << "(ModbusTCPServer): terminate..." << endl; dlog->info() << myname << "(ModbusTCPServer): terminate..." << endl;
...@@ -161,9 +143,6 @@ void ModbusTCPServer::finish() ...@@ -161,9 +143,6 @@ void ModbusTCPServer::finish()
} }
catch( std::exception& ex ) {} catch( std::exception& ex ) {}
} }
if( evloop )
evloop->terminate(this);
} }
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
void ModbusTCPServer::sessionFinished( ModbusTCPSession* s ) void ModbusTCPServer::sessionFinished( ModbusTCPSession* s )
...@@ -192,11 +171,6 @@ void ModbusTCPServer::getSessions( Sessions& lst ) ...@@ -192,11 +171,6 @@ void ModbusTCPServer::getSessions( Sessions& lst )
} }
} }
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
bool ModbusTCPServer::isActive()
{
return !cancelled;
}
// -------------------------------------------------------------------------
ModbusTCPServer::TimerSignal ModbusTCPServer::signal_timer() ModbusTCPServer::TimerSignal ModbusTCPServer::signal_timer()
{ {
return m_timer_signal; return m_timer_signal;
...@@ -204,16 +178,18 @@ ModbusTCPServer::TimerSignal ModbusTCPServer::signal_timer() ...@@ -204,16 +178,18 @@ ModbusTCPServer::TimerSignal ModbusTCPServer::signal_timer()
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
void ModbusTCPServer::setTimer( timeout_t msec ) void ModbusTCPServer::setTimer( timeout_t msec )
{ {
tmTime = msec; tmTime_msec = msec;
if( msec == TIMEOUT_INF ) if( msec == TIMEOUT_INF )
{ {
tmTime = 0; tmTime = 0;
if( ioTimer.is_active() )
ioTimer.stop(); ioTimer.stop();
} }
else else
{ {
tmTime = (double)msec / 1000.; tmTime = (double)msec / 1000.;
if( ioTimer.is_active() )
ioTimer.start( tmTime ); ioTimer.start( tmTime );
} }
} }
...@@ -228,7 +204,7 @@ void ModbusTCPServer::ioAccept(ev::io& watcher, int revents) ...@@ -228,7 +204,7 @@ void ModbusTCPServer::ioAccept(ev::io& watcher, int revents)
return; return;
} }
if( cancelled ) if( !evIsActive() )
{ {
if( dlog->is_crit() ) if( dlog->is_crit() )
dlog->crit() << myname << "(ModbusTCPServer::ioAccept): terminate work.." << endl; dlog->crit() << myname << "(ModbusTCPServer::ioAccept): terminate work.." << endl;
...@@ -284,7 +260,7 @@ void ModbusTCPServer::ioAccept(ev::io& watcher, int revents) ...@@ -284,7 +260,7 @@ void ModbusTCPServer::ioAccept(ev::io& watcher, int revents)
slist.push_back(s); slist.push_back(s);
} }
s->run(); s->run(loop);
sessCount++; sessCount++;
} }
catch( Exception& ex ) catch( Exception& ex )
...@@ -314,7 +290,7 @@ void ModbusTCPServer::onTimer( ev::timer& t, int revents ) ...@@ -314,7 +290,7 @@ void ModbusTCPServer::onTimer( ev::timer& t, int revents )
dlog->crit() << myname << "(onTimer): " << ex.what() << endl; dlog->crit() << myname << "(onTimer): " << ex.what() << endl;
} }
ioTimer.start(tmTime); // restart timer t.start(tmTime); // restart timer
} }
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
......
...@@ -89,12 +89,15 @@ void ModbusTCPSession::setSessionTimeout( double t ) ...@@ -89,12 +89,15 @@ void ModbusTCPSession::setSessionTimeout( double t )
ioTimeout.start(t); ioTimeout.start(t);
} }
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
void ModbusTCPSession::run() void ModbusTCPSession::run( ev::loop_ref& loop )
{ {
if( dlog->is_info() ) if( dlog->is_info() )
dlog->info() << peername << "(run): run session.." << endl; dlog->info() << peername << "(run): run session.." << endl;
io.set(loop);
io.start(sock->getSocket(), ev::READ); io.start(sock->getSocket(), ev::READ);
ioTimeout.set(loop);
ioTimeout.start(sessTimeout); ioTimeout.start(sessTimeout);
} }
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
......
...@@ -27,8 +27,6 @@ using namespace UniSetTypes; ...@@ -27,8 +27,6 @@ using namespace UniSetTypes;
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
LogServer::~LogServer() LogServer::~LogServer()
{ {
if( running )
terminate();
} }
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
LogServer::LogServer( std::shared_ptr<LogAgregator> log ): LogServer::LogServer( std::shared_ptr<LogAgregator> log ):
...@@ -55,16 +53,11 @@ LogServer::LogServer(): ...@@ -55,16 +53,11 @@ LogServer::LogServer():
{ {
} }
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
void LogServer::terminate() void LogServer::evfinish()
{ {
if( !running )
return;
if( mylog.is_info() ) if( mylog.is_info() )
mylog.info() << myname << "(LogServer): terminate..." << endl; mylog.info() << myname << "(LogServer): terminate..." << endl;
io.stop();
auto lst(slist); auto lst(slist);
// Копируем сперва себе список сессий.. // Копируем сперва себе список сессий..
...@@ -80,37 +73,24 @@ void LogServer::terminate() ...@@ -80,37 +73,24 @@ void LogServer::terminate()
catch( std::exception& ex ) {} catch( std::exception& ex ) {}
} }
running = false; io.stop();
cerr << "LOGServer: finished..." << endl;
if( evloop )
evloop->terminate(this);
} }
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
void LogServer::run( const std::string& _addr, ost::tpport_t _port, bool thread ) void LogServer::run( const std::string& _addr, ost::tpport_t _port, bool thread )
{ {
addr = _addr; addr = _addr;
port = _port; port = _port;
evrun(thread);
if( !running )
{
if( !thread )
running = true;
mainLoop( thread );
running = true;
}
} }
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
void LogServer::mainLoop( bool thread ) void LogServer::terminate()
{
evstop();
}
// -------------------------------------------------------------------------
void LogServer::evprepare()
{ {
if( running )
{
if( elog->is_crit() )
elog->crit() << myname << "(LogServer::mainLoopt): ALREADY RUNNING.." << endl;
return;
}
try try
{ {
ost::InetAddress iaddr(addr.c_str()); ost::InetAddress iaddr(addr.c_str());
...@@ -137,32 +117,26 @@ void LogServer::mainLoop( bool thread ) ...@@ -137,32 +117,26 @@ void LogServer::mainLoop( bool thread )
} }
sock->setCompletion(false); sock->setCompletion(false);
UTCPCore::setKeepAliveParams(sock->getSocket());
io.set<LogServer, &LogServer::ioAccept>(this); io.set<LogServer, &LogServer::ioAccept>(this);
io.set(loop);
io.start(sock->getSocket(), ev::READ); io.start(sock->getSocket(), ev::READ);
// скобки специально чтобы пораньше освободить evloop (выйти из "зоны" видимости)
{
evloop = DefaultEventLoop::inst();
evloop->run( this, thread );
}
} }
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
void LogServer::ioAccept( ev::io& watcher, int revents ) void LogServer::ioAccept( ev::io& watcher, int revents )
{ {
if (EV_ERROR & revents) if( EV_ERROR & revents )
{ {
if( elog->is_crit() ) if( mylog.is_crit() )
elog->crit() << myname << "(LogServer::ioAccept): invalid event" << endl; mylog.crit() << myname << "(LogServer::ioAccept): invalid event" << endl;
return; return;
} }
if( !running ) if( !evIsActive() )
{ {
if( elog->is_crit() ) if( mylog.is_crit() )
elog->crit() << myname << "(LogServer::ioAccept): terminate work.." << endl; mylog.crit() << myname << "(LogServer::ioAccept): terminate work.." << endl;
sock->reject(); sock->reject();
return; return;
...@@ -190,7 +164,7 @@ void LogServer::ioAccept( ev::io& watcher, int revents ) ...@@ -190,7 +164,7 @@ void LogServer::ioAccept( ev::io& watcher, int revents )
uniset_rwmutex_wrlock l(mutSList); uniset_rwmutex_wrlock l(mutSList);
slist.push_back(s); slist.push_back(s);
} }
s->run(); s->run(loop);
} }
catch( const std::exception& ex ) catch( const std::exception& ex )
{ {
......
...@@ -105,16 +105,19 @@ void LogSession::logOnEvent( const std::string& s ) ...@@ -105,16 +105,19 @@ void LogSession::logOnEvent( const std::string& s )
asyncEvent.send(); asyncEvent.send();
} }
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
void LogSession::run() void LogSession::run( ev::loop_ref& loop )
{ {
setSessionLogLevel(Debug::ANY); setSessionLogLevel(Debug::ANY);
if( mylog.is_info() ) if( mylog.is_info() )
mylog.info() << peername << "(run): run session.." << endl; mylog.info() << peername << "(run): run session.." << endl;
asyncEvent.set(loop);
cmdTimer.set(loop);
io.set(loop);
io.start(sock->getSocket(), ev::READ); io.start(sock->getSocket(), ev::READ);
cmdTimer.start( cmdTimeout / 1000. ); cmdTimer.start( cmdTimeout / 1000. );
// asyncEvent.start(); // слать логи начинаем только после обработки команд.. если есть..
} }
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
void LogSession::terminate() void LogSession::terminate()
......
#include <iostream>
#include <chrono>
#include "DefaultEventLoop.h"
// -------------------------------------------------------------------------
using namespace std;
// -------------------------------------------------------------------------
std::shared_ptr<DefaultEventLoop> DefaultEventLoop::_inst;
// ---------------------------------------------------------------------------
std::shared_ptr<DefaultEventLoop> DefaultEventLoop::inst()
{
if( _inst == nullptr )
_inst = std::shared_ptr<DefaultEventLoop>( new DefaultEventLoop() );
return _inst;
}
// ---------------------------------------------------------------------------
DefaultEventLoop::~DefaultEventLoop()
{
if( !cancelled )
{
cerr << " ~DefaultEventLoop(): not canceled.." << endl;
finish();
}
}
// ---------------------------------------------------------------------------
void DefaultEventLoop::run( EventWatcher* s, bool thread )
{
if( cancelled )
return;
{
std::unique_lock<std::mutex> lk(m_run_mutex);
if( !thr )
thr = make_shared<std::thread>( [ = ] { defaultLoop(); } );
}
{
std::unique_lock<std::mutex> lk(m_slist_mutex);
slist.push_back(s);
}
if( !thread )
{
std::unique_lock<std::mutex> lk(m_mutex);
while( !m_notify )
m_event.wait(lk);
if( thr->joinable() )
thr->join();
}
}
// -------------------------------------------------------------------------
bool DefaultEventLoop::isActive()
{
return !cancelled;
}
// -------------------------------------------------------------------------
void DefaultEventLoop::terminate( EventWatcher* s )
{
cerr << "(DefaultEventLoop::defaultLoop): terminate.." << endl;
std::unique_lock<std::mutex> lk(m_slist_mutex);
for( auto i = slist.begin(); i != slist.end(); i++ )
{
if( (*i) == s )
{
slist.erase(i);
break;
}
}
if( slist.empty() && !cancelled && evloop )
finish();
}
// -------------------------------------------------------------------------
void DefaultEventLoop::finish()
{
cerr << "(DefaultEventLoop::fini): TERMINATE EVENT LOOP.." << endl;
cancelled = true;
if( !evloop )
return;
cerr << "(DefaultEventLoop::fini): BREAK EVENT LOOP.." << endl;
evloop->break_loop(ev::ALL);
std::unique_lock<std::mutex> lk(m_mutex);
m_event.wait_for(lk, std::chrono::seconds(1), [ = ]()
{
return (m_notify == true);
} );
if( !m_notify )
{
// а как прервать EVENT LOOP?!!
cerr << "(DefaultEventLoop::fini): MAIN LOOP NOT BREAKED! kill(SIGTERM)!.." << endl;
raise(SIGTERM);
}
if( thr && thr->joinable() )
{
cerr << "(DefaultEventLoop::fini): join.." << endl;
thr->join();
}
cerr << "(DefaultEventLoop::fini): exit..." << endl;
_inst.reset();
}
// -------------------------------------------------------------------------
DefaultEventLoop::DefaultEventLoop()
{
}
// -------------------------------------------------------------------------
void DefaultEventLoop::defaultLoop()
{
cerr << "(DefaultEventLoop::defaultLoop): run main loop.." << endl;
// скобки специально чтобы evloop пораньше вышел из "зоны" видимости
{
evloop = make_shared<ev::default_loop>();
while( !cancelled )
{
try
{
evloop->run(0);
}
catch( std::exception& ex )
{
cerr << "(DefaultEventLoop::defaultLoop): " << ex.what() << endl;
}
}
cerr << "(DefaultEventLoop::defaultLoop): MAIN EVENT LOOP EXIT ****************" << endl;
}
{
std::unique_lock<std::mutex> lk(m_mutex);
m_notify = true;
}
cancelled = true;
m_event.notify_all();
}
// -------------------------------------------------------------------------
#include <iostream>
#include "EventLoopServer.h"
// -------------------------------------------------------------------------
using namespace std;
// -------------------------------------------------------------------------
EventLoopServer::EventLoopServer():
loop(ev::AUTO)
{
evterm.set(loop);
evterm.set<EventLoopServer, &EventLoopServer::onStop>(this);
}
// -------------------------------------------------------------------------
EventLoopServer::~EventLoopServer()
{
if( !cancelled )
evstop();
}
// ---------------------------------------------------------------------------
void EventLoopServer::evrun( bool thread )
{
if( isrunning )
return;
isrunning = true;
if( !thread )
{
defaultLoop();
return;
}
else if( !thr )
thr = make_shared<std::thread>( [ = ] { defaultLoop(); } );
}
// ---------------------------------------------------------------------------
bool EventLoopServer::evIsActive()
{
return isrunning;
}
// -------------------------------------------------------------------------
void EventLoopServer::evstop()
{
cancelled = true;
evterm.send();
if( thr )
{
thr->join();
thr = nullptr;
}
}
// -------------------------------------------------------------------------
void EventLoopServer::onStop()
{
try
{
evfinish();
}
catch( std::exception& ex )
{
}
evterm.stop();
loop.break_loop(ev::ALL);
}
// -------------------------------------------------------------------------
void EventLoopServer::defaultLoop()
{
evterm.start();
evprepare();
while( !cancelled )
{
try
{
loop.run(0);
}
catch( std::exception& ex )
{
cerr << "(EventLoopServer::defaultLoop): " << ex.what() << endl;
}
}
cancelled = true;
isrunning = false;
}
// -------------------------------------------------------------------------
...@@ -6,7 +6,7 @@ noinst_LTLIBRARIES = libProcesses.la ...@@ -6,7 +6,7 @@ noinst_LTLIBRARIES = libProcesses.la
libProcesses_la_CXXFLAGS = $(SIGC_CFLAGS) libProcesses_la_CXXFLAGS = $(SIGC_CFLAGS)
libProcesses_la_LIBADD = $(SIGC_LIBS) libProcesses_la_LIBADD = $(SIGC_LIBS)
libProcesses_la_SOURCES = IOController_iSK.cc IOController.cc IONotifyController.cc \ libProcesses_la_SOURCES = IOController_iSK.cc IOController.cc IONotifyController.cc \
NCRestorer.cc NCRestorer_XML.cc DefaultEventLoop.cc NCRestorer.cc NCRestorer_XML.cc EventLoopServer.cc
local-clean: local-clean:
rm -rf *iSK.cc rm -rf *iSK.cc
......
...@@ -269,7 +269,7 @@ include/HourGlass.h ...@@ -269,7 +269,7 @@ include/HourGlass.h
include/IOController.h include/IOController.h
include/IONotifyController.h include/IONotifyController.h
include/IORFile.h include/IORFile.h
include/DefaultEventLoop.h include/EventLoopServer.h
include/LogAgregator.h include/LogAgregator.h
include/LogReader.h include/LogReader.h
include/LogServer.h include/LogServer.h
...@@ -389,7 +389,7 @@ src/Processes/IONotifyController.cc ...@@ -389,7 +389,7 @@ src/Processes/IONotifyController.cc
src/Processes/Makefile.am src/Processes/Makefile.am
src/Processes/NCRestorer.cc src/Processes/NCRestorer.cc
src/Processes/NCRestorer_XML.cc src/Processes/NCRestorer_XML.cc
src/Processes/DefaultEventLoop.cc src/Processes/EventLoopServer.cc
src/Services/DBServer.cc src/Services/DBServer.cc
src/Services/Makefile.am src/Services/Makefile.am
src/Timers/Makefile.am src/Timers/Makefile.am
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment