Commit c25e9694 authored by Pavel Vainerman's avatar Pavel Vainerman

(Unet2): первая версия оптимизированного протокола

parent 313fe2e3
...@@ -8,10 +8,11 @@ std::ostream& UniSetUDP::operator<<( std::ostream& os, UniSetUDP::UDPHeader& p ) ...@@ -8,10 +8,11 @@ std::ostream& UniSetUDP::operator<<( std::ostream& os, UniSetUDP::UDPHeader& p )
return os << "nodeID=" << p.nodeID return os << "nodeID=" << p.nodeID
<< " procID=" << p.procID << " procID=" << p.procID
<< " dcount=" << p.dcount << " dcount=" << p.dcount
<< " acount=" << p.acount
<< " pnum=" << p.num; << " pnum=" << p.num;
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
std::ostream& UniSetUDP::operator<<( std::ostream& os, UniSetUDP::UDPData& p ) std::ostream& UniSetUDP::operator<<( std::ostream& os, UniSetUDP::UDPAData& p )
{ {
return os << "id=" << p.id << " val=" << p.val; return os << "id=" << p.id << " val=" << p.val;
} }
...@@ -21,36 +22,90 @@ std::ostream& UniSetUDP::operator<<( std::ostream& os, UniSetUDP::UDPMessage& p ...@@ -21,36 +22,90 @@ std::ostream& UniSetUDP::operator<<( std::ostream& os, UniSetUDP::UDPMessage& p
return os; return os;
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
UDPMessage::UDPMessage(): UDPMessage::UDPMessage()
count(0)
{ {
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
bool UDPMessage::addData( const UniSetUDP::UDPData& dat ) size_t UDPMessage::addAData( const UniSetUDP::UDPAData& dat )
{ {
if( count >= MaxDataCount ) if( msg.header.acount >= MaxACount )
return false; return MaxACount;
msg.dat[count] = dat; msg.a_dat[msg.header.acount] = dat;
count++; msg.header.acount++;
msg.header.dcount = count; return msg.header.acount-1;
return true;
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
bool UDPMessage::addData( long id, long val) size_t UDPMessage::addAData( long id, long val)
{ {
UDPData d(id,val); UDPAData d(id,val);
return addData(d); return addAData(d);
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
bool UDPMessage::setData( unsigned int index, long val ) bool UDPMessage::setAData( size_t index, long val )
{ {
if( index < MaxDataCount ) if( index < MaxACount )
{ {
msg.dat[index].val = val; msg.a_dat[index].val = val;
return true; return true;
} }
return false; return false;
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
size_t UDPMessage::addDData( long id, bool val )
{
if( msg.header.dcount >= MaxDCount )
return MaxDCount;
// сохраняем ID
msg.d_id[msg.header.dcount] = id;
bool res = setDData( msg.header.dcount, val );
if( res )
{
msg.header.dcount++;
return msg.header.dcount-1;
}
return MaxDCount;
}
// -----------------------------------------------------------------------------
bool UDPMessage::setDData( size_t index, bool val )
{
if( index >= MaxDCount )
return false;
size_t nbyte = index / sizeof(unsigned char);
size_t nbit = index % sizeof(unsigned char);
// выставляем бит
unsigned char d = msg.d_dat[nbyte];
if( val )
d |= (1<<nbit);
else
d &= ~(1<<nbit);
msg.d_dat[nbyte] = d;
return true;
}
// -----------------------------------------------------------------------------
long UDPMessage::dID( size_t index )
{
if( index >= MaxDCount )
return UniSetTypes::DefaultObjectId;
return msg.d_id[index];
}
// -----------------------------------------------------------------------------
bool UDPMessage::dValue( size_t index )
{
if( index >= MaxDCount )
return UniSetTypes::DefaultObjectId;
size_t nbyte = index / sizeof(unsigned char);
size_t nbit = index % sizeof(unsigned char);
return ( msg.d_dat[nbyte] & (1<<nbit) );
}
// -----------------------------------------------------------------------------
...@@ -8,55 +8,73 @@ ...@@ -8,55 +8,73 @@
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
namespace UniSetUDP namespace UniSetUDP
{ {
/*! Для оптимизации размера передаваемх данных, но с учёто того, что ID могут идти не подряд.
Сделан следующие формат.
Для аналоговых величин передаётся массив пар "id-value".
Для булевых величин - отдельно массив ID и отдельно битовый массив со значениями,
(по количеству битов такого же размера).
*/
struct UDPHeader struct UDPHeader
{ {
UDPHeader():num(0),nodeID(0),procID(0),dcount(0){} UDPHeader():num(0),nodeID(0),procID(0),dcount(0),acount(0){}
unsigned long num; unsigned long num;
long nodeID; long nodeID;
long procID; long procID;
size_t dcount; size_t dcount; /*!< количество булевых величин */
size_t acount; /*!< количество аналоговых величин */
friend std::ostream& operator<<( std::ostream& os, UDPHeader& p ); friend std::ostream& operator<<( std::ostream& os, UDPHeader& p );
}__attribute__((packed)); }__attribute__((packed));
static unsigned long MaxPacketNum = std::numeric_limits<unsigned long>::max(); static unsigned long MaxPacketNum = std::numeric_limits<unsigned long>::max();
struct UDPData struct UDPAData
{ {
UDPData():id(UniSetTypes::DefaultObjectId),val(0){} UDPAData():id(UniSetTypes::DefaultObjectId),val(0){}
UDPData(long id, long val):id(id),val(val){} UDPAData(long id, long val):id(id),val(val){}
long id; long id;
long val; long val;
friend std::ostream& operator<<( std::ostream& os, UDPData& p ); friend std::ostream& operator<<( std::ostream& os, UDPAData& p );
}__attribute__((packed)); }__attribute__((packed));
static const int MaxDataLen = 8192; // ~ 1000 параметров static const size_t MaxDCount = 256;
static const int MaxDataCount = ( MaxDataLen - sizeof(UniSetUDP::UDPHeader) ) / sizeof(UDPData); static const size_t MaxDDataCount = MaxDCount / sizeof(unsigned char);
static const size_t MaxACount = 100;
struct DataPacket struct DataPacket
{ {
UDPHeader header; UDPHeader header;
UDPData dat[MaxDataCount]; UDPAData a_dat[MaxACount]; /*!< аналоговые величины */
long d_id[MaxDCount]; /*!< список дискретных ID */
unsigned char d_dat[MaxDDataCount]; /*!< битовые значения */
}__attribute__((packed)); }__attribute__((packed));
static const int MaxDataLen = sizeof(DataPacket);
struct UDPMessage: struct UDPMessage:
public UDPHeader public UDPHeader
{ {
UDPMessage(); UDPMessage();
bool addData( const UDPData& dat ); size_t addDData( long id, bool val );
bool addData( long id, long val ); bool setDData( size_t index, bool val );
bool setData( unsigned int index, long val ); long dID( size_t index );
bool dValue( size_t index );
size_t addAData( const UDPAData& dat );
size_t addAData( long id, long val );
bool setAData( size_t index, long val );
inline bool isFull(){ return count<MaxDataCount; } inline bool isFull(){ return ((dcount<MaxDCount) && (acount<MaxACount)); }
inline int size(){ return count; } inline int dsize(){ return dcount; }
inline int byte_size(){ return count*sizeof(UDPData); } inline int asize(){ return acount; }
// inline int byte_size(){ return (dcount*sizeof(long)*UDPDData) + acount*sizeof(UDPAData)); }
DataPacket msg; DataPacket msg;
int count;
friend std::ostream& operator<<( std::ostream& os, UDPMessage& p ); friend std::ostream& operator<<( std::ostream& os, UDPMessage& p );
}; };
......
...@@ -32,8 +32,10 @@ maxDifferens(1000), ...@@ -32,8 +32,10 @@ maxDifferens(1000),
waitClean(false), waitClean(false),
rnum(0), rnum(0),
maxProcessingCount(100), maxProcessingCount(100),
icache(200), d_icache(UniSetUDP::MaxDCount),
cache_init_ok(false) a_icache(UniSetUDP::MaxACount),
d_cache_init_ok(false),
a_cache_init_ok(false)
{ {
{ {
ostringstream s; ostringstream s;
...@@ -200,14 +202,56 @@ void UNetReceiver::real_update() ...@@ -200,14 +202,56 @@ void UNetReceiver::real_update()
// cerr << myname << "(update): " << p.msg.header << endl; // cerr << myname << "(update): " << p.msg.header << endl;
initCache(p, !cache_init_ok); initDCache(p, !d_cache_init_ok);
initACache(p, !a_cache_init_ok);
for( size_t i=0; i<p.msg.header.dcount; i++ ) // Обработка дискретных
size_t nbit = 0;
for( size_t i=0; i<p.msg.header.dcount; i++, nbit++ )
{ {
try try
{ {
UniSetUDP::UDPData& d = p.msg.dat[i];
ItemInfo& ii(icache[i]); long id = p.dID(i);
bool val = p.dValue(i);
ItemInfo& ii(d_icache[i]);
if( ii.id != id )
{
dlog[Debug::WARN] << myname << "(update): reinit cache for sid=" << id << endl;
ii.id = id;
shm->initAIterator(ii.ait);
shm->initDIterator(ii.dit);
}
if( ii.iotype == UniversalIO::DigitalInput )
shm->localSaveState(ii.dit,id,val,shm->ID());
else if( ii.iotype == UniversalIO::AnalogInput )
shm->localSaveValue(ii.ait,id,val,shm->ID());
else if( ii.iotype == UniversalIO::AnalogOutput )
shm->localSetValue(ii.ait,id,val,shm->ID());
else if( ii.iotype == UniversalIO::DigitalOutput )
shm->localSetState(ii.dit,id,val,shm->ID());
else
dlog[Debug::CRIT] << myname << "(update): Unknown iotype for sid=" << id << endl;
}
catch( UniSetTypes::Exception& ex)
{
dlog[Debug::CRIT] << myname << "(update): " << ex << std::endl;
}
catch(...)
{
dlog[Debug::CRIT] << myname << "(update): catch ..." << std::endl;
}
}
// Обрабока аналоговых
for( size_t i=0; i<p.msg.header.acount; i++ )
{
try
{
UniSetUDP::UDPAData& d = p.msg.a_dat[i];
ItemInfo& ii(a_icache[i]);
if( ii.id != d.id ) if( ii.id != d.id )
{ {
dlog[Debug::WARN] << myname << "(update): reinit cache for sid=" << d.id << endl; dlog[Debug::WARN] << myname << "(update): reinit cache for sid=" << d.id << endl;
...@@ -290,7 +334,8 @@ bool UNetReceiver::recv() ...@@ -290,7 +334,8 @@ bool UNetReceiver::recv()
return false; return false;
} }
size_t sz = pack.msg.header.dcount * sizeof(UniSetUDP::UDPData) + sizeof(UniSetUDP::UDPHeader); // size_t sz = pack.msg.header.acount * sizeof(UniSetUDP::UDPAData) + sizeof(UniSetUDP::UDPHeader);
size_t sz = sizeof(UniSetUDP::UDPMessage);
if( ret < sz ) if( ret < sz )
{ {
dlog[Debug::CRIT] << myname << "(receive): FAILED data ret=" << ret << " sizeof=" << sz dlog[Debug::CRIT] << myname << "(receive): FAILED data ret=" << ret << " sizeof=" << sz
...@@ -369,28 +414,56 @@ bool UNetReceiver::recv() ...@@ -369,28 +414,56 @@ bool UNetReceiver::recv()
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
void UNetReceiver::initIterators() void UNetReceiver::initIterators()
{ {
for( ItemVec::iterator it=icache.begin(); it!=icache.end(); ++it ) for( ItemVec::iterator it=d_icache.begin(); it!=d_icache.end(); ++it )
{ {
shm->initAIterator(it->ait); shm->initAIterator(it->ait);
shm->initDIterator(it->dit); shm->initDIterator(it->dit);
} }
for( ItemVec::iterator it=a_icache.begin(); it!=a_icache.end(); ++it )
{
shm->initAIterator(it->ait);
shm->initDIterator(it->dit);
}
}
// -----------------------------------------------------------------------------
void UNetReceiver::initDCache( UniSetUDP::UDPMessage& pack, bool force )
{
if( !force && pack.msg.header.dcount == d_icache.size() )
return;
dlog[Debug::INFO] << myname << ": init icache.." << endl;
d_cache_init_ok = true;
d_icache.resize(pack.msg.header.dcount);
for( size_t i=0; i<d_icache.size(); i++ )
{
ItemInfo& d(d_icache[i]);
if( d.id != pack.msg.d_id[i] )
{
d.id = pack.msg.d_id[i];
d.iotype = conf->getIOType(d.id);
shm->initAIterator(d.ait);
shm->initDIterator(d.dit);
}
}
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
void UNetReceiver::initCache( UniSetUDP::UDPMessage& pack, bool force ) void UNetReceiver::initACache( UniSetUDP::UDPMessage& pack, bool force )
{ {
if( !force && pack.msg.header.dcount == icache.size() ) if( !force && pack.msg.header.acount == a_icache.size() )
return; return;
dlog[Debug::INFO] << myname << ": init icache.." << endl; dlog[Debug::INFO] << myname << ": init icache.." << endl;
cache_init_ok = true; a_cache_init_ok = true;
icache.resize(pack.msg.header.dcount); a_icache.resize(pack.msg.header.acount);
for( size_t i=0; i<icache.size(); i++ ) for( size_t i=0; i<a_icache.size(); i++ )
{ {
ItemInfo& d(icache[i]); ItemInfo& d(a_icache[i]);
if( d.id != pack.msg.dat[i].id ) if( d.id != pack.msg.a_dat[i].id )
{ {
d.id = pack.msg.dat[i].id; d.id = pack.msg.a_dat[i].id;
d.iotype = conf->getIOType(d.id); d.iotype = conf->getIOType(d.id);
shm->initAIterator(d.ait); shm->initAIterator(d.ait);
shm->initDIterator(d.dit); shm->initDIterator(d.dit);
......
...@@ -142,10 +142,14 @@ class UNetReceiver ...@@ -142,10 +142,14 @@ class UNetReceiver
}; };
typedef std::vector<ItemInfo> ItemVec; typedef std::vector<ItemInfo> ItemVec;
ItemVec icache; /*!< кэш итераторов */ ItemVec d_icache; /*!< кэш итераторов для булевых */
bool cache_init_ok; ItemVec a_icache; /*!< кэш итераторов для аналоговых */
void initCache( UniSetUDP::UDPMessage& pack, bool force=false );
bool d_cache_init_ok;
bool a_cache_init_ok;
void initDCache( UniSetUDP::UDPMessage& pack, bool force=false );
void initACache( UniSetUDP::UDPMessage& pack, bool force=false );
}; };
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
#endif // UNetReceiver_H_ #endif // UNetReceiver_H_
......
...@@ -122,8 +122,16 @@ void UNetSender::updateItem( DMap::iterator& it, long value ) ...@@ -122,8 +122,16 @@ void UNetSender::updateItem( DMap::iterator& it, long value )
if( it == dlist.end() ) if( it == dlist.end() )
return; return;
UniSetTypes::uniset_mutex_lock l(pack_mutex,100); if( it->iotype == UniversalIO::DigitalInput || it->iotype == UniversalIO::DigitalOutput )
mypack.setData(it->pack_ind,value); {
UniSetTypes::uniset_mutex_lock l(pack_mutex,100);
mypack.setDData(it->pack_ind,value);
}
else if( it->iotype == UniversalIO::AnalogInput || it->iotype == UniversalIO::AnalogOutput )
{
UniSetTypes::uniset_mutex_lock l(pack_mutex,100);
mypack.setAData(it->pack_ind,value);
}
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
void UNetSender::send() void UNetSender::send()
...@@ -185,7 +193,8 @@ void UNetSender::real_send() ...@@ -185,7 +193,8 @@ void UNetSender::real_send()
packetnum = 1; packetnum = 1;
// cout << "************* send header: " << mypack.msg.header << endl; // cout << "************* send header: " << mypack.msg.header << endl;
size_t sz = mypack.byte_size() + sizeof(UniSetUDP::UDPHeader); // size_t sz = mypack.byte_size() + sizeof(UniSetUDP::UDPHeader);
size_t sz = sizeof(UniSetUDP::UDPMessage);
if( !udp->isPending(ost::Socket::pendingOutput) ) if( !udp->isPending(ost::Socket::pendingOutput) )
return; return;
...@@ -281,17 +290,39 @@ bool UNetSender::initItem( UniXML_iterator& it ) ...@@ -281,17 +290,39 @@ bool UNetSender::initItem( UniXML_iterator& it )
} }
UItem p; UItem p;
p.id = sid;
mypack.addData(sid,0);
p.pack_ind = mypack.size()-1;
p.iotype = UniSetTypes::getIOType(it.getProp("iotype")); p.iotype = UniSetTypes::getIOType(it.getProp("iotype"));
if( p.iotype == UniversalIO::UnknownIOType ) if( p.iotype == UniversalIO::UnknownIOType )
{ {
dlog[Debug::CRIT] << myname << "(update): Unknown iotype for sid=" << sid << endl; dlog[Debug::CRIT] << myname << "(readItem): Unknown iotype for sid=" << sid << endl;
return false; return false;
} }
p.id = sid;
if( p.iotype == UniversalIO::DigitalInput || p.iotype == UniversalIO::DigitalOutput )
{
p.pack_ind = mypack.addDData(sid,0);
if ( p.pack_ind >= UniSetUDP::MaxDCount )
{
dlog[Debug::CRIT] << myname
<< "(readItem): OVERFLOW! MAX UDP DIGITAL DATA LIMIT! max="
<< UniSetUDP::MaxDCount << endl;
return false;
}
}
else if( p.iotype == UniversalIO::AnalogInput || p.iotype == UniversalIO::AnalogOutput )
{
p.pack_ind = mypack.addAData(sid,0);
if ( p.pack_ind >= UniSetUDP::MaxACount )
{
dlog[Debug::CRIT] << myname
<< "(readItem): OVERFLOW! MAX UDP ANALOG DATA LIMIT! max="
<< UniSetUDP::MaxACount << endl;
return false;
}
}
if( maxItem >= dlist.size() ) if( maxItem >= dlist.size() )
dlist.resize(maxItem+10); dlist.resize(maxItem+10);
......
...@@ -186,7 +186,7 @@ int main(int argc, char* argv[]) ...@@ -186,7 +186,7 @@ int main(int argc, char* argv[])
continue; continue;
} }
size_t ret = udp.UDPReceive::receive(&(pack.msg),sizeof(pack.msg)); size_t ret = udp.UDPReceive::receive( &pack, sizeof(pack) );
if( ret < sizeof(UniSetUDP::UDPHeader) ) if( ret < sizeof(UniSetUDP::UDPHeader) )
{ {
cerr << "(recv): FAILED header ret=" << ret cerr << "(recv): FAILED header ret=" << ret
...@@ -194,7 +194,8 @@ int main(int argc, char* argv[]) ...@@ -194,7 +194,8 @@ int main(int argc, char* argv[])
continue; continue;
} }
size_t sz = pack.msg.header.dcount * sizeof(UniSetUDP::UDPData) + sizeof(UniSetUDP::UDPHeader); //size_t sz = pack.msg.header.dcount * sizeof(UniSetUDP::UDPData) + sizeof(UniSetUDP::UDPHeader);
size_t sz = sizeof(UniSetUDP::UDPMessage);
if( ret < sz ) if( ret < sz )
{ {
cerr << "(recv): FAILED data ret=" << ret cerr << "(recv): FAILED data ret=" << ret
...@@ -239,13 +240,17 @@ int main(int argc, char* argv[]) ...@@ -239,13 +240,17 @@ int main(int argc, char* argv[])
mypack.msg.header.nodeID = nodeID; mypack.msg.header.nodeID = nodeID;
mypack.msg.header.procID = procID; mypack.msg.header.procID = procID;
for( int i=0; i<count; i++ ) for( size_t i=0; i < count && i < UniSetUDP::MaxACount; i++ )
{ {
UDPData d(i,i); UDPAData d(i,i);
mypack.addData(d); mypack.addAData(d);
} }
size_t sz = mypack.byte_size() + sizeof(UniSetUDP::UDPHeader); for( int i=0; i<count; i++ )
mypack.addDData(i,i);
//size_t sz = mypack.byte_size() + sizeof(UniSetUDP::UDPHeader);
size_t sz = sizeof(UniSetUDP::UDPMessage);
udp->setPeer(host,port); udp->setPeer(host,port);
unsigned long packetnum = 0; unsigned long packetnum = 0;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment