Commit fed1214b authored by Pavel Vainerman's avatar Pavel Vainerman

(MBTCPMaster): Реализация с отдельным потоком обмена.

parent e8c30a17
......@@ -14,6 +14,7 @@
#include "Calibration.h"
#include "SMInterface.h"
#include "SharedMemory.h"
#include "ThreadCreator.h"
#include "IOBase.h"
#include "VTypes.h"
#include "MTR.h"
......@@ -68,10 +69,11 @@
Порт задаётся в конфигурационном файле параметром \b gateway_port или
параметром командной строки \b --xxx-gateway-port. По умолчанию используется порт \b 502.
\b --xxx-recv-timeout или \b recv_timeout msec - таймаут на приём сообщений. По умолчанию 2000 мсек.
\b --xxx-recv-timeout или \b recv_timeout msec - таймаут на приём одного сообщения. По умолчанию 100 мсек.
\b --xxx-all-timeout или \b all_timeout msec - таймаут на определение отсутсвия связи
\b --xxx-timeout или \b timeout msec - таймаут на определение отсутсвия связи
(после этого идёт попытка реинициализировать соединение)
По умолчанию 5000 мсек.
\b --xxx-no-query-optimization или \b no_query_optimization - [1|0] отключить оптимизацию запросов
......@@ -158,14 +160,18 @@
При этом будет записывыться значение "default".
\warning Регистр должен быть уникальным. И может повторятся только если указан параметр \a nbit или \a nbyte.
*/
// -----------------------------------------------------------------------------
/*!
Реализация Modbus TCP Master для обмена с многими ModbusRTU устройствами
\par Реализация Modbus TCP Master для обмена с многими ModbusRTU устройствами
через один modbus tcp шлюз.
\par Чтобы не зависеть от таймаутов TCP соединений, которые могут неопределённо зависать
на создании соединения с недоступным хостом. Обмен вынесен в отдельный поток.
При этом в этом же потоке обновляются данные в SM. В свою очередь информация о датчиках
связи обновляется в основном потоке (чтобы не зависеть от TCP).
*/
class MBTCPMaster:
public UniSetObject_LT
......@@ -353,6 +359,7 @@ class MBTCPMaster:
SMInterface* shm;
void step();
void poll_thread();
void poll();
bool pollRTU( RTUDevice* dev, RegMap::iterator& it );
......@@ -360,6 +367,7 @@ class MBTCPMaster:
void updateRTU(RegMap::iterator& it);
void updateMTR(RegMap::iterator& it);
void updateRSProperty( RSProperty* p, bool write_only=false );
void updateRespondSensors();
virtual void processingMessage( UniSetTypes::VoidMessage *msg );
void sysCommand( UniSetTypes::SystemMessage *msg );
......@@ -398,6 +406,8 @@ class MBTCPMaster:
void readConfiguration();
bool check_item( UniXML_iterator& it );
bool checkProcActive();
void setProcActive( bool st );
private:
MBTCPMaster();
......@@ -415,17 +425,13 @@ class MBTCPMaster:
IOController::AIOStateList::iterator aitHeartBeat;
UniSetTypes::ObjectId test_id;
UniSetTypes::uniset_mutex pollMutex;
UniSetTypes::uniset_mutex actMutex;
bool activated;
int activateTimeout;
bool noQueryOptimization;
bool force_disconnect;
bool allNotRespond;
Trigger trAllNotRespond;
PassiveTimer ptAllNotRespond;
std::string prefix;
bool no_extimer;
......@@ -433,6 +439,16 @@ class MBTCPMaster:
timeout_t stat_time; /*!< время сбора статистики обмена */
int poll_count;
PassiveTimer ptStatistic; /*!< таймер для сбора статистики обмена */
// т.к. TCP может "зависнуть" на подключении к недоступному узлу
// делаем опрос в отдельном потоке
ThreadCreator<MBTCPMaster>* pollThread; /*!< поток опроса */
bool pollActivated;
UniSetTypes::uniset_mutex pollMutex;
// определение timeout для соединения
PassiveTimer ptTimeout;
UniSetTypes::uniset_mutex tcpMutex;
};
// -----------------------------------------------------------------------------
#endif // _MBTCPMaster_H_
......
/* $Id: SMInterface.h,v 1.1 2008/12/14 21:57:50 vpashka Exp $ */
//--------------------------------------------------------------------------
#ifndef SMInterface_H_
#define SMInterface_H_
//--------------------------------------------------------------------------
#include <string>
#include "UniSetTypes.h"
#include "Mutex.h"
#include "IONotifyController.h"
#include "UniversalInterface.h"
class SMInterface
......@@ -89,6 +88,7 @@ class SMInterface
CORBA::Object_var oref;
UniSetTypes::ObjectId shmID;
UniSetTypes::ObjectId myid;
UniSetTypes::uniset_mutex shmMutex;
};
//--------------------------------------------------------------------------
......
......@@ -10,6 +10,7 @@ using namespace UniSetTypes;
#define BEG_FUNC(name) \
try \
{ \
uniset_mutex_lock l(shmMutex,500); \
IONotifyController_i_var shm;\
for( unsigned int i=0; i<conf->getRepeatCount(); i++)\
{\
......@@ -32,6 +33,7 @@ using namespace UniSetTypes;
#define BEG_FUNC1(name) \
try \
{ \
uniset_mutex_lock l(shmMutex,500); \
if( true ) \
{ \
try \
......
......@@ -45,6 +45,8 @@ mbErrCode ModbusTCPMaster::sendData( unsigned char* buf, int len )
mbErrCode ModbusTCPMaster::query( ModbusAddr addr, ModbusMessage& msg,
ModbusMessage& reply, timeout_t timeout )
{
try
{
if( iaddr.empty() )
{
dlog[Debug::WARN] << "(query): unknown ip address for server..." << endl;
......@@ -69,10 +71,10 @@ mbErrCode ModbusTCPMaster::query( ModbusAddr addr, ModbusMessage& msg,
tcp->setTimeout(timeout);
ost::Thread::setException(ost::Thread::throwException);
// ost::Thread::setException(ost::Thread::throwException);
// ost::tpport_t port;
// cerr << "****** peer: " << tcp->getPeer(&port) << " err: " << tcp->getErrorNumber() << endl;
try
{
if( nTransaction >= numeric_limits<ModbusRTU::ModbusData>::max() )
nTransaction = 0;
......@@ -136,6 +138,7 @@ mbErrCode ModbusTCPMaster::query( ModbusAddr addr, ModbusMessage& msg,
while( !qrecv.empty() )
qrecv.pop();
tcp->sync();
if( tcp->isPending(ost::Socket::pendingInput,timeout) )
{
/*
......@@ -158,9 +161,14 @@ mbErrCode ModbusTCPMaster::query( ModbusAddr addr, ModbusMessage& msg,
if( ret < (int)sizeof(rmh) )
{
ost::tpport_t port;
if( dlog.debugging(Debug::INFO) )
dlog[Debug::INFO] << "(ModbusTCPMaster::query): ret=" << (int)ret
<< " < rmh=" << (int)sizeof(rmh) << endl;
<< " < rmh=" << (int)sizeof(rmh)
<< " err: " << tcp->getErrorNumber()
<< " perr: " << tcp->getPeer(&port)
<< endl;
disconnect();
return erTimeOut; // return erHardwareError;
}
......@@ -211,17 +219,18 @@ mbErrCode ModbusTCPMaster::query( ModbusAddr addr, ModbusMessage& msg,
{
dlog[Debug::WARN] << "(query): " << ex << endl;
}
catch(SystemError& err)
catch( SystemError& err )
{
dlog[Debug::WARN] << "(query): " << err << endl;
}
catch(Exception& ex)
catch( Exception& ex )
{
dlog[Debug::WARN] << "(query): " << ex << endl;
}
catch( ost::SockException& e )
{
dlog[Debug::WARN] << e.getString() << ": " << e.getSystemErrorString() << endl;
dlog[Debug::WARN] << "(query): tcp error: " << e.getString() << endl;
return erTimeOut;
}
catch(...)
{
......@@ -252,19 +261,42 @@ void ModbusTCPMaster::reconnect()
// cerr << "tcp diconnect..." << endl;
tcp->disconnect();
delete tcp;
tcp = 0;
}
ost::Thread::setException(ost::Thread::throwException);
// cerr << "create new tcp..." << endl;
tcp = new ost::TCPStream(iaddr.c_str());
tcp->setTimeout(500);
try
{
// TCPStream (const char *name, Family family=IPV4, unsigned mss=536, bool throwflag=false, timeout_t timer=0)
tcp = new ost::TCPStream(iaddr.c_str(),ost::Socket::IPV4,536,true,500);
tcp->setTimeout(replyTimeOut_ms);
}
catch(ost::Socket *socket)
{
ost::tpport_t port;
int err = socket->getErrorNumber();
ost::InetAddress saddr = (ost::InetAddress)socket->getPeer(&port);
dlog[Debug::CRIT] << "tcp error " << saddr.getHostname() << ":" << port << " = " << err << endl;
}
catch( ost::SockException& e)
{
dlog[Debug::CRIT] << "tcp error: " << e.getString() << endl;
}
catch(...)
{
dlog[Debug::CRIT] << "create TCPStream[" << iaddr << "] error..." << endl;
}
}
// -------------------------------------------------------------------------
void ModbusTCPMaster::connect( ost::InetAddress addr, int port )
{
if( tcp )
{
disconnect();
delete tcp;
tcp = 0;
}
// if( !tcp )
// {
......@@ -275,9 +307,26 @@ void ModbusTCPMaster::connect( ost::InetAddress addr, int port )
dlog[Debug::INFO] << "(ModbusTCPMaster): connect to " << s.str() << endl;
iaddr = s.str();
try
{
tcp = new ost::TCPStream(iaddr.c_str());
tcp->setTimeout(500);
tcp->setTimeout(replyTimeOut_ms);
}
catch(ost::Socket *socket)
{
ost::tpport_t port;
int err = socket->getErrorNumber();
ost::InetAddress saddr = (ost::InetAddress)socket->getPeer(&port);
dlog[Debug::CRIT] << ": tcp error " << saddr.getHostname() << ":" << port << " = " << err << endl;
}
catch( ost::SockException& e)
{
dlog[Debug::CRIT] << "tcp error: " << e.getString() << endl;
}
catch(...)
{
dlog[Debug::CRIT] << "create TCPStream[" << iaddr << "] error..." << endl;
}
// }
}
// -------------------------------------------------------------------------
......
......@@ -10,7 +10,7 @@ PassiveTimer pt(1000);
int main()
{
PassiveTimer pt1(5000);
cout << " pt1.getInterval()=" << pt1.getInterval() << endl;
cout << " pt1.getInterval()=" << pt1.getInterval() << " TEST: " << ((pt1.getInterval()==5000) ? "OK" : "FAILED") << endl;
PassiveTimer pt2;
cout << " pt2.getInterval()=" << pt2.getInterval() << endl;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment