Commit 5407c653 authored by Pavel Vainerman's avatar Pavel Vainerman

(unet): receiver refactoring: remove "thread strategy",

used circular buffer instead priority_queue and some other minor optimizations.
parent 7f295f76
......@@ -23,6 +23,7 @@
// myvar = LE_TO_H(myvar)
// -------------------------------------------------------------------------
#if __BYTE_ORDER == __LITTLE_ENDIAN
static bool HostIsBigEndian = false;
#define LE_TO_H(x) {}
#elif INTPTR_MAX == INT64_MAX
#define LE_TO_H(x) x = le64toh(x)
......@@ -33,6 +34,7 @@
#endif
#if __BYTE_ORDER == __BIG_ENDIAN
static bool HostIsBigEndian = true;
#define BE_TO_H(x) {}
#elif INTPTR_MAX == INT64_MAX
#define BE_TO_H(x) x = be64toh(x)
......@@ -253,7 +255,7 @@ namespace uniset
// -----------------------------------------------------------------------------
size_t UDPMessage::transport_msg( UDPPacket& p ) const noexcept
{
memset(&p, 0, sizeof(UDPPacket));
p = UDPPacket{}; // reset data
size_t i = 0;
memcpy(&(p.data[i]), this, sizeof(UDPHeader));
......@@ -311,7 +313,8 @@ namespace uniset
// -----------------------------------------------------------------------------
size_t UDPMessage::getMessage( UDPMessage& m, UDPPacket& p ) noexcept
{
memset(&m, 0, sizeof(m));
// reset data
m = UDPMessage{};
size_t i = 0;
memcpy(&m, &(p.data[i]), sizeof(UDPHeader));
......@@ -320,7 +323,7 @@ namespace uniset
// byte order from packet
u_char be_order = m._be_order;
if( be_order )
if( be_order && !HostIsBigEndian )
{
BE_TO_H(m.magic);
BE_TO_H(m.num);
......@@ -329,7 +332,7 @@ namespace uniset
BE_TO_H(m.dcount);
BE_TO_H(m.acount);
}
else
else if( !be_order && HostIsBigEndian )
{
LE_TO_H(m.magic);
LE_TO_H(m.num);
......@@ -385,6 +388,8 @@ namespace uniset
// CONVERT DATA TO HOST BYTE ORDER
// -------------------------------
if( (be_order && !HostIsBigEndian) || (!be_order && HostIsBigEndian) )
{
for( size_t n = 0; n < m.acount; n++ )
{
if( be_order )
......@@ -410,6 +415,7 @@ namespace uniset
LE_TO_H(m.d_id[n]);
}
}
}
return i + sz;
}
......
......@@ -84,9 +84,9 @@ UNetExchange::UNetExchange(uniset::ObjectId objId, uniset::ObjectId shmId, const
int lostTimeout = conf->getArgPInt("--" + prefix + "-lost-timeout", it.getProp("lostTimeout"), 2 * updatepause);
steptime = conf->getArgPInt("--" + prefix + "-steptime", it.getProp("steptime"), 1000);
int maxDiff = conf->getArgPInt("--" + prefix + "-maxdifferense", it.getProp("maxDifferense"), 100);
int maxProcessingCount = conf->getArgPInt("--" + prefix + "-maxprocessingcount", it.getProp("maxProcessingCount"), 100);
int checkConnectionPause = conf->getArgPInt("--" + prefix + "-checkconnection-pause", it.getProp("checkConnectionPause"), 10000);
int initpause = conf->getArgPInt("--" + prefix + "-initpause", it.getProp("initpause"), 5000);
int recvBufferSize = conf->getArgPInt("--" + prefix + "-recv-buffer-size", it.getProp("recvBufferSize"), 3000);
std::string updateStrategy = conf->getArg2Param("--" + prefix + "-update-strategy", it.getProp("updateStrategy"), "evloop");
......@@ -333,16 +333,6 @@ UNetExchange::UNetExchange(uniset::ObjectId objId, uniset::ObjectId shmId, const
}
}
UNetReceiver::UpdateStrategy r_upStrategy = UNetReceiver::strToUpdateStrategy( n_it.getProp2("unet_update_strategy", updateStrategy) );
if( r_upStrategy == UNetReceiver::useUpdateUnknown )
{
ostringstream err;
err << myname << ": Unknown update strategy!!! '" << n_it.getProp2("unet_update_strategy", updateStrategy) << "'" << endl;
unetcrit << myname << "(init): " << err.str() << endl;
throw SystemError(err.str());
}
unetinfo << myname << "(init): (node='" << n << "') add basic receiver "
<< h << ":" << p << endl;
auto r = make_shared<UNetReceiver>(h, p, shm, false, prefix);
......@@ -361,11 +351,10 @@ UNetExchange::UNetExchange(uniset::ObjectId objId, uniset::ObjectId shmId, const
r->setCheckConnectionPause(checkConnectionPause);
r->setInitPause(initpause);
r->setMaxDifferens(maxDiff);
r->setMaxProcessingCount(maxProcessingCount);
r->setRespondID(resp_id, resp_invert);
r->setLostPacketsID(lp_id);
r->connectEvent( sigc::mem_fun(this, &UNetExchange::receiverEvent) );
r->setUpdateStrategy(r_upStrategy);
r->setBufferSize(recvBufferSize);
shared_ptr<UNetReceiver> r2(nullptr);
......@@ -391,11 +380,10 @@ UNetExchange::UNetExchange(uniset::ObjectId objId, uniset::ObjectId shmId, const
r2->setCheckConnectionPause(checkConnectionPause);
r2->setInitPause(initpause);
r2->setMaxDifferens(maxDiff);
r2->setMaxProcessingCount(maxProcessingCount);
r2->setRespondID(resp2_id, resp_invert);
r2->setLostPacketsID(lp2_id);
r2->connectEvent( sigc::mem_fun(this, &UNetExchange::receiverEvent) );
r2->setUpdateStrategy(r_upStrategy);
r2->setBufferSize(recvBufferSize);
}
}
catch(...)
......@@ -547,7 +535,7 @@ void UNetExchange::step() noexcept
}
}
for( auto && it : recvlist )
for( auto&& it : recvlist )
it.step(shm, myname, unetlog);
}
......@@ -755,10 +743,10 @@ void UNetExchange::askSensors( UniversalIO::UIOCommand cmd )
void UNetExchange::sensorInfo( const uniset::SensorMessage* sm )
{
if( sender )
sender->updateSensor( sm->id , sm->value );
sender->updateSensor( sm->id, sm->value );
if( sender2 )
sender2->updateSensor( sm->id , sm->value );
sender2->updateSensor( sm->id, sm->value );
}
// ------------------------------------------------------------------------------------------
bool UNetExchange::activateObject()
......@@ -839,7 +827,7 @@ void UNetExchange::initIterators() noexcept
if( sender2 )
sender2->initIterators();
for( auto && it : recvlist )
for( auto&& it : recvlist )
it.initIterators(shm);
}
// -----------------------------------------------------------------------------
......@@ -855,13 +843,8 @@ void UNetExchange::help_print( int argc, const char* argv[] ) noexcept
cout << "--prefix-steptime msec - Пауза между обновлением информации о связи с узлами." << endl;
cout << "--prefix-checkconnection-pause msec - Пауза между попытками открыть соединение (если это не удалось до этого). По умолчанию: 10000 (10 сек)" << endl;
cout << "--prefix-maxdifferense num - Маскимальная разница в номерах пакетов для фиксации события 'потеря пакетов' " << endl;
cout << "--prefix-maxprocessingcount num - Максимальное количество пакетов обрабатываемых за один раз (если их слишком много)" << endl;
cout << "--prefix-nosender [0,1] - Отключить посылку." << endl;
cout << "--prefix-update-strategy [thread,evloop] - Стратегия обновления данных в SM. " << endl;
cout << " 'thread' - у каждого UNetReceiver отдельный поток" << endl;
cout << " 'evloop' - используется общий (с приёмом сообщений) event loop" << endl;
cout << " По умолчанию: evloop" << endl;
cout << "--prefix-recv-buffer-size sz - Размер циклического буфера для приёма сообщений. По умолчанию: 3000" << endl;
cout << "--prefix-sm-ready-timeout msec - Время ожидание я готовности SM к работе. По умолчанию 120000" << endl;
cout << "--prefix-filter-field name - Название фильтрующего поля при формировании списка датчиков посылаемых данным узлом" << endl;
cout << "--prefix-filter-value name - Значение фильтрующего поля при формировании списка датчиков посылаемых данным узлом" << endl;
......@@ -913,7 +896,7 @@ std::shared_ptr<UNetExchange> UNetExchange::init_unetexchange(int argc, const ch
// -----------------------------------------------------------------------------
void UNetExchange::receiverEvent( const shared_ptr<UNetReceiver>& r, UNetReceiver::Event ev ) noexcept
{
for( auto && it : recvlist )
for( auto&& it : recvlist )
{
if( it.r1 == r )
{
......
......@@ -58,7 +58,6 @@ namespace uniset
пар [id,value]. Другие узлы принимают их. Помимо этого данный процесс запускает
"получателей" по одному на каждый (другой) узел и ловит пакеты от них, сохраняя данные в SM.
При этом "получатели" работают на одном(!) потоке с использованием событий libev (см. UNetReceiver).
или каждый на своём потоке. Это определяется параметром \b unet_update_strategy.
\par
При своём старте процесс считывает из секции \<nodes> список узлов которые необходимо "слушать",
......@@ -78,15 +77,13 @@ namespace uniset
...
</iocards>
</item>
<item ip="192.168.56.10" name="Node1" textname="Node1" unet_port="3001" unet_update_strategy="evloop"/>
<item ip="192.168.56.10" name="Node1" textname="Node1" unet_port="3001"/>
<item ip="192.168.56.11" name="Node2" textname="Node2" unet_port="3002"/>
</nodes>
\endcode
* \b unet_update_strategy - задаёт стратегию обновления данных в SM.
Поддерживается два варианта:
- 'thread' - отдельный поток обновления
- 'evloop' - использование общего с приёмом event loop (libev)
Буфер для приёма сообщений можно настроить параметром \b recvBufferSize="1000" в конфигурационной секции
или аргументом командной строки \b --prefix-recv-buffer-size sz
\note Имеется возможность задавать отдельную настроечную секцию для "списка узлов" при помощи параметра
--prefix-nodes-confnode name. По умолчанию настройка ведётся по секции <nodes>
......
......@@ -15,6 +15,7 @@
*/
// -------------------------------------------------------------------------
#include <sstream>
#include <cmath>
#include <iomanip>
#include <Poco/Net/NetException.h>
#include "unisetstd.h"
......@@ -28,19 +29,8 @@ using namespace uniset;
using namespace uniset::extensions;
// -----------------------------------------------------------------------------
CommonEventLoop UNetReceiver::loop;
static UniSetUDP::UDPAData emptyAData;
static UniSetUDP::UDPMessage emptyMessage;
// -----------------------------------------------------------------------------
/*
bool UNetReceiver::PacketCompare::operator()(const UniSetUDP::UDPMessage& lhs,
const UniSetUDP::UDPMessage& rhs) const
{
// if( lhs.num == rhs.num )
// return (lhs < rhs);
return lhs.num > rhs.num;
}
*/
// ------------------------------------------------------------------------------------------
UNetReceiver::UNetReceiver(const std::string& s_host, int _port
, const std::shared_ptr<SMInterface>& smi
, bool nocheckConnection
......@@ -57,11 +47,8 @@ UNetReceiver::UNetReceiver(const std::string& s_host, int _port
respondInvert(false),
sidLostPackets(uniset::DefaultObjectId),
activated(false),
pnum(0),
cbuf(cbufSize),
maxDifferens(20),
waitClean(false),
rnum(0),
maxProcessingCount(100),
lockUpdate(false),
d_cache_init_ok(false),
a_cache_init_ok(false)
......@@ -83,20 +70,27 @@ UNetReceiver::UNetReceiver(const std::string& s_host, int _port
auto conf = uniset_conf();
conf->initLogStream(unetlog, prefix + "-log");
upThread = unisetstd::make_unique< ThreadCreator<UNetReceiver> >(this, &UNetReceiver::updateThread);
if( !createConnection(nocheckConnection /* <-- это флаг throwEx */) )
evCheckConnection.set<UNetReceiver, &UNetReceiver::checkConnectionEvent>(this);
evStatistic.set<UNetReceiver, &UNetReceiver::statisticsEvent>(this);
evUpdate.set<UNetReceiver, &UNetReceiver::updateEvent>(this);
evInitPause.set<UNetReceiver, &UNetReceiver::initEvent>(this);
ptLostTimeout.setTiming(lostTimeout);
ptRecvTimeout.setTiming(recvTimeout);
}
// -----------------------------------------------------------------------------
UNetReceiver::~UNetReceiver()
{
}
// -----------------------------------------------------------------------------
void UNetReceiver::setBufferSize( size_t sz ) noexcept
{
cbufSize = sz;
cbuf.resize(sz);
}
// -----------------------------------------------------------------------------
void UNetReceiver::setReceiveTimeout( timeout_t msec ) noexcept
{
std::lock_guard<std::mutex> l(tmMutex);
......@@ -128,15 +122,10 @@ void UNetReceiver::setUpdatePause( timeout_t msec ) noexcept
{
updatepause = msec;
if( upStrategy == useUpdateEventLoop && evUpdate.is_active() )
if( evUpdate.is_active() )
evUpdate.start(0, (float)updatepause / 1000.);
}
// -----------------------------------------------------------------------------
void UNetReceiver::setMaxProcessingCount( int set ) noexcept
{
maxProcessingCount = set;
}
// -----------------------------------------------------------------------------
void UNetReceiver::setMaxDifferens( unsigned long set ) noexcept
{
maxDifferens = set;
......@@ -211,8 +200,6 @@ bool UNetReceiver::createConnection( bool throwEx )
udp = unisetstd::make_unique<UDPReceiveU>(addr, port);
udp->setBlocking(false); // делаем неблокирующее чтение (нужно для libev)
evReceive.set<UNetReceiver, &UNetReceiver::callback>(this);
if( upStrategy == useUpdateEventLoop )
evUpdate.set<UNetReceiver, &UNetReceiver::updateEvent>(this);
if( evCheckConnection.is_active() )
......@@ -264,9 +251,6 @@ void UNetReceiver::start()
std::terminate();
return;
}
if( upStrategy == useUpdateThread && !upThread->isRunning() )
upThread->start();
}
else
forceUpdate();
......@@ -278,12 +262,8 @@ void UNetReceiver::evprepare( const ev::loop_ref& eloop ) noexcept
evStatistic.start(0, 1.0); // раз в сек
evInitPause.set(eloop);
if( upStrategy == useUpdateEventLoop )
{
evUpdate.set(eloop);
evUpdate.start( 0, ((float)updatepause / 1000.) );
}
if( !udp )
{
......@@ -325,8 +305,7 @@ void UNetReceiver::evfinish( const ev::loop_ref& eloop ) noexcept
// -----------------------------------------------------------------------------
void UNetReceiver::forceUpdate() noexcept
{
pack_guard l(packMutex, upStrategy);
pnum = 0; // сбрасываем запомненый номер последнего обработанного пакета
rnum = 0; // сбрасываем запомненый номер последнего обработанного пакета
// и тем самым заставляем обновить данные в SM (см. update)
}
// -----------------------------------------------------------------------------
......@@ -363,98 +342,69 @@ void UNetReceiver::initEvent( ev::timer& tmr, int revents ) noexcept
tmr.stop();
}
// -----------------------------------------------------------------------------
size_t UNetReceiver::rnext( size_t num )
{
UniSetUDP::UDPMessage* p;
size_t i = num + 1;
while( i < wnum )
{
p = &cbuf[i % cbufSize];
if( p->num > num )
return i;
i++;
}
return wnum;
}
// -----------------------------------------------------------------------------
void UNetReceiver::update() noexcept
{
UniSetUDP::UDPMessage p;
// ещё не было пакетов
if( wnum == 1 && rnum == 0 )
return;
UniSetUDP::UDPMessage* p;
CacheItem* c_it = nullptr;
UniSetUDP::UDPAData* dat = nullptr;
long s_id;
// обрабатываем, пока очередь либо не опустеет,
// либо обнаружится "дырка" в последовательности,
// но при этом обрабатываем не больше maxProcessingCount
// за один раз..
int k = maxProcessingCount;
while( k > 0 )
{
while( rnum < wnum )
{
// lock qpack
pack_guard l(packMutex, upStrategy);
if( qpack.empty() )
return;
p = qpack.top();
size_t sub = labs( (long int)(p.num - pnum) );
p = &cbuf[rnum % cbufSize];
if( pnum > 0 )
// если номер пакета не равен ожидаемому, ждём считая что это "дырка"
// т.к. разрывы и другие случаи обрабатываются при приёме пакетов
if( p->num != rnum )
{
// если sub > maxDifferens
// значит это просто "разрыв"
// и нам ждать lostTimeout не надо
// сразу начинаем обрабатывать новые пакеты
// а если > 1 && < maxDifferens
// значит это временная "дырка"
// и надо подождать lostTimeout
// чтобы констатировать потерю пакета..
if( sub > 1 && sub < maxDifferens )
{
// если p.num < pnum, то это какой-то "дубль",
// т.к мы все пакеты <= pnum уже "отработали".
// а значит можно не ждать, а откидывать пакет и
// дальше работать..
if( p.num < pnum )
{
qpack.pop();
continue;
}
if( !ptLostTimeout.checkTime() )
return;
unetwarn << myname << "(update): lostTimeout(" << ptLostTimeout.getInterval() << ")! pnum=" << p.num << " lost " << sub << " packets " << endl;
size_t sub = (p->num - rnum);
unetwarn << myname << "(update): lostTimeout(" << ptLostTimeout.getInterval() << ")! pnum=" << p->num << " lost " << sub << " packets " << endl;
lostPackets += sub;
}
else if( p.num == pnum )
{
/* а что делать если идут повторные пакеты ?!
* для надёжности лучше обрабатывать..
* для "оптимизации".. лучше игнорировать
*/
qpack.pop(); // пока выбрали вариант "оптимизации" (выкидываем из очереди и идём дальше)
continue;
}
if( sub >= maxDifferens )
{
// считаем сколько пакетов потеряли.. (pnum=0 - означает что мы только что запустились...)
if( pnum != 0 && p.num > pnum )
{
lostPackets += sub;
unetwarn << myname << "(update): sub=" << sub << " > maxDifferenst(" << maxDifferens << ")! lost " << sub << " packets " << endl;
}
}
// ищем следующий пакет для обработки
rnum = rnext(rnum);
continue;
}
ptLostTimeout.reset();
// удаляем из очереди, только если
// всё в порядке с последовательностью..
qpack.pop();
pnum = p.num;
} // unlock qpack
k--;
rnum++;
upCount++;
// Обработка дискретных
auto d_iv = getDCache(p, !d_cache_init_ok);
for( size_t i = 0; i < p.dcount; i++ )
for( size_t i = 0; i < p->dcount; i++ )
{
try
{
s_id = p.dID(i);
s_id = p->dID(i);
c_it = &d_iv.cache[i];
if( c_it->id != s_id )
......@@ -468,25 +418,34 @@ void UNetReceiver::update() noexcept
if( lockUpdate )
continue;
shm->localSetValue(c_it->ioit, s_id, p.dValue(i), shm->ID());
shm->localSetValue(c_it->ioit, s_id, p->dValue(i), shm->ID());
}
catch( const uniset::Exception& ex)
{
unetcrit << myname << "(update): " << ex << std::endl;
unetcrit << myname << "(update): "
<< " id=" << s_id
<< " val=" << p->dValue(i)
<< " error: " << ex
<< std::endl;
}
catch(...)
{
unetcrit << myname << "(update): catch ..." << std::endl;
unetcrit << myname << "(update): "
<< " id=" << s_id
<< " val=" << p->dValue(i)
<< " error: catch..."
<< std::endl;
}
}
// Обработка аналоговых
auto a_iv = getACache(p, !a_cache_init_ok);
for( size_t i = 0; i < p.acount; i++ )
for( size_t i = 0; i < p->acount; i++ )
{
try
{
dat = &p.a_dat[i];
dat = &p->a_dat[i];
c_it = &a_iv.cache[i];
if( c_it->id != dat->id )
......@@ -504,61 +463,21 @@ void UNetReceiver::update() noexcept
}
catch( const uniset::Exception& ex)
{
unetcrit << myname << "(update): " << ex << std::endl;
unetcrit << myname << "(update): "
<< " id=" << dat->id
<< " val=" << dat->val
<< " error: " << ex
<< std::endl;
}
catch(...)
{
unetcrit << myname << "(update): catch ..." << std::endl;
}
}
}
}
// -----------------------------------------------------------------------------
void UNetReceiver::updateThread() noexcept
{
while( activated )
{
try
{
update();
}
catch( std::exception& ex )
{
unetcrit << myname << "(update_thread): " << ex.what() << endl;
}
// смотрим есть ли связь..
checkConnection();
if( sidRespond != DefaultObjectId )
{
try
{
if( isInitOK() )
{
bool r = respondInvert ? !isRecvOK() : isRecvOK();
shm->localSetValue(itRespond, sidRespond, ( r ? 1 : 0 ), shm->ID());
}
}
catch( const std::exception& ex )
{
unetcrit << myname << "(update_thread): (respond) " << ex.what() << std::endl;
}
}
if( sidLostPackets != DefaultObjectId )
{
try
{
shm->localSetValue(itLostPackets, sidLostPackets, getLostPacketsNum(), shm->ID());
}
catch( const std::exception& ex )
{
unetcrit << myname << "(update_thread): (lostPackets) " << ex.what() << std::endl;
unetcrit << myname << "(update): "
<< " id=" << dat->id
<< " val=" << dat->val
<< " error: catch..."
<< std::endl;
}
}
msleep(updatepause);
}
}
// -----------------------------------------------------------------------------
......@@ -702,7 +621,6 @@ void UNetReceiver::stop()
{
unetinfo << myname << ": stop.." << endl;
activated = false;
upThread->join();
loop.evstop(this);
}
// -----------------------------------------------------------------------------
......@@ -712,7 +630,6 @@ bool UNetReceiver::receive() noexcept
{
ssize_t ret = udp->receiveBytes(r_buf.data, sizeof(r_buf.data));
recvCount++;
//ssize_t ret = udp->receiveFrom(r_buf.data, sizeof(r_buf.data),saddr);
if( ret < 0 )
{
......@@ -726,98 +643,76 @@ bool UNetReceiver::receive() noexcept
return false;
}
size_t sz = UniSetUDP::UDPMessage::getMessage(pack, r_buf);
// сперва пробуем сохранить пакет в том месте, которе должно быть очередным на запись
pack = &cbuf[wnum % cbufSize];
size_t sz = UniSetUDP::UDPMessage::getMessage(*pack, r_buf);
if( sz == 0 )
{
unetcrit << myname << "(receive): FAILED RECEIVE DATA ret=" << ret << endl;
return false;
}
}
catch( Poco::Net::NetException& ex )
{
unetcrit << myname << "(receive): recv err: " << ex.displayText() << endl;
return false;
}
catch( exception& ex )
{
unetcrit << myname << "(receive): recv err: " << ex.what() << endl;
return false;
}
if( pack.magic != UniSetUDP::UNETUDP_MAGICNUM )
{
// пакет не нашей "системы"
if( pack->magic != UniSetUDP::UNETUDP_MAGICNUM )
return false;
}
if( rnum > 0 && labs( (long int)(pack.num - rnum) ) > maxDifferens )
{
/* А что делать если мы уже ждём и ещё не "разгребли предыдущее".. а тут уже повторный "разрыв"
* Можно откинуть всё.. что сложили во временную очередь и заново "копить" (но тогда теряем информацию)
* А можно породолжать складывать во временную, но тогда есть риск "никогда" не разгрести временную
* очередь, при "частых обрывах". Потому-что update будет на каждом разрыве ждать ещё lostTimeout..
*/
// Пока выбираю.. чистить qtmp. Это будет соотвествовать логике работы с картами у которых ограничен буфер приёма.
// Обычно "кольцевой". Т.е. если не успели обработать и "вынуть" из буфера информацию.. он будет переписан новыми данными
if( waitClean )
if( abs(long(pack->num - wnum)) > maxDifferens || abs( long(wnum - rnum) ) >= (cbufSize - 2) )
{
unetcrit << myname << "(receive): reset qtmp.." << endl;
unetcrit << myname << "(receive): DISAGREE "
<< " packnum=" << pack->num
<< " wnum=" << wnum
<< " rnum=" << rnum
<< " (maxDiff=" << maxDifferens
<< " indexDiff=" << abs( long(wnum - rnum) )
<< ")"
<< endl;
lostPackets = pack->num > wnum ? (pack->num - wnum - 1) : lostPackets + 1;
// реинициализируем позицию для чтения
rnum = pack->num;
wnum = pack->num + 1;
while( !qtmp.empty() )
qtmp.pop();
// перемещаем пакет в нужное место (если требуется)
if( wnum != pack->num )
{
cbuf[pack->num % cbufSize] = (*pack);
pack->num = 0;
}
waitClean = true;
return true;
}
rnum = pack.num;
if( pack->num != wnum )
{
// перемещаем пакет в правильное место
// в соответствии с его номером
cbuf[pack->num % cbufSize] = (*pack);
#if 0
cerr << myname << "(receive): recv DATA OK. ret=" << ret << " sizeof=" << sz
<< " header: " << pack.msg.header
<< " waitClean=" << waitClean
<< endl;
if( pack->num >= wnum )
wnum = pack->num + 1;
for( size_t i = 0; i < pack.msg.header.dcount; i++ )
{
UniSetUDP::UDPData& d = pack.msg.dat[i];
cerr << "****** save id=" << d.id << " val=" << d.val << endl;
// обнуляем номер в том месте где записали, чтобы его не обрабатывал update
pack->num = 0;
}
else if( pack->num >= wnum )
wnum = pack->num + 1;
#endif
// начальная инициализация для чтения
if( rnum == 0 )
rnum = pack->num;
{
// lock qpack
pack_guard l(packMutex, upStrategy);
if( !waitClean )
{
qpack.push(pack);
return true;
}
if( !qpack.empty() )
catch( Poco::Net::NetException& ex )
{
qtmp.push(pack);
unetcrit << myname << "(receive): recv err: " << ex.displayText() << endl;
}
else
{
// основная очередь освободилась..
// копируем в неё всё что набралось в qtmp...
while( !qtmp.empty() )
catch( exception& ex )
{
qpack.push(qtmp.top());
qtmp.pop();
}
// не забываем и текущий поместить в очередь..
qpack.push(pack);
waitClean = false;
unetcrit << myname << "(receive): recv err: " << ex.what() << endl;
}
} // unlock qpack
return true;
return false;
}
// -----------------------------------------------------------------------------
void UNetReceiver::initIterators() noexcept
......@@ -826,7 +721,7 @@ void UNetReceiver::initIterators() noexcept
{
CacheVec& d_icache(mit->second.cache);
for( auto && it : d_icache )
for( auto&& it : d_icache )
shm->initIterator(it.ioit);
}
......@@ -834,20 +729,20 @@ void UNetReceiver::initIterators() noexcept
{
CacheVec& a_icache(mit->second.cache);
for( auto && it : a_icache )
for( auto&& it : a_icache )
shm->initIterator(it.ioit);
}
}
// -----------------------------------------------------------------------------
UNetReceiver::CacheInfo& UNetReceiver::getDCache( UniSetUDP::UDPMessage& pack, bool force ) noexcept
UNetReceiver::CacheInfo& UNetReceiver::getDCache( UniSetUDP::UDPMessage* pack, bool force ) noexcept
{
// если элемента нет, он будет создан
CacheInfo& d_info = d_icache_map[pack.getDataID()];
CacheInfo& d_info = d_icache_map[pack->getDataID()];
if( !force && pack.dcount == d_info.cache.size() )
if( !force && pack->dcount == d_info.cache.size() )
return d_info;
if( d_info.cache_init_ok && pack.dcount == d_info.cache.size() )
if( d_info.cache_init_ok && pack->dcount == d_info.cache.size() )
{
d_cache_init_ok = true;
auto it = d_icache_map.begin();
......@@ -864,10 +759,10 @@ UNetReceiver::CacheInfo& UNetReceiver::getDCache( UniSetUDP::UDPMessage& pack, b
return d_info;
}
unetinfo << myname << ": init dcache for " << pack.getDataID() << endl;
unetinfo << myname << ": init dcache for " << pack->getDataID() << endl;
d_info.cache_init_ok = true;
d_info.cache.resize(pack.dcount);
d_info.cache.resize(pack->dcount);
size_t sz = d_info.cache.size();
auto conf = uniset_conf();
......@@ -876,9 +771,9 @@ UNetReceiver::CacheInfo& UNetReceiver::getDCache( UniSetUDP::UDPMessage& pack, b
{
CacheItem& d(d_info.cache[i]);
if( d.id != pack.d_id[i] )
if( d.id != pack->d_id[i] )
{
d.id = pack.d_id[i];
d.id = pack->d_id[i];
shm->initIterator(d.ioit);
}
}
......@@ -886,15 +781,15 @@ UNetReceiver::CacheInfo& UNetReceiver::getDCache( UniSetUDP::UDPMessage& pack, b
return d_info;
}
// -----------------------------------------------------------------------------
UNetReceiver::CacheInfo& UNetReceiver::getACache( UniSetUDP::UDPMessage& pack, bool force ) noexcept
UNetReceiver::CacheInfo& UNetReceiver::getACache( UniSetUDP::UDPMessage* pack, bool force ) noexcept
{
// если элемента нет, он будет создан
CacheInfo& a_info = a_icache_map[pack.getDataID()];
CacheInfo& a_info = a_icache_map[pack->getDataID()];
if( !force && pack.acount == a_info.cache.size() )
if( !force && pack->acount == a_info.cache.size() )
return a_info;
if( a_info.cache_init_ok && pack.acount == a_info.cache.size() )
if( a_info.cache_init_ok && pack->acount == a_info.cache.size() )
{
a_cache_init_ok = true;
auto it = a_icache_map.begin();
......@@ -911,11 +806,11 @@ UNetReceiver::CacheInfo& UNetReceiver::getACache( UniSetUDP::UDPMessage& pack, b
return a_info;
}
unetinfo << myname << ": init icache for " << pack.getDataID() << endl;
unetinfo << myname << ": init icache for " << pack->getDataID() << endl;
a_info.cache_init_ok = true;
auto conf = uniset_conf();
a_info.cache.resize(pack.acount);
a_info.cache.resize(pack->acount);
size_t sz = a_info.cache.size();
......@@ -923,9 +818,9 @@ UNetReceiver::CacheInfo& UNetReceiver::getACache( UniSetUDP::UDPMessage& pack, b
{
CacheItem& d(a_info.cache[i]);
if( d.id != pack.a_dat[i].id )
if( d.id != pack->a_dat[i].id )
{
d.id = pack.a_dat[i].id;
d.id = pack->a_dat[i].id;
shm->initIterator(d.ioit);
}
}
......@@ -938,49 +833,6 @@ void UNetReceiver::connectEvent( UNetReceiver::EventSlot sl ) noexcept
slEvent = sl;
}
// -----------------------------------------------------------------------------
UNetReceiver::UpdateStrategy UNetReceiver::strToUpdateStrategy( const string& s ) noexcept
{
if( s == "thread" || s == "THREAD" )
return useUpdateThread;
if( s == "evloop" || s == "EVLOOP" )
return useUpdateEventLoop;
return useUpdateUnknown;
}
// -----------------------------------------------------------------------------
string UNetReceiver::to_string( UNetReceiver::UpdateStrategy s ) noexcept
{
if( s == useUpdateThread )
return "thread";
if( s == useUpdateEventLoop )
return "evloop";
return "";
}
// -----------------------------------------------------------------------------
void UNetReceiver::setUpdateStrategy( UNetReceiver::UpdateStrategy set )
{
if( set == useUpdateEventLoop && upThread->isRunning() )
{
ostringstream err;
err << myname << "(setUpdateStrategy): set 'useUpdateEventLoop' strategy but updateThread is running!";
unetcrit << err.str() << endl;
throw SystemError(err.str());
}
if( set == useUpdateThread && evUpdate.is_active() )
{
ostringstream err;
err << myname << "(setUpdateStrategy): set 'useUpdateThread' strategy but update event loop is running!";
unetcrit << err.str() << endl;
throw SystemError(err.str());
}
upStrategy = set;
}
// -----------------------------------------------------------------------------
const std::string UNetReceiver::getShortInfo() const noexcept
{
// warning: будет вызываться из другого потока
......@@ -993,7 +845,6 @@ const std::string UNetReceiver::getShortInfo() const noexcept
<< " recvOK=" << isRecvOK()
<< " receivepack=" << rnum
<< " lostPackets=" << setw(6) << getLostPacketsNum()
<< " updateStrategy=" << to_string(upStrategy)
<< endl
<< "\t["
<< " recvTimeout=" << setw(6) << recvTimeout
......@@ -1002,26 +853,10 @@ const std::string UNetReceiver::getShortInfo() const noexcept
<< " lostTimeout=" << setw(6) << lostTimeout
<< " updatepause=" << setw(6) << updatepause
<< " maxDifferens=" << setw(6) << maxDifferens
<< " maxProcessingCount=" << setw(6) << maxProcessingCount
<< " waitClean=" << waitClean
<< " ]"
<< endl
<< "\t[ qsize=" << qpack.size() << " recv=" << statRecvPerSec << " update=" << statUpPerSec << " per sec ]";
<< "\t[ qsize=" << (wnum - rnum - 1) << " recv=" << statRecvPerSec << " update=" << statUpPerSec << " per sec ]";
return s.str();
}
// -----------------------------------------------------------------------------
UNetReceiver::pack_guard::pack_guard( mutex& _m, UNetReceiver::UpdateStrategy _s ):
m(_m),
s(_s)
{
if( s == useUpdateThread )
m.lock();
}
// -----------------------------------------------------------------------------
UNetReceiver::pack_guard::~pack_guard()
{
if( s == useUpdateThread )
m.unlock();
}
// -----------------------------------------------------------------------------
......@@ -21,6 +21,7 @@
#include <memory>
#include <string>
#include <queue>
#include <deque>
#include <unordered_map>
#include <sigc++/sigc++.h>
#include <ev++.h>
......@@ -38,13 +39,20 @@ namespace uniset
// -----------------------------------------------------------------------------
/* Основная идея: сделать проверку очерёдности пакетов, но при этом использовать UDP.
* ===============
* Собственно реализация сделана так:
* В данных передаётся номер пакета. На случай если несколько пакетов придут не в той последовательности
* что были посланы, сделана очередь с приоритетом. В качестве приоритета используется номер пакета
* (чем меньше тем старше). При этом обработка ведётся только тех пакетов, которые идут "подряд",
* что были посланы, сделан циклический буфер (буфер сразу выделяет память при старте).
* Т.к. номер получаемых пакетов монотонно растёт на основе него вычисляется индекс
* куда поместить пакет в буфере. Есть два индекса
* rnum - (read number) номер последнего обработанного пакета + 1
* wnum - (write number) номер следующего ожидаемого пакета (номер последнего принятого + 1)
* WARNING: Если придёт два пакета с одинаковым номером, то новый пакет перезатрёт прошлый в буфере
*
* При этом обработка ведётся по порядку (только пакеты идущие подряд)
* как только встречается "дырка" происходит ожидание её "заполения". Если в течение времени (lostTimeout)
* "дырка" не исчезает, увеличивается счётчик потерянных пакетов и обработка продолжается дальше..
* Всё это реализовано в функции UNetReceiver::real_update()
* "дырка" не исчезает, увеличивается счётчик потерянных пакетов и обработка продолжается с нового места.
* Т.к. используется libev и нет многопоточной работы, события обрабатываются последовательно.
* Раз в updatetime msec происходит обновление данных в SM, все накопившиеся пакеты обрабатываются
* либо пока не встретиться "дырка", либо пока rnum не догонит wnum.
*
* КЭШ
* ===
......@@ -63,19 +71,15 @@ namespace uniset
* В текущей реализации размер map не контролируется (завязан на UDPMessage::getDataID()) и рассчитан на статичность пакетов,
* т.е. на то что UNetSender не будет с течением времени менять структуру отправляемых пакетов.
*
* Обработка сбоя или переполнения счётчика пакетов(перехода через максимум)
* Обработка сбоев в номере пакетов
* =========================================================================
* Для защиты от сбоя счётчика сделана следующая логика:
* Если номер очередного пришедшего пакета отличается от последнего обработанного на maxDifferens, то считается,
* что произошёл сбой счётчика и происходит ожидание пока функция update, не обработает основную очередь полностью.
* При этом принимаемые пакеты складываются во временную очередь qtmp. Как только основная очередь пустеет,
* в неё копируется всё накопленное во временной очереди..и опять идёт штатная обработка.
* Если во время "ожидания" опять происходит "разрыв" в номерах пакетов, то временная очередь чиститься
* и данные которые в ней были теряются! Аналог ограниченного буфера (у любых карт), когда новые данные
* затирают старые, если их не успели вынуть и обработать.
* \todo Сделать защиту от бесконечного ожидания "очистки" основной очереди.
* Если в какой-то момент расстояние между rnum и wnum превышает maxDifferens пакетов
* то считается, что произошёл сбой или узел который посылал пакеты перезагрузился
* Идёт попытка обработать все текущие пакеты (до первой дырки), а дальше происходит
* реинициализация и обработка продолжается с нового номера.
*
* =========================================================================
* ОПТИМИЗАЦИЯ N1: см. UNetSender.h. Если номер последнего принятого пакета не менялся.. не обрабатываем..
* ОПТИМИЗАЦИЯ N1: см. UNetSender.h. Если номер последнего принятого пакета не менялся.. не обрабатываем.
*
* Создание соединения (открытие сокета)
* ======================================
......@@ -87,13 +91,6 @@ namespace uniset
* Если такая логика не требуется, то можно задать в конструкторе
* последним аргументом флаг nocheckconnection=true, тогда при создании объекта UNetReceiver, в конструкторе будет
* выкинуто исключение при неудачной попытке создания соединения.
*
* Стратегия обновления данных в SM
* ==================================
* При помощи функции setUpdateStrategy() можно выбрать стратегию обновления данных в SM.
* Поддерживается два варианта:
* 'thread' - отдельный поток обновления
* 'evloop' - использование общего с приёмом event loop (libev)
*/
// -----------------------------------------------------------------------------
class UNetReceiver:
......@@ -132,12 +129,11 @@ namespace uniset
void setMaxDifferens( unsigned long set ) noexcept;
void setEvrunTimeout(timeout_t msec ) noexcept;
void setInitPause( timeout_t msec ) noexcept;
void setBufferSize( size_t sz ) noexcept;
void setRespondID( uniset::ObjectId id, bool invert = false ) noexcept;
void setLostPacketsID( uniset::ObjectId id ) noexcept;
void setMaxProcessingCount( int set ) noexcept;
void forceUpdate() noexcept; // пересохранить очередной пакет в SM даже если данные не менялись
inline std::string getAddress() const noexcept
......@@ -160,35 +156,6 @@ namespace uniset
void connectEvent( EventSlot sl ) noexcept;
// --------------------------------------------------------------------
/*! Стратегия обработки сообщений */
enum UpdateStrategy
{
useUpdateUnknown,
useUpdateThread, /*!< использовать отдельный поток */
useUpdateEventLoop /*!< использовать event loop (т.е. совместно с receive) */
};
static UpdateStrategy strToUpdateStrategy( const std::string& s ) noexcept;
static std::string to_string( UpdateStrategy s) noexcept;
//! функция должна вызываться до первого вызова start()
void setUpdateStrategy( UpdateStrategy set );
// специальная обёртка, захватывающая или нет mutex в зависимости от стратегии
// (т.к. при evloop mutex захватывать не нужно)
class pack_guard
{
public:
pack_guard( std::mutex& m, UpdateStrategy s );
~pack_guard();
protected:
std::mutex& m;
UpdateStrategy s;
};
// --------------------------------------------------------------------
inline std::shared_ptr<DebugStream> getLog()
{
return unetlog;
......@@ -204,7 +171,6 @@ namespace uniset
bool receive() noexcept;
void step() noexcept;
void update() noexcept;
void updateThread() noexcept;
void callback( ev::io& watcher, int revents ) noexcept;
void readEvent( ev::io& watcher ) noexcept;
void updateEvent( ev::periodic& watcher, int revents ) noexcept;
......@@ -221,21 +187,7 @@ namespace uniset
void initIterators() noexcept;
bool createConnection( bool throwEx = false );
void checkConnection();
public:
// функция определения приоритетного сообщения для обработки
struct PacketCompare:
public std::binary_function<UniSetUDP::UDPMessage, UniSetUDP::UDPMessage, bool>
{
inline bool operator()(const UniSetUDP::UDPMessage& lhs,
const UniSetUDP::UDPMessage& rhs) const
{
return lhs.num > rhs.num;
}
};
typedef std::priority_queue<UniSetUDP::UDPMessage, std::vector<UniSetUDP::UDPMessage>, PacketCompare> PacketQueue;
size_t rnext( size_t num );
private:
UNetReceiver();
......@@ -253,18 +205,14 @@ namespace uniset
ev::periodic evUpdate;
ev::timer evInitPause;
UpdateStrategy upStrategy = { useUpdateEventLoop };
// счётчики для подсчёта статистики
size_t recvCount = { 0 };
size_t upCount = { 0 };
// текущая статистик
// текущая статистика
size_t statRecvPerSec = { 0 }; /*!< количество принимаемых пакетов в секунду */
size_t statUpPerSec = { 0 }; /*!< количество обработанных пакетов в секунду */
std::unique_ptr< ThreadCreator<UNetReceiver> > upThread; // update thread
// делаем loop общим.. одним на всех!
static CommonEventLoop loop;
......@@ -292,23 +240,19 @@ namespace uniset
std::atomic_bool activated = { false };
PacketQueue qpack; /*!< очередь принятых пакетов (отсортированных по возрастанию номера пакета) */
UniSetUDP::UDPMessage pack; /*!< просто буфер для получения очередного сообщения */
UniSetUDP::UDPPacket r_buf;
std::mutex packMutex; /*!< mutex для работы с очередью */
size_t pnum = { 0 }; /*!< текущий номер обработанного сообщения, для проверки непрерывности последовательности пакетов */
size_t cbufSize = { 2000 }; /*!< размер буфера для сообщений по умолчанию */
std::vector<UniSetUDP::UDPMessage> cbuf; // circular buffer
size_t wnum = { 1 }; /*!< номер следующего ожидаемого пакета */
size_t rnum = { 0 }; /*!< номер последнего обработанного пакета */
UniSetUDP::UDPMessage* pack;
UniSetUDP::UDPPacket r_buf; /*!< просто буфер для получения очередного сообщения */
/*! максимальная разница между номерами пакетов, при которой считается, что счётчик пакетов
* прошёл через максимум или сбился...
*/
size_t maxDifferens = { 20 };
PacketQueue qtmp; /*!< очередь на время обработки(очистки) основной очереди */
bool waitClean = { false }; /*!< флаг означающий, что ждём очистки очереди до конца */
size_t rnum = { 0 }; /*!< текущий номер принятого сообщения, для проверки "переполнения" или "сбоя" счётчика */
size_t maxProcessingCount = { 100 }; /*!< максимальное число обрабатываемых за один раз сообщений */
std::atomic_bool lockUpdate = { false }; /*!< флаг блокировки сохранения принятых данных в SM */
EventSlot slEvent;
......@@ -342,8 +286,8 @@ namespace uniset
bool d_cache_init_ok = { false };
bool a_cache_init_ok = { false };
CacheInfo& getDCache( UniSetUDP::UDPMessage& pack, bool force = false ) noexcept;
CacheInfo& getACache( UniSetUDP::UDPMessage& pack, bool force = false ) noexcept;
CacheInfo& getDCache( UniSetUDP::UDPMessage* pack, bool force = false ) noexcept;
CacheInfo& getACache( UniSetUDP::UDPMessage* pack, bool force = false ) noexcept;
};
// --------------------------------------------------------------------------
} // end of namespace uniset
......
......@@ -156,7 +156,7 @@ namespace uniset
// -----------------------------------------------------------------------------
void UNetSender::updateFromSM()
{
for( auto && it : items )
for( auto&& it : items )
{
UItem& i = it.second;
......@@ -243,7 +243,7 @@ namespace uniset
if( !shm->isLocalwork() )
updateFromSM();
for( auto && it : mypacks )
for( auto&& it : mypacks )
{
if( it.first > 1 && (ncycle % it.first) != 0 )
continue;
......@@ -504,6 +504,7 @@ namespace uniset
if( p.pack_ind >= maxAData )
{
anum++;
if( anum >= pk.size() )
pk.resize(anum + 1);
......@@ -551,7 +552,7 @@ namespace uniset
// -----------------------------------------------------------------------------
void UNetSender::initIterators()
{
for( auto && it : items )
for( auto&& it : items )
shm->initIterator(it.second.ioit);
}
// -----------------------------------------------------------------------------
......
if HAVE_TESTS
noinst_PROGRAMS = tests-with-sm urecv-perf-test
#noinst_PROGRAMS = urecv-perf-test
tests_with_sm_SOURCES = tests_with_sm.cc test_unetudp.cc
tests_with_sm_LDADD = $(top_builddir)/lib/libUniSet2.la $(top_builddir)/extensions/lib/libUniSet2Extensions.la \
......
......@@ -96,45 +96,6 @@ void send( UniSetUDP::UDPMessage& pack, int tout = 2000 )
size_t ret = udp_s->sendTo(&s_buf.data, s_buf.len, s_addr);
REQUIRE( ret == s_buf.len );
}
// -----------------------------------------------------------------------------
TEST_CASE("[UNetUDP]: queue sort", "[unetudp][packetqueue]")
{
UNetReceiver::PacketQueue q;
UniSetUDP::UDPMessage m1;
m1.num = 10;
UniSetUDP::UDPMessage m2;
m2.num = 11;
UniSetUDP::UDPMessage m3;
m3.num = 13;
UniSetUDP::UDPMessage m4;
m4.num = 100;
// специально складываем в обратном порядке
// чтобы проверить "сортировку"
q.push(m1);
q.push(m3);
q.push(m2);
q.push(m4);
UniSetUDP::UDPMessage t = q.top();
REQUIRE( t.num == 10 );
q.pop();
t = q.top();
REQUIRE( t.num == 11 );
q.pop();
t = q.top();
REQUIRE( t.num == 13 );
q.pop();
t = q.top();
REQUIRE( t.num == 100 );
q.pop();
}
// -----------------------------------------------------------------------------
TEST_CASE("[UNetUDP]: UDPMessage", "[unetudp][udpmessage]")
{
SECTION("UDPMessage::isFull()")
......
#!/bin/sh
# '--' - нужен для отделения аргументов catch, от наших..
cd ../../../Utilities/Admin/
./uniset2-start.sh -f ./create_links.sh
./uniset2-start.sh -f ./create
./uniset2-start.sh -f ./exist | grep -q UNISET_PLC/Controllers || exit 1
cd -
./uniset2-start.sh -f ./tests-with-sm $* -- --confile unetudp-test-configure.xml --e-startup-pause 10 \
--unet-name UNetExchange --unet-filter-field unet --unet-filter-value 1 --unet-maxdifferense 5 \
--unet-recv-timeout 1000 --unet-sendpause 500 --unet-update-strategy thread
#--unet-log-add-levels any
AT_SETUP([UNetUDP tests (with SM)(thread)])
AT_CHECK([$abs_top_builddir/testsuite/at-test-launch.sh $abs_top_builddir/extensions/UNetUDP/tests tests_with_sm_thread.sh],[0],[ignore],[ignore])
AT_CLEANUP
AT_SETUP([UNetUDP tests (with SM)(evloop)])
AT_CHECK([$abs_top_builddir/testsuite/at-test-launch.sh $abs_top_builddir/extensions/UNetUDP/tests tests_with_sm_evloop.sh],[0],[ignore],[ignore])
AT_CLEANUP
......
......@@ -42,7 +42,9 @@ static void run_senders( size_t max, const std::string& s_host, size_t count = 5
{
try
{
cout << "create sender: " << s_host << ":" << begPort + i << endl;
auto s = make_shared<UDPSocketU>(s_host, begPort + i);
s->setBroadcast(true);
vsend.emplace_back(s);
}
catch( Poco::Net::NetException& e )
......@@ -103,7 +105,7 @@ static void run_senders( size_t max, const std::string& s_host, size_t count = 5
if( packetnum == 0 )
packetnum = 1;
for( auto && udp : vsend )
for( auto&& udp : vsend )
{
try
{
......@@ -139,6 +141,7 @@ static void run_test( size_t max, const std::string& host )
// make receivers..
for( size_t i = 0; i < max; i++ )
{
cout << "create receiver: " << host << ":" << begPort + i << endl;
auto r = make_shared<UNetReceiver>(host, begPort + i, smiInstance());
r->setLockUpdate(true);
vrecv.emplace_back(r);
......@@ -147,7 +150,7 @@ static void run_test( size_t max, const std::string& host )
size_t count = 0;
// Run receivers..
for( auto && r : vrecv )
for( auto&& r : vrecv )
{
if( r )
{
......@@ -156,12 +159,12 @@ static void run_test( size_t max, const std::string& host )
}
}
cerr << "RUn " << count << " receivers..." << endl;
cerr << "RUN " << count << " receivers..." << endl;
// wait..
pause();
for( auto && r : vrecv )
for( auto&& r : vrecv )
{
if(r)
r->stop();
......@@ -177,9 +180,9 @@ int main(int argc, char* argv[] )
auto conf = uniset_init(argc, argv);
if( argc > 1 && !strcmp(argv[1], "s") )
run_senders(10, host);
run_senders(1, host);
else
run_test(10, host);
run_test(1, host);
return 0;
}
......
......@@ -29,6 +29,7 @@ static struct option longopts[] =
{ "prof", required_argument, 0, 'y' },
{ "a-data", required_argument, 0, 'a' },
{ "d-data", required_argument, 0, 'i' },
{ "pack-num", required_argument, 0, 'u' },
{ NULL, 0, 0, 0 }
};
// --------------------------------------------------------------------------
......@@ -79,10 +80,11 @@ int main(int argc, char* argv[])
unsigned int nprof = 0;
std::string d_data = "";
std::string a_data = "";
size_t packetnum = 1;
while(1)
{
opt = getopt_long(argc, argv, "hs:c:r:p:n:t:x:blvdz:y:a:i:", longopts, &optindex);
opt = getopt_long(argc, argv, "hs:c:r:p:n:t:x:blvdz:y:a:i:u:", longopts, &optindex);
if( opt == -1 )
break;
......@@ -106,6 +108,7 @@ int main(int argc, char* argv[])
cout << "[-y|--prof] num - Print receive statistics every NUM packets (for -r only)" << endl;
cout << "[-a|--a-data] id1=val1,id2=val2,... - Analog data. Send: id1=id1,id2=id2,.. for analog sensors" << endl;
cout << "[-i|--d-data] id1=val1,id2=val2,... - Digital data. Send: id1=id1,id2=id2,.. for digital sensors" << endl;
cout << "[-u|--pack-num] num - first packet numbrt (default: 1)" << endl;
cout << endl;
return 0;
......@@ -171,6 +174,10 @@ int main(int argc, char* argv[])
ncycles = atoi(optarg);
break;
case 'u':
packetnum = atoi(optarg);
break;
case '?':
default:
cerr << "? argumnet" << endl;
......@@ -345,7 +352,6 @@ int main(int argc, char* argv[])
Poco::Net::SocketAddress sa(s_host, port);
udp->connect(sa);
size_t packetnum = 0;
UniSetUDP::UDPPacket s_buf;
......@@ -358,7 +364,7 @@ int main(int argc, char* argv[])
{
mypack.num = packetnum++;
// при переходе черех максимум (UniSetUDP::MaxPacketNum)
// при переходе через максимум (UniSetUDP::MaxPacketNum)
// пакет опять должен иметь номер "1"
if( packetnum == 0 )
packetnum = 1;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment