Commit cb73b88e authored by Pavel Vainerman's avatar Pavel Vainerman

(unet2): откатил реализацию до первой версии (без "сильной" оптимизации).

parent 04f783e4
......@@ -33,7 +33,7 @@ std::ostream& UniSetUDP::operator<<( std::ostream& os, UniSetUDP::UDPMessage& p
os << "ANALOG:" << endl;
for( size_t i=0; i<p.acount; i++ )
os << "[" << i << "]={" << p.a_dat[i].id << "," << p.a_dat[i].val << "}" << endl;
os << "[" << i << "]={" << p.msg.a_dat[i].id << "," << p.msg.a_dat[i].val << "}" << endl;
return os;
}
......@@ -44,12 +44,12 @@ UDPMessage::UDPMessage()
// -----------------------------------------------------------------------------
size_t UDPMessage::addAData( const UniSetUDP::UDPAData& dat )
{
if( acount >= MaxACount )
if( msg.header.acount >= MaxACount )
return MaxACount;
a_dat[acount] = dat;
acount++;
return acount-1;
msg.a_dat[msg.header.acount] = dat;
msg.header.acount++;
return msg.header.acount-1;
}
// -----------------------------------------------------------------------------
size_t UDPMessage::addAData( long id, long val)
......@@ -62,7 +62,7 @@ bool UDPMessage::setAData( size_t index, long val )
{
if( index < MaxACount )
{
a_dat[index].val = val;
msg.a_dat[index].val = val;
return true;
}
......@@ -71,17 +71,17 @@ bool UDPMessage::setAData( size_t index, long val )
// -----------------------------------------------------------------------------
size_t UDPMessage::addDData( long id, bool val )
{
if( dcount >= MaxDCount )
if( msg.header.dcount >= MaxDCount )
return MaxDCount;
// сохраняем ID
d_id[dcount] = id;
msg.d_id[msg.header.dcount] = id;
bool res = setDData( dcount, val );
bool res = setDData( msg.header.dcount, val );
if( res )
{
dcount++;
return dcount-1;
msg.header.dcount++;
return msg.header.dcount-1;
}
return MaxDCount;
......@@ -96,13 +96,13 @@ bool UDPMessage::setDData( size_t index, bool val )
size_t nbit = index % sizeof(unsigned char);
// выставляем бит
unsigned char d = d_dat[nbyte];
unsigned char d = msg.d_dat[nbyte];
if( val )
d |= (1<<nbit);
else
d &= ~(1<<nbit);
d_dat[nbyte] = d;
msg.d_dat[nbyte] = d;
return true;
}
// -----------------------------------------------------------------------------
......@@ -111,7 +111,7 @@ long UDPMessage::dID( size_t index )
if( index >= MaxDCount )
return UniSetTypes::DefaultObjectId;
return d_id[index];
return msg.d_id[index];
}
// -----------------------------------------------------------------------------
bool UDPMessage::dValue( size_t index )
......@@ -122,76 +122,7 @@ bool UDPMessage::dValue( size_t index )
size_t nbyte = index / sizeof(unsigned char);
size_t nbit = index % sizeof(unsigned char);
return ( d_dat[nbyte] & (1<<nbit) );
return ( msg.d_dat[nbyte] & (1<<nbit) );
}
// -----------------------------------------------------------------------------
size_t UDPMessage::transport_msg( UDPPacket& p )
{
memset(&p,0,sizeof(UDPPacket));
size_t i = 0;
memcpy(&(p.data[i]),this,sizeof(UDPHeader));
i += sizeof(UDPHeader);
// копируем аналоговые данные
size_t sz = acount*sizeof(UDPAData);
memcpy(&(p.data[i]),a_dat,sz);
i += sz;
// копируем булевые индексы
sz = dcount*sizeof(long);
memcpy(&(p.data[i]),d_id,sz);
i += sz;
// копируем булевые данные
size_t nbyte = dcount / sizeof(unsigned char);
size_t nbit = dcount % sizeof(unsigned char);
sz = nbit > 0 ? nbyte + 1 : nbyte;
memcpy(&(p.data[i]),d_dat,sz);
i += sz;
p.len = i;
return i;
}
// -----------------------------------------------------------------------------
UDPMessage::UDPMessage( UDPPacket& p )
{
getMessage(*this,p);
}
// -----------------------------------------------------------------------------
size_t UDPMessage::getMessage( UDPMessage& m, UDPPacket& p )
{
memset(&m,0,sizeof(m));
size_t i = 0;
memcpy(&m,&(p.data[i]),sizeof(UDPHeader));
i += sizeof(UDPHeader);
// копируем аналоговые данные
size_t sz = m.acount*sizeof(UDPAData);
if( sz > sizeof(m.a_dat) )
sz = sizeof(m.a_dat);
memcpy(m.a_dat,&(p.data[i]),sz);
i += sz;
// копируем булевые индексы
sz = m.dcount*sizeof(long);
if( sz > sizeof(m.d_id) )
sz = sizeof(m.d_id);
memcpy(m.d_id,&(p.data[i]),sz);
i += sz;
// копируем булевые данные
size_t nbyte = m.dcount / sizeof(unsigned char);
size_t nbit = m.dcount % sizeof(unsigned char);
sz = nbit > 0 ? nbyte + 1 : nbyte;
if( sz > sizeof(m.d_dat) )
sz = sizeof(m.d_dat);
memcpy(m.d_dat,&(p.data[i]),sz);
return i+sz;
}
// -----------------------------------------------------------------------------
......@@ -41,29 +41,26 @@ namespace UniSetUDP
friend std::ostream& operator<<( std::ostream& os, UDPAData& p );
}__attribute__((packed));
static const size_t MaxACount = 200;
static const size_t MaxDCount = 400;
static const size_t MaxDDataCount = MaxDCount / sizeof(unsigned char);
static const size_t MaxACount = 200;
struct UDPPacket
{
UDPPacket():len(0){}
struct DataPacket
{
UDPHeader header;
UDPAData a_dat[MaxACount]; /*!< аналоговые величины */
long d_id[MaxDCount]; /*!< список дискретных ID */
unsigned char d_dat[MaxDDataCount]; /*!< битовые значения */
int len;
unsigned char data[ sizeof(UDPHeader) + MaxDCount*sizeof(long) + MaxDDataCount + MaxACount*sizeof(UDPAData) ];
}__attribute__((packed));
}__attribute__((packed));
static const int MaxDataLen = sizeof(UDPPacket);
static const int MaxDataLen = sizeof(DataPacket);
struct UDPMessage:
public UDPHeader
{
UDPMessage();
UDPMessage( UDPPacket& p );
size_t transport_msg( UDPPacket& p );
static size_t getMessage( UDPMessage& m, UDPPacket& p );
size_t addDData( long id, bool val );
bool setDData( size_t index, bool val );
long dID( size_t index );
......@@ -78,12 +75,7 @@ namespace UniSetUDP
inline int asize(){ return acount; }
// inline int byte_size(){ return (dcount*sizeof(long)*UDPDData) + acount*sizeof(UDPAData)); }
// количество байт в пакете с булевыми переменными...
int d_byte(){ return dcount*sizeof(long) + dcount; }
UDPAData a_dat[MaxACount]; /*!< аналоговые величины */
long d_id[MaxDCount]; /*!< список дискретных ID */
unsigned char d_dat[MaxDDataCount]; /*!< битовые значения */
DataPacket msg;
friend std::ostream& operator<<( std::ostream& os, UDPMessage& p );
};
......
......@@ -10,10 +10,10 @@ using namespace UniSetExtensions;
bool UNetReceiver::PacketCompare::operator()(const UniSetUDP::UDPMessage& lhs,
const UniSetUDP::UDPMessage& rhs) const
{
// if( lhs.num == rhs.num )
// return (lhs < rhs);
// if( lhs.msg.header.num == rhs.msg.header.num )
// return (lhs.msg < rhs.msg);
return lhs.num > rhs.num;
return lhs.msg.header.num > rhs.msg.header.num;
}
// ------------------------------------------------------------------------------------------
UNetReceiver::UNetReceiver( const std::string s_host, const ost::tpport_t port, SMInterface* smi ):
......@@ -161,7 +161,7 @@ void UNetReceiver::real_update()
return;
p = qpack.top();
unsigned long sub = labs(p.num - pnum);
unsigned long sub = labs(p.msg.header.num - pnum);
if( pnum > 0 )
{
// если sub > maxDifferens
......@@ -179,7 +179,7 @@ void UNetReceiver::real_update()
lostPackets++;
}
else if( p.num == pnum )
else if( p.msg.header.num == pnum )
{
/* а что делать если идут повторные пакеты ?!
* для надёжности лучше обрабатывать..
......@@ -195,7 +195,7 @@ void UNetReceiver::real_update()
// удаляем из очереди, только если
// всё в порядке с последовательностью..
qpack.pop();
pnum = p.num;
pnum = p.msg.header.num;
} // unlock qpack
k--;
......@@ -207,7 +207,7 @@ void UNetReceiver::real_update()
// Обработка дискретных
size_t nbit = 0;
for( size_t i=0; i<p.dcount; i++, nbit++ )
for( size_t i=0; i<p.msg.header.dcount; i++, nbit++ )
{
try
{
......@@ -246,11 +246,11 @@ void UNetReceiver::real_update()
}
// Обрабока аналоговых
for( size_t i=0; i<p.acount; i++ )
for( size_t i=0; i<p.msg.header.acount; i++ )
{
try
{
UniSetUDP::UDPAData& d = p.a_dat[i];
UniSetUDP::UDPAData& d = p.msg.a_dat[i];
ItemInfo& ii(a_icache[i]);
if( ii.id != d.id )
{
......@@ -327,16 +327,24 @@ bool UNetReceiver::recv()
if( !udp->isInputReady(recvTimeout) )
return false;
size_t ret = udp->UDPReceive::receive((char*)(r_buf.data),sizeof(r_buf.data));
size_t ret = udp->UDPReceive::receive(&(pack.msg),sizeof(pack.msg));
if( ret < sizeof(UniSetUDP::UDPHeader) )
{
dlog[Debug::CRIT] << myname << "(receive): FAILED header ret=" << ret << " sizeof=" << sizeof(UniSetUDP::UDPHeader) << endl;
return false;
}
size_t sz = UniSetUDP::UDPMessage::getMessage(pack,r_buf);
if( sz == 0 )
// size_t sz = pack.msg.header.acount * sizeof(UniSetUDP::UDPAData) + sizeof(UniSetUDP::UDPHeader);
size_t sz = sizeof(UniSetUDP::UDPMessage);
if( ret < sz )
{
dlog[Debug::CRIT] << myname << "(receive): FAILED RECEIVE DATA ret=" << ret << endl;
dlog[Debug::CRIT] << myname << "(receive): FAILED data ret=" << ret << " sizeof=" << sz
<< " packnum=" << pack.msg.header.num << endl;
return false;
}
if( rnum>0 && labs(pack.num - rnum) > maxDifferens )
if( rnum>0 && labs(pack.msg.header.num - rnum) > maxDifferens )
{
/* А что делать если мы уже ждём и ещё не "разгребли предыдущее".. а тут уже повторный "разрыв"
* Можно откинуть всё.. что сложили во временную очередь и заново "копить" (но тогда теряем информацию)
......@@ -355,7 +363,7 @@ bool UNetReceiver::recv()
waitClean = true;
}
rnum = pack.num;
rnum = pack.msg.header.num;
#if 0
cerr << myname << "(receive): recv DATA OK. ret=" << ret << " sizeof=" << sz
......@@ -420,20 +428,20 @@ void UNetReceiver::initIterators()
// -----------------------------------------------------------------------------
void UNetReceiver::initDCache( UniSetUDP::UDPMessage& pack, bool force )
{
if( !force && pack.dcount == d_icache.size() )
if( !force && pack.msg.header.dcount == d_icache.size() )
return;
dlog[Debug::INFO] << myname << ": init icache.." << endl;
d_cache_init_ok = true;
d_icache.resize(pack.dcount);
d_icache.resize(pack.msg.header.dcount);
for( size_t i=0; i<d_icache.size(); i++ )
{
ItemInfo& d(d_icache[i]);
if( d.id != pack.d_id[i] )
if( d.id != pack.msg.d_id[i] )
{
d.id = pack.d_id[i];
d.id = pack.msg.d_id[i];
d.iotype = conf->getIOType(d.id);
shm->initAIterator(d.ait);
shm->initDIterator(d.dit);
......@@ -443,19 +451,19 @@ void UNetReceiver::initDCache( UniSetUDP::UDPMessage& pack, bool force )
// -----------------------------------------------------------------------------
void UNetReceiver::initACache( UniSetUDP::UDPMessage& pack, bool force )
{
if( !force && pack.acount == a_icache.size() )
if( !force && pack.msg.header.acount == a_icache.size() )
return;
dlog[Debug::INFO] << myname << ": init icache.." << endl;
a_cache_init_ok = true;
a_icache.resize(pack.acount);
a_icache.resize(pack.msg.header.acount);
for( size_t i=0; i<a_icache.size(); i++ )
{
ItemInfo& d(a_icache[i]);
if( d.id != pack.a_dat[i].id )
if( d.id != pack.msg.a_dat[i].id )
{
d.id = pack.a_dat[i].id;
d.id = pack.msg.a_dat[i].id;
d.iotype = conf->getIOType(d.id);
shm->initAIterator(d.ait);
shm->initDIterator(d.dit);
......
......@@ -114,8 +114,7 @@ class UNetReceiver
};
typedef std::priority_queue<UniSetUDP::UDPMessage,std::vector<UniSetUDP::UDPMessage>,PacketCompare> PacketQueue;
PacketQueue qpack; /*!< очередь принятых пакетов (отсортированных по возрастанию номера пакета) */
UniSetUDP::UDPMessage pack; /*!< просто буфер для получения очередного сообщения */
UniSetUDP::UDPPacket r_buf;
UniSetUDP::UDPMessage pack; /*!< просто буфер для получения очереlного сообщения */
UniSetTypes::uniset_mutex packMutex; /*!< mutex для работы с очередью */
unsigned long pnum; /*!< текущий номер обработанного сообщения, для проверки непрерывности последовательности пакетов */
......
......@@ -71,8 +71,8 @@ s_thr(0)
// выставляем поля, которые не меняются
mypack.nodeID = conf->getLocalNode();
mypack.procID = shm->ID();
mypack.msg.header.nodeID = conf->getLocalNode();
mypack.msg.header.procID = shm->ID();
}
// -----------------------------------------------------------------------------
UNetSender::~UNetSender()
......@@ -187,19 +187,20 @@ void UNetSender::send()
void UNetSender::real_send()
{
UniSetTypes::uniset_mutex_lock l(pack_mutex,300);
mypack.num = packetnum++;
mypack.msg.header.num = packetnum++;
if( packetnum > UniSetUDP::MaxPacketNum )
packetnum = 1;
// cout << "************* send header: " << mypack.msg.header << endl;
// size_t sz = mypack.byte_size() + sizeof(UniSetUDP::UDPHeader);
size_t sz = sizeof(UniSetUDP::UDPMessage);
if( !udp->isPending(ost::Socket::pendingOutput) )
return;
mypack.transport_msg(s_msg);
size_t ret = udp->send( (char*)s_msg.data,s_msg.len );
if( ret < s_msg.len )
dlog[Debug::CRIT] << myname << "(real_send): FAILED ret=" << ret << " < sizeof=" << s_msg.len << endl;
size_t ret = udp->send( (char*)&(mypack.msg),sz);
if( ret < sz )
dlog[Debug::CRIT] << myname << "(real_send): FAILED ret=" << ret << " < sizeof=" << sz << endl;
}
// -----------------------------------------------------------------------------
void UNetSender::stop()
......
......@@ -95,7 +95,6 @@ class UNetSender
DMap dlist;
int maxItem;
unsigned long packetnum;
UniSetUDP::UDPPacket s_msg;
ThreadCreator<UNetSender>* s_thr; // send thread
};
......
......@@ -188,7 +188,6 @@ int main(int argc, char* argv[])
// char buf[UniSetUDP::MaxDataLen];
UniSetUDP::UDPMessage pack;
UniSetUDP::UDPPacket buf;
unsigned long prev_num=1;
int nc = 1;
......@@ -205,27 +204,35 @@ int main(int argc, char* argv[])
continue;
}
size_t ret = udp.UDPReceive::receive( &(buf.data), sizeof(buf.data) );
size_t sz = UniSetUDP::UDPMessage::getMessage(pack,buf);
if( sz == 0 )
size_t ret = udp.UDPReceive::receive( &pack, sizeof(pack) );
if( ret < sizeof(UniSetUDP::UDPHeader) )
{
cerr << "(recv): FAILED header ret=" << ret
<< " sizeof=" << sz<< endl;
<< " sizeof=" << sizeof(UniSetUDP::UDPHeader) << endl;
continue;
}
//size_t sz = pack.msg.header.dcount * sizeof(UniSetUDP::UDPData) + sizeof(UniSetUDP::UDPHeader);
size_t sz = sizeof(UniSetUDP::UDPMessage);
if( ret < sz )
{
cerr << "(recv): FAILED data ret=" << ret
<< " sizeof=" << sz << " packnum=" << pack.msg.header.num << endl;
continue;
}
if( lost )
{
if( prev_num != (pack.num-1) )
cerr << "WARNING! Incorrect sequence of packets! current=" << pack.num
if( prev_num != (pack.msg.header.num-1) )
cerr << "WARNING! Incorrect sequence of packets! current=" << pack.msg.header.num
<< " prev=" << prev_num << endl;
prev_num = pack.num;
prev_num = pack.msg.header.num;
}
// if( verb )
// cout << "receive OK. header: " << pack.msg.header
// << " bytes: " << ret << endl;
if( verb )
cout << "receive OK. header: " << pack.msg.header
<< " bytes: " << ret << endl;
if( show )
cout << "receive data: " << pack << endl;
......@@ -258,10 +265,10 @@ int main(int argc, char* argv[])
udp = new ost::UDPBroadcast(host,port);
UniSetUDP::UDPMessage mypack;
mypack.nodeID = nodeID;
mypack.procID = procID;
mypack.msg.header.nodeID = nodeID;
mypack.msg.header.procID = procID;
for( size_t i=0; i < count; i++ )
for( size_t i=0; i < count && i < UniSetUDP::MaxACount; i++ )
{
UDPAData d(i,i);
mypack.addAData(d);
......@@ -270,18 +277,19 @@ int main(int argc, char* argv[])
for( unsigned int i=0; i < count; i++ )
mypack.addDData(i,i);
//size_t sz = mypack.byte_size() + sizeof(UniSetUDP::UDPHeader);
size_t sz = sizeof(UniSetUDP::UDPMessage);
udp->setPeer(host,port);
unsigned long packetnum = 0;
UniSetUDP::UDPPacket s_buf;
int nc = 1;
if( ncycles > 0 )
nc = ncycles;
while( nc )
{
mypack.num = packetnum++;
mypack.msg.header.num = packetnum++;
if( packetnum > UniSetUDP::MaxPacketNum )
packetnum = 1;
......@@ -289,15 +297,13 @@ int main(int argc, char* argv[])
{
if( udp->isPending(ost::Socket::pendingOutput,tout) )
{
mypack.transport_msg(s_buf);
if( verb )
cout << "(send): to addr=" << addr << " d_count=" << mypack.dcount
<< " a_count=" << mypack.acount << " bytes=" << s_buf.len << endl;
cout << "(send): to addr=" << addr << " count=" << count << " bytes=" << sz << endl;
size_t ret = udp->send((char*)&(mypack.msg), sz);
size_t ret = udp->send((char*)&s_buf.data, s_buf.len);
if( ret < s_buf.len )
cerr << "(send): FAILED ret=" << ret << " < sizeof=" << s_buf.len << endl;
if( ret < sz )
cerr << "(send): FAILED ret=" << ret << " < sizeof=" << sz << endl;
}
}
catch( ost::SockException& e )
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment