Commit a41a257e authored by Pavel Vainerman's avatar Pavel Vainerman

(UNet2): Новая оптимизированная реализация. (передаётся реальное количество байт-данных)

parent 62fa5b7c
......@@ -125,4 +125,64 @@ bool UDPMessage::dValue( size_t index )
return ( d_dat[nbyte] & (1<<nbit) );
}
// -----------------------------------------------------------------------------
size_t UDPMessage::transport_msg( UDPPacket& p )
{
memset(&p,0,sizeof(UDPPacket));
size_t i = 0;
memcpy(&(p.data[i]),this,sizeof(UDPHeader));
i += sizeof(UDPHeader);
// копируем аналоговые данные
size_t sz = acount*sizeof(UDPAData);
memcpy(&(p.data[i]),a_dat,sz);
i += sz;
// копируем булеве индексы
sz = dcount*sizeof(long);
memcpy(&(p.data[i]),d_id,sz);
i += sz;
// копируем булевые данные
size_t nbyte = dcount / sizeof(unsigned char);
size_t nbit = dcount % sizeof(unsigned char);
sz = nbit > 0 ? nbyte + 1 : nbyte;
memcpy(&(p.data[i]),d_dat,sz);
i += sz;
p.len = i;
return i;
}
// -----------------------------------------------------------------------------
UDPMessage::UDPMessage( UDPPacket& p )
{
getMessage(*this,p);
}
// -----------------------------------------------------------------------------
size_t UDPMessage::getMessage( UDPMessage& m, UDPPacket& p )
{
memset(&m,0,sizeof(m));
size_t i = 0;
memcpy(&m,&(p.data[i]),sizeof(UDPHeader));
i += sizeof(UDPHeader);
// копируем аналоговые данные
size_t sz = m.acount*sizeof(UDPAData);
memcpy(m.a_dat,&(p.data[i]),sz);
i += sz;
// копируем булеве индексы
sz = m.dcount*sizeof(long);
memcpy(m.d_id,&(p.data[i]),sz);
i += sz;
// копируем булевые данные
size_t nbyte = m.dcount / sizeof(unsigned char);
size_t nbit = m.dcount % sizeof(unsigned char);
sz = nbit > 0 ? nbyte + 1 : nbyte;
memcpy(m.d_dat,&(p.data[i]),sz);
return i+sz;
}
// -----------------------------------------------------------------------------
......@@ -41,15 +41,29 @@ namespace UniSetUDP
friend std::ostream& operator<<( std::ostream& os, UDPAData& p );
}__attribute__((packed));
static const size_t MaxACount = 100;
static const size_t MaxDCount = 256;
static const size_t MaxDDataCount = MaxDCount / sizeof(unsigned char);
static const size_t MaxACount = 100;
struct UDPPacket
{
UDPPacket():len(0){}
int len;
unsigned char data[ sizeof(UDPHeader) + MaxDCount*sizeof(long) + MaxDDataCount + MaxACount*sizeof(UDPAData) ];
}__attribute__((packed));
static const int MaxDataLen = sizeof(UDPPacket);
struct UDPMessage:
public UDPHeader
{
UDPMessage();
UDPMessage( UDPPacket& p );
size_t transport_msg( UDPPacket& p );
static size_t getMessage( UDPMessage& m, UDPPacket& p );
size_t addDData( long id, bool val );
bool setDData( size_t index, bool val );
long dID( size_t index );
......@@ -64,15 +78,15 @@ namespace UniSetUDP
inline int asize(){ return acount; }
// inline int byte_size(){ return (dcount*sizeof(long)*UDPDData) + acount*sizeof(UDPAData)); }
// количество байт в пакете с булевыми переменными...
int d_byte(){ return dcount*sizeof(long) + dcount; }
UDPAData a_dat[MaxACount]; /*!< аналоговые величины */
long d_id[MaxDCount]; /*!< список дискретных ID */
unsigned char d_dat[MaxDDataCount]; /*!< битовые значения */
friend std::ostream& operator<<( std::ostream& os, UDPMessage& p );
}__attribute__((packed));
static const int MaxDataLen = sizeof(UDPMessage);
};
}
// -----------------------------------------------------------------------------
#endif // UDPPacket_H_
......
......@@ -327,23 +327,15 @@ bool UNetReceiver::recv()
if( !udp->isInputReady(recvTimeout) )
return false;
size_t ret = udp->UDPReceive::receive(&pack,sizeof(pack));
if( ret < sizeof(UniSetUDP::UDPHeader) )
{
dlog[Debug::CRIT] << myname << "(receive): FAILED header ret=" << ret << " sizeof=" << sizeof(UniSetUDP::UDPHeader) << endl;
return false;
}
size_t ret = udp->UDPReceive::receive((char*)(&r_buf),sizeof(r_buf));
// size_t sz = pack.msg.header.acount * sizeof(UniSetUDP::UDPAData) + sizeof(UniSetUDP::UDPHeader);
size_t sz = sizeof(UniSetUDP::UDPMessage);
if( ret < sz )
size_t sz = UniSetUDP::UDPMessage::getMessage(pack,r_buf);
if( sz == 0 )
{
dlog[Debug::CRIT] << myname << "(receive): FAILED data ret=" << ret << " sizeof=" << sz
<< " packnum=" << pack.num << endl;
dlog[Debug::CRIT] << myname << "(receive): FAILED RECEIVE DATA ret=" << ret << endl;
return false;
}
if( rnum>0 && labs(pack.num - rnum) > maxDifferens )
{
/* А что делать если мы уже ждём и ещё не "разгребли предыдущее".. а тут уже повторный "разрыв"
......
......@@ -115,6 +115,7 @@ class UNetReceiver
typedef std::priority_queue<UniSetUDP::UDPMessage,std::vector<UniSetUDP::UDPMessage>,PacketCompare> PacketQueue;
PacketQueue qpack; /*!< очередь принятых пакетов (отсортированных по возрастанию номера пакета) */
UniSetUDP::UDPMessage pack; /*!< просто буфер для получения очереlного сообщения */
UniSetUDP::UDPPacket r_buf;
UniSetTypes::uniset_mutex packMutex; /*!< mutex для работы с очередью */
unsigned long pnum; /*!< текущий номер обработанного сообщения, для проверки непрерывности последовательности пакетов */
......
......@@ -192,15 +192,14 @@ void UNetSender::real_send()
if( packetnum > UniSetUDP::MaxPacketNum )
packetnum = 1;
// cout << "************* send header: " << mypack.msg.header << endl;
// size_t sz = mypack.byte_size() + sizeof(UniSetUDP::UDPHeader);
size_t sz = sizeof(UniSetUDP::UDPMessage);
if( !udp->isPending(ost::Socket::pendingOutput) )
return;
size_t ret = udp->send( (char*)(&mypack),sz);
if( ret < sz )
dlog[Debug::CRIT] << myname << "(real_send): FAILED ret=" << ret << " < sizeof=" << sz << endl;
mypack.transport_msg(s_msg);
size_t ret = udp->send( (char*)s_msg.data,s_msg.len );
if( ret < s_msg.len )
dlog[Debug::CRIT] << myname << "(real_send): FAILED ret=" << ret << " < sizeof=" << s_msg.len << endl;
}
// -----------------------------------------------------------------------------
void UNetSender::stop()
......
......@@ -97,6 +97,7 @@ class UNetSender
DMap dlist;
int maxItem;
unsigned long packetnum;
UniSetUDP::UDPPacket s_msg;
ThreadCreator<UNetSender>* s_thr; // send thread
};
......
......@@ -181,6 +181,7 @@ int main(int argc, char* argv[])
// char buf[UniSetUDP::MaxDataLen];
UniSetUDP::UDPMessage pack;
UniSetUDP::UDPPacket buf;
unsigned long prev_num=1;
while(1)
......@@ -193,20 +194,12 @@ int main(int argc, char* argv[])
continue;
}
size_t ret = udp.UDPReceive::receive( &pack, sizeof(pack) );
if( ret < sizeof(UniSetUDP::UDPHeader) )
size_t ret = udp.UDPReceive::receive( &(buf.data), sizeof(buf.data) );
size_t sz = UniSetUDP::UDPMessage::getMessage(pack,buf);
if( sz == 0 )
{
cerr << "(recv): FAILED header ret=" << ret
<< " sizeof=" << sizeof(UniSetUDP::UDPHeader) << endl;
continue;
}
//size_t sz = pack.msg.header.dcount * sizeof(UniSetUDP::UDPData) + sizeof(UniSetUDP::UDPHeader);
size_t sz = sizeof(UniSetUDP::UDPMessage);
if( ret < sz )
{
cerr << "(recv): FAILED data ret=" << ret
<< " sizeof=" << sz << " packnum=" << pack.num << endl;
<< " sizeof=" << sz<< endl;
continue;
}
......@@ -259,12 +252,11 @@ int main(int argc, char* argv[])
for( int i=0; i < count; i++ )
mypack.addDData(i,i);
//size_t sz = mypack.byte_size() + sizeof(UniSetUDP::UDPHeader);
size_t sz = sizeof(UniSetUDP::UDPMessage);
udp->setPeer(host,port);
unsigned long packetnum = 0;
UniSetUDP::UDPPacket s_buf;
while(1)
{
mypack.num = packetnum++;
......@@ -275,13 +267,15 @@ int main(int argc, char* argv[])
{
if( udp->isPending(ost::Socket::pendingOutput,tout) )
{
if( verb )
cout << "(send): to addr=" << addr << " count=" << count << " bytes=" << sz << endl;
mypack.transport_msg(s_buf);
size_t ret = udp->send((char*)&mypack, sz);
if( verb )
cout << "(send): to addr=" << addr << " d_count=" << mypack.dcount
<< " a_count=" << mypack.acount << " bytes=" << s_buf.len << endl;
if( ret < sz )
cerr << "(send): FAILED ret=" << ret << " < sizeof=" << sz << endl;
size_t ret = udp->send((char*)&s_buf.data, s_buf.len);
if( ret < s_buf.len )
cerr << "(send): FAILED ret=" << ret << " < sizeof=" << s_buf.len << endl;
}
}
catch( ost::SockException& e )
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment