Commit 3216cd88 authored by Pavel Vainerman's avatar Pavel Vainerman Committed by Pavel Vainerman

[unet-multicast]: init from XML

parent dacbb7bb
......@@ -181,7 +181,7 @@
</RTUExchange>
<UDPExchange name="UDPExchange"/>
<UDPExchange2 name="UDPExchange2"/>
<UNetExchange name="UNetExchange"/>
<UNetExchange name="UNetExchange" transport="multicast"/>
<HeartBeatTime msec="5000"/>
<NCReadyTimeout msec="120000"/>
<TestGen input1_s="Input1_S" input2_s="DumpSensor1_S" name="TestGen" output1_c="DO_C" output2_c="DO1_C"/>
......@@ -214,13 +214,21 @@
priority - приоритет сообщения об изменении данного датчика
textname - текстовое имя датчика
-->
<nodes port="2809" unet_broadcast_ip="127.255.255.255" unet_broadcast_ip2="192.168.56.255">
<nodes port="2809" unet_broadcast_ip="127.255.255.255" unet_broadcast_ip2="192.168.56.255" unet_transport="multicast">
<item id="3000" dbserver="DBServer1" infserver="InfoServer" ip="127.0.0.1" name="LocalhostNode" textname="Локальный узел" unet_ignore="0" unet_port="2048">
<iocards>
<item card="1" name="DI32"/>
<item card="2" name="DO32"/>
<item baddr="0x110" card="3" dev="/dev/null" name="UNIO48" subdev1="TBI24_0" subdev2="TBI16_8"/>
</iocards>
<multicast>
<receive>
<group addr="224.0.0.1"/>
</receive>
<send>
<group addr="224.0.0.1"/>
</send>
</multicast>
</item>
<item id="3001" infserver="InfoServer" ip="127.0.0.1" name="LocalhostNode1" textname="Локальный узел" unet_ignore="0" unet_port="2049"/>
<item id="3002" ip="192.168.56.10" name="Node1" textname="Node1" unet_ignore="0" unet_port="3001" unet_respond_id="Input1_S" unet_respond_invert="1"/>
......
......@@ -25,7 +25,81 @@
using namespace std;
using namespace uniset;
// -------------------------------------------------------------------------
MulticastReceiveTransport::MulticastReceiveTransport( const std::string& _bind, int _port, const std::vector<Poco::Net::IPAddress> _joinGroups ):
/*
* <item id="3000" unet_port="2048" unet_multicast_ip="192.168.0.255" unet_port2="2048" unet_multicast_ip2="192.169.0.255">
<multicast>
<receive>
1<group addr="224.0.0.1" addr2="224.0.0.1"/>
</receive>
<send>
<group addr="224.0.0.1"/>
</send>
</multicast>
</item>
*/
std::unique_ptr<MulticastReceiveTransport> MulticastReceiveTransport::createFromXml( UniXML::iterator it, const std::string& defaultIP, int numChan )
{
ostringstream fieldIp;
fieldIp << "unet_multicast_ip";
if( numChan > 0 )
fieldIp << numChan;
const string h = it.getProp2(fieldIp.str(), defaultIP);
if( h.empty() )
{
ostringstream err;
err << "(MulticastReceiveTransport): Unknown multicast IP for " << it.getProp("name");
throw uniset::SystemError(err.str());
}
ostringstream fieldPort;
fieldPort << "unet_port";
if( numChan > 0 )
fieldPort << numChan;
int p = it.getPIntProp(fieldPort.str(), it.getIntProp("id"));
if( it.find("multicast") )
throw SystemError("(MulticastReceiveTransport): not found <multicast> node");
if( !it.goChildren() )
throw SystemError("(MulticastReceiveTransport): empty <multicast> node");
if( it.find("receive") )
throw SystemError("(MulticastReceiveTransport): not found <receive> node");
if( !it.goChildren() )
throw SystemError("(MulticastReceiveTransport): empty <receive> groups");
std::vector<Poco::Net::IPAddress> groups;
ostringstream fieldAddr;
fieldAddr << "addr";
if( numChan > 0 )
fieldAddr << numChan;
for( ; it; it++ )
{
Poco::Net::IPAddress a(it.getProp(fieldAddr.str()), Poco::Net::IPAddress::IPv4);
if( !a.isMulticast() )
{
ostringstream err;
err << "(MulticastReceiveTransport): " << it.getProp(fieldAddr.str()) << " is not multicast address";
throw SystemError(err.str());
}
groups.push_back(a);
}
return unisetstd::make_unique<MulticastReceiveTransport>(h, p, std::move(groups));
}
// -------------------------------------------------------------------------
MulticastReceiveTransport::MulticastReceiveTransport( const std::string& _bind, int _port, const std::vector<Poco::Net::IPAddress>& _joinGroups ):
host(_bind),
port(_port),
groups(_joinGroups)
......@@ -125,7 +199,82 @@ ssize_t MulticastReceiveTransport::receive( void* r_buf, size_t sz )
return udp->receiveBytes(r_buf, sz);
}
// -------------------------------------------------------------------------
MulticastSendTransport::MulticastSendTransport( const std::string& _host, int _port, const std::vector<Poco::Net::IPAddress> _sendGroups ):
// -------------------------------------------------------------------------
/*
* <item id="3000" unet_port="2048" unet_multicast_ip="192.168.0.255" unet_port2="2048" unet_multicast_ip2="192.169.0.255">
<multicast>
<receive>
1<group addr="224.0.0.1" addr2="224.0.0.1"/>
</receive>
<send>
<group addr="224.0.0.1"/>
</send>
</multicast>
</item>
*/
std::unique_ptr<MulticastSendTransport> MulticastSendTransport::createFromXml( UniXML::iterator it, const std::string& defaultIP, int numChan )
{
ostringstream fieldIp;
fieldIp << "unet_multicast_ip";
if( numChan > 0 )
fieldIp << numChan;
const string h = it.getProp2(fieldIp.str(), defaultIP);
if( h.empty() )
{
ostringstream err;
err << "(MulticastSendTransport): Unknown multicast IP for " << it.getProp("name");
throw uniset::SystemError(err.str());
}
ostringstream fieldPort;
fieldPort << "unet_port";
if( numChan > 0 )
fieldPort << numChan;
int p = it.getPIntProp(fieldPort.str(), it.getIntProp("id"));
if( it.find("multicast") )
throw SystemError("(MulticastSendTransport): not found <multicast> node");
if( !it.goChildren() )
throw SystemError("(MulticastSendTransport): empty <multicast> node");
if( it.find("send") )
throw SystemError("(MulticastSendTransport): not found <send> node");
if( !it.goChildren() )
throw SystemError("(MulticastSendTransport): empty <send> groups");
ostringstream fieldAddr;
fieldAddr << "addr";
if( numChan > 0 )
fieldAddr << numChan;
std::vector<Poco::Net::IPAddress> groups;
for( ; it; it++ )
{
Poco::Net::IPAddress a(it.getProp(fieldAddr.str()), Poco::Net::IPAddress::IPv4);
if( !a.isMulticast() )
{
ostringstream err;
err << "(MulticastSendTransport): " << it.getProp(fieldAddr.str()) << " is not multicast address";
throw SystemError(err.str());
}
groups.push_back(a);
}
return unisetstd::make_unique<MulticastSendTransport>(h, p, std::move(groups));
}
// -------------------------------------------------------------------------
MulticastSendTransport::MulticastSendTransport( const std::string& _host, int _port, const std::vector<Poco::Net::IPAddress>& _sendGroups ):
saddr(_host, _port),
groups(_sendGroups)
{
......
......@@ -22,28 +22,26 @@
#include <vector>
#include "UNetTransport.h"
#include "UDPCore.h"
#include "UniXML.h"
// -------------------------------------------------------------------------
namespace uniset
{
class MulticastReceiveTransport :
class MulticastReceiveTransport:
public UNetReceiveTransport
{
public:
MulticastReceiveTransport( const std::string& bind, int port, const std::vector<Poco::Net::IPAddress> joinGroups );
static std::unique_ptr<MulticastReceiveTransport> createFromXml( UniXML::iterator it, const std::string& defaultIP, int numChan );
MulticastReceiveTransport( const std::string& bind, int port, const std::vector<Poco::Net::IPAddress>& joinGroups );
virtual ~MulticastReceiveTransport();
virtual bool isConnected() const override;
virtual std::string toString() const override;
virtual std::string ID() const noexcept override;
virtual bool createConnection(bool throwEx, timeout_t readTimeout, bool noblock) override;
virtual void disconnect() override;
virtual int getSocket() const override;
virtual ssize_t receive(void* r_buf, size_t sz) override;
......@@ -55,21 +53,20 @@ namespace uniset
const std::vector<Poco::Net::IPAddress> groups;
};
class MulticastSendTransport :
class MulticastSendTransport:
public UNetSendTransport
{
public:
MulticastSendTransport(const std::string& host, int port, const std::vector<Poco::Net::IPAddress> sendGroups );
static std::unique_ptr<MulticastSendTransport> createFromXml( UniXML::iterator it, const std::string& defaultIP, int numChan );
MulticastSendTransport(const std::string& host, int port, const std::vector<Poco::Net::IPAddress>& sendGroups );
virtual ~MulticastSendTransport();
virtual bool isConnected() const override;
virtual std::string toString() const override;
virtual bool createConnection(bool throwEx, timeout_t sendTimeout) override;
virtual int getSocket() const override;
// write
......
......@@ -25,6 +25,33 @@
using namespace std;
using namespace uniset;
// -------------------------------------------------------------------------
std::unique_ptr<UDPReceiveTransport> UDPReceiveTransport::createFromXml( UniXML::iterator it, const std::string& defaultIP, int numChan )
{
ostringstream fieldIp;
fieldIp << "unet_broadcast_ip";
if( numChan > 0 )
fieldIp << numChan;
const string h = it.getProp2(fieldIp.str(), defaultIP);
if( h.empty() )
{
ostringstream err;
err << "(UDPReceiveTransport): Unknown broadcast IP for " << it.getProp("name");
throw uniset::SystemError(err.str());
}
ostringstream fieldPort;
fieldPort << "unet_port";
if( numChan > 0 )
fieldPort << numChan;
int p = it.getPIntProp(fieldPort.str(), it.getIntProp("id"));
return unisetstd::make_unique<UDPReceiveTransport>(h, p);
}
// -------------------------------------------------------------------------
UDPReceiveTransport::UDPReceiveTransport( const std::string& _bind, int _port ):
host(_bind),
port(_port)
......@@ -97,6 +124,34 @@ ssize_t UDPReceiveTransport::receive( void* r_buf, size_t sz )
return udp->receiveBytes(r_buf, sz);
}
// -------------------------------------------------------------------------
std::unique_ptr<UDPSendTransport> UDPSendTransport::createFromXml( UniXML::iterator it, const std::string& defaultIP, int numChan )
{
ostringstream fieldIp;
fieldIp << "unet_broadcast_ip";
if( numChan > 0 )
fieldIp << numChan;
const string h = it.getProp2(fieldIp.str(), defaultIP);
if( h.empty() )
{
ostringstream err;
err << "(UDPReceiveTransport): Unknown broadcast IP for " << it.getProp("name");
throw uniset::SystemError(err.str());
}
ostringstream fieldPort;
fieldPort << "unet_port";
if( numChan > 0 )
fieldPort << numChan;
int p = it.getPIntProp(fieldPort.str(), it.getIntProp("id"));
return unisetstd::make_unique<UDPSendTransport>(h, p);
}
// -------------------------------------------------------------------------
UDPSendTransport::UDPSendTransport( const std::string& _host, int _port ):
saddr(_host, _port)
{
......
......@@ -21,6 +21,7 @@
#include <memory>
#include "UNetTransport.h"
#include "UDPCore.h"
#include "UniXML.h"
// -------------------------------------------------------------------------
namespace uniset
{
......@@ -29,6 +30,8 @@ namespace uniset
{
public:
static std::unique_ptr<UDPReceiveTransport> createFromXml( UniXML::iterator it, const std::string& defaultIP, int numChan );
UDPReceiveTransport( const std::string& bind, int port );
virtual ~UDPReceiveTransport();
......@@ -52,6 +55,8 @@ namespace uniset
{
public:
static std::unique_ptr<UDPSendTransport> createFromXml( UniXML::iterator it, const std::string& defaultIP, int numChan );
UDPSendTransport( const std::string& host, int port );
virtual ~UDPSendTransport();
......
......@@ -22,6 +22,7 @@
#include "UNetExchange.h"
#include "UNetLogSugar.h"
#include "UDPTransport.h"
#include "MulticastTransport.h"
// -----------------------------------------------------------------------------
using namespace std;
using namespace uniset;
......@@ -114,9 +115,16 @@ UNetExchange::UNetExchange(uniset::ObjectId objId, uniset::ObjectId shmId, const
UniXML::iterator n_it(nodes);
const string unet_transport(n_it.getProp2("unet_transport", "udp"));
string default_ip(n_it.getProp("unet_broadcast_ip"));
string default_ip2(n_it.getProp("unet_broadcast_ip2"));
if( unet_transport == "multicast" )
{
default_ip = n_it.getProp("unet_multicast_ip");
default_ip2 = n_it.getProp("unet_multicast_ip2");
}
if( !n_it.goChildren() )
throw uniset::SystemError("(UNetExchange): Items not found for <nodes>");
......@@ -132,30 +140,6 @@ UNetExchange::UNetExchange(uniset::ObjectId objId, uniset::ObjectId shmId, const
if( !uniset::check_filter(n_it, n_field, n_fvalue) )
continue;
// Если указано поле unet_broadcast_ip непосредственно у узла - берём его
// если не указано берём общий broadcast_ip
string h = { n_it.getProp2("unet_broadcast_ip", default_ip) };
string h2 = { n_it.getProp2("unet_broadcast_ip2", default_ip2) };
if( h.empty() )
{
ostringstream err;
err << myname << "(init): Unknown broadcast IP for " << n_it.getProp("name");
unetcrit << err.str() << endl;
throw uniset::SystemError(err.str());
}
if( h2.empty() )
unetinfo << myname << "(init): ip2 not used..." << endl;
// Если указано поле unet_port - используем его
// Иначе port = идентификатору узла
int p = n_it.getPIntProp("unet_port", n_it.getIntProp("id"));
// по умолчанию порт на втором канале такой же как на первом (если не задан отдельно)
int p2 = n_it.getPIntProp("unet_port2", p);
string n(n_it.getProp("name"));
if( n == conf->getLocalNodeName() )
......@@ -168,8 +152,18 @@ UNetExchange::UNetExchange(uniset::ObjectId objId, uniset::ObjectId shmId, const
}
unetinfo << myname << "(init): init sender.. my node " << n_it.getProp("name") << endl;
auto s1 = unisetstd::make_unique<uniset::UDPSendTransport>(h, p);
sender = make_shared<UNetSender>(std::move(s1), shm, false, s_field, s_fvalue, "unet", prefix);
if( unet_transport == "multicast" )
{
auto s1 = MulticastSendTransport::createFromXml(n_it, default_ip, 0);
sender = make_shared<UNetSender>(std::move(s1), shm, false, s_field, s_fvalue, "unet", prefix);
}
else // default
{
auto s1 = UDPSendTransport::createFromXml(n_it, default_ip, 0);
sender = make_shared<UNetSender>(std::move(s1), shm, false, s_field, s_fvalue, "unet", prefix);
}
sender->setSendPause(sendpause);
sender->setPackSendPause(packsendpause);
sender->setPackSendPauseFactor(packsendpauseFactor);
......@@ -178,35 +172,57 @@ UNetExchange::UNetExchange(uniset::ObjectId objId, uniset::ObjectId shmId, const
try
{
sender2 = nullptr;
// создаём "писателя" для второго канала если задан
if( !h2.empty() )
if( unet_transport == "multicast" )
{
unetinfo << myname << "(init): init sender2.. my node " << n_it.getProp("name") << endl;
if( n_it.getProp("unet_multicast_ip2").empty() || !default_ip2.empty() )
{
auto s2 = MulticastSendTransport::createFromXml(n_it, default_ip2, 2);
sender2 = make_shared<UNetSender>(std::move(s2), shm, false, s_field, s_fvalue, "unet", prefix);
}
}
else // default
{
if( n_it.getProp("unet_broadcast_ip2").empty() || !default_ip2.empty() )
{
auto s2 = UDPSendTransport::createFromXml(n_it, default_ip2, 2);
sender2 = make_shared<UNetSender>(std::move(s2), shm, false, s_field, s_fvalue, "unet", prefix);
}
}
auto s2 = unisetstd::make_unique<uniset::UDPSendTransport>(h2, p2);
sender2 = make_shared<UNetSender>(std::move(s2), shm, false, s_field, s_fvalue, prefix);
if( sender2 )
{
sender2->setSendPause(sendpause);
sender2->setCheckConnectionPause(checkConnectionPause);
loga->add(sender2->getLog());
}
else
unetwarn << myname << "(ignore): sender for Channel2 disabled " << endl;
}
catch(...)
catch( std::exception& ex )
{
// т.е. это "резервный канал", то игнорируем ошибку его создания
// при запуске "интерфейс" может быть и не доступен...
sender2 = 0;
unetcrit << myname << "(ignore): DON`T CREATE 'UNetSender' for " << h2 << ":" << p2 << endl;
sender2 = nullptr;
unetcrit << myname << "IGNORE! reserv channel create error:" << ex.what() << endl;
}
continue;
}
unetinfo << myname << "(init): add UNetReceiver for " << h << ":" << p << endl;
auto transport = unisetstd::make_unique<uniset::UDPReceiveTransport>(h, p);
unetinfo << myname << "(init): add UNetReceiver.." << endl;
std::unique_ptr<UNetReceiveTransport> transport1;
if( checkExistTransport(transport->ID()) )
if( unet_transport == "multicast" )
transport1 = MulticastReceiveTransport::createFromXml(n_it, default_ip, 0);
else // default
transport1 = UDPReceiveTransport::createFromXml(n_it, default_ip, 0);
if( checkExistTransport(transport1->ID()) )
{
unetinfo << myname << "(init): " << h << ":" << p << " already added! Ignore.." << endl;
unetinfo << myname << "(init): " << transport1->ID() << " already added! Ignore.." << endl;
continue;
}
......@@ -350,9 +366,8 @@ UNetExchange::UNetExchange(uniset::ObjectId objId, uniset::ObjectId shmId, const
throw SystemError(err.str());
}
unetinfo << myname << "(init): (node='" << n << "') add basic receiver "
<< h << ":" << p << endl;
auto r = make_shared<UNetReceiver>(std::move(transport), shm, false, prefix);
unetinfo << myname << "(init): (node='" << n << "') add basic receiver " << transport1->ID() << endl;
auto r = make_shared<UNetReceiver>(std::move(transport1), shm, false, prefix);
loga->add(r->getLog());
......@@ -379,13 +394,22 @@ UNetExchange::UNetExchange(uniset::ObjectId objId, uniset::ObjectId shmId, const
try
{
if( !h2.empty() ) // создаём читателя впо второму каналу
{
unetinfo << myname << "(init): (node='" << n << "') add reserv receiver "
<< h2 << ":" << p2 << endl;
std::unique_ptr<UNetReceiveTransport> transport2 = nullptr;
auto transport2 = unisetstd::make_unique<UDPReceiveTransport>(h2, p2);
if( unet_transport == "multicast" )
{
if (!n_it.getProp("unet_multicast_ip2").empty() || !default_ip2.empty())
transport2 = MulticastReceiveTransport::createFromXml(n_it, default_ip2, 2);
}
else // default
{
if( !n_it.getProp("unet_broadcast_ip2").empty() || !default_ip2.empty() )
transport2 = UDPReceiveTransport::createFromXml(n_it, default_ip2, 2);
}
if( transport2 ) // создаём читателя по второму каналу
{
unetinfo << myname << "(init): (node='" << n << "') add reserv receiver " << transport2->ID() << endl;
r2 = make_shared<UNetReceiver>(std::move(transport2), shm, false, prefix);
loga->add(r2->getLog());
......@@ -409,12 +433,12 @@ UNetExchange::UNetExchange(uniset::ObjectId objId, uniset::ObjectId shmId, const
r2->setUpdateStrategy(r_upStrategy);
}
}
catch(...)
catch( std::exception& ex )
{
// т.е. это "резервный канал", то игнорируем ошибку его создания
// при запуске "интерфейс" может быть и не доступен...
r2 = 0;
unetcrit << myname << "(ignore): DON`T CREATE 'UNetReceiver' for " << h2 << ":" << p2 << endl;
r2 = nullptr;
unetcrit << myname << "(ignore): DON`T CREATE reserve 'UNetReceiver'. error: " << ex.what() << endl;
}
ReceiverInfo ri(r, r2);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment