Commit 436bca3e authored by Pavel Vainerman's avatar Pavel Vainerman

[unet-zero-copy]: new parameter "recv-max-at-time"

parent 81226a82
......@@ -165,7 +165,7 @@ MulticastReceiveTransport::~MulticastReceiveTransport()
// -------------------------------------------------------------------------
bool MulticastReceiveTransport::isConnected() const
bool MulticastReceiveTransport::isConnected() const noexcept
return udp != nullptr;
......@@ -175,7 +175,7 @@ std::string MulticastReceiveTransport::ID() const noexcept
return toString();
// -------------------------------------------------------------------------
std::string MulticastReceiveTransport::toString() const
std::string MulticastReceiveTransport::toString() const noexcept
ostringstream s;
s << host << ":" << port;
......@@ -283,9 +283,20 @@ ssize_t MulticastReceiveTransport::receive( void* r_buf, size_t sz )
return udp->receiveBytes(r_buf, sz);
// -------------------------------------------------------------------------
bool MulticastReceiveTransport::isReadyForReceive( timeout_t tout )
bool MulticastReceiveTransport::isReadyForReceive( timeout_t tout ) noexcept
return udp->poll(UniSetTimer::millisecToPoco(tout), Poco::Net::Socket::SELECT_READ);
return udp->poll(UniSetTimer::millisecToPoco(tout), Poco::Net::Socket::SELECT_READ);
catch(...) {}
return false;
// -------------------------------------------------------------------------
int MulticastReceiveTransport::available()
return udp->available();
// -------------------------------------------------------------------------
std::vector<Poco::Net::IPAddress> MulticastReceiveTransport::getGroups()
......@@ -430,19 +441,19 @@ MulticastSendTransport::~MulticastSendTransport()
// -------------------------------------------------------------------------
std::string MulticastSendTransport::toString() const
std::string MulticastSendTransport::toString() const noexcept
return sockAddr.toString();
// -------------------------------------------------------------------------
bool MulticastSendTransport::isConnected() const
bool MulticastSendTransport::isConnected() const noexcept
return udp != nullptr;
// -------------------------------------------------------------------------
void MulticastSendTransport::setTimeToLive( int _ttl )
ttl = ttl;
ttl = _ttl;
if( udp )
......@@ -493,9 +504,15 @@ int MulticastSendTransport::getSocket() const
return udp->getSocket();
// -------------------------------------------------------------------------
bool MulticastSendTransport::isReadyForSend( timeout_t tout )
bool MulticastSendTransport::isReadyForSend( timeout_t tout ) noexcept
return udp && udp->poll( UniSetTimer::millisecToPoco(tout), Poco::Net::Socket::SELECT_WRITE );
return udp && udp->poll( UniSetTimer::millisecToPoco(tout), Poco::Net::Socket::SELECT_WRITE );
catch(...) {}
return false;
// -------------------------------------------------------------------------
ssize_t MulticastSendTransport::send( const void* buf, size_t sz )
......@@ -37,8 +37,8 @@ namespace uniset
MulticastReceiveTransport( const std::string& bind, int port, const std::vector<Poco::Net::IPAddress>& joinGroups, const std::string& iface = "" );
virtual ~MulticastReceiveTransport();
virtual bool isConnected() const override;
virtual std::string toString() const override;
virtual bool isConnected() const noexcept override;
virtual std::string toString() const noexcept override;
virtual std::string ID() const noexcept override;
virtual bool createConnection(bool throwEx, timeout_t readTimeout, bool noblock) override;
......@@ -47,8 +47,9 @@ namespace uniset
std::vector<Poco::Net::IPAddress> getGroups();
void setLoopBack( bool state );
bool isReadyForReceive( timeout_t tout ) override;
bool isReadyForReceive( timeout_t tout ) noexcept override;
virtual ssize_t receive(void* r_buf, size_t sz) override;
virtual int available() override;
std::string iface() const;
......@@ -69,15 +70,15 @@ namespace uniset
MulticastSendTransport(const std::string& sockHost, int sockPort, const std::string& groupHost, int groupPort, int ttl = 1 );
virtual ~MulticastSendTransport();
virtual bool isConnected() const override;
virtual std::string toString() const override;
virtual bool isConnected() const noexcept override;
virtual std::string toString() const noexcept override;
virtual bool createConnection(bool throwEx, timeout_t sendTimeout) override;
virtual int getSocket() const override;
Poco::Net::SocketAddress getGroupAddress();
// write
virtual bool isReadyForSend(timeout_t tout) override;
virtual bool isReadyForSend(timeout_t tout) noexcept override;
virtual ssize_t send(const void* buf, size_t sz) override;
void setTimeToLive( int ttl );
......@@ -190,7 +190,7 @@ namespace uniset
return false;
// -----------------------------------------------------------------------------
size_t UDPMessage::addDData( long id, bool val ) noexcept
size_t UDPMessage::addDData( int64_t id, bool val ) noexcept
if( header.dcount >= MaxDCount )
return MaxDCount;
......@@ -63,7 +63,7 @@ UDPReceiveTransport::~UDPReceiveTransport()
// -------------------------------------------------------------------------
bool UDPReceiveTransport::isConnected() const
bool UDPReceiveTransport::isConnected() const noexcept
return udp != nullptr;
......@@ -73,7 +73,7 @@ std::string UDPReceiveTransport::ID() const noexcept
return toString();
// -------------------------------------------------------------------------
std::string UDPReceiveTransport::toString() const
std::string UDPReceiveTransport::toString() const noexcept
ostringstream s;
s << host << ":" << port;
......@@ -95,20 +95,24 @@ bool UDPReceiveTransport::createConnection( bool throwEx, timeout_t readTimeout,
catch( const std::exception& e )
udp = nullptr;
ostringstream s;
s << host << ":" << port << "(createConnection): " << e.what();
if( throwEx )
ostringstream s;
s << host << ":" << port << "(createConnection): " << e.what();
throw uniset::SystemError(s.str());
catch( ... )
udp = nullptr;
ostringstream s;
s << host << ":" << port << "(createConnection): catch...";
if( throwEx )
ostringstream s;
s << host << ":" << port << "(createConnection): catch...";
throw uniset::SystemError(s.str());
return ( udp != nullptr );
......@@ -124,9 +128,20 @@ ssize_t UDPReceiveTransport::receive( void* r_buf, size_t sz )
return udp->receiveBytes(r_buf, sz);
// -------------------------------------------------------------------------
bool UDPReceiveTransport::isReadyForReceive( timeout_t tout )
bool UDPReceiveTransport::isReadyForReceive( timeout_t tout ) noexcept
return udp->poll(UniSetTimer::millisecToPoco(tout), Poco::Net::Socket::SELECT_READ);
return udp->poll(UniSetTimer::millisecToPoco(tout), Poco::Net::Socket::SELECT_READ);
catch(...) {}
return false;
// -------------------------------------------------------------------------
int UDPReceiveTransport::available()
return udp->available();
// -------------------------------------------------------------------------
std::unique_ptr<UDPSendTransport> UDPSendTransport::createFromXml( UniXML::iterator it, const std::string& defaultIP, int numChan )
......@@ -167,12 +182,12 @@ UDPSendTransport::~UDPSendTransport()
// -------------------------------------------------------------------------
std::string UDPSendTransport::toString() const
std::string UDPSendTransport::toString() const noexcept
return saddr.toString();
// -------------------------------------------------------------------------
bool UDPSendTransport::isConnected() const
bool UDPSendTransport::isConnected() const noexcept
return udp != nullptr;
......@@ -189,20 +204,24 @@ bool UDPSendTransport::createConnection( bool throwEx, timeout_t sendTimeout )
catch( const std::exception& e )
udp = nullptr;
ostringstream s;
s << saddr.toString() << "(createConnection): " << e.what();
if( throwEx )
ostringstream s;
s << saddr.toString() << "(createConnection): " << e.what();
throw uniset::SystemError(s.str());
catch( ... )
udp = nullptr;
ostringstream s;
s << saddr.toString() << "(createConnection): catch...";
if( throwEx )
ostringstream s;
s << saddr.toString() << "(createConnection): catch...";
throw uniset::SystemError(s.str());
return (udp != nullptr);
......@@ -213,9 +232,15 @@ int UDPSendTransport::getSocket() const
return udp->getSocket();
// -------------------------------------------------------------------------
bool UDPSendTransport::isReadyForSend( timeout_t tout )
bool UDPSendTransport::isReadyForSend( timeout_t tout ) noexcept
return udp && udp->poll( UniSetTimer::millisecToPoco(tout), Poco::Net::Socket::SELECT_WRITE );
return udp && udp->poll( UniSetTimer::millisecToPoco(tout), Poco::Net::Socket::SELECT_WRITE );
catch(...) {}
return false;
// -------------------------------------------------------------------------
ssize_t UDPSendTransport::send( const void* buf, size_t sz )
......@@ -35,15 +35,16 @@ namespace uniset
UDPReceiveTransport( const std::string& bind, int port );
virtual ~UDPReceiveTransport();
virtual bool isConnected() const override;
virtual std::string toString() const override;
virtual bool isConnected() const noexcept override;
virtual std::string toString() const noexcept override;
virtual std::string ID() const noexcept override;
virtual bool createConnection( bool throwEx, timeout_t readTimeout, bool noblock ) override;
virtual void disconnect() override;
virtual int getSocket() const override;
virtual ssize_t receive( void* r_buf, size_t sz ) override;
virtual bool isReadyForReceive(timeout_t tout) override;
virtual bool isReadyForReceive(timeout_t tout) noexcept override;
virtual int available() override;
std::unique_ptr<UDPReceiveU> udp;
......@@ -61,14 +62,14 @@ namespace uniset
UDPSendTransport( const std::string& host, int port );
virtual ~UDPSendTransport();
virtual bool isConnected() const override;
virtual std::string toString() const override;
virtual bool isConnected() const noexcept override;
virtual std::string toString() const noexcept override;
virtual bool createConnection( bool throwEx, timeout_t sendTimeout ) override;
virtual int getSocket() const override;
// write
virtual bool isReadyForSend( timeout_t tout ) override;
virtual bool isReadyForSend( timeout_t tout ) noexcept override;
virtual ssize_t send( const void* buf, size_t sz ) override;
......@@ -79,6 +79,7 @@ UNetExchange::UNetExchange(uniset::ObjectId objId, uniset::ObjectId shmId, const
int checkConnectionPause = conf->getArgPInt("--" + prefix + "-checkconnection-pause", it.getProp("checkConnectionPause"), 10000);
int initpause = conf->getArgPInt("--" + prefix + "-initpause", it.getProp("initpause"), 5000);
int recvBufferSize = conf->getArgPInt("--" + prefix + "-recv-buffer-size", it.getProp("recvBufferSize"), 100);
int recvMaxReceiveCount = conf->getArgPInt("--" + prefix + "-recv-max-at-time", it.getProp("recvMaxAtTime"), 5);
const string unet_transport = conf->getArg2Param("--" + prefix + "-transport", it.getProp("transport"), "broadcast");
no_sender = conf->getArgInt("--" + prefix + "-nosender", it.getProp("nosender"));
......@@ -131,6 +132,7 @@ UNetExchange::UNetExchange(uniset::ObjectId objId, uniset::ObjectId shmId, const
if( r.r2 )
......@@ -144,6 +146,7 @@ UNetExchange::UNetExchange(uniset::ObjectId objId, uniset::ObjectId shmId, const
......@@ -606,6 +609,7 @@ void UNetExchange::help_print( int argc, const char* argv[] ) noexcept
cout << "--prefix-maxdifferense num - Маскимальная разница в номерах пакетов для фиксации события 'потеря пакетов' " << endl;
cout << "--prefix-nosender [0,1] - Отключить посылку." << endl;
cout << "--prefix-recv-buffer-size sz - Размер циклического буфера для приёма сообщений. По умолчанию: 100" << endl;
cout << "--prefix-recv-max-at-time num - Максимальное количество сообщений вычитываемых из сети за один раз. По умолчанию: 5" << endl;
cout << "--prefix-sm-ready-timeout msec - Время ожидание я готовности SM к работе. По умолчанию 120000" << endl;
cout << "--prefix-filter-field name - Название фильтрующего поля при формировании списка датчиков посылаемых данным узлом" << endl;
cout << "--prefix-filter-value name - Значение фильтрующего поля при формировании списка датчиков посылаемых данным узлом" << endl;
......@@ -95,6 +95,9 @@ namespace uniset
Буфер для приёма сообщений можно настроить параметром \b recvBufferSize="1000" в конфигурационной секции
или аргументом командной строки \b --prefix-recv-buffer-size sz
Максимальное число сообщений вычитываемых из ести за один раз настраивается параметром
\b recvMaxAtTime="5" или \b --prefix-recv-max-at-time num
\note Имеется возможность задавать отдельную настроечную секцию для "списка узлов" при помощи параметра
--prefix-nodes-confnode name. По умолчанию настройка ведётся по секции <nodes>
......@@ -78,6 +78,12 @@ void UNetReceiver::setBufferSize( size_t sz ) noexcept
// -----------------------------------------------------------------------------
void UNetReceiver::setMaxReceiveAtTime( size_t sz ) noexcept
if( sz > 0 )
maxReceiveCount = sz;
// -----------------------------------------------------------------------------
void UNetReceiver::setReceiveTimeout( timeout_t msec ) noexcept
std::lock_guard<std::mutex> l(tmMutex);
......@@ -207,6 +213,13 @@ bool UNetReceiver::createConnection( bool throwEx )
if( throwEx )
throw SystemError(s.str());
catch( ... )
unetcrit << "(createConnection): catch ..." << std::endl;
if( throwEx )
return false;
......@@ -471,12 +484,16 @@ void UNetReceiver::readEvent( ev::io& watcher ) noexcept
if( !activated )
bool ok = false;
if( receive() )
for( size_t i = 0; transport->available() > 0 && i < maxReceiveCount; i++ )
std::lock_guard<std::mutex> l(tmMutex);
if( receive() != retOK )
ok = true;
catch( uniset::Exception& ex)
......@@ -487,6 +504,12 @@ void UNetReceiver::readEvent( ev::io& watcher ) noexcept
unetwarn << myname << "(receive): " << e.what() << std::endl;
if( ok )
std::lock_guard<std::mutex> l(tmMutex);
// -----------------------------------------------------------------------------
void UNetReceiver::checkConnection()
......@@ -597,7 +620,7 @@ void UNetReceiver::stop()
// -----------------------------------------------------------------------------
bool UNetReceiver::receive() noexcept
UNetReceiver::ReceiveRetCode UNetReceiver::receive() noexcept
......@@ -608,13 +631,13 @@ bool UNetReceiver::receive() noexcept
if( ret < 0 )
unetcrit << myname << "(receive): recv err(" << errno << "): " << strerror(errno) << endl;
return false;
return retError;
if( ret == 0 )
unetwarn << myname << "(receive): disconnected?!... recv 0 bytes.." << endl;
return false;
return retNoData;
......@@ -623,7 +646,7 @@ bool UNetReceiver::receive() noexcept
if( !pack->isOk() )
return false;
return retError;
if( size_t(abs(long(pack->header.num - wnum))) > maxDifferens || size_t(abs( long(wnum - rnum) )) >= (cbufSize - 2) )
......@@ -648,7 +671,7 @@ bool UNetReceiver::receive() noexcept
pack->header.num = 0;
return true;
return retOK;
if( pack->header.num != wnum )
......@@ -670,7 +693,7 @@ bool UNetReceiver::receive() noexcept
if( rnum == 0 )
rnum = pack->header.num;
return true;
return retOK;
catch( Poco::Net::NetException& ex )
......@@ -681,7 +704,7 @@ bool UNetReceiver::receive() noexcept
unetcrit << myname << "(receive): recv err: " << ex.what() << endl;
return false;
return retError;
// -----------------------------------------------------------------------------
void UNetReceiver::initIterators() noexcept
......@@ -705,11 +728,12 @@ void UNetReceiver::initIterators() noexcept
// -----------------------------------------------------------------------------
UNetReceiver::CacheVec* UNetReceiver::getDCache( UniSetUDP::UDPMessage* pack ) noexcept
auto dit = d_icache_map.find(pack->getDataID());
auto dID = pack->getDataID();
auto dit = d_icache_map.find(dID);
if( dit == d_icache_map.end() )
auto p = d_icache_map.emplace(pack->getDataID(), UNetReceiver::CacheVec());
auto p = d_icache_map.emplace(dID, UNetReceiver::CacheVec());
dit = p.first;
......@@ -718,7 +742,7 @@ UNetReceiver::CacheVec* UNetReceiver::getDCache( UniSetUDP::UDPMessage* pack ) n
if( pack->header.dcount == d_info->size() )
return d_info;
unetinfo << myname << ": init dcache[" << pack->header.dcount << "] for " << pack->getDataID() << endl;
unetinfo << myname << ": init dcache[" << pack->header.dcount << "] for " << dID << endl;
......@@ -738,11 +762,12 @@ UNetReceiver::CacheVec* UNetReceiver::getDCache( UniSetUDP::UDPMessage* pack ) n
// -----------------------------------------------------------------------------
UNetReceiver::CacheVec* UNetReceiver::getACache( UniSetUDP::UDPMessage* pack ) noexcept
auto ait = a_icache_map.find(pack->getDataID());
auto dID = pack->getDataID();
auto ait = a_icache_map.find(dID);
if( ait == a_icache_map.end() )
auto p = a_icache_map.emplace(pack->getDataID(), UNetReceiver::CacheVec());
auto p = a_icache_map.emplace(dID, UNetReceiver::CacheVec());
ait = p.first;
......@@ -751,7 +776,7 @@ UNetReceiver::CacheVec* UNetReceiver::getACache( UniSetUDP::UDPMessage* pack ) n
if( pack->header.acount == a_info->size() )
return a_info;
unetinfo << myname << ": init acache[" << pack->header.acount << "] for " << pack->getDataID() << endl;
unetinfo << myname << ": init acache[" << pack->header.acount << "] for " << dID << endl;
......@@ -129,6 +129,7 @@ namespace uniset
void setEvrunTimeout(timeout_t msec ) noexcept;
void setInitPause( timeout_t msec ) noexcept;
void setBufferSize( size_t sz ) noexcept;
void setMaxReceiveAtTime( size_t sz ) noexcept;
void setRespondID( uniset::ObjectId id, bool invert = false ) noexcept;
void setLostPacketsID( uniset::ObjectId id ) noexcept;
......@@ -163,7 +164,14 @@ namespace uniset
const std::shared_ptr<SMInterface> shm;
std::shared_ptr<DebugStream> unetlog;
bool receive() noexcept;
enum ReceiveRetCode
retOK = 0,
retError = 1,
retNoData = 2
ReceiveRetCode receive() noexcept;
void step() noexcept;
void update() noexcept;
void callback( ev::io& watcher, int revents ) noexcept;
......@@ -218,6 +226,7 @@ namespace uniset
timeout_t prepareTime = { 2000 };
timeout_t evrunTimeout = { 15000 };
timeout_t lostTimeout = { 200 };
size_t maxReceiveCount = { 5 }; // количество читаемых за один раз
double initPause = { 5.0 }; // пауза на начальную инициализацию (сек)
std::atomic_bool initOK = { false };
......@@ -29,16 +29,17 @@ namespace uniset
virtual ~UNetReceiveTransport() {}
virtual bool isConnected() const = 0;
virtual std::string toString() const = 0;
virtual bool isConnected() const noexcept = 0;
virtual std::string toString() const noexcept = 0;
virtual std::string ID() const noexcept = 0;
virtual bool createConnection( bool throwEx, timeout_t recvTimeout, bool noblock ) = 0;
virtual int getSocket() const = 0;
virtual bool isReadyForReceive(timeout_t tout) = 0;
virtual bool isReadyForReceive(timeout_t tout) noexcept = 0;
virtual ssize_t receive( void* r_buf, size_t sz ) = 0;
virtual void disconnect() = 0;
virtual int available() = 0;
// Интерфейс для посылки данных в сеть
......@@ -480,7 +480,8 @@ TEST_CASE("[UNetUDP]: perf test", "[unetudp][zero][perf]")
pack.header.procID = 100;
pack.header.num = 1;
for( size_t i = 0; i < uniset::UniSetUDP::MaxACount; i++ ) {
for( size_t i = 0; i < uniset::UniSetUDP::MaxACount; i++ )
pack.addAData(i, i);
pack.addDData(i, true);
......@@ -489,7 +490,8 @@ TEST_CASE("[UNetUDP]: perf test", "[unetudp][zero][perf]")
PassiveTimer pt;
for( int i = 0; i < 100000; i++ ) {
for( int i = 0; i < 100000; i++ )
memcpy(&pack2, &pack, sizeof(UniSetUDP::UDPMessage));
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment