Commit 5daf1563 authored by Pavel Vainerman's avatar Pavel Vainerman Committed by Pavel Vainerman

(unet): processing optimization (crc cache)

parent b1bf6785
......@@ -26,12 +26,15 @@
static bool HostIsBigEndian = false;
#define LE_TO_H(x) {}
#define LE32_TO_H(x) {}
#define LE16_TO_H(x) {}
#elif INTPTR_MAX == INT64_MAX
#define LE_TO_H(x) x = le64toh(x)
#define LE32_TO_H(x) x = le32toh(x)
#define LE16_TO_H(x) x = le16toh(x)
#elif INTPTR_MAX == INT32_MAX
#define LE_TO_H(x) x = le32toh(x)
#define LE32_TO_H(x) x = le32toh(x)
#define LE16_TO_H(x) x = le16toh(x)
#else
#error UNET(LE_TO_H): Unknown byte order or size of pointer
#endif
......@@ -40,12 +43,15 @@ static bool HostIsBigEndian = false;
static bool HostIsBigEndian = true;
#define BE_TO_H(x) {}
#define BE32_TO_H(x) {}
#define BE16_TO_H(x) {}
#elif INTPTR_MAX == INT64_MAX
#define BE_TO_H(x) x = be64toh(x)
#define BE32_TO_H(x) x = be32toh(x)
#define BE16_TO_H(x) x = be16toh(x)
#elif INTPTR_MAX == INT32_MAX
#define BE_TO_H(x) x = be32toh(x)
#define BE32_TO_H(x) x = be32toh(x)
#define BE16_TO_H(x) x = be16toh(x)
#else
#error UNET(BE_TO_H): Unknown byte order or size of pointer
#endif
......@@ -139,7 +145,9 @@ namespace uniset
<< " procID=" << p.procID
<< " dcount=" << p.dcount
<< " acount=" << p.acount
<< " pnum=" << p.num;
<< " pnum=" << p.num
<< " dcrc=" << p.dcrc
<< " acrc=" << p.acrc;
}
// -----------------------------------------------------------------------------
std::ostream& UniSetUDP::operator<<( std::ostream& os, UniSetUDP::UDPHeader* p )
......@@ -256,9 +264,6 @@ namespace uniset
// -----------------------------------------------------------------------------
long UDPMessage::getDataID() const noexcept
{
// в качестве идентификатора берётся ID первого датчика в данных
// приоритет имеет аналоговые датчики
if( header.acount > 0 )
return a_dat[0].id;
......@@ -287,6 +292,8 @@ namespace uniset
BE_TO_H(header.nodeID);
BE_TO_H(header.dcount);
BE_TO_H(header.acount);
BE16_TO_H(header.dcrc);
BE16_TO_H(header.acrc);
}
else if( !be_order && HostIsBigEndian )
{
......@@ -296,6 +303,8 @@ namespace uniset
LE_TO_H(header.nodeID);
LE_TO_H(header.dcount);
LE_TO_H(header.acount);
LE16_TO_H(header.dcrc);
LE16_TO_H(header.acrc);
}
// set host byte order
......@@ -339,15 +348,25 @@ namespace uniset
}
}
// -----------------------------------------------------------------------------
uint16_t UDPMessage::getDataCRC() const noexcept
void UDPMessage::updatePacketCrc() noexcept
{
uint16_t crc[3];
crc[0] = makeCRC( (unsigned char*)(a_dat), sizeof(a_dat) );
crc[1] = makeCRC( (unsigned char*)(d_id), sizeof(d_id) );
crc[2] = makeCRC( (unsigned char*)(d_dat), sizeof(d_dat) );
header.dcrc = calcDcrc();
header.acrc = calcAcrc();
}
// -----------------------------------------------------------------------------
uint16_t UDPMessage::calcDcrc() const noexcept
{
uint16_t crc[2];
crc[0] = makeCRC( (unsigned char*)(d_id), sizeof(d_id) );
crc[1] = makeCRC( (unsigned char*)(d_dat), sizeof(d_dat) );
return makeCRC( (unsigned char*)(&crc), sizeof(crc) );
}
// -----------------------------------------------------------------------------
uint16_t UDPMessage::calcAcrc() const noexcept
{
return makeCRC( (unsigned char*)(&a_dat), sizeof(a_dat) );
}
// -----------------------------------------------------------------------------
UDPHeader::UDPHeader() noexcept
: magic(UNETUDP_MAGICNUM)
#if __BYTE_ORDER == __LITTLE_ENDIAN
......@@ -362,6 +381,8 @@ namespace uniset
, procID(0)
, dcount(0)
, acount(0)
, dcrc(0)
, acrc(0)
{}
// -----------------------------------------------------------------------------
......
......@@ -50,11 +50,13 @@ namespace uniset
UDPHeader() noexcept;
uint32_t magic;
uint8_t _be_order; // 1 - BE byte order, 0 - LE byte order
size_t num;
size_t num; // порядковый номер сообщения
int64_t nodeID;
int64_t procID;
size_t dcount; /*!< количество булевых величин */
size_t acount; /*!< количество аналоговых величин */
uint16_t dcrc; /*!< crc по дискретным датчикам */
uint16_t acrc; /*!< crc по аналоговым датчикам */
} __attribute__((packed));
std::ostream& operator<<( std::ostream& os, UDPHeader& p );
......@@ -134,7 +136,9 @@ namespace uniset
return header.acount;
}
uint16_t getDataCRC() const noexcept;
uint16_t calcDcrc() const noexcept;
uint16_t calcAcrc() const noexcept;
void updatePacketCrc() noexcept;
UDPHeader header;
UDPAData a_dat[MaxACount]; /*!< аналоговые величины */
......
......@@ -79,6 +79,7 @@ UNetExchange::UNetExchange(uniset::ObjectId objId, uniset::ObjectId shmId, const
int checkConnectionPause = conf->getArgPInt("--" + prefix + "-checkconnection-pause", it.getProp("checkConnectionPause"), 10000);
int initpause = conf->getArgPInt("--" + prefix + "-initpause", it.getProp("initpause"), 5000);
int recvBufferSize = conf->getArgPInt("--" + prefix + "-recv-buffer-size", it.getProp("recvBufferSize"), 100);
bool recvIgnoreCrc = conf->getArgPInt("--" + prefix + "-recv-ignore-crc", it.getProp("recvIgnoreCRC"), 0);
int recvMaxReceiveCount = conf->getArgPInt("--" + prefix + "-recv-max-at-time", it.getProp("recvMaxAtTime"), 5);
const string unet_transport = conf->getArg2Param("--" + prefix + "-transport", it.getProp("transport"), "broadcast");
......@@ -133,6 +134,7 @@ UNetExchange::UNetExchange(uniset::ObjectId objId, uniset::ObjectId shmId, const
r.r1->setMaxDifferens(maxDiff);
r.r1->setBufferSize(recvBufferSize);
r.r1->setMaxReceiveAtTime(recvMaxReceiveCount);
r.r1->setIgnoreCRC(recvIgnoreCrc);
}
if( r.r2 )
......@@ -147,6 +149,7 @@ UNetExchange::UNetExchange(uniset::ObjectId objId, uniset::ObjectId shmId, const
r.r2->setMaxDifferens(maxDiff);
r.r2->setBufferSize(recvBufferSize);
r.r2->setMaxReceiveAtTime(recvMaxReceiveCount);
r.r2->setIgnoreCRC(recvIgnoreCrc);
}
}
......@@ -610,6 +613,7 @@ void UNetExchange::help_print( int argc, const char* argv[] ) noexcept
cout << "--prefix-nosender [0,1] - Отключить посылку." << endl;
cout << "--prefix-recv-buffer-size sz - Размер циклического буфера для приёма сообщений. По умолчанию: 100" << endl;
cout << "--prefix-recv-max-at-time num - Максимальное количество сообщений вычитываемых из сети за один раз. По умолчанию: 5" << endl;
cout << "--prefix-recv-ignore-crc [0,1] - Отключить оптимизацию по проверке crc, обновлять данные в SM всегда. По умолчанию: 0" << endl;
cout << "--prefix-sm-ready-timeout msec - Время ожидание я готовности SM к работе. По умолчанию 120000" << endl;
cout << "--prefix-filter-field name - Название фильтрующего поля при формировании списка датчиков посылаемых данным узлом" << endl;
cout << "--prefix-filter-value name - Значение фильтрующего поля при формировании списка датчиков посылаемых данным узлом" << endl;
......
......@@ -60,6 +60,7 @@ UNetReceiver::UNetReceiver(std::unique_ptr<UNetReceiveTransport>&& _transport
evStatistic.set<UNetReceiver, &UNetReceiver::statisticsEvent>(this);
evUpdate.set<UNetReceiver, &UNetReceiver::updateEvent>(this);
evInitPause.set<UNetReceiver, &UNetReceiver::initEvent>(this);
evForceUpdate.set<UNetReceiver, &UNetReceiver::onForceUpdate>(this);
ptLostTimeout.setTiming(lostTimeout);
ptRecvTimeout.setTiming(recvTimeout);
......@@ -78,6 +79,11 @@ void UNetReceiver::setBufferSize( size_t sz ) noexcept
}
}
// -----------------------------------------------------------------------------
void UNetReceiver::setIgnoreCRC( bool set ) noexcept
{
ignoreCRC = set;
}
// -----------------------------------------------------------------------------
void UNetReceiver::setMaxReceiveAtTime( size_t sz ) noexcept
{
if( sz > 0 )
......@@ -195,6 +201,7 @@ bool UNetReceiver::createConnection( bool throwEx )
evReceive.set<UNetReceiver, &UNetReceiver::callback>(this);
evUpdate.set<UNetReceiver, &UNetReceiver::updateEvent>(this);
evForceUpdate.set<UNetReceiver, &UNetReceiver::onForceUpdate>(this);
if( evCheckConnection.is_active() )
evCheckConnection.stop();
......@@ -250,6 +257,8 @@ void UNetReceiver::evprepare( const ev::loop_ref& eloop ) noexcept
evInitPause.set(eloop);
evUpdate.set(eloop);
evUpdate.start( 0, ((float)updatepause / 1000.) );
evForceUpdate.set(eloop);
evForceUpdate.start();
if( !transport->isConnected() )
{
......@@ -285,14 +294,16 @@ void UNetReceiver::evfinish( const ev::loop_ref& eloop ) noexcept
if( evUpdate.is_active() )
evUpdate.stop();
if( evForceUpdate.is_active() )
evForceUpdate.stop();
transport->disconnect();
}
// -----------------------------------------------------------------------------
void UNetReceiver::forceUpdate() noexcept
{
// сбрасываем запомненый номер последнего обработанного пакета
// и тем самым заставляем обработать заново последний пакет и обновить данные в SM (см. update)
rnum = wnum - 1;
if( evForceUpdate.is_active() )
evForceUpdate.send();
}
// -----------------------------------------------------------------------------
void UNetReceiver::statisticsEvent(ev::periodic& tm, int revents) noexcept
......@@ -314,6 +325,33 @@ void UNetReceiver::statisticsEvent(ev::periodic& tm, int revents) noexcept
tm.again();
}
// -----------------------------------------------------------------------------
void UNetReceiver::onForceUpdate( ev::async& watcher, int revents ) noexcept
{
if( EV_ERROR & revents )
{
unetcrit << myname << "(onForceUpdate): EVENT ERROR.." << endl;
return;
}
// ещё не было пакетов
if( wnum == 1 && rnum == 0 )
return;
// сбрасываем кэш
for( auto&& c : d_icache_map )
c.second.crc = 0;
for( auto&& c : a_icache_map )
c.second.crc = 0;
// сбрасываем запомненый номер последнего обработанного пакета
// и тем самым заставляем обработать заново последний пакет и обновить данные в SM (см. update)
if( rnum > 0 )
rnum--;
update();
}
// -----------------------------------------------------------------------------
void UNetReceiver::initEvent( ev::timer& tmr, int revents ) noexcept
{
if( EV_ERROR & revents )
......@@ -390,76 +428,92 @@ void UNetReceiver::update() noexcept
continue;
// Обработка дискретных
auto d_iv = getDCache(p);
auto dcache = getDCache(p);
for( size_t i = 0; i < p->header.dcount; i++ )
if( p->header.dcrc == 0 || dcache->crc != p->header.dcrc || ignoreCRC )
{
try
dcache->crc = p->header.dcrc;
for( size_t i = 0; i < p->header.dcount; i++ )
{
s_id = p->dID(i);
c_it = &(*d_iv)[i];
try
{
s_id = p->dID(i);
c_it = &(dcache->items[i]);
if (c_it->id != s_id)
{
unetwarn << myname << "(update): reinit dcache for sid=" << s_id << endl;
c_it->id = s_id;
shm->initIterator(c_it->ioit);
}
if( c_it->id != s_id )
shm->localSetValue(c_it->ioit, s_id, p->dValue(i), shm->ID());
}
catch (const uniset::Exception& ex)
{
unetwarn << myname << "(update): reinit dcache for sid=" << s_id << endl;
c_it->id = s_id;
shm->initIterator(c_it->ioit);
// сбрасываем crc, т.к. данные не удалось успешно обновить
dcache->crc = 0;
unetcrit << myname << "(update): D:"
<< " id=" << s_id
<< " val=" << p->dValue(i)
<< " error: " << ex
<< std::endl;
}
catch (...)
{
// сбрасываем crc, т.к. данные не удалось успешно обновить
dcache->crc = 0;
unetcrit << myname << "(update): D:"
<< " id=" << s_id
<< " val=" << p->dValue(i)
<< " error: catch..."
<< std::endl;
}
shm->localSetValue(c_it->ioit, s_id, p->dValue(i), shm->ID());
}
catch( const uniset::Exception& ex)
{
unetcrit << myname << "(update): D:"
<< " id=" << s_id
<< " val=" << p->dValue(i)
<< " error: " << ex
<< std::endl;
}
catch(...)
{
unetcrit << myname << "(update): D:"
<< " id=" << s_id
<< " val=" << p->dValue(i)
<< " error: catch..."
<< std::endl;
}
}
// Обработка аналоговых
auto a_iv = getACache(p);
auto acache = getACache(p);
for( size_t i = 0; i < p->header.acount; i++ )
if( p->header.acrc == 0 || acache->crc != p->header.acrc || ignoreCRC )
{
try
acache->crc = p->header.acrc;
for( size_t i = 0; i < p->header.acount; i++ )
{
dat = &p->a_dat[i];
c_it = &(*a_iv)[i];
try
{
dat = &p->a_dat[i];
c_it = &(acache->items)[i];
if (c_it->id != dat->id)
{
unetwarn << myname << "(update): reinit acache for sid=" << dat->id << endl;
c_it->id = dat->id;
shm->initIterator(c_it->ioit);
}
if( c_it->id != dat->id )
shm->localSetValue(c_it->ioit, dat->id, dat->val, shm->ID());
}
catch (const uniset::Exception& ex)
{
unetwarn << myname << "(update): reinit acache for sid=" << dat->id << endl;
c_it->id = dat->id;
shm->initIterator(c_it->ioit);
acache->crc = 0;
unetcrit << myname << "(update): A:"
<< " id=" << dat->id
<< " val=" << dat->val
<< " error: " << ex
<< std::endl;
}
catch (...)
{
acache->crc = 0;
unetcrit << myname << "(update): A:"
<< " id=" << dat->id
<< " val=" << dat->val
<< " error: catch..."
<< std::endl;
}
shm->localSetValue(c_it->ioit, dat->id, dat->val, shm->ID());
}
catch( const uniset::Exception& ex)
{
unetcrit << myname << "(update): A:"
<< " id=" << dat->id
<< " val=" << dat->val
<< " error: " << ex
<< std::endl;
}
catch(...)
{
unetcrit << myname << "(update): A:"
<< " id=" << dat->id
<< " val=" << dat->val
<< " error: catch..."
<< std::endl;
}
}
}
......@@ -727,44 +781,46 @@ void UNetReceiver::initIterators() noexcept
{
for( auto mit = d_icache_map.begin(); mit != d_icache_map.end(); ++mit )
{
CacheVec& d_icache = mit->second;
CacheInfo& d_icache = mit->second;
for( auto&& it : d_icache )
for( auto&& it : d_icache.items )
shm->initIterator(it.ioit);
}
for( auto mit = a_icache_map.begin(); mit != a_icache_map.end(); ++mit )
{
CacheVec& a_icache = mit->second;
CacheInfo& a_icache = mit->second;
for( auto&& it : a_icache )
for( auto&& it : a_icache.items )
shm->initIterator(it.ioit);
}
}
// -----------------------------------------------------------------------------
UNetReceiver::CacheVec* UNetReceiver::getDCache( UniSetUDP::UDPMessage* pack ) noexcept
UNetReceiver::CacheInfo* UNetReceiver::getDCache( UniSetUDP::UDPMessage* pack ) noexcept
{
auto dID = pack->getDataID();
auto dit = d_icache_map.find(dID);
if( dit == d_icache_map.end() )
{
auto p = d_icache_map.emplace(dID, UNetReceiver::CacheVec());
auto p = d_icache_map.emplace(dID, UNetReceiver::CacheInfo());
dit = p.first;
}
CacheVec* d_info = &dit->second;
CacheInfo* d_info = &dit->second;
if( pack->header.dcount == d_info->size() )
return d_info;
if( pack->header.dcount == d_info->items.size() )
return &dit->second;
unetinfo << myname << ": init dcache[" << pack->header.dcount << "] for " << dID << endl;
unetinfo << myname << ": init dcache[" << pack->header.dcount << "] for dataID=" << dID << endl;
d_info->resize(pack->header.dcount);
d_info->items.resize(pack->header.dcount);
d_info->crc = 0;
cacheMissed++;
for( size_t i = 0; i < pack->header.dcount; i++ )
{
CacheItem& d = (*d_info)[i];
CacheItem& d = d_info->items[i];
if( d.id != pack->d_id[i] )
{
......@@ -776,29 +832,31 @@ UNetReceiver::CacheVec* UNetReceiver::getDCache( UniSetUDP::UDPMessage* pack ) n
return d_info;
}
// -----------------------------------------------------------------------------
UNetReceiver::CacheVec* UNetReceiver::getACache( UniSetUDP::UDPMessage* pack ) noexcept
UNetReceiver::CacheInfo* UNetReceiver::getACache( UniSetUDP::UDPMessage* pack ) noexcept
{
auto dID = pack->getDataID();
auto ait = a_icache_map.find(dID);
if( ait == a_icache_map.end() )
{
auto p = a_icache_map.emplace(dID, UNetReceiver::CacheVec());
auto p = a_icache_map.emplace(dID, UNetReceiver::CacheInfo());
ait = p.first;
}
CacheVec* a_info = &ait->second;
CacheInfo* a_info = &ait->second;
if( pack->header.acount == a_info->size() )
if( pack->header.acount == a_info->items.size() )
return a_info;
unetinfo << myname << ": init acache[" << pack->header.acount << "] for " << dID << endl;
unetinfo << myname << ": init acache[" << pack->header.acount << "] for dataID=" << dID << endl;
a_info->resize(pack->header.acount);
a_info->items.resize(pack->header.acount);
a_info->crc = 0;
cacheMissed++;
for( size_t i = 0; i < pack->header.acount; i++ )
{
CacheItem& d = (*a_info)[i];
CacheItem& d = a_info->items[i];
if( d.id != pack->a_dat[i].id )
{
......@@ -827,6 +885,7 @@ const std::string UNetReceiver::getShortInfo() const noexcept
<< " recvOK=" << isRecvOK()
<< " receivepack=" << rnum
<< " lostPackets=" << setw(6) << getLostPacketsNum()
<< " cacheMissed=" << setw(6) << cacheMissed
<< endl
<< "\t["
<< " recvTimeout=" << recvTimeout
......
......@@ -56,29 +56,36 @@ namespace uniset
* КЭШ
* ===
* Для оптимизации работы с SM, т.к. в пакетах приходят только пары [id,value] сделан кэш итераторов.
* Идея проста: сделан вектор размером с количество принимаемых данных. В векторе хранятся итераторы (и всё что необходимо).
* Идея проста: сделан вектор размером с количеством принимаемых данных. В векторе хранятся итераторы (и всё что необходимо).
* Порядковый номер данных в пакете является индексом в кэше.
* Для защиты от изменения последовательности внутри пакета, в кэше хранится ID сохраняемого датчика, и если он не совпадёт с тем,
* ID который пришёл в пакете - элемент кэша обновляется.
* Если количество пришедших данных не совпадают с размером кэша.. кэш обновляется.
* Если количество пришедших данных не совпадают с размером кэша - кэш обновляется.
*
* КЭШ (ДОПОЛНЕНИЕ)
* ===
* Т.к. в общем случае, данные могут быть разбиты не несколько (много) пакетов, то для каждого из них выделен свой кэш и создан отдельный
* Т.к. в общем случае, данные могут быть разбиты на несколько (много) пакетов, то для каждого из них выделен свой кэш и создан отдельный
* map, ключом в котором является идентификатор данных (см. UDPMessage::getDataID()).
* Кэш в map добавляется когда приходит пакет с новым UDPMessage::getDataID() и в дальнейшим используется для этого пакета.
* Кэш в map добавляется тогда, когда приходит пакет с новым UDPMessage::getDataID() и в дальнейшим он используется для этого пакета.
* В текущей реализации размер map не контролируется (завязан на UDPMessage::getDataID()) и рассчитан на статичность пакетов,
* т.е. на то что UNetSender не будет с течением времени менять структуру отправляемых пакетов.
* т.е. на то что UNetSender не будет с течением времени менять количество отправляемых пакетов
* (работать будет, просто в map останутся лежать записи для неиспользуемых пакетов)
*
* ОПТИМИЗАЦИЯ
* ===
* В кэше так же хранится crc последних принятых данных. Если crc совпадает с тем, что пришло в пакете, то обработки не происходит.
* crc хранится отдельно для дискретных и отдельно для аналоговых датчиков.
* Эту оптимизацию можно отключить параметром --prefix-recv-ignore-crc или recvIgnoreCRC="1" в конф. файле.
*
* Обработка сбоев в номере пакетов
* =========================================================================
* Если в какой-то момент расстояние между rnum и wnum превышает maxDifferens пакетов
* то считается, что произошёл сбой или узел который посылал пакеты перезагрузился
* то считается, что произошёл сбой или узел который посылал пакеты - перезагрузился
* Идёт попытка обработать все текущие пакеты (до первой дырки), а дальше происходит
* реинициализация и обработка продолжается с нового номера.
*
* =========================================================================
* ОПТИМИЗАЦИЯ N1: см. UNetSender.h. Если номер последнего принятого пакета не менялся.. не обрабатываем.
* ОПТИМИЗАЦИЯ N1: см. UNetSender.h. Если номер последнего принятого пакета не менялся, пакет не обрабатываем.
*
* Создание соединения (открытие сокета)
* ======================================
......@@ -130,6 +137,7 @@ namespace uniset
void setInitPause( timeout_t msec ) noexcept;
void setBufferSize( size_t sz ) noexcept;
void setMaxReceiveAtTime( size_t sz ) noexcept;
void setIgnoreCRC( bool set ) noexcept;
void setRespondID( uniset::ObjectId id, bool invert = false ) noexcept;
void setLostPacketsID( uniset::ObjectId id ) noexcept;
......@@ -179,6 +187,7 @@ namespace uniset
void updateEvent( ev::periodic& watcher, int revents ) noexcept;
void checkConnectionEvent( ev::periodic& watcher, int revents ) noexcept;
void statisticsEvent( ev::periodic& watcher, int revents ) noexcept;
void onForceUpdate( ev::async& watcher, int revents ) noexcept;
void initEvent( ev::timer& watcher, int revents ) noexcept;
virtual void evprepare( const ev::loop_ref& eloop ) noexcept override;
virtual void evfinish(const ev::loop_ref& eloop ) noexcept override;
......@@ -205,6 +214,7 @@ namespace uniset
ev::periodic evStatistic;
ev::periodic evUpdate;
ev::timer evInitPause;
ev::async evForceUpdate;
// счётчики для подсчёта статистики
size_t recvCount = { 0 };
......@@ -277,16 +287,25 @@ namespace uniset
CacheItem():
id(uniset::DefaultObjectId) {}
};
typedef std::vector<CacheItem> CacheVec;
struct CacheInfo
{
uint16_t crc;
CacheVec items;
CacheInfo(): crc(0) {}
};
// ключом является UDPMessage::getDataID()
typedef std::unordered_map<long, CacheVec> CacheMap;
typedef std::unordered_map<long, CacheInfo> CacheMap;
CacheMap d_icache_map; /*!< кэш итераторов для булевых */
CacheMap a_icache_map; /*!< кэш итераторов для аналоговых */
size_t cacheMissed; // количество промахов
bool ignoreCRC = { false }; /*!< отключение проверки crc */
CacheVec* getDCache( UniSetUDP::UDPMessage* pack ) noexcept;
CacheVec* getACache( UniSetUDP::UDPMessage* pack ) noexcept;
CacheInfo* getDCache( UniSetUDP::UDPMessage* pack ) noexcept;
CacheInfo* getACache( UniSetUDP::UDPMessage* pack ) noexcept;
};
// --------------------------------------------------------------------------
} // end of namespace uniset
......
......@@ -43,8 +43,7 @@ namespace uniset
packsendpause(5),
packsendpauseFactor(1),
activated(false),
packetnum(1),
lastcrc(0),
packetnum(0),
maxAData(maxACount),
maxDData(maxDCount)
{
......@@ -280,31 +279,21 @@ namespace uniset
unetinfo << "************* execute FINISH **********" << endl;
}
// -----------------------------------------------------------------------------
// #define UNETUDP_DISABLE_OPTIMIZATION_N1
void UNetSender::real_send( PackMessage& mypack ) noexcept
{
try
{
uniset::uniset_rwmutex_rlock l(mypack.mut);
#ifdef UNETUDP_DISABLE_OPTIMIZATION_N1
mypack.msg.num = packetnum++;
#else
uint16_t crc = mypack.msg.getDataCRC();
if( crc != lastcrc )
{
mypack.msg.header.num = packetnum++;
lastcrc = crc;
}
#endif
packetnum++;
// при переходе через ноль (когда счётчик перевалит через UniSetUDP::MaxPacketNum..
// делаем номер пакета "1"
if( packetnum == 0 )
packetnum = 1;
uniset::uniset_rwmutex_rlock l(mypack.mut);
mypack.msg.header.num = packetnum;
mypack.msg.updatePacketCrc();
if( !transport->isReadyForSend(writeTimeout) )
return;
......@@ -500,7 +489,6 @@ namespace uniset
<< UniSetUDP::MaxACount << endl << flush;
std::terminate();
return false;
}
}
......@@ -512,7 +500,6 @@ namespace uniset
unetcrit << myname
<< "(readItem): Sensor (" << p.id << ")" << sname << " ALREADY ADDED!! ABORT!" << endl;
std::terminate();
return false;
}
items.emplace(p.id, std::move(p));
......@@ -551,7 +538,6 @@ namespace uniset
s << setw(15) << std::right << transport->toString()
<< " lastpacknum=" << packetnum
<< " lastcrc=" << setw(6) << lastcrc
<< " items=" << items.size() << " maxAData=" << getADataSize() << " maxDData=" << getDDataSize()
<< " packsendpause[factor=" << packsendpauseFactor << "]=" << packsendpause
<< " sendpause=" << sendpause
......@@ -568,7 +554,10 @@ namespace uniset
{
//uniset_rwmutex_rlock l(p->mut);
s << " \t\t[" << (n++) << "]=" << sizeof(pack.msg) << " bytes"
<< " ( numA=" << setw(5) << pack.msg.asize() << " numD=" << setw(5) << pack.msg.dsize() << ")"
<< " (dataID=" << setw(5) << pack.msg.getDataID()
<< " numA=" << setw(5) << pack.msg.asize()
<< " numD=" << setw(5) << pack.msg.dsize()
<< ")"
<< endl;
}
}
......
......@@ -48,12 +48,6 @@ namespace uniset
* В initItem() каждому UItem в dlist кроме pack_ind присваивается еще и номер пакета pack_num, который гарантировано соответствует
* существующему пакету, поэтому в дальнейшем при использовании pack_num в качестве ключа в mypacks мы не проверяем пакет на существование.
*
* ОПТИМИЗАЦИЯ N1: Для оптимизации обработки посылаемых пакетов (на стороне UNetSender) сделана следующая логика:
* Номер очередного посылаемого пакета меняется (увеличивается) только, если изменились данные с момента
последней посылки. Для этого по данным каждый раз производится расчёт UNetUDP::makeCRC() и сравнивается с последним.
На стороне UNetReceiver пакеты с повторными номерами (т.е. уже обработанные) - откидываются.
*
*
* Создание соединения
* ======================================
* Попытка создать соединение производиться сразу в конструкторе, если это не получается,
......@@ -203,13 +197,13 @@ namespace uniset
typedef std::unordered_map<sendfactor_t, std::vector<PackMessage>> Packs;
// mypacks заполняется в начале и дальше с ним происходит только чтение
// (меняются данные внутри пакетов, но не меняется само количество пакетов)
// поэтому mutex-ом его не защищаем
Packs mypacks;
std::unordered_map<sendfactor_t, size_t> packs_anum;
std::unordered_map<sendfactor_t, size_t> packs_dnum;
UItemMap items;
size_t packetnum = { 1 }; /*!< номер очередного посылаемого пакета */
uint16_t lastcrc = { 0 };
size_t maxAData = { UniSetUDP::MaxACount };
size_t maxDData = { UniSetUDP::MaxDCount };
......
......@@ -56,9 +56,9 @@ void InitTest()
}
}
// -----------------------------------------------------------------------------
// pnum - минималный номер ожидаемого пакета ( 0 - любой пришедщий )
// pnum - минималный номер ожидаемого пакета (0 - последний пришедщий)
// ncycle - сколько пакетов разрешено "пропустить" прежде чем дождёмся нужного. (чтобы не ждать бесконечно)
static UniSetUDP::UDPMessage receive( unsigned int pnum = 0, timeout_t tout = 2000, int ncycle = 20 )
static UniSetUDP::UDPMessage receive( unsigned int pnum = 0, timeout_t tout = 2000, int ncycle = 30 )
{
UniSetUDP::UDPMessage pack;
......@@ -69,16 +69,18 @@ static UniSetUDP::UDPMessage receive( unsigned int pnum = 0, timeout_t tout = 20
size_t ret = udp_r->receiveBytes(&pack, sizeof(pack) );
if( ret <= 0 || pnum == 0 || ( pnum > 0 && pack.header.num >= pnum ) ) // -V560
if( ret <= 0 )
break;
pack.ntoh();
if( pnum > 0 && pack.header.num >= pnum ) // -V560
break;
REQUIRE( pack.header.magic == UniSetUDP::UNETUDP_MAGICNUM );
ncycle--;
}
// if( pnum > 0 && pack.num < pnum )
// return UniSetUDP::UDPMessage(); // empty message
return pack;
}
// -----------------------------------------------------------------------------
......@@ -89,6 +91,7 @@ void send( UniSetUDP::UDPMessage& pack, int tout = 2000 )
pack.header.nodeID = s_nodeID;
pack.header.procID = s_procID;
pack.header.num = s_numpack++;
pack.updatePacketCrc();
size_t ret = udp_s->sendTo(&pack, sizeof(pack), s_addr);
REQUIRE( ret == sizeof(pack) );
......@@ -197,9 +200,10 @@ TEST_CASE("[UNetUDP]: check sender", "[unetudp][udp][sender]")
REQUIRE( p.dValue(0) == true );
REQUIRE( p.dValue(1) == false );
// т.к. данные в SM не менялись, то должен придти пакет с тем же номером что и был.
// т.к. данные в SM не менялись, то должен придти пакет с теми же crc что и были
UniSetUDP::UDPMessage p2 = receive();
REQUIRE( p2.header.num == p.header.num );
REQUIRE( p2.header.dcrc == p.header.dcrc );
REQUIRE( p2.header.acrc == p.header.acrc );
}
SECTION("Test: change AI data..")
......@@ -453,7 +457,6 @@ TEST_CASE("[UNetUDP]: check undefined value", "[unetudp][udp][sender]")
msleep(600);
UniSetUDP::UDPMessage pack = receive( pack0.header.num + 1, 2000, 40 );
REQUIRE( pack.header.num != 0 );
REQUIRE( pack.asize() == 4 );
REQUIRE( pack.dsize() == 2 );
......@@ -464,7 +467,7 @@ TEST_CASE("[UNetUDP]: check undefined value", "[unetudp][udp][sender]")
si.node = uniset_conf()->getLocalNode();
ui->setUndefinedState(si, true, 6000 /* TestProc */ );
msleep(600);
pack = receive(pack.header.num + 1);
pack = receive();
REQUIRE( pack.header.num != 0 );
REQUIRE( pack.asize() == 4 );
......@@ -473,7 +476,7 @@ TEST_CASE("[UNetUDP]: check undefined value", "[unetudp][udp][sender]")
ui->setUndefinedState(si, false, 6000 /* TestProc */ );
msleep(600);
pack = receive(pack.header.num + 1);
pack = receive(pack0.header.num + 1);
REQUIRE( pack.header.num != 0 );
REQUIRE( pack.asize() == 4 );
......
......@@ -100,7 +100,12 @@ static UniSetUDP::UDPMessage mreceive( unsigned int pnum = 0, timeout_t tout = 2
size_t ret = udp_r->receive(&pack, sizeof(pack) );
if( ret == 0 || pnum == 0 || ( pnum > 0 && pack.header.num >= pnum ) ) // -V560
if( ret <= 0 )
break;
pack.ntoh();
if( pnum > 0 && pack.header.num >= pnum ) // -V560
break;
REQUIRE( pack.header.magic == UniSetUDP::UNETUDP_MAGICNUM );
......@@ -120,6 +125,7 @@ void msend( UniSetUDP::UDPMessage& pack, int tout = 2000 )
pack.header.nodeID = s_nodeID;
pack.header.procID = s_procID;
pack.header.num = s_numpack++;
pack.updatePacketCrc();
size_t ret = udp_s->send(&pack, sizeof(pack));
REQUIRE( ret == sizeof(pack) );
......@@ -144,9 +150,10 @@ TEST_CASE("[UNetUDP]: check multicast sender", "[unetudp][multicast][sender]")
REQUIRE( pack.dValue(0) == 1 );
REQUIRE( pack.dValue(1) == 0 );
// т.к. данные в SM не менялись, то должен придти пакет с тем же номером что и был..
// т.к. данные в SM не менялись, то должен придти пакет с теми же crc
UniSetUDP::UDPMessage pack2 = mreceive();
REQUIRE( pack2.header.num == pack.header.num );
REQUIRE( pack2.header.dcrc == pack.header.dcrc );
REQUIRE( pack2.header.acrc == pack.header.acrc );
}
SECTION("Test: change AI data...")
......
......@@ -10,5 +10,5 @@ cd -
./uniset2-start.sh -f ./tests-with-sm $* -- --confile unetudp-test-configure.xml --e-startup-pause 10 \
--unet-name UNetExchange --unet-filter-field unet --unet-filter-value 1 --unet-maxdifferense 5 \
--unet-recv-timeout 1000 --unet-sendpause 500 --unet-update-strategy evloop
--unet-recv-timeout 1000 --unet-sendpause 500 --unet-update-strategy evloop
#--unet-log-add-levels any
......@@ -70,7 +70,7 @@ int main(int argc, char* argv[])
int verb = 0;
std::string addr = "";
int port = 0;
int usecpause = 2000000;
int msecpause = 200;
timeout_t tout = UniSetTimer::WaitUpTime;
int procID = 1;
int nodeID = 1;
......@@ -143,7 +143,7 @@ int main(int argc, char* argv[])
break;
case 'x':
usecpause = atoi(optarg) * 1000;
msecpause = atoi(optarg);
break;
case 'y':
......@@ -234,8 +234,7 @@ int main(int argc, char* argv[])
else
cout << tout;
cout << " msecpause=" << usecpause / 1000
<< endl;
cout << " msecpause=" << msecpause << endl;
}
......@@ -468,7 +467,7 @@ int main(int argc, char* argv[])
break;
}
std::this_thread::sleep_for(std::chrono::microseconds(usecpause));
std::this_thread::sleep_for(std::chrono::milliseconds(msecpause));
}
}
break;
......
......@@ -67,7 +67,7 @@ int main(int argc, char* argv[])
int verb = 0;
std::string addr = "";
int port = 0;
int usecpause = 200000;
int msecpause = 200;
timeout_t tout = UniSetTimer::WaitUpTime;
bool broadcast = true;
int procID = 1;
......@@ -134,7 +134,7 @@ int main(int argc, char* argv[])
break;
case 'x':
usecpause = atoi(optarg) * 1000;
msecpause = atoi(optarg);
break;
case 'y':
......@@ -211,8 +211,7 @@ int main(int argc, char* argv[])
else
cout << tout;
cout << " msecpause=" << usecpause / 1000
<< endl;
cout << " msecpause=" << msecpause << endl;
}
......@@ -357,6 +356,8 @@ int main(int argc, char* argv[])
mypack.addDData(i, i);
}
// mypack.updatePacketCrc();
Poco::Net::SocketAddress sa(s_host, port);
udp->connect(sa);
......@@ -405,7 +406,7 @@ int main(int argc, char* argv[])
break;
}
std::this_thread::sleep_for(std::chrono::microseconds(usecpause));
std::this_thread::sleep_for(std::chrono::milliseconds(msecpause));
}
}
break;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment