Commit f34a0dfa authored by Pavel Vainerman's avatar Pavel Vainerman Committed by Pavel Vainerman

[unet-multicast]: fixed sender, supported "iface"

parent 38a859b8
......@@ -214,7 +214,7 @@
priority - приоритет сообщения об изменении данного датчика
textname - текстовое имя датчика
-->
<nodes port="2809" unet_broadcast_ip="127.255.255.255" unet_broadcast_ip2="192.168.56.255" unet_transport="multicast">
<nodes port="2809" unet_broadcast_ip="127.255.255.255" unet_broadcast_ip2="192.168.56.255" unet_transport="multicast" unet_multicast_ip="0.0.0.0">
<item id="3000" dbserver="DBServer1" infserver="InfoServer" ip="127.0.0.1" name="LocalhostNode" textname="Локальный узел" unet_ignore="0" unet_port="2048">
<iocards>
<item card="1" name="DI32"/>
......
......@@ -50,13 +50,10 @@ xmlNode* MulticastReceiveTransport::getReceiveListNode( UniXML::iterator root )
<receive>
<group addr="224.0.0.1" addr2="224.0.0.1"/>
</receive>
<send>
<group addr="224.0.0.1"/>
</send>
</multicast>
</item>
*/
std::unique_ptr<MulticastReceiveTransport> MulticastReceiveTransport::createFromXml( UniXML::iterator it, const std::string& defaultIP, int numChan, const std::string& section )
std::unique_ptr<MulticastReceiveTransport> MulticastReceiveTransport::createFromXml( UniXML::iterator it, const std::string& defaultIP, int numChan, const std::string& defIface, const std::string& section )
{
ostringstream fieldIp;
fieldIp << "unet_multicast_ip";
......@@ -115,13 +112,16 @@ std::unique_ptr<MulticastReceiveTransport> MulticastReceiveTransport::createFrom
groups.push_back(a);
}
return unisetstd::make_unique<MulticastReceiveTransport>(h, p, std::move(groups));
return unisetstd::make_unique<MulticastReceiveTransport>(h, p, std::move(groups), defIface);
}
// -------------------------------------------------------------------------
MulticastReceiveTransport::MulticastReceiveTransport( const std::string& _bind, int _port, const std::vector<Poco::Net::IPAddress>& _joinGroups ):
MulticastReceiveTransport::MulticastReceiveTransport( const std::string& _bind, int _port,
const std::vector<Poco::Net::IPAddress>& _joinGroups,
const std::string& _iface ):
host(_bind),
port(_port),
groups(_joinGroups)
groups(_joinGroups),
ifaceaddr(_iface)
{
}
......@@ -180,11 +180,23 @@ bool MulticastReceiveTransport::createConnection( bool throwEx, timeout_t readTi
{
try
{
Poco::Net::NetworkInterface iface;
iface.addAddress(Poco::Net::IPAddress()); // INADDR_ANY
if( !ifaceaddr.empty() )
iface = Poco::Net::NetworkInterface::forAddress(Poco::Net::IPAddress(ifaceaddr));
udp = unisetstd::make_unique<MulticastSocketU>(host, port);
udp->setBlocking(!noblock);
for( const auto& s : groups )
udp->joinGroup(s);
udp->joinGroup(s, iface);
}
catch( const Poco::Net::InterfaceNotFoundException& ex )
{
ostringstream err;
err << "(MulticastReceiveTransport): Not found interface for address " << ifaceaddr;
throw uniset::SystemError(err.str());
}
catch( const std::exception& e )
{
......@@ -292,28 +304,41 @@ std::unique_ptr<MulticastSendTransport> MulticastSendTransport::createFromXml( U
if( numChan > 0 )
fieldAddr << numChan;
std::vector<Poco::Net::IPAddress> groups;
ostringstream fieldGroupPort;
fieldGroupPort << "port";
if( numChan > 0 )
fieldGroupPort << numChan;
string groupAddr;
int groupPort = p;
int gnum = 0;
for( ; it; it++ )
{
Poco::Net::IPAddress a(it.getProp(fieldAddr.str()), Poco::Net::IPAddress::IPv4);
groupAddr = it.getProp(fieldAddr.str());
if( !a.isMulticast() )
{
ostringstream err;
err << "(MulticastSendTransport): " << it.getProp(fieldAddr.str()) << " is not multicast address";
throw SystemError(err.str());
}
if( groupAddr.empty() )
throw SystemError("(MulticastSendTransport): unknown group address for send");
groups.push_back(a);
groupPort = it.getPIntProp(fieldGroupPort.str(), p);
if( groupPort <= 0 )
throw SystemError("(MulticastSendTransport): unknown group port for send");
gnum++;
}
return unisetstd::make_unique<MulticastSendTransport>(h, p, std::move(groups), ttl);
if( gnum > 1)
throw SystemError("(MulticastSendTransport): size list <groups> " + std::to_string(gnum) + " > 1. Currently only ONE multicast group is supported to send");
return unisetstd::make_unique<MulticastSendTransport>(h, p, groupAddr, groupPort, ttl);
}
// -------------------------------------------------------------------------
MulticastSendTransport::MulticastSendTransport( const std::string& _host, int _port, const std::vector<Poco::Net::IPAddress>& _sendGroups, int _ttl ):
saddr(_host, _port),
groups(_sendGroups),
MulticastSendTransport::MulticastSendTransport( const std::string& _host, int _port, const std::string& grHost, int grPort, int _ttl ):
sockAddr(_host, _port),
toAddr(grHost, grPort),
ttl(_ttl)
{
......@@ -323,20 +348,17 @@ MulticastSendTransport::~MulticastSendTransport()
{
if( udp )
{
for (const auto& s : groups)
try
{
try
{
udp->leaveGroup(s);
}
catch (...) {}
udp->close();
}
catch (...) {}
}
}
// -------------------------------------------------------------------------
std::string MulticastSendTransport::toString() const
{
return saddr.toString();
return sockAddr.toString();
}
// -------------------------------------------------------------------------
bool MulticastSendTransport::isConnected() const
......@@ -362,11 +384,7 @@ bool MulticastSendTransport::createConnection( bool throwEx, timeout_t sendTimeo
{
try
{
udp = unisetstd::make_unique<MulticastSocketU>();
for( const auto& s : groups )
udp->joinGroup(s);
udp = unisetstd::make_unique<MulticastSocketU>(sockAddr);
udp->setSendTimeout( UniSetTimer::millisecToPoco(sendTimeout) );
udp->setTimeToLive(ttl);
}
......@@ -374,7 +392,7 @@ bool MulticastSendTransport::createConnection( bool throwEx, timeout_t sendTimeo
{
udp = nullptr;
ostringstream s;
s << saddr.toString() << "(createConnection): " << e.what();
s << sockAddr.toString() << "(createConnection): " << e.what();
if( throwEx )
throw uniset::SystemError(s.str());
......@@ -383,7 +401,7 @@ bool MulticastSendTransport::createConnection( bool throwEx, timeout_t sendTimeo
{
udp = nullptr;
ostringstream s;
s << saddr.toString() << "(createConnection): catch...";
s << sockAddr.toString() << "(createConnection): catch...";
if( throwEx )
throw uniset::SystemError(s.str());
......@@ -404,10 +422,12 @@ bool MulticastSendTransport::isReadyForSend( timeout_t tout )
// -------------------------------------------------------------------------
ssize_t MulticastSendTransport::send( const void* buf, size_t sz )
{
return udp->sendTo(buf, sz, saddr);
return udp->sendTo(buf, sz, toAddr);
}
// -------------------------------------------------------------------------
std::vector<Poco::Net::IPAddress> MulticastSendTransport::getGroups()
Poco::Net::SocketAddress MulticastSendTransport::getGroupAddress()
{
return groups;
return toAddr;
}
// -------------------------------------------------------------------------
......@@ -31,10 +31,10 @@ namespace uniset
{
public:
static std::unique_ptr<MulticastReceiveTransport> createFromXml(UniXML::iterator it, const std::string& defaultIP, int numChan, const std::string& section = "receive");
static std::unique_ptr<MulticastReceiveTransport> createFromXml(UniXML::iterator it, const std::string& defaultIP, int numChan, const std::string& defIface = "", const std::string& section = "receive");
static xmlNode* getReceiveListNode( UniXML::iterator root );
MulticastReceiveTransport( const std::string& bind, int port, const std::vector<Poco::Net::IPAddress>& joinGroups );
MulticastReceiveTransport( const std::string& bind, int port, const std::vector<Poco::Net::IPAddress>& joinGroups, const std::string& iface = "" );
virtual ~MulticastReceiveTransport();
virtual bool isConnected() const override;
......@@ -55,6 +55,7 @@ namespace uniset
const std::string host;
const int port;
const std::vector<Poco::Net::IPAddress> groups;
const std::string ifaceaddr;
};
class MulticastSendTransport:
......@@ -64,7 +65,7 @@ namespace uniset
static std::unique_ptr<MulticastSendTransport> createFromXml( UniXML::iterator it, const std::string& defaultIP, int numChan );
MulticastSendTransport(const std::string& host, int port, const std::vector<Poco::Net::IPAddress>& sendGroups, int ttl = 1 );
MulticastSendTransport(const std::string& sockHost, int sockPort, const std::string& groupHost, int groupPort, int ttl = 1 );
virtual ~MulticastSendTransport();
virtual bool isConnected() const override;
......@@ -72,7 +73,7 @@ namespace uniset
virtual bool createConnection(bool throwEx, timeout_t sendTimeout) override;
virtual int getSocket() const override;
std::vector<Poco::Net::IPAddress> getGroups();
Poco::Net::SocketAddress getGroupAddress();
// write
virtual bool isReadyForSend(timeout_t tout) override;
......@@ -83,8 +84,8 @@ namespace uniset
protected:
std::unique_ptr <MulticastSocketU> udp;
const Poco::Net::SocketAddress saddr;
const std::vector<Poco::Net::IPAddress> groups;
const Poco::Net::SocketAddress sockAddr;
const Poco::Net::SocketAddress toAddr;
int ttl; // ttl for packets
};
......
......@@ -1100,6 +1100,8 @@ void UNetExchange::initMulticastTransport( UniXML::iterator n_it,
auto conf = uniset_conf();
const string default_ip = n_it.getProp("unet_multicast_ip");
const string default_ip2 = n_it.getProp("unet_multicast_ip2");
const string default_iface1 = n_it.getProp("unet_multicast_iface1");
const string default_iface2 = n_it.getProp("unet_multicast_iface2");
if( !n_it.goChildren() )
throw uniset::SystemError("(UNetExchange): Items not found for <nodes>");
......@@ -1111,7 +1113,7 @@ void UNetExchange::initMulticastTransport( UniXML::iterator n_it,
{
if( n_it.getIntProp("unet_ignore") )
{
unetinfo << myname << "(init): unet_ignore.. for " << n_it.getProp("name") << endl;
unetinfo << myname << "(init): " << n_it.getProp("name") << " unet_ignore.." << endl;
continue;
}
......@@ -1127,18 +1129,14 @@ void UNetExchange::initMulticastTransport( UniXML::iterator n_it,
if( no_sender )
{
unetinfo << myname << "(init): sender OFF for this node...("
<< n_it.getProp("name") << ")" << endl;
unetinfo << myname << "(init): " << n_it.getProp("name") << " sender DISABLED." << endl;
break;
}
unetinfo << myname << "(init): init sender.. my node " << n_it.getProp("name") << endl;
unetinfo << myname << "(init): " << n_it.getProp("name") << " init sender.." << endl;
auto s1 = MulticastSendTransport::createFromXml(n_it, default_ip, 0);
unetinfo << myname << "(init): send (channel1) to multicast groups: " << endl;
for( const auto& gr : s1->getGroups() )
unetinfo << myname << "(init): " << gr.toString() << endl;
unetinfo << myname << "(init): " << n_it.getProp("name") << " send (channel1) to multicast group: " << s1->getGroupAddress().toString() << endl;
sender = make_shared<UNetSender>(std::move(s1), shm, false, s_field, s_fvalue, "unet", prefix);
loga->add(sender->getLog());
......@@ -1152,12 +1150,7 @@ void UNetExchange::initMulticastTransport( UniXML::iterator n_it,
auto s2 = MulticastSendTransport::createFromXml(n_it, default_ip2, 2);
if( s2 )
{
unetinfo << myname << "(init): send (channel2) to multicast groups: " << endl;
for( const auto& gr : s2->getGroups() )
unetinfo << myname << "(init): " << gr.toString() << endl;
}
unetinfo << myname << "(init): " << n_it.getProp("name") << " send (channel2) to multicast group: " << s2->getGroupAddress().toString() << endl;
sender2 = make_shared<UNetSender>(std::move(s2), shm, false, s_field, s_fvalue, "unet", prefix);
}
......@@ -1165,7 +1158,7 @@ void UNetExchange::initMulticastTransport( UniXML::iterator n_it,
if( sender2 )
loga->add(sender2->getLog());
else
unetwarn << myname << "(ignore): sender for Channel2 disabled " << endl;
unetwarn << myname << "(ignore): " << n_it.getProp("name") << " sender for Channel2 disabled " << endl;
}
catch( std::exception& ex )
{
......@@ -1190,16 +1183,18 @@ void UNetExchange::initMulticastTransport( UniXML::iterator n_it,
if( !it.getIntProp("unet_multicast_receive_from_all_nodes") )
{
initMulticastReceiverForNode(n_it, default_ip, default_ip2, "receive", prefix);
initMulticastReceiverForNode(n_it, default_ip, default_ip2, default_iface1, default_iface2, "receive", prefix);
return;
}
unetwarn << myname << "(init): init multicast group from nodes.." << endl;
// init receivers (by nodes)
for( ; n_it.getCurrent(); n_it.goNext() )
{
if( n_it.getIntProp("unet_ignore") )
{
unetinfo << myname << "(init): unet_ignore.. for " << n_it.getProp("name") << endl;
unetinfo << myname << "(init): " << n_it.getProp("name") << " unet_ignore.." << endl;
continue;
}
......@@ -1212,20 +1207,22 @@ void UNetExchange::initMulticastTransport( UniXML::iterator n_it,
if( n == conf->getLocalNodeName() )
continue;
initMulticastReceiverForNode(n_it, default_ip, default_ip2, "send", prefix);
initMulticastReceiverForNode(n_it, default_ip, default_ip2, default_iface1, default_iface2, "send", prefix);
}
}
// ----------------------------------------------------------------------------
void UNetExchange::initMulticastReceiverForNode(UniXML::iterator n_it,
const std::string& default_ip,
const std::string& default_ip2,
const std::string& default_iface1,
const std::string& default_iface2,
const std::string& section,
const std::string& prefix )
{
auto conf = uniset_conf();
unetinfo << myname << "(init): add UNetReceiver for node " << n_it.getProp("name") << endl;
auto transport1 = MulticastReceiveTransport::createFromXml(n_it, default_ip, 0, section);
unetinfo << myname << "(init): " << n_it.getProp("name") << " init receivers:" << endl;
auto transport1 = MulticastReceiveTransport::createFromXml(n_it, default_ip, 0, default_iface1, section);
if( checkExistTransport(transport1->ID()) )
{
......@@ -1363,11 +1360,11 @@ void UNetExchange::initMulticastReceiverForNode(UniXML::iterator n_it,
}
}
unetinfo << myname << "(init): (node='" << n_it.getProp("name") << "') add channel1 receiver " << transport1->ID() << endl;
unetinfo << myname << "(init): (node='" << n_it.getProp("name") << "') add channel1 receiver " << transport1->ID() << " iface: " << default_iface1 << endl;
unetinfo << myname << "(init): receive (channel1) from multicast groups: " << endl;
for( const auto& gr : transport1->getGroups() )
unetinfo << myname << "(init): " << gr.toString() << endl;
unetinfo << myname << "(init): " << gr.toString() << endl;
auto r1 = make_shared<UNetReceiver>(std::move(transport1), shm, false, prefix);
......@@ -1387,11 +1384,11 @@ void UNetExchange::initMulticastReceiverForNode(UniXML::iterator n_it,
std::unique_ptr<MulticastReceiveTransport> transport2 = nullptr;
if (!n_it.getProp("unet_multicast_ip2").empty() || !default_ip2.empty())
transport2 = MulticastReceiveTransport::createFromXml(n_it, default_ip2, 2, section);
transport2 = MulticastReceiveTransport::createFromXml(n_it, default_ip2, 2, default_iface2, section);
if( transport2 ) // создаём читателя по второму каналу
{
unetinfo << myname << "(init): (node='" << n_it.getProp("name") << "') add channel2 receiver " << transport2->ID() << endl;
unetinfo << myname << "(init): (node='" << n_it.getProp("name") << "') add channel2 receiver " << transport2->ID() << " iface: " << default_iface2 << endl;
unetinfo << myname << "(init): receive(channel2) from multicast groups: " << endl;
for( const auto& gr : transport2->getGroups() )
......
......@@ -114,6 +114,13 @@ namespace uniset
Если свойство \b unet_multicast_receive_from_all_nodes="0" или не указано и создана пустая секция \b \<receive/>,
то узел \b не будет слушать и получать сообщения.
В секции \b \<nodes param1 param2 ...> можно задавать умолчательный адрес \b unet_multicast_default_ip1=".." и \b unet_multicast_default_ip2="..".
Помимо этого можно определить умолчательный интерфейс на котором происходит подключение к группам
\b unet_multicast_default_iface1=".." и \b unet_multicast_default_iface2="..".
\warning В текущей реализации поддерживается только одна <send><group .../></send>!
\code
<nodes port="2809" unet_broadcast_ip="192.168.56.255">
<item ip="127.0.0.1" name="LocalhostNode" textname="Локальный узел" unet_ignore="1" unet_multicast_port="3000" unet_multicast_ip="192.168.57.255">
......@@ -248,7 +255,14 @@ namespace uniset
void termReceivers();
void initMulticastTransport( UniXML::iterator nodes, const std::string& n_field, const std::string& n_fvalue, const std::string& prefix );
void initMulticastReceiverForNode( UniXML::iterator n_it, const std::string& default_ip1, const std::string& default_ip2, const std::string& section, const std::string& prefix);
void initMulticastReceiverForNode( UniXML::iterator n_it,
const std::string& default_ip1,
const std::string& default_ip2,
const std::string& default_iface1,
const std::string& default_iface2,
const std::string& section,
const std::string& prefix);
void initUDPTransport(UniXML::iterator nodes, const std::string& n_field, const std::string& n_fvalue, const std::string& prefix);
void initIterators() noexcept;
void startReceivers();
......
......@@ -47,12 +47,13 @@ TEST_CASE("[UNetUDP]: multicast transport", "[unetudp][multicast][transport]")
REQUIRE( it.getName() == "item" );
REQUIRE( it.getProp("name") == "localhost" );
auto t1 = MulticastReceiveTransport::createFromXml(it, "127.0.0.1", 0 );
REQUIRE( t1->toString() == "127.0.0.1:3000" );
auto t1 = MulticastReceiveTransport::createFromXml(it, "0.0.0.0", 0 );
REQUIRE( t1->toString() == "0.0.0.0:3000" );
REQUIRE( t1->createConnection(false, 5000, true) );
auto t2 = MulticastSendTransport::createFromXml(it, "127.0.0.1", 0 );
REQUIRE( t2->toString() == "127.0.0.1:3000" );
REQUIRE( t2->getGroupAddress() == Poco::Net::SocketAddress("224.0.0.1", 3000) );
REQUIRE( t2->createConnection(false, 5000) );
string msg = "hello world";
......
......@@ -47,22 +47,22 @@ static void initHelpers()
if( !udp_r )
{
// udp_r = MulticastReceiveTransport::createFromXml(it, "127.0.0.1", 0, "send");
std::vector<Poco::Net::IPAddress> groups;
groups.emplace_back("238.255.1.1");
udp_r = make_unique<MulticastReceiveTransport>("127.0.0.1", 3000, groups);
REQUIRE( udp_r->toString() == "127.0.0.1:3000" );
groups.emplace_back("224.0.0.1");
udp_r = make_unique<MulticastReceiveTransport>("0.0.0.0", 3000, groups, "127.0.0.1");
REQUIRE( udp_r->toString() == "0.0.0.0:3000" );
REQUIRE( udp_r->createConnection(false, 5000, true) );
// pause for igmp message
msleep(3000);
}
if( !udp_s )
{
// udp_s = MulticastSendTransport::createFromXml(it, "127.0.0.1", 0);
std::vector<Poco::Net::IPAddress> groups;
groups.emplace_back("238.255.1.2");
udp_s = make_unique<MulticastSendTransport>("127.0.0.1", 3002, groups);
udp_s = make_unique<MulticastSendTransport>("127.0.0.1", 3002, "224.0.0.1", 3002);
REQUIRE( udp_s->toString() == "127.0.0.1:3002" );
REQUIRE( udp_s->createConnection(false, 5000) );
// pause for igmp message
msleep(3000);
}
}
// -----------------------------------------------------------------------------
......
......@@ -32,15 +32,15 @@
</settings>
<ObjectsMap idfromfile="1">
<nodes port="2809" unet_broadcast_ip="127.255.255.255" unet_broadcast_ip2="badip" unet_multicast_ip="127.0.0.1">
<nodes port="2809" unet_broadcast_ip="127.255.255.255" unet_broadcast_ip2="badip" unet_multicast_ip="0.0.0.0">
<item id="3000" ip="127.0.0.1" name="localhost" textname="Локальный узел" unet_ignore="0" unet_port="3000" unet_multicast_ip2="127.0.1.1" unet_multicast_port2="2999" unet_multicast_receive_from_all_nodes="1">
<multicast>
<receive>
<group addr="238.255.1.2" addr2="239.255.1.2"/>
<group addr="238.255.1.3" addr2="239.255.1.3"/>
<group addr="224.0.0.1" addr2="225.0.0.2"/>
<group addr="224.0.0.3" addr2="225.0.0.3"/>
</receive>
<send>
<group addr="238.255.1.1" addr2="239.255.1.1"/>
<group addr="224.0.0.1" addr2="225.0.0.1"/>
</send>
</multicast>
</item>
......@@ -48,22 +48,22 @@
<item id="3002" ip="192.168.56.10" name="Node1" textname="Node1" unet_ignore="0" unet_respond_id="Node1_Not_Respond_S" unet_respond_invert="1" unet_channelswitchcount_id="Node1_ChannelSwitchCount_AS" unet_multicast_receive_from_all_nodes="1">
<multicast>
<receive>
<group addr="238.255.1.1" addr2="239.255.1.1"/>
<group addr="238.255.1.3" addr2="239.255.1.3"/>
<group addr="224.0.0.1" addr2="225.0.0.1"/>
<group addr="224.0.0.3" addr2="225.0.0.3"/>
</receive>
<send>
<group addr="238.255.1.2" addr2="239.255.1.2"/>
<group addr="224.0.0.2" addr2="225.0.0.2"/>
</send>
</multicast>
</item>
<item id="3003" ip="192.168.56.11" name="Node2" textname="Node2" unet_ignore="0" unet_respond_id="Node2_Respond_S" unet_lostpackets_id="Node2_LostPackets_AS" unet_numchannel_id="Node2_NumChannel_AS" unet_multicast_receive_from_all_nodes="1">
<multicast>
<receive>
<group addr="238.255.1.1" addr2="239.255.1.1"/>
<group addr="238.255.1.2" addr2="239.255.1.2"/>
<group addr="224.0.0.1" addr2="225.0.0.1"/>
<group addr="224.0.0.2" addr2="225.0.0.2"/>
</receive>
<send>
<group addr="238.255.1.3" addr2="239.255.1.3"/>
<group addr="224.0.0.3" addr2="225.0.0.3"/>
</send>
</multicast>
</item>
......
......@@ -31,6 +31,7 @@ static struct option longopts[] =
{ "d-data", required_argument, 0, 'i' },
{ "group", required_argument, 0, 'g' },
{ "loopback", no_argument, 0, 'b' },
{ "iface", required_argument, 0, 'f' },
{ NULL, 0, 0, 0 }
};
// --------------------------------------------------------------------------
......@@ -82,10 +83,11 @@ int main(int argc, char* argv[])
std::string a_data = "";
std::vector<Poco::Net::IPAddress> groups;
bool loopback = false;
std::string iface = "";
while(1)
{
opt = getopt_long(argc, argv, "hbs:c:r:p:n:t:x:blvdz:y:a:i:g:", longopts, &optindex);
opt = getopt_long(argc, argv, "hbs:c:r:p:n:t:x:blvdz:y:a:i:g:f:", longopts, &optindex);
if( opt == -1 )
break;
......@@ -98,6 +100,7 @@ int main(int argc, char* argv[])
cout << "[-c|--data-count] num - Send num count of value. Default: 50." << endl;
cout << "[-r|--receive] host:port - Receive message." << endl;
cout << "[-g|--group] ip - Multicast group address (can be specified many times)" << endl;
cout << "[-f|--iface] ip - Multicast interface" << endl;
cout << "[-b|--loopback] - Enable multicast loopback." << endl;
cout << "[-p|--proc-id] id - Set packet header. From 'procID'. Default: 1" << endl;
cout << "[-n|--node-id] id - Set packet header. From 'nodeID'. Default: 1" << endl;
......@@ -159,10 +162,24 @@ int main(int argc, char* argv[])
nodeID = atoi(optarg);
break;
case 'g':
groups.emplace_back(Poco::Net::IPAddress(optarg));
case 'f':
iface = string(optarg);
break;
case 'g':
{
Poco::Net::IPAddress a(optarg);
if( !a.isMulticast() )
{
cerr << "Group address " << optarg << " is not multicast!" << endl;
return 1;
}
groups.emplace_back(a);
}
break;
case 'd':
show = true;
break;
......@@ -211,8 +228,12 @@ int main(int argc, char* argv[])
if( verb )
{
cout << " host=" << s_host
<< " port=" << port
<< " timeout=";
<< " port=" << port;
if( !iface.empty() )
cout << " iface=" << iface << endl;
cout << " timeout=";
if( tout == UniSetTimer::WaitUpTime )
cout << "Waitup";
......@@ -233,18 +254,13 @@ int main(int argc, char* argv[])
{
case cmdReceive:
{
MulticastReceiveTransport udp(s_host, port, groups);
MulticastReceiveTransport udp(s_host, port, groups, iface);
udp.createConnection(true, 500, true);
if( loopback )
udp.setLoopBack(true);
msleep(5000);
udp.disconnect();
return 0;
UniSetUDP::UDPMessage pack;
UniSetUDP::UDPPacket buf;
unsigned long prev_num = 1;
......@@ -332,7 +348,14 @@ int main(int argc, char* argv[])
case cmdSend:
{
auto udp = std::make_shared<MulticastSendTransport>(s_host, port, groups);
if( groups.empty() )
{
cerr << "(send): Unknown multicast group address for send ..." << endl;
return 1;
}
// supporte only first group address
auto udp = std::make_shared<MulticastSendTransport>(s_host, port, groups[0].toString(), port);
UniSetUDP::UDPMessage mypack;
mypack.nodeID = nodeID;
......
// -------------------------------------------------------------------------
#ifndef UDPReceiveU_H_
#define UDPReceiveU_H_
#ifndef UDPCore_H_
#define UDPCore_H_
// -------------------------------------------------------------------------
#include <Poco/Net/DatagramSocket.h>
#include <Poco/Net/MulticastSocket.h>
......@@ -70,6 +70,10 @@ namespace uniset
Poco::Net::MulticastSocket(Poco::Net::SocketAddress(bind, port), true)
{}
MulticastSocketU( const Poco::Net::SocketAddress& addr ):
Poco::Net::MulticastSocket(addr, true)
{}
virtual ~MulticastSocketU() {}
inline int getSocket() const
......@@ -80,5 +84,5 @@ namespace uniset
// -------------------------------------------------------------------------
} // end of uniset namespace
// -------------------------------------------------------------------------
#endif // UDPReceiveU_H_
#endif // UDPCore_H_
// -------------------------------------------------------------------------
############################################################################
# This file is part of the UniSet library #
# This file is part of the UniSet library #
############################################################################
noinst_LTLIBRARIES = libTCP.la
libTCP_la_SOURCES = UTCPCore.cc UTCPStream.cc USocket.cc UTCPSocket.cc TCPCheck.cc
......
......@@ -26,94 +26,94 @@ using namespace std;
// -----------------------------------------------------------------------------
namespace uniset
{
// -----------------------------------------------------------------------------
bool TCPCheck::check( const std::string& _iaddr, timeout_t tout ) noexcept
{
auto v = uniset::explode_str(_iaddr, ':');
if( v.size() < 2 )
return false;
return check( v[0], uniset::uni_atoi(v[1]), tout );
}
// -----------------------------------------------------------------------------
bool TCPCheck::check( const std::string& ip, int port, timeout_t tout_msec ) noexcept
{
try
{
std::future<bool> future = std::async(std::launch::async, [ = ]()
{
// Сама проверка...
bool result = false;
try
{
UTCPStream t;
t.create(ip, port, tout_msec);
// если удалось создать соединение, значит OK
result = t.isConnected();
t.disconnect();
}
catch( ... ) {}
return result;
});
std::future_status status;
do
{
status = future.wait_for(std::chrono::milliseconds(tout_msec));
if( status == std::future_status::timeout )
return false;
}
while( status != std::future_status::ready );
return future.get();
}
catch( std::exception& ex )
{
}
return false;
}
// -----------------------------------------------------------------------------
bool TCPCheck::ping( const std::string& ip, timeout_t tout_msec, const std::string& ping_args ) noexcept
{
try
{
std::future<bool> future = std::async(std::launch::async, [ = ]()
{
// Сама проверка...
ostringstream cmd;
cmd << "ping " << ping_args << " " << ip << " 2>/dev/null 1>/dev/null";
int ret = system(cmd.str().c_str());
int res = WEXITSTATUS(ret);
return (res == 0);
});
std::future_status status;
do
{
status = future.wait_for(std::chrono::milliseconds(tout_msec));
if( status == std::future_status::timeout )
return false;
}
while( status != std::future_status::ready );
return future.get();
}
catch( std::exception& ex )
{
}
return false;
}
// -----------------------------------------------------------------------------
// -----------------------------------------------------------------------------
bool TCPCheck::check( const std::string& _iaddr, timeout_t tout ) noexcept
{
auto v = uniset::explode_str(_iaddr, ':');
if( v.size() < 2 )
return false;
return check( v[0], uniset::uni_atoi(v[1]), tout );
}
// -----------------------------------------------------------------------------
bool TCPCheck::check( const std::string& ip, int port, timeout_t tout_msec ) noexcept
{
try
{
std::future<bool> future = std::async(std::launch::async, [ = ]()
{
// Сама проверка...
bool result = false;
try
{
UTCPStream t;
t.create(ip, port, tout_msec);
// если удалось создать соединение, значит OK
result = t.isConnected();
t.disconnect();
}
catch( ... ) {}
return result;
});
std::future_status status;
do
{
status = future.wait_for(std::chrono::milliseconds(tout_msec));
if( status == std::future_status::timeout )
return false;
}
while( status != std::future_status::ready );
return future.get();
}
catch( std::exception& ex )
{
}
return false;
}
// -----------------------------------------------------------------------------
bool TCPCheck::ping( const std::string& ip, timeout_t tout_msec, const std::string& ping_args ) noexcept
{
try
{
std::future<bool> future = std::async(std::launch::async, [ = ]()
{
// Сама проверка...
ostringstream cmd;
cmd << "ping " << ping_args << " " << ip << " 2>/dev/null 1>/dev/null";
int ret = system(cmd.str().c_str());
int res = WEXITSTATUS(ret);
return (res == 0);
});
std::future_status status;
do
{
status = future.wait_for(std::chrono::milliseconds(tout_msec));
if( status == std::future_status::timeout )
return false;
}
while( status != std::future_status::ready );
return future.get();
}
catch( std::exception& ex )
{
}
return false;
}
// -----------------------------------------------------------------------------
} // end of namespace uniset
......@@ -5,38 +5,38 @@ using namespace std;
// -------------------------------------------------------------------------
namespace uniset
{
// -------------------------------------------------------------------------
USocket::~USocket()
{
try
{
close();
}
catch(...) {}
}
// -------------------------------------------------------------------------
USocket::USocket()
{
init();
}
// -------------------------------------------------------------------------
bool USocket::setKeepAliveParams( timeout_t timeout_sec, int keepcnt, int keepintvl )
{
return UTCPCore::setKeepAliveParams(getSocket(), timeout_sec, keepcnt, keepintvl);
}
// -------------------------------------------------------------------------
int USocket::getSocket()
{
return Socket::sockfd();
}
// -------------------------------------------------------------------------
void USocket::init( bool throwflag )
{
//setError(throwflag);
setKeepAlive(true);
Socket::setLinger(true, 1);
//setLinger(true);
setKeepAliveParams();
}
// -------------------------------------------------------------------------
// -------------------------------------------------------------------------
USocket::~USocket()
{
try
{
close();
}
catch(...) {}
}
// -------------------------------------------------------------------------
USocket::USocket()
{
init();
}
// -------------------------------------------------------------------------
bool USocket::setKeepAliveParams( timeout_t timeout_sec, int keepcnt, int keepintvl )
{
return UTCPCore::setKeepAliveParams(getSocket(), timeout_sec, keepcnt, keepintvl);
}
// -------------------------------------------------------------------------
int USocket::getSocket()
{
return Socket::sockfd();
}
// -------------------------------------------------------------------------
void USocket::init( bool throwflag )
{
//setError(throwflag);
setKeepAlive(true);
Socket::setLinger(true, 1);
//setLinger(true);
setKeepAliveParams();
}
// -------------------------------------------------------------------------
} // end of namespace uniset
......@@ -6,64 +6,64 @@ using namespace std;
// -------------------------------------------------------------------------
namespace uniset
{
// -------------------------------------------------------------------------
bool UTCPCore::setKeepAliveParams( int fd, timeout_t timeout_sec, int keepcnt, int keepintvl ) noexcept
{
int enable = 1;
bool ok = true;
// -------------------------------------------------------------------------
bool UTCPCore::setKeepAliveParams( int fd, timeout_t timeout_sec, int keepcnt, int keepintvl ) noexcept
{
int enable = 1;
bool ok = true;
if( setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, (void*)&enable, sizeof(enable)) == -1 )
ok = false;
if( setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, (void*)&enable, sizeof(enable)) == -1 )
ok = false;
if( setsockopt(fd, SOL_TCP, TCP_KEEPCNT, (void*) &keepcnt, sizeof(keepcnt)) == -1 )
ok = false;
if( setsockopt(fd, SOL_TCP, TCP_KEEPCNT, (void*) &keepcnt, sizeof(keepcnt)) == -1 )
ok = false;
if( setsockopt(fd, SOL_TCP, TCP_KEEPINTVL, (void*) &keepintvl, sizeof (keepintvl)) == -1 )
ok = false;
if( setsockopt(fd, SOL_TCP, TCP_KEEPINTVL, (void*) &keepintvl, sizeof (keepintvl)) == -1 )
ok = false;
if( setsockopt(fd, SOL_TCP, TCP_KEEPIDLE, (void*) &timeout_sec, sizeof (timeout_sec)) == -1 )
ok = false;
if( setsockopt(fd, SOL_TCP, TCP_KEEPIDLE, (void*) &timeout_sec, sizeof (timeout_sec)) == -1 )
ok = false;
return ok;
}
// -------------------------------------------------------------------------
UTCPCore::Buffer::Buffer( const unsigned char* bytes, size_t nbytes )
{
pos = 0;
len = nbytes;
return ok;
}
// -------------------------------------------------------------------------
UTCPCore::Buffer::Buffer( const unsigned char* bytes, size_t nbytes )
{
pos = 0;
len = nbytes;
if( len == 0 ) // ??!!
return;
if( len == 0 ) // ??!!
return;
data = new unsigned char[nbytes];
std::memcpy(data, bytes, nbytes);
}
// -------------------------------------------------------------------------
UTCPCore::Buffer::Buffer( const string& s )
{
pos = 0;
len = s.length();
data = new unsigned char[nbytes];
std::memcpy(data, bytes, nbytes);
}
// -------------------------------------------------------------------------
UTCPCore::Buffer::Buffer( const string& s )
{
pos = 0;
len = s.length();
if( len <= 0 ) // ??!!
return;
if( len <= 0 ) // ??!!
return;
data = new unsigned char[len];
std::memcpy(data, s.data(), len);
}
// -------------------------------------------------------------------------
UTCPCore::Buffer::~Buffer()
{
delete [] data;
}
// -------------------------------------------------------------------------
unsigned char* UTCPCore::Buffer::dpos() const noexcept
{
return data + pos;
}
// -------------------------------------------------------------------------
size_t UTCPCore::Buffer::nbytes() const noexcept
{
return len - pos;
}
// -------------------------------------------------------------------------
data = new unsigned char[len];
std::memcpy(data, s.data(), len);
}
// -------------------------------------------------------------------------
UTCPCore::Buffer::~Buffer()
{
delete [] data;
}
// -------------------------------------------------------------------------
unsigned char* UTCPCore::Buffer::dpos() const noexcept
{
return data + pos;
}
// -------------------------------------------------------------------------
size_t UTCPCore::Buffer::nbytes() const noexcept
{
return len - pos;
}
// -------------------------------------------------------------------------
} // end of namespace uniset
......@@ -12,48 +12,48 @@ using namespace std;
// -------------------------------------------------------------------------
namespace uniset
{
// -------------------------------------------------------------------------
UTCPSocket::~UTCPSocket()
{
try
{
Poco::Net::ServerSocket::close();
}
catch(...) {}
}
// -------------------------------------------------------------------------
UTCPSocket::UTCPSocket()
{
// -------------------------------------------------------------------------
UTCPSocket::~UTCPSocket()
{
try
{
Poco::Net::ServerSocket::close();
}
catch(...) {}
}
// -------------------------------------------------------------------------
UTCPSocket::UTCPSocket()
{
}
// -------------------------------------------------------------------------
UTCPSocket::UTCPSocket( int sock ):
Poco::Net::ServerSocket(sock)
{
init();
}
// -------------------------------------------------------------------------
UTCPSocket::UTCPSocket( const string& host, int port ):
Poco::Net::ServerSocket(Poco::Net::SocketAddress(host, port), true)
{
init();
}
// -------------------------------------------------------------------------
bool UTCPSocket::setKeepAliveParams(timeout_t timeout_sec, int keepcnt, int keepintvl )
{
return UTCPCore::setKeepAliveParams(Poco::Net::ServerSocket::sockfd() , timeout_sec, keepcnt, keepintvl);
}
// -------------------------------------------------------------------------
int UTCPSocket::getSocket() const noexcept
{
return Poco::Net::ServerSocket::sockfd();
}
// -------------------------------------------------------------------------
void UTCPSocket::init()
{
Poco::Net::ServerSocket::setKeepAlive(true);
Poco::Net::ServerSocket::setLinger(true, 1);
setKeepAliveParams();
}
// -------------------------------------------------------------------------
}
// -------------------------------------------------------------------------
UTCPSocket::UTCPSocket( int sock ):
Poco::Net::ServerSocket(sock)
{
init();
}
// -------------------------------------------------------------------------
UTCPSocket::UTCPSocket( const string& host, int port ):
Poco::Net::ServerSocket(Poco::Net::SocketAddress(host, port), true)
{
init();
}
// -------------------------------------------------------------------------
bool UTCPSocket::setKeepAliveParams(timeout_t timeout_sec, int keepcnt, int keepintvl )
{
return UTCPCore::setKeepAliveParams(Poco::Net::ServerSocket::sockfd(), timeout_sec, keepcnt, keepintvl);
}
// -------------------------------------------------------------------------
int UTCPSocket::getSocket() const noexcept
{
return Poco::Net::ServerSocket::sockfd();
}
// -------------------------------------------------------------------------
void UTCPSocket::init()
{
Poco::Net::ServerSocket::setKeepAlive(true);
Poco::Net::ServerSocket::setLinger(true, 1);
setKeepAliveParams();
}
// -------------------------------------------------------------------------
} // end of namespace uniset
......@@ -29,115 +29,115 @@ using namespace std;
// -------------------------------------------------------------------------
namespace uniset
{
// -------------------------------------------------------------------------
UTCPStream::~UTCPStream()
{
// -------------------------------------------------------------------------
UTCPStream::~UTCPStream()
{
}
// -------------------------------------------------------------------------
UTCPStream::UTCPStream(const Poco::Net::StreamSocket& so):
Poco::Net::StreamSocket(so)
{
}
// -------------------------------------------------------------------------
UTCPStream::UTCPStream(const Poco::Net::StreamSocket& so):
Poco::Net::StreamSocket(so)
{
}
}
UTCPStream::UTCPStream()
{
}
// -------------------------------------------------------------------------
bool UTCPStream::setKeepAliveParams(timeout_t timeout_sec, int keepcnt, int keepintvl )
{
return UTCPCore::setKeepAliveParams(Poco::Net::StreamSocket::sockfd(), timeout_sec, keepcnt, keepintvl);
}
// -------------------------------------------------------------------------
bool UTCPStream::isSetLinger() const
{
bool on;
int sec;
Poco::Net::StreamSocket::getLinger(on, sec);
return on;
}
// -------------------------------------------------------------------------
void UTCPStream::forceDisconnect()
{
if( !isConnected() )
return;
UTCPStream::UTCPStream()
{
}
// -------------------------------------------------------------------------
bool UTCPStream::setKeepAliveParams(timeout_t timeout_sec, int keepcnt, int keepintvl )
{
return UTCPCore::setKeepAliveParams(Poco::Net::StreamSocket::sockfd(), timeout_sec, keepcnt, keepintvl);
}
// -------------------------------------------------------------------------
bool UTCPStream::isSetLinger() const
{
bool on;
int sec;
Poco::Net::StreamSocket::getLinger(on, sec);
return on;
}
// -------------------------------------------------------------------------
void UTCPStream::forceDisconnect()
{
if( !isConnected() )
return;
try
{
setLinger(false, 0);
}
// catch( Poco::Net::NetException& ex ){}
catch( std::exception& ex ) {}
try
{
setLinger(false, 0);
}
// catch( Poco::Net::NetException& ex ){}
catch( std::exception& ex ) {}
try
{
close();
//shutdown();
}
// catch( Poco::Net::NetException& ex ){}
catch( std::exception& ex ) {}
}
// -------------------------------------------------------------------------
void UTCPStream::disconnect()
{
try
{
shutdown();
}
// catch( Poco::Net::NetException& ex ){}
catch( std::exception& ex ) {}
try
{
close();
//shutdown();
}
// catch( Poco::Net::NetException& ex ){}
catch( std::exception& ex ) {}
}
// -------------------------------------------------------------------------
void UTCPStream::disconnect()
{
try
{
shutdown();
}
// catch( Poco::Net::NetException& ex ){}
catch( std::exception& ex ) {}
try
{
close();
}
// catch( Poco::Net::NetException& ex ){}
catch( std::exception& ex ) {}
}
// -------------------------------------------------------------------------
int UTCPStream::getSocket() const
{
return Poco::Net::StreamSocket::sockfd();
}
// -------------------------------------------------------------------------
timeout_t UTCPStream::getTimeout() const
{
auto tm = Poco::Net::StreamSocket::getReceiveTimeout();
return tm.totalMicroseconds();
}
// -------------------------------------------------------------------------
void UTCPStream::create( const std::string& hname, uint16_t port, timeout_t tout_msec )
{
Poco::Net::SocketAddress saddr(hname, port);
connect(saddr, UniSetTimer::millisecToPoco(tout_msec));
setKeepAlive(true);
Poco::Net::StreamSocket::setLinger(true, 1);
setKeepAliveParams();
}
// -------------------------------------------------------------------------
bool UTCPStream::isConnected() const noexcept
{
return ( Poco::Net::StreamSocket::sockfd() != POCO_INVALID_SOCKET );
/*
try
{
// Вариант 1
//return ( Poco::Net::StreamSocket::peerAddress().addr() != 0 );
try
{
close();
}
// catch( Poco::Net::NetException& ex ){}
catch( std::exception& ex ) {}
}
// -------------------------------------------------------------------------
int UTCPStream::getSocket() const
{
return Poco::Net::StreamSocket::sockfd();
}
// -------------------------------------------------------------------------
timeout_t UTCPStream::getTimeout() const
{
auto tm = Poco::Net::StreamSocket::getReceiveTimeout();
return tm.totalMicroseconds();
}
// -------------------------------------------------------------------------
void UTCPStream::create( const std::string& hname, uint16_t port, timeout_t tout_msec )
{
Poco::Net::SocketAddress saddr(hname, port);
connect(saddr, UniSetTimer::millisecToPoco(tout_msec));
setKeepAlive(true);
Poco::Net::StreamSocket::setLinger(true, 1);
setKeepAliveParams();
}
// -------------------------------------------------------------------------
bool UTCPStream::isConnected() const noexcept
{
return ( Poco::Net::StreamSocket::sockfd() != POCO_INVALID_SOCKET );
/*
try
{
// Вариант 1
//return ( Poco::Net::StreamSocket::peerAddress().addr() != 0 );
// Варинт 2
return ( Poco::Net::StreamSocket::peerAddress().port() != 0 );
// Варинт 2
return ( Poco::Net::StreamSocket::peerAddress().port() != 0 );
// Вариант 3
// if( poll({0, 5}, Poco::Net::Socket::SELECT_READ) )
// return (tcp->available() > 0);
}
catch( Poco::Net::NetException& ex )
{
}
// Вариант 3
// if( poll({0, 5}, Poco::Net::Socket::SELECT_READ) )
// return (tcp->available() > 0);
}
catch( Poco::Net::NetException& ex )
{
}
return false;
*/
}
// -------------------------------------------------------------------------
return false;
*/
}
// -------------------------------------------------------------------------
} // end of namespace uniset
......@@ -312,14 +312,6 @@
./extensions/UWebSocketGate/UWebSocketGate.cc
./extensions/UWebSocketGate/UWebSocketGate.h
./extensions/UWebSocketGate/UWebSocketGateSugar.h
./extensions/WS/main.cc
./extensions/WS/Makefile.am
./extensions/WS/tests/Makefile.am
./extensions/WS/tests/tests_with_sm.cc
./extensions/WS/tests/test_uwebsocketgate.cc
./extensions/WS/UWebSocketGate.cc
./extensions/WS/UWebSocketGate.h
./extensions/WS/UWebSocketGateSugar.h
./IDL/Makefile.am
./IDL/Processes/Makefile.am
./IDL/UniSetTypes/Makefile.am
......@@ -435,13 +427,27 @@
./src/Communications/Modbus/ModbusTypes.cc
./src/Communications/TCP/Makefile.am
./src/Communications/TCP/TCPCheck.cc
./src/Communications/TCP/UDPCore.cc
./src/Communications/TCP/USocket.cc
./src/Communications/TCP/UTCPCore.cc
./src/Communications/TCP/UTCPSocket.cc
./src/Communications/TCP/UTCPStream.cc
./src/Core/Configuration.cc
./src/Core/Makefile.am
./src/Core.m/Configuration.cc
./src/Core/MessageType.cc
./src/Core.m/Makefile.am
./src/Core.m/MessageType.cc
./src/Core.m/UA.cc
./src/Core.m/UInterface.cc
./src/Core.m/UniSetActivator.cc
./src/Core.m/UniSetBaseConstantsSK.cc
./src/Core.m/UniSetManager.cc
./src/Core.m/UniSetManager_iSK.cc
./src/Core.m/UniSetObject.cc
./src/Core.m/UniSetObject_iSK.cc
./src/Core.m/UniSetTypes.cc
./src/Core.m/UniSetTypes_iSK.cc
./src/Core/UA.cc
./src/Core/UInterface.cc
./src/Core/UniSetActivator.cc
......@@ -452,6 +458,19 @@
./src/Core/UniSetObject_iSK.cc
./src/Core/UniSetTypes.cc
./src/Core/UniSetTypes_iSK.cc
./src/Core.x/Configuration.cc
./src/Core.x/Makefile.am
./src/Core.x/MessageType.cc
./src/Core.x/UA.cc
./src/Core.x/UInterface.cc
./src/Core.x/UniSetActivator.cc
./src/Core.x/UniSetBaseConstantsSK.cc
./src/Core.x/UniSetManager.cc
./src/Core.x/UniSetManager_iSK.cc
./src/Core.x/UniSetObject.cc
./src/Core.x/UniSetObject_iSK.cc
./src/Core.x/UniSetTypes.cc
./src/Core.x/UniSetTypes_iSK.cc
./src/Log/Debug.cc
./src/Log/DebugExtBuf.h
./src/Log/DebugStream.cc
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment