Commit 33151652 authored by Pavel Vainerman's avatar Pavel Vainerman

UDP: "причесал код" UDPReceiver-а...

parent d431ed77
......@@ -69,9 +69,11 @@ activated(false)
thr = new ThreadCreator<UDPReceiver>(this, &UDPReceiver::poll);
recvTimeout = conf->getArgPInt("--udp-recv-timeout",it.getProp("recvTimeout"), 5000);
polltime = conf->getArgPInt("--udp-polltime",it.getProp("polltime"), 100);
ptUpdate.setTiming(100);
polltime = conf->getArgPInt("--udp-polltime",it.getProp("polltime"), 10);
updatetime = conf->getArgPInt("--udp-updatetime",it.getProp("updatetime"), 100);
steptime = conf->getArgPInt("--udp-steptime",it.getProp("steptime"), 100);
minBufSize = conf->getArgPInt("--udp-minbufsize",it.getProp("minBufSize"), 30);
maxProcessingCount = conf->getArgPInt("--udp-maxprocessingcount",it.getProp("maxProcessingCount"), 100);
// -------------------------------
// ********** HEARTBEAT *************
......@@ -144,8 +146,13 @@ void UDPReceiver::waitSMReady()
// -----------------------------------------------------------------------------
void UDPReceiver::timerInfo( TimerMessage *tm )
{
if( tm->id == tmExchange )
if( !activated )
return;
if( tm->id == tmStep )
step();
else if( tm->id == tmUpdate )
update();
}
// -----------------------------------------------------------------------------
void UDPReceiver::step()
......@@ -153,12 +160,6 @@ void UDPReceiver::step()
if( !activated )
return;
// if( ptUpdate.checkTime() )
// {
update_data();
ptUpdate.reset();
// }
if( sidHeartBeat!=DefaultObjectId && ptHeartBeat.checkTime() )
{
try
......@@ -168,29 +169,26 @@ void UDPReceiver::step()
}
catch(Exception& ex)
{
dlog[Debug::CRIT] << myname
<< "(step): (hb) " << ex << std::endl;
dlog[Debug::CRIT] << myname << "(step): (hb) " << ex << std::endl;
}
}
}
// -----------------------------------------------------------------------------
void UDPReceiver::update_data()
void UDPReceiver::update()
{
if( !activated )
return;
UniSetUDP::UDPMessage p;
bool buf_ok = false;
{
uniset_mutex_lock l(packMutex);
if( qpack.size() <= max_buf_size )
if( qpack.size() <= minBufSize )
return;
buf_ok = true;
}
while( buf_ok )
int k = maxProcessingCount;
while( buf_ok && k>0)
{
{
uniset_mutex_lock l(packMutex);
......@@ -200,18 +198,35 @@ void UDPReceiver::update_data()
if( labs(p.msg.header.num - pnum) > 1 )
{
cerr << "************ FAILED! ORDER PACKETS! recv.num=" << pack.msg.header.num
dlog[Debug::CRIT] << "************ FAILED! ORDER PACKETS! recv.num=" << pack.msg.header.num
<< " num=" << pnum << endl;
}
pnum = p.msg.header.num;
k--;
{
uniset_mutex_lock l(packMutex);
buf_ok = ( qpack.size() > max_buf_size );
buf_ok = ( qpack.size() > minBufSize );
}
cerr << myname << "(step): recv DATA OK. header: " << p.msg.header << endl;
// cerr << myname << "(step): recv DATA OK. header: " << p.msg.header << endl;
for( int i=0; i<p.msg.header.dcount; i++ )
{
try
{
UniSetUDP::UDPData& d = p.msg.dat[i];
shm->setValue(d.id,d.val);
}
catch( UniSetTypes::Exception& ex)
{
dlog[Debug::CRIT] << myname << "(update): " << ex << std::endl;
}
catch(...)
{
dlog[Debug::CRIT] << myname << "(update): catch ..." << std::endl;
}
}
}
}
......@@ -360,7 +375,8 @@ void UDPReceiver::sysCommand( UniSetTypes::SystemMessage *sm )
askSensors(UniversalIO::UIONotify);
}
thr->start();
askTimer(tmExchange,1000);
askTimer(tmUpdate,updatetime);
askTimer(tmStep,steptime);
}
case SystemMessage::FoldUp:
......
......@@ -3,7 +3,6 @@
// -----------------------------------------------------------------------------
#include <ostream>
#include <string>
#include <map>
#include <queue>
#include <cc++/socket.h>
#include "UniSetObject_LT.h"
......@@ -37,8 +36,8 @@ class UDPReceiver:
void poll();
void recv();
virtual void step();
void update_data();
void step();
void update();
virtual void processingMessage( UniSetTypes::VoidMessage *msg );
void sysCommand( UniSetTypes::SystemMessage *msg );
......@@ -56,7 +55,8 @@ class UDPReceiver:
enum Timer
{
tmExchange
tmUpdate,
tmStep
};
private:
......@@ -70,7 +70,9 @@ class UDPReceiver:
IOController::AIOStateList::iterator aitHeartBeat;
UniSetTypes::ObjectId test_id;
int polltime; /*!< переодичность обновления данных, [мсек] */
int polltime; /*!< пауза меджду приёмами пакетов, [мсек] */
int updatetime; /*!< переодичность обновления данных в SM, [мсек] */
int steptime; /*!< периодичность вызова step, [мсек] */
ost::UDPDuplex* udp;
ost::IPV4Host host;
......@@ -83,15 +85,8 @@ class UDPReceiver:
bool activated;
int activateTimeout;
long pnum;
ThreadCreator<UDPReceiver>* thr;
// typedef std::map<unsigned long,UniSetUDP::UDPMessage> QueuePacket;
// QueuePacket qpack;
UniSetUDP::UDPMessage pack;
UniSetTypes::uniset_mutex packMutex;
// функция определения приоритетного сообщения для обработки
struct PacketCompare:
public std::binary_function<UniSetUDP::UDPMessage, UniSetUDP::UDPMessage, bool>
......@@ -100,10 +95,18 @@ class UDPReceiver:
const UniSetUDP::UDPMessage& rhs) const;
};
typedef std::priority_queue<UniSetUDP::UDPMessage,std::vector<UniSetUDP::UDPMessage>,PacketCompare> PacketQueue;
PacketQueue qpack;
static const int max_buf_size = 20;
PassiveTimer ptUpdate;
PacketQueue qpack; /*!< очередь принятых пакетов (отсортированных по возрастанию номера пакета) */
UniSetUDP::UDPMessage pack; /*!< прсто буфер для получения очерещного сообщения */
UniSetTypes::uniset_mutex packMutex; /*!< mutex для работы с очередью */
long pnum; /*!< текущий номер обработанного сообщения, для проверки непрерывности последовательности пакетов */
/*! Минимальный размер очереди.
* Предназначен для создания буфера, чтобы обработка сообщений шла
* в порядке возрастания номеров пакетов. Даже если при приёме последовательность нарушалась
*/
int minBufSize;
int maxProcessingCount; /*! максимальное число обрабатываемых за один раз сообщений */
};
// -----------------------------------------------------------------------------
#endif // UDPReceiver_H_
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment