Commit bf1f42ba authored by Pavel Vainerman's avatar Pavel Vainerman

(Unet2): Добавил работу с резервным каналом обмена.

parent bbf50bb0
......@@ -13,7 +13,8 @@ shm(0),
initPause(0),
activated(false),
no_sender(false),
sender(0)
sender(0),
sender2(0)
{
if( objId == DefaultObjectId )
throw UniSetTypes::SystemError("(UNetExchange): objId=-1?!! Use --unet-name" );
......@@ -55,6 +56,7 @@ sender(0)
UniXML_iterator n_it(nodes);
string default_ip(n_it.getProp("unet_broadcast_ip"));
string default_ip2(n_it.getProp("unet_broadcast_ip2"));
if( !n_it.goChildren() )
throw UniSetTypes::SystemError("(UNetExchange): Items not found for <nodes>");
......@@ -74,11 +76,17 @@ sender(0)
// Если указано поле unet_broadcast_ip непосредственно у узла - берём его
// если указано общий broadcast ip для всех узлов - берём его
string h("");
string h2("");
if( !default_ip.empty() )
h = default_ip;
if( !n_it.getProp("unet_broadcast_ip").empty() )
h = n_it.getProp("unet_broadcast_ip");
if( !default_ip2.empty() )
h2 = default_ip2;
if( !n_it.getProp("unet_broadcast_ip2").empty() )
h2 = n_it.getProp("unet_broadcast_ip2");
if( h.empty() )
{
ostringstream err;
......@@ -87,18 +95,41 @@ sender(0)
throw UniSetTypes::SystemError(err.str());
}
if( h2.empty() && dlog.debugging(Debug::INFO) )
dlog[Debug::INFO] << myname << "(init): ip2 not used..." << endl;
// Если указано поле unet_port - используем его
// Иначе port = идентификатору узла
int p = n_it.getIntProp("id");
if( !n_it.getProp("unet_port").empty() )
p = n_it.getIntProp("unet_port");
int p2 = p; // по умолчанию порт на втором канале такой же как на первом
if( !n_it.getProp("unet_port2").empty() )
p2 = n_it.getIntProp("unet_port2");
string n(n_it.getProp("name"));
if( n == conf->getLocalNodeName() )
{
if( no_sender )
{
dlog[Debug::INFO] << myname << "(init): sender OFF for this node...("
<< n_it.getProp("name") << ")" << endl;
continue;
}
dlog[Debug::INFO] << myname << "(init): init sender.. my node " << n_it.getProp("name") << endl;
sender = new UNetSender(h,p,shm,s_field,s_fvalue,ic);
sender->setSendPause(sendpause);
// создаём "писателя" для второго канала если задан
if( !h2.empty() )
{
dlog[Debug::INFO] << myname << "(init): init sender2.. my node " << n_it.getProp("name") << endl;
sender2 = new UNetSender(h2,p2,shm,s_field,s_fvalue,ic);
sender2->setSendPause(sendpause);
}
continue;
}
......@@ -124,6 +155,20 @@ sender(0)
}
}
string s_resp2_id(n_it.getProp("unet_respond2_id"));
UniSetTypes::ObjectId resp2_id = UniSetTypes::DefaultObjectId;
if( !s_resp2_id.empty() )
{
resp2_id = conf->getSensorID(s_resp2_id);
if( resp2_id == UniSetTypes::DefaultObjectId )
{
ostringstream err;
err << myname << ": Unknown RespondID(2).. Not found id for '" << s_resp2_id << "'" << endl;
dlog[Debug::CRIT] << myname << "(init): " << err.str() << endl;
throw SystemError(err.str());
}
}
string s_lp_id(n_it.getProp("unet_lostpackets_id"));
UniSetTypes::ObjectId lp_id = UniSetTypes::DefaultObjectId;
if( !s_lp_id.empty() )
......@@ -138,8 +183,25 @@ sender(0)
}
}
string s_lp2_id(n_it.getProp("unet_lostpackets2_id"));
UniSetTypes::ObjectId lp2_id = UniSetTypes::DefaultObjectId;
if( !s_lp2_id.empty() )
{
lp2_id = conf->getSensorID(s_lp2_id);
if( lp2_id == UniSetTypes::DefaultObjectId )
{
ostringstream err;
err << myname << ": Unknown LostPacketsID(2).. Not found id for '" << s_lp2_id << "'" << endl;
dlog[Debug::CRIT] << myname << "(init): " << err.str() << endl;
throw SystemError(err.str());
}
}
UNetReceiver* r = new UNetReceiver(h,p,shm);
// на всякий принудительно разблокируем,
// чтобы не зависеть от значения по умолчанию
r->setLockUpdate(false);
r->setReceiveTimeout(recvTimeout);
r->setLostTimeout(lostTimeout);
r->setReceivePause(recvpause);
......@@ -148,7 +210,27 @@ sender(0)
r->setMaxProcessingCount(maxProcessingCount);
r->setRespondID(resp_id);
r->setLostPacketsID(lp_id);
recvlist.push_back(r);
UNetReceiver* r2 = 0;
if( !h2.empty() ) // создаём читателя впо второму каналу
{
r2 = new UNetReceiver(h2,p2,shm);
// т.к. это резервный канал (по началу блокируем его)
r2->setLockUpdate(true);
r2->setReceiveTimeout(recvTimeout);
r2->setLostTimeout(lostTimeout);
r2->setReceivePause(recvpause);
r2->setUpdatePause(updatepause);
r2->setMaxDifferens(maxDiff);
r2->setMaxProcessingCount(maxProcessingCount);
r2->setRespondID(resp2_id);
r2->setLostPacketsID(lp2_id);
}
ReceiverInfo ri(r,r2);
recvlist.push_back(ri);
}
// -------------------------------
......@@ -194,9 +276,15 @@ sender(0)
UNetExchange::~UNetExchange()
{
for( ReceiverList::iterator it=recvlist.begin(); it!=recvlist.end(); ++it )
delete (*it);
{
if( it->r1 )
delete it->r1;
if( it->r2 )
delete it->r2;
}
delete sender;
delete sender2;
delete shm;
}
// -----------------------------------------------------------------------------
......@@ -205,7 +293,7 @@ bool UNetExchange::checkExistUNetHost( const std::string addr, ost::tpport_t por
ost::IPV4Address a1(addr.c_str());
for( ReceiverList::iterator it=recvlist.begin(); it!=recvlist.end(); ++it )
{
if( (*it)->getAddress() == a1.getAddress() && (*it)->getPort() == port )
if( it->r1->getAddress() == a1.getAddress() && it->r1->getPort() == port )
return true;
}
......@@ -214,8 +302,13 @@ bool UNetExchange::checkExistUNetHost( const std::string addr, ost::tpport_t por
// -----------------------------------------------------------------------------
void UNetExchange::startReceivers()
{
for( ReceiverList::iterator it=recvlist.begin(); it!=recvlist.end(); ++it )
(*it)->start();
for( ReceiverList::iterator it=recvlist.begin(); it!=recvlist.end(); ++it )
{
if( it->r1 )
it->r1->start();
if( it->r2 )
it->r2->start();
}
}
// -----------------------------------------------------------------------------
void UNetExchange::waitSMReady()
......@@ -345,6 +438,8 @@ void UNetExchange::sysCommand( UniSetTypes::SystemMessage *sm )
startReceivers();
if( sender )
sender->start();
if( sender2 )
sender2->start();
}
break;
......@@ -408,12 +503,16 @@ void UNetExchange::askSensors( UniversalIO::UIOCommand cmd )
if( sender )
sender->askSensors(cmd);
if( sender2 )
sender2->askSensors(cmd);
}
// ------------------------------------------------------------------------------------------
void UNetExchange::sensorInfo( UniSetTypes::SensorMessage* sm )
{
if( sender )
sender->updateSensor( sm->id , sm->value );
if( sender2 )
sender2->updateSensor( sm->id , sm->value );
}
// ------------------------------------------------------------------------------------------
bool UNetExchange::activateObject()
......@@ -440,7 +539,14 @@ void UNetExchange::sigterm( int signo )
{
try
{
(*it)->stop();
if( it->r1 )
it->r1->stop();
}
catch(...){}
try
{
if( it->r2 )
it->r2->stop();
}
catch(...){}
}
......@@ -451,6 +557,12 @@ void UNetExchange::sigterm( int signo )
sender->stop();
}
catch(...){}
try
{
if( sender2 )
sender2->stop();
}
catch(...){}
UniSetObject_LT::sigterm(signo);
}
......@@ -460,6 +572,8 @@ void UNetExchange::initIterators()
shm->initAIterator(aitHeartBeat);
if( sender )
sender->initIterators();
if( sender2 )
sender2->initIterators();
}
// -----------------------------------------------------------------------------
void UNetExchange::help_print( int argc, const char* argv[] )
......
......@@ -17,33 +17,62 @@
/*!
\page pageUNetExchange2 Сетевой обмен на основе UDP (UNet2)
\par Обмен построен на основе протокола UDP.
- \ref pgUnet2_Common
- \ref pgUnet2_Conf
- \ref pgUnet2_Reserv
\section pgUnet2_Common Общее описание
Обмен построен на основе протокола UDP.
Основная идея заключается в том, что каждый узел на порту равном своему ID
посылает в сеть UDP-пакеты содержащие данные считанные из локальной SM. Формат данных - это набор
пар "id - value". Другие узлы принимают их. Помимо этого данный процесс запускает
по потоку приёма для каждого другого узла и ловит пакеты от них, сохраняя данные в SM.
\par При своём старте процесс считывает из секции <nodes> список узлов с которыми необходимо вести обмен, параметры своего узла.
Открывает по потоку приёма на каждый узел и поток передачи для своих данных.
\par Пример конфигурирования
По умолчанию при считывании используются свойства \a ip и \a id - в качестве порта.
Но можно переопределять эти параметры, при помощи указания \a unet_port и/или \a unet_ip.
Помимо этого можно задать broadcast-адрес по умолчанию \a unet_broadcast_ip для всех узлов в
свойствах секции <nodes unet_broadcast_ip="xxx.255">
\par
При своём старте процесс считывает из секции \<nodes> список узлов с которыми необходимо
вести обмен, а также параметры своего узла. Открывает по потоку приёма на каждый узел и поток
передачи для своих данных. Помимо этого такие же потоки для резервных каналов, если они включены
(см. \ref pgUnet2_Reserv ).
\section pgUnet2_Conf Пример конфигурирования
По умолчанию при считывании используется \b unet_broadcast_ip (указанный в секции \<nodes>)
и \b id узла - в качестве порта.
Но можно переопределять эти параметры, при помощи указания \b unet_port и/или \b unet_broadcast_ip,
для конкретного узла (\<item>).
\code
<nodes port="2809" unet_broadcast_ip="192.168.56.255">
<item ip="127.0.0.1" name="LocalhostNode" textname="Локальный узел" unet_ignore="1" unet_port="3000" unet_ip="192.168.56.1">
<item ip="127.0.0.1" name="LocalhostNode" textname="Локальный узел" unet_ignore="1" unet_port="3000" unet_broadcast_ip="192.168.57.255">
<iocards>
...
</iocards>
</item>
<item ip="192.168.56.10" name="Node1" textname="Node1" unet_port="3001" unet_ip="192.168.56.2"/>
<item ip="192.168.56.11" name="Node2" textname="Node2" unet_port="3002" unet_ip="192.168.56.3"/>
<item ip="192.168.56.10" name="Node1" textname="Node1" unet_port="3001"/>
<item ip="192.168.56.11" name="Node2" textname="Node2" unet_port="3002"/>
</nodes>
\endcode
\section pgUnet2_Reserv Настройка резервного канала связи
В текущей реализации поддерживается возможность обмена по двум подсетям (эзернет-каналам).
Она основана на том, что, для каждого узла помимо основного "читателя",
создаётся дополнительный "читатель"(поток) слушающий другой ip-адрес и порт.
А так же, для локального узла создаётся дополнительный "писатель"(поток),
который посылает данные в (указанную) вторую подсеть. Для того, чтобы задействовать
второй канал, достаточно объявить в настройках переменные
\b unet_broadcast_ip2. А также в случае необходимости для конкретного узла
можно указать \b unet_broadcast_ip2 и \b unet_port2.
Переключение между "каналами" происходит по следующей логике:
При старте включается только первый канал. Второй канал работает в режиме "пассивного" чтения.
Т.е. все пакеты принимаются, но данные в SharedMemory не обновляются.
Если во время работы пропадает связь по первому каналу, идёт переключение на второй канал.
Первый канал переводиться в "пассивный" режим, а второй канал, перекодится в "нормальный"(активный)
режим. Далее работа ведётся по второму каналу, независимо от, того, что связь на перовм
канале может восстановиться. Это сделано для защиты от постоянных перескакиваний
с канала на канал. Работа на втором канале,будет вестись, пока не пропадёт связь
на нём. Тогда будет попвтка переключиться обратно на первый канал и так "по кругу".
В свою очередь "писатели"(если они не отключены) всегда посылают данные в оба канала.
*/
// -----------------------------------------------------------------------------
class UNetExchange:
......@@ -107,11 +136,21 @@ class UNetExchange:
bool activated;
int activateTimeout;
typedef std::list<UNetReceiver*> ReceiverList;
struct ReceiverInfo
{
ReceiverInfo():r1(0),r2(0){}
ReceiverInfo(UNetReceiver* _r1, UNetReceiver* _r2 ):r1(_r1),r2(_r2){}
UNetReceiver* r1; /*!< приём по первому каналу */
UNetReceiver* r2; /*!< приём по второму каналу */
};
typedef std::list<ReceiverInfo> ReceiverList;
ReceiverList recvlist;
bool no_sender; /*!< флаг отключения посылки сообщений */
bool no_sender; /*!< флаг отключения посылки сообщений (создания потока для посылки)*/
UNetSender* sender;
UNetSender* sender2;
};
// -----------------------------------------------------------------------------
#endif // UNetExchange_H_
......
......@@ -36,6 +36,7 @@ maxDifferens(1000),
waitClean(false),
rnum(0),
maxProcessingCount(100),
lockUpdate(false),
d_icache(UniSetUDP::MaxDCount),
a_icache(UniSetUDP::MaxACount),
d_cache_init_ok(false),
......@@ -263,6 +264,10 @@ void UNetReceiver::real_update()
shm->initDIterator(ii.dit);
}
// обновление данных в SM (блокировано)
if( lockUpdate )
continue;
if( ii.iotype == UniversalIO::DigitalInput )
shm->localSaveState(ii.dit,id,val,shm->ID());
else if( ii.iotype == UniversalIO::AnalogInput )
......@@ -272,7 +277,7 @@ void UNetReceiver::real_update()
else if( ii.iotype == UniversalIO::DigitalOutput )
shm->localSetState(ii.dit,id,val,shm->ID());
else
dlog[Debug::CRIT] << myname << "(update): Unknown iotype for sid=" << id << endl;
dlog[Debug::CRIT] << myname << "(update): Unknown iotype for sid=" << id << endl;
}
catch( UniSetTypes::Exception& ex)
{
......@@ -299,6 +304,10 @@ void UNetReceiver::real_update()
shm->initDIterator(ii.dit);
}
// обновление данных в SM (блокировано)
if( lockUpdate )
continue;
if( ii.iotype == UniversalIO::DigitalInput )
shm->localSaveState(ii.dit,d.id,d.val,shm->ID());
else if( ii.iotype == UniversalIO::AnalogInput )
......@@ -308,7 +317,7 @@ void UNetReceiver::real_update()
else if( ii.iotype == UniversalIO::DigitalOutput )
shm->localSetState(ii.dit,d.id,d.val,shm->ID());
else
dlog[Debug::CRIT] << myname << "(update): Unknown iotype for sid=" << d.id << endl;
dlog[Debug::CRIT] << myname << "(update): Unknown iotype for sid=" << d.id << endl;
}
catch( UniSetTypes::Exception& ex)
{
......
......@@ -58,6 +58,9 @@ class UNetReceiver
void receive();
void update();
// блокировать сохранение данный в SM
void setLockUpdate( bool st ){ lockUpdate = st; }
inline bool isRecvOK(){ return ptRecvTimeout.checkTime(); }
inline unsigned long getLostPacketsNum(){ return lostPackets; }
......@@ -137,7 +140,9 @@ class UNetReceiver
bool waitClean; /*!< флаг означающий, что ждём очистики очереди до конца */
unsigned long rnum; /*!< текущий номер принятого сообщения, для проверки "переполнения" или "сбоя" счётчика */
int maxProcessingCount; /*! максимальное число обрабатываемых за один раз сообщений */
int maxProcessingCount; /*!< максимальное число обрабатываемых за один раз сообщений */
bool lockUpdate; /*!< флаг блокировки сохранения принятых данных в SM */
struct ItemInfo
{
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment