Commit 4ecc9a59 authored by Pavel Vainerman's avatar Pavel Vainerman

(MBSlave): добавил таймер контроля работы основного потока обмена (eventloop),

и перезапуск в случае его "вылета".
parent 17792ffe
...@@ -150,6 +150,10 @@ namespace uniset ...@@ -150,6 +150,10 @@ namespace uniset
timeout_t aftersend_pause = conf->getArgInt("--" + prefix + "-aftersend-pause", it.getProp("afterSendPause")); timeout_t aftersend_pause = conf->getArgInt("--" + prefix + "-aftersend-pause", it.getProp("afterSendPause"));
checkExchangeTime = conf->getArgPInt("--" + prefix + "-check-exchange-time", it.getProp("checkExchangeTime"), 10000);
vmonit(checkExchangeTime);
vmonit(restartTCPServerCount);
mbtype = conf->getArgParam("--" + prefix + "-type", it.getProp("type")); mbtype = conf->getArgParam("--" + prefix + "-type", it.getProp("type"));
if( mbtype == "RTU" ) if( mbtype == "RTU" )
...@@ -646,7 +650,12 @@ namespace uniset ...@@ -646,7 +650,12 @@ namespace uniset
// для обновления пороговых датчиков // для обновления пороговых датчиков
tcpserver->signal_post_receive().connect( sigc::mem_fun(this, &MBSlave::postReceiveEvent) ); tcpserver->signal_post_receive().connect( sigc::mem_fun(this, &MBSlave::postReceiveEvent) );
mbinfo << myname << "(execute_tcp): run tcpserver (" runTCPServer();
}
// -------------------------------------------------------------------------
void MBSlave::runTCPServer()
{
mbinfo << myname << "(runTCPServer): run tcpserver ("
<< tcpserver->getInetAddress() << ":" << tcpserver->getInetPort() << tcpserver->getInetAddress() << ":" << tcpserver->getInetPort()
<< ")" << ")"
<< "[" << "["
...@@ -713,12 +722,9 @@ namespace uniset ...@@ -713,12 +722,9 @@ namespace uniset
throw; throw;
} }
cerr << myname << "**************** " << endl;
msleep(tcpRepeatCreateSocketPause); msleep(tcpRepeatCreateSocketPause);
} }
// tcpCancelled = true;
// mbinfo << myname << "(execute_tcp): tcpserver stopped.." << endl;
} }
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
void MBSlave::updateStatistics() void MBSlave::updateStatistics()
...@@ -787,10 +793,29 @@ namespace uniset ...@@ -787,10 +793,29 @@ namespace uniset
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
void MBSlave::updateTCPStatistics() void MBSlave::updateTCPStatistics()
{ {
#if 0
// тестирование вылета при SEGFAULT
// или перезапуска при невыловленном EXCEPTION
// для mainloop
static size_t upCounter = 0;
upCounter++;
if( ++upCounter > 5 )
{
// IOMap::iterator i;
// cout << "SEGFAULT: " << i->first << endl;
upCounter = 0;
cerr << ".....THROW...." << endl;
throw std::string("TEST STRING EXCEPTION");
}
#endif
// ВНИМАНИЕ! Эта функция вызывается из основного eventLoop // ВНИМАНИЕ! Эта функция вызывается из основного eventLoop
// поэтому она должна быть максимально быстрой и безопасной // поэтому она должна быть максимально быстрой и безопасной
// иначе накроется весь обмен // иначе накроется весь обмен
// т.к. на это время останавливается работа основного потока (eventLoop) // т.к. на это время останавливается работа основного потока (eventLoop)
// принимающего запросы
try try
{ {
...@@ -951,6 +976,8 @@ namespace uniset ...@@ -951,6 +976,8 @@ namespace uniset
thr->start(); thr->start();
else if( mbtype == "TCP") else if( mbtype == "TCP")
execute_tcp(); execute_tcp();
askTimer(tmCheckExchange,checkExchangeTime);
} }
break; break;
...@@ -1083,9 +1110,25 @@ namespace uniset ...@@ -1083,9 +1110,25 @@ namespace uniset
} }
} }
// ------------------------------------------------------------------------------------------ // ------------------------------------------------------------------------------------------
void MBSlave::timerInfo( const TimerMessage* tm )
{
if( tm->id == tmCheckExchange )
{
if( !tcpserver )
return;
if( !tcpserver->isActive() )
{
mbwarn << myname << "(timerInfo): tcpserver thread failed! restart.." << endl;
restartTCPServerCount++;
runTCPServer();
}
}
}
// ------------------------------------------------------------------------------------------
bool MBSlave::activateObject() bool MBSlave::activateObject()
{ {
// блокирование обработки Starsp // блокирование обработки StartUp
// пока не пройдёт инициализация датчиков // пока не пройдёт инициализация датчиков
// см. sysCommand() // см. sysCommand()
{ {
...@@ -2839,7 +2882,14 @@ namespace uniset ...@@ -2839,7 +2882,14 @@ namespace uniset
inf << " " << ModbusRTU::addr2str(m.first) << ": iomap=" << m.second.size() << endl; inf << " " << ModbusRTU::addr2str(m.first) << ": iomap=" << m.second.size() << endl;
inf << " myaddr: " << ModbusServer::vaddr2str(vaddr) << endl; inf << " myaddr: " << ModbusServer::vaddr2str(vaddr) << endl;
inf << "Statistic: connectionCount=" << connCount << " smPingOK=" << smPingOK << endl; inf << "Statistic:"
<< " connectionCount=" << connCount
<< " smPingOK=" << smPingOK;
if( tcpserver )
inf << " restartTCPServerCount=" << restartTCPServerCount;
inf << endl;
if( sslot ) // т.е. если у нас tcp if( sslot ) // т.е. если у нас tcp
{ {
......
...@@ -500,6 +500,7 @@ namespace uniset ...@@ -500,6 +500,7 @@ namespace uniset
virtual void sysCommand( const uniset::SystemMessage* msg ) override; virtual void sysCommand( const uniset::SystemMessage* msg ) override;
virtual void sensorInfo( const uniset::SensorMessage* sm ) override; virtual void sensorInfo( const uniset::SensorMessage* sm ) override;
virtual void timerInfo( const uniset::TimerMessage* tm ) override;
void askSensors( UniversalIO::UIOCommand cmd ); void askSensors( UniversalIO::UIOCommand cmd );
void waitSMReady(); void waitSMReady();
virtual void execute_rtu(); virtual void execute_rtu();
...@@ -508,6 +509,7 @@ namespace uniset ...@@ -508,6 +509,7 @@ namespace uniset
virtual void updateTCPStatistics(); virtual void updateTCPStatistics();
virtual void updateThresholds(); virtual void updateThresholds();
virtual void postReceiveEvent( ModbusRTU::mbErrCode res ); virtual void postReceiveEvent( ModbusRTU::mbErrCode res );
void runTCPServer();
virtual bool activateObject() override; virtual bool activateObject() override;
virtual bool deactivateObject() override; virtual bool deactivateObject() override;
...@@ -516,6 +518,13 @@ namespace uniset ...@@ -516,6 +518,13 @@ namespace uniset
virtual void sigterm( int signo ) override; virtual void sigterm( int signo ) override;
virtual void finalThread(); virtual void finalThread();
enum Timer
{
tmCheckExchange
};
uniset::timeout_t checkExchangeTime = { 10000 }; // контроль "живости" потока обмена, мсек
virtual void initIterators(); virtual void initIterators();
bool initItem( UniXML::iterator& it ); bool initItem( UniXML::iterator& it );
bool readItem( const std::shared_ptr<UniXML>& xml, UniXML::iterator& it, xmlNode* sec ); bool readItem( const std::shared_ptr<UniXML>& xml, UniXML::iterator& it, xmlNode* sec );
...@@ -566,6 +575,7 @@ namespace uniset ...@@ -566,6 +575,7 @@ namespace uniset
PassiveTimer ptTimeout; PassiveTimer ptTimeout;
long connCount = { 0 }; long connCount = { 0 };
long restartTCPServerCount = { 0 };
std::atomic_bool activated = { false }; std::atomic_bool activated = { false };
std::atomic_bool cancelled = { false }; std::atomic_bool cancelled = { false };
......
...@@ -39,13 +39,14 @@ namespace uniset ...@@ -39,13 +39,14 @@ namespace uniset
*/ */
bool async_evrun( size_t waitRunningTimeout_msec = 60000 ); bool async_evrun( size_t waitRunningTimeout_msec = 60000 );
void evstop(); /*!< остановить раннее запущенный поток (async_run) */
/*! синхронный запуск /*! синхронный запуск
* функция вернёт управление, только в случае неудачного запуска * функция вернёт управление, только в случае неудачного запуска
* либо если evrun уже был вызван
*/ */
bool evrun(); bool evrun();
void evstop(); /*!< остановить раннее запущенный поток (event loop) */
ev::dynamic_loop loop; ev::dynamic_loop loop;
private: private:
...@@ -57,6 +58,7 @@ namespace uniset ...@@ -57,6 +58,7 @@ namespace uniset
std::atomic_bool cancelled = { false }; std::atomic_bool cancelled = { false };
std::atomic_bool isactive = { false }; std::atomic_bool isactive = { false };
std::timed_mutex run_mutex;
ev::async evterm; ev::async evterm;
std::shared_ptr<std::thread> thr; std::shared_ptr<std::thread> thr;
......
...@@ -24,17 +24,38 @@ namespace uniset ...@@ -24,17 +24,38 @@ namespace uniset
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
bool EventLoopServer::evrun() bool EventLoopServer::evrun()
{ {
{
std::lock_guard<std::timed_mutex> l(run_mutex);
if( isactive )
return false;
isactive = true;
}
defaultLoop(); // <-- здесь бесконечный цикл.. defaultLoop(); // <-- здесь бесконечный цикл..
return false; return false;
} }
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
bool EventLoopServer::async_evrun( size_t timeout_msec ) bool EventLoopServer::async_evrun( size_t timeout_msec )
{ {
if( !run_mutex.try_lock_for(std::chrono::milliseconds(timeout_msec)) )
return false;
std::lock_guard<std::timed_mutex> l(run_mutex,std::adopt_lock);
if( isactive ) if( isactive )
return true; return true;
isactive = true; isactive = true;
if( thr )
{
if( thr->joinable() )
thr->join();
thr = nullptr;
}
if( !thr ) if( !thr )
thr = make_shared<std::thread>( [&] { defaultLoop(); } ); thr = make_shared<std::thread>( [&] { defaultLoop(); } );
...@@ -43,7 +64,9 @@ namespace uniset ...@@ -43,7 +64,9 @@ namespace uniset
// если запуститься не удалось // если запуститься не удалось
if( !ret && thr ) if( !ret && thr )
{ {
thr->join(); if( thr->joinable() )
thr->join();
thr = nullptr; thr = nullptr;
} }
...@@ -57,12 +80,26 @@ namespace uniset ...@@ -57,12 +80,26 @@ namespace uniset
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
void EventLoopServer::evstop() void EventLoopServer::evstop()
{ {
cancelled = true; {
evterm.send(); std::lock_guard<std::timed_mutex> l(run_mutex);
if( thr && !isactive )
{
if( thr->joinable() )
thr->join();
thr = nullptr;
return;
}
cancelled = true;
evterm.send();
}
if( thr ) if( thr )
{ {
thr->join(); if( thr->joinable() )
thr->join();
thr = nullptr; thr = nullptr;
} }
} }
...@@ -115,10 +152,12 @@ namespace uniset ...@@ -115,10 +152,12 @@ namespace uniset
cerr << "(EventLoopServer::defaultLoop): UNKNOWN EXCEPTION.." << endl; cerr << "(EventLoopServer::defaultLoop): UNKNOWN EXCEPTION.." << endl;
} }
isrunning = false; {
looprunOK_event.notify_all(); std::lock_guard<std::timed_mutex> l(run_mutex);
isrunning = false;
isactive = false; looprunOK_event.notify_all();
isactive = false;
}
} }
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
bool EventLoopServer::waitDefaultLoopRunning( size_t waitTimeout_msec ) bool EventLoopServer::waitDefaultLoopRunning( size_t waitTimeout_msec )
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment