Commit 838d2b9a authored by Pavel Vainerman's avatar Pavel Vainerman

UDP: сделал сохранение данных в SM (с кэшированием)

parent 07c10cda
......@@ -21,7 +21,9 @@ UniSetObject_LT(objId),
shm(0),
initPause(0),
udp(0),
activated(false)
activated(false),
icache(200),
cache_init_ok(false)
{
if( objId == DefaultObjectId )
throw UniSetTypes::SystemError("(UDPReceiver): objId=-1?!! Use --udp-name" );
......@@ -188,7 +190,7 @@ void UDPReceiver::update()
}
int k = maxProcessingCount;
while( buf_ok && k>0)
while( buf_ok && k>0 )
{
{
uniset_mutex_lock l(packMutex);
......@@ -210,13 +212,32 @@ void UDPReceiver::update()
buf_ok = ( qpack.size() > minBufSize );
}
// cerr << myname << "(step): recv DATA OK. header: " << p.msg.header << endl;
initCache(p, !cache_init_ok);
for( int i=0; i<p.msg.header.dcount; i++ )
{
try
{
UniSetUDP::UDPData& d = p.msg.dat[i];
shm->setValue(d.id,d.val);
ItemInfo& ii(icache[i]);
if( ii.id != d.id )
{
dlog[Debug::WARN] << myname << "(update): reinit cache for sid=" << d.id << endl;
ii.id = d.id;
shm->initAIterator(ii.ait);
shm->initDIterator(ii.dit);
}
if( ii.iotype == UniversalIO::DigitalInput )
shm->localSaveState(ii.dit,d.id,d.val,getId());
else if( ii.iotype == UniversalIO::AnalogInput )
shm->localSaveValue(ii.ait,d.id,d.val,getId());
else if( ii.iotype == UniversalIO::AnalogOutput )
shm->localSetValue(ii.ait,d.id,d.val,getId());
else if( ii.iotype == UniversalIO::DigitalOutput )
shm->localSetState(ii.dit,d.id,d.val,getId());
else
dlog[Debug::CRIT] << myname << "(update): Unknown iotype for sid=" << d.id << endl;
}
catch( UniSetTypes::Exception& ex)
{
......@@ -470,6 +491,11 @@ void UDPReceiver::sigterm( int signo )
void UDPReceiver::initIterators()
{
shm->initAIterator(aitHeartBeat);
for( ItemVec::iterator it=icache.begin(); it!=icache.end(); ++it )
{
shm->initAIterator(it->ait);
shm->initDIterator(it->dit);
}
}
// -----------------------------------------------------------------------------
void UDPReceiver::help_print( int argc, char* argv[] )
......@@ -510,3 +536,25 @@ UDPReceiver* UDPReceiver::init_udpreceiver( int argc, char* argv[], UniSetTypes:
return new UDPReceiver(ID,icID,ic);
}
// -----------------------------------------------------------------------------
void UDPReceiver::initCache( UniSetUDP::UDPMessage& pack, bool force )
{
if( !force && pack.msg.header.dcount == icache.size() )
return;
dlog[Debug::INFO] << myname << ": init icache.." << endl;
cache_init_ok = true;
icache.resize(pack.msg.header.dcount);
for( int i=0; i<icache.size(); i++ )
{
ItemInfo& d(icache[i]);
if( d.id != pack.msg.dat[i].id )
{
d.id = pack.msg.dat[i].id;
d.iotype = conf->getIOType(d.id);
shm->initAIterator(d.ait);
shm->initDIterator(d.dit);
}
}
}
// -----------------------------------------------------------------------------
......@@ -13,6 +13,27 @@
#include "ThreadCreator.h"
#include "UDPPacket.h"
// -----------------------------------------------------------------------------
/* Основная идея: сделать проверку очерёдности пакетов, но при этом использовать UDP.
* ===============
* Собственно реализация сделана так:
* В данных передаётся номер пакета. На случай если несколько пакетов придут не в той последовательности
* что были посланы, сделана очередь с приориеттом. В качестве приориета используется номер пакета
* (чем меньше тем старше). И при этом эта очередь постоянно поддерживается наполненной на minBufSize записей.
* Это гарантирует, что соседние пакеты пришедшие не в той последовательности, тем не менее обработаны будут в правильной.
* Т.к. в очереди они "отсортируются" по номеру пакета, ещё до обработки.
*
*
* КЭШ
* ===
* Для оптимизации работы с SM, т.к. в пакетах приходят только пары [id,value] сделан кэш итераторов.
* Кэш расчитан на то, что принимаемые пакеты всегда имеют одну и ту же длину и последовательность.
* Идея проста: сделан вектор размером с количество принимаемы данных. В векторе хранятся итераторы (и всё что необходимо).
* Порядокый номер данных в пакете является индексом в кэше.
* Для защиты от изменения поседовательности внутри пакета, в кэше хранится ID сохраняемого датчика, и если он не совпадёт с тем,
* ID который пришёл в пакете - элемент кэша обновляется.
* Если количество пришедших данных не совпадают с размеров кэша.. кэш обновляется.
*/
// -----------------------------------------------------------------------------
class UDPReceiver:
public UniSetObject_LT
{
......@@ -107,6 +128,20 @@ class UDPReceiver:
int minBufSize;
int maxProcessingCount; /*! максимальное число обрабатываемых за один раз сообщений */
struct ItemInfo
{
long id;
IOController::AIOStateList::iterator ait;
IOController::DIOStateList::iterator dit;
UniversalIO::IOTypes iotype;
};
typedef std::vector<ItemInfo> ItemVec;
ItemVec icache; /*!< кэш итераторов */
bool cache_init_ok;
void initCache( UniSetUDP::UDPMessage& pack, bool force=false );
};
// -----------------------------------------------------------------------------
#endif // UDPReceiver_H_
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment