Commit 465d626e authored by Pavel Vainerman's avatar Pavel Vainerman

Различные "флаги" ф процессах переведены на использование std::atomic_bool

Реализовано корректное завершение потоков в MBTCPMultiSlave Различные мелки исправления
parent 56f35e19
...@@ -22,6 +22,7 @@ respond_id(DefaultObjectId), ...@@ -22,6 +22,7 @@ respond_id(DefaultObjectId),
respond_invert(false), respond_invert(false),
askCount(0), askCount(0),
activated(false), activated(false),
cancelled(false),
activateTimeout(500), activateTimeout(500),
pingOK(true), pingOK(true),
force(false), force(false),
...@@ -120,6 +121,9 @@ prefix(prefix) ...@@ -120,6 +121,9 @@ prefix(prefix)
mbslot = mbtcp; mbslot = mbtcp;
thr = new ThreadCreator<MBSlave>(this,&MBSlave::execute_tcp); thr = new ThreadCreator<MBSlave>(this,&MBSlave::execute_tcp);
dinfo << myname << "(init): init TCP connection ok. " << " inet=" << iaddr << " port=" << port << endl; dinfo << myname << "(init): init TCP connection ok. " << " inet=" << iaddr << " port=" << port << endl;
if( dlog.debugging(Debug::LEVEL9) )
mbtcp->setLog(dlog);
} }
else else
throw UniSetTypes::SystemError(myname+"(MBSlave): Unknown slave type. Use: --mbs-type [RTU|TCP]"); throw UniSetTypes::SystemError(myname+"(MBSlave): Unknown slave type. Use: --mbs-type [RTU|TCP]");
...@@ -357,9 +361,17 @@ prefix(prefix) ...@@ -357,9 +361,17 @@ prefix(prefix)
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
MBSlave::~MBSlave() MBSlave::~MBSlave()
{ {
cancelled = true;
if( thr && thr->isRunning() )
{
thr->stop();
thr->join();
}
delete thr;
delete mbslot; delete mbslot;
delete shm; delete shm;
delete thr;
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
void MBSlave::waitSMReady() void MBSlave::waitSMReady()
...@@ -386,7 +398,7 @@ void MBSlave::execute_rtu() ...@@ -386,7 +398,7 @@ void MBSlave::execute_rtu()
ModbusRTU::mbErrCode prev = erNoError; ModbusRTU::mbErrCode prev = erNoError;
while(1) while( !cancelled )
{ {
try try
{ {
...@@ -467,7 +479,9 @@ void MBSlave::execute_tcp() ...@@ -467,7 +479,9 @@ void MBSlave::execute_tcp()
ModbusRTU::mbErrCode prev = erNoError; ModbusRTU::mbErrCode prev = erNoError;
while(1) dinfo << myname << "(execute_tcp): thread running.." << endl;
while( !cancelled )
{ {
try try
{ {
...@@ -540,6 +554,8 @@ void MBSlave::execute_tcp() ...@@ -540,6 +554,8 @@ void MBSlave::execute_tcp()
} }
catch(...){} catch(...){}
} }
dinfo << myname << "(execute_tcp): thread stopped.." << endl;
} }
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
void MBSlave::sysCommand( const UniSetTypes::SystemMessage *sm ) void MBSlave::sysCommand( const UniSetTypes::SystemMessage *sm )
...@@ -721,10 +737,27 @@ bool MBSlave::activateObject() ...@@ -721,10 +737,27 @@ bool MBSlave::activateObject()
return true; return true;
} }
// ------------------------------------------------------------------------------------------ // ------------------------------------------------------------------------------------------
bool MBSlave::disactivateObject()
{
dinfo << myname << "(disactivateObject): ..." << endl;
activated = false;
cancelled = true;
try
{
if( mbslot )
mbslot->sigterm(SIGTERM);
}
catch(...){}
return UniSetObject_LT::disactivateObject();
}
// ------------------------------------------------------------------------------------------
void MBSlave::sigterm( int signo ) void MBSlave::sigterm( int signo )
{ {
dinfo << myname << ": ********* SIGTERM(" << signo <<") ********" << endl; dinfo << myname << ": ********* SIGTERM(" << signo <<") ********" << endl;
activated = false; activated = false;
cancelled = true;
try try
{ {
if( mbslot ) if( mbslot )
......
...@@ -144,6 +144,7 @@ class MBSlave: ...@@ -144,6 +144,7 @@ class MBSlave:
virtual void execute_tcp(); virtual void execute_tcp();
virtual bool activateObject() override; virtual bool activateObject() override;
virtual bool disactivateObject() override;
// действия при завершении работы // действия при завершении работы
virtual void sigterm( int signo ) override; virtual void sigterm( int signo ) override;
...@@ -187,6 +188,7 @@ class MBSlave: ...@@ -187,6 +188,7 @@ class MBSlave:
ExchangeErrorMap errmap; /*!< статистика обмена */ ExchangeErrorMap errmap; /*!< статистика обмена */
std::atomic_bool activated; std::atomic_bool activated;
std::atomic_bool cancelled;
int activateTimeout; int activateTimeout;
bool pingOK; bool pingOK;
timeout_t wait_msec; timeout_t wait_msec;
......
...@@ -150,7 +150,9 @@ void MBTCPMultiSlave::execute_tcp() ...@@ -150,7 +150,9 @@ void MBTCPMultiSlave::execute_tcp()
sslot->setMaxSessions( sessMaxNum ); sslot->setMaxSessions( sessMaxNum );
while(1) dinfo << myname << "(execute_tcp): thread running.." << endl;
while( !cancelled )
{ {
try try
{ {
...@@ -286,6 +288,8 @@ void MBTCPMultiSlave::execute_tcp() ...@@ -286,6 +288,8 @@ void MBTCPMultiSlave::execute_tcp()
} }
catch(...){} catch(...){}
} }
dinfo << myname << "(execute_tcp): thread stopped.." << endl;
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
void MBTCPMultiSlave::initIterators() void MBTCPMultiSlave::initIterators()
...@@ -298,3 +302,27 @@ void MBTCPMultiSlave::initIterators() ...@@ -298,3 +302,27 @@ void MBTCPMultiSlave::initIterators()
i.second.initIterators(shm); i.second.initIterators(shm);
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
bool MBTCPMultiSlave::disactivateObject()
{
if( mbslot )
{
ModbusTCPServerSlot* sslot = dynamic_cast<ModbusTCPServerSlot*>(mbslot);
if( sslot )
sslot->sigterm(SIGTERM);
}
return MBSlave::disactivateObject();
}
// -----------------------------------------------------------------------------
void MBTCPMultiSlave::sigterm( int signo )
{
if( mbslot )
{
ModbusTCPServerSlot* sslot = dynamic_cast<ModbusTCPServerSlot*>(mbslot);
if( sslot )
sslot->sigterm(signo);
}
MBSlave::sigterm(signo);
}
// -----------------------------------------------------------------------------
...@@ -35,6 +35,8 @@ class MBTCPMultiSlave: ...@@ -35,6 +35,8 @@ class MBTCPMultiSlave:
protected: protected:
virtual void execute_tcp() override; virtual void execute_tcp() override;
virtual void initIterators() override; virtual void initIterators() override;
virtual bool disactivateObject() override;
virtual void sigterm( int signo ) override;
timeout_t sessTimeout; /*!< таймаут на сессию */ timeout_t sessTimeout; /*!< таймаут на сессию */
timeout_t waitTimeout; timeout_t waitTimeout;
......
...@@ -3,6 +3,7 @@ ...@@ -3,6 +3,7 @@
uniset2-start.sh -f ./uniset2-mbtcp-multislave --confile test.xml --dlog-add-levels level3,level4 \ uniset2-start.sh -f ./uniset2-mbtcp-multislave --confile test.xml --dlog-add-levels level3,level4 \
--smemory-id SharedMemory \ --smemory-id SharedMemory \
--mbs-name MBMultiSlave1 --mbs-type TCP --mbs-inet-addr 127.0.0.1 --mbs-inet-port 2048 --mbs-reg-from-id 1 --mbs-my-addr 0x01 \ --mbs-name MBMultiSlave1 --mbs-type TCP --mbs-inet-addr 127.0.0.1 --mbs-inet-port 2048 --mbs-reg-from-id 1 --mbs-my-addr 0x01 \
$*
# --mbs-force 1 # --mbs-force 1
#--mbs-reg-from-id 1 \ #--mbs-reg-from-id 1 \
#--mbs-filter-field CAN2sender --mbs-filter-value SYSTSNode \ #--mbs-filter-field CAN2sender --mbs-filter-value SYSTSNode \
\ No newline at end of file
...@@ -240,9 +240,7 @@ bool SharedMemory::activateObject() ...@@ -240,9 +240,7 @@ bool SharedMemory::activateObject()
// инициализируем указатели // инициализируем указатели
for( auto &it: hlist ) for( auto &it: hlist )
{
it.ioit = myioEnd(); it.ioit = myioEnd();
}
itPulsar = myioEnd(); itPulsar = myioEnd();
...@@ -261,7 +259,6 @@ bool SharedMemory::activateObject() ...@@ -261,7 +259,6 @@ bool SharedMemory::activateObject()
// ------------------------------------------------------------------------------------------ // ------------------------------------------------------------------------------------------
CORBA::Boolean SharedMemory::exist() CORBA::Boolean SharedMemory::exist()
{ {
// return activated;
return workready; return workready;
} }
// ------------------------------------------------------------------------------------------ // ------------------------------------------------------------------------------------------
...@@ -270,6 +267,7 @@ void SharedMemory::sigterm( int signo ) ...@@ -270,6 +267,7 @@ void SharedMemory::sigterm( int signo )
if( signo == SIGTERM ) if( signo == SIGTERM )
wdt->stop(); wdt->stop();
// raise(SIGKILL); // raise(SIGKILL);
IONotifyController_LT::sigterm(signo);
} }
// ------------------------------------------------------------------------------------------ // ------------------------------------------------------------------------------------------
void SharedMemory::checkHeartBeat() void SharedMemory::checkHeartBeat()
......
...@@ -32,20 +32,20 @@ namespace UniSetExtensions ...@@ -32,20 +32,20 @@ namespace UniSetExtensions
} }
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
// "синтаксический сахар"..для логов // "синтаксический сахар"..для логов
#define dinfo if( UniSetExtensions::dlog.debugging(Debug::INFO) ) UniSetExtensions::dlog #define dinfo if( UniSetExtensions::dlog.debugging(Debug::INFO) ) UniSetExtensions::dlog[Debug::INFO]
#define dwarn if( UniSetExtensions::dlog.debugging(Debug::WARN) ) UniSetExtensions::dlog #define dwarn if( UniSetExtensions::dlog.debugging(Debug::WARN) ) UniSetExtensions::dlog[Debug::WARN]
#define dcrit if( UniSetExtensions::dlog.debugging(Debug::CRIT) ) UniSetExtensions::dlog #define dcrit if( UniSetExtensions::dlog.debugging(Debug::CRIT) ) UniSetExtensions::dlog[Debug::CRIT]
#define dlog1 if( UniSetExtensions::dlog.debugging(Debug::LEVEL1) ) UniSetExtensions::dlog #define dlog1 if( UniSetExtensions::dlog.debugging(Debug::LEVEL1) ) UniSetExtensions::dlog[Debug::LEVEL1]
#define dlog2 if( UniSetExtensions::dlog.debugging(Debug::LEVEL2) ) UniSetExtensions::dlog #define dlog2 if( UniSetExtensions::dlog.debugging(Debug::LEVEL2) ) UniSetExtensions::dlog[Debug::LEVEL2]
#define dlog3 if( UniSetExtensions::dlog.debugging(Debug::LEVEL3) ) UniSetExtensions::dlog #define dlog3 if( UniSetExtensions::dlog.debugging(Debug::LEVEL3) ) UniSetExtensions::dlog[Debug::LEVEL3]
#define dlog4 if( UniSetExtensions::dlog.debugging(Debug::LEVEL4) ) UniSetExtensions::dlog #define dlog4 if( UniSetExtensions::dlog.debugging(Debug::LEVEL4) ) UniSetExtensions::dlog[Debug::LEVEL4]
#define dlog5 if( UniSetExtensions::dlog.debugging(Debug::LEVEL5) ) UniSetExtensions::dlog #define dlog5 if( UniSetExtensions::dlog.debugging(Debug::LEVEL5) ) UniSetExtensions::dlog[Debug::LEVEL5]
#define dlog6 if( UniSetExtensions::dlog.debugging(Debug::LEVEL6) ) UniSetExtensions::dlog #define dlog6 if( UniSetExtensions::dlog.debugging(Debug::LEVEL6) ) UniSetExtensions::dlog[Debug::LEVEL6]
#define dlog7 if( UniSetExtensions::dlog.debugging(Debug::LEVEL7) ) UniSetExtensions::dlog #define dlog7 if( UniSetExtensions::dlog.debugging(Debug::LEVEL7) ) UniSetExtensions::dlog[Debug::LEVEL7]
#define dlog8 if( UniSetExtensions::dlog.debugging(Debug::LEVEL8) ) UniSetExtensions::dlog #define dlog8 if( UniSetExtensions::dlog.debugging(Debug::LEVEL8) ) UniSetExtensions::dlog[Debug::LEVEL8]
#define dlog9 if( UniSetExtensions::dlog.debugging(Debug::LEVEL9) ) UniSetExtensions::dlog #define dlog9 if( UniSetExtensions::dlog.debugging(Debug::LEVEL9) ) UniSetExtensions::dlog[Debug::LEVEL9]
#define dlogsys if( UniSetExtensions::dlog.debugging(Debug::SYSTEM) ) UniSetExtensions::dlog #define dlogsys if( UniSetExtensions::dlog.debugging(Debug::SYSTEM) ) UniSetExtensions::dlog[Debug::SYSTEM]
#define dlogrep if( UniSetExtensions::dlog.debugging(Debug::REPOSITORY) ) UniSetExtensions::dlog #define dlogrep if( UniSetExtensions::dlog.debugging(Debug::REPOSITORY) ) UniSetExtensions::dlog[Debug::REPOSITORY]
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
#endif // Extensions_H_ #endif // Extensions_H_
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
...@@ -249,19 +249,19 @@ namespace UniSetTypes ...@@ -249,19 +249,19 @@ namespace UniSetTypes
} // end of UniSetTypes namespace } // end of UniSetTypes namespace
// -------------------------------------------------------------------------- // --------------------------------------------------------------------------
// "синтаксический сахар"..для логов // "синтаксический сахар"..для логов
#define uinfo if( UniSetTypes::ulog.debugging(Debug::INFO) ) UniSetTypes::ulog #define uinfo if( UniSetTypes::ulog.debugging(Debug::INFO) ) UniSetTypes::ulog[Debug::INFO]
#define uwarn if( UniSetTypes::ulog.debugging(Debug::WARN) ) UniSetTypes::ulog #define uwarn if( UniSetTypes::ulog.debugging(Debug::WARN) ) UniSetTypes::ulog[Debug::WARN]
#define ucrit if( UniSetTypes::ulog.debugging(Debug::CRIT) ) UniSetTypes::ulog #define ucrit if( UniSetTypes::ulog.debugging(Debug::CRIT) ) UniSetTypes::ulog[Debug::CRIT]
#define ulog1 if( UniSetTypes::ulog.debugging(Debug::LEVEL1) ) UniSetTypes::ulog #define ulog1 if( UniSetTypes::ulog.debugging(Debug::LEVEL1) ) UniSetTypes::ulog[Debug::LEVEL1]
#define ulog2 if( UniSetTypes::ulog.debugging(Debug::LEVEL2) ) UniSetTypes::ulog #define ulog2 if( UniSetTypes::ulog.debugging(Debug::LEVEL2) ) UniSetTypes::ulog[Debug::LEVEL2]
#define ulog3 if( UniSetTypes::ulog.debugging(Debug::LEVEL3) ) UniSetTypes::ulog #define ulog3 if( UniSetTypes::ulog.debugging(Debug::LEVEL3) ) UniSetTypes::ulog[Debug::LEVEL3]
#define ulog4 if( UniSetTypes::ulog.debugging(Debug::LEVEL4) ) UniSetTypes::ulog #define ulog4 if( UniSetTypes::ulog.debugging(Debug::LEVEL4) ) UniSetTypes::ulog[Debug::LEVEL4]
#define ulog5 if( UniSetTypes::ulog.debugging(Debug::LEVEL5) ) UniSetTypes::ulog #define ulog5 if( UniSetTypes::ulog.debugging(Debug::LEVEL5) ) UniSetTypes::ulog[Debug::LEVEL5]
#define ulog6 if( UniSetTypes::ulog.debugging(Debug::LEVEL6) ) UniSetTypes::ulog #define ulog6 if( UniSetTypes::ulog.debugging(Debug::LEVEL6) ) UniSetTypes::ulog[Debug::LEVEL6]
#define ulog7 if( UniSetTypes::ulog.debugging(Debug::LEVEL7) ) UniSetTypes::ulog #define ulog7 if( UniSetTypes::ulog.debugging(Debug::LEVEL7) ) UniSetTypes::ulog[Debug::LEVEL7]
#define ulog8 if( UniSetTypes::ulog.debugging(Debug::LEVEL8) ) UniSetTypes::ulog #define ulog8 if( UniSetTypes::ulog.debugging(Debug::LEVEL8) ) UniSetTypes::ulog[Debug::LEVEL8]
#define ulog9 if( UniSetTypes::ulog.debugging(Debug::LEVEL9) ) UniSetTypes::ulog #define ulog9 if( UniSetTypes::ulog.debugging(Debug::LEVEL9) ) UniSetTypes::ulog[Debug::LEVEL9]
#define ulogsys if( UniSetTypes::ulog.debugging(Debug::SYSTEM) ) UniSetTypes::ulog #define ulogsys if( UniSetTypes::ulog.debugging(Debug::SYSTEM) ) UniSetTypes::ulog[Debug::SYSTEM]
#define ulogrep if( UniSetTypes::ulog.debugging(Debug::REPOSITORY) ) UniSetTypes::ulog #define ulogrep if( UniSetTypes::ulog.debugging(Debug::REPOSITORY) ) UniSetTypes::ulog[Debug::REPOSITORY]
// -------------------------------------------------------------------------- // --------------------------------------------------------------------------
#endif // Configuration_H_ #endif // Configuration_H_
...@@ -59,12 +59,10 @@ namespace UniSetTypes ...@@ -59,12 +59,10 @@ namespace UniSetTypes
std::ostream& operator<<(std::ostream& os, uniset_mutex& m ); std::ostream& operator<<(std::ostream& os, uniset_mutex& m );
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
/*! \class uniset_mutex_lock /*! \class uniset_mutex_lock
* \author Pavel Vainerman * \author Pavel Vainerman
* *
* Предназначен для блокирования совместного доступа. Как пользоваться см. \ref MutexHowToPage * Предназначен для блокирования совместного доступа. Как пользоваться см. \ref MutexHowToPage
* \note Если ресурс уже занят, то lock ждет его освобождения... * \note Если ресурс уже занят, то lock ждет его освобождения...
* \warning Насколько ожидание защищено от зависания надо еще проверять!
* \todo Может на откуп пользователям оставить проверку занятости ресурса перед захватом? может не ждать?
*/ */
class uniset_mutex_lock class uniset_mutex_lock
{ {
...@@ -76,7 +74,7 @@ namespace UniSetTypes ...@@ -76,7 +74,7 @@ namespace UniSetTypes
private: private:
uniset_mutex* mutex; uniset_mutex* mutex;
std::atomic<int> locked; std::atomic_bool locked;
uniset_mutex_lock(const uniset_mutex_lock&)=delete; uniset_mutex_lock(const uniset_mutex_lock&)=delete;
uniset_mutex_lock& operator=(const uniset_mutex_lock&)=delete; uniset_mutex_lock& operator=(const uniset_mutex_lock&)=delete;
......
...@@ -120,7 +120,8 @@ class UniSetManager: ...@@ -120,7 +120,8 @@ class UniSetManager:
UniSetManager(); UniSetManager();
enum OManagerCommand{deactiv, activ, initial, term}; enum OManagerCommand{ deactiv, activ, initial, term };
friend std::ostream& operator<<(std::ostream& os, OManagerCommand& cmd );
// работа со списком объектов // работа со списком объектов
void objects(OManagerCommand cmd); void objects(OManagerCommand cmd);
......
...@@ -186,7 +186,7 @@ class UniSetObject: ...@@ -186,7 +186,7 @@ class UniSetObject:
virtual void cleanMsgQueue( MessagesQueue& q ); virtual void cleanMsgQueue( MessagesQueue& q );
inline bool isActive(){ return active; } inline bool isActive(){ return active; }
inline void setActive( bool set ){ active = set ? 1 : 0; } inline void setActive( bool set ){ active = set; }
UniSetTypes::VoidMessage msg; UniSetTypes::VoidMessage msg;
UniSetManager* mymngr; UniSetManager* mymngr;
......
...@@ -95,6 +95,7 @@ class ModbusTCPServer: ...@@ -95,6 +95,7 @@ class ModbusTCPServer:
private: private:
std::atomic_bool cancelled;
}; };
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
#endif // ModbusTCPServer_H_ #endif // ModbusTCPServer_H_
......
...@@ -103,6 +103,8 @@ class ModbusTCPSession: ...@@ -103,6 +103,8 @@ class ModbusTCPSession:
FinalSlot slFin; FinalSlot slFin;
std::atomic_bool cancelled;
// статистика // статистика
UniSetTypes::uniset_rwmutex mAsk; UniSetTypes::uniset_rwmutex mAsk;
unsigned int askCount; unsigned int askCount;
......
...@@ -14,7 +14,8 @@ ModbusTCPServer::ModbusTCPServer( ost::InetAddress &ia, int port ): ...@@ -14,7 +14,8 @@ ModbusTCPServer::ModbusTCPServer( ost::InetAddress &ia, int port ):
ignoreAddr(false), ignoreAddr(false),
maxSessions(10), maxSessions(10),
sessCount(0), sessCount(0),
sessTimeout(10000) sessTimeout(10000),
cancelled(false)
{ {
setCRCNoCheckit(true); setCRCNoCheckit(true);
} }
...@@ -23,6 +24,16 @@ ModbusTCPServer::ModbusTCPServer( ost::InetAddress &ia, int port ): ...@@ -23,6 +24,16 @@ ModbusTCPServer::ModbusTCPServer( ost::InetAddress &ia, int port ):
ModbusTCPServer::~ModbusTCPServer() ModbusTCPServer::~ModbusTCPServer()
{ {
terminate(); terminate();
/*
{
uniset_mutex_lock l(sMutex);
for( auto& s: slist )
{
if( s->isRunning() )
s->ost::Thread::join();
}
}
*/
} }
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
void ModbusTCPServer::setMaxSessions( unsigned int num ) void ModbusTCPServer::setMaxSessions( unsigned int num )
...@@ -61,10 +72,16 @@ bool ModbusTCPServer::waitQuery( ModbusRTU::ModbusAddr mbaddr, timeout_t msec ) ...@@ -61,10 +72,16 @@ bool ModbusTCPServer::waitQuery( ModbusRTU::ModbusAddr mbaddr, timeout_t msec )
if( sessCount >= maxSessions ) if( sessCount >= maxSessions )
return false; return false;
if( cancelled )
return false;
try try
{ {
if( isPendingConnection(msec) ) if( isPendingConnection(msec) )
{ {
if( cancelled )
return false;
ModbusTCPSession* s = new ModbusTCPSession(*this,mbaddr,sessTimeout); ModbusTCPSession* s = new ModbusTCPSession(*this,mbaddr,sessTimeout);
s->connectReadCoil( sigc::mem_fun(this, &ModbusTCPServer::readCoilStatus) ); s->connectReadCoil( sigc::mem_fun(this, &ModbusTCPServer::readCoilStatus) );
...@@ -340,11 +357,23 @@ void ModbusTCPServer::cleanInputStream() ...@@ -340,11 +357,23 @@ void ModbusTCPServer::cleanInputStream()
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
void ModbusTCPServer::terminate() void ModbusTCPServer::terminate()
{ {
cancelled = true;
if( dlog.is_info() ) if( dlog.is_info() )
dlog.info() << "(ModbusTCPServer): terminate..." << endl; dlog.info() << "(ModbusTCPServer): terminate..." << endl;
if( tcp && tcp.isConnected() ) if( tcp && tcp.isConnected() )
tcp.disconnect(); tcp.disconnect();
uniset_mutex_lock l(sMutex);
for( auto &s: slist )
{
try
{
s->terminate();
}
catch(...){}
}
} }
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
void ModbusTCPServer::sessionFinished( ModbusTCPSession* s ) void ModbusTCPServer::sessionFinished( ModbusTCPSession* s )
......
...@@ -140,7 +140,7 @@ ModbusRTU::mbErrCode ModbusTCPServerSlot::remoteService( ModbusRTU::RemoteServic ...@@ -140,7 +140,7 @@ ModbusRTU::mbErrCode ModbusTCPServerSlot::remoteService( ModbusRTU::RemoteServic
return slRemoteService(query,reply); return slRemoteService(query,reply);
} }
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
ModbusRTU::mbErrCode ModbusTCPServerSlot::fileTransfer( ModbusRTU::FileTransferMessage& query, ModbusRTU::mbErrCode ModbusTCPServerSlot::fileTransfer( ModbusRTU::FileTransferMessage& query,
ModbusRTU::FileTransferRetMessage& reply ) ModbusRTU::FileTransferRetMessage& reply )
{ {
if( !slFileTransfer ) if( !slFileTransfer )
......
...@@ -14,6 +14,9 @@ using namespace UniSetTypes; ...@@ -14,6 +14,9 @@ using namespace UniSetTypes;
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
ModbusTCPSession::~ModbusTCPSession() ModbusTCPSession::~ModbusTCPSession()
{ {
cancelled = true;
if( isRunning() )
ost::Thread::join();
} }
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
ModbusTCPSession::ModbusTCPSession( ost::TCPSocket &server, ModbusRTU::ModbusAddr a, timeout_t timeout ): ModbusTCPSession::ModbusTCPSession( ost::TCPSocket &server, ModbusRTU::ModbusAddr a, timeout_t timeout ):
...@@ -22,6 +25,7 @@ addr(a), ...@@ -22,6 +25,7 @@ addr(a),
timeout(timeout), timeout(timeout),
peername(""), peername(""),
caddr(""), caddr(""),
cancelled(false),
askCount(0) askCount(0)
{ {
setCRCNoCheckit(true); setCRCNoCheckit(true);
...@@ -35,6 +39,9 @@ unsigned int ModbusTCPSession::getAskCount() ...@@ -35,6 +39,9 @@ unsigned int ModbusTCPSession::getAskCount()
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
void ModbusTCPSession::run() void ModbusTCPSession::run()
{ {
if( cancelled )
return;
{ {
ost::tpport_t p; ost::tpport_t p;
ost::InetAddress iaddr = getIPV4Peer(&p); ost::InetAddress iaddr = getIPV4Peer(&p);
...@@ -50,8 +57,12 @@ void ModbusTCPSession::run() ...@@ -50,8 +57,12 @@ void ModbusTCPSession::run()
// cerr << "**************** CREATE SESS FOR " << string( inet_ntoa(a) ) << endl; // cerr << "**************** CREATE SESS FOR " << string( inet_ntoa(a) ) << endl;
} }
if( dlog.debugging(Debug::INFO) )
dlog[Debug::INFO] << peername << "(run): run thread of sessions.." << endl;
ModbusRTU::mbErrCode res = erTimeOut; ModbusRTU::mbErrCode res = erTimeOut;
while( isPending(Socket::pendingInput, timeout) ) cancelled = false;
while( !cancelled && isPending(Socket::pendingInput, timeout) )
{ {
res = receive(addr,timeout); res = receive(addr,timeout);
...@@ -68,7 +79,13 @@ void ModbusTCPSession::run() ...@@ -68,7 +79,13 @@ void ModbusTCPSession::run()
} }
} }
if( dlog.debugging(Debug::INFO) )
dlog[Debug::INFO] << peername << "(run): stop thread of sessions..disconnect.." << endl;
disconnect(); disconnect();
if( dlog.debugging(Debug::INFO) )
dlog[Debug::INFO] << peername << "(run): thread stopping..." << endl;
} }
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
ModbusRTU::mbErrCode ModbusTCPSession::receive( ModbusRTU::ModbusAddr addr, timeout_t msec ) ModbusRTU::mbErrCode ModbusTCPSession::receive( ModbusRTU::ModbusAddr addr, timeout_t msec )
...@@ -106,9 +123,15 @@ ModbusRTU::mbErrCode ModbusTCPSession::receive( ModbusRTU::ModbusAddr addr, time ...@@ -106,9 +123,15 @@ ModbusRTU::mbErrCode ModbusTCPSession::receive( ModbusRTU::ModbusAddr addr, time
} }
} }
if( cancelled )
return erSessionClosed;
memset(&buf,0,sizeof(buf)); memset(&buf,0,sizeof(buf));
res = recv( addr, buf, msec ); res = recv( addr, buf, msec );
if( cancelled )
return erSessionClosed;
if( res!=erNoError ) // && res!=erBadReplyNodeAddress ) if( res!=erNoError ) // && res!=erBadReplyNodeAddress )
{ {
if( res < erInternalErrorCode ) if( res < erInternalErrorCode )
...@@ -134,6 +157,9 @@ ModbusRTU::mbErrCode ModbusTCPSession::receive( ModbusRTU::ModbusAddr addr, time ...@@ -134,6 +157,9 @@ ModbusRTU::mbErrCode ModbusTCPSession::receive( ModbusRTU::ModbusAddr addr, time
if( res!=erNoError ) if( res!=erNoError )
return res; return res;
if( cancelled )
return erSessionClosed;
// processing message... // processing message...
res = processing(buf); res = processing(buf);
} }
...@@ -246,11 +272,18 @@ void ModbusTCPSession::cleanInputStream() ...@@ -246,11 +272,18 @@ void ModbusTCPSession::cleanInputStream()
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
void ModbusTCPSession::terminate() void ModbusTCPSession::terminate()
{ {
ModbusServer::terminate();
if( dlog.debugging(Debug::INFO) ) if( dlog.debugging(Debug::INFO) )
dlog[Debug::INFO] << peername << "(ModbusTCPSession): terminate..." << endl; dlog[Debug::INFO] << peername << "(terminate)..." << endl;
cancelled = true;
if( isConnected() ) if( isConnected() )
disconnect(); disconnect();
// if( isRunning() )
// ost::Thread::join();
} }
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
mbErrCode ModbusTCPSession::readCoilStatus( ReadCoilMessage& query, mbErrCode ModbusTCPSession::readCoilStatus( ReadCoilMessage& query,
......
...@@ -69,7 +69,7 @@ static UniSetActivator* gActivator=0; ...@@ -69,7 +69,7 @@ static UniSetActivator* gActivator=0;
//static ThreadCreator<UniSetActivator>* termthread=0; //static ThreadCreator<UniSetActivator>* termthread=0;
static int SIGNO; static int SIGNO;
static int MYPID; static int MYPID;
static const int TERMINATE_TIMEOUT = 2; // время отведенное на завершение процесса [сек] static const int TERMINATE_TIMEOUT = 10; // время отведенное на завершение процесса [сек]
ost::AtomicCounter procterm = 0; ost::AtomicCounter procterm = 0;
ost::AtomicCounter doneterm = 0; ost::AtomicCounter doneterm = 0;
...@@ -118,6 +118,9 @@ sig(false) ...@@ -118,6 +118,9 @@ sig(false)
// ------------------------------------------------------------------------------------------ // ------------------------------------------------------------------------------------------
void UniSetActivator::init() void UniSetActivator::init()
{ {
if( getId() == DefaultObjectId )
myname = "UniSetActivator";
orb = conf->getORB(); orb = conf->getORB();
CORBA::Object_var obj = orb->resolve_initial_references("RootPOA"); CORBA::Object_var obj = orb->resolve_initial_references("RootPOA");
PortableServer::POA_var root_poa = PortableServer::POA::_narrow(obj); PortableServer::POA_var root_poa = PortableServer::POA::_narrow(obj);
...@@ -145,8 +148,8 @@ UniSetActivator::~UniSetActivator() ...@@ -145,8 +148,8 @@ UniSetActivator::~UniSetActivator()
procterm = 1; procterm = 1;
doneterm = 1; doneterm = 1;
set_signals(false); set_signals(false);
gActivator=0; gActivator=0;
} }
if( orbthr ) if( orbthr )
...@@ -283,7 +286,7 @@ void UniSetActivator::work() ...@@ -283,7 +286,7 @@ void UniSetActivator::work()
ulogsys << myname << "(work): запускаем orb на обработку запросов..."<< endl; ulogsys << myname << "(work): запускаем orb на обработку запросов..."<< endl;
try try
{ {
if(orbthr) if( orbthr )
thpid = orbthr->getTID(); thpid = orbthr->getTID();
else else
thpid = getpid(); thpid = getpid();
......
...@@ -54,7 +54,7 @@ class MPush: public unary_function<UniSetManager*, bool> ...@@ -54,7 +54,7 @@ class MPush: public unary_function<UniSetManager*, bool>
catch(...){} catch(...){}
return false; return false;
} }
private: private:
const UniSetTypes::TransportMessage& msg; const UniSetTypes::TransportMessage& msg;
}; };
...@@ -110,7 +110,7 @@ UniSetManager::~UniSetManager() ...@@ -110,7 +110,7 @@ UniSetManager::~UniSetManager()
{ {
try try
{ {
objects(deactiv); objects(deactiv);
} }
catch(...){} catch(...){}
try try
...@@ -160,7 +160,7 @@ bool UniSetManager::addObject( UniSetObject *obj ) ...@@ -160,7 +160,7 @@ bool UniSetManager::addObject( UniSetObject *obj )
} }
// ------------------------------------------------------------------------------------------ // ------------------------------------------------------------------------------------------
bool UniSetManager::removeObject(UniSetObject* obj) bool UniSetManager::removeObject( UniSetObject* obj )
{ {
{ //lock { //lock
uniset_rwmutex_wrlock lock(olistMutex); uniset_rwmutex_wrlock lock(olistMutex);
...@@ -201,13 +201,12 @@ bool UniSetManager::removeObject(UniSetObject* obj) ...@@ -201,13 +201,12 @@ bool UniSetManager::removeObject(UniSetObject* obj)
} }
// ------------------------------------------------------------------------------------------ // ------------------------------------------------------------------------------------------
/*! /*!
* Функция работы со списком менеджеров * Функция работы со списком менеджеров
*/ */
void UniSetManager::managers(OManagerCommand cmd) void UniSetManager::managers( OManagerCommand cmd )
{ {
uinfo << myname <<"(managers): mlist.size=" uinfo << myname <<"(managers): mlist.size=" << mlist.size() << " cmd=" << cmd << endl;
<< mlist.size() << " cmd=" << cmd << endl;
{ //lock { //lock
uniset_rwmutex_rlock lock(mlistMutex); uniset_rwmutex_rlock lock(mlistMutex);
for( auto &li: mlist ) for( auto &li: mlist )
...@@ -361,7 +360,7 @@ void UniSetManager::broadcast(const TransportMessage& msg) ...@@ -361,7 +360,7 @@ void UniSetManager::broadcast(const TransportMessage& msg)
{ {
// себя не забыть... // себя не забыть...
// push(msg); // push(msg);
// Всем объектам... // Всем объектам...
{ //lock { //lock
uniset_rwmutex_rlock lock(olistMutex); uniset_rwmutex_rlock lock(olistMutex);
...@@ -511,7 +510,7 @@ SimpleInfoSeq* UniSetManager::getObjectsInfo( CORBA::Long maxlength ) ...@@ -511,7 +510,7 @@ SimpleInfoSeq* UniSetManager::getObjectsInfo( CORBA::Long maxlength )
int length = objectsCount()+1; int length = objectsCount()+1;
if( length >= maxlength ) if( length >= maxlength )
length = maxlength; length = maxlength;
res->length(length); res->length(length);
// используем рекурсивную функцию // используем рекурсивную функцию
...@@ -522,3 +521,18 @@ SimpleInfoSeq* UniSetManager::getObjectsInfo( CORBA::Long maxlength ) ...@@ -522,3 +521,18 @@ SimpleInfoSeq* UniSetManager::getObjectsInfo( CORBA::Long maxlength )
} }
// ------------------------------------------------------------------------------------------ // ------------------------------------------------------------------------------------------
std::ostream& operator<<(std::ostream& os, UniSetManager::OManagerCommand& cmd )
{
// { deactiv, activ, initial, term };
if( cmd == UniSetManager::deactiv )
return os << "deactivate";
if( cmd == UniSetManager::activ )
return os << "activate";
if( cmd == UniSetManager::initial )
return os << "init";
if( cmd == UniSetManager::term )
return os << "terminate";
return os << "unkwnown";
}
// ------------------------------------------------------------------------------------------
\ No newline at end of file
...@@ -93,8 +93,8 @@ stCountOfQueueFull(0) ...@@ -93,8 +93,8 @@ stCountOfQueueFull(0)
{ {
threadcreate = false; threadcreate = false;
myid = UniSetTypes::DefaultObjectId; myid = UniSetTypes::DefaultObjectId;
myname = "noname"; myname = "UnknownUniSetObject";
section = "nonameSection"; section = "UnknownSection";
} }
init_object(); init_object();
...@@ -135,12 +135,20 @@ stCountOfQueueFull(0) ...@@ -135,12 +135,20 @@ stCountOfQueueFull(0)
UniSetObject::~UniSetObject() UniSetObject::~UniSetObject()
{ {
disactivate(); disactivate();
delete tmr;
if(thr) tmr->terminate();
if( thr )
{ {
thr->stop(); thr->stop();
if( thr->isRunning() )
thr->join();
delete thr; delete thr;
} }
delete tmr;
} }
// ------------------------------------------------------------------------------------------ // ------------------------------------------------------------------------------------------
void UniSetObject::init_object() void UniSetObject::init_object()
...@@ -624,7 +632,7 @@ bool UniSetObject::disactivate() ...@@ -624,7 +632,7 @@ bool UniSetObject::disactivate()
try try
{ {
uinfo << "disactivateObject..." << endl; uinfo << myname << "(disactivate): ..." << endl;
PortableServer::POA_var poamngr = mymngr->getPOA(); PortableServer::POA_var poamngr = mymngr->getPOA();
if( !PortableServer::POA_Helper::is_nil(poamngr) ) if( !PortableServer::POA_Helper::is_nil(poamngr) )
...@@ -634,33 +642,34 @@ bool UniSetObject::disactivate() ...@@ -634,33 +642,34 @@ bool UniSetObject::disactivate()
disactivateObject(); disactivateObject();
} }
catch(...){} catch(...){}
unregister(); unregister();
PortableServer::ObjectId_var oid = poamngr->servant_to_id(static_cast<PortableServer::ServantBase*>(this)); PortableServer::ObjectId_var oid = poamngr->servant_to_id(static_cast<PortableServer::ServantBase*>(this));
poamngr->deactivate_object(oid); poamngr->deactivate_object(oid);
uinfo << "ok..." << endl; uinfo << myname << "(disacivate): finished..." << endl;
return true; return true;
} }
uwarn << "manager already destroyed.." << endl; uwarn << myname << "(disactivate): manager already destroyed.." << endl;
} }
catch(CORBA::TRANSIENT) catch(CORBA::TRANSIENT)
{ {
uwarn << "isExist: нет связи..."<< endl; uwarn << myname << "(disactivate): isExist: нет связи..."<< endl;
} }
catch( CORBA::SystemException& ex ) catch( CORBA::SystemException& ex )
{ {
uwarn << "UniSetObject: "<<"поймали CORBA::SystemException: " << ex.NP_minorString() << endl; uwarn << myname << "(disactivate): " << "поймали CORBA::SystemException: " << ex.NP_minorString() << endl;
} }
catch(CORBA::Exception& ex) catch(CORBA::Exception& ex)
{ {
uwarn << "UniSetObject: "<<"поймали CORBA::Exception." << endl; uwarn << myname << "(disactivate): " << "поймали CORBA::Exception." << endl;
} }
catch(Exception& ex) catch(Exception& ex)
{ {
uwarn << "UniSetObject: "<< ex << endl; uwarn << myname << "(disactivate): " << ex << endl;
} }
catch(...) catch(...)
{ {
uwarn << "UniSetObject: "<<" catch ..." << endl; uwarn << myname << "(disactivate): " << " catch ..." << endl;
} }
return false; return false;
...@@ -742,15 +751,15 @@ bool UniSetObject::activate() ...@@ -742,15 +751,15 @@ bool UniSetObject::activate()
// ------------------------------------------------------------------------------------------ // ------------------------------------------------------------------------------------------
void UniSetObject::work() void UniSetObject::work()
{ {
uinfo << myname << ": thread processing messages run..." << endl; uinfo << myname << ": thread processing messages running..." << endl;
if( thr ) if( thr )
msgpid = thr->getTID(); msgpid = thr->getTID();
while( isActive() ) while( isActive() )
{
callback(); callback();
}
uinfo << myname << ": thread processing messages stop..." << endl; uinfo << myname << ": thread processing messages stopped..." << endl;
} }
// ------------------------------------------------------------------------------------------ // ------------------------------------------------------------------------------------------
void UniSetObject::callback() void UniSetObject::callback()
......
...@@ -73,7 +73,7 @@ bool uniset_mutex::try_lock_for( const time_t& msec ) ...@@ -73,7 +73,7 @@ bool uniset_mutex::try_lock_for( const time_t& msec )
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
uniset_mutex_lock::uniset_mutex_lock( uniset_mutex& m, const time_t timeMS ): uniset_mutex_lock::uniset_mutex_lock( uniset_mutex& m, const time_t timeMS ):
mutex(&m), mutex(&m),
locked(0) locked(false)
{ {
if( timeMS == 0 ) if( timeMS == 0 )
...@@ -93,18 +93,21 @@ uniset_mutex_lock::uniset_mutex_lock( uniset_mutex& m, const time_t timeMS ): ...@@ -93,18 +93,21 @@ uniset_mutex_lock::uniset_mutex_lock( uniset_mutex& m, const time_t timeMS ):
return; return;
} }
locked = 1; locked = true;
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
bool uniset_mutex_lock::lock_ok() bool uniset_mutex_lock::lock_ok()
{ {
return (locked == 1); return locked;
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
uniset_mutex_lock::~uniset_mutex_lock() uniset_mutex_lock::~uniset_mutex_lock()
{ {
mutex->unlock(); if( locked )
locked = 0; {
mutex->unlock();
locked = false;
}
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
uniset_rwmutex::uniset_rwmutex( const std::string& name ): uniset_rwmutex::uniset_rwmutex( const std::string& name ):
......
...@@ -140,6 +140,25 @@ int main( int argc, const char **argv ) ...@@ -140,6 +140,25 @@ int main( int argc, const char **argv )
try try
{ {
{ {
cout << "check timed_mutex..." << endl;
std::timed_mutex m;
cout << " 'unlock' without 'lock'..";
m.unlock();
cout << " ok." << endl;
cout << "try lock (lock): " << ( m.try_lock() ? "OK" : "FAIL" ) << endl;
m.unlock();
m.lock();
cout << "try lock (fail): " << ( m.try_lock() ? "FAIL" : "OK" ) << endl;
m.unlock();
}
{
uniset_mutex m("testmutex"); uniset_mutex m("testmutex");
{ {
uniset_mutex_lock l(m); uniset_mutex_lock l(m);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment