You need to sign in or sign up before continuing.
Commit ef6c70aa authored by Pavel Vainerman's avatar Pavel Vainerman

(MBTCPMaster): Реализация с отдельным потоком обмена.

parent f47ea61b
...@@ -14,6 +14,7 @@ ...@@ -14,6 +14,7 @@
#include "Calibration.h" #include "Calibration.h"
#include "SMInterface.h" #include "SMInterface.h"
#include "SharedMemory.h" #include "SharedMemory.h"
#include "ThreadCreator.h"
#include "IOBase.h" #include "IOBase.h"
#include "VTypes.h" #include "VTypes.h"
#include "MTR.h" #include "MTR.h"
...@@ -68,10 +69,11 @@ ...@@ -68,10 +69,11 @@
Порт задаётся в конфигурационном файле параметром \b gateway_port или Порт задаётся в конфигурационном файле параметром \b gateway_port или
параметром командной строки \b --xxx-gateway-port. По умолчанию используется порт \b 502. параметром командной строки \b --xxx-gateway-port. По умолчанию используется порт \b 502.
\b --xxx-recv-timeout или \b recv_timeout msec - таймаут на приём сообщений. По умолчанию 2000 мсек. \b --xxx-recv-timeout или \b recv_timeout msec - таймаут на приём одного сообщения. По умолчанию 100 мсек.
\b --xxx-all-timeout или \b all_timeout msec - таймаут на определение отсутсвия связи \b --xxx-timeout или \b timeout msec - таймаут на определение отсутсвия связи
(после этого идёт попытка реинициализировать соединение) (после этого идёт попытка реинициализировать соединение)
По умолчанию 5000 мсек.
\b --xxx-no-query-optimization или \b no_query_optimization - [1|0] отключить оптимизацию запросов \b --xxx-no-query-optimization или \b no_query_optimization - [1|0] отключить оптимизацию запросов
...@@ -158,14 +160,18 @@ ...@@ -158,14 +160,18 @@
При этом будет записывыться значение "default". При этом будет записывыться значение "default".
\warning Регистр должен быть уникальным. И может повторятся только если указан параметр \a nbit или \a nbyte. \warning Регистр должен быть уникальным. И может повторятся только если указан параметр \a nbit или \a nbyte.
*/ */
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
/*! /*!
Реализация Modbus TCP Master для обмена с многими ModbusRTU устройствами \par Реализация Modbus TCP Master для обмена с многими ModbusRTU устройствами
через один modbus tcp шлюз. через один modbus tcp шлюз.
\par Чтобы не зависеть от таймаутов TCP соединений, которые могут неопределённо зависать
на создании соединения с недоступным хостом. Обмен вынесен в отдельный поток.
При этом в этом же потоке обновляются данные в SM. В свою очередь информация о датчиках
связи обновляется в основном потоке (чтобы не зависеть от TCP).
*/ */
class MBTCPMaster: class MBTCPMaster:
public UniSetObject_LT public UniSetObject_LT
...@@ -353,6 +359,7 @@ class MBTCPMaster: ...@@ -353,6 +359,7 @@ class MBTCPMaster:
SMInterface* shm; SMInterface* shm;
void step(); void step();
void poll_thread();
void poll(); void poll();
bool pollRTU( RTUDevice* dev, RegMap::iterator& it ); bool pollRTU( RTUDevice* dev, RegMap::iterator& it );
...@@ -360,6 +367,7 @@ class MBTCPMaster: ...@@ -360,6 +367,7 @@ class MBTCPMaster:
void updateRTU(RegMap::iterator& it); void updateRTU(RegMap::iterator& it);
void updateMTR(RegMap::iterator& it); void updateMTR(RegMap::iterator& it);
void updateRSProperty( RSProperty* p, bool write_only=false ); void updateRSProperty( RSProperty* p, bool write_only=false );
void updateRespondSensors();
virtual void processingMessage( UniSetTypes::VoidMessage *msg ); virtual void processingMessage( UniSetTypes::VoidMessage *msg );
void sysCommand( UniSetTypes::SystemMessage *msg ); void sysCommand( UniSetTypes::SystemMessage *msg );
...@@ -398,6 +406,8 @@ class MBTCPMaster: ...@@ -398,6 +406,8 @@ class MBTCPMaster:
void readConfiguration(); void readConfiguration();
bool check_item( UniXML_iterator& it ); bool check_item( UniXML_iterator& it );
bool checkProcActive();
void setProcActive( bool st );
private: private:
MBTCPMaster(); MBTCPMaster();
...@@ -415,17 +425,13 @@ class MBTCPMaster: ...@@ -415,17 +425,13 @@ class MBTCPMaster:
IOController::AIOStateList::iterator aitHeartBeat; IOController::AIOStateList::iterator aitHeartBeat;
UniSetTypes::ObjectId test_id; UniSetTypes::ObjectId test_id;
UniSetTypes::uniset_mutex pollMutex; UniSetTypes::uniset_mutex actMutex;
bool activated; bool activated;
int activateTimeout; int activateTimeout;
bool noQueryOptimization; bool noQueryOptimization;
bool force_disconnect; bool force_disconnect;
bool allNotRespond;
Trigger trAllNotRespond;
PassiveTimer ptAllNotRespond;
std::string prefix; std::string prefix;
bool no_extimer; bool no_extimer;
...@@ -433,6 +439,16 @@ class MBTCPMaster: ...@@ -433,6 +439,16 @@ class MBTCPMaster:
timeout_t stat_time; /*!< время сбора статистики обмена */ timeout_t stat_time; /*!< время сбора статистики обмена */
int poll_count; int poll_count;
PassiveTimer ptStatistic; /*!< таймер для сбора статистики обмена */ PassiveTimer ptStatistic; /*!< таймер для сбора статистики обмена */
// т.к. TCP может "зависнуть" на подключении к недоступному узлу
// делаем опрос в отдельном потоке
ThreadCreator<MBTCPMaster>* pollThread; /*!< поток опроса */
bool pollActivated;
UniSetTypes::uniset_mutex pollMutex;
// определение timeout для соединения
PassiveTimer ptTimeout;
UniSetTypes::uniset_mutex tcpMutex;
}; };
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
#endif // _MBTCPMaster_H_ #endif // _MBTCPMaster_H_
......
/* $Id: SMInterface.h,v 1.1 2008/12/14 21:57:50 vpashka Exp $ */
//--------------------------------------------------------------------------
#ifndef SMInterface_H_
#define SMInterface_H_
//--------------------------------------------------------------------------
#include <string>
#include "UniSetTypes.h"
#include "IONotifyController.h"
#include "UniversalInterface.h"
class SMInterface
{
public:
SMInterface( UniSetTypes::ObjectId _shmID, UniversalInterface* ui,
UniSetTypes::ObjectId myid, IONotifyController* ic=0 );
~SMInterface();
void setState ( UniSetTypes::ObjectId, bool state );
void setValue ( UniSetTypes::ObjectId, long value );
bool saveState ( IOController_i::SensorInfo& si, bool state, UniversalIO::IOTypes type, UniSetTypes::ObjectId supplier );
bool saveValue ( IOController_i::SensorInfo& si, long value, UniversalIO::IOTypes type, UniSetTypes::ObjectId supplier );
bool saveLocalState ( UniSetTypes::ObjectId id, bool state, UniversalIO::IOTypes type=UniversalIO::DigitalInput );
bool saveLocalValue ( UniSetTypes::ObjectId id, long value, UniversalIO::IOTypes type=UniversalIO::AnalogInput );
void setUndefinedState( IOController_i::SensorInfo& si, bool undefined, UniSetTypes::ObjectId supplier );
long getValue ( UniSetTypes::ObjectId id );
bool getState ( UniSetTypes::ObjectId id );
void askSensor( UniSetTypes::ObjectId id, UniversalIO::UIOCommand cmd,
UniSetTypes::ObjectId backid = UniSetTypes::DefaultObjectId );
inline bool alarm( UniSetTypes::AlarmMessage& msg, UniSetTypes::ObjectId messenger )
{
return ui->alarm(msg,messenger);
}
IOController_i::DSensorInfoSeq* getDigitalSensorsMap();
IOController_i::ASensorInfoSeq* getAnalogSensorsMap();
IONotifyController_i::ThresholdsListSeq* getThresholdsList();
void localSaveValue( IOController::AIOStateList::iterator& it,
UniSetTypes::ObjectId sid,
CORBA::Long newvalue, UniSetTypes::ObjectId sup_id );
void localSaveState( IOController::DIOStateList::iterator& it,
UniSetTypes::ObjectId sid,
CORBA::Boolean newstate, UniSetTypes::ObjectId sup_id );
void localSetState( IOController::DIOStateList::iterator& it,
UniSetTypes::ObjectId sid,
CORBA::Boolean newstate, UniSetTypes::ObjectId sup_id );
void localSetValue( IOController::AIOStateList::iterator& it,
UniSetTypes::ObjectId sid,
CORBA::Long value, UniSetTypes::ObjectId sup_id );
bool localGetState( IOController::DIOStateList::iterator& it,
UniSetTypes::ObjectId sid );
long localGetValue( IOController::AIOStateList::iterator& it,
UniSetTypes::ObjectId sid );
/*! функция выставления признака неопределённого состояния для аналоговых датчиков
// для дискретных датчиков необходимости для подобной функции нет.
// см. логику выставления в функции localSaveState
*/
void localSetUndefinedState( IOController::AIOStateList::iterator& it,
bool undefined, UniSetTypes::ObjectId sid );
// специальные функции
IOController::DIOStateList::iterator dioEnd();
IOController::AIOStateList::iterator aioEnd();
void initAIterator( IOController::AIOStateList::iterator& it );
void initDIterator( IOController::DIOStateList::iterator& it );
bool exist();
bool waitSMready( int msec, int pause=5000 );
bool waitSMworking( UniSetTypes::ObjectId, int msec, int pause=3000 );
inline bool isLocalwork(){ return (ic==NULL); }
inline UniSetTypes::ObjectId ID(){ return myid; }
inline IONotifyController* SM(){ return ic; }
inline UniSetTypes::ObjectId getSMID(){ return shmID; }
protected:
IONotifyController* ic;
UniversalInterface* ui;
CORBA::Object_var oref;
UniSetTypes::ObjectId shmID;
UniSetTypes::ObjectId myid;
};
//--------------------------------------------------------------------------
#endif
...@@ -10,6 +10,7 @@ using namespace UniSetTypes; ...@@ -10,6 +10,7 @@ using namespace UniSetTypes;
#define BEG_FUNC(name) \ #define BEG_FUNC(name) \
try \ try \
{ \ { \
uniset_mutex_lock l(shmMutex,500); \
IONotifyController_i_var shm;\ IONotifyController_i_var shm;\
for( unsigned int i=0; i<conf->getRepeatCount(); i++)\ for( unsigned int i=0; i<conf->getRepeatCount(); i++)\
{\ {\
...@@ -32,6 +33,7 @@ using namespace UniSetTypes; ...@@ -32,6 +33,7 @@ using namespace UniSetTypes;
#define BEG_FUNC1(name) \ #define BEG_FUNC1(name) \
try \ try \
{ \ { \
uniset_mutex_lock l(shmMutex,500); \
if( true ) \ if( true ) \
{ \ { \
try \ try \
......
...@@ -45,34 +45,36 @@ mbErrCode ModbusTCPMaster::sendData( unsigned char* buf, int len ) ...@@ -45,34 +45,36 @@ mbErrCode ModbusTCPMaster::sendData( unsigned char* buf, int len )
mbErrCode ModbusTCPMaster::query( ModbusAddr addr, ModbusMessage& msg, mbErrCode ModbusTCPMaster::query( ModbusAddr addr, ModbusMessage& msg,
ModbusMessage& reply, timeout_t timeout ) ModbusMessage& reply, timeout_t timeout )
{ {
if( iaddr.empty() ) try
{ {
dlog[Debug::WARN] << "(query): unknown ip address for server..." << endl; if( iaddr.empty() )
return erHardwareError; {
} dlog[Debug::WARN] << "(query): unknown ip address for server..." << endl;
return erHardwareError;
}
if( !isConnection() ) if( !isConnection() )
{ {
if( dlog.debugging(Debug::INFO) ) if( dlog.debugging(Debug::INFO) )
dlog[Debug::INFO] << "(ModbusTCPMaster): no connection.. reconnnect..." << endl; dlog[Debug::INFO] << "(ModbusTCPMaster): no connection.. reconnnect..." << endl;
reconnect(); reconnect();
} }
if( !isConnection() ) if( !isConnection() )
{ {
dlog[Debug::WARN] << "(query): not connected to server..." << endl; dlog[Debug::WARN] << "(query): not connected to server..." << endl;
return erTimeOut; return erTimeOut;
} }
assert(timeout); assert(timeout);
ptTimeout.setTiming(timeout); ptTimeout.setTiming(timeout);
tcp->setTimeout(timeout); tcp->setTimeout(timeout);
// ost::Thread::setException(ost::Thread::throwException);
// ost::tpport_t port;
// cerr << "****** peer: " << tcp->getPeer(&port) << " err: " << tcp->getErrorNumber() << endl;
ost::Thread::setException(ost::Thread::throwException);
try
{
if( nTransaction >= numeric_limits<ModbusRTU::ModbusData>::max() ) if( nTransaction >= numeric_limits<ModbusRTU::ModbusData>::max() )
nTransaction = 0; nTransaction = 0;
...@@ -136,6 +138,7 @@ mbErrCode ModbusTCPMaster::query( ModbusAddr addr, ModbusMessage& msg, ...@@ -136,6 +138,7 @@ mbErrCode ModbusTCPMaster::query( ModbusAddr addr, ModbusMessage& msg,
while( !qrecv.empty() ) while( !qrecv.empty() )
qrecv.pop(); qrecv.pop();
tcp->sync();
if( tcp->isPending(ost::Socket::pendingInput,timeout) ) if( tcp->isPending(ost::Socket::pendingInput,timeout) )
{ {
/* /*
...@@ -158,9 +161,14 @@ mbErrCode ModbusTCPMaster::query( ModbusAddr addr, ModbusMessage& msg, ...@@ -158,9 +161,14 @@ mbErrCode ModbusTCPMaster::query( ModbusAddr addr, ModbusMessage& msg,
if( ret < (int)sizeof(rmh) ) if( ret < (int)sizeof(rmh) )
{ {
ost::tpport_t port;
if( dlog.debugging(Debug::INFO) ) if( dlog.debugging(Debug::INFO) )
dlog[Debug::INFO] << "(ModbusTCPMaster::query): ret=" << (int)ret dlog[Debug::INFO] << "(ModbusTCPMaster::query): ret=" << (int)ret
<< " < rmh=" << (int)sizeof(rmh) << endl; << " < rmh=" << (int)sizeof(rmh)
<< " err: " << tcp->getErrorNumber()
<< " perr: " << tcp->getPeer(&port)
<< endl;
disconnect(); disconnect();
return erTimeOut; // return erHardwareError; return erTimeOut; // return erHardwareError;
} }
...@@ -211,17 +219,18 @@ mbErrCode ModbusTCPMaster::query( ModbusAddr addr, ModbusMessage& msg, ...@@ -211,17 +219,18 @@ mbErrCode ModbusTCPMaster::query( ModbusAddr addr, ModbusMessage& msg,
{ {
dlog[Debug::WARN] << "(query): " << ex << endl; dlog[Debug::WARN] << "(query): " << ex << endl;
} }
catch(SystemError& err) catch( SystemError& err )
{ {
dlog[Debug::WARN] << "(query): " << err << endl; dlog[Debug::WARN] << "(query): " << err << endl;
} }
catch(Exception& ex) catch( Exception& ex )
{ {
dlog[Debug::WARN] << "(query): " << ex << endl; dlog[Debug::WARN] << "(query): " << ex << endl;
} }
catch( ost::SockException& e ) catch( ost::SockException& e )
{ {
dlog[Debug::WARN] << e.getString() << ": " << e.getSystemErrorString() << endl; dlog[Debug::WARN] << "(query): tcp error: " << e.getString() << endl;
return erTimeOut;
} }
catch(...) catch(...)
{ {
...@@ -252,19 +261,42 @@ void ModbusTCPMaster::reconnect() ...@@ -252,19 +261,42 @@ void ModbusTCPMaster::reconnect()
// cerr << "tcp diconnect..." << endl; // cerr << "tcp diconnect..." << endl;
tcp->disconnect(); tcp->disconnect();
delete tcp; delete tcp;
tcp = 0;
} }
ost::Thread::setException(ost::Thread::throwException); ost::Thread::setException(ost::Thread::throwException);
// cerr << "create new tcp..." << endl; try
tcp = new ost::TCPStream(iaddr.c_str()); {
tcp->setTimeout(500); // TCPStream (const char *name, Family family=IPV4, unsigned mss=536, bool throwflag=false, timeout_t timer=0)
tcp = new ost::TCPStream(iaddr.c_str(),ost::Socket::IPV4,536,true,500);
tcp->setTimeout(replyTimeOut_ms);
}
catch(ost::Socket *socket)
{
ost::tpport_t port;
int err = socket->getErrorNumber();
ost::InetAddress saddr = (ost::InetAddress)socket->getPeer(&port);
dlog[Debug::CRIT] << "tcp error " << saddr.getHostname() << ":" << port << " = " << err << endl;
}
catch( ost::SockException& e)
{
dlog[Debug::CRIT] << "tcp error: " << e.getString() << endl;
}
catch(...)
{
dlog[Debug::CRIT] << "create TCPStream[" << iaddr << "] error..." << endl;
}
} }
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
void ModbusTCPMaster::connect( ost::InetAddress addr, int port ) void ModbusTCPMaster::connect( ost::InetAddress addr, int port )
{ {
if( tcp ) if( tcp )
{
disconnect(); disconnect();
delete tcp;
tcp = 0;
}
// if( !tcp ) // if( !tcp )
// { // {
...@@ -275,9 +307,26 @@ void ModbusTCPMaster::connect( ost::InetAddress addr, int port ) ...@@ -275,9 +307,26 @@ void ModbusTCPMaster::connect( ost::InetAddress addr, int port )
dlog[Debug::INFO] << "(ModbusTCPMaster): connect to " << s.str() << endl; dlog[Debug::INFO] << "(ModbusTCPMaster): connect to " << s.str() << endl;
iaddr = s.str(); iaddr = s.str();
tcp = new ost::TCPStream(iaddr.c_str()); try
tcp->setTimeout(500); {
tcp = new ost::TCPStream(iaddr.c_str());
tcp->setTimeout(replyTimeOut_ms);
}
catch(ost::Socket *socket)
{
ost::tpport_t port;
int err = socket->getErrorNumber();
ost::InetAddress saddr = (ost::InetAddress)socket->getPeer(&port);
dlog[Debug::CRIT] << ": tcp error " << saddr.getHostname() << ":" << port << " = " << err << endl;
}
catch( ost::SockException& e)
{
dlog[Debug::CRIT] << "tcp error: " << e.getString() << endl;
}
catch(...)
{
dlog[Debug::CRIT] << "create TCPStream[" << iaddr << "] error..." << endl;
}
// } // }
} }
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
......
...@@ -10,7 +10,7 @@ PassiveTimer pt(1000); ...@@ -10,7 +10,7 @@ PassiveTimer pt(1000);
int main() int main()
{ {
PassiveTimer pt1(5000); PassiveTimer pt1(5000);
cout << " pt1.getInterval()=" << pt1.getInterval() << endl; cout << " pt1.getInterval()=" << pt1.getInterval() << " TEST: " << ((pt1.getInterval()==5000) ? "OK" : "FAILED") << endl;
PassiveTimer pt2; PassiveTimer pt2;
cout << " pt2.getInterval()=" << pt2.getInterval() << endl; cout << " pt2.getInterval()=" << pt2.getInterval() << endl;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment