Commit 2febeb01 authored by Pavel Vainerman's avatar Pavel Vainerman

(unet): receiver refactoring: remove "thread strategy",

used circular buffer instead priority_queue and some other minor optimizations.
parent a779c2e4
...@@ -23,6 +23,7 @@ ...@@ -23,6 +23,7 @@
// myvar = LE_TO_H(myvar) // myvar = LE_TO_H(myvar)
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
#if __BYTE_ORDER == __LITTLE_ENDIAN #if __BYTE_ORDER == __LITTLE_ENDIAN
static bool HostIsBigEndian = false;
#define LE_TO_H(x) {} #define LE_TO_H(x) {}
#elif INTPTR_MAX == INT64_MAX #elif INTPTR_MAX == INT64_MAX
#define LE_TO_H(x) x = le64toh(x) #define LE_TO_H(x) x = le64toh(x)
...@@ -33,6 +34,7 @@ ...@@ -33,6 +34,7 @@
#endif #endif
#if __BYTE_ORDER == __BIG_ENDIAN #if __BYTE_ORDER == __BIG_ENDIAN
static bool HostIsBigEndian = true;
#define BE_TO_H(x) {} #define BE_TO_H(x) {}
#elif INTPTR_MAX == INT64_MAX #elif INTPTR_MAX == INT64_MAX
#define BE_TO_H(x) x = be64toh(x) #define BE_TO_H(x) x = be64toh(x)
...@@ -253,7 +255,7 @@ namespace uniset ...@@ -253,7 +255,7 @@ namespace uniset
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
size_t UDPMessage::transport_msg( UDPPacket& p ) const noexcept size_t UDPMessage::transport_msg( UDPPacket& p ) const noexcept
{ {
memset(&p, 0, sizeof(UDPPacket)); p = UDPPacket{}; // reset data
size_t i = 0; size_t i = 0;
memcpy(&(p.data[i]), this, sizeof(UDPHeader)); memcpy(&(p.data[i]), this, sizeof(UDPHeader));
...@@ -311,7 +313,8 @@ namespace uniset ...@@ -311,7 +313,8 @@ namespace uniset
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
size_t UDPMessage::getMessage( UDPMessage& m, UDPPacket& p ) noexcept size_t UDPMessage::getMessage( UDPMessage& m, UDPPacket& p ) noexcept
{ {
memset(&m, 0, sizeof(m)); // reset data
m = UDPMessage{};
size_t i = 0; size_t i = 0;
memcpy(&m, &(p.data[i]), sizeof(UDPHeader)); memcpy(&m, &(p.data[i]), sizeof(UDPHeader));
...@@ -320,7 +323,7 @@ namespace uniset ...@@ -320,7 +323,7 @@ namespace uniset
// byte order from packet // byte order from packet
u_char be_order = m._be_order; u_char be_order = m._be_order;
if( be_order ) if( be_order && !HostIsBigEndian )
{ {
BE_TO_H(m.magic); BE_TO_H(m.magic);
BE_TO_H(m.num); BE_TO_H(m.num);
...@@ -329,7 +332,7 @@ namespace uniset ...@@ -329,7 +332,7 @@ namespace uniset
BE_TO_H(m.dcount); BE_TO_H(m.dcount);
BE_TO_H(m.acount); BE_TO_H(m.acount);
} }
else else if( !be_order && HostIsBigEndian )
{ {
LE_TO_H(m.magic); LE_TO_H(m.magic);
LE_TO_H(m.num); LE_TO_H(m.num);
...@@ -385,6 +388,8 @@ namespace uniset ...@@ -385,6 +388,8 @@ namespace uniset
// CONVERT DATA TO HOST BYTE ORDER // CONVERT DATA TO HOST BYTE ORDER
// ------------------------------- // -------------------------------
if( (be_order && !HostIsBigEndian) || (!be_order && HostIsBigEndian) )
{
for( size_t n = 0; n < m.acount; n++ ) for( size_t n = 0; n < m.acount; n++ )
{ {
if( be_order ) if( be_order )
...@@ -410,6 +415,7 @@ namespace uniset ...@@ -410,6 +415,7 @@ namespace uniset
LE_TO_H(m.d_id[n]); LE_TO_H(m.d_id[n]);
} }
} }
}
return i + sz; return i + sz;
} }
......
...@@ -84,9 +84,9 @@ UNetExchange::UNetExchange(uniset::ObjectId objId, uniset::ObjectId shmId, const ...@@ -84,9 +84,9 @@ UNetExchange::UNetExchange(uniset::ObjectId objId, uniset::ObjectId shmId, const
int lostTimeout = conf->getArgPInt("--" + prefix + "-lost-timeout", it.getProp("lostTimeout"), 2 * updatepause); int lostTimeout = conf->getArgPInt("--" + prefix + "-lost-timeout", it.getProp("lostTimeout"), 2 * updatepause);
steptime = conf->getArgPInt("--" + prefix + "-steptime", it.getProp("steptime"), 1000); steptime = conf->getArgPInt("--" + prefix + "-steptime", it.getProp("steptime"), 1000);
int maxDiff = conf->getArgPInt("--" + prefix + "-maxdifferense", it.getProp("maxDifferense"), 100); int maxDiff = conf->getArgPInt("--" + prefix + "-maxdifferense", it.getProp("maxDifferense"), 100);
int maxProcessingCount = conf->getArgPInt("--" + prefix + "-maxprocessingcount", it.getProp("maxProcessingCount"), 100);
int checkConnectionPause = conf->getArgPInt("--" + prefix + "-checkconnection-pause", it.getProp("checkConnectionPause"), 10000); int checkConnectionPause = conf->getArgPInt("--" + prefix + "-checkconnection-pause", it.getProp("checkConnectionPause"), 10000);
int initpause = conf->getArgPInt("--" + prefix + "-initpause", it.getProp("initpause"), 5000); int initpause = conf->getArgPInt("--" + prefix + "-initpause", it.getProp("initpause"), 5000);
int recvBufferSize = conf->getArgPInt("--" + prefix + "-recv-buffer-size", it.getProp("recvBufferSize"), 3000);
std::string updateStrategy = conf->getArg2Param("--" + prefix + "-update-strategy", it.getProp("updateStrategy"), "evloop"); std::string updateStrategy = conf->getArg2Param("--" + prefix + "-update-strategy", it.getProp("updateStrategy"), "evloop");
...@@ -333,16 +333,6 @@ UNetExchange::UNetExchange(uniset::ObjectId objId, uniset::ObjectId shmId, const ...@@ -333,16 +333,6 @@ UNetExchange::UNetExchange(uniset::ObjectId objId, uniset::ObjectId shmId, const
} }
} }
UNetReceiver::UpdateStrategy r_upStrategy = UNetReceiver::strToUpdateStrategy( n_it.getProp2("unet_update_strategy", updateStrategy) );
if( r_upStrategy == UNetReceiver::useUpdateUnknown )
{
ostringstream err;
err << myname << ": Unknown update strategy!!! '" << n_it.getProp2("unet_update_strategy", updateStrategy) << "'" << endl;
unetcrit << myname << "(init): " << err.str() << endl;
throw SystemError(err.str());
}
unetinfo << myname << "(init): (node='" << n << "') add basic receiver " unetinfo << myname << "(init): (node='" << n << "') add basic receiver "
<< h << ":" << p << endl; << h << ":" << p << endl;
auto r = make_shared<UNetReceiver>(h, p, shm, false, prefix); auto r = make_shared<UNetReceiver>(h, p, shm, false, prefix);
...@@ -361,11 +351,10 @@ UNetExchange::UNetExchange(uniset::ObjectId objId, uniset::ObjectId shmId, const ...@@ -361,11 +351,10 @@ UNetExchange::UNetExchange(uniset::ObjectId objId, uniset::ObjectId shmId, const
r->setCheckConnectionPause(checkConnectionPause); r->setCheckConnectionPause(checkConnectionPause);
r->setInitPause(initpause); r->setInitPause(initpause);
r->setMaxDifferens(maxDiff); r->setMaxDifferens(maxDiff);
r->setMaxProcessingCount(maxProcessingCount);
r->setRespondID(resp_id, resp_invert); r->setRespondID(resp_id, resp_invert);
r->setLostPacketsID(lp_id); r->setLostPacketsID(lp_id);
r->connectEvent( sigc::mem_fun(this, &UNetExchange::receiverEvent) ); r->connectEvent( sigc::mem_fun(this, &UNetExchange::receiverEvent) );
r->setUpdateStrategy(r_upStrategy); r->setBufferSize(recvBufferSize);
shared_ptr<UNetReceiver> r2(nullptr); shared_ptr<UNetReceiver> r2(nullptr);
...@@ -391,11 +380,10 @@ UNetExchange::UNetExchange(uniset::ObjectId objId, uniset::ObjectId shmId, const ...@@ -391,11 +380,10 @@ UNetExchange::UNetExchange(uniset::ObjectId objId, uniset::ObjectId shmId, const
r2->setCheckConnectionPause(checkConnectionPause); r2->setCheckConnectionPause(checkConnectionPause);
r2->setInitPause(initpause); r2->setInitPause(initpause);
r2->setMaxDifferens(maxDiff); r2->setMaxDifferens(maxDiff);
r2->setMaxProcessingCount(maxProcessingCount);
r2->setRespondID(resp2_id, resp_invert); r2->setRespondID(resp2_id, resp_invert);
r2->setLostPacketsID(lp2_id); r2->setLostPacketsID(lp2_id);
r2->connectEvent( sigc::mem_fun(this, &UNetExchange::receiverEvent) ); r2->connectEvent( sigc::mem_fun(this, &UNetExchange::receiverEvent) );
r2->setUpdateStrategy(r_upStrategy); r2->setBufferSize(recvBufferSize);
} }
} }
catch(...) catch(...)
...@@ -547,7 +535,7 @@ void UNetExchange::step() noexcept ...@@ -547,7 +535,7 @@ void UNetExchange::step() noexcept
} }
} }
for( auto && it : recvlist ) for( auto&& it : recvlist )
it.step(shm, myname, unetlog); it.step(shm, myname, unetlog);
} }
...@@ -755,10 +743,10 @@ void UNetExchange::askSensors( UniversalIO::UIOCommand cmd ) ...@@ -755,10 +743,10 @@ void UNetExchange::askSensors( UniversalIO::UIOCommand cmd )
void UNetExchange::sensorInfo( const uniset::SensorMessage* sm ) void UNetExchange::sensorInfo( const uniset::SensorMessage* sm )
{ {
if( sender ) if( sender )
sender->updateSensor( sm->id , sm->value ); sender->updateSensor( sm->id, sm->value );
if( sender2 ) if( sender2 )
sender2->updateSensor( sm->id , sm->value ); sender2->updateSensor( sm->id, sm->value );
} }
// ------------------------------------------------------------------------------------------ // ------------------------------------------------------------------------------------------
bool UNetExchange::activateObject() bool UNetExchange::activateObject()
...@@ -839,7 +827,7 @@ void UNetExchange::initIterators() noexcept ...@@ -839,7 +827,7 @@ void UNetExchange::initIterators() noexcept
if( sender2 ) if( sender2 )
sender2->initIterators(); sender2->initIterators();
for( auto && it : recvlist ) for( auto&& it : recvlist )
it.initIterators(shm); it.initIterators(shm);
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
...@@ -855,13 +843,8 @@ void UNetExchange::help_print( int argc, const char* argv[] ) noexcept ...@@ -855,13 +843,8 @@ void UNetExchange::help_print( int argc, const char* argv[] ) noexcept
cout << "--prefix-steptime msec - Пауза между обновлением информации о связи с узлами." << endl; cout << "--prefix-steptime msec - Пауза между обновлением информации о связи с узлами." << endl;
cout << "--prefix-checkconnection-pause msec - Пауза между попытками открыть соединение (если это не удалось до этого). По умолчанию: 10000 (10 сек)" << endl; cout << "--prefix-checkconnection-pause msec - Пауза между попытками открыть соединение (если это не удалось до этого). По умолчанию: 10000 (10 сек)" << endl;
cout << "--prefix-maxdifferense num - Маскимальная разница в номерах пакетов для фиксации события 'потеря пакетов' " << endl; cout << "--prefix-maxdifferense num - Маскимальная разница в номерах пакетов для фиксации события 'потеря пакетов' " << endl;
cout << "--prefix-maxprocessingcount num - Максимальное количество пакетов обрабатываемых за один раз (если их слишком много)" << endl;
cout << "--prefix-nosender [0,1] - Отключить посылку." << endl; cout << "--prefix-nosender [0,1] - Отключить посылку." << endl;
cout << "--prefix-update-strategy [thread,evloop] - Стратегия обновления данных в SM. " << endl; cout << "--prefix-recv-buffer-size sz - Размер циклического буфера для приёма сообщений. По умолчанию: 3000" << endl;
cout << " 'thread' - у каждого UNetReceiver отдельный поток" << endl;
cout << " 'evloop' - используется общий (с приёмом сообщений) event loop" << endl;
cout << " По умолчанию: evloop" << endl;
cout << "--prefix-sm-ready-timeout msec - Время ожидание я готовности SM к работе. По умолчанию 120000" << endl; cout << "--prefix-sm-ready-timeout msec - Время ожидание я готовности SM к работе. По умолчанию 120000" << endl;
cout << "--prefix-filter-field name - Название фильтрующего поля при формировании списка датчиков посылаемых данным узлом" << endl; cout << "--prefix-filter-field name - Название фильтрующего поля при формировании списка датчиков посылаемых данным узлом" << endl;
cout << "--prefix-filter-value name - Значение фильтрующего поля при формировании списка датчиков посылаемых данным узлом" << endl; cout << "--prefix-filter-value name - Значение фильтрующего поля при формировании списка датчиков посылаемых данным узлом" << endl;
...@@ -913,7 +896,7 @@ std::shared_ptr<UNetExchange> UNetExchange::init_unetexchange(int argc, const ch ...@@ -913,7 +896,7 @@ std::shared_ptr<UNetExchange> UNetExchange::init_unetexchange(int argc, const ch
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
void UNetExchange::receiverEvent( const shared_ptr<UNetReceiver>& r, UNetReceiver::Event ev ) noexcept void UNetExchange::receiverEvent( const shared_ptr<UNetReceiver>& r, UNetReceiver::Event ev ) noexcept
{ {
for( auto && it : recvlist ) for( auto&& it : recvlist )
{ {
if( it.r1 == r ) if( it.r1 == r )
{ {
......
...@@ -58,7 +58,6 @@ namespace uniset ...@@ -58,7 +58,6 @@ namespace uniset
пар [id,value]. Другие узлы принимают их. Помимо этого данный процесс запускает пар [id,value]. Другие узлы принимают их. Помимо этого данный процесс запускает
"получателей" по одному на каждый (другой) узел и ловит пакеты от них, сохраняя данные в SM. "получателей" по одному на каждый (другой) узел и ловит пакеты от них, сохраняя данные в SM.
При этом "получатели" работают на одном(!) потоке с использованием событий libev (см. UNetReceiver). При этом "получатели" работают на одном(!) потоке с использованием событий libev (см. UNetReceiver).
или каждый на своём потоке. Это определяется параметром \b unet_update_strategy.
\par \par
При своём старте процесс считывает из секции \<nodes> список узлов которые необходимо "слушать", При своём старте процесс считывает из секции \<nodes> список узлов которые необходимо "слушать",
...@@ -78,15 +77,13 @@ namespace uniset ...@@ -78,15 +77,13 @@ namespace uniset
... ...
</iocards> </iocards>
</item> </item>
<item ip="192.168.56.10" name="Node1" textname="Node1" unet_port="3001" unet_update_strategy="evloop"/> <item ip="192.168.56.10" name="Node1" textname="Node1" unet_port="3001"/>
<item ip="192.168.56.11" name="Node2" textname="Node2" unet_port="3002"/> <item ip="192.168.56.11" name="Node2" textname="Node2" unet_port="3002"/>
</nodes> </nodes>
\endcode \endcode
* \b unet_update_strategy - задаёт стратегию обновления данных в SM. Буфер для приёма сообщений можно настроить параметром \b recvBufferSize="1000" в конфигурационной секции
Поддерживается два варианта: или аргументом командной строки \b --prefix-recv-buffer-size sz
- 'thread' - отдельный поток обновления
- 'evloop' - использование общего с приёмом event loop (libev)
\note Имеется возможность задавать отдельную настроечную секцию для "списка узлов" при помощи параметра \note Имеется возможность задавать отдельную настроечную секцию для "списка узлов" при помощи параметра
--prefix-nodes-confnode name. По умолчанию настройка ведётся по секции <nodes> --prefix-nodes-confnode name. По умолчанию настройка ведётся по секции <nodes>
......
...@@ -15,6 +15,7 @@ ...@@ -15,6 +15,7 @@
*/ */
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
#include <sstream> #include <sstream>
#include <cmath>
#include <iomanip> #include <iomanip>
#include <Poco/Net/NetException.h> #include <Poco/Net/NetException.h>
#include "unisetstd.h" #include "unisetstd.h"
...@@ -28,19 +29,8 @@ using namespace uniset; ...@@ -28,19 +29,8 @@ using namespace uniset;
using namespace uniset::extensions; using namespace uniset::extensions;
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
CommonEventLoop UNetReceiver::loop; CommonEventLoop UNetReceiver::loop;
static UniSetUDP::UDPAData emptyAData; static UniSetUDP::UDPMessage emptyMessage;
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
/*
bool UNetReceiver::PacketCompare::operator()(const UniSetUDP::UDPMessage& lhs,
const UniSetUDP::UDPMessage& rhs) const
{
// if( lhs.num == rhs.num )
// return (lhs < rhs);
return lhs.num > rhs.num;
}
*/
// ------------------------------------------------------------------------------------------
UNetReceiver::UNetReceiver(const std::string& s_host, int _port UNetReceiver::UNetReceiver(const std::string& s_host, int _port
, const std::shared_ptr<SMInterface>& smi , const std::shared_ptr<SMInterface>& smi
, bool nocheckConnection , bool nocheckConnection
...@@ -57,14 +47,9 @@ UNetReceiver::UNetReceiver(const std::string& s_host, int _port ...@@ -57,14 +47,9 @@ UNetReceiver::UNetReceiver(const std::string& s_host, int _port
respondInvert(false), respondInvert(false),
sidLostPackets(uniset::DefaultObjectId), sidLostPackets(uniset::DefaultObjectId),
activated(false), activated(false),
pnum(0), cbuf(cbufSize),
maxDifferens(20), maxDifferens(20),
waitClean(false), lockUpdate(false)
rnum(0),
maxProcessingCount(100),
lockUpdate(false),
d_cache_init_ok(false),
a_cache_init_ok(false)
{ {
{ {
ostringstream s; ostringstream s;
...@@ -83,20 +68,30 @@ UNetReceiver::UNetReceiver(const std::string& s_host, int _port ...@@ -83,20 +68,30 @@ UNetReceiver::UNetReceiver(const std::string& s_host, int _port
auto conf = uniset_conf(); auto conf = uniset_conf();
conf->initLogStream(unetlog, prefix + "-log"); conf->initLogStream(unetlog, prefix + "-log");
upThread = unisetstd::make_unique< ThreadCreator<UNetReceiver> >(this, &UNetReceiver::updateThread);
if( !createConnection(nocheckConnection /* <-- это флаг throwEx */) ) if( !createConnection(nocheckConnection /* <-- это флаг throwEx */) )
evCheckConnection.set<UNetReceiver, &UNetReceiver::checkConnectionEvent>(this); evCheckConnection.set<UNetReceiver, &UNetReceiver::checkConnectionEvent>(this);
evStatistic.set<UNetReceiver, &UNetReceiver::statisticsEvent>(this); evStatistic.set<UNetReceiver, &UNetReceiver::statisticsEvent>(this);
evUpdate.set<UNetReceiver, &UNetReceiver::updateEvent>(this); evUpdate.set<UNetReceiver, &UNetReceiver::updateEvent>(this);
evInitPause.set<UNetReceiver, &UNetReceiver::initEvent>(this); evInitPause.set<UNetReceiver, &UNetReceiver::initEvent>(this);
ptLostTimeout.setTiming(lostTimeout);
ptRecvTimeout.setTiming(recvTimeout);
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
UNetReceiver::~UNetReceiver() UNetReceiver::~UNetReceiver()
{ {
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
void UNetReceiver::setBufferSize( size_t sz ) noexcept
{
if( sz > 0 )
{
cbufSize = sz;
cbuf.resize(sz);
}
}
// -----------------------------------------------------------------------------
void UNetReceiver::setReceiveTimeout( timeout_t msec ) noexcept void UNetReceiver::setReceiveTimeout( timeout_t msec ) noexcept
{ {
std::lock_guard<std::mutex> l(tmMutex); std::lock_guard<std::mutex> l(tmMutex);
...@@ -128,15 +123,10 @@ void UNetReceiver::setUpdatePause( timeout_t msec ) noexcept ...@@ -128,15 +123,10 @@ void UNetReceiver::setUpdatePause( timeout_t msec ) noexcept
{ {
updatepause = msec; updatepause = msec;
if( upStrategy == useUpdateEventLoop && evUpdate.is_active() ) if( evUpdate.is_active() )
evUpdate.start(0, (float)updatepause / 1000.); evUpdate.start(0, (float)updatepause / 1000.);
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
void UNetReceiver::setMaxProcessingCount( int set ) noexcept
{
maxProcessingCount = set;
}
// -----------------------------------------------------------------------------
void UNetReceiver::setMaxDifferens( unsigned long set ) noexcept void UNetReceiver::setMaxDifferens( unsigned long set ) noexcept
{ {
maxDifferens = set; maxDifferens = set;
...@@ -211,8 +201,6 @@ bool UNetReceiver::createConnection( bool throwEx ) ...@@ -211,8 +201,6 @@ bool UNetReceiver::createConnection( bool throwEx )
udp = unisetstd::make_unique<UDPReceiveU>(addr, port); udp = unisetstd::make_unique<UDPReceiveU>(addr, port);
udp->setBlocking(false); // делаем неблокирующее чтение (нужно для libev) udp->setBlocking(false); // делаем неблокирующее чтение (нужно для libev)
evReceive.set<UNetReceiver, &UNetReceiver::callback>(this); evReceive.set<UNetReceiver, &UNetReceiver::callback>(this);
if( upStrategy == useUpdateEventLoop )
evUpdate.set<UNetReceiver, &UNetReceiver::updateEvent>(this); evUpdate.set<UNetReceiver, &UNetReceiver::updateEvent>(this);
if( evCheckConnection.is_active() ) if( evCheckConnection.is_active() )
...@@ -264,9 +252,6 @@ void UNetReceiver::start() ...@@ -264,9 +252,6 @@ void UNetReceiver::start()
std::terminate(); std::terminate();
return; return;
} }
if( upStrategy == useUpdateThread && !upThread->isRunning() )
upThread->start();
} }
else else
forceUpdate(); forceUpdate();
...@@ -278,12 +263,8 @@ void UNetReceiver::evprepare( const ev::loop_ref& eloop ) noexcept ...@@ -278,12 +263,8 @@ void UNetReceiver::evprepare( const ev::loop_ref& eloop ) noexcept
evStatistic.start(0, 1.0); // раз в сек evStatistic.start(0, 1.0); // раз в сек
evInitPause.set(eloop); evInitPause.set(eloop);
if( upStrategy == useUpdateEventLoop )
{
evUpdate.set(eloop); evUpdate.set(eloop);
evUpdate.start( 0, ((float)updatepause / 1000.) ); evUpdate.start( 0, ((float)updatepause / 1000.) );
}
if( !udp ) if( !udp )
{ {
...@@ -325,8 +306,7 @@ void UNetReceiver::evfinish( const ev::loop_ref& eloop ) noexcept ...@@ -325,8 +306,7 @@ void UNetReceiver::evfinish( const ev::loop_ref& eloop ) noexcept
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
void UNetReceiver::forceUpdate() noexcept void UNetReceiver::forceUpdate() noexcept
{ {
pack_guard l(packMutex, upStrategy); rnum = 0; // сбрасываем запомненый номер последнего обработанного пакета
pnum = 0; // сбрасываем запомненый номер последнего обработанного пакета
// и тем самым заставляем обновить данные в SM (см. update) // и тем самым заставляем обновить данные в SM (см. update)
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
...@@ -363,99 +343,70 @@ void UNetReceiver::initEvent( ev::timer& tmr, int revents ) noexcept ...@@ -363,99 +343,70 @@ void UNetReceiver::initEvent( ev::timer& tmr, int revents ) noexcept
tmr.stop(); tmr.stop();
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
size_t UNetReceiver::rnext( size_t num )
{
UniSetUDP::UDPMessage* p;
size_t i = num + 1;
while( i < wnum )
{
p = &cbuf[i % cbufSize];
if( p->num > num )
return i;
i++;
}
return wnum;
}
// -----------------------------------------------------------------------------
void UNetReceiver::update() noexcept void UNetReceiver::update() noexcept
{ {
UniSetUDP::UDPMessage p; // ещё не было пакетов
if( wnum == 1 && rnum == 0 )
return;
UniSetUDP::UDPMessage* p;
CacheItem* c_it = nullptr; CacheItem* c_it = nullptr;
UniSetUDP::UDPAData* dat = nullptr; UniSetUDP::UDPAData* dat = nullptr;
long s_id; long s_id;
// обрабатываем, пока очередь либо не опустеет, // обрабатываем, пока очередь либо не опустеет,
// либо обнаружится "дырка" в последовательности, // либо обнаружится "дырка" в последовательности,
// но при этом обрабатываем не больше maxProcessingCount while( rnum < wnum )
// за один раз..
int k = maxProcessingCount;
while( k > 0 )
{
{ {
// lock qpack p = &cbuf[rnum % cbufSize];
pack_guard l(packMutex, upStrategy);
if( qpack.empty() ) // если номер пакета не равен ожидаемому, ждём считая что это "дырка"
return; // т.к. разрывы и другие случаи обрабатываются при приёме пакетов
if( p->num != rnum )
p = qpack.top();
size_t sub = labs( (long int)(p.num - pnum) );
if( pnum > 0 )
{ {
// если sub > maxDifferens
// значит это просто "разрыв"
// и нам ждать lostTimeout не надо
// сразу начинаем обрабатывать новые пакеты
// а если > 1 && < maxDifferens
// значит это временная "дырка"
// и надо подождать lostTimeout
// чтобы констатировать потерю пакета..
if( sub > 1 && sub < maxDifferens )
{
// если p.num < pnum, то это какой-то "дубль",
// т.к мы все пакеты <= pnum уже "отработали".
// а значит можно не ждать, а откидывать пакет и
// дальше работать..
if( p.num < pnum )
{
qpack.pop();
continue;
}
if( !ptLostTimeout.checkTime() ) if( !ptLostTimeout.checkTime() )
return; return;
unetwarn << myname << "(update): lostTimeout(" << ptLostTimeout.getInterval() << ")! pnum=" << p.num << " lost " << sub << " packets " << endl; size_t sub = (p->num - rnum);
unetwarn << myname << "(update): lostTimeout(" << ptLostTimeout.getInterval() << ")! pnum=" << p->num << " lost " << sub << " packets " << endl;
lostPackets += sub; lostPackets += sub;
}
else if( p.num == pnum )
{
/* а что делать если идут повторные пакеты ?!
* для надёжности лучше обрабатывать..
* для "оптимизации".. лучше игнорировать
*/
qpack.pop(); // пока выбрали вариант "оптимизации" (выкидываем из очереди и идём дальше)
continue;
}
if( sub >= maxDifferens ) // ищем следующий пакет для обработки
{ rnum = rnext(rnum);
// считаем сколько пакетов потеряли.. (pnum=0 - означает что мы только что запустились...) continue;
if( pnum != 0 && p.num > pnum )
{
lostPackets += sub;
unetwarn << myname << "(update): sub=" << sub << " > maxDifferenst(" << maxDifferens << ")! lost " << sub << " packets " << endl;
}
}
} }
ptLostTimeout.reset(); ptLostTimeout.reset();
rnum++;
// удаляем из очереди, только если
// всё в порядке с последовательностью..
qpack.pop();
pnum = p.num;
} // unlock qpack
k--;
upCount++; upCount++;
// Обработка дискретных // Обработка дискретных
auto d_iv = getDCache(p, !d_cache_init_ok); auto d_iv = getDCache(p);
for( size_t i = 0; i < p.dcount; i++ )
for( size_t i = 0; i < p->dcount; i++ )
{ {
try try
{ {
s_id = p.dID(i); s_id = p->dID(i);
c_it = &d_iv.cache[i]; c_it = &(*d_iv)[i];
if( c_it->id != s_id ) if( c_it->id != s_id )
{ {
...@@ -468,26 +419,35 @@ void UNetReceiver::update() noexcept ...@@ -468,26 +419,35 @@ void UNetReceiver::update() noexcept
if( lockUpdate ) if( lockUpdate )
continue; continue;
shm->localSetValue(c_it->ioit, s_id, p.dValue(i), shm->ID()); shm->localSetValue(c_it->ioit, s_id, p->dValue(i), shm->ID());
} }
catch( const uniset::Exception& ex) catch( const uniset::Exception& ex)
{ {
unetcrit << myname << "(update): " << ex << std::endl; unetcrit << myname << "(update): "
<< " id=" << s_id
<< " val=" << p->dValue(i)
<< " error: " << ex
<< std::endl;
} }
catch(...) catch(...)
{ {
unetcrit << myname << "(update): catch ..." << std::endl; unetcrit << myname << "(update): "
<< " id=" << s_id
<< " val=" << p->dValue(i)
<< " error: catch..."
<< std::endl;
} }
} }
// Обработка аналоговых // Обработка аналоговых
auto a_iv = getACache(p, !a_cache_init_ok); auto a_iv = getACache(p);
for( size_t i = 0; i < p.acount; i++ )
for( size_t i = 0; i < p->acount; i++ )
{ {
try try
{ {
dat = &p.a_dat[i]; dat = &p->a_dat[i];
c_it = &a_iv.cache[i]; c_it = &(*a_iv)[i];
if( c_it->id != dat->id ) if( c_it->id != dat->id )
{ {
...@@ -504,61 +464,21 @@ void UNetReceiver::update() noexcept ...@@ -504,61 +464,21 @@ void UNetReceiver::update() noexcept
} }
catch( const uniset::Exception& ex) catch( const uniset::Exception& ex)
{ {
unetcrit << myname << "(update): " << ex << std::endl; unetcrit << myname << "(update): "
<< " id=" << dat->id
<< " val=" << dat->val
<< " error: " << ex
<< std::endl;
} }
catch(...) catch(...)
{ {
unetcrit << myname << "(update): catch ..." << std::endl; unetcrit << myname << "(update): "
} << " id=" << dat->id
} << " val=" << dat->val
} << " error: catch..."
} << std::endl;
// -----------------------------------------------------------------------------
void UNetReceiver::updateThread() noexcept
{
while( activated )
{
try
{
update();
}
catch( std::exception& ex )
{
unetcrit << myname << "(update_thread): " << ex.what() << endl;
}
// смотрим есть ли связь..
checkConnection();
if( sidRespond != DefaultObjectId )
{
try
{
if( isInitOK() )
{
bool r = respondInvert ? !isRecvOK() : isRecvOK();
shm->localSetValue(itRespond, sidRespond, ( r ? 1 : 0 ), shm->ID());
}
}
catch( const std::exception& ex )
{
unetcrit << myname << "(update_thread): (respond) " << ex.what() << std::endl;
}
}
if( sidLostPackets != DefaultObjectId )
{
try
{
shm->localSetValue(itLostPackets, sidLostPackets, getLostPacketsNum(), shm->ID());
}
catch( const std::exception& ex )
{
unetcrit << myname << "(update_thread): (lostPackets) " << ex.what() << std::endl;
} }
} }
msleep(updatepause);
} }
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
...@@ -702,7 +622,6 @@ void UNetReceiver::stop() ...@@ -702,7 +622,6 @@ void UNetReceiver::stop()
{ {
unetinfo << myname << ": stop.." << endl; unetinfo << myname << ": stop.." << endl;
activated = false; activated = false;
upThread->join();
loop.evstop(this); loop.evstop(this);
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
...@@ -712,7 +631,6 @@ bool UNetReceiver::receive() noexcept ...@@ -712,7 +631,6 @@ bool UNetReceiver::receive() noexcept
{ {
ssize_t ret = udp->receiveBytes(r_buf.data, sizeof(r_buf.data)); ssize_t ret = udp->receiveBytes(r_buf.data, sizeof(r_buf.data));
recvCount++; recvCount++;
//ssize_t ret = udp->receiveFrom(r_buf.data, sizeof(r_buf.data),saddr);
if( ret < 0 ) if( ret < 0 )
{ {
...@@ -726,159 +644,123 @@ bool UNetReceiver::receive() noexcept ...@@ -726,159 +644,123 @@ bool UNetReceiver::receive() noexcept
return false; return false;
} }
size_t sz = UniSetUDP::UDPMessage::getMessage(pack, r_buf); // сперва пробуем сохранить пакет в том месте, которе должно быть очередным на запись
pack = &cbuf[wnum % cbufSize];
size_t sz = UniSetUDP::UDPMessage::getMessage(*pack, r_buf);
if( sz == 0 ) if( sz == 0 )
{ {
unetcrit << myname << "(receive): FAILED RECEIVE DATA ret=" << ret << endl; unetcrit << myname << "(receive): FAILED RECEIVE DATA ret=" << ret << endl;
return false; return false;
} }
}
catch( Poco::Net::NetException& ex )
{
unetcrit << myname << "(receive): recv err: " << ex.displayText() << endl;
return false;
}
catch( exception& ex )
{
unetcrit << myname << "(receive): recv err: " << ex.what() << endl;
return false;
}
if( pack.magic != UniSetUDP::UNETUDP_MAGICNUM ) if( pack->magic != UniSetUDP::UNETUDP_MAGICNUM )
{
// пакет не нашей "системы"
return false; return false;
}
if( rnum > 0 && labs( (long int)(pack.num - rnum) ) > maxDifferens ) if( abs(long(pack->num - wnum)) > maxDifferens || abs( long(wnum - rnum) ) >= (cbufSize - 2) )
{ {
/* А что делать если мы уже ждём и ещё не "разгребли предыдущее".. а тут уже повторный "разрыв" unetcrit << myname << "(receive): DISAGREE "
* Можно откинуть всё.. что сложили во временную очередь и заново "копить" (но тогда теряем информацию) << " packnum=" << pack->num
* А можно породолжать складывать во временную, но тогда есть риск "никогда" не разгрести временную << " wnum=" << wnum
* очередь, при "частых обрывах". Потому-что update будет на каждом разрыве ждать ещё lostTimeout.. << " rnum=" << rnum
*/ << " (maxDiff=" << maxDifferens
// Пока выбираю.. чистить qtmp. Это будет соотвествовать логике работы с картами у которых ограничен буфер приёма. << " indexDiff=" << abs( long(wnum - rnum) )
// Обычно "кольцевой". Т.е. если не успели обработать и "вынуть" из буфера информацию.. он будет переписан новыми данными << ")"
if( waitClean ) << endl;
{
unetcrit << myname << "(receive): reset qtmp.." << endl;
while( !qtmp.empty() ) lostPackets = pack->num > wnum ? (pack->num - wnum - 1) : lostPackets + 1;
qtmp.pop(); // реинициализируем позицию для чтения
rnum = pack->num;
wnum = pack->num + 1;
// перемещаем пакет в нужное место (если требуется)
if( wnum != pack->num )
{
cbuf[pack->num % cbufSize] = (*pack);
pack->num = 0;
} }
waitClean = true; return true;
} }
rnum = pack.num; if( pack->num != wnum )
{
// перемещаем пакет в правильное место
// в соответствии с его номером
cbuf[pack->num % cbufSize] = (*pack);
#if 0 if( pack->num >= wnum )
cerr << myname << "(receive): recv DATA OK. ret=" << ret << " sizeof=" << sz wnum = pack->num + 1;
<< " header: " << pack.msg.header
<< " waitClean=" << waitClean
<< endl;
for( size_t i = 0; i < pack.msg.header.dcount; i++ ) // обнуляем номер в том месте где записали, чтобы его не обрабатывал update
{ pack->num = 0;
UniSetUDP::UDPData& d = pack.msg.dat[i];
cerr << "****** save id=" << d.id << " val=" << d.val << endl;
} }
else if( pack->num >= wnum )
wnum = pack->num + 1;
#endif // начальная инициализация для чтения
if( rnum == 0 )
{ rnum = pack->num;
// lock qpack
pack_guard l(packMutex, upStrategy);
if( !waitClean )
{
qpack.push(pack);
return true; return true;
} }
catch( Poco::Net::NetException& ex )
if( !qpack.empty() )
{ {
qtmp.push(pack); unetcrit << myname << "(receive): recv err: " << ex.displayText() << endl;
} }
else catch( exception& ex )
{
// основная очередь освободилась..
// копируем в неё всё что набралось в qtmp...
while( !qtmp.empty() )
{ {
qpack.push(qtmp.top()); unetcrit << myname << "(receive): recv err: " << ex.what() << endl;
qtmp.pop();
}
// не забываем и текущий поместить в очередь..
qpack.push(pack);
waitClean = false;
} }
} // unlock qpack
return true; return false;
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
void UNetReceiver::initIterators() noexcept void UNetReceiver::initIterators() noexcept
{ {
for( auto mit = d_icache_map.begin(); mit != d_icache_map.end(); ++mit ) for( auto mit = d_icache_map.begin(); mit != d_icache_map.end(); ++mit )
{ {
CacheVec& d_icache(mit->second.cache); CacheVec& d_icache = mit->second;
for( auto && it : d_icache ) for( auto&& it : d_icache )
shm->initIterator(it.ioit); shm->initIterator(it.ioit);
} }
for( auto mit = a_icache_map.begin(); mit != a_icache_map.end(); ++mit ) for( auto mit = a_icache_map.begin(); mit != a_icache_map.end(); ++mit )
{ {
CacheVec& a_icache(mit->second.cache); CacheVec& a_icache = mit->second;
for( auto && it : a_icache ) for( auto&& it : a_icache )
shm->initIterator(it.ioit); shm->initIterator(it.ioit);
} }
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
UNetReceiver::CacheInfo& UNetReceiver::getDCache( UniSetUDP::UDPMessage& pack, bool force ) noexcept UNetReceiver::CacheVec* UNetReceiver::getDCache( UniSetUDP::UDPMessage* pack ) noexcept
{ {
// если элемента нет, он будет создан auto dit = d_icache_map.find(pack->getDataID());
CacheInfo& d_info = d_icache_map[pack.getDataID()];
if( !force && pack.dcount == d_info.cache.size() )
return d_info;
if( d_info.cache_init_ok && pack.dcount == d_info.cache.size() ) if( dit == d_icache_map.end() )
{ {
d_cache_init_ok = true; auto p = d_icache_map.emplace(pack->getDataID(), UNetReceiver::CacheVec());
auto it = d_icache_map.begin(); dit = p.first;
for( ; it != d_icache_map.end(); ++it )
{
d_info = it->second;
d_cache_init_ok = d_cache_init_ok && d_info.cache_init_ok;
if(d_cache_init_ok == false)
break;
} }
CacheVec* d_info = &dit->second;
if( pack->dcount == d_info->size() )
return d_info; return d_info;
}
unetinfo << myname << ": init dcache for " << pack.getDataID() << endl; unetinfo << myname << ": init dcache for " << pack->getDataID() << endl;
d_info.cache_init_ok = true; d_info->resize(pack->dcount);
d_info.cache.resize(pack.dcount);
size_t sz = d_info.cache.size(); for( size_t i = 0; i < pack->dcount; i++ )
auto conf = uniset_conf();
for( size_t i = 0; i < sz; i++ )
{ {
CacheItem& d(d_info.cache[i]); CacheItem& d = (*d_info)[i];
if( d.id != pack.d_id[i] ) if( d.id != pack->d_id[i] )
{ {
d.id = pack.d_id[i]; d.id = pack->d_id[i];
shm->initIterator(d.ioit); shm->initIterator(d.ioit);
} }
} }
...@@ -886,46 +768,32 @@ UNetReceiver::CacheInfo& UNetReceiver::getDCache( UniSetUDP::UDPMessage& pack, b ...@@ -886,46 +768,32 @@ UNetReceiver::CacheInfo& UNetReceiver::getDCache( UniSetUDP::UDPMessage& pack, b
return d_info; return d_info;
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
UNetReceiver::CacheInfo& UNetReceiver::getACache( UniSetUDP::UDPMessage& pack, bool force ) noexcept UNetReceiver::CacheVec* UNetReceiver::getACache( UniSetUDP::UDPMessage* pack ) noexcept
{ {
// если элемента нет, он будет создан auto ait = a_icache_map.find(pack->getDataID());
CacheInfo& a_info = a_icache_map[pack.getDataID()];
if( !force && pack.acount == a_info.cache.size() )
return a_info;
if( a_info.cache_init_ok && pack.acount == a_info.cache.size() )
{
a_cache_init_ok = true;
auto it = a_icache_map.begin();
for( ; it != a_icache_map.end(); ++it ) if( ait == a_icache_map.end() )
{ {
a_info = it->second; auto p = a_icache_map.emplace(pack->getDataID(), UNetReceiver::CacheVec());
a_cache_init_ok = a_cache_init_ok && a_info.cache_init_ok; ait = p.first;
if(a_cache_init_ok == false)
break;
} }
return a_info; CacheVec* a_info = &ait->second;
}
unetinfo << myname << ": init icache for " << pack.getDataID() << endl; if( pack->acount == a_info->size() )
a_info.cache_init_ok = true; return a_info;
auto conf = uniset_conf();
a_info.cache.resize(pack.acount); unetinfo << myname << ": init acache for " << pack->getDataID() << endl;
size_t sz = a_info.cache.size(); a_info->resize(pack->acount);
for( size_t i = 0; i < sz; i++ ) for( size_t i = 0; i < pack->acount; i++ )
{ {
CacheItem& d(a_info.cache[i]); CacheItem& d = (*a_info)[i];
if( d.id != pack.a_dat[i].id ) if( d.id != pack->a_dat[i].id )
{ {
d.id = pack.a_dat[i].id; d.id = pack->a_dat[i].id;
shm->initIterator(d.ioit); shm->initIterator(d.ioit);
} }
} }
...@@ -938,49 +806,6 @@ void UNetReceiver::connectEvent( UNetReceiver::EventSlot sl ) noexcept ...@@ -938,49 +806,6 @@ void UNetReceiver::connectEvent( UNetReceiver::EventSlot sl ) noexcept
slEvent = sl; slEvent = sl;
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
UNetReceiver::UpdateStrategy UNetReceiver::strToUpdateStrategy( const string& s ) noexcept
{
if( s == "thread" || s == "THREAD" )
return useUpdateThread;
if( s == "evloop" || s == "EVLOOP" )
return useUpdateEventLoop;
return useUpdateUnknown;
}
// -----------------------------------------------------------------------------
string UNetReceiver::to_string( UNetReceiver::UpdateStrategy s ) noexcept
{
if( s == useUpdateThread )
return "thread";
if( s == useUpdateEventLoop )
return "evloop";
return "";
}
// -----------------------------------------------------------------------------
void UNetReceiver::setUpdateStrategy( UNetReceiver::UpdateStrategy set )
{
if( set == useUpdateEventLoop && upThread->isRunning() )
{
ostringstream err;
err << myname << "(setUpdateStrategy): set 'useUpdateEventLoop' strategy but updateThread is running!";
unetcrit << err.str() << endl;
throw SystemError(err.str());
}
if( set == useUpdateThread && evUpdate.is_active() )
{
ostringstream err;
err << myname << "(setUpdateStrategy): set 'useUpdateThread' strategy but update event loop is running!";
unetcrit << err.str() << endl;
throw SystemError(err.str());
}
upStrategy = set;
}
// -----------------------------------------------------------------------------
const std::string UNetReceiver::getShortInfo() const noexcept const std::string UNetReceiver::getShortInfo() const noexcept
{ {
// warning: будет вызываться из другого потока // warning: будет вызываться из другого потока
...@@ -993,7 +818,6 @@ const std::string UNetReceiver::getShortInfo() const noexcept ...@@ -993,7 +818,6 @@ const std::string UNetReceiver::getShortInfo() const noexcept
<< " recvOK=" << isRecvOK() << " recvOK=" << isRecvOK()
<< " receivepack=" << rnum << " receivepack=" << rnum
<< " lostPackets=" << setw(6) << getLostPacketsNum() << " lostPackets=" << setw(6) << getLostPacketsNum()
<< " updateStrategy=" << to_string(upStrategy)
<< endl << endl
<< "\t[" << "\t["
<< " recvTimeout=" << setw(6) << recvTimeout << " recvTimeout=" << setw(6) << recvTimeout
...@@ -1002,26 +826,10 @@ const std::string UNetReceiver::getShortInfo() const noexcept ...@@ -1002,26 +826,10 @@ const std::string UNetReceiver::getShortInfo() const noexcept
<< " lostTimeout=" << setw(6) << lostTimeout << " lostTimeout=" << setw(6) << lostTimeout
<< " updatepause=" << setw(6) << updatepause << " updatepause=" << setw(6) << updatepause
<< " maxDifferens=" << setw(6) << maxDifferens << " maxDifferens=" << setw(6) << maxDifferens
<< " maxProcessingCount=" << setw(6) << maxProcessingCount
<< " waitClean=" << waitClean
<< " ]" << " ]"
<< endl << endl
<< "\t[ qsize=" << qpack.size() << " recv=" << statRecvPerSec << " update=" << statUpPerSec << " per sec ]"; << "\t[ qsize=" << (wnum - rnum - 1) << " recv=" << statRecvPerSec << " update=" << statUpPerSec << " per sec ]";
return s.str(); return s.str();
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
UNetReceiver::pack_guard::pack_guard( mutex& _m, UNetReceiver::UpdateStrategy _s ):
m(_m),
s(_s)
{
if( s == useUpdateThread )
m.lock();
}
// -----------------------------------------------------------------------------
UNetReceiver::pack_guard::~pack_guard()
{
if( s == useUpdateThread )
m.unlock();
}
// -----------------------------------------------------------------------------
...@@ -20,7 +20,7 @@ ...@@ -20,7 +20,7 @@
#include <ostream> #include <ostream>
#include <memory> #include <memory>
#include <string> #include <string>
#include <queue> #include <vector>
#include <unordered_map> #include <unordered_map>
#include <sigc++/sigc++.h> #include <sigc++/sigc++.h>
#include <ev++.h> #include <ev++.h>
...@@ -38,13 +38,20 @@ namespace uniset ...@@ -38,13 +38,20 @@ namespace uniset
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
/* Основная идея: сделать проверку очерёдности пакетов, но при этом использовать UDP. /* Основная идея: сделать проверку очерёдности пакетов, но при этом использовать UDP.
* =============== * ===============
* Собственно реализация сделана так:
* В данных передаётся номер пакета. На случай если несколько пакетов придут не в той последовательности * В данных передаётся номер пакета. На случай если несколько пакетов придут не в той последовательности
* что были посланы, сделана очередь с приоритетом. В качестве приоритета используется номер пакета * что были посланы, сделан циклический буфер (буфер сразу выделяет память при старте).
* (чем меньше тем старше). При этом обработка ведётся только тех пакетов, которые идут "подряд", * Т.к. номер получаемых пакетов монотонно растёт на основе него вычисляется индекс
* куда поместить пакет в буфере. Есть два индекса
* rnum - (read number) номер последнего обработанного пакета + 1
* wnum - (write number) номер следующего ожидаемого пакета (номер последнего принятого + 1)
* WARNING: Если придёт два пакета с одинаковым номером, то новый пакет перезатрёт прошлый в буфере
*
* При этом обработка ведётся по порядку (только пакеты идущие подряд)
* как только встречается "дырка" происходит ожидание её "заполения". Если в течение времени (lostTimeout) * как только встречается "дырка" происходит ожидание её "заполения". Если в течение времени (lostTimeout)
* "дырка" не исчезает, увеличивается счётчик потерянных пакетов и обработка продолжается дальше.. * "дырка" не исчезает, увеличивается счётчик потерянных пакетов и обработка продолжается с нового места.
* Всё это реализовано в функции UNetReceiver::real_update() * Т.к. используется libev и нет многопоточной работы, события обрабатываются последовательно.
* Раз в updatetime msec происходит обновление данных в SM, все накопившиеся пакеты обрабатываются
* либо пока не встретиться "дырка", либо пока rnum не догонит wnum.
* *
* КЭШ * КЭШ
* === * ===
...@@ -63,19 +70,15 @@ namespace uniset ...@@ -63,19 +70,15 @@ namespace uniset
* В текущей реализации размер map не контролируется (завязан на UDPMessage::getDataID()) и рассчитан на статичность пакетов, * В текущей реализации размер map не контролируется (завязан на UDPMessage::getDataID()) и рассчитан на статичность пакетов,
* т.е. на то что UNetSender не будет с течением времени менять структуру отправляемых пакетов. * т.е. на то что UNetSender не будет с течением времени менять структуру отправляемых пакетов.
* *
* Обработка сбоя или переполнения счётчика пакетов(перехода через максимум) * Обработка сбоев в номере пакетов
* ========================================================================= * =========================================================================
* Для защиты от сбоя счётчика сделана следующая логика: * Если в какой-то момент расстояние между rnum и wnum превышает maxDifferens пакетов
* Если номер очередного пришедшего пакета отличается от последнего обработанного на maxDifferens, то считается, * то считается, что произошёл сбой или узел который посылал пакеты перезагрузился
* что произошёл сбой счётчика и происходит ожидание пока функция update, не обработает основную очередь полностью. * Идёт попытка обработать все текущие пакеты (до первой дырки), а дальше происходит
* При этом принимаемые пакеты складываются во временную очередь qtmp. Как только основная очередь пустеет, * реинициализация и обработка продолжается с нового номера.
* в неё копируется всё накопленное во временной очереди..и опять идёт штатная обработка. *
* Если во время "ожидания" опять происходит "разрыв" в номерах пакетов, то временная очередь чиститься
* и данные которые в ней были теряются! Аналог ограниченного буфера (у любых карт), когда новые данные
* затирают старые, если их не успели вынуть и обработать.
* \todo Сделать защиту от бесконечного ожидания "очистки" основной очереди.
* ========================================================================= * =========================================================================
* ОПТИМИЗАЦИЯ N1: см. UNetSender.h. Если номер последнего принятого пакета не менялся.. не обрабатываем.. * ОПТИМИЗАЦИЯ N1: см. UNetSender.h. Если номер последнего принятого пакета не менялся.. не обрабатываем.
* *
* Создание соединения (открытие сокета) * Создание соединения (открытие сокета)
* ====================================== * ======================================
...@@ -87,13 +90,6 @@ namespace uniset ...@@ -87,13 +90,6 @@ namespace uniset
* Если такая логика не требуется, то можно задать в конструкторе * Если такая логика не требуется, то можно задать в конструкторе
* последним аргументом флаг nocheckconnection=true, тогда при создании объекта UNetReceiver, в конструкторе будет * последним аргументом флаг nocheckconnection=true, тогда при создании объекта UNetReceiver, в конструкторе будет
* выкинуто исключение при неудачной попытке создания соединения. * выкинуто исключение при неудачной попытке создания соединения.
*
* Стратегия обновления данных в SM
* ==================================
* При помощи функции setUpdateStrategy() можно выбрать стратегию обновления данных в SM.
* Поддерживается два варианта:
* 'thread' - отдельный поток обновления
* 'evloop' - использование общего с приёмом event loop (libev)
*/ */
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
class UNetReceiver: class UNetReceiver:
...@@ -132,12 +128,11 @@ namespace uniset ...@@ -132,12 +128,11 @@ namespace uniset
void setMaxDifferens( unsigned long set ) noexcept; void setMaxDifferens( unsigned long set ) noexcept;
void setEvrunTimeout(timeout_t msec ) noexcept; void setEvrunTimeout(timeout_t msec ) noexcept;
void setInitPause( timeout_t msec ) noexcept; void setInitPause( timeout_t msec ) noexcept;
void setBufferSize( size_t sz ) noexcept;
void setRespondID( uniset::ObjectId id, bool invert = false ) noexcept; void setRespondID( uniset::ObjectId id, bool invert = false ) noexcept;
void setLostPacketsID( uniset::ObjectId id ) noexcept; void setLostPacketsID( uniset::ObjectId id ) noexcept;
void setMaxProcessingCount( int set ) noexcept;
void forceUpdate() noexcept; // пересохранить очередной пакет в SM даже если данные не менялись void forceUpdate() noexcept; // пересохранить очередной пакет в SM даже если данные не менялись
inline std::string getAddress() const noexcept inline std::string getAddress() const noexcept
...@@ -160,35 +155,6 @@ namespace uniset ...@@ -160,35 +155,6 @@ namespace uniset
void connectEvent( EventSlot sl ) noexcept; void connectEvent( EventSlot sl ) noexcept;
// -------------------------------------------------------------------- // --------------------------------------------------------------------
/*! Стратегия обработки сообщений */
enum UpdateStrategy
{
useUpdateUnknown,
useUpdateThread, /*!< использовать отдельный поток */
useUpdateEventLoop /*!< использовать event loop (т.е. совместно с receive) */
};
static UpdateStrategy strToUpdateStrategy( const std::string& s ) noexcept;
static std::string to_string( UpdateStrategy s) noexcept;
//! функция должна вызываться до первого вызова start()
void setUpdateStrategy( UpdateStrategy set );
// специальная обёртка, захватывающая или нет mutex в зависимости от стратегии
// (т.к. при evloop mutex захватывать не нужно)
class pack_guard
{
public:
pack_guard( std::mutex& m, UpdateStrategy s );
~pack_guard();
protected:
std::mutex& m;
UpdateStrategy s;
};
// --------------------------------------------------------------------
inline std::shared_ptr<DebugStream> getLog() inline std::shared_ptr<DebugStream> getLog()
{ {
return unetlog; return unetlog;
...@@ -204,7 +170,6 @@ namespace uniset ...@@ -204,7 +170,6 @@ namespace uniset
bool receive() noexcept; bool receive() noexcept;
void step() noexcept; void step() noexcept;
void update() noexcept; void update() noexcept;
void updateThread() noexcept;
void callback( ev::io& watcher, int revents ) noexcept; void callback( ev::io& watcher, int revents ) noexcept;
void readEvent( ev::io& watcher ) noexcept; void readEvent( ev::io& watcher ) noexcept;
void updateEvent( ev::periodic& watcher, int revents ) noexcept; void updateEvent( ev::periodic& watcher, int revents ) noexcept;
...@@ -221,21 +186,7 @@ namespace uniset ...@@ -221,21 +186,7 @@ namespace uniset
void initIterators() noexcept; void initIterators() noexcept;
bool createConnection( bool throwEx = false ); bool createConnection( bool throwEx = false );
void checkConnection(); void checkConnection();
size_t rnext( size_t num );
public:
// функция определения приоритетного сообщения для обработки
struct PacketCompare:
public std::binary_function<UniSetUDP::UDPMessage, UniSetUDP::UDPMessage, bool>
{
inline bool operator()(const UniSetUDP::UDPMessage& lhs,
const UniSetUDP::UDPMessage& rhs) const
{
return lhs.num > rhs.num;
}
};
typedef std::priority_queue<UniSetUDP::UDPMessage, std::vector<UniSetUDP::UDPMessage>, PacketCompare> PacketQueue;
private: private:
UNetReceiver(); UNetReceiver();
...@@ -253,18 +204,14 @@ namespace uniset ...@@ -253,18 +204,14 @@ namespace uniset
ev::periodic evUpdate; ev::periodic evUpdate;
ev::timer evInitPause; ev::timer evInitPause;
UpdateStrategy upStrategy = { useUpdateEventLoop };
// счётчики для подсчёта статистики // счётчики для подсчёта статистики
size_t recvCount = { 0 }; size_t recvCount = { 0 };
size_t upCount = { 0 }; size_t upCount = { 0 };
// текущая статистик // текущая статистика
size_t statRecvPerSec = { 0 }; /*!< количество принимаемых пакетов в секунду */ size_t statRecvPerSec = { 0 }; /*!< количество принимаемых пакетов в секунду */
size_t statUpPerSec = { 0 }; /*!< количество обработанных пакетов в секунду */ size_t statUpPerSec = { 0 }; /*!< количество обработанных пакетов в секунду */
std::unique_ptr< ThreadCreator<UNetReceiver> > upThread; // update thread
// делаем loop общим.. одним на всех! // делаем loop общим.. одним на всех!
static CommonEventLoop loop; static CommonEventLoop loop;
...@@ -292,23 +239,19 @@ namespace uniset ...@@ -292,23 +239,19 @@ namespace uniset
std::atomic_bool activated = { false }; std::atomic_bool activated = { false };
PacketQueue qpack; /*!< очередь принятых пакетов (отсортированных по возрастанию номера пакета) */ size_t cbufSize = { 2000 }; /*!< размер буфера для сообщений по умолчанию */
UniSetUDP::UDPMessage pack; /*!< просто буфер для получения очередного сообщения */ std::vector<UniSetUDP::UDPMessage> cbuf; // circular buffer
UniSetUDP::UDPPacket r_buf; size_t wnum = { 1 }; /*!< номер следующего ожидаемого пакета */
std::mutex packMutex; /*!< mutex для работы с очередью */ size_t rnum = { 0 }; /*!< номер последнего обработанного пакета */
size_t pnum = { 0 }; /*!< текущий номер обработанного сообщения, для проверки непрерывности последовательности пакетов */
UniSetUDP::UDPMessage* pack;
UniSetUDP::UDPPacket r_buf; /*!< просто буфер для получения очередного сообщения */
/*! максимальная разница между номерами пакетов, при которой считается, что счётчик пакетов /*! максимальная разница между номерами пакетов, при которой считается, что счётчик пакетов
* прошёл через максимум или сбился... * прошёл через максимум или сбился...
*/ */
size_t maxDifferens = { 20 }; size_t maxDifferens = { 20 };
PacketQueue qtmp; /*!< очередь на время обработки(очистки) основной очереди */
bool waitClean = { false }; /*!< флаг означающий, что ждём очистки очереди до конца */
size_t rnum = { 0 }; /*!< текущий номер принятого сообщения, для проверки "переполнения" или "сбоя" счётчика */
size_t maxProcessingCount = { 100 }; /*!< максимальное число обрабатываемых за один раз сообщений */
std::atomic_bool lockUpdate = { false }; /*!< флаг блокировки сохранения принятых данных в SM */ std::atomic_bool lockUpdate = { false }; /*!< флаг блокировки сохранения принятых данных в SM */
EventSlot slEvent; EventSlot slEvent;
...@@ -325,25 +268,14 @@ namespace uniset ...@@ -325,25 +268,14 @@ namespace uniset
}; };
typedef std::vector<CacheItem> CacheVec; typedef std::vector<CacheItem> CacheVec;
struct CacheInfo
{
CacheInfo():
cache_init_ok(false) {}
bool cache_init_ok = { false };
CacheVec cache;
};
// ключом является UDPMessage::getDataID() // ключом является UDPMessage::getDataID()
typedef std::unordered_map<long, CacheInfo> CacheMap; typedef std::unordered_map<long, CacheVec> CacheMap;
CacheMap d_icache_map; /*!< кэш итераторов для булевых */ CacheMap d_icache_map; /*!< кэш итераторов для булевых */
CacheMap a_icache_map; /*!< кэш итераторов для аналоговых */ CacheMap a_icache_map; /*!< кэш итераторов для аналоговых */
bool d_cache_init_ok = { false }; CacheVec* getDCache( UniSetUDP::UDPMessage* pack ) noexcept;
bool a_cache_init_ok = { false }; CacheVec* getACache( UniSetUDP::UDPMessage* pack ) noexcept;
CacheInfo& getDCache( UniSetUDP::UDPMessage& pack, bool force = false ) noexcept;
CacheInfo& getACache( UniSetUDP::UDPMessage& pack, bool force = false ) noexcept;
}; };
// -------------------------------------------------------------------------- // --------------------------------------------------------------------------
} // end of namespace uniset } // end of namespace uniset
......
...@@ -156,7 +156,7 @@ namespace uniset ...@@ -156,7 +156,7 @@ namespace uniset
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
void UNetSender::updateFromSM() void UNetSender::updateFromSM()
{ {
for( auto && it : items ) for( auto&& it : items )
{ {
UItem& i = it.second; UItem& i = it.second;
...@@ -243,7 +243,7 @@ namespace uniset ...@@ -243,7 +243,7 @@ namespace uniset
if( !shm->isLocalwork() ) if( !shm->isLocalwork() )
updateFromSM(); updateFromSM();
for( auto && it : mypacks ) for( auto&& it : mypacks )
{ {
if( it.first > 1 && (ncycle % it.first) != 0 ) if( it.first > 1 && (ncycle % it.first) != 0 )
continue; continue;
...@@ -504,6 +504,7 @@ namespace uniset ...@@ -504,6 +504,7 @@ namespace uniset
if( p.pack_ind >= maxAData ) if( p.pack_ind >= maxAData )
{ {
anum++; anum++;
if( anum >= pk.size() ) if( anum >= pk.size() )
pk.resize(anum + 1); pk.resize(anum + 1);
...@@ -551,7 +552,7 @@ namespace uniset ...@@ -551,7 +552,7 @@ namespace uniset
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
void UNetSender::initIterators() void UNetSender::initIterators()
{ {
for( auto && it : items ) for( auto&& it : items )
shm->initIterator(it.second.ioit); shm->initIterator(it.second.ioit);
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
......
if HAVE_TESTS if HAVE_TESTS
noinst_PROGRAMS = tests-with-sm urecv-perf-test noinst_PROGRAMS = tests-with-sm urecv-perf-test
#noinst_PROGRAMS = urecv-perf-test
tests_with_sm_SOURCES = tests_with_sm.cc test_unetudp.cc tests_with_sm_SOURCES = tests_with_sm.cc test_unetudp.cc
tests_with_sm_LDADD = $(top_builddir)/lib/libUniSet2.la $(top_builddir)/extensions/lib/libUniSet2Extensions.la \ tests_with_sm_LDADD = $(top_builddir)/lib/libUniSet2.la $(top_builddir)/extensions/lib/libUniSet2Extensions.la \
......
...@@ -96,45 +96,6 @@ void send( UniSetUDP::UDPMessage& pack, int tout = 2000 ) ...@@ -96,45 +96,6 @@ void send( UniSetUDP::UDPMessage& pack, int tout = 2000 )
size_t ret = udp_s->sendTo(&s_buf.data, s_buf.len, s_addr); size_t ret = udp_s->sendTo(&s_buf.data, s_buf.len, s_addr);
REQUIRE( ret == s_buf.len ); REQUIRE( ret == s_buf.len );
} }
// -----------------------------------------------------------------------------
TEST_CASE("[UNetUDP]: queue sort", "[unetudp][packetqueue]")
{
UNetReceiver::PacketQueue q;
UniSetUDP::UDPMessage m1;
m1.num = 10;
UniSetUDP::UDPMessage m2;
m2.num = 11;
UniSetUDP::UDPMessage m3;
m3.num = 13;
UniSetUDP::UDPMessage m4;
m4.num = 100;
// специально складываем в обратном порядке
// чтобы проверить "сортировку"
q.push(m1);
q.push(m3);
q.push(m2);
q.push(m4);
UniSetUDP::UDPMessage t = q.top();
REQUIRE( t.num == 10 );
q.pop();
t = q.top();
REQUIRE( t.num == 11 );
q.pop();
t = q.top();
REQUIRE( t.num == 13 );
q.pop();
t = q.top();
REQUIRE( t.num == 100 );
q.pop();
}
// -----------------------------------------------------------------------------
TEST_CASE("[UNetUDP]: UDPMessage", "[unetudp][udpmessage]") TEST_CASE("[UNetUDP]: UDPMessage", "[unetudp][udpmessage]")
{ {
SECTION("UDPMessage::isFull()") SECTION("UDPMessage::isFull()")
......
#!/bin/sh
# '--' - нужен для отделения аргументов catch, от наших..
cd ../../../Utilities/Admin/
./uniset2-start.sh -f ./create_links.sh
./uniset2-start.sh -f ./create
./uniset2-start.sh -f ./exist | grep -q UNISET_PLC/Controllers || exit 1
cd -
./uniset2-start.sh -f ./tests-with-sm $* -- --confile unetudp-test-configure.xml --e-startup-pause 10 \
--unet-name UNetExchange --unet-filter-field unet --unet-filter-value 1 --unet-maxdifferense 5 \
--unet-recv-timeout 1000 --unet-sendpause 500 --unet-update-strategy thread
#--unet-log-add-levels any
AT_SETUP([UNetUDP tests (with SM)(thread)])
AT_CHECK([$abs_top_builddir/testsuite/at-test-launch.sh $abs_top_builddir/extensions/UNetUDP/tests tests_with_sm_thread.sh],[0],[ignore],[ignore])
AT_CLEANUP
AT_SETUP([UNetUDP tests (with SM)(evloop)]) AT_SETUP([UNetUDP tests (with SM)(evloop)])
AT_CHECK([$abs_top_builddir/testsuite/at-test-launch.sh $abs_top_builddir/extensions/UNetUDP/tests tests_with_sm_evloop.sh],[0],[ignore],[ignore]) AT_CHECK([$abs_top_builddir/testsuite/at-test-launch.sh $abs_top_builddir/extensions/UNetUDP/tests tests_with_sm_evloop.sh],[0],[ignore],[ignore])
AT_CLEANUP AT_CLEANUP
......
...@@ -42,7 +42,9 @@ static void run_senders( size_t max, const std::string& s_host, size_t count = 5 ...@@ -42,7 +42,9 @@ static void run_senders( size_t max, const std::string& s_host, size_t count = 5
{ {
try try
{ {
cout << "create sender: " << s_host << ":" << begPort + i << endl;
auto s = make_shared<UDPSocketU>(s_host, begPort + i); auto s = make_shared<UDPSocketU>(s_host, begPort + i);
s->setBroadcast(true);
vsend.emplace_back(s); vsend.emplace_back(s);
} }
catch( Poco::Net::NetException& e ) catch( Poco::Net::NetException& e )
...@@ -103,7 +105,7 @@ static void run_senders( size_t max, const std::string& s_host, size_t count = 5 ...@@ -103,7 +105,7 @@ static void run_senders( size_t max, const std::string& s_host, size_t count = 5
if( packetnum == 0 ) if( packetnum == 0 )
packetnum = 1; packetnum = 1;
for( auto && udp : vsend ) for( auto&& udp : vsend )
{ {
try try
{ {
...@@ -139,6 +141,7 @@ static void run_test( size_t max, const std::string& host ) ...@@ -139,6 +141,7 @@ static void run_test( size_t max, const std::string& host )
// make receivers.. // make receivers..
for( size_t i = 0; i < max; i++ ) for( size_t i = 0; i < max; i++ )
{ {
cout << "create receiver: " << host << ":" << begPort + i << endl;
auto r = make_shared<UNetReceiver>(host, begPort + i, smiInstance()); auto r = make_shared<UNetReceiver>(host, begPort + i, smiInstance());
r->setLockUpdate(true); r->setLockUpdate(true);
vrecv.emplace_back(r); vrecv.emplace_back(r);
...@@ -147,7 +150,7 @@ static void run_test( size_t max, const std::string& host ) ...@@ -147,7 +150,7 @@ static void run_test( size_t max, const std::string& host )
size_t count = 0; size_t count = 0;
// Run receivers.. // Run receivers..
for( auto && r : vrecv ) for( auto&& r : vrecv )
{ {
if( r ) if( r )
{ {
...@@ -156,12 +159,12 @@ static void run_test( size_t max, const std::string& host ) ...@@ -156,12 +159,12 @@ static void run_test( size_t max, const std::string& host )
} }
} }
cerr << "RUn " << count << " receivers..." << endl; cerr << "RUN " << count << " receivers..." << endl;
// wait.. // wait..
pause(); pause();
for( auto && r : vrecv ) for( auto&& r : vrecv )
{ {
if(r) if(r)
r->stop(); r->stop();
...@@ -177,9 +180,9 @@ int main(int argc, char* argv[] ) ...@@ -177,9 +180,9 @@ int main(int argc, char* argv[] )
auto conf = uniset_init(argc, argv); auto conf = uniset_init(argc, argv);
if( argc > 1 && !strcmp(argv[1], "s") ) if( argc > 1 && !strcmp(argv[1], "s") )
run_senders(10, host); run_senders(1, host);
else else
run_test(10, host); run_test(1, host);
return 0; return 0;
} }
......
...@@ -29,6 +29,7 @@ static struct option longopts[] = ...@@ -29,6 +29,7 @@ static struct option longopts[] =
{ "prof", required_argument, 0, 'y' }, { "prof", required_argument, 0, 'y' },
{ "a-data", required_argument, 0, 'a' }, { "a-data", required_argument, 0, 'a' },
{ "d-data", required_argument, 0, 'i' }, { "d-data", required_argument, 0, 'i' },
{ "pack-num", required_argument, 0, 'u' },
{ NULL, 0, 0, 0 } { NULL, 0, 0, 0 }
}; };
// -------------------------------------------------------------------------- // --------------------------------------------------------------------------
...@@ -79,10 +80,11 @@ int main(int argc, char* argv[]) ...@@ -79,10 +80,11 @@ int main(int argc, char* argv[])
unsigned int nprof = 0; unsigned int nprof = 0;
std::string d_data = ""; std::string d_data = "";
std::string a_data = ""; std::string a_data = "";
size_t packetnum = 1;
while(1) while(1)
{ {
opt = getopt_long(argc, argv, "hs:c:r:p:n:t:x:blvdz:y:a:i:", longopts, &optindex); opt = getopt_long(argc, argv, "hs:c:r:p:n:t:x:blvdz:y:a:i:u:", longopts, &optindex);
if( opt == -1 ) if( opt == -1 )
break; break;
...@@ -106,6 +108,7 @@ int main(int argc, char* argv[]) ...@@ -106,6 +108,7 @@ int main(int argc, char* argv[])
cout << "[-y|--prof] num - Print receive statistics every NUM packets (for -r only)" << endl; cout << "[-y|--prof] num - Print receive statistics every NUM packets (for -r only)" << endl;
cout << "[-a|--a-data] id1=val1,id2=val2,... - Analog data. Send: id1=id1,id2=id2,.. for analog sensors" << endl; cout << "[-a|--a-data] id1=val1,id2=val2,... - Analog data. Send: id1=id1,id2=id2,.. for analog sensors" << endl;
cout << "[-i|--d-data] id1=val1,id2=val2,... - Digital data. Send: id1=id1,id2=id2,.. for digital sensors" << endl; cout << "[-i|--d-data] id1=val1,id2=val2,... - Digital data. Send: id1=id1,id2=id2,.. for digital sensors" << endl;
cout << "[-u|--pack-num] num - first packet numbrt (default: 1)" << endl;
cout << endl; cout << endl;
return 0; return 0;
...@@ -171,6 +174,10 @@ int main(int argc, char* argv[]) ...@@ -171,6 +174,10 @@ int main(int argc, char* argv[])
ncycles = atoi(optarg); ncycles = atoi(optarg);
break; break;
case 'u':
packetnum = atoi(optarg);
break;
case '?': case '?':
default: default:
cerr << "? argumnet" << endl; cerr << "? argumnet" << endl;
...@@ -345,7 +352,6 @@ int main(int argc, char* argv[]) ...@@ -345,7 +352,6 @@ int main(int argc, char* argv[])
Poco::Net::SocketAddress sa(s_host, port); Poco::Net::SocketAddress sa(s_host, port);
udp->connect(sa); udp->connect(sa);
size_t packetnum = 0;
UniSetUDP::UDPPacket s_buf; UniSetUDP::UDPPacket s_buf;
...@@ -358,7 +364,7 @@ int main(int argc, char* argv[]) ...@@ -358,7 +364,7 @@ int main(int argc, char* argv[])
{ {
mypack.num = packetnum++; mypack.num = packetnum++;
// при переходе черех максимум (UniSetUDP::MaxPacketNum) // при переходе через максимум (UniSetUDP::MaxPacketNum)
// пакет опять должен иметь номер "1" // пакет опять должен иметь номер "1"
if( packetnum == 0 ) if( packetnum == 0 )
packetnum = 1; packetnum = 1;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment