Commit cbe40d7e authored by Pavel Vainerman's avatar Pavel Vainerman Committed by Pavel Vainerman

[unet-multicast]: added simple tests, added "unet-multicast-tester"

parent 564b179b
bin_PROGRAMS = @PACKAGE@-unetexchange @PACKAGE@-unet-udp-tester
bin_PROGRAMS = @PACKAGE@-unetexchange @PACKAGE@-unet-udp-tester @PACKAGE@-unet-multicast-tester
# не забывайте править версию в2.pc-файле
UNETUDP_VER=@LIBVER@
......@@ -24,6 +24,10 @@ libUniSet2UNetUDP_la_SOURCES = UDPPacket.cc UDPTransport.cc MulticastTransport.
@PACKAGE@_unet_udp_tester_LDADD = $(top_builddir)/lib/libUniSet2.la $(POCO_LIBS)
@PACKAGE@_unet_udp_tester_CXXFLAGS = $(POCO_CFLAGS)
@PACKAGE@_unet_multicast_tester_SOURCES = UDPPacket.cc MulticastTransport.cc unet-multicast-tester.cc
@PACKAGE@_unet_multicast_tester_LDADD = $(top_builddir)/lib/libUniSet2.la $(POCO_LIBS)
@PACKAGE@_unet_multicast_tester_CXXFLAGS = $(POCO_CFLAGS)
# install
devel_include_HEADERS = *.h
devel_includedir = $(pkgincludedir)/extensions
......
......@@ -25,11 +25,30 @@
using namespace std;
using namespace uniset;
// -------------------------------------------------------------------------
xmlNode* MulticastReceiveTransport::getReceiveListNode( UniXML::iterator root )
{
UniXML::iterator it = root;
if( !it.find("multicast") )
return nullptr;
if( !it.goChildren() )
return nullptr;
if( !it.find("receive") )
return nullptr;
if( !it.goChildren() )
return nullptr;
return it.getCurrent();
}
// -------------------------------------------------------------------------
/*
* <item id="3000" unet_port="2048" unet_multicast_ip="192.168.0.255" unet_port2="2048" unet_multicast_ip2="192.169.0.255">
<multicast>
<receive>
1<group addr="224.0.0.1" addr2="224.0.0.1"/>
<group addr="224.0.0.1" addr2="224.0.0.1"/>
</receive>
<send>
<group addr="224.0.0.1"/>
......@@ -37,7 +56,7 @@ using namespace uniset;
</multicast>
</item>
*/
std::unique_ptr<MulticastReceiveTransport> MulticastReceiveTransport::createFromXml( UniXML::iterator it, const std::string& defaultIP, int numChan )
std::unique_ptr<MulticastReceiveTransport> MulticastReceiveTransport::createFromXml( UniXML::iterator it, const std::string& defaultIP, int numChan, const std::string& section )
{
ostringstream fieldIp;
fieldIp << "unet_multicast_ip";
......@@ -68,11 +87,11 @@ std::unique_ptr<MulticastReceiveTransport> MulticastReceiveTransport::createFrom
if( !it.goChildren() )
throw SystemError("(MulticastReceiveTransport): empty <multicast> node");
if( !it.find("receive") )
throw SystemError("(MulticastReceiveTransport): not found <receive> in <multicast>");
if( !it.find(section) )
throw SystemError("(MulticastReceiveTransport): not found <" + section + "> in <multicast>");
if( !it.goChildren() )
throw SystemError("(MulticastReceiveTransport): empty <receive> groups");
throw SystemError("(MulticastReceiveTransport): empty <" + section + "> groups");
std::vector<Poco::Net::IPAddress> groups;
......@@ -199,12 +218,22 @@ ssize_t MulticastReceiveTransport::receive( void* r_buf, size_t sz )
return udp->receiveBytes(r_buf, sz);
}
// -------------------------------------------------------------------------
bool MulticastReceiveTransport::isReadyForReceive( timeout_t tout )
{
return udp->poll(UniSetTimer::millisecToPoco(tout), Poco::Net::Socket::SELECT_READ);
}
// -------------------------------------------------------------------------
std::vector<Poco::Net::IPAddress> MulticastReceiveTransport::getGroups()
{
return groups;
}
// -------------------------------------------------------------------------
/*
* <item id="3000" unet_port="2048" unet_multicast_ip="192.168.0.255" unet_port2="2048" unet_multicast_ip2="192.169.0.255">
<multicast>
<receive>
1<group addr="224.0.0.1" addr2="224.0.0.1"/>
<group addr="224.0.0.1" addr2="224.0.0.1"/>
</receive>
<send>
<group addr="224.0.0.1"/>
......@@ -366,3 +395,7 @@ ssize_t MulticastSendTransport::send( const void* buf, size_t sz )
return udp->sendTo(buf, sz, saddr);
}
// -------------------------------------------------------------------------
std::vector<Poco::Net::IPAddress> MulticastSendTransport::getGroups()
{
return groups;
}
......@@ -31,7 +31,8 @@ namespace uniset
{
public:
static std::unique_ptr<MulticastReceiveTransport> createFromXml( UniXML::iterator it, const std::string& defaultIP, int numChan );
static std::unique_ptr<MulticastReceiveTransport> createFromXml(UniXML::iterator it, const std::string& defaultIP, int numChan, const std::string& section = "receive");
static xmlNode* getReceiveListNode( UniXML::iterator root );
MulticastReceiveTransport( const std::string& bind, int port, const std::vector<Poco::Net::IPAddress>& joinGroups );
virtual ~MulticastReceiveTransport();
......@@ -43,7 +44,9 @@ namespace uniset
virtual bool createConnection(bool throwEx, timeout_t readTimeout, bool noblock) override;
virtual void disconnect() override;
virtual int getSocket() const override;
std::vector<Poco::Net::IPAddress> getGroups();
bool isReadyForReceive( timeout_t tout ) override;
virtual ssize_t receive(void* r_buf, size_t sz) override;
protected:
......@@ -68,6 +71,7 @@ namespace uniset
virtual bool createConnection(bool throwEx, timeout_t sendTimeout) override;
virtual int getSocket() const override;
std::vector<Poco::Net::IPAddress> getGroups();
// write
virtual bool isReadyForSend(timeout_t tout) override;
......
......@@ -124,6 +124,11 @@ ssize_t UDPReceiveTransport::receive( void* r_buf, size_t sz )
return udp->receiveBytes(r_buf, sz);
}
// -------------------------------------------------------------------------
bool UDPReceiveTransport::isReadyForReceive( timeout_t tout )
{
return udp->poll(UniSetTimer::millisecToPoco(tout), Poco::Net::Socket::SELECT_READ);
}
// -------------------------------------------------------------------------
std::unique_ptr<UDPSendTransport> UDPSendTransport::createFromXml( UniXML::iterator it, const std::string& defaultIP, int numChan )
{
ostringstream fieldIp;
......
......@@ -43,6 +43,7 @@ namespace uniset
virtual void disconnect() override;
virtual int getSocket() const override;
virtual ssize_t receive( void* r_buf, size_t sz ) override;
virtual bool isReadyForReceive(timeout_t tout) override;
protected:
std::unique_ptr<UDPReceiveU> udp;
......
......@@ -92,7 +92,16 @@ UNetExchange::UNetExchange(uniset::ObjectId objId, uniset::ObjectId shmId, const
int checkConnectionPause = conf->getArgPInt("--" + prefix + "-checkconnection-pause", it.getProp("checkConnectionPause"), 10000);
int initpause = conf->getArgPInt("--" + prefix + "-initpause", it.getProp("initpause"), 5000);
std::string updateStrategy = conf->getArg2Param("--" + prefix + "-update-strategy", it.getProp("updateStrategy"), "evloop");
const std::string updateStrategy = conf->getArg2Param("--" + prefix + "-update-strategy", it.getProp("updateStrategy"), "evloop");
UNetReceiver::UpdateStrategy r_upStrategy = UNetReceiver::strToUpdateStrategy(updateStrategy);
if( r_upStrategy == UNetReceiver::useUpdateUnknown )
{
ostringstream err;
err << myname << ": Unknown update strategy!!! '" << updateStrategy << "'" << endl;
unetcrit << myname << "(init): " << err.str() << endl;
throw SystemError(err.str());
}
no_sender = conf->getArgInt("--" + prefix + "-nosender", it.getProp("nosender"));
......@@ -115,977 +124,1302 @@ UNetExchange::UNetExchange(uniset::ObjectId objId, uniset::ObjectId shmId, const
UniXML::iterator n_it(nodes);
const string unet_transport(n_it.getProp2("unet_transport", "udp"));
string default_ip(n_it.getProp("unet_broadcast_ip"));
string default_ip2(n_it.getProp("unet_broadcast_ip2"));
const string unet_transport = conf->getArg2Param("--" + prefix + "-transport", n_it.getProp("unet_transport"), "udp");
if( unet_transport == "multicast" )
initMulticastTransport(n_it, n_field, n_fvalue, prefix);
else
initUDPTransport(n_it, n_field, n_fvalue, prefix);
for( auto&& r : recvlist )
{
default_ip = n_it.getProp("unet_multicast_ip");
default_ip2 = n_it.getProp("unet_multicast_ip2");
if( r.r1 )
{
r.r1->setReceiveTimeout(recvTimeout);
r.r1->setPrepareTime(prepareTime);
r.r1->setEvrunTimeout(evrunTimeout);
r.r1->setLostTimeout(lostTimeout);
r.r1->setReceivePause(recvpause);
r.r1->setUpdatePause(updatepause);
r.r1->setCheckConnectionPause(checkConnectionPause);
r.r1->setInitPause(initpause);
r.r1->setMaxDifferens(maxDiff);
r.r1->setMaxProcessingCount(maxProcessingCount);
r.r1->setUpdateStrategy(r_upStrategy);
}
if( r.r2 )
{
r.r2->setReceiveTimeout(recvTimeout);
r.r2->setPrepareTime(prepareTime);
r.r2->setEvrunTimeout(evrunTimeout);
r.r2->setLostTimeout(lostTimeout);
r.r2->setReceivePause(recvpause);
r.r2->setUpdatePause(updatepause);
r.r2->setCheckConnectionPause(checkConnectionPause);
r.r2->setInitPause(initpause);
r.r2->setMaxDifferens(maxDiff);
r.r2->setMaxProcessingCount(maxProcessingCount);
r.r2->setUpdateStrategy(r_upStrategy);
}
}
if( !n_it.goChildren() )
throw uniset::SystemError("(UNetExchange): Items not found for <nodes>");
if( sender )
{
sender->setSendPause(sendpause);
sender->setPackSendPause(packsendpause);
sender->setPackSendPauseFactor(packsendpauseFactor);
sender->setCheckConnectionPause(checkConnectionPause);
}
for( ; n_it.getCurrent(); n_it.goNext() )
if( sender2 )
{
if( n_it.getIntProp("unet_ignore") )
sender2->setSendPause(sendpause);
sender2->setPackSendPause(packsendpause);
sender2->setPackSendPauseFactor(packsendpauseFactor);
sender2->setCheckConnectionPause(checkConnectionPause);
}
// -------------------------------
// ********** HEARTBEAT *************
string heart = conf->getArgParam("--" + prefix + "-heartbeat-id", it.getProp("heartbeat_id"));
if( !heart.empty() )
{
sidHeartBeat = conf->getSensorID(heart);
if( sidHeartBeat == DefaultObjectId )
{
unetinfo << myname << "(init): unet_ignore.. for " << n_it.getProp("name") << endl;
continue;
ostringstream err;
err << myname << ": не найден идентификатор для датчика 'HeartBeat' " << heart;
unetcrit << myname << "(init): " << err.str() << endl;
throw SystemError(err.str());
}
// проверяем фильтры для подсетей
if( !uniset::check_filter(n_it, n_field, n_fvalue) )
continue;
int heartbeatTime = conf->getArgPInt("--" + prefix + "-heartbeat-time", it.getProp("heartbeatTime"), conf->getHeartBeatTime());
string n(n_it.getProp("name"));
if( heartbeatTime )
ptHeartBeat.setTiming(heartbeatTime);
else
ptHeartBeat.setTiming(UniSetTimer::WaitUpTime);
if( n == conf->getLocalNodeName() )
maxHeartBeat = conf->getArgPInt("--" + prefix + "-heartbeat-max", it.getProp("heartbeat_max"), 10);
test_id = sidHeartBeat;
}
else
{
test_id = conf->getSensorID("TestMode_S");
if( test_id == DefaultObjectId )
{
if( no_sender )
{
unetinfo << myname << "(init): sender OFF for this node...("
<< n_it.getProp("name") << ")" << endl;
continue;
}
ostringstream err;
err << myname << "(init): test_id unknown. 'TestMode_S' not found...";
unetcrit << myname << "(init): " << err.str() << endl;
throw SystemError(err.str());
}
}
unetinfo << myname << "(init): init sender.. my node " << n_it.getProp("name") << endl;
unetinfo << myname << "(init): test_id=" << test_id << endl;
if( unet_transport == "multicast" )
{
auto s1 = MulticastSendTransport::createFromXml(n_it, default_ip, 0);
sender = make_shared<UNetSender>(std::move(s1), shm, false, s_field, s_fvalue, "unet", prefix);
}
else // default
{
auto s1 = UDPSendTransport::createFromXml(n_it, default_ip, 0);
sender = make_shared<UNetSender>(std::move(s1), shm, false, s_field, s_fvalue, "unet", prefix);
}
activateTimeout = conf->getArgPInt("--" + prefix + "-activate-timeout", 20000);
sender->setSendPause(sendpause);
sender->setPackSendPause(packsendpause);
sender->setPackSendPauseFactor(packsendpauseFactor);
sender->setCheckConnectionPause(checkConnectionPause);
loga->add(sender->getLog());
if( ic )
ic->logAgregator()->add(loga);
try
{
sender2 = nullptr;
// создаём "писателя" для второго канала если задан
if( unet_transport == "multicast" )
{
if( n_it.getProp("unet_multicast_ip2").empty() || !default_ip2.empty() )
{
auto s2 = MulticastSendTransport::createFromXml(n_it, default_ip2, 2);
sender2 = make_shared<UNetSender>(std::move(s2), shm, false, s_field, s_fvalue, "unet", prefix);
}
}
else // default
{
if( n_it.getProp("unet_broadcast_ip2").empty() || !default_ip2.empty() )
{
auto s2 = UDPSendTransport::createFromXml(n_it, default_ip2, 2);
sender2 = make_shared<UNetSender>(std::move(s2), shm, false, s_field, s_fvalue, "unet", prefix);
}
}
vmonit(s_field);
vmonit(s_fvalue);
vmonit(maxHeartBeat);
}
// -----------------------------------------------------------------------------
UNetExchange::~UNetExchange()
{
}
// -----------------------------------------------------------------------------
bool UNetExchange::checkExistTransport( const std::string& transportID ) noexcept
{
for( const auto& it : recvlist )
{
if( it.r1->getTransportID() == transportID )
return true;
}
if( sender2 )
{
sender2->setSendPause(sendpause);
sender2->setCheckConnectionPause(checkConnectionPause);
loga->add(sender2->getLog());
}
else
unetwarn << myname << "(ignore): sender for Channel2 disabled " << endl;
}
catch( std::exception& ex )
{
// т.е. это "резервный канал", то игнорируем ошибку его создания
// при запуске "интерфейс" может быть и не доступен...
sender2 = nullptr;
unetcrit << myname << "IGNORE! reserv channel create error:" << ex.what() << endl;
}
return false;
}
// -----------------------------------------------------------------------------
void UNetExchange::startReceivers()
{
for( const auto& it : recvlist )
{
if( it.r1 )
it.r1->start();
continue;
}
if( it.r2 )
it.r2->start();
}
}
// -----------------------------------------------------------------------------
bool UNetExchange::waitSMReady()
{
// waiting for SM is ready...
int tout = uniset_conf()->getArgPInt("--unet-sm-ready-timeout", "", uniset_conf()->getNCReadyTimeout());
unetinfo << myname << "(init): add UNetReceiver.." << endl;
std::unique_ptr<UNetReceiveTransport> transport1;
timeout_t ready_timeout = uniset_conf()->getNCReadyTimeout();
if( unet_transport == "multicast" )
transport1 = MulticastReceiveTransport::createFromXml(n_it, default_ip, 0);
else // default
transport1 = UDPReceiveTransport::createFromXml(n_it, default_ip, 0);
if( tout > 0 )
ready_timeout = tout;
else if( tout < 0 )
ready_timeout = UniSetTimer::WaitUpTime;
if( checkExistTransport(transport1->ID()) )
if( !shm->waitSMreadyWithCancellation(ready_timeout, cancelled, 50) )
{
if( !cancelled )
{
unetinfo << myname << "(init): " << transport1->ID() << " already added! Ignore.." << endl;
continue;
ostringstream err;
err << myname << "(waitSMReady): Не дождались готовности SharedMemory к работе в течение " << ready_timeout << " мсек";
unetcrit << err.str() << endl;
}
bool resp_invert = n_it.getIntProp("unet_respond_invert");
return false;
}
string s_resp_id(n_it.getProp("unet_respond1_id"));
uniset::ObjectId resp_id = uniset::DefaultObjectId;
return true;
}
// -----------------------------------------------------------------------------
void UNetExchange::timerInfo( const TimerMessage* tm )
{
if( !activated )
return;
if( !s_resp_id.empty() )
{
resp_id = conf->getSensorID(s_resp_id);
if( tm->id == tmStep )
step();
}
// -----------------------------------------------------------------------------
void UNetExchange::step() noexcept
{
if( !activated )
return;
if( resp_id == uniset::DefaultObjectId )
{
ostringstream err;
err << myname << ": " << n_it.getProp("name") << " : Unknown RespondID.. Not found id for '" << s_resp_id << "'" << endl;
unetcrit << myname << "(init): " << err.str() << endl;
throw SystemError(err.str());
}
if( sidHeartBeat != DefaultObjectId && ptHeartBeat.checkTime() )
{
try
{
shm->localSetValue(itHeartBeat, sidHeartBeat, maxHeartBeat, getId());
ptHeartBeat.reset();
}
string s_resp2_id(n_it.getProp("unet_respond2_id"));
uniset::ObjectId resp2_id = uniset::DefaultObjectId;
if( !s_resp2_id.empty() )
catch( const std::exception& ex )
{
resp2_id = conf->getSensorID(s_resp2_id);
if( resp2_id == uniset::DefaultObjectId )
{
ostringstream err;
err << myname << ": " << n_it.getProp("name") << " : Unknown RespondID(2).. Not found id for '" << s_resp2_id << "'" << endl;
unetcrit << myname << "(init): " << err.str() << endl;
throw SystemError(err.str());
}
unetcrit << myname << "(step): (hb) " << ex.what() << std::endl;
}
}
string s_lp_id(n_it.getProp("unet_lostpackets1_id"));
uniset::ObjectId lp_id = uniset::DefaultObjectId;
for( auto&& it : recvlist )
it.step(shm, myname, unetlog);
}
if( !s_lp_id.empty() )
// -----------------------------------------------------------------------------
void UNetExchange::ReceiverInfo::step( const std::shared_ptr<SMInterface>& shm, const std::string& myname, std::shared_ptr<DebugStream>& unetlog ) noexcept
{
try
{
if( sidRespond != DefaultObjectId )
{
lp_id = conf->getSensorID(s_lp_id);
bool resp = ( (r1 && r1->isRecvOK()) || (r2 && r2->isRecvOK()) );
if( lp_id == uniset::DefaultObjectId )
{
ostringstream err;
err << myname << ": " << n_it.getProp("name") << " : Unknown LostPacketsID.. Not found id for '" << s_lp_id << "'" << endl;
unetcrit << myname << "(init): " << err.str() << endl;
throw SystemError(err.str());
}
}
if( respondInvert )
resp = !resp;
string s_lp2_id(n_it.getProp("unet_lostpackets2_id"));
uniset::ObjectId lp2_id = uniset::DefaultObjectId;
// сохраняем только если закончилось время на начальную инициализацию
if( (r1 && r1->isInitOK()) || (r2 && r2->isInitOK()) )
shm->localSetValue(itRespond, sidRespond, resp, shm->ID());
}
}
catch( const std::exception& ex )
{
unetcrit << myname << "(ReceiverInfo::step): (respond): " << ex.what() << std::endl;
}
if( !s_lp2_id.empty() )
try
{
if( sidLostPackets != DefaultObjectId )
{
lp2_id = conf->getSensorID(s_lp2_id);
if( lp2_id == uniset::DefaultObjectId )
{
ostringstream err;
err << myname << ": " << n_it.getProp("name") << " : Unknown LostPacketsID(2).. Not found id for '" << s_lp2_id << "'" << endl;
unetcrit << myname << "(init): " << err.str() << endl;
throw SystemError(err.str());
}
}
long l = 0;
string s_lp_comm_id(n_it.getProp("unet_lostpackets_id"));
uniset::ObjectId lp_comm_id = uniset::DefaultObjectId;
if( r1 )
l += r1->getLostPacketsNum();
if( !s_lp_comm_id.empty() )
{
lp_comm_id = conf->getSensorID(s_lp_comm_id);
if( r2 )
l += r2->getLostPacketsNum();
if( lp_comm_id == uniset::DefaultObjectId )
{
ostringstream err;
err << myname << ": " << n_it.getProp("name") << " : Unknown LostPacketsID(comm).. Not found id for '" << s_lp_comm_id << "'" << endl;
unetcrit << myname << "(init): " << err.str() << endl;
throw SystemError(err.str());
}
shm->localSetValue(itLostPackets, sidLostPackets, l, shm->ID());
}
}
catch( const std::exception& ex )
{
unetcrit << myname << "(ReceiverInfo::step): (lostpackets): " << ex.what() << std::endl;
}
string s_resp_comm_id(n_it.getProp("unet_respond_id"));
uniset::ObjectId resp_comm_id = uniset::DefaultObjectId;
if( !s_resp_comm_id.empty() )
try
{
if( sidChannelNum != DefaultObjectId )
{
resp_comm_id = conf->getSensorID(s_resp_comm_id);
if( resp_comm_id == uniset::DefaultObjectId )
{
ostringstream err;
err << myname << ": " << n_it.getProp("name") << " : Unknown RespondID(comm).. Not found id for '" << s_resp_comm_id << "'" << endl;
unetcrit << myname << "(init): " << err.str() << endl;
throw SystemError(err.str());
}
}
long c = 0;
string s_numchannel_id(n_it.getProp("unet_numchannel_id"));
uniset::ObjectId numchannel_id = uniset::DefaultObjectId;
if( r1 && !r1->isLockUpdate() )
c = 1;
if( !s_numchannel_id.empty() )
{
numchannel_id = conf->getSensorID(s_numchannel_id);
if( r2 && !r2->isLockUpdate() )
c = 2;
if( numchannel_id == uniset::DefaultObjectId )
{
ostringstream err;
err << myname << ": " << n_it.getProp("name") << " : Unknown NumChannelID.. Not found id for '" << s_numchannel_id << "'" << endl;
unetcrit << myname << "(init): " << err.str() << endl;
throw SystemError(err.str());
}
shm->localSetValue(itChannelNum, sidChannelNum, c, shm->ID());
}
}
catch( const std::exception& ex )
{
unetcrit << myname << "(ReceiverInfo::step): (channelnum): " << ex.what() << std::endl;
}
string s_channelSwitchCount_id(n_it.getProp("unet_channelswitchcount_id"));
uniset::ObjectId channelswitchcount_id = uniset::DefaultObjectId;
if( !s_channelSwitchCount_id.empty() )
try
{
if( sidChannelSwitchCount != DefaultObjectId )
shm->localSetValue(itChannelSwitchCount, sidChannelSwitchCount, channelSwitchCount, shm->ID());
}
catch( const std::exception& ex )
{
unetcrit << myname << "(ReceiverInfo::step): (channelSwitchCount): " << ex.what() << std::endl;
}
}
// -----------------------------------------------------------------------------
void UNetExchange::sysCommand( const uniset::SystemMessage* sm )
{
switch( sm->command )
{
case SystemMessage::StartUp:
{
channelswitchcount_id = conf->getSensorID(s_channelSwitchCount_id);
if( channelswitchcount_id == uniset::DefaultObjectId )
if( !logserv_host.empty() && logserv_port != 0 && !logserv->isRunning() )
{
ostringstream err;
err << myname << ": " << n_it.getProp("name") << " : Unknown ChannelSwitchCountID.. Not found id for '" << channelswitchcount_id << "'" << endl;
unetcrit << myname << "(init): " << err.str() << endl;
throw SystemError(err.str());
try
{
unetinfo << myname << "(init): run log server " << logserv_host << ":" << logserv_port << endl;
logserv->async_run(logserv_host, logserv_port);
}
catch( std::exception& ex )
{
unetwarn << myname << "(init): run logserver FAILED. ERR: " << ex.what() << endl;
}
}
}
UNetReceiver::UpdateStrategy r_upStrategy = UNetReceiver::strToUpdateStrategy( n_it.getProp2("unet_update_strategy", updateStrategy) );
if( r_upStrategy == UNetReceiver::useUpdateUnknown )
{
ostringstream err;
err << myname << ": Unknown update strategy!!! '" << n_it.getProp2("unet_update_strategy", updateStrategy) << "'" << endl;
unetcrit << myname << "(init): " << err.str() << endl;
throw SystemError(err.str());
}
if( !waitSMReady() )
{
if( !cancelled )
{
// std::terminate();
uterminate();
}
unetinfo << myname << "(init): (node='" << n << "') add basic receiver " << transport1->ID() << endl;
auto r = make_shared<UNetReceiver>(std::move(transport1), shm, false, prefix);
return;
}
loga->add(r->getLog());
// подождать пока пройдёт инициализация датчиков
// см. activateObject()
msleep(initPause);
PassiveTimer ptAct(activateTimeout);
// на всякий принудительно разблокируем,
// чтобы не зависеть от значения по умолчанию
r->setLockUpdate(false);
while( !cancelled && !activated && !ptAct.checkTime() )
{
cout << myname << "(sysCommand): wait activate..." << endl;
msleep(300);
r->setReceiveTimeout(recvTimeout);
r->setPrepareTime(prepareTime);
r->setEvrunTimeout(evrunTimeout);
r->setLostTimeout(lostTimeout);
r->setReceivePause(recvpause);
r->setUpdatePause(updatepause);
r->setCheckConnectionPause(checkConnectionPause);
r->setInitPause(initpause);
r->setMaxDifferens(maxDiff);
r->setMaxProcessingCount(maxProcessingCount);
r->setRespondID(resp_id, resp_invert);
r->setLostPacketsID(lp_id);
r->connectEvent( sigc::mem_fun(this, &UNetExchange::receiverEvent) );
r->setUpdateStrategy(r_upStrategy);
if( activated )
break;
}
shared_ptr<UNetReceiver> r2(nullptr);
if( cancelled )
return;
try
{
std::unique_ptr<UNetReceiveTransport> transport2 = nullptr;
if( !activated )
unetcrit << myname << "(sysCommand): ************* don`t activate?! ************" << endl;
if( unet_transport == "multicast" )
{
if (!n_it.getProp("unet_multicast_ip2").empty() || !default_ip2.empty())
transport2 = MulticastReceiveTransport::createFromXml(n_it, default_ip2, 2);
}
else // default
{
if( !n_it.getProp("unet_broadcast_ip2").empty() || !default_ip2.empty() )
transport2 = UDPReceiveTransport::createFromXml(n_it, default_ip2, 2);
}
uniset::uniset_rwmutex_rlock l(mutex_start);
if( transport2 ) // создаём читателя по второму каналу
{
unetinfo << myname << "(init): (node='" << n << "') add reserv receiver " << transport2->ID() << endl;
r2 = make_shared<UNetReceiver>(std::move(transport2), shm, false, prefix);
if( shm->isLocalwork() )
askSensors(UniversalIO::UIONotify);
}
loga->add(r2->getLog());
askTimer(tmStep, steptime);
startReceivers();
// т.к. это резервный канал (по началу блокируем его)
r2->setLockUpdate(true);
if( sender )
sender->start();
r2->setReceiveTimeout(recvTimeout);
r2->setPrepareTime(prepareTime);
r2->setEvrunTimeout(evrunTimeout);
r2->setLostTimeout(lostTimeout);
r2->setReceivePause(recvpause);
r2->setUpdatePause(updatepause);
r2->setCheckConnectionPause(checkConnectionPause);
r2->setInitPause(initpause);
r2->setMaxDifferens(maxDiff);
r2->setMaxProcessingCount(maxProcessingCount);
r2->setRespondID(resp2_id, resp_invert);
r2->setLostPacketsID(lp2_id);
r2->connectEvent( sigc::mem_fun(this, &UNetExchange::receiverEvent) );
r2->setUpdateStrategy(r_upStrategy);
}
}
catch( std::exception& ex )
{
// т.е. это "резервный канал", то игнорируем ошибку его создания
// при запуске "интерфейс" может быть и не доступен...
r2 = nullptr;
unetcrit << myname << "(ignore): DON`T CREATE reserve 'UNetReceiver'. error: " << ex.what() << endl;
if( sender2 )
sender2->start();
}
break;
ReceiverInfo ri(r, r2);
ri.setRespondID(resp_comm_id, resp_invert);
ri.setLostPacketsID(lp_comm_id);
ri.setChannelNumID(numchannel_id);
ri.setChannelSwitchCountID(channelswitchcount_id);
recvlist.emplace_back( std::move(ri) );
}
// -------------------------------
// ********** HEARTBEAT *************
string heart = conf->getArgParam("--" + prefix + "-heartbeat-id", it.getProp("heartbeat_id"));
case SystemMessage::FoldUp:
case SystemMessage::Finish:
if( shm->isLocalwork() )
askSensors(UniversalIO::UIODontNotify);
if( !heart.empty() )
{
sidHeartBeat = conf->getSensorID(heart);
break;
if( sidHeartBeat == DefaultObjectId )
case SystemMessage::WatchDog:
{
ostringstream err;
err << myname << ": не найден идентификатор для датчика 'HeartBeat' " << heart;
unetcrit << myname << "(init): " << err.str() << endl;
throw SystemError(err.str());
startReceivers(); // если уже запущены, то это приведёт к вызову UNetReceiver::forceUpdate() ( см. UNetReceiver::start() )
// ОПТИМИЗАЦИЯ (защита от двойного перезаказа при старте)
// Если идёт автономная работа, то нужно заказывать датчики
// если запущены в одном процессе с SharedMemory2,
// то обрабатывать WatchDog не надо, т.к. мы и так ждём готовности SM
// при заказе датчиков, а если SM вылетит, то вместе с этим процессом(UNetExchange)
if( shm->isLocalwork() )
askSensors(UniversalIO::UIONotify);
}
break;
int heartbeatTime = conf->getArgPInt("--" + prefix + "-heartbeat-time", it.getProp("heartbeatTime"), conf->getHeartBeatTime());
case SystemMessage::LogRotate:
{
unetlogany << myname << "(sysCommand): logRotate" << std::endl;
string fname = unetlog->getLogFile();
if( heartbeatTime )
ptHeartBeat.setTiming(heartbeatTime);
else
ptHeartBeat.setTiming(UniSetTimer::WaitUpTime);
if( !fname.empty() )
{
unetlog->logFile(fname, true);
unetlogany << myname << "(sysCommand): ***************** dlog LOG ROTATE *****************" << std::endl;
}
}
break;
maxHeartBeat = conf->getArgPInt("--" + prefix + "-heartbeat-max", it.getProp("heartbeat_max"), 10);
test_id = sidHeartBeat;
default:
break;
}
else
}
// ------------------------------------------------------------------------------------------
void UNetExchange::askSensors( UniversalIO::UIOCommand cmd )
{
if( !shm->waitSMworking(test_id, activateTimeout, 50) )
{
test_id = conf->getSensorID("TestMode_S");
ostringstream err;
err << myname
<< "(askSensors): Не дождались готовности(work) SharedMemory к работе в течение "
<< activateTimeout << " мсек";
if( test_id == DefaultObjectId )
{
ostringstream err;
err << myname << "(init): test_id unknown. 'TestMode_S' not found...";
unetcrit << myname << "(init): " << err.str() << endl;
throw SystemError(err.str());
}
unetcrit << err.str() << endl << flush;
uterminate();
throw SystemError(err.str());
}
unetinfo << myname << "(init): test_id=" << test_id << endl;
activateTimeout = conf->getArgPInt("--" + prefix + "-activate-timeout", 20000);
if( sender )
sender->askSensors(cmd);
if( ic )
ic->logAgregator()->add(loga);
vmonit(s_field);
vmonit(s_fvalue);
vmonit(maxHeartBeat);
}
// -----------------------------------------------------------------------------
UNetExchange::~UNetExchange()
{
if( sender2 )
sender2->askSensors(cmd);
}
// -----------------------------------------------------------------------------
bool UNetExchange::checkExistTransport( const std::string& transportID ) noexcept
// ------------------------------------------------------------------------------------------
void UNetExchange::sensorInfo( const uniset::SensorMessage* sm )
{
for( const auto& it : recvlist )
{
if( it.r1->getTransportID() == transportID )
return true;
}
if( sender )
sender->updateSensor( sm->id, sm->value );
return false;
if( sender2 )
sender2->updateSensor( sm->id, sm->value );
}
// -----------------------------------------------------------------------------
void UNetExchange::startReceivers()
// ------------------------------------------------------------------------------------------
bool UNetExchange::activateObject()
{
for( const auto& it : recvlist )
// блокирование обработки Starsp
// пока не пройдёт инициализация датчиков
// см. sysCommand()
{
if( it.r1 )
it.r1->start();
if( it.r2 )
it.r2->start();
activated = false;
uniset::uniset_rwmutex_wrlock l(mutex_start);
UniSetObject::activateObject();
initIterators();
activated = true;
}
return true;
}
// -----------------------------------------------------------------------------
bool UNetExchange::waitSMReady()
// ------------------------------------------------------------------------------------------
bool UNetExchange::deactivateObject()
{
// waiting for SM is ready...
int tout = uniset_conf()->getArgPInt("--unet-sm-ready-timeout", "", uniset_conf()->getNCReadyTimeout());
timeout_t ready_timeout = uniset_conf()->getNCReadyTimeout();
if( tout > 0 )
ready_timeout = tout;
else if( tout < 0 )
ready_timeout = UniSetTimer::WaitUpTime;
cancelled = true;
if( !shm->waitSMreadyWithCancellation(ready_timeout, cancelled, 50) )
if( activated )
{
if( !cancelled )
{
ostringstream err;
err << myname << "(waitSMReady): Не дождались готовности SharedMemory к работе в течение " << ready_timeout << " мсек";
unetcrit << err.str() << endl;
}
return false;
unetinfo << myname << "(deactivateObject): disactivate.." << endl;
activated = false;
termReceivers();
termSenders();
}
return true;
}
// -----------------------------------------------------------------------------
void UNetExchange::timerInfo( const TimerMessage* tm )
{
if( !activated )
return;
if( tm->id == tmStep )
step();
return UniSetObject::deactivateObject();
}
// -----------------------------------------------------------------------------
void UNetExchange::step() noexcept
// ------------------------------------------------------------------------------------------
void UNetExchange::termReceivers()
{
if( !activated )
return;
if( sidHeartBeat != DefaultObjectId && ptHeartBeat.checkTime() )
for( const auto& it : recvlist )
{
try
{
shm->localSetValue(itHeartBeat, sidHeartBeat, maxHeartBeat, getId());
ptHeartBeat.reset();
if( it.r1 )
it.r1->stop();
}
catch( const std::exception& ex )
catch(...) {}
try
{
unetcrit << myname << "(step): (hb) " << ex.what() << std::endl;
if( it.r2 )
it.r2->stop();
}
catch(...) {}
}
for( auto&& it : recvlist )
it.step(shm, myname, unetlog);
}
// -----------------------------------------------------------------------------
void UNetExchange::ReceiverInfo::step( const std::shared_ptr<SMInterface>& shm, const std::string& myname, std::shared_ptr<DebugStream>& unetlog ) noexcept
// ------------------------------------------------------------------------------------------
void UNetExchange::termSenders()
{
try
{
if( sidRespond != DefaultObjectId )
{
bool resp = ( (r1 && r1->isRecvOK()) || (r2 && r2->isRecvOK()) );
if( respondInvert )
resp = !resp;
// сохраняем только если закончилось время на начальную инициализацию
if( (r1 && r1->isInitOK()) || (r2 && r2->isInitOK()) )
shm->localSetValue(itRespond, sidRespond, resp, shm->ID());
}
}
catch( const std::exception& ex )
{
unetcrit << myname << "(ReceiverInfo::step): (respond): " << ex.what() << std::endl;
if( sender )
sender->stop();
}
catch(...) {}
try
{
if( sidLostPackets != DefaultObjectId )
{
long l = 0;
if( r1 )
l += r1->getLostPacketsNum();
if( sender2 )
sender2->stop();
}
catch(...) {}
}
// ------------------------------------------------------------------------------------------
void UNetExchange::initIterators() noexcept
{
shm->initIterator(itHeartBeat);
if( r2 )
l += r2->getLostPacketsNum();
if( sender )
sender->initIterators();
shm->localSetValue(itLostPackets, sidLostPackets, l, shm->ID());
}
}
catch( const std::exception& ex )
{
unetcrit << myname << "(ReceiverInfo::step): (lostpackets): " << ex.what() << std::endl;
}
if( sender2 )
sender2->initIterators();
try
{
if( sidChannelNum != DefaultObjectId )
{
long c = 0;
for( auto&& it : recvlist )
it.initIterators(shm);
}
// -----------------------------------------------------------------------------
void UNetExchange::help_print( int argc, const char* argv[] ) noexcept
{
cout << "Default prefix='unet'" << endl;
cout << "--prefix-name NameID - Идентификтора процесса." << endl;
cout << "--prefix-recv-timeout msec - Время для фиксации события 'отсутсвие связи'" << endl;
cout << "--prefix-prepare-time msec - Время необходимое на подготовку (восстановление связи) при переключении на другой канал" << endl;
cout << "--prefix-lost-timeout msec - Время ожидания заполнения 'дырки' между пакетами. По умолчанию 5000 мсек." << endl;
cout << "--prefix-recvpause msec - Пауза между приёмами. По умолчанию 10" << endl;
cout << "--prefix-sendpause msec - Пауза между посылками. По умолчанию 100" << endl;
cout << "--prefix-updatepause msec - Пауза между обновлением информации в SM (Корелирует с recvpause и sendpause). По умолчанию 100" << endl;
cout << "--prefix-steptime msec - Пауза между обновлением информации о связи с узлами." << endl;
cout << "--prefix-checkconnection-pause msec - Пауза между попытками открыть соединение (если это не удалось до этого). По умолчанию: 10000 (10 сек)" << endl;
cout << "--prefix-maxdifferense num - Маскимальная разница в номерах пакетов для фиксации события 'потеря пакетов' " << endl;
cout << "--prefix-maxprocessingcount num - Максимальное количество пакетов обрабатываемых за один раз (если их слишком много)" << endl;
cout << "--prefix-nosender [0,1] - Отключить посылку." << endl;
cout << "--prefix-update-strategy [thread,evloop] - Стратегия обновления данных в SM. " << endl;
cout << " 'thread' - у каждого UNetReceiver отдельный поток" << endl;
cout << " 'evloop' - используется общий (с приёмом сообщений) event loop" << endl;
cout << " По умолчанию: evloop" << endl;
if( r1 && !r1->isLockUpdate() )
c = 1;
cout << "--prefix-sm-ready-timeout msec - Время ожидание я готовности SM к работе. По умолчанию 120000" << endl;
cout << "--prefix-filter-field name - Название фильтрующего поля при формировании списка датчиков посылаемых данным узлом" << endl;
cout << "--prefix-filter-value name - Значение фильтрующего поля при формировании списка датчиков посылаемых данным узлом" << endl;
cout << endl;
cout << "--prefix-nodes-confnode name - <Узел> откуда считывается список узлов. Default: <nodes>" << endl;
cout << "--prefix-nodes-filter-field name - Фильтрующее поле для списка узлов" << endl;
cout << "--prefix-nodes-filter-value name - Значение фильтрующего поля для списка узлов" << endl;
cout << endl;
cout << " Logs: " << endl;
cout << "--prefix-log-... - log control" << endl;
cout << " add-levels ..." << endl;
cout << " del-levels ..." << endl;
cout << " set-levels ..." << endl;
cout << " logfile filaname" << endl;
cout << " no-debug " << endl;
cout << " LogServer: " << endl;
cout << "--prefix-run-logserver - run logserver. Default: localhost:id" << endl;
cout << "--prefix-logserver-host ip - listen ip. Default: localhost" << endl;
cout << "--prefix-logserver-port num - listen port. Default: ID" << endl;
cout << LogServer::help_print("prefix-logserver") << endl;
}
// -----------------------------------------------------------------------------
std::shared_ptr<UNetExchange> UNetExchange::init_unetexchange(int argc, const char* const argv[], uniset::ObjectId icID,
const std::shared_ptr<SharedMemory>& ic, const std::string& prefix )
{
auto conf = uniset_conf();
if( r2 && !r2->isLockUpdate() )
c = 2;
string p("--" + prefix + "-name");
string name = conf->getArgParam(p, "UNetExchange1");
shm->localSetValue(itChannelNum, sidChannelNum, c, shm->ID());
}
}
catch( const std::exception& ex )
if( name.empty() )
{
unetcrit << myname << "(ReceiverInfo::step): (channelnum): " << ex.what() << std::endl;
cerr << "(unetexchange): Не задан name'" << endl;
return 0;
}
try
{
if( sidChannelSwitchCount != DefaultObjectId )
shm->localSetValue(itChannelSwitchCount, sidChannelSwitchCount, channelSwitchCount, shm->ID());
}
catch( const std::exception& ex )
ObjectId ID = conf->getObjectID(name);
if( ID == uniset::DefaultObjectId )
{
unetcrit << myname << "(ReceiverInfo::step): (channelSwitchCount): " << ex.what() << std::endl;
cerr << "(unetexchange): Not found ObjectID for '" << name
<< " in section '" << conf->getObjectsSection() << "'" << endl;
return 0;
}
dinfo << "(unetexchange): name = " << name << "(" << ID << ")" << endl;
return make_shared<UNetExchange>(ID, icID, ic, prefix);
}
// -----------------------------------------------------------------------------
void UNetExchange::sysCommand( const uniset::SystemMessage* sm )
void UNetExchange::receiverEvent( const shared_ptr<UNetReceiver>& r, UNetReceiver::Event ev ) noexcept
{
switch( sm->command )
for( auto&& it : recvlist )
{
case SystemMessage::StartUp:
if( it.r1 == r )
{
if( !logserv_host.empty() && logserv_port != 0 && !logserv->isRunning() )
if( ev == UNetReceiver::evTimeout )
{
try
{
unetinfo << myname << "(init): run log server " << logserv_host << ":" << logserv_port << endl;
logserv->async_run(logserv_host, logserv_port);
// если нет второго канала или нет связи
// то и переключать не надо
if( !it.r2 || !it.r2->isRecvOK() )
return;
// пропала связь по первому каналу...
// переключаемся на второй
it.r1->setLockUpdate(true);
it.r2->setLockUpdate(false);
it.channelSwitchCount++;
dlog8 << myname << "(event): " << r->getName()
<< ": timeout for channel1.. select channel 2" << endl;
}
else if( ev == UNetReceiver::evOK )
{
// если связь восстановилась..
// проверяем, а что там со вторым каналом
// если у него связи нет, то забираем себе..
if( !it.r2 || !it.r2->isRecvOK() )
{
it.r1->setLockUpdate(false);
if( it.r2 )
it.r2->setLockUpdate(true);
// если какой-то канал уже работал
// то увеличиваем счётчик переключений
// а если ещё не работал, значит это просто первое включение канала
// а не переключение
if( it.channelSwitchCount > 0 )
it.channelSwitchCount++;
dlog8 << myname << "(event): " << r->getName()
<< ": link failed for channel2.. select again channel1.." << endl;
}
catch( std::exception& ex )
}
return;
}
if( it.r2 == r )
{
if( ev == UNetReceiver::evTimeout )
{
// если первого канала нет или нет связи
// то и переключать не надо
if( !it.r1 || !it.r1->isRecvOK() )
return;
// пропала связь по второму каналу...
// переключаемся на первый
it.r1->setLockUpdate(false);
it.r2->setLockUpdate(true);
it.channelSwitchCount++;
dlog8 << myname << "(event): " << r->getName()
<< ": timeout for channel2.. select channel 1" << endl;
}
else if( ev == UNetReceiver::evOK )
{
// если связь восстановилась..
// проверяем, а что там со первым каналом
// если у него связи нет, то забираем себе..
if( !it.r1 || !it.r1->isRecvOK() )
{
unetwarn << myname << "(init): run logserver FAILED. ERR: " << ex.what() << endl;
if( it.r1 )
it.r1->setLockUpdate(true);
it.r2->setLockUpdate(false);
// если какой-то канал уже работал
// то увеличиваем счётчик переключений
// а если ещё не работал, значит это просто первое включение канала
// а не переключение
if( it.channelSwitchCount > 0 )
it.channelSwitchCount++;
dlog8 << myname << "(event): " << r->getName()
<< ": link failed for channel1.. select again channel2.." << endl;
}
}
if( !waitSMReady() )
return;
}
}
}
// -----------------------------------------------------------------------------
uniset::SimpleInfo* UNetExchange::getInfo( const char* userparam )
{
uniset::SimpleInfo_var i = UniSetObject::getInfo(userparam);
ostringstream inf;
inf << i->info << endl;
inf << vmon.pretty_str() << endl;
inf << endl;
if( logserv )
{
inf << "LogServer: " << logserv_host << ":" << logserv_port
<< ( logserv->isRunning() ? " [RUNNIG]" : " [STOPPED]" )
<< endl
<< " " << logserv->getShortInfo()
<< endl;
}
else
inf << "LogServer: NONE" << endl;
inf << endl;
inf << "Receivers: " << endl;
for( const auto& r : recvlist )
{
inf << "[ " << endl;
inf << " chan1: " << ( r.r1 ? r.r1->getShortInfo() : "[ DISABLED ]" ) << endl;
inf << " chan2: " << ( r.r2 ? r.r2->getShortInfo() : "[ DISABLED ]" ) << endl;
inf << "]" << endl;
}
inf << endl;
inf << "Senders: " << endl;
inf << "[ " << endl;
inf << " chan1: " << ( sender ? sender->getShortInfo() : "[ DISABLED ]" ) << endl;
inf << " chan2: " << ( sender2 ? sender2->getShortInfo() : "[ DISABLED ]" ) << endl;
inf << "]" << endl;
inf << endl;
i->info = inf.str().c_str();
return i._retn();
}
// ----------------------------------------------------------------------------
void UNetExchange::initUDPTransport( UniXML::iterator n_it,
const std::string& n_field,
const std::string& n_fvalue,
const std::string& prefix )
{
auto conf = uniset_conf();
const string default_ip = n_it.getProp("unet_broadcast_ip");
const string default_ip2 = n_it.getProp("unet_broadcast_ip2");
if( !n_it.goChildren() )
throw uniset::SystemError("(UNetExchange): Items not found for <nodes>");
for( ; n_it.getCurrent(); n_it.goNext() )
{
if( n_it.getIntProp("unet_ignore") )
{
unetinfo << myname << "(init): unet_ignore.. for " << n_it.getProp("name") << endl;
continue;
}
// проверяем фильтры для подсетей
if( !uniset::check_filter(n_it, n_field, n_fvalue) )
continue;
string n = n_it.getProp("name");
if( n == conf->getLocalNodeName() )
{
if( no_sender )
{
if( !cancelled )
unetinfo << myname << "(init): sender OFF for this node...("
<< n_it.getProp("name") << ")" << endl;
continue;
}
unetinfo << myname << "(init): init sender.. my node " << n_it.getProp("name") << endl;
auto s1 = UDPSendTransport::createFromXml(n_it, default_ip, 0);
sender = make_shared<UNetSender>(std::move(s1), shm, false, s_field, s_fvalue, "unet", prefix);
loga->add(sender->getLog());
try
{
sender2 = nullptr;
// создаём "писателя" для второго канала если задан
if( n_it.getProp("unet_broadcast_ip2").empty() || !default_ip2.empty() )
{
// std::terminate();
uterminate();
auto s2 = UDPSendTransport::createFromXml(n_it, default_ip2, 2);
sender2 = make_shared<UNetSender>(std::move(s2), shm, false, s_field, s_fvalue, "unet", prefix);
}
return;
if( sender2 )
loga->add(sender2->getLog());
else
unetwarn << myname << "(ignore): sender for Channel2 disabled " << endl;
}
catch( std::exception& ex )
{
// т.е. это "резервный канал", то игнорируем ошибку его создания
// при запуске "интерфейс" может быть и не доступен...
sender2 = nullptr;
unetcrit << myname << "IGNORE! reserv channel create error:" << ex.what() << endl;
}
// подождать пока пройдёт инициализация датчиков
// см. activateObject()
msleep(initPause);
PassiveTimer ptAct(activateTimeout);
continue;
}
while( !cancelled && !activated && !ptAct.checkTime() )
unetinfo << myname << "(init): add UNetReceiver for node " << n_it.getProp("name") << endl;
std::unique_ptr<UNetReceiveTransport> transport1;
transport1 = UDPReceiveTransport::createFromXml(n_it, default_ip, 0);
if( checkExistTransport(transport1->ID()) )
{
unetinfo << myname << "(init): " << transport1->ID() << " already added! Ignore.." << endl;
continue;
}
bool resp_invert = n_it.getIntProp("unet_respond_invert");
string s_resp_id(n_it.getProp("unet_respond1_id"));
uniset::ObjectId resp_id = uniset::DefaultObjectId;
if( !s_resp_id.empty() )
{
resp_id = conf->getSensorID(s_resp_id);
if( resp_id == uniset::DefaultObjectId )
{
cout << myname << "(sysCommand): wait activate..." << endl;
msleep(300);
ostringstream err;
err << myname << ": " << n_it.getProp("name") << " : Unknown RespondID.. Not found id for '" << s_resp_id << "'" << endl;
unetcrit << myname << "(init): " << err.str() << endl;
throw SystemError(err.str());
}
}
string s_resp2_id(n_it.getProp("unet_respond2_id"));
uniset::ObjectId resp2_id = uniset::DefaultObjectId;
if( !s_resp2_id.empty() )
{
resp2_id = conf->getSensorID(s_resp2_id);
if( resp2_id == uniset::DefaultObjectId )
{
ostringstream err;
err << myname << ": " << n_it.getProp("name") << " : Unknown RespondID(2).. Not found id for '" << s_resp2_id << "'" << endl;
unetcrit << myname << "(init): " << err.str() << endl;
throw SystemError(err.str());
}
}
string s_lp_id(n_it.getProp("unet_lostpackets1_id"));
uniset::ObjectId lp_id = uniset::DefaultObjectId;
if( !s_lp_id.empty() )
{
lp_id = conf->getSensorID(s_lp_id);
if( lp_id == uniset::DefaultObjectId )
{
ostringstream err;
err << myname << ": " << n_it.getProp("name") << " : Unknown LostPacketsID.. Not found id for '" << s_lp_id << "'" << endl;
unetcrit << myname << "(init): " << err.str() << endl;
throw SystemError(err.str());
}
}
string s_lp2_id(n_it.getProp("unet_lostpackets2_id"));
uniset::ObjectId lp2_id = uniset::DefaultObjectId;
if( !s_lp2_id.empty() )
{
lp2_id = conf->getSensorID(s_lp2_id);
if( lp2_id == uniset::DefaultObjectId )
{
ostringstream err;
err << myname << ": " << n_it.getProp("name") << " : Unknown LostPacketsID(2).. Not found id for '" << s_lp2_id << "'" << endl;
unetcrit << myname << "(init): " << err.str() << endl;
throw SystemError(err.str());
}
}
string s_lp_comm_id(n_it.getProp("unet_lostpackets_id"));
uniset::ObjectId lp_comm_id = uniset::DefaultObjectId;
if( !s_lp_comm_id.empty() )
{
lp_comm_id = conf->getSensorID(s_lp_comm_id);
if( lp_comm_id == uniset::DefaultObjectId )
{
ostringstream err;
err << myname << ": " << n_it.getProp("name") << " : Unknown LostPacketsID(comm).. Not found id for '" << s_lp_comm_id << "'" << endl;
unetcrit << myname << "(init): " << err.str() << endl;
throw SystemError(err.str());
}
}
string s_resp_comm_id(n_it.getProp("unet_respond_id"));
uniset::ObjectId resp_comm_id = uniset::DefaultObjectId;
if( !s_resp_comm_id.empty() )
{
resp_comm_id = conf->getSensorID(s_resp_comm_id);
if( activated )
break;
if( resp_comm_id == uniset::DefaultObjectId )
{
ostringstream err;
err << myname << ": " << n_it.getProp("name") << " : Unknown RespondID(comm).. Not found id for '" << s_resp_comm_id << "'" << endl;
unetcrit << myname << "(init): " << err.str() << endl;
throw SystemError(err.str());
}
}
if( cancelled )
return;
string s_numchannel_id(n_it.getProp("unet_numchannel_id"));
uniset::ObjectId numchannel_id = uniset::DefaultObjectId;
if( !activated )
unetcrit << myname << "(sysCommand): ************* don`t activate?! ************" << endl;
if( !s_numchannel_id.empty() )
{
numchannel_id = conf->getSensorID(s_numchannel_id);
if( numchannel_id == uniset::DefaultObjectId )
{
uniset::uniset_rwmutex_rlock l(mutex_start);
if( shm->isLocalwork() )
askSensors(UniversalIO::UIONotify);
ostringstream err;
err << myname << ": " << n_it.getProp("name") << " : Unknown NumChannelID.. Not found id for '" << s_numchannel_id << "'" << endl;
unetcrit << myname << "(init): " << err.str() << endl;
throw SystemError(err.str());
}
}
askTimer(tmStep, steptime);
startReceivers();
string s_channelSwitchCount_id(n_it.getProp("unet_channelswitchcount_id"));
uniset::ObjectId channelswitchcount_id = uniset::DefaultObjectId;
if( sender )
sender->start();
if( !s_channelSwitchCount_id.empty() )
{
channelswitchcount_id = conf->getSensorID(s_channelSwitchCount_id);
if( sender2 )
sender2->start();
if( channelswitchcount_id == uniset::DefaultObjectId )
{
ostringstream err;
err << myname << ": " << n_it.getProp("name") << " : Unknown ChannelSwitchCountID.. Not found id for '" << channelswitchcount_id << "'" << endl;
unetcrit << myname << "(init): " << err.str() << endl;
throw SystemError(err.str());
}
}
break;
case SystemMessage::FoldUp:
case SystemMessage::Finish:
if( shm->isLocalwork() )
askSensors(UniversalIO::UIODontNotify);
unetinfo << myname << "(init): (node='" << n << "') add basic receiver " << transport1->ID() << endl;
auto r = make_shared<UNetReceiver>(std::move(transport1), shm, false, prefix);
break;
loga->add(r->getLog());
case SystemMessage::WatchDog:
{
startReceivers(); // если уже запущены, то это приведёт к вызову UNetReceiver::forceUpdate() ( см. UNetReceiver::start() )
// на всякий принудительно разблокируем,
// чтобы не зависеть от значения по умолчанию
r->setLockUpdate(false);
r->setRespondID(resp_id, resp_invert);
r->setLostPacketsID(lp_id);
r->connectEvent( sigc::mem_fun(this, &UNetExchange::receiverEvent) );
// ОПТИМИЗАЦИЯ (защита от двойного перезаказа при старте)
// Если идёт автономная работа, то нужно заказывать датчики
// если запущены в одном процессе с SharedMemory2,
// то обрабатывать WatchDog не надо, т.к. мы и так ждём готовности SM
// при заказе датчиков, а если SM вылетит, то вместе с этим процессом(UNetExchange)
if( shm->isLocalwork() )
askSensors(UniversalIO::UIONotify);
}
break;
shared_ptr<UNetReceiver> r2(nullptr);
case SystemMessage::LogRotate:
try
{
unetlogany << myname << "(sysCommand): logRotate" << std::endl;
string fname = unetlog->getLogFile();
std::unique_ptr<UNetReceiveTransport> transport2 = nullptr;
if( !fname.empty() )
if( !n_it.getProp("unet_broadcast_ip2").empty() || !default_ip2.empty() )
transport2 = UDPReceiveTransport::createFromXml(n_it, default_ip2, 2);
if( transport2 ) // создаём читателя по второму каналу
{
unetlog->logFile(fname, true);
unetlogany << myname << "(sysCommand): ***************** dlog LOG ROTATE *****************" << std::endl;
unetinfo << myname << "(init): (node='" << n << "') add reserv receiver " << transport2->ID() << endl;
r2 = make_shared<UNetReceiver>(std::move(transport2), shm, false, prefix);
loga->add(r2->getLog());
// т.к. это резервный канал (по началу блокируем его)
r2->setLockUpdate(true);
r2->setRespondID(resp2_id, resp_invert);
r2->setLostPacketsID(lp2_id);
r2->connectEvent( sigc::mem_fun(this, &UNetExchange::receiverEvent) );
}
}
break;
catch( std::exception& ex )
{
// т.е. это "резервный канал", то игнорируем ошибку его создания
// при запуске "интерфейс" может быть и не доступен...
r2 = nullptr;
unetcrit << myname << "(ignore): DON`T CREATE reserve 'UNetReceiver'. error: " << ex.what() << endl;
}
default:
break;
ReceiverInfo ri(r, r2);
ri.setRespondID(resp_comm_id, resp_invert);
ri.setLostPacketsID(lp_comm_id);
ri.setChannelNumID(numchannel_id);
ri.setChannelSwitchCountID(channelswitchcount_id);
recvlist.emplace_back( std::move(ri) );
}
}
// ------------------------------------------------------------------------------------------
void UNetExchange::askSensors( UniversalIO::UIOCommand cmd )
// ----------------------------------------------------------------------------
/*
<nodes ...>
<item id="3000" unet_multicast_receive_from_all_nodes="1" unet_multicast_ip="192.168.0.255" unet_multicast_ip2="192.169.0.255">
<multicast>
<send>
<group addr="224.0.0.1"/>
</send>
</multicast>
</item>
...
</nodes>
*/
void UNetExchange::initMulticastTransport( UniXML::iterator n_it,
const std::string& n_field,
const std::string& n_fvalue,
const std::string& prefix )
{
if( !shm->waitSMworking(test_id, activateTimeout, 50) )
{
ostringstream err;
err << myname
<< "(askSensors): Не дождались готовности(work) SharedMemory к работе в течение "
<< activateTimeout << " мсек";
unetcrit << err.str() << endl << flush;
uterminate();
throw SystemError(err.str());
}
if( sender )
sender->askSensors(cmd);
if( sender2 )
sender2->askSensors(cmd);
}
// ------------------------------------------------------------------------------------------
void UNetExchange::sensorInfo( const uniset::SensorMessage* sm )
{
if( sender )
sender->updateSensor( sm->id, sm->value );
if( sender2 )
sender2->updateSensor( sm->id, sm->value );
}
// ------------------------------------------------------------------------------------------
bool UNetExchange::activateObject()
{
// блокирование обработки Starsp
// пока не пройдёт инициализация датчиков
// см. sysCommand()
{
activated = false;
uniset::uniset_rwmutex_wrlock l(mutex_start);
UniSetObject::activateObject();
initIterators();
activated = true;
}
auto conf = uniset_conf();
const string default_ip = n_it.getProp("unet_multicast_ip");
const string default_ip2 = n_it.getProp("unet_multicast_ip2");
return true;
}
// ------------------------------------------------------------------------------------------
bool UNetExchange::deactivateObject()
{
cancelled = true;
if( !n_it.goChildren() )
throw uniset::SystemError("(UNetExchange): Items not found for <nodes>");
if( activated )
{
unetinfo << myname << "(deactivateObject): disactivate.." << endl;
activated = false;
termReceivers();
termSenders();
}
xmlNode* selfNode = nullptr;
return UniSetObject::deactivateObject();
}
// ------------------------------------------------------------------------------------------
void UNetExchange::termReceivers()
{
for( const auto& it : recvlist )
// init senders
for( ; n_it.getCurrent(); n_it.goNext() )
{
try
if( n_it.getIntProp("unet_ignore") )
{
if( it.r1 )
it.r1->stop();
unetinfo << myname << "(init): unet_ignore.. for " << n_it.getProp("name") << endl;
continue;
}
catch(...) {}
try
// проверяем фильтры для подсетей
if( !uniset::check_filter(n_it, n_field, n_fvalue) )
continue;
string n = n_it.getProp("name");
if( n == conf->getLocalNodeName() )
{
if( it.r2 )
it.r2->stop();
selfNode = n_it.getCurrent();
if( no_sender )
{
unetinfo << myname << "(init): sender OFF for this node...("
<< n_it.getProp("name") << ")" << endl;
break;
}
unetinfo << myname << "(init): init sender.. my node " << n_it.getProp("name") << endl;
auto s1 = MulticastSendTransport::createFromXml(n_it, default_ip, 0);
unetinfo << myname << "(init): send (channel1) to multicast groups: " << endl;
for( const auto& gr : s1->getGroups() )
unetinfo << myname << "(init): " << gr.toString() << endl;
sender = make_shared<UNetSender>(std::move(s1), shm, false, s_field, s_fvalue, "unet", prefix);
loga->add(sender->getLog());
try
{
sender2 = nullptr;
if( n_it.getProp("unet_multicast_ip2").empty() || !default_ip2.empty() )
{
auto s2 = MulticastSendTransport::createFromXml(n_it, default_ip2, 2);
if( s2 )
{
unetinfo << myname << "(init): send (channel2) to multicast groups: " << endl;
for( const auto& gr : s2->getGroups() )
unetinfo << myname << "(init): " << gr.toString() << endl;
}
sender2 = make_shared<UNetSender>(std::move(s2), shm, false, s_field, s_fvalue, "unet", prefix);
}
if( sender2 )
loga->add(sender2->getLog());
else
unetwarn << myname << "(ignore): sender for Channel2 disabled " << endl;
}
catch( std::exception& ex )
{
// т.е. это "резервный канал", то игнорируем ошибку его создания
// при запуске "интерфейс" может быть и не доступен...
sender2 = nullptr;
unetcrit << myname << "IGNORE! reserv channel create error:" << ex.what() << endl;
}
break;
}
catch(...) {}
}
}
// ------------------------------------------------------------------------------------------
void UNetExchange::termSenders()
{
try
// INIT RECEIVERS
if( selfNode == nullptr )
{
if( sender )
sender->stop();
unetwarn << myname << "IGNORE! RECEIVE DISABLED.." << endl;
return;
}
catch(...) {}
try
UniXML::iterator it(selfNode);
if( !it.getIntProp("unet_multicast_receive_from_all_nodes") )
{
if( sender2 )
sender2->stop();
initMulticastReceiverForNode(n_it, default_ip, default_ip2, "receive", prefix);
return;
}
catch(...) {}
}
// ------------------------------------------------------------------------------------------
void UNetExchange::initIterators() noexcept
{
shm->initIterator(itHeartBeat);
if( sender )
sender->initIterators();
// init receivers (by nodes)
for( ; n_it.getCurrent(); n_it.goNext() )
{
if( n_it.getIntProp("unet_ignore") )
{
unetinfo << myname << "(init): unet_ignore.. for " << n_it.getProp("name") << endl;
continue;
}
// проверяем фильтры для подсетей
if( !uniset::check_filter(n_it, n_field, n_fvalue) )
continue;
if( sender2 )
sender2->initIterators();
string n = n_it.getProp("name");
for( auto&& it : recvlist )
it.initIterators(shm);
}
// -----------------------------------------------------------------------------
void UNetExchange::help_print( int argc, const char* argv[] ) noexcept
{
cout << "Default prefix='unet'" << endl;
cout << "--prefix-name NameID - Идентификтора процесса." << endl;
cout << "--prefix-recv-timeout msec - Время для фиксации события 'отсутсвие связи'" << endl;
cout << "--prefix-prepare-time msec - Время необходимое на подготовку (восстановление связи) при переключении на другой канал" << endl;
cout << "--prefix-lost-timeout msec - Время ожидания заполнения 'дырки' между пакетами. По умолчанию 5000 мсек." << endl;
cout << "--prefix-recvpause msec - Пауза между приёмами. По умолчанию 10" << endl;
cout << "--prefix-sendpause msec - Пауза между посылками. По умолчанию 100" << endl;
cout << "--prefix-updatepause msec - Пауза между обновлением информации в SM (Корелирует с recvpause и sendpause). По умолчанию 100" << endl;
cout << "--prefix-steptime msec - Пауза между обновлением информации о связи с узлами." << endl;
cout << "--prefix-checkconnection-pause msec - Пауза между попытками открыть соединение (если это не удалось до этого). По умолчанию: 10000 (10 сек)" << endl;
cout << "--prefix-maxdifferense num - Маскимальная разница в номерах пакетов для фиксации события 'потеря пакетов' " << endl;
cout << "--prefix-maxprocessingcount num - Максимальное количество пакетов обрабатываемых за один раз (если их слишком много)" << endl;
cout << "--prefix-nosender [0,1] - Отключить посылку." << endl;
cout << "--prefix-update-strategy [thread,evloop] - Стратегия обновления данных в SM. " << endl;
cout << " 'thread' - у каждого UNetReceiver отдельный поток" << endl;
cout << " 'evloop' - используется общий (с приёмом сообщений) event loop" << endl;
cout << " По умолчанию: evloop" << endl;
if( n == conf->getLocalNodeName() )
continue;
cout << "--prefix-sm-ready-timeout msec - Время ожидание я готовности SM к работе. По умолчанию 120000" << endl;
cout << "--prefix-filter-field name - Название фильтрующего поля при формировании списка датчиков посылаемых данным узлом" << endl;
cout << "--prefix-filter-value name - Значение фильтрующего поля при формировании списка датчиков посылаемых данным узлом" << endl;
cout << endl;
cout << "--prefix-nodes-confnode name - <Узел> откуда считывается список узлов. Default: <nodes>" << endl;
cout << "--prefix-nodes-filter-field name - Фильтрующее поле для списка узлов" << endl;
cout << "--prefix-nodes-filter-value name - Значение фильтрующего поля для списка узлов" << endl;
cout << endl;
cout << " Logs: " << endl;
cout << "--prefix-log-... - log control" << endl;
cout << " add-levels ..." << endl;
cout << " del-levels ..." << endl;
cout << " set-levels ..." << endl;
cout << " logfile filaname" << endl;
cout << " no-debug " << endl;
cout << " LogServer: " << endl;
cout << "--prefix-run-logserver - run logserver. Default: localhost:id" << endl;
cout << "--prefix-logserver-host ip - listen ip. Default: localhost" << endl;
cout << "--prefix-logserver-port num - listen port. Default: ID" << endl;
cout << LogServer::help_print("prefix-logserver") << endl;
initMulticastReceiverForNode(n_it, default_ip, default_ip2, "send", prefix);
}
}
// -----------------------------------------------------------------------------
std::shared_ptr<UNetExchange> UNetExchange::init_unetexchange(int argc, const char* const argv[], uniset::ObjectId icID,
const std::shared_ptr<SharedMemory>& ic, const std::string& prefix )
// ----------------------------------------------------------------------------
void UNetExchange::initMulticastReceiverForNode(UniXML::iterator n_it,
const std::string& default_ip,
const std::string& default_ip2,
const std::string& section,
const std::string& prefix )
{
auto conf = uniset_conf();
string p("--" + prefix + "-name");
string name = conf->getArgParam(p, "UNetExchange1");
unetinfo << myname << "(init): add UNetReceiver for node " << n_it.getProp("name") << endl;
auto transport1 = MulticastReceiveTransport::createFromXml(n_it, default_ip, 0, section);
if( name.empty() )
if( checkExistTransport(transport1->ID()) )
{
cerr << "(unetexchange): Не задан name'" << endl;
return 0;
unetinfo << myname << "(init): " << transport1->ID() << " already added! Ignore.." << endl;
return;
}
ObjectId ID = conf->getObjectID(name);
bool resp_invert = n_it.getIntProp("unet_respond_invert");
if( ID == uniset::DefaultObjectId )
string s_resp_id(n_it.getProp("unet_respond1_id"));
uniset::ObjectId resp_id = uniset::DefaultObjectId;
if( !s_resp_id.empty() )
{
cerr << "(unetexchange): Not found ObjectID for '" << name
<< " in section '" << conf->getObjectsSection() << "'" << endl;
return 0;
resp_id = conf->getSensorID(s_resp_id);
if( resp_id == uniset::DefaultObjectId )
{
ostringstream err;
err << myname << ": " << n_it.getProp("name") << " : Unknown RespondID.. Not found id for '" << s_resp_id << "'" << endl;
unetcrit << myname << "(init): " << err.str() << endl;
throw SystemError(err.str());
}
}
dinfo << "(unetexchange): name = " << name << "(" << ID << ")" << endl;
return make_shared<UNetExchange>(ID, icID, ic, prefix);
}
// -----------------------------------------------------------------------------
void UNetExchange::receiverEvent( const shared_ptr<UNetReceiver>& r, UNetReceiver::Event ev ) noexcept
{
for( auto&& it : recvlist )
string s_resp2_id(n_it.getProp("unet_respond2_id"));
uniset::ObjectId resp2_id = uniset::DefaultObjectId;
if( !s_resp2_id.empty() )
{
if( it.r1 == r )
resp2_id = conf->getSensorID(s_resp2_id);
if( resp2_id == uniset::DefaultObjectId )
{
if( ev == UNetReceiver::evTimeout )
{
// если нет второго канала или нет связи
// то и переключать не надо
if( !it.r2 || !it.r2->isRecvOK() )
return;
ostringstream err;
err << myname << ": " << n_it.getProp("name") << " : Unknown RespondID(2).. Not found id for '" << s_resp2_id << "'" << endl;
unetcrit << myname << "(init): " << err.str() << endl;
throw SystemError(err.str());
}
}
// пропала связь по первому каналу...
// переключаемся на второй
it.r1->setLockUpdate(true);
it.r2->setLockUpdate(false);
it.channelSwitchCount++;
string s_lp_id(n_it.getProp("unet_lostpackets1_id"));
uniset::ObjectId lp_id = uniset::DefaultObjectId;
dlog8 << myname << "(event): " << r->getName()
<< ": timeout for channel1.. select channel 2" << endl;
}
else if( ev == UNetReceiver::evOK )
{
// если связь восстановилась..
// проверяем, а что там со вторым каналом
// если у него связи нет, то забираем себе..
if( !it.r2 || !it.r2->isRecvOK() )
{
it.r1->setLockUpdate(false);
if( !s_lp_id.empty() )
{
lp_id = conf->getSensorID(s_lp_id);
if( it.r2 )
it.r2->setLockUpdate(true);
if( lp_id == uniset::DefaultObjectId )
{
ostringstream err;
err << myname << ": " << n_it.getProp("name") << " : Unknown LostPacketsID.. Not found id for '" << s_lp_id << "'" << endl;
unetcrit << myname << "(init): " << err.str() << endl;
throw SystemError(err.str());
}
}
// если какой-то канал уже работал
// то увеличиваем счётчик переключений
// а если ещё не работал, значит это просто первое включение канала
// а не переключение
if( it.channelSwitchCount > 0 )
it.channelSwitchCount++;
string s_lp2_id(n_it.getProp("unet_lostpackets2_id"));
uniset::ObjectId lp2_id = uniset::DefaultObjectId;
dlog8 << myname << "(event): " << r->getName()
<< ": link failed for channel2.. select again channel1.." << endl;
}
}
if( !s_lp2_id.empty() )
{
lp2_id = conf->getSensorID(s_lp2_id);
return;
if( lp2_id == uniset::DefaultObjectId )
{
ostringstream err;
err << myname << ": " << n_it.getProp("name") << " : Unknown LostPacketsID(2).. Not found id for '" << s_lp2_id << "'" << endl;
unetcrit << myname << "(init): " << err.str() << endl;
throw SystemError(err.str());
}
}
if( it.r2 == r )
string s_lp_comm_id(n_it.getProp("unet_lostpackets_id"));
uniset::ObjectId lp_comm_id = uniset::DefaultObjectId;
if( !s_lp_comm_id.empty() )
{
lp_comm_id = conf->getSensorID(s_lp_comm_id);
if( lp_comm_id == uniset::DefaultObjectId )
{
if( ev == UNetReceiver::evTimeout )
{
// если первого канала нет или нет связи
// то и переключать не надо
if( !it.r1 || !it.r1->isRecvOK() )
return;
ostringstream err;
err << myname << ": " << n_it.getProp("name") << " : Unknown LostPacketsID(comm).. Not found id for '" << s_lp_comm_id << "'" << endl;
unetcrit << myname << "(init): " << err.str() << endl;
throw SystemError(err.str());
}
}
// пропала связь по второму каналу...
// переключаемся на первый
it.r1->setLockUpdate(false);
it.r2->setLockUpdate(true);
it.channelSwitchCount++;
string s_resp_comm_id(n_it.getProp("unet_respond_id"));
uniset::ObjectId resp_comm_id = uniset::DefaultObjectId;
dlog8 << myname << "(event): " << r->getName()
<< ": timeout for channel2.. select channel 1" << endl;
}
else if( ev == UNetReceiver::evOK )
{
// если связь восстановилась..
// проверяем, а что там со первым каналом
// если у него связи нет, то забираем себе..
if( !it.r1 || !it.r1->isRecvOK() )
{
if( it.r1 )
it.r1->setLockUpdate(true);
if( !s_resp_comm_id.empty() )
{
resp_comm_id = conf->getSensorID(s_resp_comm_id);
it.r2->setLockUpdate(false);
if( resp_comm_id == uniset::DefaultObjectId )
{
ostringstream err;
err << myname << ": " << n_it.getProp("name") << " : Unknown RespondID(comm).. Not found id for '" << s_resp_comm_id << "'" << endl;
unetcrit << myname << "(init): " << err.str() << endl;
throw SystemError(err.str());
}
}
// если какой-то канал уже работал
// то увеличиваем счётчик переключений
// а если ещё не работал, значит это просто первое включение канала
// а не переключение
if( it.channelSwitchCount > 0 )
it.channelSwitchCount++;
string s_numchannel_id(n_it.getProp("unet_numchannel_id"));
uniset::ObjectId numchannel_id = uniset::DefaultObjectId;
dlog8 << myname << "(event): " << r->getName()
<< ": link failed for channel1.. select again channel2.." << endl;
}
}
if( !s_numchannel_id.empty() )
{
numchannel_id = conf->getSensorID(s_numchannel_id);
return;
if( numchannel_id == uniset::DefaultObjectId )
{
ostringstream err;
err << myname << ": " << n_it.getProp("name") << " : Unknown NumChannelID.. Not found id for '" << s_numchannel_id << "'" << endl;
unetcrit << myname << "(init): " << err.str() << endl;
throw SystemError(err.str());
}
}
}
// -----------------------------------------------------------------------------
uniset::SimpleInfo* UNetExchange::getInfo( const char* userparam )
{
uniset::SimpleInfo_var i = UniSetObject::getInfo(userparam);
ostringstream inf;
inf << i->info << endl;
inf << vmon.pretty_str() << endl;
inf << endl;
string s_channelSwitchCount_id(n_it.getProp("unet_channelswitchcount_id"));
uniset::ObjectId channelswitchcount_id = uniset::DefaultObjectId;
if( logserv )
if( !s_channelSwitchCount_id.empty() )
{
inf << "LogServer: " << logserv_host << ":" << logserv_port
<< ( logserv->isRunning() ? " [RUNNIG]" : " [STOPPED]" )
<< endl
<< " " << logserv->getShortInfo()
<< endl;
channelswitchcount_id = conf->getSensorID(s_channelSwitchCount_id);
if( channelswitchcount_id == uniset::DefaultObjectId )
{
ostringstream err;
err << myname << ": " << n_it.getProp("name") << " : Unknown ChannelSwitchCountID.. Not found id for '" << channelswitchcount_id << "'" << endl;
unetcrit << myname << "(init): " << err.str() << endl;
throw SystemError(err.str());
}
}
else
inf << "LogServer: NONE" << endl;
inf << endl;
inf << "Receivers: " << endl;
unetinfo << myname << "(init): (node='" << n_it.getProp("name") << "') add channel1 receiver " << transport1->ID() << endl;
unetinfo << myname << "(init): receive (channel1) from multicast groups: " << endl;
for( const auto& r : recvlist )
for( const auto& gr : transport1->getGroups() )
unetinfo << myname << "(init): " << gr.toString() << endl;
auto r = make_shared<UNetReceiver>(std::move(transport1), shm, false, prefix);
loga->add(r->getLog());
// на всякий принудительно разблокируем,
// чтобы не зависеть от значения по умолчанию
r->setLockUpdate(false);
r->setRespondID(resp_id, resp_invert);
r->setLostPacketsID(lp_id);
r->connectEvent( sigc::mem_fun(this, &UNetExchange::receiverEvent) );
shared_ptr<UNetReceiver> r2(nullptr);
try
{
inf << "[ " << endl;
inf << " chan1: " << ( r.r1 ? r.r1->getShortInfo() : "[ DISABLED ]" ) << endl;
inf << " chan2: " << ( r.r2 ? r.r2->getShortInfo() : "[ DISABLED ]" ) << endl;
inf << "]" << endl;
}
std::unique_ptr<MulticastReceiveTransport> transport2 = nullptr;
inf << endl;
if (!n_it.getProp("unet_multicast_ip2").empty() || !default_ip2.empty())
transport2 = MulticastReceiveTransport::createFromXml(n_it, default_ip2, 2, section);
inf << "Senders: " << endl;
inf << "[ " << endl;
inf << " chan1: " << ( sender ? sender->getShortInfo() : "[ DISABLED ]" ) << endl;
inf << " chan2: " << ( sender2 ? sender2->getShortInfo() : "[ DISABLED ]" ) << endl;
inf << "]" << endl;
inf << endl;
if( transport2 ) // создаём читателя по второму каналу
{
unetinfo << myname << "(init): (node='" << n_it.getProp("name") << "') add channel2 receiver " << transport2->ID() << endl;
unetinfo << myname << "(init): receive(channel2) from multicast groups: " << endl;
i->info = inf.str().c_str();
return i._retn();
}
// ----------------------------------------------------------------------------
for( const auto& gr : transport2->getGroups() )
unetinfo << myname << "(init): " << gr.toString() << endl;
r2 = make_shared<UNetReceiver>(std::move(transport2), shm, false, prefix);
loga->add(r2->getLog());
// т.к. это резервный канал (по началу блокируем его)
r2->setLockUpdate(true);
r2->setRespondID(resp2_id, resp_invert);
r2->setLostPacketsID(lp2_id);
r2->connectEvent( sigc::mem_fun(this, &UNetExchange::receiverEvent) );
}
}
catch( std::exception& ex )
{
// т.е. это "резервный канал", то игнорируем ошибку его создания
// при запуске "интерфейс" может быть и не доступен...
r2 = nullptr;
unetcrit << myname << "(ignore): DON`T CREATE reserve 'UNetReceiver'. error: " << ex.what() << endl;
}
ReceiverInfo ri(r, r2);
ri.setRespondID(resp_comm_id, resp_invert);
ri.setLostPacketsID(lp_comm_id);
ri.setChannelNumID(numchannel_id);
ri.setChannelSwitchCountID(channelswitchcount_id);
recvlist.emplace_back( std::move(ri) );
}
......@@ -242,6 +242,9 @@ namespace uniset
void termSenders();
void termReceivers();
void initMulticastTransport( UniXML::iterator nodes, const std::string& n_field, const std::string& n_fvalue, const std::string& prefix );
void initMulticastReceiverForNode( UniXML::iterator n_it, const std::string& default_ip1, const std::string& default_ip2, const std::string& section, const std::string& prefix);
void initUDPTransport(UniXML::iterator nodes, const std::string& n_field, const std::string& n_fvalue, const std::string& prefix);
void initIterators() noexcept;
void startReceivers();
......
/*
* Copyright (c) 2021 Pavel Vainerman.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation, version 2.1.
*
* This program is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Lesser Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
// -------------------------------------------------------------------------
#ifndef UNetTransport_H_
#define UNetTransport_H_
// -------------------------------------------------------------------------
#include <string>
#include "PassiveTimer.h" // for typedef timeout_t
// -------------------------------------------------------------------------
namespace uniset
{
// Интерфейс для работы получения данных по сети
class UNetReceiveTransport
{
public:
virtual ~UNetReceiveTransport() {}
virtual bool isConnected() const = 0;
virtual std::string toString() const = 0;
virtual std::string ID() const noexcept = 0;
virtual bool createConnection( bool throwEx, timeout_t recvTimeout, bool noblock ) = 0;
virtual int getSocket() const = 0;
virtual bool isReadyForReceive(timeout_t tout) = 0;
virtual ssize_t receive( void* r_buf, size_t sz ) = 0;
virtual void disconnect() = 0;
};
// Интерфейс для посылки данных в сеть
class UNetSendTransport
{
public:
virtual ~UNetSendTransport() {}
virtual bool isConnected() const = 0;
virtual std::string toString() const = 0;
virtual bool createConnection( bool throwEx, timeout_t sendTimeout ) = 0;
virtual int getSocket() const = 0;
// write
virtual bool isReadyForSend( timeout_t tout ) = 0;
virtual ssize_t send( const void* r_buf, size_t sz ) = 0;
};
} // end of uniset namespace
// -------------------------------------------------------------------------
#endif // UNetTransport_H_
// -------------------------------------------------------------------------
if HAVE_TESTS
noinst_PROGRAMS = tests-with-sm urecv-perf-test
noinst_PROGRAMS = tests-with-sm tests-multicast-with-sm urecv-perf-test
tests_with_sm_SOURCES = tests_with_sm.cc test_unetudp.cc test_unetmulticast.cc
tests_with_sm_SOURCES = tests_with_sm.cc test_unetudp.cc
tests_with_sm_LDADD = $(top_builddir)/lib/libUniSet2.la $(top_builddir)/extensions/lib/libUniSet2Extensions.la \
$(top_builddir)/extensions/UNetUDP/libUniSet2UNetUDP.la \
$(top_builddir)/extensions/SharedMemory/libUniSet2SharedMemory.la \
$(SIGC_LIBS) $(POCO_LIBS)
tests_with_sm_CPPFLAGS = -I$(top_builddir)/include -I$(top_builddir)/extensions/include \
-I$(top_builddir)/extensions/UNetUDP \
-I$(top_builddir)/extensions/SharedMemory $(SIGC_CFLAGS) $(POCO_CFLAGS)
-I$(top_builddir)/extensions/SharedMemory $(SIGC_CFLAGS) $(POCO_CFLAGS)
tests_multicast_with_sm_SOURCES = tests_multicast_with_sm.cc test_unetmulticast.cc test_unetudp_multicast.cc
tests_multicast_with_sm_LDADD = $(top_builddir)/lib/libUniSet2.la $(top_builddir)/extensions/lib/libUniSet2Extensions.la \
$(top_builddir)/extensions/UNetUDP/libUniSet2UNetUDP.la \
$(top_builddir)/extensions/SharedMemory/libUniSet2SharedMemory.la \
$(SIGC_LIBS) $(POCO_LIBS)
tests_multicast_with_sm_CPPFLAGS = -I$(top_builddir)/include -I$(top_builddir)/extensions/include \
-I$(top_builddir)/extensions/UNetUDP \
-I$(top_builddir)/extensions/SharedMemory $(SIGC_CFLAGS) $(POCO_CFLAGS)
urecv_perf_test_SOURCES = urecv_perf_test.cc
urecv_perf_test_LDADD = $(top_builddir)/lib/libUniSet2.la $(top_builddir)/extensions/lib/libUniSet2Extensions.la \
......@@ -18,7 +27,7 @@ urecv_perf_test_LDADD = $(top_builddir)/lib/libUniSet2.la $(top_builddir)/ex
$(SIGC_LIBS) $(POCO_LIBS)
urecv_perf_test_CPPFLAGS = -I$(top_builddir)/include -I$(top_builddir)/extensions/include \
-I$(top_builddir)/extensions/UNetUDP \
-I$(top_builddir)/extensions/SharedMemory $(SIGC_CFLAGS) $(POCO_CFLAGS)
-I$(top_builddir)/extensions/SharedMemory $(SIGC_CFLAGS) $(POCO_CFLAGS)
include $(top_builddir)/testsuite/testsuite-common.mk
......
......@@ -98,7 +98,7 @@ void send( UniSetUDP::UDPMessage& pack, int tout = 2000 )
REQUIRE( ret == s_buf.len );
}
// -----------------------------------------------------------------------------
TEST_CASE("[UNetUDP]: queue sort", "[unetudp][packetqueue]")
TEST_CASE("[UNetUDP]: queue sort", "[unetudp][udp][packetqueue]")
{
UNetReceiver::PacketQueue q;
......@@ -136,7 +136,7 @@ TEST_CASE("[UNetUDP]: queue sort", "[unetudp][packetqueue]")
q.pop();
}
// -----------------------------------------------------------------------------
TEST_CASE("[UNetUDP]: UDPMessage", "[unetudp][udpmessage]")
TEST_CASE("[UNetUDP]: UDPMessage", "[unetudp][udp][udpmessage]")
{
SECTION("UDPMessage::isFull()")
{
......@@ -184,7 +184,7 @@ TEST_CASE("[UNetUDP]: UDPMessage", "[unetudp][udpmessage]")
}
}
// -----------------------------------------------------------------------------
TEST_CASE("[UNetUDP]: sizeOf", "[unetudp][sizeof]")
TEST_CASE("[UNetUDP]: sizeOf", "[unetudp][udp][sizeof]")
{
UniSetUDP::UDPMessage m;
......@@ -200,7 +200,7 @@ TEST_CASE("[UNetUDP]: sizeOf", "[unetudp][sizeof]")
}
// -----------------------------------------------------------------------------
#if 0
TEST_CASE("[UNetUDP]: respond sensor", "[unetudp]")
TEST_CASE("[UNetUDP]: respond sensor", "[unetudp][udp]")
{
InitTest();
......@@ -214,7 +214,7 @@ TEST_CASE("[UNetUDP]: respond sensor", "[unetudp]")
}
#endif
// -----------------------------------------------------------------------------
TEST_CASE("[UNetUDP]: check sender", "[unetudp][sender]")
TEST_CASE("[UNetUDP]: check sender", "[unetudp][udp][sender]")
{
InitTest();
......@@ -285,7 +285,7 @@ TEST_CASE("[UNetUDP]: check sender", "[unetudp][sender]")
}
}
// -----------------------------------------------------------------------------
TEST_CASE("[UNetUDP]: check receiver", "[unetudp][receiver]")
TEST_CASE("[UNetUDP]: check receiver", "[unetudp][udp][receiver]")
{
InitTest();
......@@ -335,7 +335,7 @@ TEST_CASE("[UNetUDP]: check receiver", "[unetudp][receiver]")
}
}
// -----------------------------------------------------------------------------
TEST_CASE("[UNetUDP]: check packets 'hole'", "[unetudp][udphole]")
TEST_CASE("[UNetUDP]: check packets 'hole'", "[unetudp][udp][udphole]")
{
InitTest();
......@@ -386,7 +386,7 @@ TEST_CASE("[UNetUDP]: check packets 'hole'", "[unetudp][udphole]")
}
}
// -----------------------------------------------------------------------------
TEST_CASE("[UNetUDP]: check packets 'MaxDifferens'", "[unetudp][maxdifferens]")
TEST_CASE("[UNetUDP]: check packets 'MaxDifferens'", "[unetudp][udp][maxdifferens]")
{
InitTest();
......@@ -414,7 +414,7 @@ TEST_CASE("[UNetUDP]: check packets 'MaxDifferens'", "[unetudp][maxdifferens]")
REQUIRE( ui->getValue(node2_lostpackets_as) > nlost );
}
// -----------------------------------------------------------------------------
TEST_CASE("[UNetUDP]: bad packet number", "[unetudp][badnumber]")
TEST_CASE("[UNetUDP]: bad packet number", "[unetudp][udp][badnumber]")
{
InitTest();
......@@ -455,7 +455,7 @@ TEST_CASE("[UNetUDP]: bad packet number", "[unetudp][badnumber]")
REQUIRE( ui->getValue(8) == 160 );
}
// -----------------------------------------------------------------------------
TEST_CASE("[UNetUDP]: switching channels", "[unetudp][chswitch]")
TEST_CASE("[UNetUDP]: switching channels", "[unetudp][udp][chswitch]")
{
InitTest();
UniSetUDP::UDPMessage pack;
......@@ -479,7 +479,7 @@ TEST_CASE("[UNetUDP]: switching channels", "[unetudp][chswitch]")
REQUIRE( ui->getValue(node1_channelSwitchCount_as) == 0 );
}
// -----------------------------------------------------------------------------
TEST_CASE("[UNetUDP]: check undefined value", "[unetudp][sender]")
TEST_CASE("[UNetUDP]: check undefined value", "[unetudp][udp][sender]")
{
InitTest();
......
#include <catch.hpp>
// -----------------------------------------------------------------------------
#include <memory>
#include "UniSetTypes.h"
#include "UInterface.h"
#include "UDPPacket.h"
#include "UDPCore.h"
#include "MulticastTransport.h"
// -----------------------------------------------------------------------------
// include-ы искплючительно для того, чтобы их обработал gcov (покрытие кода)
#include "UNetReceiver.h"
#include "UNetSender.h"
#include "UNetExchange.h"
// -----------------------------------------------------------------------------
using namespace std;
using namespace uniset;
// -----------------------------------------------------------------------------
static shared_ptr<UInterface> ui = nullptr;
static ObjectId aid = 2;
static std::unique_ptr<MulticastReceiveTransport> udp_r = nullptr;
static std::unique_ptr<MulticastSendTransport> udp_s = nullptr;
static int s_port = 3003; // Node2
static int s_nodeID = 3003;
static int s_procID = 123;
static int s_numpack = 1;
static ObjectId node2_respond_s = 12;
static ObjectId node2_lostpackets_as = 13;
static int maxDifferense = 5; // см. unetudp-test-configure.xml --unet-maxdifferense
static int recvTimeout = 1000; // --unet-recv-timeout
static ObjectId node1_numchannel_as = 14;
static ObjectId node1_channelSwitchCount_as = 15;
// -----------------------------------------------------------------------------
static void initHelpers()
{
if( udp_r && udp_s )
return;
UniXML xml("unetudp-test-configure.xml");
UniXML::iterator it = xml.findNode(xml.getFirstNode(), "nodes");
REQUIRE( it.getCurrent() );
REQUIRE( it.goChildren() );
REQUIRE( it.findName("item", "localhost", false) );
REQUIRE( it.getName() == "item" );
REQUIRE( it.getProp("name") == "localhost" );
if( !udp_r )
{
// udp_r = MulticastReceiveTransport::createFromXml(it, "127.0.0.1", 0, "send");
std::vector<Poco::Net::IPAddress> groups;
groups.emplace_back("238.255.1.1");
udp_r = make_unique<MulticastReceiveTransport>("127.0.0.1", 3000, groups);
REQUIRE( udp_r->toString() == "127.0.0.1:3000" );
REQUIRE( udp_r->createConnection(false, 5000, true) );
}
if( !udp_s )
{
// udp_s = MulticastSendTransport::createFromXml(it, "127.0.0.1", 0);
std::vector<Poco::Net::IPAddress> groups;
groups.emplace_back("238.255.1.2");
udp_s = make_unique<MulticastSendTransport>("127.0.0.1", 3002, groups);
REQUIRE( udp_s->toString() == "127.0.0.1:3002" );
REQUIRE( udp_s->createConnection(false, 5000) );
}
}
// -----------------------------------------------------------------------------
void InitMulticastTest()
{
auto conf = uniset_conf();
CHECK( conf != nullptr );
if( ui == nullptr )
{
ui = make_shared<UInterface>();
// UI понадобиться для проверки записанных в SM значений.
CHECK( ui->getObjectIndex() != nullptr );
CHECK( ui->getConf() == conf );
CHECK( ui->waitReady(aid, 10000) );
}
if( udp_r == nullptr )
initHelpers();
if( udp_s == nullptr )
initHelpers();
}
// -----------------------------------------------------------------------------
// pnum - минималный номер ожидаемого пакета ( 0 - любой пришедщий )
// ncycle - сколько пакетов разрешено "пропустить" прежде чем дождёмся нужного.. (чтобы не ждать бесконечно)
static UniSetUDP::UDPMessage mreceive( unsigned int pnum = 0, timeout_t tout = 2000, int ncycle = 20 )
{
UniSetUDP::UDPMessage pack;
UniSetUDP::UDPPacket buf;
while( ncycle > 0 )
{
if( !udp_r->isReadyForReceive(tout) )
break;
size_t ret = udp_r->receive(&(buf.data), sizeof(buf.data) );
size_t sz = UniSetUDP::UDPMessage::getMessage(pack, buf);
if( sz == 0 || pnum == 0 || ( pnum > 0 && pack.num >= pnum ) ) // -V560
break;
REQUIRE( pack.magic == UniSetUDP::UNETUDP_MAGICNUM );
ncycle--;
}
// if( pnum > 0 && pack.num < pnum )
// return UniSetUDP::UDPMessage(); // empty message
return pack;
}
// -----------------------------------------------------------------------------
void msend( UniSetUDP::UDPMessage& pack, int tout = 2000 )
{
CHECK( udp_s->isReadyForSend(tout) );
pack.nodeID = s_nodeID;
pack.procID = s_procID;
pack.num = s_numpack++;
UniSetUDP::UDPPacket s_buf;
pack.transport_msg(s_buf);
size_t ret = udp_s->send(&s_buf.data, s_buf.len);
REQUIRE( ret == s_buf.len );
}
// -----------------------------------------------------------------------------
TEST_CASE("[UNetUDP]: check multicast sender", "[unetudp][multicast][sender]")
{
InitMulticastTest();
SECTION("Test: read default pack...")
{
UniSetUDP::UDPMessage pack = mreceive();
REQUIRE( pack.num != 0 );
REQUIRE( pack.asize() == 4 );
REQUIRE( pack.dsize() == 2 );
for( size_t i = 0; i < pack.asize(); i++ )
{
REQUIRE( pack.a_dat[i].val == i + 1 );
}
REQUIRE( pack.dValue(0) == 1 );
REQUIRE( pack.dValue(1) == 0 );
// т.к. данные в SM не менялись, то должен придти пакет с тем же номером что и был..
UniSetUDP::UDPMessage pack2 = mreceive();
REQUIRE( pack2.num == pack.num );
}
SECTION("Test: change AI data...")
{
UniSetUDP::UDPMessage pack0 = mreceive();
ui->setValue(2, 100);
REQUIRE( ui->getValue(2) == 100 );
msleep(120);
UniSetUDP::UDPMessage pack = mreceive( pack0.num + 1 );
REQUIRE( pack.num != 0 );
REQUIRE( pack.asize() == 4 );
REQUIRE( pack.dsize() == 2 );
REQUIRE( pack.a_dat[0].val == 100 );
ui->setValue(2, 250);
REQUIRE( ui->getValue(2) == 250 );
msleep(120);
UniSetUDP::UDPMessage pack2 = mreceive( pack.num + 1 );
REQUIRE( pack2.num != 0 );
REQUIRE( pack2.num > pack.num );
REQUIRE( pack2.asize() == 4 );
REQUIRE( pack2.dsize() == 2 );
REQUIRE( pack2.a_dat[0].val == 250 );
}
SECTION("Test: change DI data...")
{
UniSetUDP::UDPMessage pack0 = mreceive();
ui->setValue(6, 1);
REQUIRE( ui->getValue(6) == 1 );
msleep(120);
UniSetUDP::UDPMessage pack = mreceive( pack0.num + 1 );
REQUIRE( pack.num != 0 );
REQUIRE( pack.asize() == 4 );
REQUIRE( pack.dsize() == 2 );
REQUIRE( pack.dValue(0) == 1 );
ui->setValue(6, 0);
REQUIRE( ui->getValue(6) == 0 );
msleep(120);
UniSetUDP::UDPMessage pack2 = mreceive( pack.num + 1 );
REQUIRE( pack2.num != 0 );
REQUIRE( pack2.num > pack.num );
REQUIRE( pack2.asize() == 4 );
REQUIRE( pack2.dsize() == 2 );
REQUIRE( pack2.dValue(0) == 0 );
}
}
// -----------------------------------------------------------------------------
TEST_CASE("[UNetUDP]: check multicast receiver", "[unetudp][multicast][receiver]")
{
InitMulticastTest();
SECTION("Test: send data pack...")
{
REQUIRE( ui->getValue(node2_respond_s) == 0 );
UniSetUDP::UDPMessage pack;
pack.addAData(8, 100);
pack.addAData(9, -100);
pack.addDData(10, true);
pack.addDData(11, false);
REQUIRE( ui->getValue(8) == 0 );
REQUIRE( ui->getValue(9) == 0 );
REQUIRE( ui->getValue(10) == 0 );
REQUIRE( ui->getValue(11) == 0 );
msend(pack);
msleep(600);
REQUIRE( ui->getValue(8) == 100 );
REQUIRE( ui->getValue(9) == -100 );
REQUIRE( ui->getValue(10) == 1 );
REQUIRE( ui->getValue(11) == 0 );
}
SECTION("Test: send data pack2..")
{
UniSetUDP::UDPMessage pack;
pack.addAData(8, 10);
pack.addAData(9, -10);
pack.addDData(10, false);
pack.addDData(11, true);
msend(pack);
msleep(900);
REQUIRE( ui->getValue(8) == 10 );
REQUIRE( ui->getValue(9) == -10 );
REQUIRE( ui->getValue(10) == 0 );
REQUIRE( ui->getValue(11) == 1 );
}
}
// -----------------------------------------------------------------------------
TEST_CASE("[UNetUDP]: check multicast packets 'hole'", "[unetudp][multicast][udphole]")
{
InitMulticastTest();
// проверяем обработку "дырок" в пакетах..
UniSetUDP::UDPMessage pack;
pack.addAData(8, 15);
msend(pack);
msleep(120);
REQUIRE( ui->getValue(8) == 15 );
unsigned long nlost = ui->getValue(node2_lostpackets_as);
int lastnum = s_numpack - 1;
// искусственно делаем дырку в два пакета
s_numpack = lastnum + 3;
UniSetUDP::UDPMessage pack_hole;
pack_hole.addAData(8, 30);
msend(pack_hole); // пакет с дыркой
msleep(80);
REQUIRE( ui->getValue(8) == 15 );
// REQUIRE( ui->getValue(node2_lostpackets_as) == nlost );
s_numpack = lastnum + 1;
UniSetUDP::UDPMessage pack1;
pack1.addAData(8, 21);
msend(pack1); // заполняем первую дырку..// дырка закроется.. пакет тут же обработается
msleep(100);
REQUIRE( ui->getValue(8) == 21 );
// REQUIRE( ui->getValue(node2_lostpackets_as) == nlost );
s_numpack = lastnum + 2;
UniSetUDP::UDPMessage pack2;
pack2.addAData(8, 25);
msend(pack2); // заполняем следующую дырку
msleep(120);
// тут обработка дойдёт уже до "первого" пакета..
REQUIRE( ui->getValue(8) == 30 );
// REQUIRE( ui->getValue(node2_lostpackets_as) == nlost );
// возвращаем к нормальному..чтобы следующие тесты не поломались..
for( int i = 0; i < 10; i++ )
{
msend(pack2);
msleep(100);
}
}
// -----------------------------------------------------------------------------
TEST_CASE("[UNetUDP]: check multicast packets 'MaxDifferens'", "[unetudp][multicast][maxdifferens]")
{
InitMulticastTest();
// проверяем обработку "дырок" в пакетах..
UniSetUDP::UDPMessage pack;
pack.addAData(8, 50);
msend(pack);
msleep(1000);
REQUIRE( ui->getValue(8) == 50 );
unsigned long nlost = ui->getValue(node2_lostpackets_as);
int need_num = s_numpack;
// искуственно делаем дырку в два пакета
s_numpack += maxDifferense + 1;
UniSetUDP::UDPMessage pack_hole;
pack_hole.addAData(8, 150);
msend(pack_hole); // пакет с дыркой > maxDifference (должен обработаться)
msleep(120);
REQUIRE( ui->getValue(8) == 150 );
// msleep(2000);
// REQUIRE( ui->getValue(node2_lostpackets_as) > nlost );
}
// -----------------------------------------------------------------------------
TEST_CASE("[UNetUDP]: multicast bad packet number", "[unetudp][multicast][badnumber]")
{
InitMulticastTest();
// посылаем нормальный пакет
UniSetUDP::UDPMessage pack;
pack.addAData(8, 60);
msend(pack);
msleep(150);
REQUIRE( ui->getValue(8) == 60 );
int lastpack = s_numpack - 1;
// посылаем пакет с тем же номером
s_numpack = lastpack;
UniSetUDP::UDPMessage pack1;
pack1.addAData(8, 150);
msend(pack1); // должен быть "откинут"
msleep(120);
REQUIRE( ui->getValue(8) == 60 );
// посылаем пакет с меньшим номером
s_numpack = lastpack - 2;
UniSetUDP::UDPMessage pack2;
pack2.addAData(8, 155);
msend(pack2); // должен быть "откинут"
msleep(120);
REQUIRE( ui->getValue(8) == 60 );
// посылаем нормальный
s_numpack = lastpack + 1;
UniSetUDP::UDPMessage pack3;
pack3.addAData(8, 160);
msend(pack3); // должен быть "обработан"
msleep(120);
REQUIRE( ui->getValue(8) == 160 );
}
// -----------------------------------------------------------------------------
TEST_CASE("[UNetUDP]: [multicast] switching channels", "[unetudp][multicast][chswitch]")
{
InitMulticastTest();
UniSetUDP::UDPMessage pack;
pack.addAData(8, 70);
msend(pack);
msleep(120);
REQUIRE( ui->getValue(8) == 70 );
// и счётчик переключений каналов в нуле
REQUIRE( ui->getValue(node1_channelSwitchCount_as) == 0 );
// К сожалению в текущей реализации тестов
// обмена по второму каналу нет
// поэтому проверить переключение нет возможности
// остаётся только проверить, что мы не "ушли" с первого канала
// т.к. на втором нет связи и мы не должны на него переключаться
msleep(recvTimeout * 2);
REQUIRE( ui->getValue(node1_numchannel_as) == 1 );
// и счётчик переключений каналов остался в нуле
REQUIRE( ui->getValue(node1_channelSwitchCount_as) == 0 );
}
// -----------------------------------------------------------------------------
TEST_CASE("[UNetUDP]: mulsicat check undefined value", "[unetudp][multicast][undefvalue]")
{
InitMulticastTest();
UniSetUDP::UDPMessage pack0 = mreceive();
ui->setValue(2, 110);
REQUIRE( ui->getValue(2) == 110 );
msleep(600);
UniSetUDP::UDPMessage pack = mreceive( pack0.num + 1, 2000, 40 );
REQUIRE( pack.num != 0 );
REQUIRE( pack.asize() == 4 );
REQUIRE( pack.dsize() == 2 );
REQUIRE( pack.a_dat[0].val == 110 );
IOController_i::SensorInfo si;
si.id = 2;
si.node = uniset_conf()->getLocalNode();
ui->setUndefinedState(si, true, 6000 /* TestProc */ );
msleep(600);
pack = mreceive(pack.num + 1);
REQUIRE( pack.num != 0 );
REQUIRE( pack.asize() == 4 );
REQUIRE( pack.dsize() == 2 );
REQUIRE( pack.a_dat[0].val == 65635 );
ui->setUndefinedState(si, false, 6000 /* TestProc */ );
msleep(600);
pack = mreceive(pack.num + 1);
REQUIRE( pack.num != 0 );
REQUIRE( pack.asize() == 4 );
REQUIRE( pack.dsize() == 2 );
REQUIRE( pack.a_dat[0].val == 110 );
}
// -----------------------------------------------------------------------------
#define CATCH_CONFIG_RUNNER
#include <catch.hpp>
#include <string>
#include "Debug.h"
#include "UniSetActivator.h"
#include "PassiveTimer.h"
#include "SharedMemory.h"
#include "Extensions.h"
#include "UNetExchange.h"
// --------------------------------------------------------------------------
using namespace std;
using namespace uniset;
using namespace uniset::extensions;
// --------------------------------------------------------------------------
int main(int argc, const char* argv[] )
{
try
{
Catch::Session session;
if( argc > 1 && ( strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-h") == 0 ) )
{
cout << "--confile - Использовать указанный конф. файл. По умолчанию configure.xml" << endl;
SharedMemory::help_print(argc, argv);
cout << endl << endl << "--------------- CATCH HELP --------------" << endl;
session.showHelp("test_multicast_with_sm");
return 0;
}
int returnCode = session.applyCommandLine( argc, argv, Catch::Session::OnUnusedOptions::Ignore );
if( returnCode != 0 ) // Indicates a command line error
return returnCode;
auto conf = uniset_init(argc, argv);
bool apart = findArgParam("--apart", argc, argv) != -1;
auto shm = SharedMemory::init_smemory(argc, argv);
if( !shm )
return 1;
auto unet = UNetExchange::init_unetexchange(argc, argv, shm->getId(), (apart ? nullptr : shm ), "unet");
if( !unet )
return 1;
auto act = UniSetActivator::Instance();
act->add(shm);
act->add(unet);
SystemMessage sm(SystemMessage::StartUp);
act->broadcast( sm.transport_msg() );
act->run(true);
int tout = 6000;
PassiveTimer pt(tout);
while( !pt.checkTime() && !act->exist() )
msleep(100);
if( !act->exist() )
{
cerr << "(tests_multicast_with_sm): SharedMemory not exist! (timeout=" << tout << ")" << endl;
return 1;
}
return session.run();
}
catch( const SystemError& err )
{
cerr << "(tests_multicast_with_sm): " << err << endl;
}
catch( const uniset::Exception& ex )
{
cerr << "(tests_multicast_with_sm): " << ex << endl;
}
catch( const std::exception& e )
{
cerr << "(tests_multicast_with_sm): " << e.what() << endl;
}
catch(...)
{
cerr << "(tests_multicast_with_sm): catch(...)" << endl;
}
return 1;
}
#!/bin/sh
# '--' - нужен для отделения аргументов catch, от наших..
cd ../../../Utilities/Admin/
./uniset2-start.sh -f ./create_links.sh
./uniset2-start.sh -f ./create
./uniset2-start.sh -f ./exist | grep -q UNISET_PLC/Controllers || exit 1
cd -
./uniset2-start.sh -f ./tests-multicast-with-sm $* -- --unet-transport multicast --confile unetudp-test-configure.xml --e-startup-pause 10 \
--unet-name UNetExchange --unet-filter-field unet --unet-filter-value 1 --unet-maxdifferense 5 \
--unet-recv-timeout 1000 --unet-sendpause 500 --unet-update-strategy evloop
#--unet-log-add-levels any
......@@ -33,19 +33,40 @@
<ObjectsMap idfromfile="1">
<nodes port="2809" unet_broadcast_ip="127.255.255.255" unet_broadcast_ip2="badip" unet_multicast_ip="127.0.0.1">
<item id="3000" ip="127.0.0.1" name="localhost" textname="Локальный узел" unet_ignore="0" unet_port="3000" unet_multicast_ip2="127.0.1.1" unet_multicast_port2="2999">
<multicast>
<receive>
<group addr="239.255.1.1" addr2="239.255.2.1"/>
</receive>
<send>
<group addr="239.255.1.1" addr2="239.255.2.1"/>
</send>
</multicast>
<item id="3000" ip="127.0.0.1" name="localhost" textname="Локальный узел" unet_ignore="0" unet_port="3000" unet_multicast_ip2="127.0.1.1" unet_multicast_port2="2999" unet_multicast_receive_from_all_nodes="1">
<multicast>
<receive>
<group addr="238.255.1.2" addr2="239.255.1.2"/>
<group addr="238.255.1.3" addr2="239.255.1.3"/>
</receive>
<send>
<group addr="238.255.1.1" addr2="239.255.1.1"/>
</send>
</multicast>
</item>
<item id="3001" ip="127.0.0.1" name="localhost1" textname="Локальный узел" unet_ignore="1" unet_port="3001"/>
<item id="3002" ip="192.168.56.10" name="Node1" textname="Node1" unet_ignore="0" unet_respond_id="Node1_Not_Respond_S" unet_respond_invert="1" unet_channelswitchcount_id="Node1_ChannelSwitchCount_AS"/>
<item id="3003" ip="192.168.56.11" name="Node2" textname="Node2" unet_ignore="0" unet_respond_id="Node2_Respond_S" unet_lostpackets_id="Node2_LostPackets_AS" unet_numchannel_id="Node2_NumChannel_AS"/>
<item id="3002" ip="192.168.56.10" name="Node1" textname="Node1" unet_ignore="0" unet_respond_id="Node1_Not_Respond_S" unet_respond_invert="1" unet_channelswitchcount_id="Node1_ChannelSwitchCount_AS" unet_multicast_receive_from_all_nodes="1">
<multicast>
<receive>
<group addr="238.255.1.1" addr2="239.255.1.1"/>
<group addr="238.255.1.3" addr2="239.255.1.3"/>
</receive>
<send>
<group addr="238.255.1.2" addr2="239.255.1.2"/>
</send>
</multicast>
</item>
<item id="3003" ip="192.168.56.11" name="Node2" textname="Node2" unet_ignore="0" unet_respond_id="Node2_Respond_S" unet_lostpackets_id="Node2_LostPackets_AS" unet_numchannel_id="Node2_NumChannel_AS" unet_multicast_receive_from_all_nodes="1">
<multicast>
<receive>
<group addr="238.255.1.1" addr2="239.255.1.1"/>
<group addr="238.255.1.2" addr2="239.255.1.2"/>
</receive>
<send>
<group addr="238.255.1.3" addr2="239.255.1.3"/>
</send>
</multicast>
</item>
</nodes>
<!-- ************************ Датчики ********************** -->
<sensors name="Sensors">
......
......@@ -6,6 +6,12 @@ AT_SETUP([UNetUDP tests (with SM)(evloop)])
AT_CHECK([$abs_top_builddir/testsuite/at-test-launch.sh $abs_top_builddir/extensions/UNetUDP/tests tests_with_sm_evloop.sh],[0],[ignore],[ignore])
AT_CLEANUP
AT_SETUP([UNetUDP multicast tests (with SM)(evloop)])
AT_CHECK([$abs_top_builddir/testsuite/at-test-launch.sh $abs_top_builddir/extensions/UNetUDP/tests tests_multicast_with_sm_evloop.sh],[0],[ignore],[ignore])
AT_CLEANUP
# AT_SETUP([UNetUDP tests (separately)])
# AT_CHECK([$abs_top_builddir/testsuite/at-test-launch.sh $abs_top_builddir/extensions/UNetUDP/tests tests_with_sm_apart.sh],[0],[ignore],[ignore])
# AT_CLEANUP
#include <cstdlib>
#include <errno.h>
#include <getopt.h>
#include <cstring>
#include <iostream>
#include <chrono>
#include <thread>
#include <iomanip>
#include <Poco/Net/NetException.h>
#include "UDPPacket.h"
#include "PassiveTimer.h"
#include "UDPCore.h"
#include "MulticastTransport.h"
// --------------------------------------------------------------------------
static struct option longopts[] =
{
{ "help", no_argument, 0, 'h' },
{ "send", required_argument, 0, 's' },
{ "receive", required_argument, 0, 'r' },
{ "proc-id", required_argument, 0, 'p' },
{ "node-id", required_argument, 0, 'n' },
{ "send-pause", required_argument, 0, 'x' },
{ "timeout", required_argument, 0, 't' },
{ "data-count", required_argument, 0, 'c' },
{ "show-data", no_argument, 0, 'd' },
{ "check-lost", no_argument, 0, 'l' },
{ "verbode", required_argument, 0, 'v' },
{ "num-cycles", required_argument, 0, 'z' },
{ "prof", required_argument, 0, 'y' },
{ "a-data", required_argument, 0, 'a' },
{ "d-data", required_argument, 0, 'i' },
{ "group", required_argument, 0, 'g' },
{ NULL, 0, 0, 0 }
};
// --------------------------------------------------------------------------
using namespace std;
using namespace uniset;
using namespace uniset::UniSetUDP;
using namespace std::chrono;
// --------------------------------------------------------------------------
enum Command
{
cmdNOP,
cmdSend,
cmdReceive
};
// --------------------------------------------------------------------------
static bool split_addr( const string& addr, string& host, int& port )
{
string::size_type pos = addr.rfind(':');
if( pos != string::npos )
{
host = addr.substr(0, pos);
string s_port(addr.substr(pos + 1, addr.size() - 1));
port = atoi(s_port.c_str());
return true;
}
return false;
}
// --------------------------------------------------------------------------
int main(int argc, char* argv[])
{
int optindex = 0;
int opt = 0;
Command cmd = cmdNOP;
int verb = 0;
std::string addr = "";
int port = 0;
int usecpause = 2000000;
timeout_t tout = UniSetTimer::WaitUpTime;
int procID = 1;
int nodeID = 1;
size_t count = 50;
bool lost = false;
bool show = false;
size_t ncycles = 0;
unsigned int nprof = 0;
std::string d_data = "";
std::string a_data = "";
std::vector<Poco::Net::IPAddress> groups;
while(1)
{
opt = getopt_long(argc, argv, "hs:c:r:p:n:t:x:blvdz:y:a:i:g:", longopts, &optindex);
if( opt == -1 )
break;
switch (opt)
{
case 'h':
cout << "-h|--help - this message" << endl;
cout << "[-s|--send] host:port - Send message." << endl;
cout << "[-c|--data-count] num - Send num count of value. Default: 50." << endl;
cout << "[-r|--receive] host:port - Receive message." << endl;
cout << "[-g|--group] ip - Multicast group address (can be specified many times)" << endl;
cout << "[-p|--proc-id] id - Set packet header. From 'procID'. Default: 1" << endl;
cout << "[-n|--node-id] id - Set packet header. From 'nodeID'. Default: 1" << endl;
cout << "[-t|--timeout] msec - timeout for receive. Default: 0 msec (waitup)." << endl;
cout << "[-x|--send-pause] msec - pause for send packets. Default: 200 msec." << endl;
cout << "[-l|--check-lost] - Check the lost packets." << endl;
cout << "[-v|--verbose] - verbose mode." << endl;
cout << "[-d|--show-data] - show receive data." << endl;
cout << "[-z|--num-cycles] num - Number of cycles of exchange. Default: -1 - infinitely." << endl;
cout << "[-y|--prof] num - Print receive statistics every NUM packets (for -r only)" << endl;
cout << "[-a|--a-data] id1=val1,id2=val2,... - Analog data. Send: id1=id1,id2=id2,.. for analog sensors" << endl;
cout << "[-i|--d-data] id1=val1,id2=val2,... - Digital data. Send: id1=id1,id2=id2,.. for digital sensors" << endl;
cout << endl;
return 0;
case 'r':
cmd = cmdReceive;
addr = string(optarg);
break;
case 's':
addr = string(optarg);
cmd = cmdSend;
break;
case 'a':
a_data = string(optarg);
break;
case 'i':
d_data = string(optarg);
break;
case 't':
tout = atoi(optarg);
break;
case 'x':
usecpause = atoi(optarg) * 1000;
break;
case 'y':
nprof = atoi(optarg);
break;
case 'c':
count = atoi(optarg);
break;
case 'p':
procID = atoi(optarg);
break;
case 'n':
nodeID = atoi(optarg);
break;
case 'g':
groups.emplace_back(Poco::Net::IPAddress(optarg));
break;
case 'd':
show = true;
break;
case 'l':
lost = true;
break;
case 'v':
verb = 1;
break;
case 'z':
ncycles = atoi(optarg);
break;
case '?':
default:
cerr << "? argumnet" << endl;
return 0;
}
}
if( cmd == cmdNOP )
{
cerr << "No command... Use -h for help" << endl;
return -1;
}
if( groups.empty() )
{
cerr << "Multicast group address must be define. Use -g or -h for help" << endl;
return -1;
}
try
{
string s_host;
if( !split_addr(addr, s_host, port) )
{
cerr << "(main): Unknown 'host:port' for '" << addr << "'" << endl;
return 1;
}
if( verb )
{
cout << " host=" << s_host
<< " port=" << port
<< " timeout=";
if( tout == UniSetTimer::WaitUpTime )
cout << "Waitup";
else
cout << tout;
cout << " msecpause=" << usecpause / 1000
<< endl;
cout << " Groups: " << endl;
for( const auto& g : groups )
cout << " " << g << endl;
}
switch( cmd )
{
case cmdReceive:
{
MulticastReceiveTransport udp(s_host, port, groups);
udp.createConnection(true, 500, true);
UniSetUDP::UDPMessage pack;
UniSetUDP::UDPPacket buf;
unsigned long prev_num = 1;
int nc = 1;
if( ncycles > 0 )
nc = ncycles;
auto t_start = high_resolution_clock::now();
unsigned int npack = 0;
while( nc )
{
try
{
if( nprof > 0 && npack >= nprof )
{
auto t_end = high_resolution_clock::now();
float sec = duration_cast<duration<float>>(t_end - t_start).count();
cout << "Receive " << setw(5) << npack << " packets for " << setw(8) << sec << " sec "
<< " [ 1 packet per " << setw(10) << ( sec / (float)npack ) << " sec ]" << endl;
t_start = t_end;
npack = 0;
}
if( !udp.isReadyForReceive(tout) )
{
cout << "(recv): Timeout.." << endl;
continue;
}
size_t ret = udp.receive(&(buf.data), sizeof(buf.data) );
size_t sz = UniSetUDP::UDPMessage::getMessage(pack, buf);
if( sz == 0 )
{
if( pack.magic != UniSetUDP::UNETUDP_MAGICNUM )
cerr << "(recv): BAD PROTOCOL VERSION! [ need version '" << UniSetUDP::UNETUDP_MAGICNUM << "']" << endl;
else
cerr << "(recv): FAILED header ret=" << ret
<< " sizeof=" << sz << endl;
continue;
}
if( lost )
{
if( prev_num != (pack.num - 1) )
cerr << "WARNING! Incorrect sequence of packets! current=" << pack.num
<< " prev=" << prev_num << endl;
prev_num = pack.num;
}
npack++;
if( verb )
cout << "receive OK: "
<< " bytes: " << ret << endl;
if( show )
cout << "receive data: " << pack << endl;
}
catch( Poco::Net::NetException& e )
{
cerr << "(recv): " << e.displayText() << " (" << addr << ")" << endl;
}
catch( ... )
{
cerr << "(recv): catch ..." << endl;
}
if( ncycles > 0 )
{
nc--;
if( nc <= 0 )
break;
}
}
}
break;
case cmdSend:
{
auto udp = std::make_shared<MulticastSendTransport>(s_host, port, groups);
UniSetUDP::UDPMessage mypack;
mypack.nodeID = nodeID;
mypack.procID = procID;
if( !a_data.empty() )
{
auto vlist = uniset::getSInfoList(a_data, nullptr);
for( const auto& v : vlist )
{
UDPAData d(v.si.id, v.val);
mypack.addAData(d);
}
}
else
{
for( size_t i = 0; i < count; i++ )
{
UDPAData d(i, i);
mypack.addAData(d);
}
}
if( !d_data.empty() )
{
auto vlist = uniset::getSInfoList(d_data, nullptr);
for( const auto& v : vlist )
mypack.addDData(v.si.id, v.val);
}
else
{
for( size_t i = 0; i < count; i++ )
mypack.addDData(i, i);
}
udp->createConnection(true, 500);
size_t packetnum = 0;
UniSetUDP::UDPPacket s_buf;
size_t nc = 1;
if( ncycles > 0 )
nc = ncycles;
while( nc )
{
mypack.num = packetnum++;
// при переходе черех максимум (UniSetUDP::MaxPacketNum)
// пакет опять должен иметь номер "1"
if( packetnum == 0 )
packetnum = 1;
try
{
if( udp->isReadyForSend(tout) )
{
mypack.transport_msg(s_buf);
if( verb )
cout << "(send): to addr=" << addr << " d_count=" << mypack.dcount
<< " a_count=" << mypack.acount << " bytes=" << s_buf.len << endl;
size_t ret = udp->send((char*)&s_buf.data, s_buf.len);
if( ret < s_buf.len )
cerr << "(send): FAILED ret=" << ret << " < sizeof=" << s_buf.len << endl;
}
}
catch( Poco::Net::NetException& e )
{
cerr << "(send): " << e.message() << " (" << addr << ")" << endl;
}
catch( ... )
{
cerr << "(send): catch ..." << endl;
}
if( ncycles > 0 )
{
nc--;
if( nc <= 0 )
break;
}
std::this_thread::sleep_for(std::chrono::microseconds(usecpause));
}
}
break;
default:
cerr << endl << "Unknown command: '" << cmd << "'. Use -h for help" << endl;
return -1;
break;
}
}
catch( const std::exception& e )
{
cerr << "(main): " << e.what() << endl;
}
catch( ... )
{
cerr << "(main): catch ..." << endl;
return 1;
}
return 0;
}
// --------------------------------------------------------------------------
......@@ -270,12 +270,17 @@
./extensions/tests/test_ui.cc
./extensions/tests/test_vtypes.cc
./extensions/UNetUDP/Makefile.am
./extensions/UNetUDP/MulticastTransport.cc
./extensions/UNetUDP/MulticastTransport.h
./extensions/UNetUDP/tests/a.cc
./extensions/UNetUDP/tests/Makefile.am
./extensions/UNetUDP/tests/tests_individual_process.cc
./extensions/UNetUDP/tests/tests_multicast_with_sm.cc
./extensions/UNetUDP/tests/tests_with_sm.cc
./extensions/UNetUDP/tests/test_unetmulticast.cc
./extensions/UNetUDP/tests/test_unetudp1.cc
./extensions/UNetUDP/tests/test_unetudp.cc
./extensions/UNetUDP/tests/test_unetudp_multicast.cc
./extensions/UNetUDP/tests/u.cc
./extensions/UNetUDP/tests/urecv_perf_test.cc
./extensions/UNetUDP/UDPPacket.cc
......@@ -286,6 +291,7 @@
./extensions/UNetUDP/UNetExchange.cc
./extensions/UNetUDP/UNetExchange.h
./extensions/UNetUDP/UNetLogSugar.h
./extensions/UNetUDP/unet-multicast-tester.cc
./extensions/UNetUDP/UNetReceiver.cc
./extensions/UNetUDP/UNetReceiver.h
./extensions/UNetUDP/UNetSender.cc
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment