Commit 3fe7a409 authored by Pavel Vainerman's avatar Pavel Vainerman

Сделал предварительную реализацию UNetReceiver-а..

parent 15660982
bin_PROGRAMS = @PACKAGE@-udpexchange bin_PROGRAMS = @PACKAGE@-udpexchange
noinst_PROGRAMS = @PACKAGE@-udpreceiver noinst_PROGRAMS = @PACKAGE@-udpreceiver @PACKAGE@-unetexchange
#@PACKAGE@-udpsender #@PACKAGE@-udpsender
UUDP_VER=@LIBVER@ UUDP_VER=@LIBVER@
...@@ -11,7 +11,7 @@ libUniSetUDP_la_LIBADD = $(top_builddir)/lib/libUniSet.la \ ...@@ -11,7 +11,7 @@ libUniSetUDP_la_LIBADD = $(top_builddir)/lib/libUniSet.la \
$(top_builddir)/extensions/lib/libUniSetExtensions.la \ $(top_builddir)/extensions/lib/libUniSetExtensions.la \
$(SIGC_LIBS) $(COMCPP_LIBS) $(SIGC_LIBS) $(COMCPP_LIBS)
libUniSetUDP_la_CXXFLAGS = -I$(top_builddir)/extensions/include -I$(top_builddir)/extensions/SharedMemory $(SIGC_CFLAGS) $(COMCPP_CFLAGS) libUniSetUDP_la_CXXFLAGS = -I$(top_builddir)/extensions/include -I$(top_builddir)/extensions/SharedMemory $(SIGC_CFLAGS) $(COMCPP_CFLAGS)
libUniSetUDP_la_SOURCES = UDPPacket.cc UDPExchange.cc UDPNReceiver.cc UDPReceiver.cc libUniSetUDP_la_SOURCES = UDPPacket.cc UDPExchange.cc UDPNReceiver.cc UDPReceiver.cc UNetReceiver.cc UNetExchange.cc
#UDPSender.cc #UDPSender.cc
#UDPSender.cc #UDPSender.cc
...@@ -37,6 +37,13 @@ libUniSetUDP_la_SOURCES = UDPPacket.cc UDPExchange.cc UDPNReceiver.cc UDPReceiv ...@@ -37,6 +37,13 @@ libUniSetUDP_la_SOURCES = UDPPacket.cc UDPExchange.cc UDPNReceiver.cc UDPReceiv
$(SIGC_LIBS) $(COMCPP_LIBS) $(SIGC_LIBS) $(COMCPP_LIBS)
@PACKAGE@_udpreceiver_CXXFLAGS = -I$(top_builddir)/extensions/include -I$(top_builddir)/extensions/SharedMemory $(SIGC_CFLAGS) $(COMCPP_CFLAGS) @PACKAGE@_udpreceiver_CXXFLAGS = -I$(top_builddir)/extensions/include -I$(top_builddir)/extensions/SharedMemory $(SIGC_CFLAGS) $(COMCPP_CFLAGS)
@PACKAGE@_unetexchange_SOURCES = unetexchange.cc
@PACKAGE@_unetexchange_LDADD = libUniSetUDP.la $(top_builddir)/lib/libUniSet.la \
$(top_builddir)/extensions/SharedMemory/libUniSetSharedMemory.la \
$(top_builddir)/extensions/lib/libUniSetExtensions.la \
$(SIGC_LIBS) $(COMCPP_LIBS)
@PACKAGE@_unetexchange_CXXFLAGS = -I$(top_builddir)/extensions/include -I$(top_builddir)/extensions/SharedMemory $(SIGC_CFLAGS) $(COMCPP_CFLAGS)
# install # install
devel_include_HEADERS = *.h devel_include_HEADERS = *.h
......
...@@ -17,7 +17,7 @@ ...@@ -17,7 +17,7 @@
* =============== * ===============
* Собственно реализация сделана так: * Собственно реализация сделана так:
* В данных передаётся номер пакета. На случай если несколько пакетов придут не в той последовательности * В данных передаётся номер пакета. На случай если несколько пакетов придут не в той последовательности
* что были посланы, сделана очередь с приориеттом. В качестве приориета используется номер пакета * что были посланы, сделана очередь с приоритетом. В качестве приориета используется номер пакета
* (чем меньше тем старше). И при этом эта очередь постоянно поддерживается наполненной на minBufSize записей. * (чем меньше тем старше). И при этом эта очередь постоянно поддерживается наполненной на minBufSize записей.
* Это гарантирует, что соседние пакеты пришедшие не в той последовательности, тем не менее обработаны будут в правильной. * Это гарантирует, что соседние пакеты пришедшие не в той последовательности, тем не менее обработаны будут в правильной.
* Т.к. в очереди они "отсортируются" по номеру пакета, ещё до обработки. * Т.к. в очереди они "отсортируются" по номеру пакета, ещё до обработки.
...@@ -27,11 +27,11 @@ ...@@ -27,11 +27,11 @@
* === * ===
* Для оптимизации работы с SM, т.к. в пакетах приходят только пары [id,value] сделан кэш итераторов. * Для оптимизации работы с SM, т.к. в пакетах приходят только пары [id,value] сделан кэш итераторов.
* Кэш расчитан на то, что принимаемые пакеты всегда имеют одну и ту же длину и последовательность. * Кэш расчитан на то, что принимаемые пакеты всегда имеют одну и ту же длину и последовательность.
* Идея проста: сделан вектор размером с количество принимаемы данных. В векторе хранятся итераторы (и всё что необходимо). * Идея проста: сделан вектор размером с количество принимаемых данных. В векторе хранятся итераторы (и всё что необходимо).
* Порядокый номер данных в пакете является индексом в кэше. * Порядокый номер данных в пакете является индексом в кэше.
* Для защиты от изменения поседовательности внутри пакета, в кэше хранится ID сохраняемого датчика, и если он не совпадёт с тем, * Для защиты от изменения поседовательности внутри пакета, в кэше хранится ID сохраняемого датчика, и если он не совпадёт с тем,
* ID который пришёл в пакете - элемент кэша обновляется. * ID который пришёл в пакете - элемент кэша обновляется.
* Если количество пришедших данных не совпадают с размеров кэша.. кэш обновляется. * Если количество пришедших данных не совпадают с размером кэша.. кэш обновляется.
*/ */
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
class UDPReceiver: class UDPReceiver:
...@@ -47,6 +47,7 @@ class UDPReceiver: ...@@ -47,6 +47,7 @@ class UDPReceiver:
/*! глобальная функция для вывода help-а */ /*! глобальная функция для вывода help-а */
static void help_print( int argc, char* argv[] ); static void help_print( int argc, char* argv[] );
protected: protected:
xmlNode* cnode; xmlNode* cnode;
......
#include <sstream>
#include "Exceptions.h"
#include "Extensions.h"
#include "UNetExchange.h"
// -----------------------------------------------------------------------------
using namespace std;
using namespace UniSetTypes;
using namespace UniSetExtensions;
UNetExchange::UNetExchange( UniSetTypes::ObjectId objId, UniSetTypes::ObjectId shmId, SharedMemory* ic ):
UniSetObject_LT(objId),
shm(0),
initPause(0),
activated(false)
{
if( objId == DefaultObjectId )
throw UniSetTypes::SystemError("(UNetExchange): objId=-1?!! Use --unet-name" );
// xmlNode* cnode = conf->getNode(myname);
cnode = conf->getNode(myname);
if( cnode == NULL )
throw UniSetTypes::SystemError("(UNetExchange): Not found conf-node for " + myname );
shm = new SMInterface(shmId,&ui,objId,ic);
UniXML_iterator it(cnode);
// определяем фильтр
s_field = conf->getArgParam("--unet-filter-field");
s_fvalue = conf->getArgParam("--unet-filter-value");
dlog[Debug::INFO] << myname << "(init): read fileter-field='" << s_field
<< "' filter-value='" << s_fvalue << "'" << endl;
int recvTimeout = conf->getArgPInt("--unet-recv-timeout",it.getProp("recvTimeout"), 5000);
int recvpause = conf->getArgPInt("--unet-recvpause",it.getProp("recvpause"), 10);
int updatepause = conf->getArgPInt("--unet-updatepause",it.getProp("updatepause"), 100);
steptime = conf->getArgPInt("--unet-steptime",it.getProp("steptime"), 1000);
int minBufSize = conf->getArgPInt("--unet-minbufsize",it.getProp("minBufSize"), 30);
int maxProcessingCount = conf->getArgPInt("--unet-maxprocessingcount",it.getProp("maxProcessingCount"), 100);
xmlNode* nodes = conf->getXMLNodesSection();
if( !nodes )
throw UniSetTypes::SystemError("(UNetExchange): Not found <nodes>");
UniXML_iterator n_it(nodes);
if( !n_it.goChildren() )
throw UniSetTypes::SystemError("(UNetExchange): Items not found for <nodes>");
for( ; n_it.getCurrent(); n_it.goNext() )
{
if( !n_it.getProp("unet_ignore").empty() )
{
dlog[Debug::INFO] << myname << "(init): unet_ignore.. for " << n_it.getProp("name") << endl;
continue;
}
string h = n_it.getProp("ip");
if( !n_it.getProp("unet_ip").empty() )
h = n_it.getProp("unet_ip");
int p = n_it.getIntProp("id");
if( !n_it.getProp("unet_port").empty() )
p = n_it.getIntProp("unet_port");
dlog[Debug::INFO] << myname << "(init): add UNetReceiver for " << h << ":" << p << endl;
if( checkExistUNetHost(h,p) )
{
dlog[Debug::INFO] << myname << "(init): " << h << ":" << p << " already added! Ignore.." << endl;
continue;
}
UNetReceiver* r = new UNetReceiver(h,p,shm);
r->setReceiveTimeout(recvTimeout);
r->setReceivePause(recvpause);
r->setUpdatePause(updatepause);
r->setMinBudSize(minBufSize);
r->setMaxProcessingCount(maxProcessingCount);
recvlist.push_back(r);
}
// -------------------------------
// ********** HEARTBEAT *************
string heart = conf->getArgParam("--unet-heartbeat-id",it.getProp("heartbeat_id"));
if( !heart.empty() )
{
sidHeartBeat = conf->getSensorID(heart);
if( sidHeartBeat == DefaultObjectId )
{
ostringstream err;
err << myname << ": не найден идентификатор для датчика 'HeartBeat' " << heart;
dlog[Debug::CRIT] << myname << "(init): " << err.str() << endl;
throw SystemError(err.str());
}
int heartbeatTime = getHeartBeatTime();
if( heartbeatTime )
ptHeartBeat.setTiming(heartbeatTime);
else
ptHeartBeat.setTiming(UniSetTimer::WaitUpTime);
maxHeartBeat = conf->getArgPInt("--unet-heartbeat-max", it.getProp("heartbeat_max"), 10);
test_id = sidHeartBeat;
}
else
{
test_id = conf->getSensorID("TestMode_S");
if( test_id == DefaultObjectId )
{
ostringstream err;
err << myname << "(init): test_id unknown. 'TestMode_S' not found...";
dlog[Debug::CRIT] << myname << "(init): " << err.str() << endl;
throw SystemError(err.str());
}
}
dlog[Debug::INFO] << myname << "(init): test_id=" << test_id << endl;
activateTimeout = conf->getArgPInt("--activate-timeout", 20000);
timeout_t msec = conf->getArgPInt("--unet-timeout",it.getProp("timeout"), 3000);
dlog[Debug::INFO] << myname << "(init): udp-timeout=" << msec << " msec" << endl;
}
// -----------------------------------------------------------------------------
UNetExchange::~UNetExchange()
{
for( ReceiverList::iterator it=recvlist.begin(); it!=recvlist.end(); ++it )
delete (*it);
delete shm;
}
// -----------------------------------------------------------------------------
bool UNetExchange::checkExistUNetHost( const std::string addr, ost::tpport_t port )
{
ost::IPV4Address a1(addr.c_str());
for( ReceiverList::iterator it=recvlist.begin(); it!=recvlist.end(); ++it )
{
if( (*it)->getAddress() == a1.getAddress() && (*it)->getPort() == port )
return true;
}
return false;
}
// -----------------------------------------------------------------------------
void UNetExchange::startReceivers()
{
for( ReceiverList::iterator it=recvlist.begin(); it!=recvlist.end(); ++it )
(*it)->start();
}
// -----------------------------------------------------------------------------
void UNetExchange::waitSMReady()
{
// waiting for SM is ready...
int ready_timeout = conf->getArgInt("--unet-sm-ready-timeout","15000");
if( ready_timeout == 0 )
ready_timeout = 15000;
else if( ready_timeout < 0 )
ready_timeout = UniSetTimer::WaitUpTime;
if( !shm->waitSMready(ready_timeout,50) )
{
ostringstream err;
err << myname << "(waitSMReady): Не дождались готовности SharedMemory к работе в течение " << ready_timeout << " мсек";
dlog[Debug::CRIT] << err.str() << endl;
throw SystemError(err.str());
}
}
// -----------------------------------------------------------------------------
void UNetExchange::timerInfo( TimerMessage *tm )
{
if( !activated )
return;
if( tm->id == tmStep )
step();
}
// -----------------------------------------------------------------------------
void UNetExchange::step()
{
if( !activated )
return;
if( sidHeartBeat!=DefaultObjectId && ptHeartBeat.checkTime() )
{
try
{
shm->localSaveValue(aitHeartBeat,sidHeartBeat,maxHeartBeat,getId());
ptHeartBeat.reset();
}
catch(Exception& ex)
{
dlog[Debug::CRIT] << myname << "(step): (hb) " << ex << std::endl;
}
}
}
// -----------------------------------------------------------------------------
void UNetExchange::processingMessage(UniSetTypes::VoidMessage *msg)
{
try
{
switch(msg->type)
{
case UniSetTypes::Message::SysCommand:
{
UniSetTypes::SystemMessage sm( msg );
sysCommand( &sm );
}
break;
case Message::SensorInfo:
{
SensorMessage sm( msg );
sensorInfo(&sm);
}
break;
case Message::Timer:
{
TimerMessage tm(msg);
timerInfo(&tm);
}
break;
default:
break;
}
}
catch( SystemError& ex )
{
dlog[Debug::CRIT] << myname << "(SystemError): " << ex << std::endl;
// throw SystemError(ex);
raise(SIGTERM);
}
catch( Exception& ex )
{
dlog[Debug::CRIT] << myname << "(processingMessage): " << ex << std::endl;
}
catch(...)
{
dlog[Debug::CRIT] << myname << "(processingMessage): catch ...\n";
}
}
// -----------------------------------------------------------------------------
void UNetExchange::sysCommand( UniSetTypes::SystemMessage *sm )
{
switch( sm->command )
{
case SystemMessage::StartUp:
{
waitSMReady();
// подождать пока пройдёт инициализация датчиков
// см. activateObject()
msleep(initPause);
PassiveTimer ptAct(activateTimeout);
while( !activated && !ptAct.checkTime() )
{
cout << myname << "(sysCommand): wait activate..." << endl;
msleep(300);
if( activated )
break;
}
if( !activated )
dlog[Debug::CRIT] << myname << "(sysCommand): ************* don`t activate?! ************" << endl;
{
UniSetTypes::uniset_mutex_lock l(mutex_start, 10000);
askSensors(UniversalIO::UIONotify);
}
askTimer(tmStep,steptime);
startReceivers();
}
case SystemMessage::FoldUp:
case SystemMessage::Finish:
askSensors(UniversalIO::UIODontNotify);
break;
case SystemMessage::WatchDog:
{
// ОПТИМИЗАЦИЯ (защита от двойного перезаказа при старте)
// Если идёт локальная работа
// (т.е. UNetExchange запущен в одном процессе с SharedMemory2)
// то обрабатывать WatchDog не надо, т.к. мы и так ждём готовности SM
// при заказе датчиков, а если SM вылетит, то вместе с этим процессом(UNetExchange)
if( shm->isLocalwork() )
break;
askSensors(UniversalIO::UIONotify);
}
break;
case SystemMessage::LogRotate:
{
// переоткрываем логи
unideb << myname << "(sysCommand): logRotate" << std::endl;
string fname = unideb.getLogFile();
if( !fname.empty() )
{
unideb.logFile(fname);
unideb << myname << "(sysCommand): ***************** UNIDEB LOG ROTATE *****************" << std::endl;
}
dlog << myname << "(sysCommand): logRotate" << std::endl;
fname = dlog.getLogFile();
if( !fname.empty() )
{
dlog.logFile(fname);
dlog << myname << "(sysCommand): ***************** dlog LOG ROTATE *****************" << std::endl;
}
}
break;
default:
break;
}
}
// ------------------------------------------------------------------------------------------
void UNetExchange::askSensors( UniversalIO::UIOCommand cmd )
{
if( !shm->waitSMworking(test_id,activateTimeout,50) )
{
ostringstream err;
err << myname
<< "(askSensors): Не дождались готовности(work) SharedMemory к работе в течение "
<< activateTimeout << " мсек";
dlog[Debug::CRIT] << err.str() << endl;
kill(SIGTERM,getpid()); // прерываем (перезапускаем) процесс...
throw SystemError(err.str());
}
}
// ------------------------------------------------------------------------------------------
void UNetExchange::sensorInfo( UniSetTypes::SensorMessage* sm )
{
}
// ------------------------------------------------------------------------------------------
bool UNetExchange::activateObject()
{
// блокирование обработки Starsp
// пока не пройдёт инициализация датчиков
// см. sysCommand()
{
activated = false;
UniSetTypes::uniset_mutex_lock l(mutex_start, 5000);
UniSetObject_LT::activateObject();
initIterators();
activated = true;
}
return true;
}
// ------------------------------------------------------------------------------------------
void UNetExchange::sigterm( int signo )
{
cerr << myname << ": ********* SIGTERM(" << signo <<") ********" << endl;
activated = false;
UniSetObject_LT::sigterm(signo);
}
// ------------------------------------------------------------------------------------------
void UNetExchange::initIterators()
{
shm->initAIterator(aitHeartBeat);
}
// -----------------------------------------------------------------------------
void UNetExchange::help_print( int argc, char* argv[] )
{
cout << "--unet-recvpause msec - Пауза между получением пакетов. По умолчанию 10 мсек." << endl;
cout << "--unet-updatepause msec - Пауза между обновлением данных в SM. По умолчанию 100 мсек." << endl;
cout << "--unet-heartbeat-id - Данный процесс связан с указанным аналоговым heartbeat-дачиком." << endl;
cout << "--unet-heartbeat-max - Максимальное значение heartbeat-счётчика для данного процесса. По умолчанию 10." << endl;
cout << "--unet-ready-timeout - Время ожидания готовности SM к работе, мсек. (-1 - ждать 'вечно')" << endl;
cout << "--unet-initPause - Задержка перед инициализацией (время на активизация процесса)" << endl;
cout << "--unet-notRespondSensor - датчик связи для данного процесса " << endl;
cout << "--unet-sm-ready-timeout - время на ожидание старта SM" << endl;
cout << " Настройки протокола RS: " << endl;
}
// -----------------------------------------------------------------------------
UNetExchange* UNetExchange::init_unetexchange( int argc, char* argv[], UniSetTypes::ObjectId icID, SharedMemory* ic )
{
string name = conf->getArgParam("--unet-name","UNetExchange1");
if( name.empty() )
{
cerr << "(unetexchange): Не задан name'" << endl;
return 0;
}
ObjectId ID = conf->getObjectID(name);
if( ID == UniSetTypes::DefaultObjectId )
{
cerr << "(unetexchange): идентификатор '" << name
<< "' не найден в конф. файле!"
<< " в секции " << conf->getObjectsSection() << endl;
return 0;
}
dlog[Debug::INFO] << "(unetexchange): name = " << name << "(" << ID << ")" << endl;
return new UNetExchange(ID,icID,ic);
}
// -----------------------------------------------------------------------------
#ifndef UNetExchange_H_
#define UNetExchange_H_
// -----------------------------------------------------------------------------
#include <ostream>
#include <string>
#include <queue>
#include <cc++/socket.h>
#include "UniSetObject_LT.h"
#include "Trigger.h"
#include "Mutex.h"
#include "SMInterface.h"
#include "SharedMemory.h"
#include "ThreadCreator.h"
#include "UNetReceiver.h"
// -----------------------------------------------------------------------------
class UNetExchange:
public UniSetObject_LT
{
public:
UNetExchange( UniSetTypes::ObjectId objId, UniSetTypes::ObjectId shmID, SharedMemory* ic=0 );
virtual ~UNetExchange();
/*! глобальная функция для инициализации объекта */
static UNetExchange* init_unetexchange( int argc, char* argv[],
UniSetTypes::ObjectId shmID, SharedMemory* ic=0 );
/*! глобальная функция для вывода help-а */
static void help_print( int argc, char* argv[] );
bool checkExistUNetHost( const std::string host, ost::tpport_t port );
protected:
xmlNode* cnode;
std::string s_field;
std::string s_fvalue;
SMInterface* shm;
void step();
virtual void processingMessage( UniSetTypes::VoidMessage *msg );
void sysCommand( UniSetTypes::SystemMessage *msg );
void sensorInfo( UniSetTypes::SensorMessage*sm );
void timerInfo( UniSetTypes::TimerMessage *tm );
void askSensors( UniversalIO::UIOCommand cmd );
void waitSMReady();
virtual bool activateObject();
// действия при завершении работы
virtual void sigterm( int signo );
void initIterators();
void startReceivers();
enum Timer
{
tmStep
};
private:
UNetExchange();
bool initPause;
UniSetTypes::uniset_mutex mutex_start;
PassiveTimer ptHeartBeat;
UniSetTypes::ObjectId sidHeartBeat;
int maxHeartBeat;
IOController::AIOStateList::iterator aitHeartBeat;
UniSetTypes::ObjectId test_id;
int steptime; /*!< периодичность вызова step, [мсек] */
bool activated;
int activateTimeout;
typedef std::list<UNetReceiver*> ReceiverList;
ReceiverList recvlist;
};
// -----------------------------------------------------------------------------
#endif // UNetExchange_H_
// -----------------------------------------------------------------------------
#include <sstream>
#include "Exceptions.h"
#include "Extensions.h"
#include "UNetReceiver.h"
// -----------------------------------------------------------------------------
using namespace std;
using namespace UniSetTypes;
using namespace UniSetExtensions;
// -----------------------------------------------------------------------------
bool UNetReceiver::PacketCompare::operator()(const UniSetUDP::UDPMessage& lhs,
const UniSetUDP::UDPMessage& rhs) const
{
// if( lhs.msg.header.num == rhs.msg.header.num )
// return (lhs.msg < rhs.msg);
return lhs.msg.header.num > rhs.msg.header.num;
}
// ------------------------------------------------------------------------------------------
UNetReceiver::UNetReceiver( const std::string s_host, const ost::tpport_t port, SMInterface* smi ):
shm(smi),
recvpause(10),
updatepause(100),
recvTimeout(5000),
udp(0),
activated(false),
r_thr(0),
u_thr(0),
minBufSize(30),
maxProcessingCount(100),
icache(200),
cache_init_ok(false)
{
try
{
// ost::IPV4Cidr ci(s_host.c_str());
// addr = ci.getBroadcast();
// cerr << "****************** addr: " << addr << endl;
addr = s_host.c_str();
udp = new ost::UDPDuplex(addr,port);
}
catch( ost::SockException& e )
{
ostringstream s;
s << e.getString() << ": " << e.getSystemErrorString();
dlog[Debug::CRIT] << myname << "(init): " << s.str() << std::endl;
throw SystemError(s.str());
}
r_thr = new ThreadCreator<UNetReceiver>(this, &UNetReceiver::receive);
u_thr = new ThreadCreator<UNetReceiver>(this, &UNetReceiver::update);
{
ostringstream s;
s << "(" << s_host << ":" << port << ")";
myname = s.str();
}
ptRecvTimeout.setTiming(recvTimeout);
}
// -----------------------------------------------------------------------------
UNetReceiver::~UNetReceiver()
{
delete r_thr;
delete u_thr;
delete udp;
}
// -----------------------------------------------------------------------------
void UNetReceiver::setReceiveTimeout( int msec )
{
recvTimeout = msec;
ptRecvTimeout.setTiming(msec);
}
// -----------------------------------------------------------------------------
void UNetReceiver::setReceivePause( int msec )
{
recvpause = msec;
}
// -----------------------------------------------------------------------------
void UNetReceiver::setUpdatePause( int msec )
{
updatepause = msec;
}
// -----------------------------------------------------------------------------
void UNetReceiver::setMinBudSize( int set )
{
minBufSize = set;
}
// -----------------------------------------------------------------------------
void UNetReceiver::setMaxProcessingCount( int set )
{
maxProcessingCount = set;
}
// -----------------------------------------------------------------------------
void UNetReceiver::start()
{
if( !activated )
{
activated = true;
u_thr->start();
r_thr->start();
}
}
// -----------------------------------------------------------------------------
void UNetReceiver::update()
{
cerr << "******************* udpate start" << endl;
while(activated)
{
try
{
real_update();
}
catch( UniSetTypes::Exception& ex)
{
dlog[Debug::CRIT] << myname << "(update): " << ex << std::endl;
}
catch(...)
{
dlog[Debug::CRIT] << myname << "(update): catch ..." << std::endl;
}
msleep(updatepause);
}
}
// -----------------------------------------------------------------------------
void UNetReceiver::real_update()
{
UniSetUDP::UDPMessage p;
bool buf_ok = false;
{
uniset_mutex_lock l(packMutex);
if( qpack.size() <= minBufSize )
return;
buf_ok = true;
}
int k = maxProcessingCount;
while( buf_ok && k>0 )
{
{
uniset_mutex_lock l(packMutex);
p = qpack.top();
qpack.pop();
}
if( labs(p.msg.header.num - pnum) > 1 )
{
dlog[Debug::CRIT] << "************ FAILED! ORDER PACKETS! recv.num=" << pack.msg.header.num
<< " num=" << pnum << endl;
}
pnum = p.msg.header.num;
k--;
{
uniset_mutex_lock l(packMutex);
buf_ok = ( qpack.size() > minBufSize );
}
initCache(p, !cache_init_ok);
for( int i=0; i<p.msg.header.dcount; i++ )
{
try
{
UniSetUDP::UDPData& d = p.msg.dat[i];
ItemInfo& ii(icache[i]);
if( ii.id != d.id )
{
dlog[Debug::WARN] << myname << "(update): reinit cache for sid=" << d.id << endl;
ii.id = d.id;
shm->initAIterator(ii.ait);
shm->initDIterator(ii.dit);
}
if( ii.iotype == UniversalIO::DigitalInput )
shm->localSaveState(ii.dit,d.id,d.val,shm->ID());
else if( ii.iotype == UniversalIO::AnalogInput )
shm->localSaveValue(ii.ait,d.id,d.val,shm->ID());
else if( ii.iotype == UniversalIO::AnalogOutput )
shm->localSetValue(ii.ait,d.id,d.val,shm->ID());
else if( ii.iotype == UniversalIO::DigitalOutput )
shm->localSetState(ii.dit,d.id,d.val,shm->ID());
else
dlog[Debug::CRIT] << myname << "(update): Unknown iotype for sid=" << d.id << endl;
}
catch( UniSetTypes::Exception& ex)
{
dlog[Debug::CRIT] << myname << "(update): " << ex << std::endl;
}
catch(...)
{
dlog[Debug::CRIT] << myname << "(update): catch ..." << std::endl;
}
}
}
}
// -----------------------------------------------------------------------------
void UNetReceiver::receive()
{
cerr << "******************* receive start" << endl;
ptRecvTimeout.setTiming(recvTimeout);
while( activated )
{
try
{
if( recv() )
ptRecvTimeout.reset();
}
catch( ost::SockException& e )
{
cerr << e.getString() << ": " << e.getSystemErrorString() << endl;
}
catch( UniSetTypes::Exception& ex)
{
cerr << myname << "(poll): " << ex << std::endl;
}
catch(...)
{
cerr << myname << "(poll): catch ..." << std::endl;
}
msleep(recvpause);
}
cerr << "************* execute FINISH **********" << endl;
}
// -----------------------------------------------------------------------------
bool UNetReceiver::recv()
{
if( !udp->isInputReady(recvTimeout) )
return false;
ssize_t ret = udp->UDPReceive::receive(&(pack.msg),sizeof(pack.msg));
if( ret < sizeof(UniSetUDP::UDPHeader) )
{
cerr << myname << "(receive): FAILED header ret=" << ret << " sizeof=" << sizeof(UniSetUDP::UDPHeader) << endl;
return false;
}
ssize_t sz = pack.msg.header.dcount * sizeof(UniSetUDP::UDPData) + sizeof(UniSetUDP::UDPHeader);
if( ret < sz )
{
cerr << myname << "(receive): FAILED data ret=" << ret << " sizeof=" << sz << endl;
return false;
}
cerr << myname << "(receive): recv DATA OK. ret=" << ret << " sizeof=" << sz
<< " header: " << pack.msg.header << endl;
/*
if( labs(pack.msg.header.num - pnum) > 1 )
{
cerr << "************ FAILED! ORDER PACKETS! recv.num=" << pack.msg.header.num
<< " num=" << pnum << endl;
}
pnum = pack.msg.header.num;
*/
{
uniset_mutex_lock l(packMutex);
qpack.push(pack);
}
return true;
}
// -----------------------------------------------------------------------------
void UNetReceiver::initIterators()
{
for( ItemVec::iterator it=icache.begin(); it!=icache.end(); ++it )
{
shm->initAIterator(it->ait);
shm->initDIterator(it->dit);
}
}
// -----------------------------------------------------------------------------
void UNetReceiver::initCache( UniSetUDP::UDPMessage& pack, bool force )
{
if( !force && pack.msg.header.dcount == icache.size() )
return;
dlog[Debug::INFO] << myname << ": init icache.." << endl;
cache_init_ok = true;
icache.resize(pack.msg.header.dcount);
for( int i=0; i<icache.size(); i++ )
{
ItemInfo& d(icache[i]);
if( d.id != pack.msg.dat[i].id )
{
d.id = pack.msg.dat[i].id;
d.iotype = conf->getIOType(d.id);
shm->initAIterator(d.ait);
shm->initDIterator(d.dit);
}
}
}
// -----------------------------------------------------------------------------
#ifndef UNetReceiver_H_
#define UNetReceiver_H_
// -----------------------------------------------------------------------------
#include <ostream>
#include <string>
#include <queue>
#include <cc++/socket.h>
#include "UniSetObject_LT.h"
#include "Trigger.h"
#include "Mutex.h"
#include "SMInterface.h"
#include "SharedMemory.h"
#include "ThreadCreator.h"
#include "UDPPacket.h"
// -----------------------------------------------------------------------------
/* Основная идея: сделать проверку очерёдности пакетов, но при этом использовать UDP.
* ===============
* Собственно реализация сделана так:
* В данных передаётся номер пакета. На случай если несколько пакетов придут не в той последовательности
* что были посланы, сделана очередь с приоритетом. В качестве приориета используется номер пакета
* (чем меньше тем старше). И при этом эта очередь постоянно поддерживается наполненной на minBufSize записей.
* Это гарантирует, что соседние пакеты пришедшие не в той последовательности, тем не менее обработаны будут в правильной.
* Т.к. в очереди они "отсортируются" по номеру пакета, ещё до обработки.
*
*
* КЭШ
* ===
* Для оптимизации работы с SM, т.к. в пакетах приходят только пары [id,value] сделан кэш итераторов.
* Кэш расчитан на то, что принимаемые пакеты всегда имеют одну и ту же длину и последовательность.
* Идея проста: сделан вектор размером с количество принимаемых данных. В векторе хранятся итераторы (и всё что необходимо).
* Порядокый номер данных в пакете является индексом в кэше.
* Для защиты от изменения поседовательности внутри пакета, в кэше хранится ID сохраняемого датчика, и если он не совпадёт с тем,
* ID который пришёл в пакете - элемент кэша обновляется.
* Если количество пришедших данных не совпадают с размером кэша.. кэш обновляется.
*/
// -----------------------------------------------------------------------------
class UNetReceiver
{
public:
UNetReceiver( const std::string host, const ost::tpport_t port, SMInterface* smi );
~UNetReceiver();
void start();
void receive();
void update();
inline bool isRecvOK(){ return ptRecvTimeout.checkTime(); }
void setReceiveTimeout( int msec );
void setReceivePause( int msec );
void setUpdatePause( int msec );
void setMinBudSize( int set );
void setMaxProcessingCount( int set );
inline ost::IPV4Address getAddress(){ return addr; }
inline ost::tpport_t getPort(){ return port; }
protected:
SMInterface* shm;
bool recv();
void step();
void real_update();
void initIterators();
private:
UNetReceiver();
int recvpause; /*!< пауза меджду приёмами пакетов, [мсек] */
int updatepause; /*!< переодичность обновления данных в SM, [мсек] */
ost::UDPDuplex* udp;
ost::IPV4Address addr;
ost::tpport_t port;
std::string myname;
UniSetTypes::uniset_mutex pollMutex;
PassiveTimer ptRecvTimeout;
int recvTimeout;
bool activated;
ThreadCreator<UNetReceiver>* r_thr; // receive thread
ThreadCreator<UNetReceiver>* u_thr; // update thread
// функция определения приоритетного сообщения для обработки
struct PacketCompare:
public std::binary_function<UniSetUDP::UDPMessage, UniSetUDP::UDPMessage, bool>
{
bool operator()(const UniSetUDP::UDPMessage& lhs,
const UniSetUDP::UDPMessage& rhs) const;
};
typedef std::priority_queue<UniSetUDP::UDPMessage,std::vector<UniSetUDP::UDPMessage>,PacketCompare> PacketQueue;
PacketQueue qpack; /*!< очередь принятых пакетов (отсортированных по возрастанию номера пакета) */
UniSetUDP::UDPMessage pack; /*!< прсто буфер для получения очерещного сообщения */
UniSetTypes::uniset_mutex packMutex; /*!< mutex для работы с очередью */
long pnum; /*!< текущий номер обработанного сообщения, для проверки непрерывности последовательности пакетов */
/*! Минимальный размер очереди.
* Предназначен для создания буфера, чтобы обработка сообщений шла
* в порядке возрастания номеров пакетов. Даже если при приёме последовательность нарушалась
*/
int minBufSize;
int maxProcessingCount; /*! максимальное число обрабатываемых за один раз сообщений */
struct ItemInfo
{
long id;
IOController::AIOStateList::iterator ait;
IOController::DIOStateList::iterator dit;
UniversalIO::IOTypes iotype;
};
typedef std::vector<ItemInfo> ItemVec;
ItemVec icache; /*!< кэш итераторов */
bool cache_init_ok;
void initCache( UniSetUDP::UDPMessage& pack, bool force=false );
};
// -----------------------------------------------------------------------------
#endif // UNetReceiver_H_
// -----------------------------------------------------------------------------
#!/bin/sh #!/bin/sh
uniset-start.sh -f ./uniset-udpreceiver --udp-name UDPExchange \ uniset-start.sh -f ./uniset-udpexchange --udp-name UDPExchange \
--udp-host 192.168.1.255 --udp-port 3000 \
--confile test.xml \ --confile test.xml \
--udp-filter-field udp --udp-filter-value 1 \ --udp-filter-field udp --udp-filter-value 1 \
--dlog-add-levels info,crit,warn --dlog-add-levels info,crit,warn
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment