Commit 15660982 authored by Pavel Vainerman's avatar Pavel Vainerman

UDP: сделал сохранение данных в SM (с кэшированием)

parent 33151652
...@@ -21,7 +21,9 @@ UniSetObject_LT(objId), ...@@ -21,7 +21,9 @@ UniSetObject_LT(objId),
shm(0), shm(0),
initPause(0), initPause(0),
udp(0), udp(0),
activated(false) activated(false),
icache(200),
cache_init_ok(false)
{ {
if( objId == DefaultObjectId ) if( objId == DefaultObjectId )
throw UniSetTypes::SystemError("(UDPReceiver): objId=-1?!! Use --udp-name" ); throw UniSetTypes::SystemError("(UDPReceiver): objId=-1?!! Use --udp-name" );
...@@ -188,7 +190,7 @@ void UDPReceiver::update() ...@@ -188,7 +190,7 @@ void UDPReceiver::update()
} }
int k = maxProcessingCount; int k = maxProcessingCount;
while( buf_ok && k>0) while( buf_ok && k>0 )
{ {
{ {
uniset_mutex_lock l(packMutex); uniset_mutex_lock l(packMutex);
...@@ -210,13 +212,32 @@ void UDPReceiver::update() ...@@ -210,13 +212,32 @@ void UDPReceiver::update()
buf_ok = ( qpack.size() > minBufSize ); buf_ok = ( qpack.size() > minBufSize );
} }
// cerr << myname << "(step): recv DATA OK. header: " << p.msg.header << endl; initCache(p, !cache_init_ok);
for( int i=0; i<p.msg.header.dcount; i++ ) for( int i=0; i<p.msg.header.dcount; i++ )
{ {
try try
{ {
UniSetUDP::UDPData& d = p.msg.dat[i]; UniSetUDP::UDPData& d = p.msg.dat[i];
shm->setValue(d.id,d.val); ItemInfo& ii(icache[i]);
if( ii.id != d.id )
{
dlog[Debug::WARN] << myname << "(update): reinit cache for sid=" << d.id << endl;
ii.id = d.id;
shm->initAIterator(ii.ait);
shm->initDIterator(ii.dit);
}
if( ii.iotype == UniversalIO::DigitalInput )
shm->localSaveState(ii.dit,d.id,d.val,getId());
else if( ii.iotype == UniversalIO::AnalogInput )
shm->localSaveValue(ii.ait,d.id,d.val,getId());
else if( ii.iotype == UniversalIO::AnalogOutput )
shm->localSetValue(ii.ait,d.id,d.val,getId());
else if( ii.iotype == UniversalIO::DigitalOutput )
shm->localSetState(ii.dit,d.id,d.val,getId());
else
dlog[Debug::CRIT] << myname << "(update): Unknown iotype for sid=" << d.id << endl;
} }
catch( UniSetTypes::Exception& ex) catch( UniSetTypes::Exception& ex)
{ {
...@@ -470,6 +491,11 @@ void UDPReceiver::sigterm( int signo ) ...@@ -470,6 +491,11 @@ void UDPReceiver::sigterm( int signo )
void UDPReceiver::initIterators() void UDPReceiver::initIterators()
{ {
shm->initAIterator(aitHeartBeat); shm->initAIterator(aitHeartBeat);
for( ItemVec::iterator it=icache.begin(); it!=icache.end(); ++it )
{
shm->initAIterator(it->ait);
shm->initDIterator(it->dit);
}
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
void UDPReceiver::help_print( int argc, char* argv[] ) void UDPReceiver::help_print( int argc, char* argv[] )
...@@ -510,3 +536,25 @@ UDPReceiver* UDPReceiver::init_udpreceiver( int argc, char* argv[], UniSetTypes: ...@@ -510,3 +536,25 @@ UDPReceiver* UDPReceiver::init_udpreceiver( int argc, char* argv[], UniSetTypes:
return new UDPReceiver(ID,icID,ic); return new UDPReceiver(ID,icID,ic);
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
void UDPReceiver::initCache( UniSetUDP::UDPMessage& pack, bool force )
{
if( !force && pack.msg.header.dcount == icache.size() )
return;
dlog[Debug::INFO] << myname << ": init icache.." << endl;
cache_init_ok = true;
icache.resize(pack.msg.header.dcount);
for( int i=0; i<icache.size(); i++ )
{
ItemInfo& d(icache[i]);
if( d.id != pack.msg.dat[i].id )
{
d.id = pack.msg.dat[i].id;
d.iotype = conf->getIOType(d.id);
shm->initAIterator(d.ait);
shm->initDIterator(d.dit);
}
}
}
// -----------------------------------------------------------------------------
...@@ -13,6 +13,27 @@ ...@@ -13,6 +13,27 @@
#include "ThreadCreator.h" #include "ThreadCreator.h"
#include "UDPPacket.h" #include "UDPPacket.h"
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
/* Основная идея: сделать проверку очерёдности пакетов, но при этом использовать UDP.
* ===============
* Собственно реализация сделана так:
* В данных передаётся номер пакета. На случай если несколько пакетов придут не в той последовательности
* что были посланы, сделана очередь с приориеттом. В качестве приориета используется номер пакета
* (чем меньше тем старше). И при этом эта очередь постоянно поддерживается наполненной на minBufSize записей.
* Это гарантирует, что соседние пакеты пришедшие не в той последовательности, тем не менее обработаны будут в правильной.
* Т.к. в очереди они "отсортируются" по номеру пакета, ещё до обработки.
*
*
* КЭШ
* ===
* Для оптимизации работы с SM, т.к. в пакетах приходят только пары [id,value] сделан кэш итераторов.
* Кэш расчитан на то, что принимаемые пакеты всегда имеют одну и ту же длину и последовательность.
* Идея проста: сделан вектор размером с количество принимаемы данных. В векторе хранятся итераторы (и всё что необходимо).
* Порядокый номер данных в пакете является индексом в кэше.
* Для защиты от изменения поседовательности внутри пакета, в кэше хранится ID сохраняемого датчика, и если он не совпадёт с тем,
* ID который пришёл в пакете - элемент кэша обновляется.
* Если количество пришедших данных не совпадают с размеров кэша.. кэш обновляется.
*/
// -----------------------------------------------------------------------------
class UDPReceiver: class UDPReceiver:
public UniSetObject_LT public UniSetObject_LT
{ {
...@@ -107,6 +128,20 @@ class UDPReceiver: ...@@ -107,6 +128,20 @@ class UDPReceiver:
int minBufSize; int minBufSize;
int maxProcessingCount; /*! максимальное число обрабатываемых за один раз сообщений */ int maxProcessingCount; /*! максимальное число обрабатываемых за один раз сообщений */
struct ItemInfo
{
long id;
IOController::AIOStateList::iterator ait;
IOController::DIOStateList::iterator dit;
UniversalIO::IOTypes iotype;
};
typedef std::vector<ItemInfo> ItemVec;
ItemVec icache; /*!< кэш итераторов */
bool cache_init_ok;
void initCache( UniSetUDP::UDPMessage& pack, bool force=false );
}; };
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
#endif // UDPReceiver_H_ #endif // UDPReceiver_H_
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment