Commit 73ff04a1 authored by Pavel Vainerman's avatar Pavel Vainerman

(unet2): Реализовал логику переключение каналов (основной-резервный)

parent 596b2795
......@@ -89,7 +89,7 @@
priority - приоритет сообщения об изменении данного датчика
textname - текстовое имя датчика
-->
<nodes port="2809" unet_broadcast_ip="192.168.1.255">
<nodes port="2809" unet_broadcast_ip="192.168.1.255" unet_broadcast_ip2="192.168.122.255">
<item infserver="InfoServer" ip="127.0.0.1" name="LocalhostNode" textname="Локальный узел" unet_ignore="0" unet_port="2048">
<iocards>
<item card="1" name="DI32"/>
......
......@@ -122,12 +122,22 @@ sender2(0)
sender = new UNetSender(h,p,shm,s_field,s_fvalue,ic);
sender->setSendPause(sendpause);
// создаём "писателя" для второго канала если задан
if( !h2.empty() )
try
{
dlog[Debug::INFO] << myname << "(init): init sender2.. my node " << n_it.getProp("name") << endl;
sender2 = new UNetSender(h2,p2,shm,s_field,s_fvalue,ic);
sender2->setSendPause(sendpause);
// создаём "писателя" для второго канала если задан
if( !h2.empty() )
{
dlog[Debug::INFO] << myname << "(init): init sender2.. my node " << n_it.getProp("name") << endl;
sender2 = new UNetSender(h2,p2,shm,s_field,s_fvalue,ic);
sender2->setSendPause(sendpause);
}
}
catch(...)
{
// т.е. это "резервный канал", то игнорируем ошибку его создания
// при запуске "интерфейс" может быть и не доступен...
sender2 = 0;
dlog[Debug::CRIT] << myname << "(ignore): DON`T CREATE 'UNetSender' for " << h2 << ":" << p2 << endl;
}
continue;
......@@ -210,23 +220,38 @@ sender2(0)
r->setMaxProcessingCount(maxProcessingCount);
r->setRespondID(resp_id);
r->setLostPacketsID(lp_id);
r->connectEvent( sigc::mem_fun(this, &UNetExchange::receiverEvent) );
UNetReceiver* r2 = 0;
if( !h2.empty() ) // создаём читателя впо второму каналу
try
{
if( !h2.empty() ) // создаём читателя впо второму каналу
{
dlog[Debug::INFO] << myname << "(init): add reserv receiver "
<< h2 << ":" << p2 << endl;
r2 = new UNetReceiver(h2,p2,shm);
// т.к. это резервный канал (по началу блокируем его)
r2->setLockUpdate(true);
r2->setReceiveTimeout(recvTimeout);
r2->setLostTimeout(lostTimeout);
r2->setReceivePause(recvpause);
r2->setUpdatePause(updatepause);
r2->setMaxDifferens(maxDiff);
r2->setMaxProcessingCount(maxProcessingCount);
r2->setRespondID(resp2_id);
r2->setLostPacketsID(lp2_id);
r2->connectEvent( sigc::mem_fun(this, &UNetExchange::receiverEvent) );
}
}
catch(...)
{
r2 = new UNetReceiver(h2,p2,shm);
// т.к. это резервный канал (по началу блокируем его)
r2->setLockUpdate(true);
r2->setReceiveTimeout(recvTimeout);
r2->setLostTimeout(lostTimeout);
r2->setReceivePause(recvpause);
r2->setUpdatePause(updatepause);
r2->setMaxDifferens(maxDiff);
r2->setMaxProcessingCount(maxProcessingCount);
r2->setRespondID(resp2_id);
r2->setLostPacketsID(lp2_id);
// т.е. это "резервный канал", то игнорируем ошибку его создания
// при запуске "интерфейс" может быть и не доступен...
r2 = 0;
dlog[Debug::CRIT] << myname << "(ignore): DON`T CREATE 'UNetReceiver' for " << h2 << ":" << p2 << endl;
}
ReceiverInfo ri(r,r2);
......@@ -616,3 +641,45 @@ UNetExchange* UNetExchange::init_unetexchange( int argc, const char* argv[], Uni
return new UNetExchange(ID,icID,ic);
}
// -----------------------------------------------------------------------------
void UNetExchange::receiverEvent( UNetReceiver* r, UNetReceiver::Event ev )
{
// пока, что другие события нас не интересуют
if( ev != UNetReceiver::evTimeout )
return;
for( ReceiverList::iterator it=recvlist.begin(); it!=recvlist.end(); ++it )
{
if( it->r1 == r )
{
// если нет второго канала
// то и переключать некуда
if( !it->r2 )
return;
// пропала связь по первому каналу...
// переключаемся на второй
it->r1->setLockUpdate(true);
it->r2->setLockUpdate(false);
it->r2->resetTimeout();
if( dlog.debugging(Debug::INFO) )
dlog[Debug::INFO] << myname << "(event): " << r->getName()
<< ": timeout for channel1.. select channel2" << endl;
return;
}
if( it->r2 == r )
{
// пропала связь по второму каналу...
// переключаемся на первый
it->r1->setLockUpdate(false);
it->r1->resetTimeout();
it->r2->setLockUpdate(true);
if( dlog.debugging(Debug::INFO) )
dlog[Debug::INFO] << myname << "(event): " << r->getName()
<< ": timeout for channel2.. select channel1" << endl;
return;
}
}
}
// -----------------------------------------------------------------------------
......@@ -106,6 +106,7 @@ class UNetExchange:
void timerInfo( UniSetTypes::TimerMessage *tm );
void askSensors( UniversalIO::UIOCommand cmd );
void waitSMReady();
void receiverEvent( UNetReceiver* r, UNetReceiver::Event ev );
virtual bool activateObject();
......@@ -114,6 +115,7 @@ class UNetExchange:
void initIterators();
void startReceivers();
enum Timer
{
......
......@@ -44,7 +44,7 @@ a_cache_init_ok(false)
{
{
ostringstream s;
s << "(" << s_host << ":" << port << ")";
s << "R(" << s_host << ":" << port << ")";
myname = s.str();
}
......@@ -129,6 +129,12 @@ void UNetReceiver::setLostPacketsID( UniSetTypes::ObjectId id )
shm->initAIterator(aitLostPackets);
}
// -----------------------------------------------------------------------------
void UNetReceiver::setLockUpdate( bool st )
{
uniset_mutex_lock l(lockMutex,200);
lockUpdate = st;
}
// -----------------------------------------------------------------------------
void UNetReceiver::start()
{
if( !activated )
......@@ -265,8 +271,11 @@ void UNetReceiver::real_update()
}
// обновление данных в SM (блокировано)
if( lockUpdate )
continue;
{
uniset_mutex_lock l(lockMutex,100);
if( lockUpdate )
continue;
}
if( ii.iotype == UniversalIO::DigitalInput )
shm->localSaveState(ii.dit,id,val,shm->ID());
......@@ -305,8 +314,11 @@ void UNetReceiver::real_update()
}
// обновление данных в SM (блокировано)
if( lockUpdate )
continue;
{
uniset_mutex_lock l(lockMutex,100);
if( lockUpdate )
continue;
}
if( ii.iotype == UniversalIO::DigitalInput )
shm->localSaveState(ii.dit,d.id,d.val,shm->ID());
......@@ -349,7 +361,7 @@ void UNetReceiver::receive()
try
{
if( recv() )
ptRecvTimeout.reset();
ptRecvTimeout.reset();
}
catch( UniSetTypes::Exception& ex)
{
......@@ -364,6 +376,14 @@ void UNetReceiver::receive()
dlog[Debug::WARN] << myname << "(receive): catch ..." << std::endl;
}
if( trTimeout.change(ptRecvTimeout.checkTime()) )
{
if( ptRecvTimeout.checkTime() )
slEvent(this,evTimeout);
else
slEvent(this,evOK);
}
msleep(recvpause);
}
......@@ -511,3 +531,8 @@ void UNetReceiver::initACache( UniSetUDP::UDPMessage& pack, bool force )
}
}
// -----------------------------------------------------------------------------
void UNetReceiver::connectEvent( UNetReceiver::EventSlot sl )
{
slEvent = sl;
}
// -----------------------------------------------------------------------------
......@@ -5,6 +5,7 @@
#include <string>
#include <queue>
#include <cc++/socket.h>
#include <sigc++/sigc++.h>
#include "UniSetObject_LT.h"
#include "Trigger.h"
#include "Mutex.h"
......@@ -58,8 +59,12 @@ class UNetReceiver
void receive();
void update();
inline std::string getName(){ return myname; }
// блокировать сохранение данный в SM
void setLockUpdate( bool st ){ lockUpdate = st; }
void setLockUpdate( bool st );
inline void resetTimeout(){ ptRecvTimeout.reset(); trTimeout.change(false); }
inline bool isRecvOK(){ return ptRecvTimeout.checkTime(); }
inline unsigned long getLostPacketsNum(){ return lostPackets; }
......@@ -78,6 +83,16 @@ class UNetReceiver
inline ost::IPV4Address getAddress(){ return addr; }
inline ost::tpport_t getPort(){ return port; }
/*! Коды событий */
enum Event
{
evOK, /*!< связь есть */
evTimeout /*!< потеря связи */
};
typedef sigc::slot<void,UNetReceiver*,Event> EventSlot;
void connectEvent( EventSlot sl );
protected:
SMInterface* shm;
......@@ -143,6 +158,10 @@ class UNetReceiver
int maxProcessingCount; /*!< максимальное число обрабатываемых за один раз сообщений */
bool lockUpdate; /*!< флаг блокировки сохранения принятых данных в SM */
UniSetTypes::uniset_mutex lockMutex;
EventSlot slEvent;
Trigger trTimeout;
struct ItemInfo
{
......
......@@ -23,7 +23,7 @@ s_thr(0)
{
ostringstream s;
s << "(" << s_host << ":" << port << ")";
s << "S(" << s_host << ":" << port << ")";
myname = s.str();
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment