Commit 845f0f2a authored by Pavel Vainerman's avatar Pavel Vainerman

(unet2): вернул оптимизированную реализацию

parent 7d536878
...@@ -28,12 +28,12 @@ std::ostream& UniSetUDP::operator<<( std::ostream& os, UniSetUDP::UDPMessage& p ...@@ -28,12 +28,12 @@ std::ostream& UniSetUDP::operator<<( std::ostream& os, UniSetUDP::UDPMessage& p
os << (UDPHeader*)(&p) << endl; os << (UDPHeader*)(&p) << endl;
os << "DIGITAL:" << endl; os << "DIGITAL:" << endl;
for( size_t i=0; i<p.msg.header.dcount; i++ ) for( size_t i=0; i<p.dcount; i++ )
os << "[" << i << "]={" << p.dID(i) << "," << p.dValue(i) << "}" << endl; os << "[" << i << "]={" << p.dID(i) << "," << p.dValue(i) << "}" << endl;
os << "ANALOG:" << endl; os << "ANALOG:" << endl;
for( size_t i=0; i<p.msg.header.acount; i++ ) for( size_t i=0; i<p.acount; i++ )
os << "[" << i << "]={" << p.msg.a_dat[i].id << "," << p.msg.a_dat[i].val << "}" << endl; os << "[" << i << "]={" << p.a_dat[i].id << "," << p.a_dat[i].val << "}" << endl;
return os; return os;
} }
...@@ -44,12 +44,12 @@ UDPMessage::UDPMessage() ...@@ -44,12 +44,12 @@ UDPMessage::UDPMessage()
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
size_t UDPMessage::addAData( const UniSetUDP::UDPAData& dat ) size_t UDPMessage::addAData( const UniSetUDP::UDPAData& dat )
{ {
if( msg.header.acount >= MaxACount ) if( acount >= MaxACount )
return MaxACount; return MaxACount;
msg.a_dat[msg.header.acount] = dat; a_dat[acount] = dat;
msg.header.acount++; acount++;
return msg.header.acount-1; return acount-1;
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
size_t UDPMessage::addAData( long id, long val) size_t UDPMessage::addAData( long id, long val)
...@@ -62,7 +62,7 @@ bool UDPMessage::setAData( size_t index, long val ) ...@@ -62,7 +62,7 @@ bool UDPMessage::setAData( size_t index, long val )
{ {
if( index < MaxACount ) if( index < MaxACount )
{ {
msg.a_dat[index].val = val; a_dat[index].val = val;
return true; return true;
} }
...@@ -71,17 +71,17 @@ bool UDPMessage::setAData( size_t index, long val ) ...@@ -71,17 +71,17 @@ bool UDPMessage::setAData( size_t index, long val )
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
size_t UDPMessage::addDData( long id, bool val ) size_t UDPMessage::addDData( long id, bool val )
{ {
if( msg.header.dcount >= MaxDCount ) if( dcount >= MaxDCount )
return MaxDCount; return MaxDCount;
// сохраняем ID // сохраняем ID
msg.d_id[msg.header.dcount] = id; d_id[dcount] = id;
bool res = setDData( msg.header.dcount, val ); bool res = setDData( dcount, val );
if( res ) if( res )
{ {
msg.header.dcount++; dcount++;
return msg.header.dcount-1; return dcount-1;
} }
return MaxDCount; return MaxDCount;
...@@ -96,13 +96,13 @@ bool UDPMessage::setDData( size_t index, bool val ) ...@@ -96,13 +96,13 @@ bool UDPMessage::setDData( size_t index, bool val )
size_t nbit = index % sizeof(unsigned char); size_t nbit = index % sizeof(unsigned char);
// выставляем бит // выставляем бит
unsigned char d = msg.d_dat[nbyte]; unsigned char d = d_dat[nbyte];
if( val ) if( val )
d |= (1<<nbit); d |= (1<<nbit);
else else
d &= ~(1<<nbit); d &= ~(1<<nbit);
msg.d_dat[nbyte] = d; d_dat[nbyte] = d;
return true; return true;
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
...@@ -111,7 +111,7 @@ long UDPMessage::dID( size_t index ) ...@@ -111,7 +111,7 @@ long UDPMessage::dID( size_t index )
if( index >= MaxDCount ) if( index >= MaxDCount )
return UniSetTypes::DefaultObjectId; return UniSetTypes::DefaultObjectId;
return msg.d_id[index]; return d_id[index];
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
bool UDPMessage::dValue( size_t index ) bool UDPMessage::dValue( size_t index )
...@@ -122,7 +122,76 @@ bool UDPMessage::dValue( size_t index ) ...@@ -122,7 +122,76 @@ bool UDPMessage::dValue( size_t index )
size_t nbyte = index / sizeof(unsigned char); size_t nbyte = index / sizeof(unsigned char);
size_t nbit = index % sizeof(unsigned char); size_t nbit = index % sizeof(unsigned char);
return ( msg.d_dat[nbyte] & (1<<nbit) ); return ( d_dat[nbyte] & (1<<nbit) );
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
size_t UDPMessage::transport_msg( UDPPacket& p )
{
memset(&p,0,sizeof(UDPPacket));
size_t i = 0;
memcpy(&(p.data[i]),this,sizeof(UDPHeader));
i += sizeof(UDPHeader);
// копируем аналоговые данные
size_t sz = acount*sizeof(UDPAData);
memcpy(&(p.data[i]),a_dat,sz);
i += sz;
// копируем булевые индексы
sz = dcount*sizeof(long);
memcpy(&(p.data[i]),d_id,sz);
i += sz;
// копируем булевые данные
size_t nbyte = dcount / sizeof(unsigned char);
size_t nbit = dcount % sizeof(unsigned char);
sz = nbit > 0 ? nbyte + 1 : nbyte;
memcpy(&(p.data[i]),d_dat,sz);
i += sz;
p.len = i;
return i;
}
// -----------------------------------------------------------------------------
UDPMessage::UDPMessage( UDPPacket& p )
{
getMessage(*this,p);
}
// -----------------------------------------------------------------------------
size_t UDPMessage::getMessage( UDPMessage& m, UDPPacket& p )
{
memset(&m,0,sizeof(m));
size_t i = 0;
memcpy(&m,&(p.data[i]),sizeof(UDPHeader));
i += sizeof(UDPHeader);
// копируем аналоговые данные
size_t sz = m.acount*sizeof(UDPAData);
if( sz > sizeof(m.a_dat) )
sz = sizeof(m.a_dat);
memcpy(m.a_dat,&(p.data[i]),sz);
i += sz;
// копируем булевые индексы
sz = m.dcount*sizeof(long);
if( sz > sizeof(m.d_id) )
sz = sizeof(m.d_id);
memcpy(m.d_id,&(p.data[i]),sz);
i += sz;
// копируем булевые данные
size_t nbyte = m.dcount / sizeof(unsigned char);
size_t nbit = m.dcount % sizeof(unsigned char);
sz = nbit > 0 ? nbyte + 1 : nbyte;
if( sz > sizeof(m.d_dat) )
sz = sizeof(m.d_dat);
memcpy(m.d_dat,&(p.data[i]),sz);
return i+sz;
}
// -----------------------------------------------------------------------------
...@@ -41,25 +41,29 @@ namespace UniSetUDP ...@@ -41,25 +41,29 @@ namespace UniSetUDP
friend std::ostream& operator<<( std::ostream& os, UDPAData& p ); friend std::ostream& operator<<( std::ostream& os, UDPAData& p );
}__attribute__((packed)); }__attribute__((packed));
static const size_t MaxACount = 200;
static const size_t MaxDCount = 400; static const size_t MaxDCount = 400;
static const size_t MaxDDataCount = MaxDCount / sizeof(unsigned char); static const size_t MaxDDataCount = MaxDCount / sizeof(unsigned char);
static const size_t MaxACount = 200;
struct DataPacket struct UDPPacket
{ {
UDPHeader header; UDPPacket():len(0){}
UDPAData a_dat[MaxACount]; /*!< аналоговые величины */
long d_id[MaxDCount]; /*!< список дискретных ID */
unsigned char d_dat[MaxDDataCount]; /*!< битовые значения */
int len;
unsigned char data[ sizeof(UDPHeader) + MaxDCount*sizeof(long) + MaxDDataCount + MaxACount*sizeof(UDPAData) ];
}__attribute__((packed)); }__attribute__((packed));
static const int MaxDataLen = sizeof(DataPacket); static const int MaxDataLen = sizeof(UDPPacket);
struct UDPMessage struct UDPMessage:
public UDPHeader
{ {
UDPMessage(); UDPMessage();
UDPMessage( UDPPacket& p );
size_t transport_msg( UDPPacket& p );
static size_t getMessage( UDPMessage& m, UDPPacket& p );
size_t addDData( long id, bool val ); size_t addDData( long id, bool val );
bool setDData( size_t index, bool val ); bool setDData( size_t index, bool val );
long dID( size_t index ); long dID( size_t index );
...@@ -69,12 +73,17 @@ namespace UniSetUDP ...@@ -69,12 +73,17 @@ namespace UniSetUDP
size_t addAData( long id, long val ); size_t addAData( long id, long val );
bool setAData( size_t index, long val ); bool setAData( size_t index, long val );
inline bool isFull(){ return ((msg.header.dcount<MaxDCount) && (msg.header.acount<MaxACount)); } inline bool isFull(){ return ((dcount<MaxDCount) && (acount<MaxACount)); }
inline int dsize(){ return msg.header.dcount; } inline int dsize(){ return dcount; }
inline int asize(){ return msg.header.acount; } inline int asize(){ return acount; }
// inline int byte_size(){ return (dcount*sizeof(long)*UDPDData) + acount*sizeof(UDPAData)); } // inline int byte_size(){ return (dcount*sizeof(long)*UDPDData) + acount*sizeof(UDPAData)); }
DataPacket msg; // количество байт в пакете с булевыми переменными...
int d_byte(){ return dcount*sizeof(long) + dcount; }
UDPAData a_dat[MaxACount]; /*!< аналоговые величины */
long d_id[MaxDCount]; /*!< список дискретных ID */
unsigned char d_dat[MaxDDataCount]; /*!< битовые значения */
friend std::ostream& operator<<( std::ostream& os, UDPMessage& p ); friend std::ostream& operator<<( std::ostream& os, UDPMessage& p );
}; };
......
...@@ -10,10 +10,10 @@ using namespace UniSetExtensions; ...@@ -10,10 +10,10 @@ using namespace UniSetExtensions;
bool UNetReceiver::PacketCompare::operator()(const UniSetUDP::UDPMessage& lhs, bool UNetReceiver::PacketCompare::operator()(const UniSetUDP::UDPMessage& lhs,
const UniSetUDP::UDPMessage& rhs) const const UniSetUDP::UDPMessage& rhs) const
{ {
// if( lhs.msg.header.num == rhs.msg.header.num ) // if( lhs.num == rhs.num )
// return (lhs.msg < rhs.msg); // return (lhs < rhs);
return lhs.msg.header.num > rhs.msg.header.num; return lhs.num > rhs.num;
} }
// ------------------------------------------------------------------------------------------ // ------------------------------------------------------------------------------------------
UNetReceiver::UNetReceiver( const std::string s_host, const ost::tpport_t port, SMInterface* smi ): UNetReceiver::UNetReceiver( const std::string s_host, const ost::tpport_t port, SMInterface* smi ):
...@@ -161,7 +161,7 @@ void UNetReceiver::real_update() ...@@ -161,7 +161,7 @@ void UNetReceiver::real_update()
return; return;
p = qpack.top(); p = qpack.top();
unsigned long sub = labs(p.msg.header.num - pnum); unsigned long sub = labs(p.num - pnum);
if( pnum > 0 ) if( pnum > 0 )
{ {
// если sub > maxDifferens // если sub > maxDifferens
...@@ -179,7 +179,7 @@ void UNetReceiver::real_update() ...@@ -179,7 +179,7 @@ void UNetReceiver::real_update()
lostPackets++; lostPackets++;
} }
else if( p.msg.header.num == pnum ) else if( p.num == pnum )
{ {
/* а что делать если идут повторные пакеты ?! /* а что делать если идут повторные пакеты ?!
* для надёжности лучше обрабатывать.. * для надёжности лучше обрабатывать..
...@@ -195,7 +195,7 @@ void UNetReceiver::real_update() ...@@ -195,7 +195,7 @@ void UNetReceiver::real_update()
// удаляем из очереди, только если // удаляем из очереди, только если
// всё в порядке с последовательностью.. // всё в порядке с последовательностью..
qpack.pop(); qpack.pop();
pnum = p.msg.header.num; pnum = p.num;
} // unlock qpack } // unlock qpack
k--; k--;
...@@ -207,7 +207,7 @@ void UNetReceiver::real_update() ...@@ -207,7 +207,7 @@ void UNetReceiver::real_update()
// Обработка дискретных // Обработка дискретных
size_t nbit = 0; size_t nbit = 0;
for( size_t i=0; i<p.msg.header.dcount; i++, nbit++ ) for( size_t i=0; i<p.dcount; i++, nbit++ )
{ {
try try
{ {
...@@ -246,11 +246,11 @@ void UNetReceiver::real_update() ...@@ -246,11 +246,11 @@ void UNetReceiver::real_update()
} }
// Обрабока аналоговых // Обрабока аналоговых
for( size_t i=0; i<p.msg.header.acount; i++ ) for( size_t i=0; i<p.acount; i++ )
{ {
try try
{ {
UniSetUDP::UDPAData& d = p.msg.a_dat[i]; UniSetUDP::UDPAData& d = p.a_dat[i];
ItemInfo& ii(a_icache[i]); ItemInfo& ii(a_icache[i]);
if( ii.id != d.id ) if( ii.id != d.id )
{ {
...@@ -327,23 +327,16 @@ bool UNetReceiver::recv() ...@@ -327,23 +327,16 @@ bool UNetReceiver::recv()
if( !udp->isInputReady(recvTimeout) ) if( !udp->isInputReady(recvTimeout) )
return false; return false;
size_t ret = udp->UDPReceive::receive(&(pack.msg),sizeof(pack.msg)); size_t ret = udp->UDPReceive::receive((char*)(r_buf.data),sizeof(r_buf.data));
if( ret < sizeof(UniSetUDP::UDPHeader) )
{
dlog[Debug::CRIT] << myname << "(receive): FAILED header ret=" << ret << " sizeof=" << sizeof(UniSetUDP::UDPHeader) << endl;
return false;
}
size_t sz = sizeof(UniSetUDP::DataPacket); size_t sz = UniSetUDP::UDPMessage::getMessage(pack,r_buf);
if( ret < sz ) if( sz == 0 )
{ {
dlog[Debug::CRIT] << myname << "(receive): FAILED data ret=" << ret << " sizeof=" << sz dlog[Debug::CRIT] << myname << "(receive): FAILED RECEIVE DATA ret=" << ret << endl;
<< " packnum=" << pack.msg.header.num << endl;
return false; return false;
} }
if( rnum>0 && labs(pack.num - rnum) > maxDifferens )
if( rnum>0 && labs(pack.msg.header.num - rnum) > maxDifferens )
{ {
/* А что делать если мы уже ждём и ещё не "разгребли предыдущее".. а тут уже повторный "разрыв" /* А что делать если мы уже ждём и ещё не "разгребли предыдущее".. а тут уже повторный "разрыв"
* Можно откинуть всё.. что сложили во временную очередь и заново "копить" (но тогда теряем информацию) * Можно откинуть всё.. что сложили во временную очередь и заново "копить" (но тогда теряем информацию)
...@@ -362,7 +355,7 @@ bool UNetReceiver::recv() ...@@ -362,7 +355,7 @@ bool UNetReceiver::recv()
waitClean = true; waitClean = true;
} }
rnum = pack.msg.header.num; rnum = pack.num;
#if 0 #if 0
cerr << myname << "(receive): recv DATA OK. ret=" << ret << " sizeof=" << sz cerr << myname << "(receive): recv DATA OK. ret=" << ret << " sizeof=" << sz
...@@ -427,20 +420,20 @@ void UNetReceiver::initIterators() ...@@ -427,20 +420,20 @@ void UNetReceiver::initIterators()
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
void UNetReceiver::initDCache( UniSetUDP::UDPMessage& pack, bool force ) void UNetReceiver::initDCache( UniSetUDP::UDPMessage& pack, bool force )
{ {
if( !force && pack.msg.header.dcount == d_icache.size() ) if( !force && pack.dcount == d_icache.size() )
return; return;
dlog[Debug::INFO] << myname << ": init icache.." << endl; dlog[Debug::INFO] << myname << ": init icache.." << endl;
d_cache_init_ok = true; d_cache_init_ok = true;
d_icache.resize(pack.msg.header.dcount); d_icache.resize(pack.dcount);
for( size_t i=0; i<d_icache.size(); i++ ) for( size_t i=0; i<d_icache.size(); i++ )
{ {
ItemInfo& d(d_icache[i]); ItemInfo& d(d_icache[i]);
if( d.id != pack.msg.d_id[i] ) if( d.id != pack.d_id[i] )
{ {
d.id = pack.msg.d_id[i]; d.id = pack.d_id[i];
d.iotype = conf->getIOType(d.id); d.iotype = conf->getIOType(d.id);
shm->initAIterator(d.ait); shm->initAIterator(d.ait);
shm->initDIterator(d.dit); shm->initDIterator(d.dit);
...@@ -450,19 +443,19 @@ void UNetReceiver::initDCache( UniSetUDP::UDPMessage& pack, bool force ) ...@@ -450,19 +443,19 @@ void UNetReceiver::initDCache( UniSetUDP::UDPMessage& pack, bool force )
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
void UNetReceiver::initACache( UniSetUDP::UDPMessage& pack, bool force ) void UNetReceiver::initACache( UniSetUDP::UDPMessage& pack, bool force )
{ {
if( !force && pack.msg.header.acount == a_icache.size() ) if( !force && pack.acount == a_icache.size() )
return; return;
dlog[Debug::INFO] << myname << ": init icache.." << endl; dlog[Debug::INFO] << myname << ": init icache.." << endl;
a_cache_init_ok = true; a_cache_init_ok = true;
a_icache.resize(pack.msg.header.acount); a_icache.resize(pack.acount);
for( size_t i=0; i<a_icache.size(); i++ ) for( size_t i=0; i<a_icache.size(); i++ )
{ {
ItemInfo& d(a_icache[i]); ItemInfo& d(a_icache[i]);
if( d.id != pack.msg.a_dat[i].id ) if( d.id != pack.a_dat[i].id )
{ {
d.id = pack.msg.a_dat[i].id; d.id = pack.a_dat[i].id;
d.iotype = conf->getIOType(d.id); d.iotype = conf->getIOType(d.id);
shm->initAIterator(d.ait); shm->initAIterator(d.ait);
shm->initDIterator(d.dit); shm->initDIterator(d.dit);
......
...@@ -114,7 +114,8 @@ class UNetReceiver ...@@ -114,7 +114,8 @@ class UNetReceiver
}; };
typedef std::priority_queue<UniSetUDP::UDPMessage,std::vector<UniSetUDP::UDPMessage>,PacketCompare> PacketQueue; typedef std::priority_queue<UniSetUDP::UDPMessage,std::vector<UniSetUDP::UDPMessage>,PacketCompare> PacketQueue;
PacketQueue qpack; /*!< очередь принятых пакетов (отсортированных по возрастанию номера пакета) */ PacketQueue qpack; /*!< очередь принятых пакетов (отсортированных по возрастанию номера пакета) */
UniSetUDP::UDPMessage pack; /*!< просто буфер для получения очереlного сообщения */ UniSetUDP::UDPMessage pack; /*!< просто буфер для получения очередного сообщения */
UniSetUDP::UDPPacket r_buf;
UniSetTypes::uniset_mutex packMutex; /*!< mutex для работы с очередью */ UniSetTypes::uniset_mutex packMutex; /*!< mutex для работы с очередью */
unsigned long pnum; /*!< текущий номер обработанного сообщения, для проверки непрерывности последовательности пакетов */ unsigned long pnum; /*!< текущий номер обработанного сообщения, для проверки непрерывности последовательности пакетов */
......
...@@ -71,8 +71,8 @@ s_thr(0) ...@@ -71,8 +71,8 @@ s_thr(0)
// выставляем поля, которые не меняются // выставляем поля, которые не меняются
mypack.msg.header.nodeID = conf->getLocalNode(); mypack.nodeID = conf->getLocalNode();
mypack.msg.header.procID = shm->ID(); mypack.procID = shm->ID();
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
UNetSender::~UNetSender() UNetSender::~UNetSender()
...@@ -187,18 +187,19 @@ void UNetSender::send() ...@@ -187,18 +187,19 @@ void UNetSender::send()
void UNetSender::real_send() void UNetSender::real_send()
{ {
UniSetTypes::uniset_mutex_lock l(pack_mutex,300); UniSetTypes::uniset_mutex_lock l(pack_mutex,300);
mypack.msg.header.num = packetnum++; mypack.num = packetnum++;
if( packetnum > UniSetUDP::MaxPacketNum ) if( packetnum > UniSetUDP::MaxPacketNum )
packetnum = 1; packetnum = 1;
size_t sz = sizeof(UniSetUDP::DataPacket);
if( !udp->isPending(ost::Socket::pendingOutput) ) if( !udp->isPending(ost::Socket::pendingOutput) )
return; return;
size_t ret = udp->send( (char*)&(mypack.msg),sz);
if( ret < sz ) mypack.transport_msg(s_msg);
dlog[Debug::CRIT] << myname << "(real_send): FAILED ret=" << ret << " < sizeof=" << sz << endl; size_t ret = udp->send( (char*)s_msg.data, s_msg.len );
if( ret < s_msg.len )
dlog[Debug::CRIT] << myname << "(real_send): FAILED ret=" << ret << " < sizeof=" << s_msg.len << endl;
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
void UNetSender::stop() void UNetSender::stop()
......
...@@ -95,6 +95,7 @@ class UNetSender ...@@ -95,6 +95,7 @@ class UNetSender
DMap dlist; DMap dlist;
int maxItem; int maxItem;
unsigned long packetnum; unsigned long packetnum;
UniSetUDP::UDPPacket s_msg;
ThreadCreator<UNetSender>* s_thr; // send thread ThreadCreator<UNetSender>* s_thr; // send thread
}; };
......
...@@ -188,6 +188,7 @@ int main(int argc, char* argv[]) ...@@ -188,6 +188,7 @@ int main(int argc, char* argv[])
// char buf[UniSetUDP::MaxDataLen]; // char buf[UniSetUDP::MaxDataLen];
UniSetUDP::UDPMessage pack; UniSetUDP::UDPMessage pack;
UniSetUDP::UDPPacket buf;
unsigned long prev_num=1; unsigned long prev_num=1;
int nc = 1; int nc = 1;
...@@ -204,38 +205,27 @@ int main(int argc, char* argv[]) ...@@ -204,38 +205,27 @@ int main(int argc, char* argv[])
continue; continue;
} }
size_t ret = udp.UDPReceive::receive( &pack, sizeof(pack) ); size_t ret = udp.UDPReceive::receive( &(buf.data), sizeof(buf.data) );
if( ret < sizeof(UniSetUDP::UDPHeader) ) size_t sz = UniSetUDP::UDPMessage::getMessage(pack,buf);
if( sz == 0 )
{ {
cerr << "(recv): FAILED header ret=" << ret cerr << "(recv): FAILED header ret=" << ret
<< " sizeof=" << sizeof(UniSetUDP::UDPHeader) << endl; << " sizeof=" << sz<< endl;
continue;
}
cout << "HEADER: " << pack.msg.header << endl;
//size_t sz = pack.msg.header.dcount * sizeof(UniSetUDP::UDPData) + sizeof(UniSetUDP::UDPHeader);
//size_t sz = sizeof(UniSetUDP::UDPMessage);
size_t sz = sizeof(UniSetUDP::DataPacket);
if( ret < sz )
{
cerr << "(recv): FAILED data ret=" << ret
<< " sizeof=" << sz << " packnum=" << pack.msg.header.num << endl;
continue; continue;
} }
if( lost ) if( lost )
{ {
if( prev_num != (pack.msg.header.num-1) ) if( prev_num != (pack.num-1) )
cerr << "WARNING! Incorrect sequence of packets! current=" << pack.msg.header.num cerr << "WARNING! Incorrect sequence of packets! current=" << pack.num
<< " prev=" << prev_num << endl; << " prev=" << prev_num << endl;
prev_num = pack.msg.header.num; prev_num = pack.num;
} }
if( verb ) // if( verb )
cout << "receive OK. header: " << pack.msg.header // cout << "receive OK. header: " << pack.msg.header
<< " bytes: " << ret << endl; // << " bytes: " << ret << endl;
if( show ) if( show )
cout << "receive data: " << pack << endl; cout << "receive data: " << pack << endl;
...@@ -268,10 +258,10 @@ int main(int argc, char* argv[]) ...@@ -268,10 +258,10 @@ int main(int argc, char* argv[])
udp = new ost::UDPBroadcast(host,port); udp = new ost::UDPBroadcast(host,port);
UniSetUDP::UDPMessage mypack; UniSetUDP::UDPMessage mypack;
mypack.msg.header.nodeID = nodeID; mypack.nodeID = nodeID;
mypack.msg.header.procID = procID; mypack.procID = procID;
for( size_t i=0; i < count && i < UniSetUDP::MaxACount; i++ ) for( size_t i=0; i < count; i++ )
{ {
UDPAData d(i,i); UDPAData d(i,i);
mypack.addAData(d); mypack.addAData(d);
...@@ -280,20 +270,18 @@ int main(int argc, char* argv[]) ...@@ -280,20 +270,18 @@ int main(int argc, char* argv[])
for( unsigned int i=0; i < count; i++ ) for( unsigned int i=0; i < count; i++ )
mypack.addDData(i,i); mypack.addDData(i,i);
//size_t sz = mypack.byte_size() + sizeof(UniSetUDP::UDPHeader);
//size_t sz = sizeof(UniSetUDP::UDPMessage);
size_t sz = sizeof(UniSetUDP::DataPacket);
udp->setPeer(host,port); udp->setPeer(host,port);
unsigned long packetnum = 0; unsigned long packetnum = 0;
UniSetUDP::UDPPacket s_buf;
int nc = 1; int nc = 1;
if( ncycles > 0 ) if( ncycles > 0 )
nc = ncycles; nc = ncycles;
while( nc ) while( nc )
{ {
mypack.msg.header.num = packetnum++; mypack.num = packetnum++;
if( packetnum > UniSetUDP::MaxPacketNum ) if( packetnum > UniSetUDP::MaxPacketNum )
packetnum = 1; packetnum = 1;
...@@ -301,13 +289,15 @@ int main(int argc, char* argv[]) ...@@ -301,13 +289,15 @@ int main(int argc, char* argv[])
{ {
if( udp->isPending(ost::Socket::pendingOutput,tout) ) if( udp->isPending(ost::Socket::pendingOutput,tout) )
{ {
if( verb ) mypack.transport_msg(s_buf);
cout << "(send): to addr=" << addr << " count=" << count << " bytes=" << sz << endl;
size_t ret = udp->send((char*)&(mypack.msg), sz); if( verb )
cout << "(send): to addr=" << addr << " d_count=" << mypack.dcount
<< " a_count=" << mypack.acount << " bytes=" << s_buf.len << endl;
if( ret < sz ) size_t ret = udp->send((char*)&s_buf.data, s_buf.len);
cerr << "(send): FAILED ret=" << ret << " < sizeof=" << sz << endl; if( ret < s_buf.len )
cerr << "(send): FAILED ret=" << ret << " < sizeof=" << s_buf.len << endl;
} }
} }
catch( ost::SockException& e ) catch( ost::SockException& e )
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment