Commit 341a72b4 authored by Pavel Vainerman's avatar Pavel Vainerman

(UNetUDP): первая версия реализации механизма регулирования

"частоты посылки для каждого датчика"
parent 8418d184
...@@ -212,7 +212,7 @@ ...@@ -212,7 +212,7 @@
<item id="60" iotype="DI" name="Input60_S" priority="Medium" textname="Команда 61" iofront="10"/> <item id="60" iotype="DI" name="Input60_S" priority="Medium" textname="Команда 61" iofront="10"/>
<item id="7" iotype="DO" name="DO_C" priority="Medium" textname="Digital output"/> <item id="7" iotype="DO" name="DO_C" priority="Medium" textname="Digital output"/>
<item id="8" iotype="DO" name="DO1_C" priority="Medium" textname="Digital output"/> <item id="8" iotype="DO" name="DO1_C" priority="Medium" textname="Digital output"/>
<item default="1000" id="9" iotype="AO" name="AO_AS" precision="2" priority="Medium" rs="2" rs_channel="1" rs_jack="j1" rs_mbaddr="0x01" rs_mbfunc="0x06" rs_mbreg="0x02" rs_mbtype="rtu188" textname="Analog output"/> <item default="1000" id="9" iotype="AO" name="AO_AS" precision="2" priority="Medium" unet_sendfactor="2" rs="2" rs_channel="1" rs_jack="j1" rs_mbaddr="0x01" rs_mbfunc="0x06" rs_mbreg="0x02" rs_mbtype="rtu188" textname="Analog output"/>
<item default="378" id="10" iotype="AI" mbaddr="0x01" mbfunc="0x04" mbreg="0x01" mbtype="rtu" name="AI_AS" precision="45" priority="Medium" rs="1" textname="Analog input" vtype="F2"> <item default="378" id="10" iotype="AI" mbaddr="0x01" mbfunc="0x04" mbreg="0x01" mbtype="rtu" name="AI_AS" precision="45" priority="Medium" rs="1" textname="Analog input" vtype="F2">
<consumers> <consumers>
<consumer cfilter="test1" name="TestProc" type="objects"/> <consumer cfilter="test1" name="TestProc" type="objects"/>
......
...@@ -187,7 +187,7 @@ bool UDPMessage::setDData( size_t index, bool val ) ...@@ -187,7 +187,7 @@ bool UDPMessage::setDData( size_t index, bool val )
return true; return true;
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
long UDPMessage::dID( size_t index ) long UDPMessage::dID( size_t index ) const
{ {
if( index >= MaxDCount ) if( index >= MaxDCount )
return UniSetTypes::DefaultObjectId; return UniSetTypes::DefaultObjectId;
...@@ -195,7 +195,7 @@ long UDPMessage::dID( size_t index ) ...@@ -195,7 +195,7 @@ long UDPMessage::dID( size_t index )
return d_id[index]; return d_id[index];
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
bool UDPMessage::dValue( size_t index ) bool UDPMessage::dValue( size_t index ) const
{ {
if( index >= MaxDCount ) if( index >= MaxDCount )
return UniSetTypes::DefaultObjectId; return UniSetTypes::DefaultObjectId;
...@@ -235,7 +235,7 @@ size_t UDPMessage::transport_msg( UDPPacket& p ) ...@@ -235,7 +235,7 @@ size_t UDPMessage::transport_msg( UDPPacket& p )
return i; return i;
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
long UDPMessage::getDataID() long UDPMessage::getDataID() const
{ {
// в качестве идентификатора берётся ID первого датчика в данных // в качестве идентификатора берётся ID первого датчика в данных
// приоритет имеет аналоговые датчики // приоритет имеет аналоговые датчики
......
...@@ -19,7 +19,7 @@ namespace UniSetUDP ...@@ -19,7 +19,7 @@ namespace UniSetUDP
*/ */
const unsigned int UNETUDP_MAGICNUM = 0xfb07ee55; // идентификатор протокола const unsigned int UNETUDP_MAGICNUM = 0x1337A1D; // идентификатор протокола
struct UDPHeader struct UDPHeader
{ {
...@@ -78,39 +78,40 @@ namespace UniSetUDP ...@@ -78,39 +78,40 @@ namespace UniSetUDP
size_t addDData( long id, bool val ); size_t addDData( long id, bool val );
bool setDData( size_t index, bool val ); bool setDData( size_t index, bool val );
long dID( size_t index ); long dID( size_t index ) const;
bool dValue( size_t index ); bool dValue( size_t index ) const;
// функции addAData возвращают индекс, по которому потом можно напрямую писать при помощи setAData(index) // функции addAData возвращают индекс, по которому потом можно напрямую писать при помощи setAData(index)
size_t addAData( const UDPAData& dat ); size_t addAData( const UDPAData& dat );
size_t addAData( long id, long val ); size_t addAData( long id, long val );
bool setAData( size_t index, long val ); bool setAData( size_t index, long val );
inline bool isAFull() long getDataID( ) const; /*!< получение "уникального" идентификатора данных этого пакета */
inline bool isAFull() const
{ {
return (acount >= MaxACount); return (acount >= MaxACount);
} }
inline bool isDFull() inline bool isDFull() const
{ {
return (dcount >= MaxDCount); return (dcount >= MaxDCount);
} }
inline bool isFull() inline bool isFull() const
{ {
return !((dcount < MaxDCount) && (acount < MaxACount)); return !((dcount < MaxDCount) && (acount < MaxACount));
} }
inline int dsize() inline int dsize() const
{ {
return dcount; return dcount;
} }
inline int asize() inline int asize() const
{ {
return acount; return acount;
} }
unsigned short getDataCRC(); unsigned short getDataCRC();
// количество байт в пакете с булевыми переменными... // количество байт в пакете с булевыми переменными...
int d_byte() int d_byte() const
{ {
return dcount * sizeof(long) + dcount; return dcount * sizeof(long) + dcount;
} }
......
...@@ -30,6 +30,7 @@ ...@@ -30,6 +30,7 @@
- \ref pgUNetUDP_Common - \ref pgUNetUDP_Common
- \ref pgUNetUDP_Conf - \ref pgUNetUDP_Conf
- \ref pgUNetUDP_Reserv - \ref pgUNetUDP_Reserv
- \ref pgUNetUDP_SendFactor
\section pgUNetUDP_Common Общее описание \section pgUNetUDP_Common Общее описание
Обмен построен на основе протокола UDP. Обмен построен на основе протокола UDP.
...@@ -86,6 +87,15 @@ ...@@ -86,6 +87,15 @@
на нём. Тогда будет попытка переключиться обратно на первый канал и так "по кругу". на нём. Тогда будет попытка переключиться обратно на первый канал и так "по кругу".
В свою очередь "писатели"(если они не отключены) всегда посылают данные в оба канала. В свою очередь "писатели"(если они не отключены) всегда посылают данные в оба канала.
\section pgUNetUDP_SendFactor Регулирование частоты посылки
В текущей реализации поддерживается механизм, позволяющий регулировать частоту посылки данных
для каждого датчика. Суть механизма заключается в том, что для каждого датчика можно задать свойство
- \b prefix_sendfactor="N" Где N>1 - задаёт "делитель" относительно \b sendpause определяющий с какой частотой
информация о данном датчике будет посылаться. Например N=2 - каждый второй цикл, N=3 - каждый третий и т.п.
При загрузке все датчики (относщиеся к данному процессу) разбиваются на группы пакетов согласно своей частоте посылки.
При этом внутри одной группы датчики разбиваются по пакетам согласно заданному максимальному размеру пакета
(см. конструктор класса UNetSender()).
*/ */
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
class UNetExchange: class UNetExchange:
......
#include <sstream> #include <sstream>
#include <iomanip> #include <iomanip>
#include <iomanip>
#include "Exceptions.h" #include "Exceptions.h"
#include "Extensions.h" #include "Extensions.h"
#include "UNetReceiver.h" #include "UNetReceiver.h"
...@@ -39,8 +38,6 @@ UNetReceiver::UNetReceiver( const std::string& s_host, const ost::tpport_t port, ...@@ -39,8 +38,6 @@ UNetReceiver::UNetReceiver( const std::string& s_host, const ost::tpport_t port,
rnum(0), rnum(0),
maxProcessingCount(100), maxProcessingCount(100),
lockUpdate(false), lockUpdate(false),
d_icache(UniSetUDP::MaxDCount),
a_icache(UniSetUDP::MaxACount),
d_cache_init_ok(false), d_cache_init_ok(false),
a_cache_init_ok(false) a_cache_init_ok(false)
{ {
...@@ -53,6 +50,9 @@ UNetReceiver::UNetReceiver( const std::string& s_host, const ost::tpport_t port, ...@@ -53,6 +50,9 @@ UNetReceiver::UNetReceiver( const std::string& s_host, const ost::tpport_t port,
unetlog = make_shared<DebugStream>(); unetlog = make_shared<DebugStream>();
unetlog->setLogName(myname); unetlog->setLogName(myname);
auto conf = uniset_conf();
conf->initLogStream(unetlog, myname);
ost::Thread::setException(ost::Thread::throwException); ost::Thread::setException(ost::Thread::throwException);
try try
...@@ -306,17 +306,16 @@ void UNetReceiver::real_update() ...@@ -306,17 +306,16 @@ void UNetReceiver::real_update()
initACache(p, !a_cache_init_ok); initACache(p, !a_cache_init_ok);
// Обработка дискретных // Обработка дискретных
size_t nbit = 0; ItemVecInfo& d_iv = d_icache_map[p.getDataID()];
for( size_t i = 0; i < p.dcount; i++, nbit++ ) for( size_t i = 0; i < p.dcount; i++ )
{ {
try try
{ {
long id = p.dID(i); long id = p.dID(i);
bool val = p.dValue(i); bool val = p.dValue(i);
ItemInfo& ii(d_icache[i]); ItemInfo& ii(d_iv.cache[i]);
if( ii.id != id ) if( ii.id != id )
{ {
...@@ -335,7 +334,7 @@ void UNetReceiver::real_update() ...@@ -335,7 +334,7 @@ void UNetReceiver::real_update()
shm->localSetValue(ii.ioit, id, val, shm->ID()); shm->localSetValue(ii.ioit, id, val, shm->ID());
} }
catch( UniSetTypes::Exception& ex) catch( const UniSetTypes::Exception& ex)
{ {
unetcrit << myname << "(update): " << ex << std::endl; unetcrit << myname << "(update): " << ex << std::endl;
} }
...@@ -346,12 +345,15 @@ void UNetReceiver::real_update() ...@@ -346,12 +345,15 @@ void UNetReceiver::real_update()
} }
// Обработка аналоговых // Обработка аналоговых
ItemVecInfo& a_iv = a_icache_map[p.getDataID()];
for( size_t i = 0; i < p.acount; i++ ) for( size_t i = 0; i < p.acount; i++ )
{ {
try try
{ {
UniSetUDP::UDPAData& d = p.a_dat[i]; UniSetUDP::UDPAData& d = p.a_dat[i];
ItemInfo& ii(a_icache[i]);
ItemInfo& ii(a_iv.cache[i]);
if( ii.id != d.id ) if( ii.id != d.id )
{ {
...@@ -370,7 +372,7 @@ void UNetReceiver::real_update() ...@@ -370,7 +372,7 @@ void UNetReceiver::real_update()
shm->localSetValue(ii.ioit, d.id, d.val, shm->ID()); shm->localSetValue(ii.ioit, d.id, d.val, shm->ID());
} }
catch( UniSetTypes::Exception& ex) catch( const UniSetTypes::Exception& ex)
{ {
unetcrit << myname << "(update): " << ex << std::endl; unetcrit << myname << "(update): " << ex << std::endl;
} }
...@@ -542,28 +544,58 @@ bool UNetReceiver::recv() ...@@ -542,28 +544,58 @@ bool UNetReceiver::recv()
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
void UNetReceiver::initIterators() void UNetReceiver::initIterators()
{ {
for( auto mit = d_icache_map.begin(); mit != d_icache_map.end(); ++mit )
{
ItemVec& d_icache(mit->second.cache);
for( auto && it : d_icache ) for( auto && it : d_icache )
shm->initIterator(it.ioit); shm->initIterator(it.ioit);
}
for( auto mit = a_icache_map.begin(); mit != a_icache_map.end(); ++mit )
{
ItemVec& a_icache(mit->second.cache);
for( auto && it : a_icache ) for( auto && it : a_icache )
shm->initIterator(it.ioit); shm->initIterator(it.ioit);
}
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
void UNetReceiver::initDCache( UniSetUDP::UDPMessage& pack, bool force ) void UNetReceiver::initDCache( UniSetUDP::UDPMessage& pack, bool force )
{ {
if( !force && pack.dcount == d_icache.size() ) ItemVecInfo& d_info(d_icache_map[pack.getDataID()]);
if( !force && pack.dcount == d_info.cache.size() )
return; return;
unetinfo << myname << ": init icache.." << endl; if( d_info.cache_init_ok && pack.dcount == d_info.cache.size() )
{
d_cache_init_ok = true; d_cache_init_ok = true;
auto it = d_icache_map.begin();
for( ; it != d_icache_map.end(); ++it )
{
ItemVecInfo& d_info(it->second);
d_cache_init_ok = d_cache_init_ok && d_info.cache_init_ok;
if(d_cache_init_ok == false)
break;
}
d_icache.resize(pack.dcount); return;
}
unetinfo << myname << ": init dcache for " << pack.getDataID() << endl;
d_info.cache_init_ok = true;
d_info.cache.resize(pack.dcount);
size_t sz = d_info.cache.size();
auto conf = uniset_conf(); auto conf = uniset_conf();
for( size_t i = 0; i < d_icache.size(); i++ ) for( size_t i = 0; i < sz; i++ )
{ {
ItemInfo& d(d_icache[i]); ItemInfo& d(d_info.cache[i]);
if( d.id != pack.d_id[i] ) if( d.id != pack.d_id[i] )
{ {
...@@ -576,19 +608,39 @@ void UNetReceiver::initDCache( UniSetUDP::UDPMessage& pack, bool force ) ...@@ -576,19 +608,39 @@ void UNetReceiver::initDCache( UniSetUDP::UDPMessage& pack, bool force )
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
void UNetReceiver::initACache( UniSetUDP::UDPMessage& pack, bool force ) void UNetReceiver::initACache( UniSetUDP::UDPMessage& pack, bool force )
{ {
if( !force && pack.acount == a_icache.size() ) ItemVecInfo& a_info(a_icache_map[pack.getDataID()]);
if( !force && pack.acount == a_info.cache.size() )
return; return;
unetinfo << myname << ": init icache.." << endl; if( a_info.cache_init_ok && pack.acount == a_info.cache.size() )
{
a_cache_init_ok = true; a_cache_init_ok = true;
auto it = a_icache_map.begin();
a_icache.resize(pack.acount); for( ; it != a_icache_map.end(); ++it )
{
ItemVecInfo& a_info(it->second);
a_cache_init_ok = a_cache_init_ok && a_info.cache_init_ok;
if(a_cache_init_ok == false)
break;
}
return;
}
unetinfo << myname << ": init icache for " << pack.getDataID() << endl;
a_info.cache_init_ok = true;
auto conf = uniset_conf(); auto conf = uniset_conf();
for( size_t i = 0; i < a_icache.size(); i++ ) a_info.cache.resize(pack.acount);
size_t sz = a_info.cache.size();
for( size_t i = 0; i < sz; i++ )
{ {
ItemInfo& d(a_icache[i]); ItemInfo& d(a_info.cache[i]);
if( d.id != pack.a_dat[i].id ) if( d.id != pack.a_dat[i].id )
{ {
......
...@@ -5,6 +5,7 @@ ...@@ -5,6 +5,7 @@
#include <memory> #include <memory>
#include <string> #include <string>
#include <queue> #include <queue>
#include <unordered_map>
#include <cc++/socket.h> #include <cc++/socket.h>
#include <sigc++/sigc++.h> #include <sigc++/sigc++.h>
#include "UniSetObject_LT.h" #include "UniSetObject_LT.h"
...@@ -224,8 +225,18 @@ class UNetReceiver: ...@@ -224,8 +225,18 @@ class UNetReceiver:
}; };
typedef std::vector<ItemInfo> ItemVec; typedef std::vector<ItemInfo> ItemVec;
ItemVec d_icache; /*!< кэш итераторов для булевых */ struct ItemVecInfo
ItemVec a_icache; /*!< кэш итераторов для аналоговых */ {
ItemVecInfo():
cache_init_ok(false)
{
}
bool cache_init_ok;
ItemVec cache;
};
typedef std::unordered_map<long, ItemVecInfo> ItemVecInfoMap;
ItemVecInfoMap d_icache_map; /*!< кэш итераторов для булевых */
ItemVecInfoMap a_icache_map; /*!< кэш итераторов для аналоговых */
bool d_cache_init_ok; bool d_cache_init_ok;
bool a_cache_init_ok; bool a_cache_init_ok;
......
...@@ -10,17 +10,22 @@ using namespace UniSetTypes; ...@@ -10,17 +10,22 @@ using namespace UniSetTypes;
using namespace UniSetExtensions; using namespace UniSetExtensions;
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
UNetSender::UNetSender( const std::string& s_host, const ost::tpport_t port, const std::shared_ptr<SMInterface>& smi, UNetSender::UNetSender( const std::string& s_host, const ost::tpport_t port, const std::shared_ptr<SMInterface>& smi,
const std::string& s_f, const std::string& s_val ): const std::string& s_f, const std::string& s_val, const std::string& s_prefix,
size_t maxDCount, size_t maxACount ):
s_field(s_f), s_field(s_f),
s_fvalue(s_val), s_fvalue(s_val),
prefix(s_prefix),
shm(smi), shm(smi),
s_host(s_host), s_host(s_host),
sendpause(150), sendpause(150),
packsendpause(5),
activated(false), activated(false),
dlist(100), dlist(100),
maxItem(0), maxItem(0),
packetnum(1), packetnum(1),
lastcrc(0) lastcrc(0),
maxAData(maxACount),
maxDData(maxDCount)
{ {
{ {
...@@ -32,6 +37,8 @@ UNetSender::UNetSender( const std::string& s_host, const ost::tpport_t port, con ...@@ -32,6 +37,8 @@ UNetSender::UNetSender( const std::string& s_host, const ost::tpport_t port, con
unetlog = make_shared<DebugStream>(); unetlog = make_shared<DebugStream>();
unetlog->setLogName(myname); unetlog->setLogName(myname);
auto conf = uniset_conf();
conf->initLogStream(unetlog, myname);
// определяем фильтр // определяем фильтр
// s_field = conf->getArgParam("--udp-filter-field"); // s_field = conf->getArgParam("--udp-filter-field");
...@@ -65,6 +72,14 @@ UNetSender::UNetSender( const std::string& s_host, const ost::tpport_t port, con ...@@ -65,6 +72,14 @@ UNetSender::UNetSender( const std::string& s_host, const ost::tpport_t port, con
s_thr = make_shared< ThreadCreator<UNetSender> >(this, &UNetSender::send); s_thr = make_shared< ThreadCreator<UNetSender> >(this, &UNetSender::send);
mypacks[0].resize(1);
packs_anum[0] = 0;
packs_dnum[0] = 0;
UniSetUDP::UDPMessage& mypack(mypacks[0][0]);
// выставляем поля, которые не меняются
mypack.nodeID = uniset_conf()->getLocalNode();
mypack.procID = shm->ID();
// ------------------------------- // -------------------------------
if( shm->isLocalwork() ) if( shm->isLocalwork() )
{ {
...@@ -86,11 +101,6 @@ UNetSender::UNetSender( const std::string& s_host, const ost::tpport_t port, con ...@@ -86,11 +101,6 @@ UNetSender::UNetSender( const std::string& s_host, const ost::tpport_t port, con
unetinfo << myname << "(init): dlist size = " << dlist.size() << endl; unetinfo << myname << "(init): dlist size = " << dlist.size() << endl;
} }
} }
// выставляем поля, которые не меняются
mypack.nodeID = uniset_conf()->getLocalNode();
mypack.procID = shm->ID();
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
UNetSender::~UNetSender() UNetSender::~UNetSender()
...@@ -134,11 +144,15 @@ void UNetSender::updateItem( DMap::iterator& it, long value ) ...@@ -134,11 +144,15 @@ void UNetSender::updateItem( DMap::iterator& it, long value )
if( it->iotype == UniversalIO::DI || it->iotype == UniversalIO::DO ) if( it->iotype == UniversalIO::DI || it->iotype == UniversalIO::DO )
{ {
UniSetTypes::uniset_rwmutex_wrlock l(pack_mutex); UniSetTypes::uniset_rwmutex_wrlock l(pack_mutex);
auto& pk = mypacks[it->pack_sendfactor];
UniSetUDP::UDPMessage& mypack(pk[it->pack_num]);
mypack.setDData(it->pack_ind, value); mypack.setDData(it->pack_ind, value);
} }
else if( it->iotype == UniversalIO::AI || it->iotype == UniversalIO::AO ) else if( it->iotype == UniversalIO::AI || it->iotype == UniversalIO::AO )
{ {
UniSetTypes::uniset_rwmutex_wrlock l(pack_mutex); UniSetTypes::uniset_rwmutex_wrlock l(pack_mutex);
auto& pk = mypacks[it->pack_sendfactor];
UniSetUDP::UDPMessage& mypack(pk[it->pack_num]);
mypack.setAData(it->pack_ind, value); mypack.setAData(it->pack_ind, value);
} }
} }
...@@ -162,6 +176,9 @@ void UNetSender::send() ...@@ -162,6 +176,9 @@ void UNetSender::send()
throw SystemError(s.str()); throw SystemError(s.str());
} }
*/ */
ncycle = 0;
while( activated ) while( activated )
{ {
try try
...@@ -169,7 +186,22 @@ void UNetSender::send() ...@@ -169,7 +186,22 @@ void UNetSender::send()
if( !shm->isLocalwork() ) if( !shm->isLocalwork() )
updateFromSM(); updateFromSM();
real_send(); for( auto && it : mypacks )
{
if( it.first > 1 && (ncycle % it.first) != 0 )
continue;
auto& pk = it.second;
int size = pk.size();
for(int i = 0; i < size; ++i)
{
real_send(pk[i]);
msleep(packsendpause);
}
}
ncycle++;
} }
catch( ost::SockException& e ) catch( ost::SockException& e )
{ {
...@@ -196,7 +228,7 @@ void UNetSender::send() ...@@ -196,7 +228,7 @@ void UNetSender::send()
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
// #define UNETUDP_DISABLE_OPTIMIZATION_N1 // #define UNETUDP_DISABLE_OPTIMIZATION_N1
void UNetSender::real_send() void UNetSender::real_send(UniSetUDP::UDPMessage& mypack)
{ {
UniSetTypes::uniset_rwmutex_rlock l(pack_mutex); UniSetTypes::uniset_rwmutex_rlock l(pack_mutex);
#ifdef UNETUDP_DISABLE_OPTIMIZATION_N1 #ifdef UNETUDP_DISABLE_OPTIMIZATION_N1
...@@ -212,6 +244,7 @@ void UNetSender::real_send() ...@@ -212,6 +244,7 @@ void UNetSender::real_send()
#endif #endif
if( packetnum > UniSetUDP::MaxPacketNum ) if( packetnum > UniSetUDP::MaxPacketNum )
packetnum = 1; packetnum = 1;
...@@ -299,8 +332,13 @@ bool UNetSender::initItem( UniXML::iterator& it ) ...@@ -299,8 +332,13 @@ bool UNetSender::initItem( UniXML::iterator& it )
return false; return false;
} }
int priority = it.getPIntProp(prefix + "_sendfactor", 0);
auto pk = mypacks[priority];
UItem p; UItem p;
p.iotype = UniSetTypes::getIOType(it.getProp("iotype")); p.iotype = UniSetTypes::getIOType(it.getProp("iotype"));
p.pack_sendfactor = priority;
if( p.iotype == UniversalIO::UnknownIOType ) if( p.iotype == UniversalIO::UnknownIOType )
{ {
...@@ -312,7 +350,30 @@ bool UNetSender::initItem( UniXML::iterator& it ) ...@@ -312,7 +350,30 @@ bool UNetSender::initItem( UniXML::iterator& it )
if( p.iotype == UniversalIO::DI || p.iotype == UniversalIO::DO ) if( p.iotype == UniversalIO::DI || p.iotype == UniversalIO::DO )
{ {
int dnum = packs_dnum[priority];
if( pk.size() <= dnum )
pk.resize(dnum + 1);
UniSetUDP::UDPMessage& mypack(pk[dnum]);
p.pack_ind = mypack.addDData(sid, 0);
if( p.pack_ind >= maxDData )
{
dnum++;
if( dnum >= pk.size() )
pk.resize(dnum + 1);
UniSetUDP::UDPMessage& mypack( pk[dnum] );
p.pack_ind = mypack.addDData(sid, 0); p.pack_ind = mypack.addDData(sid, 0);
mypack.nodeID = uniset_conf()->getLocalNode();
mypack.procID = shm->ID();
}
p.pack_num = dnum;
packs_anum[priority] = dnum;
if ( p.pack_ind >= UniSetUDP::MaxDCount ) if ( p.pack_ind >= UniSetUDP::MaxDCount )
{ {
...@@ -326,18 +387,44 @@ bool UNetSender::initItem( UniXML::iterator& it ) ...@@ -326,18 +387,44 @@ bool UNetSender::initItem( UniXML::iterator& it )
} }
else if( p.iotype == UniversalIO::AI || p.iotype == UniversalIO::AO ) else if( p.iotype == UniversalIO::AI || p.iotype == UniversalIO::AO )
{ {
int anum = packs_anum[priority];
if( pk.size() <= anum )
pk.resize(anum + 1);
UniSetUDP::UDPMessage& mypack(pk[anum]);
p.pack_ind = mypack.addAData(sid, 0);
if( p.pack_ind >= maxAData )
{
anum++;
if( anum >= pk.size() )
pk.resize(anum + 1);
UniSetUDP::UDPMessage& mypack(pk[anum]);
p.pack_ind = mypack.addAData(sid, 0); p.pack_ind = mypack.addAData(sid, 0);
mypack.nodeID = uniset_conf()->getLocalNode();
mypack.procID = shm->ID();
}
p.pack_num = anum;
packs_anum[priority] = anum;
if ( p.pack_ind >= UniSetUDP::MaxACount ) if ( p.pack_ind >= UniSetUDP::MaxACount )
{ {
unetcrit << myname unetcrit << myname
<< "(readItem): OVERFLOW! MAX UDP ANALOG DATA LIMIT! max=" << "(readItem): OVERFLOW! MAX UDP ANALOG DATA LIMIT! max="
<< UniSetUDP::MaxACount << endl; << UniSetUDP::MaxACount << endl;
raise(SIGTERM); raise(SIGTERM);
return false; return false;
} }
} }
mypacks[priority] = pk;
if( maxItem >= dlist.size() ) if( maxItem >= dlist.size() )
dlist.resize(maxItem + 10); dlist.resize(maxItem + 10);
...@@ -345,7 +432,6 @@ bool UNetSender::initItem( UniXML::iterator& it ) ...@@ -345,7 +432,6 @@ bool UNetSender::initItem( UniXML::iterator& it )
maxItem++; maxItem++;
unetinfo << myname << "(initItem): add " << p << endl; unetinfo << myname << "(initItem): add " << p << endl;
return true; return true;
} }
...@@ -367,6 +453,11 @@ void UNetSender::askSensors( UniversalIO::UIOCommand cmd ) ...@@ -367,6 +453,11 @@ void UNetSender::askSensors( UniversalIO::UIOCommand cmd )
shm->askSensor(it.id, cmd); shm->askSensor(it.id, cmd);
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
int UNetSender::getDataPackCount()
{
return mypacks.size();
}
// -----------------------------------------------------------------------------
const std::string UNetSender::getShortInfo() const const std::string UNetSender::getShortInfo() const
{ {
// warning: будет вызываться из другого потока // warning: будет вызываться из другого потока
...@@ -381,4 +472,3 @@ const std::string UNetSender::getShortInfo() const ...@@ -381,4 +472,3 @@ const std::string UNetSender::getShortInfo() const
return std::move(s.str()); return std::move(s.str());
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
...@@ -4,6 +4,7 @@ ...@@ -4,6 +4,7 @@
#include <ostream> #include <ostream>
#include <string> #include <string>
#include <vector> #include <vector>
#include <unordered_map>
#include <cc++/socket.h> #include <cc++/socket.h>
#include "UniSetObject_LT.h" #include "UniSetObject_LT.h"
#include "Trigger.h" #include "Trigger.h"
...@@ -12,13 +13,18 @@ ...@@ -12,13 +13,18 @@
#include "SharedMemory.h" #include "SharedMemory.h"
#include "ThreadCreator.h" #include "ThreadCreator.h"
#include "UDPPacket.h" #include "UDPPacket.h"
#include "DebugStream.h"
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
/* /*
* Распределение датчиков по пакетам * Распределение датчиков по пакетам
* ========================================================================= * =========================================================================
* В функции initItem() идет добавление датчика в пакет и создание нового пакета при переполнении. Причем так как дискретные и аналоговые * Все пересылаемые данные разбиваются в группы по частоте посылки("sendfactor").
* датчики обрабатываются отдельно, то датчики, которые первые переполнятся, те и создадут новый пакет. "Отставшие" же будут использовать уже созданные. * Частота посылки кратна sendpause, задаётся для каждого датчика, при помощи свойства prefix_sendfactor.
* Внутри каждой группы пакеты набираются по мере "заполнения".
*
* Добавление датчика в пакет и создание нового пакета при переполнении происходит в функции initItem().
* Причем так как дискретные и аналоговые датчики обрабатываются отдельно (но пересылаются в одном пакете),
* то датчики, которые первые переполнятся приводят к тому, что создаётся новый пакет.
* "Отставшие" же будут использовать уже созданные.
* В свою очередь в initItem() каждому UItem в dlist кроме pack_ind присваивается еще и номер пакета pack_num, который гарантировано соответствует * В свою очередь в initItem() каждому UItem в dlist кроме pack_ind присваивается еще и номер пакета pack_num, который гарантировано соответствует
* существующему пакету, поэтому в дальнейшем при использовании pack_num в качестве ключа в mypacks мы не проверяем пакет на существование. * существующему пакету, поэтому в дальнейшем при использовании pack_num в качестве ключа в mypacks мы не проверяем пакет на существование.
* *
...@@ -30,32 +36,41 @@ class UNetSender ...@@ -30,32 +36,41 @@ class UNetSender
{ {
public: public:
UNetSender( const std::string& host, const ost::tpport_t port, const std::shared_ptr<SMInterface>& smi, UNetSender( const std::string& host, const ost::tpport_t port, const std::shared_ptr<SMInterface>& smi,
const std::string& s_field = "", const std::string& s_fvalue = "" ); const std::string& s_field = "", const std::string& s_fvalue = "", const std::string& prefix = "unet",
size_t maxDCount = UniSetUDP::MaxDCount, size_t maxACount = UniSetUDP::MaxACount );
virtual ~UNetSender(); virtual ~UNetSender();
typedef int sendfactor_t;
struct UItem struct UItem
{ {
UItem(): UItem():
iotype(UniversalIO::UnknownIOType), iotype(UniversalIO::UnknownIOType),
id(UniSetTypes::DefaultObjectId), id(UniSetTypes::DefaultObjectId),
pack_ind(-1) {} pack_num(-1),
pack_ind(-1),
pack_sendfactor(0) {}
UniversalIO::IOType iotype; UniversalIO::IOType iotype;
UniSetTypes::ObjectId id; UniSetTypes::ObjectId id;
IOController::IOStateList::iterator ioit; IOController::IOStateList::iterator ioit;
int pack_num;
int pack_ind; int pack_ind;
sendfactor_t pack_sendfactor = { 0 };
friend std::ostream& operator<<( std::ostream& os, UItem& p ); friend std::ostream& operator<<( std::ostream& os, UItem& p );
}; };
typedef std::vector<UItem> DMap; typedef std::vector<UItem> DMap;
int getDataPackCount();
void start(); void start();
void stop(); void stop();
void send(); void send();
void real_send(); void real_send(UniSetUDP::UDPMessage& mypack);
/*! (принудительно) обновить все данные (из SM) */ /*! (принудительно) обновить все данные (из SM) */
void updateFromSM(); void updateFromSM();
...@@ -70,6 +85,10 @@ class UNetSender ...@@ -70,6 +85,10 @@ class UNetSender
{ {
sendpause = msec; sendpause = msec;
} }
inline void setPackSendPause( int msec )
{
packsendpause = msec;
}
/*! заказать датчики */ /*! заказать датчики */
void askSensors( UniversalIO::UIOCommand cmd ); void askSensors( UniversalIO::UIOCommand cmd );
...@@ -84,7 +103,6 @@ class UNetSender ...@@ -84,7 +103,6 @@ class UNetSender
virtual const std::string getShortInfo() const; virtual const std::string getShortInfo() const;
inline ost::IPV4Address getAddress() const inline ost::IPV4Address getAddress() const
{ {
return addr; return addr;
...@@ -94,10 +112,20 @@ class UNetSender ...@@ -94,10 +112,20 @@ class UNetSender
return port; return port;
} }
inline size_t getADataSize()
{
return maxAData;
}
inline size_t getDDataSize()
{
return maxDData;
}
protected: protected:
std::string s_field; std::string s_field;
std::string s_fvalue; std::string s_fvalue;
std::string prefix;
const std::shared_ptr<SMInterface> shm; const std::shared_ptr<SMInterface> shm;
std::shared_ptr<DebugStream> unetlog; std::shared_ptr<DebugStream> unetlog;
...@@ -117,17 +145,30 @@ class UNetSender ...@@ -117,17 +145,30 @@ class UNetSender
std::string myname; std::string myname;
int sendpause; int sendpause;
int packsendpause;
std::atomic_bool activated; std::atomic_bool activated;
UniSetTypes::uniset_rwmutex pack_mutex; UniSetTypes::uniset_rwmutex pack_mutex;
UniSetUDP::UDPMessage mypack;
// int - sendfactor
typedef std::unordered_map<sendfactor_t, std::vector<UniSetUDP::UDPMessage>> Packs;
Packs mypacks;
std::unordered_map<sendfactor_t, int> packs_anum;
std::unordered_map<sendfactor_t, int> packs_dnum;
DMap dlist; DMap dlist;
int maxItem; int maxItem;
unsigned long packetnum; unsigned long packetnum; /*!< номер очередного посылаемого пакета */
unsigned short lastcrc; unsigned short lastcrc;
UniSetUDP::UDPPacket s_msg; UniSetUDP::UDPPacket s_msg;
size_t maxAData;
size_t maxDData;
std::shared_ptr< ThreadCreator<UNetSender> > s_thr; // send thread std::shared_ptr< ThreadCreator<UNetSender> > s_thr; // send thread
unsigned long ncycle = { 0 }; /*!< номер цикла посылки */
}; };
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
#endif // UNetSender_H_ #endif // UNetSender_H_
......
...@@ -2,7 +2,7 @@ ...@@ -2,7 +2,7 @@
uniset2-start.sh -f ./uniset2-unetexchange --unet-name UNetExchange \ uniset2-start.sh -f ./uniset2-unetexchange --unet-name UNetExchange \
--confile test.xml --smemory-id SharedMemory \ --confile test.xml --smemory-id SharedMemory \
--unet-filter-field rs --unet-filter-value 2 --unet-maxdifferense 40 \ --unet-filter-field rs --unet-filter-value 2 --unet-maxdifferense 40 --unet-sendpause 1000 \
--dlog-add-levels info,crit,warn --dlog-add-levels info,crit,warn --unet-log-add-levels info,crit,warn
#--unet-nodes-confnode specnet #--unet-nodes-confnode specnet
...@@ -20,6 +20,4 @@ clean-local: ...@@ -20,6 +20,4 @@ clean-local:
rm -rf $(CLEANFILES) rm -rf $(CLEANFILES)
rm -rf $(COVERAGE_REPORT_DIR) rm -rf $(COVERAGE_REPORT_DIR)
include $(top_builddir)/include.mk
endif endif
...@@ -103,7 +103,7 @@ TEST_CASE("[UNetUDP]: queue sort", "[unetudp][packetqueue]") ...@@ -103,7 +103,7 @@ TEST_CASE("[UNetUDP]: queue sort", "[unetudp][packetqueue]")
UniSetUDP::UDPMessage m4; UniSetUDP::UDPMessage m4;
m4.num = 100; m4.num = 100;
// специально сладываем в обратном порядке // специально складываем в обратном порядке
// чтобы проверить "сортировку" // чтобы проверить "сортировку"
q.push(m1); q.push(m1);
q.push(m3); q.push(m3);
......
...@@ -240,8 +240,12 @@ int main(int argc, char* argv[]) ...@@ -240,8 +240,12 @@ int main(int argc, char* argv[])
if( sz == 0 ) if( sz == 0 )
{ {
if( pack.magic != UniSetUDP::UNETUDP_MAGICNUM )
cerr << "(recv): BAD PROTOCOL VERSION! [ need version '" << UniSetUDP::UNETUDP_MAGICNUM << "']" << endl;
else
cerr << "(recv): FAILED header ret=" << ret cerr << "(recv): FAILED header ret=" << ret
<< " sizeof=" << sz << endl; << " sizeof=" << sz << endl;
continue; continue;
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment