Commit fc1f8c5e authored by Pavel Vainerman's avatar Pavel Vainerman

UNet2: сделал вместо номера пакета "время". Постоянно растущие секунды..

parent abf192e6
...@@ -3,9 +3,7 @@ ...@@ -3,9 +3,7 @@
#ifndef UDPPacket_H_ #ifndef UDPPacket_H_
#define UDPPacket_H_ #define UDPPacket_H_
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
#include <list> #include <cc++/socket.h>
#include <limits>
#include <ostream>
#include "UniSetTypes.h" #include "UniSetTypes.h"
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
namespace UniSetUDP namespace UniSetUDP
...@@ -13,7 +11,7 @@ namespace UniSetUDP ...@@ -13,7 +11,7 @@ namespace UniSetUDP
struct UDPHeader struct UDPHeader
{ {
UDPHeader():num(0),nodeID(0),procID(0),dcount(0){} UDPHeader():num(0),nodeID(0),procID(0),dcount(0){}
unsigned long num; timeout_t num;
long nodeID; long nodeID;
long procID; long procID;
size_t dcount; size_t dcount;
...@@ -21,7 +19,7 @@ namespace UniSetUDP ...@@ -21,7 +19,7 @@ namespace UniSetUDP
friend std::ostream& operator<<( std::ostream& os, UDPHeader& p ); friend std::ostream& operator<<( std::ostream& os, UDPHeader& p );
}__attribute__((packed)); }__attribute__((packed));
static unsigned long MaxPacketNum = 50; // std::numeric_limits<unsigned long>::max(); // static unsigned long MaxPacketNum = 50; // std::numeric_limits<unsigned long>::max();
struct UDPData struct UDPData
{ {
......
...@@ -300,8 +300,8 @@ void UDPReceiver::recv() ...@@ -300,8 +300,8 @@ void UDPReceiver::recv()
} }
// cerr << myname << "(receive): recv DATA OK. ret=" << ret << " sizeof=" << sz cerr << myname << "(receive): recv DATA OK. ret=" << ret << " sizeof=" << sz
// << " header: " << pack.msg.header << endl; << " header: " << pack.msg.header << endl;
/* /*
if( labs(pack.msg.header.num - pnum) > 1 ) if( labs(pack.msg.header.num - pnum) > 1 )
{ {
......
...@@ -146,10 +146,17 @@ void UNetReceiver::real_update() ...@@ -146,10 +146,17 @@ void UNetReceiver::real_update()
qpack.pop(); qpack.pop();
} }
cerr << "************ update recv.num=" << p.msg.header.num << " num=" << pnum << endl;
if( labs(p.msg.header.num - pnum) > 1 ) if( labs(p.msg.header.num - pnum) > 1 )
{ {
dlog[Debug::CRIT] << "************ FAILED! ORDER PACKETS! recv.num=" << pack.msg.header.num dlog[Debug::CRIT] << "************ FAILED! ORDER PACKETS! recv.num=" << p.msg.header.num
<< " num=" << pnum << endl; << " num=" << pnum << endl;
// запоминаем новый номер и откидываем пакет
pnum = p.msg.header.num;
k--;
continue;
} }
pnum = p.msg.header.num; pnum = p.msg.header.num;
...@@ -251,8 +258,8 @@ bool UNetReceiver::recv() ...@@ -251,8 +258,8 @@ bool UNetReceiver::recv()
} }
// cerr << myname << "(receive): recv DATA OK. ret=" << ret << " sizeof=" << sz cerr << myname << "(receive): recv DATA OK. ret=" << ret << " sizeof=" << sz
// << " header: " << pack.msg.header << endl; << " header: " << pack.msg.header << endl;
{ {
uniset_mutex_lock l(packMutex); uniset_mutex_lock l(packMutex);
qpack.push(pack); qpack.push(pack);
......
...@@ -17,9 +17,9 @@ sendpause(150), ...@@ -17,9 +17,9 @@ sendpause(150),
activated(false), activated(false),
dlist(100), dlist(100),
maxItem(0), maxItem(0),
packetnum(1),
s_thr(0) s_thr(0)
{ {
ptPack.setTiming(UniSetTimer::WaitUpTime);
{ {
ostringstream s; ostringstream s;
...@@ -133,7 +133,7 @@ void UNetSender::real_send() ...@@ -133,7 +133,7 @@ void UNetSender::real_send()
h.nodeID = conf->getLocalNode(); h.nodeID = conf->getLocalNode();
h.procID = shm->ID(); h.procID = shm->ID();
h.dcount = mypack.msg.header.dcount; h.dcount = mypack.msg.header.dcount;
h.num = packetnum++; h.num = ptPack.getCurrent();
mypack.msg.header = h; mypack.msg.header = h;
// cout << "************* send header: " << mypack.msg.header << endl; // cout << "************* send header: " << mypack.msg.header << endl;
...@@ -143,28 +143,7 @@ void UNetSender::real_send() ...@@ -143,28 +143,7 @@ void UNetSender::real_send()
ssize_t ret = udp->send( (char*)&(mypack.msg),sz); ssize_t ret = udp->send( (char*)&(mypack.msg),sz);
if( ret < sz ) if( ret < sz )
{
dlog[Debug::CRIT] << myname << "(send): FAILED ret=" << ret << " < sizeof=" << sz << endl; dlog[Debug::CRIT] << myname << "(send): FAILED ret=" << ret << " < sizeof=" << sz << endl;
return;
}
// если вышли за границы..
// посылаем несколько одинаковых пакетов с новыми номерами..
// т.к. первый будет откинут (см. UNetReceiver::update)
if( packetnum >= UniSetUDP::MaxPacketNum )
{
packetnum = 1;
for( int i=0; i<3; i++ )
{
mypack.msg.header.num = packetnum++;
if( udp->isPending(ost::Socket::pendingOutput) )
{
ssize_t ret = udp->send( (char*)&(mypack.msg),sz);
if( res < sz )
dlog[Debug::CRIT] << myname << "(send): FAILED. ret=" << ret << " < sizeof=" << sz << endl;
}
}
}
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
void UNetSender::start() void UNetSender::start()
......
...@@ -14,14 +14,6 @@ ...@@ -14,14 +14,6 @@
#include "UDPPacket.h" #include "UDPPacket.h"
#include "UDPNReceiver.h" #include "UDPNReceiver.h"
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
/*
* Для защиты от потери пакета при переполнении "номера пакета".
* UNetReceiver при обнаружении "разырва" в последовательнности, просто игнорирует пакет, обновляет счётчик
* и начинает обработку уже со следующего.
* Соотвественно здесь, реализован следующий механизм: При переходе номера пакета через maxnum,
* в сеть посылается один и тоже пакет данных с номерами идущими подряд.
* В результате первй будет откинут, как идущий "не подряд", а второй - будет обработан.
*/
class UNetSender class UNetSender
{ {
public: public:
...@@ -84,7 +76,7 @@ class UNetSender ...@@ -84,7 +76,7 @@ class UNetSender
typedef std::vector<UItem> DMap; typedef std::vector<UItem> DMap;
DMap dlist; DMap dlist;
int maxItem; int maxItem;
long packetnum; PassiveTimer ptPack;
ThreadCreator<UNetSender>* s_thr; // send thread ThreadCreator<UNetSender>* s_thr; // send thread
}; };
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment