Commit 69067405 authored by Pavel Vainerman's avatar Pavel Vainerman

Работа на корректным завершением для ModbusTCPMaster и MBSlave

(modbustcpserver-echo): исправил ошибку в обработке аргументов командной строки (не выводился help)
parent fef7e52e
...@@ -67,6 +67,10 @@ Version 2.5 ...@@ -67,6 +67,10 @@ Version 2.5
UNET: сделать чтобы "канал"(UReceiver, USender) не вылетал если не удалось создать сокет, а собственно, UNET: сделать чтобы "канал"(UReceiver, USender) не вылетал если не удалось создать сокет, а собственно,
делал периодические попытки пока не получиться...(т.к. сеть может подняться позже внешними системами мониторинга) делал периодические попытки пока не получиться...(т.к. сеть может подняться позже внешними системами мониторинга)
libev
=======
- переписать UNetUDP
- UniSetActivator (обработка сигналов и возможно вынести сюда DefaultEventLoop)
SM: подумать насчёт асинхронности публикации событий и посылки уведомления (setValue/push) через очередь.. SM: подумать насчёт асинхронности публикации событий и посылки уведомления (setValue/push) через очередь..
......
...@@ -49,7 +49,7 @@ int main( int argc, char** argv ) ...@@ -49,7 +49,7 @@ int main( int argc, char** argv )
{ {
while(1) while(1)
{ {
opt = getopt_long(argc, argv, "hva:p:i:bc:", longopts, &optindex); opt = getopt_long(argc, argv, "hva:p:i:c:", longopts, &optindex);
if( opt == -1 ) if( opt == -1 )
break; break;
......
...@@ -121,8 +121,13 @@ int main( int argc, char** argv ) ...@@ -121,8 +121,13 @@ int main( int argc, char** argv )
try try
{ {
while( (opt = getopt_long(argc, argv, "hvna:w:z:r:x:c:b:d:s:t:p:i:ol:d:e:u:", longopts, &optindex)) != -1 ) while( 1 )
{ {
opt = getopt_long(argc, argv, "hvna:w:z:r:x:c:b:d:s:t:p:i:ol:d:e:u:", longopts, &optindex);
if( opt == -1 )
break;
switch (opt) switch (opt)
{ {
case 'h': case 'h':
......
...@@ -27,8 +27,6 @@ MBTCPTestServer::MBTCPTestServer( const std::unordered_set<ModbusAddr>& _vaddr, ...@@ -27,8 +27,6 @@ MBTCPTestServer::MBTCPTestServer( const std::unordered_set<ModbusAddr>& _vaddr,
lastWriteOutputSingleRegister(0), lastWriteOutputSingleRegister(0),
lastForceCoilsQ(0, 0), lastForceCoilsQ(0, 0),
lastWriteOutputQ(0, 0), lastWriteOutputQ(0, 0),
thr(0),
isrunning(false),
disabled(false) disabled(false)
{ {
ost::InetAddress ia(inetaddr.c_str()); ost::InetAddress ia(inetaddr.c_str());
...@@ -87,15 +85,8 @@ MBTCPTestServer::MBTCPTestServer( const std::unordered_set<ModbusAddr>& _vaddr, ...@@ -87,15 +85,8 @@ MBTCPTestServer::MBTCPTestServer( const std::unordered_set<ModbusAddr>& _vaddr,
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
MBTCPTestServer::~MBTCPTestServer() MBTCPTestServer::~MBTCPTestServer()
{ {
if( thr ) if( sslot )
{ sslot->terminate();
thr->stop();
if( thr->isRunning() )
thr->join();
}
delete sslot;
} }
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
void MBTCPTestServer::setLog( std::shared_ptr<DebugStream> dlog ) void MBTCPTestServer::setLog( std::shared_ptr<DebugStream> dlog )
...@@ -104,21 +95,16 @@ void MBTCPTestServer::setLog( std::shared_ptr<DebugStream> dlog ) ...@@ -104,21 +95,16 @@ void MBTCPTestServer::setLog( std::shared_ptr<DebugStream> dlog )
sslot->setLog(dlog); sslot->setLog(dlog);
} }
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
void MBTCPTestServer::runThread()
{
thr = new ThreadCreator<MBTCPTestServer>(this, &MBTCPTestServer::execute);
thr->start();
}
// -------------------------------------------------------------------------
void MBTCPTestServer::execute() void MBTCPTestServer::execute()
{ {
isrunning = true; if( sslot )
sslot->run( vaddr ); sslot->run( vaddr, true );
isrunning = false;
} }
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
void MBTCPTestServer::sigterm( int signo ) void MBTCPTestServer::sigterm( int signo )
{ {
if( sslot )
sslot->terminate();
} }
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
ModbusRTU::mbErrCode MBTCPTestServer::readCoilStatus( ReadCoilMessage& query, ModbusRTU::mbErrCode MBTCPTestServer::readCoilStatus( ReadCoilMessage& query,
......
...@@ -5,7 +5,6 @@ ...@@ -5,7 +5,6 @@
#include <atomic> #include <atomic>
#include <ostream> #include <ostream>
#include <unordered_set> #include <unordered_set>
#include "ThreadCreator.h"
#include "modbus/ModbusTCPServerSlot.h" #include "modbus/ModbusTCPServerSlot.h"
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
/*! Реализация MBTCPTestServer для тестирования */ /*! Реализация MBTCPTestServer для тестирования */
...@@ -31,14 +30,12 @@ class MBTCPTestServer ...@@ -31,14 +30,12 @@ class MBTCPTestServer
sslot->setIgnoreAddrMode(state); sslot->setIgnoreAddrMode(state);
} }
void runThread(); /*!< запуск с отдельным потоком */
void execute(); /*!< основной цикл работы */ void execute(); /*!< основной цикл работы */
void setLog( std::shared_ptr<DebugStream> dlog ); void setLog( std::shared_ptr<DebugStream> dlog );
inline bool isRunning() inline bool isRunning()
{ {
return isrunning; return ( sslot && sslot->isActive() );
} }
inline void disableExchange( bool set = true ) inline void disableExchange( bool set = true )
...@@ -157,8 +154,6 @@ class MBTCPTestServer ...@@ -157,8 +154,6 @@ class MBTCPTestServer
#endif #endif
private: private:
ThreadCreator<MBTCPTestServer>* thr;
std::atomic_bool isrunning;
bool disabled; bool disabled;
std::string myname; std::string myname;
}; };
......
...@@ -72,7 +72,7 @@ static void InitTest() ...@@ -72,7 +72,7 @@ static void InitTest()
//mbs->setVerbose(true); //mbs->setVerbose(true);
CHECK( mbs != nullptr ); CHECK( mbs != nullptr );
mbs->runThread(); mbs->execute();
for( int i = 0; !mbs->isRunning() && i < 10; i++ ) for( int i = 0; !mbs->isRunning() && i < 10; i++ )
msleep(200); msleep(200);
......
...@@ -73,7 +73,7 @@ static void InitTest() ...@@ -73,7 +73,7 @@ static void InitTest()
CHECK( mbs1 != nullptr ); CHECK( mbs1 != nullptr );
mbs1->setReply(0); mbs1->setReply(0);
mbs1->runThread(); mbs1->execute();
for( int i = 0; !mbs1->isRunning() && i < 10; i++ ) for( int i = 0; !mbs1->isRunning() && i < 10; i++ )
msleep(200); msleep(200);
...@@ -105,7 +105,7 @@ static void InitTest() ...@@ -105,7 +105,7 @@ static void InitTest()
CHECK( mbs2 != nullptr ); CHECK( mbs2 != nullptr );
mbs2->setReply(0); mbs2->setReply(0);
mbs2->runThread(); mbs2->execute();
for( int i = 0; !mbs2->isRunning() && i < 10; i++ ) for( int i = 0; !mbs2->isRunning() && i < 10; i++ )
msleep(200); msleep(200);
......
...@@ -147,9 +147,9 @@ MBSlave::MBSlave(UniSetTypes::ObjectId objId, UniSetTypes::ObjectId shmId, const ...@@ -147,9 +147,9 @@ MBSlave::MBSlave(UniSetTypes::ObjectId objId, UniSetTypes::ObjectId shmId, const
timeout_t aftersend_pause = conf->getArgInt("--" + prefix + "-aftersend-pause", it.getProp("afterSendPause")); timeout_t aftersend_pause = conf->getArgInt("--" + prefix + "-aftersend-pause", it.getProp("afterSendPause"));
string stype = conf->getArgParam("--" + prefix + "-type", it.getProp("type")); mbtype = conf->getArgParam("--" + prefix + "-type", it.getProp("type"));
if( stype == "RTU" ) if( mbtype == "RTU" )
{ {
// ---------- init RS ---------- // ---------- init RS ----------
string dev = conf->getArgParam("--" + prefix + "-dev", it.getProp("device")); string dev = conf->getArgParam("--" + prefix + "-dev", it.getProp("device"));
...@@ -183,7 +183,7 @@ MBSlave::MBSlave(UniSetTypes::ObjectId objId, UniSetTypes::ObjectId shmId, const ...@@ -183,7 +183,7 @@ MBSlave::MBSlave(UniSetTypes::ObjectId objId, UniSetTypes::ObjectId shmId, const
rs->setLog(l); rs->setLog(l);
conf->initLogStream(l, prefix + "-exchangelog"); conf->initLogStream(l, prefix + "-exchangelog");
} }
else if( stype == "TCP" ) else if( mbtype == "TCP" )
{ {
string iaddr = conf->getArgParam("--" + prefix + "-inet-addr", it.getProp("iaddr")); string iaddr = conf->getArgParam("--" + prefix + "-inet-addr", it.getProp("iaddr"));
...@@ -201,8 +201,8 @@ MBSlave::MBSlave(UniSetTypes::ObjectId objId, UniSetTypes::ObjectId shmId, const ...@@ -201,8 +201,8 @@ MBSlave::MBSlave(UniSetTypes::ObjectId objId, UniSetTypes::ObjectId shmId, const
tcpserver->setReplyTimeout(reply_tout); tcpserver->setReplyTimeout(reply_tout);
mbslot = std::static_pointer_cast<ModbusServerSlot>(tcpserver); mbslot = std::static_pointer_cast<ModbusServerSlot>(tcpserver);
thr = make_shared< ThreadCreator<MBSlave> >(this, &MBSlave::execute_tcp); //thr = make_shared< ThreadCreator<MBSlave> >(this, &MBSlave::execute_tcp);
thr->setFinalAction(this, &MBSlave::finalThread); //thr->setFinalAction(this, &MBSlave::finalThread);
mbinfo << myname << "(init): init TCP connection ok. " << " inet=" << iaddr << " port=" << port << endl; mbinfo << myname << "(init): init TCP connection ok. " << " inet=" << iaddr << " port=" << port << endl;
ostringstream n; ostringstream n;
...@@ -490,31 +490,14 @@ MBSlave::~MBSlave() ...@@ -490,31 +490,14 @@ MBSlave::~MBSlave()
{ {
cancelled = true; cancelled = true;
if( tcpserver ) if( tcpserver && tcpserver->isActive() )
{ tcpserver->terminate();
if( !tcpCancelled )
{
sigterm(SIGTERM);
timeout_t waitPause = updateStatTime / 10;
// специально делаем больше шагов(15).. чтобы с запасом..
for( int i = 0; i < 15 && !tcpCancelled; i++ )
msleep(waitPause);
}
if( !tcpCancelled )
{
dcrit << myname << "(~): TCP NOT CANCELED" << endl;
}
}
if( thr && thr->isRunning() ) if( thr && thr->isRunning() )
{ {
thr->stop(); thr->stop();
// thr->join(); // thr->join();
} }
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
void MBSlave::finalThread() void MBSlave::finalThread()
...@@ -633,11 +616,10 @@ void MBSlave::execute_tcp() ...@@ -633,11 +616,10 @@ void MBSlave::execute_tcp()
tcpCancelled = false; tcpCancelled = false;
tcpserver->run( vaddr ); tcpserver->run( vaddr, true );
tcpCancelled = true; // tcpCancelled = true;
// mbinfo << myname << "(execute_tcp): tcpserver stopped.." << endl;
mbinfo << myname << "(execute_tcp): tcpserver stopped.." << endl;
} }
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
void MBSlave::updateStatistics() void MBSlave::updateStatistics()
...@@ -726,6 +708,7 @@ void MBSlave::updateTCPStatistics() ...@@ -726,6 +708,7 @@ void MBSlave::updateTCPStatistics()
// суммарное количество по всем // суммарное количество по всем
askCount = 0; askCount = 0;
for( const auto& s : sess ) for( const auto& s : sess )
{ {
if( !activated || cancelled ) if( !activated || cancelled )
...@@ -796,7 +779,7 @@ void MBSlave::updateTCPStatistics() ...@@ -796,7 +779,7 @@ void MBSlave::updateTCPStatistics()
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
void MBSlave::updateThresholds() void MBSlave::updateThresholds()
{ {
for( auto&& i: thrlist ) for( auto && i : thrlist )
{ {
try try
{ {
...@@ -857,7 +840,11 @@ void MBSlave::sysCommand( const UniSetTypes::SystemMessage* sm ) ...@@ -857,7 +840,11 @@ void MBSlave::sysCommand( const UniSetTypes::SystemMessage* sm )
{ {
UniSetTypes::uniset_rwmutex_rlock l(mutex_start); UniSetTypes::uniset_rwmutex_rlock l(mutex_start);
askSensors(UniversalIO::UIONotify); askSensors(UniversalIO::UIONotify);
thr->start();
if( mbtype == "RTU" && thr )
thr->start();
else if( mbtype == "TCP")
execute_tcp();
} }
break; break;
...@@ -1011,12 +998,13 @@ bool MBSlave::deactivateObject() ...@@ -1011,12 +998,13 @@ bool MBSlave::deactivateObject()
{ {
mbinfo << myname << "(deactivateObject): ..." << endl; mbinfo << myname << "(deactivateObject): ..." << endl;
if( cancelled )
return UniSetObject::deactivateObject();
activated = false; activated = false;
cancelled = true; cancelled = true;
if( tcpserver && tcpserver->isAcive() ) if( mbtype == "RTU" )
tcpserver->sigterm(SIGTERM);
else
{ {
try try
{ {
...@@ -1029,21 +1017,30 @@ bool MBSlave::deactivateObject() ...@@ -1029,21 +1017,30 @@ bool MBSlave::deactivateObject()
} }
} }
return UniSetObject::deactivateObject(); return UniSetObject::deactivateObject();
} }
// ------------------------------------------------------------------------------------------ // ------------------------------------------------------------------------------------------
void MBSlave::sigterm( int signo ) void MBSlave::sigterm( int signo )
{ {
mbinfo << myname << ": ********* SIGTERM(" << signo << ") ********" << endl; mbinfo << myname << ": ********* SIGTERM(" << signo << ") ********" << endl;
if( cancelled )
{
UniSetObject::sigterm(signo);
return;
}
activated = false; activated = false;
cancelled = true; cancelled = true;
if( tcpserver ) if( tcpserver )
{ {
cancelled = true; cancelled = true;
cerr << "********* MBSlave::sigterm" << endl;
if( tcpserver && tcpserver->isAcive() ) if( tcpserver->isActive() )
tcpserver->sigterm(signo); tcpserver->terminate();
} }
else else
{ {
...@@ -2464,6 +2461,7 @@ ModbusRTU::mbErrCode MBSlave::diagnostics( ModbusRTU::DiagnosticMessage& query, ...@@ -2464,6 +2461,7 @@ ModbusRTU::mbErrCode MBSlave::diagnostics( ModbusRTU::DiagnosticMessage& query,
ModbusRTU::DiagnosticRetMessage& reply ) ModbusRTU::DiagnosticRetMessage& reply )
{ {
auto mbserver = dynamic_pointer_cast<ModbusServer>(mbslot); auto mbserver = dynamic_pointer_cast<ModbusServer>(mbslot);
if( !mbserver ) if( !mbserver )
return ModbusRTU::erHardwareError; return ModbusRTU::erHardwareError;
...@@ -2497,8 +2495,8 @@ ModbusRTU::mbErrCode MBSlave::diagnostics( ModbusRTU::DiagnosticMessage& query, ...@@ -2497,8 +2495,8 @@ ModbusRTU::mbErrCode MBSlave::diagnostics( ModbusRTU::DiagnosticMessage& query,
if( query.subf == ModbusRTU::dgClearCounters ) if( query.subf == ModbusRTU::dgClearCounters )
{ {
mbserver->resetAskCounter(); mbserver->resetAskCounter();
mbserver->resetErrCount(erOperationFailed,0); mbserver->resetErrCount(erOperationFailed, 0);
mbserver->resetErrCount(ModbusRTU::erBadCheckSum,0); mbserver->resetErrCount(ModbusRTU::erBadCheckSum, 0);
// другие счётчики пока не сбрасываем.. // другие счётчики пока не сбрасываем..
reply = query; reply = query;
return ModbusRTU::erNoError; return ModbusRTU::erNoError;
......
...@@ -572,6 +572,7 @@ class MBSlave: ...@@ -572,6 +572,7 @@ class MBSlave:
std::string logserv_host = {""}; std::string logserv_host = {""};
int logserv_port = {0}; int logserv_port = {0};
VMonitor vmon; VMonitor vmon;
std::string mbtype = { "" };
// ---------------------------------------------------------------------------- // ----------------------------------------------------------------------------
// TCPServer section.. // TCPServer section..
......
...@@ -276,8 +276,10 @@ void SharedMemory::sysCommand( const SystemMessage* sm ) ...@@ -276,8 +276,10 @@ void SharedMemory::sysCommand( const SystemMessage* sm )
// ------------------------------------------------------------------------------------------ // ------------------------------------------------------------------------------------------
bool SharedMemory::deactivateObject() bool SharedMemory::deactivateObject()
{ {
if( logserv ) cerr << myname << "************* deactivateObject()..." << endl;
logserv = nullptr; workready = false;
// if( logserv && logserv->isRunning() )
// logserv->terminate();
return IONotifyController::deactivateObject(); return IONotifyController::deactivateObject();
} }
...@@ -317,7 +319,7 @@ bool SharedMemory::activateObject() ...@@ -317,7 +319,7 @@ bool SharedMemory::activateObject()
activated = true; activated = true;
} }
cerr << "************************** activate: " << pt.getCurrent() << " msec " << endl; cout << myname << ": ********** activate: " << pt.getCurrent() << " msec " << endl;
return res; return res;
} }
// ------------------------------------------------------------------------------------------ // ------------------------------------------------------------------------------------------
...@@ -328,10 +330,14 @@ CORBA::Boolean SharedMemory::exist() ...@@ -328,10 +330,14 @@ CORBA::Boolean SharedMemory::exist()
// ------------------------------------------------------------------------------------------ // ------------------------------------------------------------------------------------------
void SharedMemory::sigterm( int signo ) void SharedMemory::sigterm( int signo )
{ {
cerr << myname << "************* SIGTERM...." << endl;
workready = false;
if( signo == SIGTERM && wdt ) if( signo == SIGTERM && wdt )
wdt->stop(); wdt->stop();
// raise(SIGKILL); if( logserv && logserv->isRunning() )
logserv->terminate();
IONotifyController::sigterm(signo); IONotifyController::sigterm(signo);
} }
// ------------------------------------------------------------------------------------------ // ------------------------------------------------------------------------------------------
......
...@@ -13,8 +13,8 @@ ...@@ -13,8 +13,8 @@
class EventWatcher class EventWatcher
{ {
public: public:
EventWatcher(){} EventWatcher() {}
virtual ~EventWatcher(){} ~EventWatcher() {}
}; };
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
/*! /*!
......
...@@ -34,7 +34,7 @@ class LogSession ...@@ -34,7 +34,7 @@ class LogSession
public: public:
LogSession(int sock, std::shared_ptr<DebugStream>& log, timeout_t cmdTimeout = 2000 ); LogSession(int sock, std::shared_ptr<DebugStream>& log, timeout_t cmdTimeout = 2000 );
virtual ~LogSession(); ~LogSession();
typedef sigc::slot<void, LogSession*> FinalSlot; typedef sigc::slot<void, LogSession*> FinalSlot;
void connectFinalSession( FinalSlot sl ); void connectFinalSession( FinalSlot sl );
......
...@@ -24,7 +24,8 @@ namespace UTCPCore ...@@ -24,7 +24,8 @@ namespace UTCPCore
{ {
pos = 0; pos = 0;
len = nbytes; len = nbytes;
if( len <=0 ) // ??!!
if( len <= 0 ) // ??!!
return; return;
data = new unsigned char[nbytes]; data = new unsigned char[nbytes];
...@@ -36,7 +37,7 @@ namespace UTCPCore ...@@ -36,7 +37,7 @@ namespace UTCPCore
pos = 0; pos = 0;
len = s.length(); len = s.length();
if( len <=0 ) // ??!! if( len <= 0 ) // ??!!
return; return;
data = new unsigned char[len]; data = new unsigned char[len];
......
...@@ -19,6 +19,7 @@ ...@@ -19,6 +19,7 @@
\todo Разобратся с тем как отвечать на неправильные запросы! \todo Разобратся с тем как отвечать на неправильные запросы!
Формат ответных сообщений!!! Коды ошибок!!! Формат ответных сообщений!!! Коды ошибок!!!
\todo Доработать terminate, чтобы можно было прервать ожидание \todo Доработать terminate, чтобы можно было прервать ожидание
\todo Перейти на libev..
*/ */
class ModbusRTUSlave: class ModbusRTUSlave:
public ModbusServer public ModbusServer
...@@ -38,7 +39,7 @@ class ModbusRTUSlave: ...@@ -38,7 +39,7 @@ class ModbusRTUSlave:
} }
virtual void terminate() override; virtual void terminate() override;
virtual bool isAcive() override; virtual bool isActive() override;
protected: protected:
......
...@@ -58,11 +58,11 @@ class ModbusServer ...@@ -58,11 +58,11 @@ class ModbusServer
// --------------------------------------------------------------------------------------- // ---------------------------------------------------------------------------------------
// сигнал вызова receive, ДО обработки realReceive() // сигнал вызова receive, ДО обработки realReceive()
// \return ModbusRTU::errNoError, тогда обработка продолжиться. // \return ModbusRTU::errNoError, тогда обработка продолжиться.
typedef sigc::signal<ModbusRTU::mbErrCode,const std::unordered_set<ModbusRTU::ModbusAddr>, timeout_t> PreReceiveSignal; typedef sigc::signal<ModbusRTU::mbErrCode, const std::unordered_set<ModbusRTU::ModbusAddr>, timeout_t> PreReceiveSignal;
PreReceiveSignal signal_pre_receive(); PreReceiveSignal signal_pre_receive();
// сигнал после обработки realReceive() // сигнал после обработки realReceive()
typedef sigc::signal<void,ModbusRTU::mbErrCode> PostReceiveSignal; typedef sigc::signal<void, ModbusRTU::mbErrCode> PostReceiveSignal;
PostReceiveSignal signal_post_receive(); PostReceiveSignal signal_post_receive();
// --------------------------------------------------------------------------------------- // ---------------------------------------------------------------------------------------
...@@ -142,16 +142,19 @@ class ModbusServer ...@@ -142,16 +142,19 @@ class ModbusServer
virtual void cleanupChannel() {} virtual void cleanupChannel() {}
virtual void terminate() {} virtual void terminate() {}
virtual bool isAcive() = 0; virtual bool isActive() = 0;
// ------------ Статистика --------------- // ------------ Статистика ---------------
typedef std::unordered_map<ModbusRTU::mbErrCode, size_t> ExchangeErrorMap; typedef std::unordered_map<ModbusRTU::mbErrCode, size_t> ExchangeErrorMap;
ExchangeErrorMap getErrorMap(); ExchangeErrorMap getErrorMap();
size_t getErrCount( ModbusRTU::mbErrCode e ); size_t getErrCount( ModbusRTU::mbErrCode e );
size_t resetErrCount( ModbusRTU::mbErrCode e, size_t set=0 ); size_t resetErrCount( ModbusRTU::mbErrCode e, size_t set = 0 );
inline size_t getAskCount() { return askCount; } inline size_t getAskCount()
{
return askCount;
}
void resetAskCounter(); void resetAskCounter();
protected: protected:
......
...@@ -111,6 +111,8 @@ class ModbusTCPServer: ...@@ -111,6 +111,8 @@ class ModbusTCPServer:
virtual ModbusRTU::mbErrCode realReceive( const std::unordered_set<ModbusRTU::ModbusAddr>& vaddr, timeout_t msecTimeout ) override; virtual ModbusRTU::mbErrCode realReceive( const std::unordered_set<ModbusRTU::ModbusAddr>& vaddr, timeout_t msecTimeout ) override;
virtual void mainLoop(); virtual void mainLoop();
void finish();
virtual void ioAccept(ev::io& watcher, int revents); virtual void ioAccept(ev::io& watcher, int revents);
void onTimer( ev::timer& t, int revents ); void onTimer( ev::timer& t, int revents );
......
...@@ -55,7 +55,7 @@ class ModbusTCPSession: ...@@ -55,7 +55,7 @@ class ModbusTCPSession:
// запуск обработки входящих запросов // запуск обработки входящих запросов
void run(); void run();
virtual bool isAcive() override; virtual bool isActive() override;
protected: protected:
......
...@@ -141,7 +141,7 @@ void ModbusRTUSlave::terminate() ...@@ -141,7 +141,7 @@ void ModbusRTUSlave::terminate()
catch(...) {} catch(...) {}
} }
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
bool ModbusRTUSlave::isAcive() bool ModbusRTUSlave::isActive()
{ {
return false; return false;
} }
......
...@@ -1530,22 +1530,23 @@ mbErrCode ModbusServer::receive(const std::unordered_set<ModbusAddr>& vaddr, tim ...@@ -1530,22 +1530,23 @@ mbErrCode ModbusServer::receive(const std::unordered_set<ModbusAddr>& vaddr, tim
if( !m_pre_signal.empty() ) if( !m_pre_signal.empty() )
{ {
ret = m_pre_signal.emit(vaddr,msecTimeout); ret = m_pre_signal.emit(vaddr, msecTimeout);
if( ret != erNoError && ret != erSessionClosed ) if( ret != erNoError && ret != erSessionClosed )
{ {
errmap[ret] +=1; errmap[ret] += 1;
return ret; return ret;
} }
} }
ret = realReceive(vaddr,msecTimeout); ret = realReceive(vaddr, msecTimeout);
// собираем статистику.. // собираем статистику..
if( ret != erTimeOut && ret != erSessionClosed ) if( ret != erTimeOut && ret != erSessionClosed )
askCount++; askCount++;
if( ret != erNoError && ret != erSessionClosed ) if( ret != erNoError && ret != erSessionClosed )
errmap[ret] +=1; errmap[ret] += 1;
if( ret != erSessionClosed ) if( ret != erSessionClosed )
m_post_signal.emit(ret); m_post_signal.emit(ret);
...@@ -1668,6 +1669,7 @@ ModbusServer::ExchangeErrorMap ModbusServer::getErrorMap() ...@@ -1668,6 +1669,7 @@ ModbusServer::ExchangeErrorMap ModbusServer::getErrorMap()
size_t ModbusServer::getErrCount( mbErrCode e ) size_t ModbusServer::getErrCount( mbErrCode e )
{ {
auto i = errmap.find(e); auto i = errmap.find(e);
if( i == errmap.end() ) if( i == errmap.end() )
return 0; return 0;
...@@ -1677,6 +1679,7 @@ size_t ModbusServer::getErrCount( mbErrCode e ) ...@@ -1677,6 +1679,7 @@ size_t ModbusServer::getErrCount( mbErrCode e )
size_t ModbusServer::resetErrCount( mbErrCode e, size_t set ) size_t ModbusServer::resetErrCount( mbErrCode e, size_t set )
{ {
auto i = errmap.find(e); auto i = errmap.find(e);
if( i == errmap.end() ) if( i == errmap.end() )
return 0; return 0;
......
...@@ -45,8 +45,8 @@ ModbusTCPServer::ModbusTCPServer( ost::InetAddress& ia, int _port ): ...@@ -45,8 +45,8 @@ ModbusTCPServer::ModbusTCPServer( ost::InetAddress& ia, int _port ):
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
ModbusTCPServer::~ModbusTCPServer() ModbusTCPServer::~ModbusTCPServer()
{ {
if( evloop ) if( cancelled )
evloop->terminate(this); finish();
} }
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
void ModbusTCPServer::setMaxSessions( unsigned int num ) void ModbusTCPServer::setMaxSessions( unsigned int num )
...@@ -120,7 +120,7 @@ void ModbusTCPServer::mainLoop() ...@@ -120,7 +120,7 @@ void ModbusTCPServer::mainLoop()
{ {
evloop = DefaultEventLoop::inst(); evloop = DefaultEventLoop::inst();
evloop->run(this,false); evloop->run(this, false);
} }
if( dlog->is_info() ) if( dlog->is_info() )
...@@ -131,6 +131,11 @@ void ModbusTCPServer::mainLoop() ...@@ -131,6 +131,11 @@ void ModbusTCPServer::mainLoop()
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
void ModbusTCPServer::terminate() void ModbusTCPServer::terminate()
{ {
finish();
}
// -------------------------------------------------------------------------
void ModbusTCPServer::finish()
{
if( cancelled ) if( cancelled )
return; return;
...@@ -187,7 +192,7 @@ void ModbusTCPServer::getSessions( Sessions& lst ) ...@@ -187,7 +192,7 @@ void ModbusTCPServer::getSessions( Sessions& lst )
} }
} }
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
bool ModbusTCPServer::isAcive() bool ModbusTCPServer::isActive()
{ {
return !cancelled; return !cancelled;
} }
...@@ -328,6 +333,6 @@ mbErrCode ModbusTCPServer::preReceiveEvent(const std::unordered_set<ModbusAddr> ...@@ -328,6 +333,6 @@ mbErrCode ModbusTCPServer::preReceiveEvent(const std::unordered_set<ModbusAddr>
if( m_pre_signal.empty() ) if( m_pre_signal.empty() )
return ModbusRTU::erNoError; return ModbusRTU::erNoError;
return m_pre_signal.emit(vaddr,tout); return m_pre_signal.emit(vaddr, tout);
} }
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
...@@ -171,6 +171,6 @@ void ModbusTCPServerSlot::sigterm( int signo ) ...@@ -171,6 +171,6 @@ void ModbusTCPServerSlot::sigterm( int signo )
{ {
terminate(); terminate();
} }
catch(...) {} catch( std::exception& ex ) {}
} }
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
...@@ -98,7 +98,7 @@ void ModbusTCPSession::run() ...@@ -98,7 +98,7 @@ void ModbusTCPSession::run()
ioTimeout.start(sessTimeout); ioTimeout.start(sessTimeout);
} }
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
bool ModbusTCPSession::isAcive() bool ModbusTCPSession::isActive()
{ {
return io.is_active(); return io.is_active();
} }
......
...@@ -27,7 +27,6 @@ using namespace UniSetTypes; ...@@ -27,7 +27,6 @@ using namespace UniSetTypes;
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
LogServer::~LogServer() LogServer::~LogServer()
{ {
cerr << myname << " ~LogServer() " << endl;
if( running ) if( running )
terminate(); terminate();
} }
...@@ -84,18 +83,19 @@ void LogServer::terminate() ...@@ -84,18 +83,19 @@ void LogServer::terminate()
running = false; running = false;
if( evloop ) if( evloop )
{
cerr << myname << ": terminate evloop.." << endl;
evloop->terminate(this); evloop->terminate(this);
}
} }
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
void LogServer::run( const std::string& _addr, ost::tpport_t _port, bool thread ) void LogServer::run( const std::string& _addr, ost::tpport_t _port, bool thread )
{ {
addr = _addr; addr = _addr;
port = _port; port = _port;
if( !running ) if( !running )
{ {
if( !thread )
running = true;
mainLoop( thread ); mainLoop( thread );
running = true; running = true;
} }
...@@ -107,6 +107,7 @@ void LogServer::mainLoop( bool thread ) ...@@ -107,6 +107,7 @@ void LogServer::mainLoop( bool thread )
{ {
if( elog->is_crit() ) if( elog->is_crit() )
elog->crit() << myname << "(LogServer::mainLoopt): ALREADY RUNNING.." << endl; elog->crit() << myname << "(LogServer::mainLoopt): ALREADY RUNNING.." << endl;
return; return;
} }
...@@ -131,6 +132,7 @@ void LogServer::mainLoop( bool thread ) ...@@ -131,6 +132,7 @@ void LogServer::mainLoop( bool thread )
if( mylog.is_crit() ) if( mylog.is_crit() )
mylog.crit() << myname << "(LogServer): " << err.str() << endl; mylog.crit() << myname << "(LogServer): " << err.str() << endl;
throw SystemError( err.str() ); throw SystemError( err.str() );
} }
...@@ -140,7 +142,7 @@ void LogServer::mainLoop( bool thread ) ...@@ -140,7 +142,7 @@ void LogServer::mainLoop( bool thread )
io.set<LogServer, &LogServer::ioAccept>(this); io.set<LogServer, &LogServer::ioAccept>(this);
io.start(sock->getSocket(), ev::READ); io.start(sock->getSocket(), ev::READ);
// скобки специально чтобы пораньшк выйти из "зоны" видимости // скобки специально чтобы пораньше освободить evloop (выйти из "зоны" видимости)
{ {
evloop = DefaultEventLoop::inst(); evloop = DefaultEventLoop::inst();
evloop->run( this, thread ); evloop->run( this, thread );
...@@ -168,6 +170,7 @@ void LogServer::ioAccept( ev::io& watcher, int revents ) ...@@ -168,6 +170,7 @@ void LogServer::ioAccept( ev::io& watcher, int revents )
{ {
uniset_rwmutex_wrlock l(mutSList); uniset_rwmutex_wrlock l(mutSList);
if( slist.size() >= sessMaxCount ) if( slist.size() >= sessMaxCount )
{ {
if( mylog.is_crit() ) if( mylog.is_crit() )
......
...@@ -90,7 +90,7 @@ LogSession::LogSession( int sfd, std::shared_ptr<DebugStream>& _log, timeout_t _ ...@@ -90,7 +90,7 @@ LogSession::LogSession( int sfd, std::shared_ptr<DebugStream>& _log, timeout_t _
io.set<LogSession, &LogSession::callback>(this); io.set<LogSession, &LogSession::callback>(this);
cmdTimer.set<LogSession, &LogSession::onCmdTimeout>(this); cmdTimer.set<LogSession, &LogSession::onCmdTimeout>(this);
asyncEvent.set<LogSession,&LogSession::event>(this); asyncEvent.set<LogSession, &LogSession::event>(this);
} }
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
void LogSession::logOnEvent( const std::string& s ) void LogSession::logOnEvent( const std::string& s )
...@@ -100,6 +100,7 @@ void LogSession::logOnEvent( const std::string& s ) ...@@ -100,6 +100,7 @@ void LogSession::logOnEvent( const std::string& s )
std::unique_lock<std::mutex> lk(logbuf_mutex); std::unique_lock<std::mutex> lk(logbuf_mutex);
logbuf.emplace(new UTCPCore::Buffer(s)); logbuf.emplace(new UTCPCore::Buffer(s));
if( asyncEvent.is_active() ) if( asyncEvent.is_active() )
asyncEvent.send(); asyncEvent.send();
} }
...@@ -112,7 +113,7 @@ void LogSession::run() ...@@ -112,7 +113,7 @@ void LogSession::run()
mylog.info() << peername << "(run): run session.." << endl; mylog.info() << peername << "(run): run session.." << endl;
io.start(sock->getSocket(), ev::READ); io.start(sock->getSocket(), ev::READ);
cmdTimer.start( cmdTimeout/1000. ); cmdTimer.start( cmdTimeout / 1000. );
// asyncEvent.start(); // слать логи начинаем только после обработки команд.. если есть.. // asyncEvent.start(); // слать логи начинаем только после обработки команд.. если есть..
} }
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
...@@ -122,18 +123,22 @@ void LogSession::terminate() ...@@ -122,18 +123,22 @@ void LogSession::terminate()
mylog.info() << peername << "(terminate)..." << endl; mylog.info() << peername << "(terminate)..." << endl;
cancelled = true; cancelled = true;
std::unique_lock<std::mutex> lk(logbuf_mutex);
{
std::unique_lock<std::mutex> lk2(io_mutex);
io.stop();
cmdTimer.stop();
asyncEvent.stop();
conn.disconnect();
}
{ {
std::unique_lock<std::mutex> lk(logbuf_mutex);
while( !logbuf.empty() ) while( !logbuf.empty() )
logbuf.pop(); logbuf.pop();
} }
std::unique_lock<std::mutex> lk2(io_mutex);
io.stop();
cmdTimer.stop();
asyncEvent.stop();
conn.disconnect();
sock.reset(); // close.. sock.reset(); // close..
final(); final();
} }
...@@ -196,6 +201,7 @@ void LogSession::writeEvent( ev::io& watcher ) ...@@ -196,6 +201,7 @@ void LogSession::writeEvent( ev::io& watcher )
return; return;
auto buffer = logbuf.front(); auto buffer = logbuf.front();
if( !buffer ) if( !buffer )
return; return;
...@@ -226,6 +232,7 @@ void LogSession::writeEvent( ev::io& watcher ) ...@@ -226,6 +232,7 @@ void LogSession::writeEvent( ev::io& watcher )
} }
std::unique_lock<std::mutex> lk1(io_mutex); std::unique_lock<std::mutex> lk1(io_mutex);
if( logbuf.empty() ) if( logbuf.empty() )
io.set(ev::READ); io.set(ev::READ);
else else
...@@ -268,6 +275,7 @@ void LogSession::readEvent( ev::io& watcher ) ...@@ -268,6 +275,7 @@ void LogSession::readEvent( ev::io& watcher )
{ {
if( mylog.is_warn() ) if( mylog.is_warn() )
mylog.warn() << peername << "(readEvent): BAD MESSAGE..." << endl; mylog.warn() << peername << "(readEvent): BAD MESSAGE..." << endl;
return; return;
} }
...@@ -356,38 +364,38 @@ void LogSession::cmdProcessing( const string& cmdLogName, const LogServerTypes:: ...@@ -356,38 +364,38 @@ void LogSession::cmdProcessing( const string& cmdLogName, const LogServerTypes::
{ {
case LogServerTypes::cmdSetLevel: case LogServerTypes::cmdSetLevel:
l.log->level( (Debug::type)msg.data ); l.log->level( (Debug::type)msg.data );
break; break;
case LogServerTypes::cmdAddLevel: case LogServerTypes::cmdAddLevel:
l.log->addLevel( (Debug::type)msg.data ); l.log->addLevel( (Debug::type)msg.data );
break; break;
case LogServerTypes::cmdDelLevel: case LogServerTypes::cmdDelLevel:
l.log->delLevel( (Debug::type)msg.data ); l.log->delLevel( (Debug::type)msg.data );
break; break;
case LogServerTypes::cmdRotate: case LogServerTypes::cmdRotate:
l.log->onLogFile(true); l.log->onLogFile(true);
break; break;
case LogServerTypes::cmdList: // обработали выше (в начале) case LogServerTypes::cmdList: // обработали выше (в начале)
break; break;
case LogServerTypes::cmdOffLogFile: case LogServerTypes::cmdOffLogFile:
l.log->offLogFile(); l.log->offLogFile();
break; break;
case LogServerTypes::cmdOnLogFile: case LogServerTypes::cmdOnLogFile:
l.log->onLogFile(); l.log->onLogFile();
break; break;
case LogServerTypes::cmdFilterMode: case LogServerTypes::cmdFilterMode:
l.log->signal_stream_event().connect( sigc::mem_fun(this, &LogSession::logOnEvent) ); l.log->signal_stream_event().connect( sigc::mem_fun(this, &LogSession::logOnEvent) );
break; break;
default: default:
mylog.warn() << peername << "(run): Unknown command '" << msg.cmd << "'" << endl; mylog.warn() << peername << "(run): Unknown command '" << msg.cmd << "'" << endl;
break; break;
} }
} // end if for } // end if for
......
...@@ -37,7 +37,6 @@ ...@@ -37,7 +37,6 @@
#include "Debug.h" #include "Debug.h"
#include "Configuration.h" #include "Configuration.h"
#include "Mutex.h" #include "Mutex.h"
// ------------------------------------------------------------------------------------------ // ------------------------------------------------------------------------------------------
using namespace UniSetTypes; using namespace UniSetTypes;
using namespace std; using namespace std;
......
...@@ -31,8 +31,9 @@ void DefaultEventLoop::run( EventWatcher* s, bool thread ) ...@@ -31,8 +31,9 @@ void DefaultEventLoop::run( EventWatcher* s, bool thread )
{ {
std::unique_lock<std::mutex> lk(m_run_mutex); std::unique_lock<std::mutex> lk(m_run_mutex);
if( !thr ) if( !thr )
thr = make_shared<std::thread>( [=] { defaultLoop(); } ); thr = make_shared<std::thread>( [ = ] { defaultLoop(); } );
} }
{ {
...@@ -43,6 +44,7 @@ void DefaultEventLoop::run( EventWatcher* s, bool thread ) ...@@ -43,6 +44,7 @@ void DefaultEventLoop::run( EventWatcher* s, bool thread )
if( !thread ) if( !thread )
{ {
std::unique_lock<std::mutex> lk(m_mutex); std::unique_lock<std::mutex> lk(m_mutex);
while( !m_notify ) while( !m_notify )
m_event.wait(lk); m_event.wait(lk);
...@@ -60,7 +62,8 @@ void DefaultEventLoop::terminate( EventWatcher* s ) ...@@ -60,7 +62,8 @@ void DefaultEventLoop::terminate( EventWatcher* s )
{ {
cerr << "(DefaultEventLoop::defaultLoop): terminate.." << endl; cerr << "(DefaultEventLoop::defaultLoop): terminate.." << endl;
std::unique_lock<std::mutex> lk(m_slist_mutex); std::unique_lock<std::mutex> lk(m_slist_mutex);
for( auto i=slist.begin(); i!=slist.end(); i++ )
for( auto i = slist.begin(); i != slist.end(); i++ )
{ {
if( (*i) == s ) if( (*i) == s )
{ {
...@@ -77,6 +80,7 @@ void DefaultEventLoop::finish() ...@@ -77,6 +80,7 @@ void DefaultEventLoop::finish()
{ {
cerr << "(DefaultEventLoop::fini): TERMINATE EVENT LOOP.." << endl; cerr << "(DefaultEventLoop::fini): TERMINATE EVENT LOOP.." << endl;
cancelled = true; cancelled = true;
if( !evloop ) if( !evloop )
return; return;
...@@ -84,7 +88,7 @@ void DefaultEventLoop::finish() ...@@ -84,7 +88,7 @@ void DefaultEventLoop::finish()
evloop->break_loop(ev::ALL); evloop->break_loop(ev::ALL);
std::unique_lock<std::mutex> lk(m_mutex); std::unique_lock<std::mutex> lk(m_mutex);
m_event.wait_for(lk, std::chrono::seconds(3), [=]() m_event.wait_for(lk, std::chrono::seconds(1), [ = ]()
{ {
return (m_notify == true); return (m_notify == true);
} ); } );
......
...@@ -230,11 +230,11 @@ TEST_CASE("MaxSessions", "[LogServer]" ) ...@@ -230,11 +230,11 @@ TEST_CASE("MaxSessions", "[LogServer]" )
{ {
uniset_mutex_lock l(r2_mutex); uniset_mutex_lock l(r2_mutex);
/* /*
// Ищем часть сообщения об ошибке: '(LOG SERVER): Exceeded the limit on the number of sessions = 1' // Ищем часть сообщения об ошибке: '(LOG SERVER): Exceeded the limit on the number of sessions = 1'
size_t pos = msg2.str().find("Exceeded the limit"); size_t pos = msg2.str().find("Exceeded the limit");
REQUIRE( pos != std::string::npos ); REQUIRE( pos != std::string::npos );
*/ */
// ничего не получили.. // ничего не получили..
REQUIRE( msg2.str() == "" ); REQUIRE( msg2.str() == "" );
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment