Commit 97ff1779 authored by Pavel Vainerman's avatar Pavel Vainerman

[unet-multicast]: first prototype

parent 3d303f1e
...@@ -10,7 +10,7 @@ libUniSet2UNetUDP_la_LIBADD = $(top_builddir)/lib/libUniSet2.la \ ...@@ -10,7 +10,7 @@ libUniSet2UNetUDP_la_LIBADD = $(top_builddir)/lib/libUniSet2.la \
$(top_builddir)/extensions/lib/libUniSet2Extensions.la \ $(top_builddir)/extensions/lib/libUniSet2Extensions.la \
$(SIGC_LIBS) $(POCO_LIBS) $(SIGC_LIBS) $(POCO_LIBS)
libUniSet2UNetUDP_la_CXXFLAGS = -I$(top_builddir)/extensions/include -I$(top_builddir)/extensions/SharedMemory $(SIGC_CFLAGS) $(POCO_CFLAGS) libUniSet2UNetUDP_la_CXXFLAGS = -I$(top_builddir)/extensions/include -I$(top_builddir)/extensions/SharedMemory $(SIGC_CFLAGS) $(POCO_CFLAGS)
libUniSet2UNetUDP_la_SOURCES = UDPPacket.cc UNetReceiver.cc UNetSender.cc UNetExchange.cc libUniSet2UNetUDP_la_SOURCES = UDPPacket.cc UDPTransport.cc MulticastTransport.cc UNetReceiver.cc UNetSender.cc UNetExchange.cc
@PACKAGE@_unetexchange_SOURCES = unetexchange.cc @PACKAGE@_unetexchange_SOURCES = unetexchange.cc
@PACKAGE@_unetexchange_LDADD = libUniSet2UNetUDP.la $(top_builddir)/lib/libUniSet2.la \ @PACKAGE@_unetexchange_LDADD = libUniSet2UNetUDP.la $(top_builddir)/lib/libUniSet2.la \
......
/*
* Copyright (c) 2021 Pavel Vainerman.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation, version 2.1.
*
* This program is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Lesser Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
// -------------------------------------------------------------------------
#include <sstream>
#include <iomanip>
#include <Poco/Net/NetException.h>
#include "Exceptions.h"
#include "PassiveTimer.h"
#include "unisetstd.h"
#include "MulticastTransport.h"
// -------------------------------------------------------------------------
using namespace std;
using namespace uniset;
// -------------------------------------------------------------------------
MulticastReceiveTransport::MulticastReceiveTransport( const std::string& _bind, int _port, const std::vector<Poco::Net::IPAddress> _joinGroups ):
host(_bind),
port(_port),
groups(_joinGroups)
{
}
// -------------------------------------------------------------------------
MulticastReceiveTransport::~MulticastReceiveTransport()
{
if( udp )
{
for (const auto& s : groups)
{
try
{
udp->leaveGroup(s);
}
catch (...) {}
}
}
}
// -------------------------------------------------------------------------
bool MulticastReceiveTransport::isConnected() const
{
return udp != nullptr;
}
// -------------------------------------------------------------------------
std::string MulticastReceiveTransport::ID() const noexcept
{
return toString();
}
// -------------------------------------------------------------------------
std::string MulticastReceiveTransport::toString() const
{
ostringstream s;
s << host << ":" << port;
return s.str();
}
// -------------------------------------------------------------------------
void MulticastReceiveTransport::disconnect()
{
if (udp)
{
for (const auto& s : groups)
{
try
{
udp->leaveGroup(s);
}
catch (...) {}
}
udp = nullptr;
}
}
// -------------------------------------------------------------------------
bool MulticastReceiveTransport::createConnection( bool throwEx, timeout_t readTimeout, bool noblock )
{
try
{
udp = unisetstd::make_unique<MulticastSocketU>(host, port);
udp->setBlocking(!noblock);
for( const auto& s : groups )
udp->joinGroup(s);
}
catch( const std::exception& e )
{
udp = nullptr;
ostringstream s;
s << host << ":" << port << "(createConnection): " << e.what();
if( throwEx )
throw uniset::SystemError(s.str());
}
catch( ... )
{
udp = nullptr;
ostringstream s;
s << host << ":" << port << "(createConnection): catch...";
if( throwEx )
throw uniset::SystemError(s.str());
}
return ( udp != nullptr );
}
// -------------------------------------------------------------------------
int MulticastReceiveTransport::getSocket() const
{
return udp->getSocket();
}
// -------------------------------------------------------------------------
ssize_t MulticastReceiveTransport::receive( void* r_buf, size_t sz )
{
return udp->receiveBytes(r_buf, sz);
}
// -------------------------------------------------------------------------
MulticastSendTransport::MulticastSendTransport( const std::string& _host, int _port, const std::vector<Poco::Net::IPAddress> _sendGroups ):
saddr(_host, _port),
groups(_sendGroups)
{
}
// -------------------------------------------------------------------------
MulticastSendTransport::~MulticastSendTransport()
{
if( udp )
{
for (const auto& s : groups)
{
try
{
udp->leaveGroup(s);
}
catch (...) {}
}
}
}
// -------------------------------------------------------------------------
std::string MulticastSendTransport::toString() const
{
return saddr.toString();
}
// -------------------------------------------------------------------------
bool MulticastSendTransport::isConnected() const
{
return udp != nullptr;
}
// -------------------------------------------------------------------------
bool MulticastSendTransport::createConnection( bool throwEx, timeout_t sendTimeout )
{
try
{
udp = unisetstd::make_unique<MulticastSocketU>();
for( const auto& s : groups )
udp->joinGroup(s);
udp->setSendTimeout( UniSetTimer::millisecToPoco(sendTimeout) );
}
catch( const std::exception& e )
{
udp = nullptr;
ostringstream s;
s << saddr.toString() << "(createConnection): " << e.what();
if( throwEx )
throw uniset::SystemError(s.str());
}
catch( ... )
{
udp = nullptr;
ostringstream s;
s << saddr.toString() << "(createConnection): catch...";
if( throwEx )
throw uniset::SystemError(s.str());
}
return (udp != nullptr);
}
// -------------------------------------------------------------------------
int MulticastSendTransport::getSocket() const
{
return udp->getSocket();
}
// -------------------------------------------------------------------------
bool MulticastSendTransport::isReadyForSend( timeout_t tout )
{
return udp && udp->poll( UniSetTimer::millisecToPoco(tout), Poco::Net::Socket::SELECT_WRITE );
}
// -------------------------------------------------------------------------
ssize_t MulticastSendTransport::send( void* buf, size_t sz )
{
return udp->sendTo(buf, sz, saddr);
}
// -------------------------------------------------------------------------
/*
* Copyright (c) 2021 Pavel Vainerman.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation, version 2.1.
*
* This program is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Lesser Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
// -------------------------------------------------------------------------
#ifndef MulticastTransport_H_
#define MulticastTransport_H_
// -------------------------------------------------------------------------
#include <string>
#include <memory>
#include <vector>
#include "UNetTransport.h"
#include "UDPCore.h"
// -------------------------------------------------------------------------
namespace uniset
{
class MulticastReceiveTransport :
public UNetReceiveTransport
{
public:
MulticastReceiveTransport( const std::string& bind, int port, const std::vector<Poco::Net::IPAddress> joinGroups );
virtual ~MulticastReceiveTransport();
virtual bool isConnected() const override;
virtual std::string toString() const override;
virtual std::string ID() const noexcept override;
virtual bool createConnection(bool throwEx, timeout_t readTimeout, bool noblock) override;
virtual void disconnect() override;
virtual int getSocket() const override;
virtual ssize_t receive(void* r_buf, size_t sz) override;
protected:
std::unique_ptr <MulticastSocketU> udp;
const std::string host;
const int port;
const std::vector<Poco::Net::IPAddress> groups;
};
class MulticastSendTransport :
public UNetSendTransport
{
public:
MulticastSendTransport(const std::string& host, int port, const std::vector<Poco::Net::IPAddress> sendGroups );
virtual ~MulticastSendTransport();
virtual bool isConnected() const override;
virtual std::string toString() const override;
virtual bool createConnection(bool throwEx, timeout_t sendTimeout) override;
virtual int getSocket() const override;
// write
virtual bool isReadyForSend(timeout_t tout) override;
virtual ssize_t send(void* buf, size_t sz) override;
protected:
std::unique_ptr <MulticastSocketU> udp;
const Poco::Net::SocketAddress saddr;
const std::vector<Poco::Net::IPAddress> groups;
};
} // end of uniset namespace
// -------------------------------------------------------------------------
#endif // MulticastTransport_H_
// -------------------------------------------------------------------------
/*
* Copyright (c) 2021 Pavel Vainerman.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation, version 2.1.
*
* This program is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Lesser Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
// -------------------------------------------------------------------------
#include <sstream>
#include <iomanip>
#include <Poco/Net/NetException.h>
#include "Exceptions.h"
#include "PassiveTimer.h"
#include "unisetstd.h"
#include "UDPTransport.h"
// -------------------------------------------------------------------------
using namespace std;
using namespace uniset;
// -------------------------------------------------------------------------
UDPReceiveTransport::UDPReceiveTransport( const std::string& _bind, int _port ):
host(_bind),
port(_port)
{
}
// -------------------------------------------------------------------------
UDPReceiveTransport::~UDPReceiveTransport()
{
}
// -------------------------------------------------------------------------
bool UDPReceiveTransport::isConnected() const
{
return udp != nullptr;
}
// -------------------------------------------------------------------------
std::string UDPReceiveTransport::ID() const noexcept
{
return toString();
}
// -------------------------------------------------------------------------
std::string UDPReceiveTransport::toString() const
{
ostringstream s;
s << host << ":" << port;
return s.str();
}
// -------------------------------------------------------------------------
void UDPReceiveTransport::disconnect()
{
udp = nullptr;
}
// -------------------------------------------------------------------------
bool UDPReceiveTransport::createConnection( bool throwEx, timeout_t readTimeout, bool noblock )
{
try
{
udp = unisetstd::make_unique<UDPReceiveU>(host, port);
udp->setBlocking(!noblock);
}
catch( const std::exception& e )
{
udp = nullptr;
ostringstream s;
s << host << ":" << port << "(createConnection): " << e.what();
if( throwEx )
throw uniset::SystemError(s.str());
}
catch( ... )
{
udp = nullptr;
ostringstream s;
s << host << ":" << port << "(createConnection): catch...";
if( throwEx )
throw uniset::SystemError(s.str());
}
return ( udp != nullptr );
}
// -------------------------------------------------------------------------
int UDPReceiveTransport::getSocket() const
{
return udp->getSocket();
}
// -------------------------------------------------------------------------
ssize_t UDPReceiveTransport::receive( void* r_buf, size_t sz )
{
return udp->receiveBytes(r_buf, sz);
}
// -------------------------------------------------------------------------
UDPSendTransport::UDPSendTransport( const std::string& _host, int _port ):
saddr(_host, _port)
{
}
// -------------------------------------------------------------------------
UDPSendTransport::~UDPSendTransport()
{
}
// -------------------------------------------------------------------------
std::string UDPSendTransport::toString() const
{
return saddr.toString();
}
// -------------------------------------------------------------------------
bool UDPSendTransport::isConnected() const
{
return udp != nullptr;
}
// -------------------------------------------------------------------------
bool UDPSendTransport::createConnection( bool throwEx, timeout_t sendTimeout )
{
try
{
udp = unisetstd::make_unique<UDPSocketU>();
udp->setBroadcast(true);
udp->setSendTimeout( UniSetTimer::millisecToPoco(sendTimeout) );
// udp->setNoDelay(true);
}
catch( const std::exception& e )
{
udp = nullptr;
ostringstream s;
s << saddr.toString() << "(createConnection): " << e.what();
if( throwEx )
throw uniset::SystemError(s.str());
}
catch( ... )
{
udp = nullptr;
ostringstream s;
s << saddr.toString() << "(createConnection): catch...";
if( throwEx )
throw uniset::SystemError(s.str());
}
return (udp != nullptr);
}
// -------------------------------------------------------------------------
int UDPSendTransport::getSocket() const
{
return udp->getSocket();
}
// -------------------------------------------------------------------------
bool UDPSendTransport::isReadyForSend( timeout_t tout )
{
return udp && udp->poll( UniSetTimer::millisecToPoco(tout), Poco::Net::Socket::SELECT_WRITE );
}
// -------------------------------------------------------------------------
ssize_t UDPSendTransport::send( void* buf, size_t sz )
{
return udp->sendTo(buf, sz, saddr);
}
// -------------------------------------------------------------------------
/*
* Copyright (c) 2021 Pavel Vainerman.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation, version 2.1.
*
* This program is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Lesser Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
// -------------------------------------------------------------------------
#ifndef UDPTransport_H_
#define UDPTransport_H_
// -------------------------------------------------------------------------
#include <string>
#include <memory>
#include "UNetTransport.h"
#include "UDPCore.h"
// -------------------------------------------------------------------------
namespace uniset
{
class UDPReceiveTransport:
public UNetReceiveTransport
{
public:
UDPReceiveTransport( const std::string& bind, int port );
virtual ~UDPReceiveTransport();
virtual bool isConnected() const override;
virtual std::string toString() const override;
virtual std::string ID() const noexcept override;
virtual bool createConnection( bool throwEx, timeout_t readTimeout, bool noblock ) override;
virtual void disconnect() override;
virtual int getSocket() const override;
virtual ssize_t receive( void* r_buf, size_t sz ) override;
protected:
std::unique_ptr<UDPReceiveU> udp;
const std::string host;
const int port;
};
class UDPSendTransport:
public UNetSendTransport
{
public:
UDPSendTransport( const std::string& host, int port );
virtual ~UDPSendTransport();
virtual bool isConnected() const override;
virtual std::string toString() const override;
virtual bool createConnection( bool throwEx, timeout_t sendTimeout ) override;
virtual int getSocket() const override;
// write
virtual bool isReadyForSend( timeout_t tout ) override;
virtual ssize_t send( void* buf, size_t sz ) override;
protected:
std::unique_ptr<UDPSocketU> udp;
Poco::Net::SocketAddress saddr;
};
} // end of uniset namespace
// -------------------------------------------------------------------------
#endif // UDPTransport_H_
// -------------------------------------------------------------------------
...@@ -16,10 +16,12 @@ ...@@ -16,10 +16,12 @@
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
#include <sstream> #include <sstream>
#include <iomanip> #include <iomanip>
#include "unisetstd.h"
#include "Exceptions.h" #include "Exceptions.h"
#include "Extensions.h" #include "Extensions.h"
#include "UNetExchange.h" #include "UNetExchange.h"
#include "UNetLogSugar.h" #include "UNetLogSugar.h"
#include "UDPTransport.h"
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
using namespace std; using namespace std;
using namespace uniset; using namespace uniset;
...@@ -166,7 +168,8 @@ UNetExchange::UNetExchange(uniset::ObjectId objId, uniset::ObjectId shmId, const ...@@ -166,7 +168,8 @@ UNetExchange::UNetExchange(uniset::ObjectId objId, uniset::ObjectId shmId, const
} }
unetinfo << myname << "(init): init sender.. my node " << n_it.getProp("name") << endl; unetinfo << myname << "(init): init sender.. my node " << n_it.getProp("name") << endl;
sender = make_shared<UNetSender>(h, p, shm, false, s_field, s_fvalue, "unet", prefix); auto s1 = unisetstd::make_unique<uniset::UDPSendTransport>(h, p);
sender = make_shared<UNetSender>(std::move(s1), shm, false, s_field, s_fvalue, "unet", prefix);
sender->setSendPause(sendpause); sender->setSendPause(sendpause);
sender->setPackSendPause(packsendpause); sender->setPackSendPause(packsendpause);
sender->setPackSendPauseFactor(packsendpauseFactor); sender->setPackSendPauseFactor(packsendpauseFactor);
...@@ -179,7 +182,9 @@ UNetExchange::UNetExchange(uniset::ObjectId objId, uniset::ObjectId shmId, const ...@@ -179,7 +182,9 @@ UNetExchange::UNetExchange(uniset::ObjectId objId, uniset::ObjectId shmId, const
if( !h2.empty() ) if( !h2.empty() )
{ {
unetinfo << myname << "(init): init sender2.. my node " << n_it.getProp("name") << endl; unetinfo << myname << "(init): init sender2.. my node " << n_it.getProp("name") << endl;
sender2 = make_shared<UNetSender>(h2, p2, shm, false, s_field, s_fvalue, prefix);
auto s2 = unisetstd::make_unique<uniset::UDPSendTransport>(h2, p2);
sender2 = make_shared<UNetSender>(std::move(s2), shm, false, s_field, s_fvalue, prefix);
sender2->setSendPause(sendpause); sender2->setSendPause(sendpause);
sender2->setCheckConnectionPause(checkConnectionPause); sender2->setCheckConnectionPause(checkConnectionPause);
loga->add(sender2->getLog()); loga->add(sender2->getLog());
...@@ -197,8 +202,9 @@ UNetExchange::UNetExchange(uniset::ObjectId objId, uniset::ObjectId shmId, const ...@@ -197,8 +202,9 @@ UNetExchange::UNetExchange(uniset::ObjectId objId, uniset::ObjectId shmId, const
} }
unetinfo << myname << "(init): add UNetReceiver for " << h << ":" << p << endl; unetinfo << myname << "(init): add UNetReceiver for " << h << ":" << p << endl;
auto transport = unisetstd::make_unique<uniset::UDPReceiveTransport>(h, p);
if( checkExistUNetHost(h, p) ) if( checkExistTransport(transport->ID()) )
{ {
unetinfo << myname << "(init): " << h << ":" << p << " already added! Ignore.." << endl; unetinfo << myname << "(init): " << h << ":" << p << " already added! Ignore.." << endl;
continue; continue;
...@@ -346,7 +352,7 @@ UNetExchange::UNetExchange(uniset::ObjectId objId, uniset::ObjectId shmId, const ...@@ -346,7 +352,7 @@ UNetExchange::UNetExchange(uniset::ObjectId objId, uniset::ObjectId shmId, const
unetinfo << myname << "(init): (node='" << n << "') add basic receiver " unetinfo << myname << "(init): (node='" << n << "') add basic receiver "
<< h << ":" << p << endl; << h << ":" << p << endl;
auto r = make_shared<UNetReceiver>(h, p, shm, false, prefix); auto r = make_shared<UNetReceiver>(std::move(transport), shm, false, prefix);
loga->add(r->getLog()); loga->add(r->getLog());
...@@ -378,7 +384,9 @@ UNetExchange::UNetExchange(uniset::ObjectId objId, uniset::ObjectId shmId, const ...@@ -378,7 +384,9 @@ UNetExchange::UNetExchange(uniset::ObjectId objId, uniset::ObjectId shmId, const
unetinfo << myname << "(init): (node='" << n << "') add reserv receiver " unetinfo << myname << "(init): (node='" << n << "') add reserv receiver "
<< h2 << ":" << p2 << endl; << h2 << ":" << p2 << endl;
r2 = make_shared<UNetReceiver>(h2, p2, shm, false, prefix); auto transport2 = unisetstd::make_unique<UDPReceiveTransport>(h2, p2);
r2 = make_shared<UNetReceiver>(std::move(transport2), shm, false, prefix);
loga->add(r2->getLog()); loga->add(r2->getLog());
...@@ -473,11 +481,11 @@ UNetExchange::~UNetExchange() ...@@ -473,11 +481,11 @@ UNetExchange::~UNetExchange()
{ {
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
bool UNetExchange::checkExistUNetHost(const std::string& addr, int port ) noexcept bool UNetExchange::checkExistTransport( const std::string& transportID ) noexcept
{ {
for( const auto& it : recvlist ) for( const auto& it : recvlist )
{ {
if( it.r1->getAddress() == addr && it.r1->getPort() == port ) if( it.r1->getTransportID() == transportID )
return true; return true;
} }
......
...@@ -159,7 +159,7 @@ namespace uniset ...@@ -159,7 +159,7 @@ namespace uniset
/*! глобальная функция для вывода help-а */ /*! глобальная функция для вывода help-а */
static void help_print( int argc, const char* argv[] ) noexcept; static void help_print( int argc, const char* argv[] ) noexcept;
bool checkExistUNetHost( const std::string& host, int port ) noexcept; bool checkExistTransport( const std::string& transportID ) noexcept;
inline std::shared_ptr<LogAgregator> getLogAggregator() noexcept inline std::shared_ptr<LogAgregator> getLogAggregator() noexcept
{ {
......
...@@ -40,15 +40,13 @@ bool UNetReceiver::PacketCompare::operator()(const UniSetUDP::UDPMessage& lhs, ...@@ -40,15 +40,13 @@ bool UNetReceiver::PacketCompare::operator()(const UniSetUDP::UDPMessage& lhs,
} }
*/ */
// ------------------------------------------------------------------------------------------ // ------------------------------------------------------------------------------------------
UNetReceiver::UNetReceiver(const std::string& s_host, int _port UNetReceiver::UNetReceiver( std::unique_ptr<UNetReceiveTransport>&& _transport, const std::shared_ptr<SMInterface>& smi
, const std::shared_ptr<SMInterface>& smi , bool nocheckConnection
, bool nocheckConnection , const std::string& prefix ):
, const std::string& prefix ):
shm(smi), shm(smi),
recvpause(10), recvpause(10),
updatepause(100), updatepause(100),
port(_port), transport(std::move(_transport)),
saddr(s_host, _port),
recvTimeout(5000), recvTimeout(5000),
prepareTime(2000), prepareTime(2000),
lostTimeout(200), /* 2*updatepause */ lostTimeout(200), /* 2*updatepause */
...@@ -68,14 +66,14 @@ UNetReceiver::UNetReceiver(const std::string& s_host, int _port ...@@ -68,14 +66,14 @@ UNetReceiver::UNetReceiver(const std::string& s_host, int _port
{ {
{ {
ostringstream s; ostringstream s;
s << "R(" << setw(15) << s_host << ":" << setw(4) << port << ")"; s << "R(" << transport->toString() << ")";
myname = s.str(); myname = s.str();
} }
addr = s_host.c_str(); addr = transport->toString();
ostringstream logname; ostringstream logname;
logname << prefix << "-R-" << s_host << ":" << setw(4) << port; logname << prefix << "-R-" << transport->toString();
unetlog = make_shared<DebugStream>(); unetlog = make_shared<DebugStream>();
unetlog->setLogName(logname.str()); unetlog->setLogName(logname.str());
...@@ -213,8 +211,10 @@ bool UNetReceiver::createConnection( bool throwEx ) ...@@ -213,8 +211,10 @@ bool UNetReceiver::createConnection( bool throwEx )
try try
{ {
udp = unisetstd::make_unique<UDPReceiveU>(addr, port); // делаем неблокирующее чтение (нужно для libev)
udp->setBlocking(false); // делаем неблокирующее чтение (нужно для libev) if( !transport->createConnection(throwEx, recvTimeout, true) )
return false;
evReceive.set<UNetReceiver, &UNetReceiver::callback>(this); evReceive.set<UNetReceiver, &UNetReceiver::callback>(this);
if( upStrategy == useUpdateEventLoop ) if( upStrategy == useUpdateEventLoop )
...@@ -228,31 +228,18 @@ bool UNetReceiver::createConnection( bool throwEx ) ...@@ -228,31 +228,18 @@ bool UNetReceiver::createConnection( bool throwEx )
if( activated ) if( activated )
evprepare(loop.evloop()); evprepare(loop.evloop());
}
catch( const std::exception& e )
{
ostringstream s;
s << myname << "(createConnection): " << e.what();
unetcrit << s.str() << std::endl;
if( throwEx )
throw SystemError(s.str());
udp = nullptr; return true;
} }
catch( ... ) catch( const std::exception& ex )
{ {
ostringstream s; unetcrit << ex.what() << std::endl;
s << myname << "(createConnection): catch...";
unetcrit << s.str() << std::endl;
if( throwEx ) if( throwEx )
throw SystemError(s.str()); throw ex;
udp = nullptr;
} }
return ( udp != nullptr ); return false;
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
void UNetReceiver::start() void UNetReceiver::start()
...@@ -290,7 +277,7 @@ void UNetReceiver::evprepare( const ev::loop_ref& eloop ) noexcept ...@@ -290,7 +277,7 @@ void UNetReceiver::evprepare( const ev::loop_ref& eloop ) noexcept
evUpdate.start( 0, ((float)updatepause / 1000.) ); evUpdate.start( 0, ((float)updatepause / 1000.) );
} }
if( !udp ) if( !transport->isConnected() )
{ {
evCheckConnection.set(eloop); evCheckConnection.set(eloop);
evCheckConnection.start(0, checkConnectionTime); evCheckConnection.start(0, checkConnectionTime);
...@@ -299,7 +286,7 @@ void UNetReceiver::evprepare( const ev::loop_ref& eloop ) noexcept ...@@ -299,7 +286,7 @@ void UNetReceiver::evprepare( const ev::loop_ref& eloop ) noexcept
else else
{ {
evReceive.set(eloop); evReceive.set(eloop);
evReceive.start(udp->getSocket(), ev::READ); evReceive.start(transport->getSocket(), ev::READ);
evInitPause.start(0); evInitPause.start(0);
} }
} }
...@@ -324,8 +311,7 @@ void UNetReceiver::evfinish( const ev::loop_ref& eloop ) noexcept ...@@ -324,8 +311,7 @@ void UNetReceiver::evfinish( const ev::loop_ref& eloop ) noexcept
if( evUpdate.is_active() ) if( evUpdate.is_active() )
evUpdate.stop(); evUpdate.stop();
//udp->disconnect(); transport->disconnect();
udp = nullptr;
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
void UNetReceiver::forceUpdate() noexcept void UNetReceiver::forceUpdate() noexcept
...@@ -721,9 +707,8 @@ bool UNetReceiver::receive() noexcept ...@@ -721,9 +707,8 @@ bool UNetReceiver::receive() noexcept
{ {
try try
{ {
ssize_t ret = udp->receiveBytes(r_buf.data, sizeof(r_buf.data)); ssize_t ret = transport->receive(r_buf.data, sizeof(r_buf.data));
recvCount++; recvCount++;
//ssize_t ret = udp->receiveFrom(r_buf.data, sizeof(r_buf.data),saddr);
if( ret < 0 ) if( ret < 0 )
{ {
...@@ -993,7 +978,7 @@ const std::string UNetReceiver::getShortInfo() const noexcept ...@@ -993,7 +978,7 @@ const std::string UNetReceiver::getShortInfo() const noexcept
ostringstream s; ostringstream s;
s << setw(15) << std::right << getAddress() << ":" << std::left << setw(6) << getPort() s << setw(15) << std::right << transport->toString()
<< "[ " << setw(7) << ( isLockUpdate() ? "PASSIVE" : "ACTIVE" ) << " ]" << "[ " << setw(7) << ( isLockUpdate() ? "PASSIVE" : "ACTIVE" ) << " ]"
<< " recvOK=" << isRecvOK() << " recvOK=" << isRecvOK()
<< " receivepack=" << rnum << " receivepack=" << rnum
......
...@@ -31,7 +31,7 @@ ...@@ -31,7 +31,7 @@
#include "SharedMemory.h" #include "SharedMemory.h"
#include "UDPPacket.h" #include "UDPPacket.h"
#include "CommonEventLoop.h" #include "CommonEventLoop.h"
#include "UDPCore.h" #include "UNetTransport.h"
// -------------------------------------------------------------------------- // --------------------------------------------------------------------------
namespace uniset namespace uniset
{ {
...@@ -101,7 +101,7 @@ namespace uniset ...@@ -101,7 +101,7 @@ namespace uniset
public std::enable_shared_from_this<UNetReceiver> public std::enable_shared_from_this<UNetReceiver>
{ {
public: public:
UNetReceiver( const std::string& host, int port, const std::shared_ptr<SMInterface>& smi UNetReceiver( std::unique_ptr<UNetReceiveTransport>&& transport, const std::shared_ptr<SMInterface>& smi
, bool nocheckConnection = false , bool nocheckConnection = false
, const std::string& prefix = "unet" ); , const std::string& prefix = "unet" );
virtual ~UNetReceiver(); virtual ~UNetReceiver();
...@@ -141,13 +141,9 @@ namespace uniset ...@@ -141,13 +141,9 @@ namespace uniset
void forceUpdate() noexcept; // пересохранить очередной пакет в SM даже если данные не менялись void forceUpdate() noexcept; // пересохранить очередной пакет в SM даже если данные не менялись
inline std::string getAddress() const noexcept inline std::string getTransportID() const noexcept
{ {
return addr; return transport->ID();
}
inline int getPort() const noexcept
{
return port;
} }
/*! Коды событий */ /*! Коды событий */
...@@ -244,10 +240,8 @@ namespace uniset ...@@ -244,10 +240,8 @@ namespace uniset
timeout_t recvpause = { 10 }; /*!< пауза между приёмами пакетов, [мсек] */ timeout_t recvpause = { 10 }; /*!< пауза между приёмами пакетов, [мсек] */
timeout_t updatepause = { 100 }; /*!< периодичность обновления данных в SM, [мсек] */ timeout_t updatepause = { 100 }; /*!< периодичность обновления данных в SM, [мсек] */
std::unique_ptr<UDPReceiveU> udp; std::unique_ptr<UNetReceiveTransport> transport;
std::string addr; std::string addr;
int port = { 0 };
Poco::Net::SocketAddress saddr;
std::string myname; std::string myname;
ev::io evReceive; ev::io evReceive;
ev::periodic evCheckConnection; ev::periodic evCheckConnection;
......
...@@ -29,18 +29,16 @@ namespace uniset ...@@ -29,18 +29,16 @@ namespace uniset
using namespace std; using namespace std;
using namespace uniset::extensions; using namespace uniset::extensions;
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
UNetSender::UNetSender(const std::string& _host, const int _port, const std::shared_ptr<SMInterface>& smi, UNetSender::UNetSender( std::unique_ptr<UNetSendTransport>&& _transport, const std::shared_ptr<SMInterface>& smi,
bool nocheckConnection, const std::string& s_f, const std::string& s_val, bool nocheckConnection, const std::string& s_f, const std::string& s_val,
const std::string& s_prefix, const std::string& s_prefix,
const std::string& prefix, const std::string& prefix,
size_t maxDCount, size_t maxACount ): size_t maxDCount, size_t maxACount ):
s_field(s_f), s_field(s_f),
s_fvalue(s_val), s_fvalue(s_val),
prop_prefix(s_prefix), prop_prefix(s_prefix),
shm(smi), shm(smi),
port(_port), transport(std::move(_transport)),
s_host(_host),
saddr(_host, _port),
sendpause(150), sendpause(150),
packsendpause(5), packsendpause(5),
packsendpauseFactor(1), packsendpauseFactor(1),
...@@ -54,12 +52,12 @@ namespace uniset ...@@ -54,12 +52,12 @@ namespace uniset
{ {
ostringstream s; ostringstream s;
s << "S(" << setw(15) << s_host << ":" << setw(4) << port << ")"; s << "S(" << setw(15) << transport->toString() << ")";
myname = s.str(); myname = s.str();
} }
ostringstream logname; ostringstream logname;
logname << prefix << "-S-" << s_host << "-" << port; logname << prefix << "-S-" << transport->toString();
unetlog = make_shared<DebugStream>(); unetlog = make_shared<DebugStream>();
unetlog->setLogName(logname.str()); unetlog->setLogName(logname.str());
...@@ -70,9 +68,7 @@ namespace uniset ...@@ -70,9 +68,7 @@ namespace uniset
unetinfo << myname << "(init): read filter-field='" << s_field unetinfo << myname << "(init): read filter-field='" << s_field
<< "' filter-value='" << s_fvalue << "'" << endl; << "' filter-value='" << s_fvalue << "'" << endl;
unetinfo << "(UNetSender): UDP set to " << s_host << ":" << port << endl; unetinfo << "(UNetSender): UDP set to " << transport->toString() << endl;
addr = s_host.c_str();
ptCheckConnection.setTiming(10000); // default 10 сек ptCheckConnection.setTiming(10000); // default 10 сек
createConnection(nocheckConnection); createConnection(nocheckConnection);
...@@ -122,36 +118,17 @@ namespace uniset ...@@ -122,36 +118,17 @@ namespace uniset
try try
{ {
//udp = make_shared<UDPSocketU>(addr, port); return transport->createConnection(throwEx, writeTimeout);
udp = unisetstd::make_unique<UDPSocketU>();
udp->setBroadcast(true);
udp->setSendTimeout( UniSetTimer::millisecToPoco(writeTimeout) );
// udp->setNoDelay(true);
} }
catch( const std::exception& e ) catch( const std::exception& ex )
{ {
ostringstream s; unetcrit << ex.what() << std::endl;
s << myname << "(createConnection): " << e.what();
unetcrit << s.str() << std::endl;
if( throwEx ) if( throwEx )
throw SystemError(s.str()); throw ex;
udp = nullptr;
}
catch( ... )
{
ostringstream s;
s << myname << "(createConnection): catch...";
unetcrit << s.str() << std::endl;
if( throwEx )
throw SystemError(s.str());
udp = nullptr;
} }
return (udp != nullptr); return false;
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
void UNetSender::updateFromSM() void UNetSender::updateFromSM()
...@@ -220,7 +197,7 @@ namespace uniset ...@@ -220,7 +197,7 @@ namespace uniset
while( activated ) while( activated )
{ {
if( !udp ) if( !transport->isConnected() )
{ {
if( !ptCheckConnection.checkTime() ) if( !ptCheckConnection.checkTime() )
{ {
...@@ -328,12 +305,12 @@ namespace uniset ...@@ -328,12 +305,12 @@ namespace uniset
if( packetnum == 0 ) if( packetnum == 0 )
packetnum = 1; packetnum = 1;
if( !udp || !udp->poll( UniSetTimer::millisecToPoco(writeTimeout), Poco::Net::Socket::SELECT_WRITE) ) if( !transport->isReadyForSend(writeTimeout) )
return; return;
mypack.msg.transport_msg(s_msg); mypack.msg.transport_msg(s_msg);
size_t ret = udp->sendTo(&s_msg.data, s_msg.len, saddr); size_t ret = transport->send(&s_msg.data, s_msg.len);
if( ret < s_msg.len ) if( ret < s_msg.len )
unetcrit << myname << "(real_send): FAILED ret=" << ret << " < sizeof=" << s_msg.len << endl; unetcrit << myname << "(real_send): FAILED ret=" << ret << " < sizeof=" << s_msg.len << endl;
...@@ -574,7 +551,7 @@ namespace uniset ...@@ -574,7 +551,7 @@ namespace uniset
ostringstream s; ostringstream s;
s << setw(15) << std::right << getAddress() << ":" << std::left << setw(6) << getPort() s << setw(15) << std::right << transport->toString()
<< " lastpacknum=" << packetnum << " lastpacknum=" << packetnum
<< " lastcrc=" << setw(6) << lastcrc << " lastcrc=" << setw(6) << lastcrc
<< " items=" << items.size() << " maxAData=" << getADataSize() << " maxDData=" << getDDataSize() << " items=" << items.size() << " maxAData=" << getADataSize() << " maxDData=" << getDDataSize()
......
...@@ -28,8 +28,8 @@ ...@@ -28,8 +28,8 @@
#include "SMInterface.h" #include "SMInterface.h"
#include "SharedMemory.h" #include "SharedMemory.h"
#include "ThreadCreator.h" #include "ThreadCreator.h"
#include "UDPCore.h"
#include "UDPPacket.h" #include "UDPPacket.h"
#include "UNetTransport.h"
// -------------------------------------------------------------------------- // --------------------------------------------------------------------------
namespace uniset namespace uniset
{ {
...@@ -69,7 +69,7 @@ namespace uniset ...@@ -69,7 +69,7 @@ namespace uniset
class UNetSender class UNetSender
{ {
public: public:
UNetSender( const std::string& host, const int port, const std::shared_ptr<SMInterface>& smi UNetSender( std::unique_ptr<UNetSendTransport>&& transport, const std::shared_ptr<SMInterface>& smi
, bool nocheckConnection = false , bool nocheckConnection = false
, const std::string& s_field = "" , const std::string& s_field = ""
, const std::string& s_fvalue = "" , const std::string& s_fvalue = ""
...@@ -162,15 +162,6 @@ namespace uniset ...@@ -162,15 +162,6 @@ namespace uniset
virtual const std::string getShortInfo() const; virtual const std::string getShortInfo() const;
inline std::string getAddress() const
{
return addr;
}
inline int getPort() const
{
return port;
}
inline size_t getADataSize() const inline size_t getADataSize() const
{ {
return maxAData; return maxAData;
...@@ -199,11 +190,7 @@ namespace uniset ...@@ -199,11 +190,7 @@ namespace uniset
private: private:
UNetSender(); UNetSender();
std::unique_ptr<UDPSocketU> udp; std::unique_ptr<UNetSendTransport> transport;
std::string addr;
int port = { 0 };
std::string s_host = { "" };
Poco::Net::SocketAddress saddr;
std::string myname = { "" }; std::string myname = { "" };
timeout_t sendpause = { 150 }; timeout_t sendpause = { 150 };
......
...@@ -4,6 +4,7 @@ ...@@ -4,6 +4,7 @@
#include "UniSetTypes.h" #include "UniSetTypes.h"
#include "UInterface.h" #include "UInterface.h"
#include "UDPPacket.h" #include "UDPPacket.h"
#include "UDPCore.h"
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
// include-ы искплючительно для того, чтобы их обработал gcov (покрытие кода) // include-ы искплючительно для того, чтобы их обработал gcov (покрытие кода)
#include "UNetReceiver.h" #include "UNetReceiver.h"
......
...@@ -3,6 +3,9 @@ ...@@ -3,6 +3,9 @@
#include <string> #include <string>
#include <Poco/Net/NetException.h> #include <Poco/Net/NetException.h>
#include "Debug.h" #include "Debug.h"
#include "UDPCore.h"
#include "unisetstd.h"
#include "UDPTransport.h"
#include "UNetReceiver.h" #include "UNetReceiver.h"
#include "SMInterface.h" #include "SMInterface.h"
#include "Extensions.h" #include "Extensions.h"
...@@ -139,7 +142,8 @@ static void run_test( size_t max, const std::string& host ) ...@@ -139,7 +142,8 @@ static void run_test( size_t max, const std::string& host )
// make receivers.. // make receivers..
for( size_t i = 0; i < max; i++ ) for( size_t i = 0; i < max; i++ )
{ {
auto r = make_shared<UNetReceiver>(host, begPort + i, smiInstance()); auto t = unisetstd::make_unique<uniset::UDPReceiveTransport>(host, begPort + i);
auto r = make_shared<UNetReceiver>(std::move(t), smiInstance());
r->setLockUpdate(true); r->setLockUpdate(true);
vrecv.emplace_back(r); vrecv.emplace_back(r);
} }
......
...@@ -3,6 +3,7 @@ ...@@ -3,6 +3,7 @@
#define UDPReceiveU_H_ #define UDPReceiveU_H_
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
#include <Poco/Net/DatagramSocket.h> #include <Poco/Net/DatagramSocket.h>
#include <Poco/Net/MulticastSocket.h>
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
// Классы-обёртки, чтобы достучаться до "сырого сокета" и других функций // Классы-обёртки, чтобы достучаться до "сырого сокета" и других функций
// необходимых при использовании с libev.. // необходимых при использовании с libev..
...@@ -52,6 +53,27 @@ namespace uniset ...@@ -52,6 +53,27 @@ namespace uniset
} }
}; };
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
class MulticastSocketU:
public Poco::Net::MulticastSocket
{
public:
MulticastSocketU():
Poco::Net::MulticastSocket(Poco::Net::IPAddress::IPv4)
{}
MulticastSocketU( const std::string& bind, int port ):
Poco::Net::MulticastSocket(Poco::Net::SocketAddress(bind, port), true)
{}
virtual ~MulticastSocketU() {}
inline int getSocket() const
{
return Poco::Net::MulticastSocket::sockfd();
}
};
// -------------------------------------------------------------------------
} // end of uniset namespace } // end of uniset namespace
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
#endif // UDPReceiveU_H_ #endif // UDPReceiveU_H_
......
...@@ -280,6 +280,8 @@ ...@@ -280,6 +280,8 @@
./extensions/UNetUDP/tests/urecv_perf_test.cc ./extensions/UNetUDP/tests/urecv_perf_test.cc
./extensions/UNetUDP/UDPPacket.cc ./extensions/UNetUDP/UDPPacket.cc
./extensions/UNetUDP/UDPPacket.h ./extensions/UNetUDP/UDPPacket.h
./extensions/UNetUDP/UDPTransport.cc
./extensions/UNetUDP/UDPTransport.h
./extensions/UNetUDP/unetexchange.cc ./extensions/UNetUDP/unetexchange.cc
./extensions/UNetUDP/UNetExchange.cc ./extensions/UNetUDP/UNetExchange.cc
./extensions/UNetUDP/UNetExchange.h ./extensions/UNetUDP/UNetExchange.h
...@@ -288,6 +290,7 @@ ...@@ -288,6 +290,7 @@
./extensions/UNetUDP/UNetReceiver.h ./extensions/UNetUDP/UNetReceiver.h
./extensions/UNetUDP/UNetSender.cc ./extensions/UNetUDP/UNetSender.cc
./extensions/UNetUDP/UNetSender.h ./extensions/UNetUDP/UNetSender.h
./extensions/UNetUDP/UNetTransport.h
./extensions/UNetUDP/unet-udp-tester.cc ./extensions/UNetUDP/unet-udp-tester.cc
./extensions/UNetUDP/up/UNetReceiver.cc ./extensions/UNetUDP/up/UNetReceiver.cc
./extensions/UNetUDP/up/UNetReceiver.h ./extensions/UNetUDP/up/UNetReceiver.h
...@@ -303,6 +306,14 @@ ...@@ -303,6 +306,14 @@
./extensions/UWebSocketGate/UWebSocketGate.cc ./extensions/UWebSocketGate/UWebSocketGate.cc
./extensions/UWebSocketGate/UWebSocketGate.h ./extensions/UWebSocketGate/UWebSocketGate.h
./extensions/UWebSocketGate/UWebSocketGateSugar.h ./extensions/UWebSocketGate/UWebSocketGateSugar.h
./extensions/WS/main.cc
./extensions/WS/Makefile.am
./extensions/WS/tests/Makefile.am
./extensions/WS/tests/tests_with_sm.cc
./extensions/WS/tests/test_uwebsocketgate.cc
./extensions/WS/UWebSocketGate.cc
./extensions/WS/UWebSocketGate.h
./extensions/WS/UWebSocketGateSugar.h
./IDL/Makefile.am ./IDL/Makefile.am
./IDL/Processes/Makefile.am ./IDL/Processes/Makefile.am
./IDL/UniSetTypes/Makefile.am ./IDL/UniSetTypes/Makefile.am
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment