Commit dacbb7bb authored by Pavel Vainerman's avatar Pavel Vainerman Committed by Pavel Vainerman

[unet-multicast]: first prototype

parent b18870f9
......@@ -10,7 +10,7 @@ libUniSet2UNetUDP_la_LIBADD = $(top_builddir)/lib/libUniSet2.la \
$(top_builddir)/extensions/lib/libUniSet2Extensions.la \
$(SIGC_LIBS) $(POCO_LIBS)
libUniSet2UNetUDP_la_CXXFLAGS = -I$(top_builddir)/extensions/include -I$(top_builddir)/extensions/SharedMemory $(SIGC_CFLAGS) $(POCO_CFLAGS)
libUniSet2UNetUDP_la_SOURCES = UDPPacket.cc UNetReceiver.cc UNetSender.cc UNetExchange.cc
libUniSet2UNetUDP_la_SOURCES = UDPPacket.cc UDPTransport.cc MulticastTransport.cc UNetReceiver.cc UNetSender.cc UNetExchange.cc
@PACKAGE@_unetexchange_SOURCES = unetexchange.cc
@PACKAGE@_unetexchange_LDADD = libUniSet2UNetUDP.la $(top_builddir)/lib/libUniSet2.la \
......
/*
* Copyright (c) 2021 Pavel Vainerman.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation, version 2.1.
*
* This program is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Lesser Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
// -------------------------------------------------------------------------
#include <sstream>
#include <iomanip>
#include <Poco/Net/NetException.h>
#include "Exceptions.h"
#include "PassiveTimer.h"
#include "unisetstd.h"
#include "MulticastTransport.h"
// -------------------------------------------------------------------------
using namespace std;
using namespace uniset;
// -------------------------------------------------------------------------
MulticastReceiveTransport::MulticastReceiveTransport( const std::string& _bind, int _port, const std::vector<Poco::Net::IPAddress> _joinGroups ):
host(_bind),
port(_port),
groups(_joinGroups)
{
}
// -------------------------------------------------------------------------
MulticastReceiveTransport::~MulticastReceiveTransport()
{
if( udp )
{
for (const auto& s : groups)
{
try
{
udp->leaveGroup(s);
}
catch (...) {}
}
}
}
// -------------------------------------------------------------------------
bool MulticastReceiveTransport::isConnected() const
{
return udp != nullptr;
}
// -------------------------------------------------------------------------
std::string MulticastReceiveTransport::ID() const noexcept
{
return toString();
}
// -------------------------------------------------------------------------
std::string MulticastReceiveTransport::toString() const
{
ostringstream s;
s << host << ":" << port;
return s.str();
}
// -------------------------------------------------------------------------
void MulticastReceiveTransport::disconnect()
{
if (udp)
{
for (const auto& s : groups)
{
try
{
udp->leaveGroup(s);
}
catch (...) {}
}
udp = nullptr;
}
}
// -------------------------------------------------------------------------
bool MulticastReceiveTransport::createConnection( bool throwEx, timeout_t readTimeout, bool noblock )
{
try
{
udp = unisetstd::make_unique<MulticastSocketU>(host, port);
udp->setBlocking(!noblock);
for( const auto& s : groups )
udp->joinGroup(s);
}
catch( const std::exception& e )
{
udp = nullptr;
ostringstream s;
s << host << ":" << port << "(createConnection): " << e.what();
if( throwEx )
throw uniset::SystemError(s.str());
}
catch( ... )
{
udp = nullptr;
ostringstream s;
s << host << ":" << port << "(createConnection): catch...";
if( throwEx )
throw uniset::SystemError(s.str());
}
return ( udp != nullptr );
}
// -------------------------------------------------------------------------
int MulticastReceiveTransport::getSocket() const
{
return udp->getSocket();
}
// -------------------------------------------------------------------------
ssize_t MulticastReceiveTransport::receive( void* r_buf, size_t sz )
{
return udp->receiveBytes(r_buf, sz);
}
// -------------------------------------------------------------------------
MulticastSendTransport::MulticastSendTransport( const std::string& _host, int _port, const std::vector<Poco::Net::IPAddress> _sendGroups ):
saddr(_host, _port),
groups(_sendGroups)
{
}
// -------------------------------------------------------------------------
MulticastSendTransport::~MulticastSendTransport()
{
if( udp )
{
for (const auto& s : groups)
{
try
{
udp->leaveGroup(s);
}
catch (...) {}
}
}
}
// -------------------------------------------------------------------------
std::string MulticastSendTransport::toString() const
{
return saddr.toString();
}
// -------------------------------------------------------------------------
bool MulticastSendTransport::isConnected() const
{
return udp != nullptr;
}
// -------------------------------------------------------------------------
bool MulticastSendTransport::createConnection( bool throwEx, timeout_t sendTimeout )
{
try
{
udp = unisetstd::make_unique<MulticastSocketU>();
for( const auto& s : groups )
udp->joinGroup(s);
udp->setSendTimeout( UniSetTimer::millisecToPoco(sendTimeout) );
}
catch( const std::exception& e )
{
udp = nullptr;
ostringstream s;
s << saddr.toString() << "(createConnection): " << e.what();
if( throwEx )
throw uniset::SystemError(s.str());
}
catch( ... )
{
udp = nullptr;
ostringstream s;
s << saddr.toString() << "(createConnection): catch...";
if( throwEx )
throw uniset::SystemError(s.str());
}
return (udp != nullptr);
}
// -------------------------------------------------------------------------
int MulticastSendTransport::getSocket() const
{
return udp->getSocket();
}
// -------------------------------------------------------------------------
bool MulticastSendTransport::isReadyForSend( timeout_t tout )
{
return udp && udp->poll( UniSetTimer::millisecToPoco(tout), Poco::Net::Socket::SELECT_WRITE );
}
// -------------------------------------------------------------------------
ssize_t MulticastSendTransport::send( void* buf, size_t sz )
{
return udp->sendTo(buf, sz, saddr);
}
// -------------------------------------------------------------------------
/*
* Copyright (c) 2021 Pavel Vainerman.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation, version 2.1.
*
* This program is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Lesser Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
// -------------------------------------------------------------------------
#ifndef MulticastTransport_H_
#define MulticastTransport_H_
// -------------------------------------------------------------------------
#include <string>
#include <memory>
#include <vector>
#include "UNetTransport.h"
#include "UDPCore.h"
// -------------------------------------------------------------------------
namespace uniset
{
class MulticastReceiveTransport :
public UNetReceiveTransport
{
public:
MulticastReceiveTransport( const std::string& bind, int port, const std::vector<Poco::Net::IPAddress> joinGroups );
virtual ~MulticastReceiveTransport();
virtual bool isConnected() const override;
virtual std::string toString() const override;
virtual std::string ID() const noexcept override;
virtual bool createConnection(bool throwEx, timeout_t readTimeout, bool noblock) override;
virtual void disconnect() override;
virtual int getSocket() const override;
virtual ssize_t receive(void* r_buf, size_t sz) override;
protected:
std::unique_ptr <MulticastSocketU> udp;
const std::string host;
const int port;
const std::vector<Poco::Net::IPAddress> groups;
};
class MulticastSendTransport :
public UNetSendTransport
{
public:
MulticastSendTransport(const std::string& host, int port, const std::vector<Poco::Net::IPAddress> sendGroups );
virtual ~MulticastSendTransport();
virtual bool isConnected() const override;
virtual std::string toString() const override;
virtual bool createConnection(bool throwEx, timeout_t sendTimeout) override;
virtual int getSocket() const override;
// write
virtual bool isReadyForSend(timeout_t tout) override;
virtual ssize_t send(void* buf, size_t sz) override;
protected:
std::unique_ptr <MulticastSocketU> udp;
const Poco::Net::SocketAddress saddr;
const std::vector<Poco::Net::IPAddress> groups;
};
} // end of uniset namespace
// -------------------------------------------------------------------------
#endif // MulticastTransport_H_
// -------------------------------------------------------------------------
/*
* Copyright (c) 2021 Pavel Vainerman.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation, version 2.1.
*
* This program is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Lesser Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
// -------------------------------------------------------------------------
#include <sstream>
#include <iomanip>
#include <Poco/Net/NetException.h>
#include "Exceptions.h"
#include "PassiveTimer.h"
#include "unisetstd.h"
#include "UDPTransport.h"
// -------------------------------------------------------------------------
using namespace std;
using namespace uniset;
// -------------------------------------------------------------------------
UDPReceiveTransport::UDPReceiveTransport( const std::string& _bind, int _port ):
host(_bind),
port(_port)
{
}
// -------------------------------------------------------------------------
UDPReceiveTransport::~UDPReceiveTransport()
{
}
// -------------------------------------------------------------------------
bool UDPReceiveTransport::isConnected() const
{
return udp != nullptr;
}
// -------------------------------------------------------------------------
std::string UDPReceiveTransport::ID() const noexcept
{
return toString();
}
// -------------------------------------------------------------------------
std::string UDPReceiveTransport::toString() const
{
ostringstream s;
s << host << ":" << port;
return s.str();
}
// -------------------------------------------------------------------------
void UDPReceiveTransport::disconnect()
{
udp = nullptr;
}
// -------------------------------------------------------------------------
bool UDPReceiveTransport::createConnection( bool throwEx, timeout_t readTimeout, bool noblock )
{
try
{
udp = unisetstd::make_unique<UDPReceiveU>(host, port);
udp->setBlocking(!noblock);
}
catch( const std::exception& e )
{
udp = nullptr;
ostringstream s;
s << host << ":" << port << "(createConnection): " << e.what();
if( throwEx )
throw uniset::SystemError(s.str());
}
catch( ... )
{
udp = nullptr;
ostringstream s;
s << host << ":" << port << "(createConnection): catch...";
if( throwEx )
throw uniset::SystemError(s.str());
}
return ( udp != nullptr );
}
// -------------------------------------------------------------------------
int UDPReceiveTransport::getSocket() const
{
return udp->getSocket();
}
// -------------------------------------------------------------------------
ssize_t UDPReceiveTransport::receive( void* r_buf, size_t sz )
{
return udp->receiveBytes(r_buf, sz);
}
// -------------------------------------------------------------------------
UDPSendTransport::UDPSendTransport( const std::string& _host, int _port ):
saddr(_host, _port)
{
}
// -------------------------------------------------------------------------
UDPSendTransport::~UDPSendTransport()
{
}
// -------------------------------------------------------------------------
std::string UDPSendTransport::toString() const
{
return saddr.toString();
}
// -------------------------------------------------------------------------
bool UDPSendTransport::isConnected() const
{
return udp != nullptr;
}
// -------------------------------------------------------------------------
bool UDPSendTransport::createConnection( bool throwEx, timeout_t sendTimeout )
{
try
{
udp = unisetstd::make_unique<UDPSocketU>();
udp->setBroadcast(true);
udp->setSendTimeout( UniSetTimer::millisecToPoco(sendTimeout) );
// udp->setNoDelay(true);
}
catch( const std::exception& e )
{
udp = nullptr;
ostringstream s;
s << saddr.toString() << "(createConnection): " << e.what();
if( throwEx )
throw uniset::SystemError(s.str());
}
catch( ... )
{
udp = nullptr;
ostringstream s;
s << saddr.toString() << "(createConnection): catch...";
if( throwEx )
throw uniset::SystemError(s.str());
}
return (udp != nullptr);
}
// -------------------------------------------------------------------------
int UDPSendTransport::getSocket() const
{
return udp->getSocket();
}
// -------------------------------------------------------------------------
bool UDPSendTransport::isReadyForSend( timeout_t tout )
{
return udp && udp->poll( UniSetTimer::millisecToPoco(tout), Poco::Net::Socket::SELECT_WRITE );
}
// -------------------------------------------------------------------------
ssize_t UDPSendTransport::send( void* buf, size_t sz )
{
return udp->sendTo(buf, sz, saddr);
}
// -------------------------------------------------------------------------
/*
* Copyright (c) 2021 Pavel Vainerman.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation, version 2.1.
*
* This program is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Lesser Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
// -------------------------------------------------------------------------
#ifndef UDPTransport_H_
#define UDPTransport_H_
// -------------------------------------------------------------------------
#include <string>
#include <memory>
#include "UNetTransport.h"
#include "UDPCore.h"
// -------------------------------------------------------------------------
namespace uniset
{
class UDPReceiveTransport:
public UNetReceiveTransport
{
public:
UDPReceiveTransport( const std::string& bind, int port );
virtual ~UDPReceiveTransport();
virtual bool isConnected() const override;
virtual std::string toString() const override;
virtual std::string ID() const noexcept override;
virtual bool createConnection( bool throwEx, timeout_t readTimeout, bool noblock ) override;
virtual void disconnect() override;
virtual int getSocket() const override;
virtual ssize_t receive( void* r_buf, size_t sz ) override;
protected:
std::unique_ptr<UDPReceiveU> udp;
const std::string host;
const int port;
};
class UDPSendTransport:
public UNetSendTransport
{
public:
UDPSendTransport( const std::string& host, int port );
virtual ~UDPSendTransport();
virtual bool isConnected() const override;
virtual std::string toString() const override;
virtual bool createConnection( bool throwEx, timeout_t sendTimeout ) override;
virtual int getSocket() const override;
// write
virtual bool isReadyForSend( timeout_t tout ) override;
virtual ssize_t send( void* buf, size_t sz ) override;
protected:
std::unique_ptr<UDPSocketU> udp;
Poco::Net::SocketAddress saddr;
};
} // end of uniset namespace
// -------------------------------------------------------------------------
#endif // UDPTransport_H_
// -------------------------------------------------------------------------
......@@ -16,10 +16,12 @@
// -------------------------------------------------------------------------
#include <sstream>
#include <iomanip>
#include "unisetstd.h"
#include "Exceptions.h"
#include "Extensions.h"
#include "UNetExchange.h"
#include "UNetLogSugar.h"
#include "UDPTransport.h"
// -----------------------------------------------------------------------------
using namespace std;
using namespace uniset;
......@@ -166,7 +168,8 @@ UNetExchange::UNetExchange(uniset::ObjectId objId, uniset::ObjectId shmId, const
}
unetinfo << myname << "(init): init sender.. my node " << n_it.getProp("name") << endl;
sender = make_shared<UNetSender>(h, p, shm, false, s_field, s_fvalue, "unet", prefix);
auto s1 = unisetstd::make_unique<uniset::UDPSendTransport>(h, p);
sender = make_shared<UNetSender>(std::move(s1), shm, false, s_field, s_fvalue, "unet", prefix);
sender->setSendPause(sendpause);
sender->setPackSendPause(packsendpause);
sender->setPackSendPauseFactor(packsendpauseFactor);
......@@ -179,7 +182,9 @@ UNetExchange::UNetExchange(uniset::ObjectId objId, uniset::ObjectId shmId, const
if( !h2.empty() )
{
unetinfo << myname << "(init): init sender2.. my node " << n_it.getProp("name") << endl;
sender2 = make_shared<UNetSender>(h2, p2, shm, false, s_field, s_fvalue, prefix);
auto s2 = unisetstd::make_unique<uniset::UDPSendTransport>(h2, p2);
sender2 = make_shared<UNetSender>(std::move(s2), shm, false, s_field, s_fvalue, prefix);
sender2->setSendPause(sendpause);
sender2->setCheckConnectionPause(checkConnectionPause);
loga->add(sender2->getLog());
......@@ -197,8 +202,9 @@ UNetExchange::UNetExchange(uniset::ObjectId objId, uniset::ObjectId shmId, const
}
unetinfo << myname << "(init): add UNetReceiver for " << h << ":" << p << endl;
auto transport = unisetstd::make_unique<uniset::UDPReceiveTransport>(h, p);
if( checkExistUNetHost(h, p) )
if( checkExistTransport(transport->ID()) )
{
unetinfo << myname << "(init): " << h << ":" << p << " already added! Ignore.." << endl;
continue;
......@@ -346,7 +352,7 @@ UNetExchange::UNetExchange(uniset::ObjectId objId, uniset::ObjectId shmId, const
unetinfo << myname << "(init): (node='" << n << "') add basic receiver "
<< h << ":" << p << endl;
auto r = make_shared<UNetReceiver>(h, p, shm, false, prefix);
auto r = make_shared<UNetReceiver>(std::move(transport), shm, false, prefix);
loga->add(r->getLog());
......@@ -378,7 +384,9 @@ UNetExchange::UNetExchange(uniset::ObjectId objId, uniset::ObjectId shmId, const
unetinfo << myname << "(init): (node='" << n << "') add reserv receiver "
<< h2 << ":" << p2 << endl;
r2 = make_shared<UNetReceiver>(h2, p2, shm, false, prefix);
auto transport2 = unisetstd::make_unique<UDPReceiveTransport>(h2, p2);
r2 = make_shared<UNetReceiver>(std::move(transport2), shm, false, prefix);
loga->add(r2->getLog());
......@@ -473,11 +481,11 @@ UNetExchange::~UNetExchange()
{
}
// -----------------------------------------------------------------------------
bool UNetExchange::checkExistUNetHost(const std::string& addr, int port ) noexcept
bool UNetExchange::checkExistTransport( const std::string& transportID ) noexcept
{
for( const auto& it : recvlist )
{
if( it.r1->getAddress() == addr && it.r1->getPort() == port )
if( it.r1->getTransportID() == transportID )
return true;
}
......
......@@ -159,7 +159,7 @@ namespace uniset
/*! глобальная функция для вывода help-а */
static void help_print( int argc, const char* argv[] ) noexcept;
bool checkExistUNetHost( const std::string& host, int port ) noexcept;
bool checkExistTransport( const std::string& transportID ) noexcept;
inline std::shared_ptr<LogAgregator> getLogAggregator() noexcept
{
......
......@@ -40,15 +40,13 @@ bool UNetReceiver::PacketCompare::operator()(const UniSetUDP::UDPMessage& lhs,
}
*/
// ------------------------------------------------------------------------------------------
UNetReceiver::UNetReceiver(const std::string& s_host, int _port
, const std::shared_ptr<SMInterface>& smi
, bool nocheckConnection
, const std::string& prefix ):
UNetReceiver::UNetReceiver( std::unique_ptr<UNetReceiveTransport>&& _transport, const std::shared_ptr<SMInterface>& smi
, bool nocheckConnection
, const std::string& prefix ):
shm(smi),
recvpause(10),
updatepause(100),
port(_port),
saddr(s_host, _port),
transport(std::move(_transport)),
recvTimeout(5000),
prepareTime(2000),
lostTimeout(200), /* 2*updatepause */
......@@ -68,14 +66,14 @@ UNetReceiver::UNetReceiver(const std::string& s_host, int _port
{
{
ostringstream s;
s << "R(" << setw(15) << s_host << ":" << setw(4) << port << ")";
s << "R(" << transport->toString() << ")";
myname = s.str();
}
addr = s_host.c_str();
addr = transport->toString();
ostringstream logname;
logname << prefix << "-R-" << s_host << ":" << setw(4) << port;
logname << prefix << "-R-" << transport->toString();
unetlog = make_shared<DebugStream>();
unetlog->setLogName(logname.str());
......@@ -213,8 +211,10 @@ bool UNetReceiver::createConnection( bool throwEx )
try
{
udp = unisetstd::make_unique<UDPReceiveU>(addr, port);
udp->setBlocking(false); // делаем неблокирующее чтение (нужно для libev)
// делаем неблокирующее чтение (нужно для libev)
if( !transport->createConnection(throwEx, recvTimeout, true) )
return false;
evReceive.set<UNetReceiver, &UNetReceiver::callback>(this);
if( upStrategy == useUpdateEventLoop )
......@@ -228,31 +228,18 @@ bool UNetReceiver::createConnection( bool throwEx )
if( activated )
evprepare(loop.evloop());
}
catch( const std::exception& e )
{
ostringstream s;
s << myname << "(createConnection): " << e.what();
unetcrit << s.str() << std::endl;
if( throwEx )
throw SystemError(s.str());
udp = nullptr;
return true;
}
catch( ... )
catch( const std::exception& ex )
{
ostringstream s;
s << myname << "(createConnection): catch...";
unetcrit << s.str() << std::endl;
unetcrit << ex.what() << std::endl;
if( throwEx )
throw SystemError(s.str());
udp = nullptr;
throw ex;
}
return ( udp != nullptr );
return false;
}
// -----------------------------------------------------------------------------
void UNetReceiver::start()
......@@ -290,7 +277,7 @@ void UNetReceiver::evprepare( const ev::loop_ref& eloop ) noexcept
evUpdate.start( 0, ((float)updatepause / 1000.) );
}
if( !udp )
if( !transport->isConnected() )
{
evCheckConnection.set(eloop);
evCheckConnection.start(0, checkConnectionTime);
......@@ -299,7 +286,7 @@ void UNetReceiver::evprepare( const ev::loop_ref& eloop ) noexcept
else
{
evReceive.set(eloop);
evReceive.start(udp->getSocket(), ev::READ);
evReceive.start(transport->getSocket(), ev::READ);
evInitPause.start(0);
}
}
......@@ -324,8 +311,7 @@ void UNetReceiver::evfinish( const ev::loop_ref& eloop ) noexcept
if( evUpdate.is_active() )
evUpdate.stop();
//udp->disconnect();
udp = nullptr;
transport->disconnect();
}
// -----------------------------------------------------------------------------
void UNetReceiver::forceUpdate() noexcept
......@@ -721,9 +707,8 @@ bool UNetReceiver::receive() noexcept
{
try
{
ssize_t ret = udp->receiveBytes(r_buf.data, sizeof(r_buf.data));
ssize_t ret = transport->receive(r_buf.data, sizeof(r_buf.data));
recvCount++;
//ssize_t ret = udp->receiveFrom(r_buf.data, sizeof(r_buf.data),saddr);
if( ret < 0 )
{
......@@ -993,7 +978,7 @@ const std::string UNetReceiver::getShortInfo() const noexcept
ostringstream s;
s << setw(15) << std::right << getAddress() << ":" << std::left << setw(6) << getPort()
s << setw(15) << std::right << transport->toString()
<< "[ " << setw(7) << ( isLockUpdate() ? "PASSIVE" : "ACTIVE" ) << " ]"
<< " recvOK=" << isRecvOK()
<< " receivepack=" << rnum
......
......@@ -31,7 +31,7 @@
#include "SharedMemory.h"
#include "UDPPacket.h"
#include "CommonEventLoop.h"
#include "UDPCore.h"
#include "UNetTransport.h"
// --------------------------------------------------------------------------
namespace uniset
{
......@@ -101,7 +101,7 @@ namespace uniset
public std::enable_shared_from_this<UNetReceiver>
{
public:
UNetReceiver( const std::string& host, int port, const std::shared_ptr<SMInterface>& smi
UNetReceiver( std::unique_ptr<UNetReceiveTransport>&& transport, const std::shared_ptr<SMInterface>& smi
, bool nocheckConnection = false
, const std::string& prefix = "unet" );
virtual ~UNetReceiver();
......@@ -141,13 +141,9 @@ namespace uniset
void forceUpdate() noexcept; // пересохранить очередной пакет в SM даже если данные не менялись
inline std::string getAddress() const noexcept
inline std::string getTransportID() const noexcept
{
return addr;
}
inline int getPort() const noexcept
{
return port;
return transport->ID();
}
/*! Коды событий */
......@@ -244,10 +240,8 @@ namespace uniset
timeout_t recvpause = { 10 }; /*!< пауза между приёмами пакетов, [мсек] */
timeout_t updatepause = { 100 }; /*!< периодичность обновления данных в SM, [мсек] */
std::unique_ptr<UDPReceiveU> udp;
std::unique_ptr<UNetReceiveTransport> transport;
std::string addr;
int port = { 0 };
Poco::Net::SocketAddress saddr;
std::string myname;
ev::io evReceive;
ev::periodic evCheckConnection;
......
......@@ -29,18 +29,16 @@ namespace uniset
using namespace std;
using namespace uniset::extensions;
// -----------------------------------------------------------------------------
UNetSender::UNetSender(const std::string& _host, const int _port, const std::shared_ptr<SMInterface>& smi,
bool nocheckConnection, const std::string& s_f, const std::string& s_val,
const std::string& s_prefix,
const std::string& prefix,
size_t maxDCount, size_t maxACount ):
UNetSender::UNetSender( std::unique_ptr<UNetSendTransport>&& _transport, const std::shared_ptr<SMInterface>& smi,
bool nocheckConnection, const std::string& s_f, const std::string& s_val,
const std::string& s_prefix,
const std::string& prefix,
size_t maxDCount, size_t maxACount ):
s_field(s_f),
s_fvalue(s_val),
prop_prefix(s_prefix),
shm(smi),
port(_port),
s_host(_host),
saddr(_host, _port),
transport(std::move(_transport)),
sendpause(150),
packsendpause(5),
packsendpauseFactor(1),
......@@ -54,12 +52,12 @@ namespace uniset
{
ostringstream s;
s << "S(" << setw(15) << s_host << ":" << setw(4) << port << ")";
s << "S(" << setw(15) << transport->toString() << ")";
myname = s.str();
}
ostringstream logname;
logname << prefix << "-S-" << s_host << "-" << port;
logname << prefix << "-S-" << transport->toString();
unetlog = make_shared<DebugStream>();
unetlog->setLogName(logname.str());
......@@ -70,9 +68,7 @@ namespace uniset
unetinfo << myname << "(init): read filter-field='" << s_field
<< "' filter-value='" << s_fvalue << "'" << endl;
unetinfo << "(UNetSender): UDP set to " << s_host << ":" << port << endl;
addr = s_host.c_str();
unetinfo << "(UNetSender): UDP set to " << transport->toString() << endl;
ptCheckConnection.setTiming(10000); // default 10 сек
createConnection(nocheckConnection);
......@@ -122,36 +118,17 @@ namespace uniset
try
{
//udp = make_shared<UDPSocketU>(addr, port);
udp = unisetstd::make_unique<UDPSocketU>();
udp->setBroadcast(true);
udp->setSendTimeout( UniSetTimer::millisecToPoco(writeTimeout) );
// udp->setNoDelay(true);
return transport->createConnection(throwEx, writeTimeout);
}
catch( const std::exception& e )
catch( const std::exception& ex )
{
ostringstream s;
s << myname << "(createConnection): " << e.what();
unetcrit << s.str() << std::endl;
unetcrit << ex.what() << std::endl;
if( throwEx )
throw SystemError(s.str());
udp = nullptr;
}
catch( ... )
{
ostringstream s;
s << myname << "(createConnection): catch...";
unetcrit << s.str() << std::endl;
if( throwEx )
throw SystemError(s.str());
udp = nullptr;
throw ex;
}
return (udp != nullptr);
return false;
}
// -----------------------------------------------------------------------------
void UNetSender::updateFromSM()
......@@ -220,7 +197,7 @@ namespace uniset
while( activated )
{
if( !udp )
if( !transport->isConnected() )
{
if( !ptCheckConnection.checkTime() )
{
......@@ -328,12 +305,12 @@ namespace uniset
if( packetnum == 0 )
packetnum = 1;
if( !udp || !udp->poll( UniSetTimer::millisecToPoco(writeTimeout), Poco::Net::Socket::SELECT_WRITE) )
if( !transport->isReadyForSend(writeTimeout) )
return;
mypack.msg.transport_msg(s_msg);
size_t ret = udp->sendTo(&s_msg.data, s_msg.len, saddr);
size_t ret = transport->send(&s_msg.data, s_msg.len);
if( ret < s_msg.len )
unetcrit << myname << "(real_send): FAILED ret=" << ret << " < sizeof=" << s_msg.len << endl;
......@@ -574,7 +551,7 @@ namespace uniset
ostringstream s;
s << setw(15) << std::right << getAddress() << ":" << std::left << setw(6) << getPort()
s << setw(15) << std::right << transport->toString()
<< " lastpacknum=" << packetnum
<< " lastcrc=" << setw(6) << lastcrc
<< " items=" << items.size() << " maxAData=" << getADataSize() << " maxDData=" << getDDataSize()
......
......@@ -28,8 +28,8 @@
#include "SMInterface.h"
#include "SharedMemory.h"
#include "ThreadCreator.h"
#include "UDPCore.h"
#include "UDPPacket.h"
#include "UNetTransport.h"
// --------------------------------------------------------------------------
namespace uniset
{
......@@ -69,7 +69,7 @@ namespace uniset
class UNetSender
{
public:
UNetSender( const std::string& host, const int port, const std::shared_ptr<SMInterface>& smi
UNetSender( std::unique_ptr<UNetSendTransport>&& transport, const std::shared_ptr<SMInterface>& smi
, bool nocheckConnection = false
, const std::string& s_field = ""
, const std::string& s_fvalue = ""
......@@ -162,15 +162,6 @@ namespace uniset
virtual const std::string getShortInfo() const;
inline std::string getAddress() const
{
return addr;
}
inline int getPort() const
{
return port;
}
inline size_t getADataSize() const
{
return maxAData;
......@@ -199,11 +190,7 @@ namespace uniset
private:
UNetSender();
std::unique_ptr<UDPSocketU> udp;
std::string addr;
int port = { 0 };
std::string s_host = { "" };
Poco::Net::SocketAddress saddr;
std::unique_ptr<UNetSendTransport> transport;
std::string myname = { "" };
timeout_t sendpause = { 150 };
......
......@@ -4,6 +4,7 @@
#include "UniSetTypes.h"
#include "UInterface.h"
#include "UDPPacket.h"
#include "UDPCore.h"
// -----------------------------------------------------------------------------
// include-ы искплючительно для того, чтобы их обработал gcov (покрытие кода)
#include "UNetReceiver.h"
......
......@@ -3,6 +3,9 @@
#include <string>
#include <Poco/Net/NetException.h>
#include "Debug.h"
#include "UDPCore.h"
#include "unisetstd.h"
#include "UDPTransport.h"
#include "UNetReceiver.h"
#include "SMInterface.h"
#include "Extensions.h"
......@@ -139,7 +142,8 @@ static void run_test( size_t max, const std::string& host )
// make receivers..
for( size_t i = 0; i < max; i++ )
{
auto r = make_shared<UNetReceiver>(host, begPort + i, smiInstance());
auto t = unisetstd::make_unique<uniset::UDPReceiveTransport>(host, begPort + i);
auto r = make_shared<UNetReceiver>(std::move(t), smiInstance());
r->setLockUpdate(true);
vrecv.emplace_back(r);
}
......
......@@ -3,6 +3,7 @@
#define UDPReceiveU_H_
// -------------------------------------------------------------------------
#include <Poco/Net/DatagramSocket.h>
#include <Poco/Net/MulticastSocket.h>
// -------------------------------------------------------------------------
// Классы-обёртки, чтобы достучаться до "сырого сокета" и других функций
// необходимых при использовании с libev..
......@@ -52,6 +53,27 @@ namespace uniset
}
};
// -------------------------------------------------------------------------
class MulticastSocketU:
public Poco::Net::MulticastSocket
{
public:
MulticastSocketU():
Poco::Net::MulticastSocket(Poco::Net::IPAddress::IPv4)
{}
MulticastSocketU( const std::string& bind, int port ):
Poco::Net::MulticastSocket(Poco::Net::SocketAddress(bind, port), true)
{}
virtual ~MulticastSocketU() {}
inline int getSocket() const
{
return Poco::Net::MulticastSocket::sockfd();
}
};
// -------------------------------------------------------------------------
} // end of uniset namespace
// -------------------------------------------------------------------------
#endif // UDPReceiveU_H_
......
......@@ -280,6 +280,8 @@
./extensions/UNetUDP/tests/urecv_perf_test.cc
./extensions/UNetUDP/UDPPacket.cc
./extensions/UNetUDP/UDPPacket.h
./extensions/UNetUDP/UDPTransport.cc
./extensions/UNetUDP/UDPTransport.h
./extensions/UNetUDP/unetexchange.cc
./extensions/UNetUDP/UNetExchange.cc
./extensions/UNetUDP/UNetExchange.h
......@@ -288,6 +290,7 @@
./extensions/UNetUDP/UNetReceiver.h
./extensions/UNetUDP/UNetSender.cc
./extensions/UNetUDP/UNetSender.h
./extensions/UNetUDP/UNetTransport.h
./extensions/UNetUDP/unet-udp-tester.cc
./extensions/UNetUDP/up/UNetReceiver.cc
./extensions/UNetUDP/up/UNetReceiver.h
......@@ -303,6 +306,14 @@
./extensions/UWebSocketGate/UWebSocketGate.cc
./extensions/UWebSocketGate/UWebSocketGate.h
./extensions/UWebSocketGate/UWebSocketGateSugar.h
./extensions/WS/main.cc
./extensions/WS/Makefile.am
./extensions/WS/tests/Makefile.am
./extensions/WS/tests/tests_with_sm.cc
./extensions/WS/tests/test_uwebsocketgate.cc
./extensions/WS/UWebSocketGate.cc
./extensions/WS/UWebSocketGate.h
./extensions/WS/UWebSocketGateSugar.h
./IDL/Makefile.am
./IDL/Processes/Makefile.am
./IDL/UniSetTypes/Makefile.am
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment