Commit e21122bd authored by Pavel Vainerman's avatar Pavel Vainerman

(Unet2): первая версия оптимизированного протокола

parent 11272d59
......@@ -8,10 +8,11 @@ std::ostream& UniSetUDP::operator<<( std::ostream& os, UniSetUDP::UDPHeader& p )
return os << "nodeID=" << p.nodeID
<< " procID=" << p.procID
<< " dcount=" << p.dcount
<< " acount=" << p.acount
<< " pnum=" << p.num;
}
// -----------------------------------------------------------------------------
std::ostream& UniSetUDP::operator<<( std::ostream& os, UniSetUDP::UDPData& p )
std::ostream& UniSetUDP::operator<<( std::ostream& os, UniSetUDP::UDPAData& p )
{
return os << "id=" << p.id << " val=" << p.val;
}
......@@ -21,36 +22,90 @@ std::ostream& UniSetUDP::operator<<( std::ostream& os, UniSetUDP::UDPMessage& p
return os;
}
// -----------------------------------------------------------------------------
UDPMessage::UDPMessage():
count(0)
UDPMessage::UDPMessage()
{
}
// -----------------------------------------------------------------------------
bool UDPMessage::addData( const UniSetUDP::UDPData& dat )
size_t UDPMessage::addAData( const UniSetUDP::UDPAData& dat )
{
if( count >= MaxDataCount )
return false;
if( msg.header.acount >= MaxACount )
return MaxACount;
msg.dat[count] = dat;
count++;
msg.header.dcount = count;
return true;
msg.a_dat[msg.header.acount] = dat;
msg.header.acount++;
return msg.header.acount-1;
}
// -----------------------------------------------------------------------------
bool UDPMessage::addData( long id, long val)
size_t UDPMessage::addAData( long id, long val)
{
UDPData d(id,val);
return addData(d);
UDPAData d(id,val);
return addAData(d);
}
// -----------------------------------------------------------------------------
bool UDPMessage::setData( unsigned int index, long val )
bool UDPMessage::setAData( size_t index, long val )
{
if( index < MaxDataCount )
if( index < MaxACount )
{
msg.dat[index].val = val;
msg.a_dat[index].val = val;
return true;
}
return false;
}
// -----------------------------------------------------------------------------
size_t UDPMessage::addDData( long id, bool val )
{
if( msg.header.dcount >= MaxDCount )
return MaxDCount;
// сохраняем ID
msg.d_id[msg.header.dcount] = id;
bool res = setDData( msg.header.dcount, val );
if( res )
{
msg.header.dcount++;
return msg.header.dcount-1;
}
return MaxDCount;
}
// -----------------------------------------------------------------------------
bool UDPMessage::setDData( size_t index, bool val )
{
if( index >= MaxDCount )
return false;
size_t nbyte = index / sizeof(unsigned char);
size_t nbit = index % sizeof(unsigned char);
// выставляем бит
unsigned char d = msg.d_dat[nbyte];
if( val )
d |= (1<<nbit);
else
d &= ~(1<<nbit);
msg.d_dat[nbyte] = d;
return true;
}
// -----------------------------------------------------------------------------
long UDPMessage::dID( size_t index )
{
if( index >= MaxDCount )
return UniSetTypes::DefaultObjectId;
return msg.d_id[index];
}
// -----------------------------------------------------------------------------
bool UDPMessage::dValue( size_t index )
{
if( index >= MaxDCount )
return UniSetTypes::DefaultObjectId;
size_t nbyte = index / sizeof(unsigned char);
size_t nbit = index % sizeof(unsigned char);
return ( msg.d_dat[nbyte] & (1<<nbit) );
}
// -----------------------------------------------------------------------------
......@@ -8,55 +8,73 @@
// -----------------------------------------------------------------------------
namespace UniSetUDP
{
/*! Для оптимизации размера передаваемх данных, но с учёто того, что ID могут идти не подряд.
Сделан следующие формат.
Для аналоговых величин передаётся массив пар "id-value".
Для булевых величин - отдельно массив ID и отдельно битовый массив со значениями,
(по количеству битов такого же размера).
*/
struct UDPHeader
{
UDPHeader():num(0),nodeID(0),procID(0),dcount(0){}
UDPHeader():num(0),nodeID(0),procID(0),dcount(0),acount(0){}
unsigned long num;
long nodeID;
long procID;
size_t dcount;
size_t dcount; /*!< количество булевых величин */
size_t acount; /*!< количество аналоговых величин */
friend std::ostream& operator<<( std::ostream& os, UDPHeader& p );
}__attribute__((packed));
static unsigned long MaxPacketNum = std::numeric_limits<unsigned long>::max();
struct UDPData
struct UDPAData
{
UDPData():id(UniSetTypes::DefaultObjectId),val(0){}
UDPData(long id, long val):id(id),val(val){}
UDPAData():id(UniSetTypes::DefaultObjectId),val(0){}
UDPAData(long id, long val):id(id),val(val){}
long id;
long val;
friend std::ostream& operator<<( std::ostream& os, UDPData& p );
friend std::ostream& operator<<( std::ostream& os, UDPAData& p );
}__attribute__((packed));
static const int MaxDataLen = 8192; // ~ 1000 параметров
static const int MaxDataCount = ( MaxDataLen - sizeof(UniSetUDP::UDPHeader) ) / sizeof(UDPData);
static const size_t MaxDCount = 256;
static const size_t MaxDDataCount = MaxDCount / sizeof(unsigned char);
static const size_t MaxACount = 100;
struct DataPacket
{
UDPHeader header;
UDPData dat[MaxDataCount];
UDPAData a_dat[MaxACount]; /*!< аналоговые величины */
long d_id[MaxDCount]; /*!< список дискретных ID */
unsigned char d_dat[MaxDDataCount]; /*!< битовые значения */
}__attribute__((packed));
static const int MaxDataLen = sizeof(DataPacket);
struct UDPMessage:
public UDPHeader
{
UDPMessage();
bool addData( const UDPData& dat );
bool addData( long id, long val );
bool setData( unsigned int index, long val );
size_t addDData( long id, bool val );
bool setDData( size_t index, bool val );
long dID( size_t index );
bool dValue( size_t index );
size_t addAData( const UDPAData& dat );
size_t addAData( long id, long val );
bool setAData( size_t index, long val );
inline bool isFull(){ return count<MaxDataCount; }
inline int size(){ return count; }
inline int byte_size(){ return count*sizeof(UDPData); }
inline bool isFull(){ return ((dcount<MaxDCount) && (acount<MaxACount)); }
inline int dsize(){ return dcount; }
inline int asize(){ return acount; }
// inline int byte_size(){ return (dcount*sizeof(long)*UDPDData) + acount*sizeof(UDPAData)); }
DataPacket msg;
int count;
friend std::ostream& operator<<( std::ostream& os, UDPMessage& p );
};
......
......@@ -32,8 +32,10 @@ maxDifferens(1000),
waitClean(false),
rnum(0),
maxProcessingCount(100),
icache(200),
cache_init_ok(false)
d_icache(UniSetUDP::MaxDCount),
a_icache(UniSetUDP::MaxACount),
d_cache_init_ok(false),
a_cache_init_ok(false)
{
{
ostringstream s;
......@@ -200,14 +202,56 @@ void UNetReceiver::real_update()
// cerr << myname << "(update): " << p.msg.header << endl;
initCache(p, !cache_init_ok);
initDCache(p, !d_cache_init_ok);
initACache(p, !a_cache_init_ok);
for( size_t i=0; i<p.msg.header.dcount; i++ )
// Обработка дискретных
size_t nbit = 0;
for( size_t i=0; i<p.msg.header.dcount; i++, nbit++ )
{
try
{
UniSetUDP::UDPData& d = p.msg.dat[i];
ItemInfo& ii(icache[i]);
long id = p.dID(i);
bool val = p.dValue(i);
ItemInfo& ii(d_icache[i]);
if( ii.id != id )
{
dlog[Debug::WARN] << myname << "(update): reinit cache for sid=" << id << endl;
ii.id = id;
shm->initAIterator(ii.ait);
shm->initDIterator(ii.dit);
}
if( ii.iotype == UniversalIO::DigitalInput )
shm->localSaveState(ii.dit,id,val,shm->ID());
else if( ii.iotype == UniversalIO::AnalogInput )
shm->localSaveValue(ii.ait,id,val,shm->ID());
else if( ii.iotype == UniversalIO::AnalogOutput )
shm->localSetValue(ii.ait,id,val,shm->ID());
else if( ii.iotype == UniversalIO::DigitalOutput )
shm->localSetState(ii.dit,id,val,shm->ID());
else
dlog[Debug::CRIT] << myname << "(update): Unknown iotype for sid=" << id << endl;
}
catch( UniSetTypes::Exception& ex)
{
dlog[Debug::CRIT] << myname << "(update): " << ex << std::endl;
}
catch(...)
{
dlog[Debug::CRIT] << myname << "(update): catch ..." << std::endl;
}
}
// Обрабока аналоговых
for( size_t i=0; i<p.msg.header.acount; i++ )
{
try
{
UniSetUDP::UDPAData& d = p.msg.a_dat[i];
ItemInfo& ii(a_icache[i]);
if( ii.id != d.id )
{
dlog[Debug::WARN] << myname << "(update): reinit cache for sid=" << d.id << endl;
......@@ -290,7 +334,8 @@ bool UNetReceiver::recv()
return false;
}
size_t sz = pack.msg.header.dcount * sizeof(UniSetUDP::UDPData) + sizeof(UniSetUDP::UDPHeader);
// size_t sz = pack.msg.header.acount * sizeof(UniSetUDP::UDPAData) + sizeof(UniSetUDP::UDPHeader);
size_t sz = sizeof(UniSetUDP::UDPMessage);
if( ret < sz )
{
dlog[Debug::CRIT] << myname << "(receive): FAILED data ret=" << ret << " sizeof=" << sz
......@@ -369,28 +414,56 @@ bool UNetReceiver::recv()
// -----------------------------------------------------------------------------
void UNetReceiver::initIterators()
{
for( ItemVec::iterator it=icache.begin(); it!=icache.end(); ++it )
for( ItemVec::iterator it=d_icache.begin(); it!=d_icache.end(); ++it )
{
shm->initAIterator(it->ait);
shm->initDIterator(it->dit);
}
for( ItemVec::iterator it=a_icache.begin(); it!=a_icache.end(); ++it )
{
shm->initAIterator(it->ait);
shm->initDIterator(it->dit);
}
}
// -----------------------------------------------------------------------------
void UNetReceiver::initDCache( UniSetUDP::UDPMessage& pack, bool force )
{
if( !force && pack.msg.header.dcount == d_icache.size() )
return;
dlog[Debug::INFO] << myname << ": init icache.." << endl;
d_cache_init_ok = true;
d_icache.resize(pack.msg.header.dcount);
for( size_t i=0; i<d_icache.size(); i++ )
{
ItemInfo& d(d_icache[i]);
if( d.id != pack.msg.d_id[i] )
{
d.id = pack.msg.d_id[i];
d.iotype = conf->getIOType(d.id);
shm->initAIterator(d.ait);
shm->initDIterator(d.dit);
}
}
}
// -----------------------------------------------------------------------------
void UNetReceiver::initCache( UniSetUDP::UDPMessage& pack, bool force )
void UNetReceiver::initACache( UniSetUDP::UDPMessage& pack, bool force )
{
if( !force && pack.msg.header.dcount == icache.size() )
if( !force && pack.msg.header.acount == a_icache.size() )
return;
dlog[Debug::INFO] << myname << ": init icache.." << endl;
cache_init_ok = true;
a_cache_init_ok = true;
icache.resize(pack.msg.header.dcount);
for( size_t i=0; i<icache.size(); i++ )
a_icache.resize(pack.msg.header.acount);
for( size_t i=0; i<a_icache.size(); i++ )
{
ItemInfo& d(icache[i]);
if( d.id != pack.msg.dat[i].id )
ItemInfo& d(a_icache[i]);
if( d.id != pack.msg.a_dat[i].id )
{
d.id = pack.msg.dat[i].id;
d.id = pack.msg.a_dat[i].id;
d.iotype = conf->getIOType(d.id);
shm->initAIterator(d.ait);
shm->initDIterator(d.dit);
......
......@@ -142,10 +142,14 @@ class UNetReceiver
};
typedef std::vector<ItemInfo> ItemVec;
ItemVec icache; /*!< кэш итераторов */
bool cache_init_ok;
void initCache( UniSetUDP::UDPMessage& pack, bool force=false );
ItemVec d_icache; /*!< кэш итераторов для булевых */
ItemVec a_icache; /*!< кэш итераторов для аналоговых */
bool d_cache_init_ok;
bool a_cache_init_ok;
void initDCache( UniSetUDP::UDPMessage& pack, bool force=false );
void initACache( UniSetUDP::UDPMessage& pack, bool force=false );
};
// -----------------------------------------------------------------------------
#endif // UNetReceiver_H_
......
......@@ -122,8 +122,16 @@ void UNetSender::updateItem( DMap::iterator& it, long value )
if( it == dlist.end() )
return;
UniSetTypes::uniset_mutex_lock l(pack_mutex,100);
mypack.setData(it->pack_ind,value);
if( it->iotype == UniversalIO::DigitalInput || it->iotype == UniversalIO::DigitalOutput )
{
UniSetTypes::uniset_mutex_lock l(pack_mutex,100);
mypack.setDData(it->pack_ind,value);
}
else if( it->iotype == UniversalIO::AnalogInput || it->iotype == UniversalIO::AnalogOutput )
{
UniSetTypes::uniset_mutex_lock l(pack_mutex,100);
mypack.setAData(it->pack_ind,value);
}
}
// -----------------------------------------------------------------------------
void UNetSender::send()
......@@ -185,7 +193,8 @@ void UNetSender::real_send()
packetnum = 1;
// cout << "************* send header: " << mypack.msg.header << endl;
size_t sz = mypack.byte_size() + sizeof(UniSetUDP::UDPHeader);
// size_t sz = mypack.byte_size() + sizeof(UniSetUDP::UDPHeader);
size_t sz = sizeof(UniSetUDP::UDPMessage);
if( !udp->isPending(ost::Socket::pendingOutput) )
return;
......@@ -281,17 +290,39 @@ bool UNetSender::initItem( UniXML_iterator& it )
}
UItem p;
p.id = sid;
mypack.addData(sid,0);
p.pack_ind = mypack.size()-1;
p.iotype = UniSetTypes::getIOType(it.getProp("iotype"));
if( p.iotype == UniversalIO::UnknownIOType )
{
dlog[Debug::CRIT] << myname << "(update): Unknown iotype for sid=" << sid << endl;
dlog[Debug::CRIT] << myname << "(readItem): Unknown iotype for sid=" << sid << endl;
return false;
}
p.id = sid;
if( p.iotype == UniversalIO::DigitalInput || p.iotype == UniversalIO::DigitalOutput )
{
p.pack_ind = mypack.addDData(sid,0);
if ( p.pack_ind >= UniSetUDP::MaxDCount )
{
dlog[Debug::CRIT] << myname
<< "(readItem): OVERFLOW! MAX UDP DIGITAL DATA LIMIT! max="
<< UniSetUDP::MaxDCount << endl;
return false;
}
}
else if( p.iotype == UniversalIO::AnalogInput || p.iotype == UniversalIO::AnalogOutput )
{
p.pack_ind = mypack.addAData(sid,0);
if ( p.pack_ind >= UniSetUDP::MaxACount )
{
dlog[Debug::CRIT] << myname
<< "(readItem): OVERFLOW! MAX UDP ANALOG DATA LIMIT! max="
<< UniSetUDP::MaxACount << endl;
return false;
}
}
if( maxItem >= dlist.size() )
dlist.resize(maxItem+10);
......
......@@ -186,7 +186,7 @@ int main(int argc, char* argv[])
continue;
}
size_t ret = udp.UDPReceive::receive(&(pack.msg),sizeof(pack.msg));
size_t ret = udp.UDPReceive::receive( &pack, sizeof(pack) );
if( ret < sizeof(UniSetUDP::UDPHeader) )
{
cerr << "(recv): FAILED header ret=" << ret
......@@ -194,7 +194,8 @@ int main(int argc, char* argv[])
continue;
}
size_t sz = pack.msg.header.dcount * sizeof(UniSetUDP::UDPData) + sizeof(UniSetUDP::UDPHeader);
//size_t sz = pack.msg.header.dcount * sizeof(UniSetUDP::UDPData) + sizeof(UniSetUDP::UDPHeader);
size_t sz = sizeof(UniSetUDP::UDPMessage);
if( ret < sz )
{
cerr << "(recv): FAILED data ret=" << ret
......@@ -239,13 +240,17 @@ int main(int argc, char* argv[])
mypack.msg.header.nodeID = nodeID;
mypack.msg.header.procID = procID;
for( int i=0; i<count; i++ )
for( size_t i=0; i < count && i < UniSetUDP::MaxACount; i++ )
{
UDPData d(i,i);
mypack.addData(d);
UDPAData d(i,i);
mypack.addAData(d);
}
size_t sz = mypack.byte_size() + sizeof(UniSetUDP::UDPHeader);
for( int i=0; i<count; i++ )
mypack.addDData(i,i);
//size_t sz = mypack.byte_size() + sizeof(UniSetUDP::UDPHeader);
size_t sz = sizeof(UniSetUDP::UDPMessage);
udp->setPeer(host,port);
unsigned long packetnum = 0;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment