Commit 146f1fa7 authored by Pavel Vainerman's avatar Pavel Vainerman

UNet2: переделал обработку (теперь не используется minBufSize). и сделал защиту…

UNet2: переделал обработку (теперь не используется minBufSize). и сделал защиту от сбоя (или переполнения) счётчика..
parent 122221b7
......@@ -33,11 +33,12 @@ sender(0)
<< "' filter-value='" << s_fvalue << "'" << endl;
int recvTimeout = conf->getArgPInt("--unet-recv-timeout",it.getProp("recvTimeout"), 5000);
int lostTimeout = conf->getArgPInt("--unet-lost-timeout",it.getProp("lostTimeout"), recvTimeout);
int recvpause = conf->getArgPInt("--unet-recvpause",it.getProp("recvpause"), 10);
int sendpause = conf->getArgPInt("--unet-sendpause",it.getProp("sendpause"), 150);
int updatepause = conf->getArgPInt("--unet-updatepause",it.getProp("updatepause"), 100);
steptime = conf->getArgPInt("--unet-steptime",it.getProp("steptime"), 1000);
int minBufSize = conf->getArgPInt("--unet-minbufsize",it.getProp("minBufSize"), 30);
int maxDiff = conf->getArgPInt("--unet-maxdifferense",it.getProp("maxDifferense"), 1000);
int maxProcessingCount = conf->getArgPInt("--unet-maxprocessingcount",it.getProp("maxProcessingCount"), 100);
no_sender = conf->getArgInt("--unet-nosender",it.getProp("nosender"));
......@@ -86,9 +87,10 @@ sender(0)
UNetReceiver* r = new UNetReceiver(h,p,shm);
r->setReceiveTimeout(recvTimeout);
r->setLostTimeout(lostTimeout);
r->setReceivePause(recvpause);
r->setUpdatePause(updatepause);
r->setMinBudSize(minBufSize);
r->setMaxDifferens(maxDiff);
r->setMaxProcessingCount(maxProcessingCount);
recvlist.push_back(r);
}
......
......@@ -20,12 +20,17 @@ UNetReceiver::UNetReceiver( const std::string s_host, const ost::tpport_t port,
shm(smi),
recvpause(10),
updatepause(100),
recvTimeout(5000),
udp(0),
recvTimeout(5000),
lostTimeout(5000),
lostPackets(0),
activated(false),
r_thr(0),
u_thr(0),
minBufSize(30),
pnum(0),
maxDifferens(1000),
waitClean(false),
rnum(0),
maxProcessingCount(100),
icache(200),
cache_init_ok(false)
......@@ -67,25 +72,26 @@ UNetReceiver::~UNetReceiver()
delete udp;
}
// -----------------------------------------------------------------------------
void UNetReceiver::setReceiveTimeout( int msec )
void UNetReceiver::setReceiveTimeout( timeout_t msec )
{
recvTimeout = msec;
ptRecvTimeout.setTiming(msec);
}
// -----------------------------------------------------------------------------
void UNetReceiver::setReceivePause( int msec )
void UNetReceiver::setLostTimeout( timeout_t msec )
{
recvpause = msec;
lostTimeout = msec;
ptLostTimeout.setTiming(msec);
}
// -----------------------------------------------------------------------------
void UNetReceiver::setUpdatePause( int msec )
void UNetReceiver::setReceivePause( timeout_t msec )
{
updatepause = msec;
recvpause = msec;
}
// -----------------------------------------------------------------------------
void UNetReceiver::setMinBudSize( int set )
void UNetReceiver::setUpdatePause( timeout_t msec )
{
minBufSize = set;
updatepause = msec;
}
// -----------------------------------------------------------------------------
void UNetReceiver::setMaxProcessingCount( int set )
......@@ -93,6 +99,11 @@ void UNetReceiver::setMaxProcessingCount( int set )
maxProcessingCount = set;
}
// -----------------------------------------------------------------------------
void UNetReceiver::setMaxDifferens( unsigned long set )
{
maxDifferens = set;
}
// -----------------------------------------------------------------------------
void UNetReceiver::start()
{
if( !activated )
......@@ -128,48 +139,41 @@ void UNetReceiver::update()
void UNetReceiver::real_update()
{
UniSetUDP::UDPMessage p;
bool buf_ok = false;
{
uniset_mutex_lock l(packMutex);
if( qpack.size() <= minBufSize )
return;
buf_ok = true;
}
// обрабатываем пока, очередь либо не опустеет
// либо обнаружится "дырка" в последовательности
// но при этом обрабатываем не больше maxProcessingCount
// за один раз..
int k = maxProcessingCount;
while( buf_ok && k>0 )
while( k>0 )
{
{
{ // lock qpack
uniset_mutex_lock l(packMutex);
if( qpack.empty() )
return;
p = qpack.top();
qpack.pop();
}
cerr << "************ update recv.num=" << p.msg.header.num << " num=" << pnum << endl;
if( pnum > 0 && labs(p.msg.header.num - pnum) > 1 )
{
if( !ptLostTimeout.checkTime() )
return;
if( labs(p.msg.header.num - pnum) > 1 )
{
dlog[Debug::CRIT] << "************ FAILED! ORDER PACKETS! recv.num=" << p.msg.header.num
<< " num=" << pnum << endl;
lostPackets++;
}
ptLostTimeout.reset();
// запоминаем новый номер и откидываем пакет
// удаляем из очереди, только если
// всё в порядке с последовательностью..
qpack.pop();
pnum = p.msg.header.num;
k--;
continue;
}
} // unlock qpack
pnum = p.msg.header.num;
k--;
{
uniset_mutex_lock l(packMutex);
buf_ok = ( qpack.size() > minBufSize );
}
initCache(p, !cache_init_ok);
for( int i=0; i<p.msg.header.dcount; i++ )
for( size_t i=0; i<p.msg.header.dcount; i++ )
{
try
{
......@@ -245,25 +249,46 @@ bool UNetReceiver::recv()
ssize_t ret = udp->UDPReceive::receive(&(pack.msg),sizeof(pack.msg));
if( ret < sizeof(UniSetUDP::UDPHeader) )
{
cerr << myname << "(receive): FAILED header ret=" << ret << " sizeof=" << sizeof(UniSetUDP::UDPHeader) << endl;
dlog[Debug::CRIT] << myname << "(receive): FAILED header ret=" << ret << " sizeof=" << sizeof(UniSetUDP::UDPHeader) << endl;
return false;
}
ssize_t sz = pack.msg.header.dcount * sizeof(UniSetUDP::UDPData) + sizeof(UniSetUDP::UDPHeader);
if( ret < sz )
{
cerr << myname << "(receive): FAILED data ret=" << ret << " sizeof=" << sz
dlog[Debug::CRIT] << myname << "(receive): FAILED data ret=" << ret << " sizeof=" << sz
<< " packnum=" << pack.msg.header.num << endl;
return false;
}
// cerr << myname << "(receive): recv DATA OK. ret=" << ret << " sizeof=" << sz
// << " header: " << pack.msg.header << endl;
cerr << myname << "(receive): recv DATA OK. ret=" << ret << " sizeof=" << sz
<< " header: " << pack.msg.header << endl;
{
uniset_mutex_lock l(packMutex);
qpack.push(pack);
}
if( rnum>0 && labs(pack.msg.header.num - rnum) > maxDifferens )
waitClean = true;
{ // lock qpack
uniset_mutex_lock l(packMutex,500);
if( !waitClean )
{
qpack.push(pack);
return true;
}
if( !qpack.empty() )
qtmp.push(pack);
else
{
// очередь освободилась..
// то копируем в неё всё что набралось...
while( !qtmp.empty() )
{
qpack.push(qtmp.top());
qtmp.pop();
}
waitClean = false;
}
} // unlock qpack
return true;
}
......@@ -286,7 +311,7 @@ void UNetReceiver::initCache( UniSetUDP::UDPMessage& pack, bool force )
cache_init_ok = true;
icache.resize(pack.msg.header.dcount);
for( int i=0; i<icache.size(); i++ )
for( size_t i=0; i<icache.size(); i++ )
{
ItemInfo& d(icache[i]);
if( d.id != pack.msg.dat[i].id )
......
......@@ -18,10 +18,10 @@
* Собственно реализация сделана так:
* В данных передаётся номер пакета. На случай если несколько пакетов придут не в той последовательности
* что были посланы, сделана очередь с приоритетом. В качестве приориета используется номер пакета
* (чем меньше тем старше). И при этом эта очередь постоянно поддерживается наполненной на minBufSize записей.
* Это гарантирует, что соседние пакеты пришедшие не в той последовательности, тем не менее обработаны будут в правильной.
* Т.к. в очереди они "отсортируются" по номеру пакета, ещё до обработки.
*
* (чем меньше тем старше). При этом обработка ведётся только тех пакетов, которые идут "подряд",
* как только встречается "дырка" происходит ожидание её "заполения". Если в течение времени (lostTimeout)
* "дырка" не исчезает, увеличивается счётчик потерянных пакетов и обработка продолжается дальше..
* Всё это реализовано в функции UNetReceiver::real_update()
*
* КЭШ
* ===
......@@ -32,7 +32,16 @@
* Для защиты от изменения поседовательности внутри пакета, в кэше хранится ID сохраняемого датчика, и если он не совпадёт с тем,
* ID который пришёл в пакете - элемент кэша обновляется.
* Если количество пришедших данных не совпадают с размером кэша.. кэш обновляется.
*/
*
* Обработка сбоя или переполнения счётчика пакетов(перехода через максимум)
* =========================================================================
* Для защиты от сбоя счётика сделана следующая логика:
* Если номер очередного пришедшего пакета отличается от последнего обработанного на maxDifferens, то считается,
* что произошёл сбой счётчика и происходит ожидание пока функция update, не обработает основную очередь полностью.
* При этом принимаемые пакеты складываются во временную очередь qtmp. Как только основная очередь пустеет,
* в неё копируется всё накопленное во временной очереди..и опять идёт штатная обработка.
* \todo Сделать защиту от бесконечного ожидания "очистки" основной очереди.
*/
// -----------------------------------------------------------------------------
class UNetReceiver
{
......@@ -46,12 +55,14 @@ class UNetReceiver
void update();
inline bool isRecvOK(){ return ptRecvTimeout.checkTime(); }
inline unsigned long getLostPacketsNum(){ return lostPackets; }
void setReceiveTimeout( int msec );
void setReceivePause( int msec );
void setUpdatePause( int msec );
void setReceiveTimeout( timeout_t msec );
void setReceivePause( timeout_t msec );
void setUpdatePause( timeout_t msec );
void setLostTimeout( timeout_t msec );
void setMaxDifferens( unsigned long set );
void setMinBudSize( int set );
void setMaxProcessingCount( int set );
inline ost::IPV4Address getAddress(){ return addr; }
......@@ -80,7 +91,10 @@ class UNetReceiver
UniSetTypes::uniset_mutex pollMutex;
PassiveTimer ptRecvTimeout;
int recvTimeout;
timeout_t recvTimeout;
timeout_t lostTimeout;
PassiveTimer ptLostTimeout;
unsigned long lostPackets; /*!< счётчик потерянных пакетов */
bool activated;
......@@ -96,15 +110,18 @@ class UNetReceiver
};
typedef std::priority_queue<UniSetUDP::UDPMessage,std::vector<UniSetUDP::UDPMessage>,PacketCompare> PacketQueue;
PacketQueue qpack; /*!< очередь принятых пакетов (отсортированных по возрастанию номера пакета) */
UniSetUDP::UDPMessage pack; /*!< прсто буфер для получения очерещного сообщения */
UniSetUDP::UDPMessage pack; /*!< просто буфер для получения очереlного сообщения */
UniSetTypes::uniset_mutex packMutex; /*!< mutex для работы с очередью */
long pnum; /*!< текущий номер обработанного сообщения, для проверки непрерывности последовательности пакетов */
unsigned long pnum; /*!< текущий номер обработанного сообщения, для проверки непрерывности последовательности пакетов */
/*! Минимальный размер очереди.
* Предназначен для создания буфера, чтобы обработка сообщений шла
* в порядке возрастания номеров пакетов. Даже если при приёме последовательность нарушалась
/*! максимальная разница межд номерами пакетов, при которой считается, что счётчик пакетов
* прошёл через максимум или сбился...
*/
int minBufSize;
unsigned long maxDifferens;
PacketQueue qtmp; /*!< очередь на время обработки(очистки) основной очереди */
bool waitClean; /*!< флаг означающий, что ждём очистики очереди до конца */
unsigned long rnum; /*!< текущий номер принятого сообщения, для проверки "переполнения" или "сбоя" счётчика */
int maxProcessingCount; /*! максимальное число обрабатываемых за один раз сообщений */
......
......@@ -60,6 +60,11 @@ s_thr(0)
}
else
ic->addReadItem( sigc::mem_fun(this,&UNetSender::readItem) );
// выставляем поля, которые не меняются
mypack.msg.header.nodeID = conf->getLocalNode();
mypack.msg.header.procID = shm->ID();
}
// -----------------------------------------------------------------------------
UNetSender::~UNetSender()
......@@ -128,13 +133,7 @@ void UNetSender::send()
// -----------------------------------------------------------------------------
void UNetSender::real_send()
{
// cout << myname << ": send..." << endl;
UniSetUDP::UDPHeader h;
h.nodeID = conf->getLocalNode();
h.procID = shm->ID();
h.dcount = mypack.msg.header.dcount;
h.num = ptPack.getCurrent();
mypack.msg.header = h;
mypack.msg.header.num = packetnum++;
// cout << "************* send header: " << mypack.msg.header << endl;
int sz = mypack.byte_size() + sizeof(UniSetUDP::UDPHeader);
......
......@@ -14,6 +14,11 @@
#include "UDPPacket.h"
#include "UDPNReceiver.h"
// -----------------------------------------------------------------------------
/*
* Для защиты от потери пакета при переполнении "номера пакета".
* UNetReceiver при обнаружении "разрыва" в последовательнности, просто игнорирует пакет, обновляет счётчик
* и начинает обработку пока буфер опять не заполнится..
*/
class UNetSender
{
public:
......@@ -76,7 +81,7 @@ class UNetSender
typedef std::vector<UItem> DMap;
DMap dlist;
int maxItem;
PassiveTimer ptPack;
unsigned long packetnum;
ThreadCreator<UNetSender>* s_thr; // send thread
};
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment