Commit d431ed77 authored by Pavel Vainerman's avatar Pavel Vainerman

UDP: Версия с буфером

parent cf24e03f
...@@ -228,7 +228,7 @@ void UDPExchange::send() ...@@ -228,7 +228,7 @@ void UDPExchange::send()
UniSetUDP::UDPHeader h; UniSetUDP::UDPHeader h;
h.nodeID = conf->getLocalNode(); h.nodeID = conf->getLocalNode();
h.procID = getId(); h.procID = getId();
h.dcount = mypack.size(); h.dcount = mypack.msg.header.dcount;
h.num = packetnum++; h.num = packetnum++;
mypack.msg.header = h; mypack.msg.header = h;
...@@ -239,12 +239,16 @@ void UDPExchange::send() ...@@ -239,12 +239,16 @@ void UDPExchange::send()
memcpy( &(udpbuf[ind]),&(mypack.data),mypack.size()); memcpy( &(udpbuf[ind]),&(mypack.data),mypack.size());
ind += mypack.size(); ind += mypack.size();
*/ */
cout << "************* send header: " << mypack.msg.header << endl;
int sz = mypack.size() * sizeof(UniSetUDP::UDPHeader);
if( udp->isPending(ost::Socket::pendingOutput) ) if( udp->isPending(ost::Socket::pendingOutput) )
{ {
ssize_t ret = udp->send( (char*)&(mypack.msg),sizeof(mypack.msg)); // ssize_t ret = udp->send( (char*)&(mypack.msg),sizeof(mypack.msg));
if( ret<sizeof(mypack.msg) ) // if( ret<sizeof(mypack.msg) )
ssize_t ret = udp->send( (char*)&(mypack.msg),sz);
if( ret < sz )
{ {
cerr << myname << "(send data header): ret=" << ret << " sizeof=" << sizeof(mypack.msg) << endl; cerr << myname << "(send data header): ret=" << ret << " sizeof=" << sz << endl;
return; return;
} }
......
...@@ -31,8 +31,9 @@ bool UDPMessage::addData( const UniSetUDP::UDPData& dat ) ...@@ -31,8 +31,9 @@ bool UDPMessage::addData( const UniSetUDP::UDPData& dat )
if( count >= MaxDataCount ) if( count >= MaxDataCount )
return false; return false;
msg.dat[sizeof(UniSetUDP::UDPHeader)+count] = dat; msg.dat[count] = dat;
count++; count++;
msg.header.dcount = count;
return true; return true;
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
......
...@@ -11,10 +11,11 @@ namespace UniSetUDP ...@@ -11,10 +11,11 @@ namespace UniSetUDP
{ {
struct UDPHeader struct UDPHeader
{ {
long num; UDPHeader():num(0),nodeID(0),procID(0),dcount(0){}
unsigned long num;
long nodeID; long nodeID;
long procID; long procID;
long dcount; size_t dcount;
friend std::ostream& operator<<( std::ostream& os, UDPHeader& p ); friend std::ostream& operator<<( std::ostream& os, UDPHeader& p );
}__attribute__((packed)); }__attribute__((packed));
......
...@@ -7,6 +7,15 @@ using namespace std; ...@@ -7,6 +7,15 @@ using namespace std;
using namespace UniSetTypes; using namespace UniSetTypes;
using namespace UniSetExtensions; using namespace UniSetExtensions;
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
bool UDPReceiver::PacketCompare::operator()(const UniSetUDP::UDPMessage& lhs,
const UniSetUDP::UDPMessage& rhs) const
{
// if( lhs.msg.header.num == rhs.msg.header.num )
// return (lhs.msg < rhs.msg);
return lhs.msg.header.num > rhs.msg.header.num;
}
// ------------------------------------------------------------------------------------------
UDPReceiver::UDPReceiver( UniSetTypes::ObjectId objId, UniSetTypes::ObjectId shmId, SharedMemory* ic ): UDPReceiver::UDPReceiver( UniSetTypes::ObjectId objId, UniSetTypes::ObjectId shmId, SharedMemory* ic ):
UniSetObject_LT(objId), UniSetObject_LT(objId),
shm(0), shm(0),
...@@ -62,6 +71,8 @@ activated(false) ...@@ -62,6 +71,8 @@ activated(false)
recvTimeout = conf->getArgPInt("--udp-recv-timeout",it.getProp("recvTimeout"), 5000); recvTimeout = conf->getArgPInt("--udp-recv-timeout",it.getProp("recvTimeout"), 5000);
polltime = conf->getArgPInt("--udp-polltime",it.getProp("polltime"), 100); polltime = conf->getArgPInt("--udp-polltime",it.getProp("polltime"), 100);
ptUpdate.setTiming(100);
// ------------------------------- // -------------------------------
// ********** HEARTBEAT ************* // ********** HEARTBEAT *************
string heart = conf->getArgParam("--udp-heartbeat-id",it.getProp("heartbeat_id")); string heart = conf->getArgParam("--udp-heartbeat-id",it.getProp("heartbeat_id"));
...@@ -131,24 +142,23 @@ void UDPReceiver::waitSMReady() ...@@ -131,24 +142,23 @@ void UDPReceiver::waitSMReady()
} }
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
/*
void UDPReceiver::timerInfo( TimerMessage *tm ) void UDPReceiver::timerInfo( TimerMessage *tm )
{ {
if( tm->id == tmExchange ) if( tm->id == tmExchange )
step(); step();
} }
*/
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
void UDPReceiver::step() void UDPReceiver::step()
{ {
// {
// uniset_mutex_lock l(pollMutex,2000);
// poll();
// }
if( !activated ) if( !activated )
return; return;
// if( ptUpdate.checkTime() )
// {
update_data();
ptUpdate.reset();
// }
if( sidHeartBeat!=DefaultObjectId && ptHeartBeat.checkTime() ) if( sidHeartBeat!=DefaultObjectId && ptHeartBeat.checkTime() )
{ {
try try
...@@ -165,26 +175,56 @@ void UDPReceiver::step() ...@@ -165,26 +175,56 @@ void UDPReceiver::step()
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
void UDPReceiver::poll() void UDPReceiver::update_data()
{ {
try if( !activated )
return;
UniSetUDP::UDPMessage p;
bool buf_ok = false;
{ {
// udp->connect(host,port); uniset_mutex_lock l(packMutex);
// udp->UDPSocket::setPeer(host,port); if( qpack.size() <= max_buf_size )
return;
buf_ok = true;
} }
catch( UniSetTypes::Exception& ex)
while( buf_ok )
{ {
cerr << myname << "(step): " << ex << std::endl; {
// reise(SIGTERM); uniset_mutex_lock l(packMutex);
return; p = qpack.top();
qpack.pop();
}
if( labs(p.msg.header.num - pnum) > 1 )
{
cerr << "************ FAILED! ORDER PACKETS! recv.num=" << pack.msg.header.num
<< " num=" << pnum << endl;
}
pnum = p.msg.header.num;
{
uniset_mutex_lock l(packMutex);
buf_ok = ( qpack.size() > max_buf_size );
} }
cerr << myname << "(step): recv DATA OK. header: " << p.msg.header << endl;
}
}
// -----------------------------------------------------------------------------
void UDPReceiver::poll()
{
cerr << "******************* pool start" << endl;
while( activated ) while( activated )
{ {
try try
{ {
recv(); recv();
// send();
} }
catch( ost::SockException& e ) catch( ost::SockException& e )
{ {
...@@ -192,11 +232,11 @@ void UDPReceiver::poll() ...@@ -192,11 +232,11 @@ void UDPReceiver::poll()
} }
catch( UniSetTypes::Exception& ex) catch( UniSetTypes::Exception& ex)
{ {
cerr << myname << "(step): " << ex << std::endl; cerr << myname << "(poll): " << ex << std::endl;
} }
catch(...) catch(...)
{ {
cerr << myname << "(step): catch ..." << std::endl; cerr << myname << "(poll): catch ..." << std::endl;
} }
msleep(polltime); msleep(polltime);
...@@ -207,75 +247,42 @@ void UDPReceiver::poll() ...@@ -207,75 +247,42 @@ void UDPReceiver::poll()
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
void UDPReceiver::recv() void UDPReceiver::recv()
{ {
cout << myname << ": recv....(timeout=" << recvTimeout << ")" << endl;
// UniSetUDP::UDPHeader h;
// receive
UniSetUDP::UDPMessage pack;
if( udp->isInputReady(recvTimeout) ) if( udp->isInputReady(recvTimeout) )
{ {
ssize_t ret = udp->UDPReceive::receive(&(pack.msg),sizeof(pack.msg)); ssize_t ret = udp->UDPReceive::receive(&(pack.msg),sizeof(pack.msg));
if( ret<(ssize_t)sizeof(pack.msg) ) if( ret < sizeof(UniSetUDP::UDPHeader) )
{ {
cerr << myname << "(receive): FAILED ret=" << ret << " sizeof=" << sizeof(pack.msg) << endl; cerr << myname << "(receive): FAILED header ret=" << ret << " sizeof=" << sizeof(UniSetUDP::UDPHeader) << endl;
return; return;
} }
cerr << myname << "(receive): OK. ret=" << ret << " sizeof=" << sizeof(pack.msg) ssize_t sz = pack.msg.header.dcount * sizeof(UniSetUDP::UDPData) + sizeof(UniSetUDP::UDPHeader);
<< " header: " << pack.msg.header << endl; if( ret < sz )
}
/*
cout << myname << ": recv....(timeout=" << recvTimeout << ")" << endl;
UniSetUDP::UDPHeader h;
// receive
if( udp->isInputReady(recvTimeout) )
{ {
ssize_t ret = udp->UDPReceive::receive(&h,sizeof(h)); cerr << myname << "(receive): FAILED data ret=" << ret << " sizeof=" << sz << endl;
if( ret<(ssize_t)sizeof(h) )
{
cerr << myname << "(receive): ret=" << ret << " sizeof=" << sizeof(h) << endl;
return; return;
} }
cout << myname << "(receive): header: " << h << endl;
if( h.dcount <=0 ) // cerr << myname << "(receive): recv DATA OK. ret=" << ret << " sizeof=" << sz
// << " header: " << pack.msg.header << endl;
/*
if( labs(pack.msg.header.num - pnum) > 1 )
{ {
cout << " data=0" << endl; cerr << "************ FAILED! ORDER PACKETS! recv.num=" << pack.msg.header.num
return; << " num=" << pnum << endl;
} }
UniSetUDP::UDPData d; pnum = pack.msg.header.num;
// ignore echo...
*/ */
#if 0
if( h.nodeID == conf->getLocalNode() && h.procID == getId() )
{
for( int i=0; i<h.dcount;i++ )
{ {
ssize_t ret = udp->UDPReceive::receive(&d,sizeof(d)); uniset_mutex_lock l(packMutex);
if( ret < (ssize_t)sizeof(d) ) // qpack[pack.msg.header.num] = pack;
return; qpack.push(pack);
}
return;
}
#endif
#if 0
for( int i=0; i<h.dcount;i++ )
{
ssize_t ret = udp->UDPReceive::receive(&d,sizeof(d));
if( ret<(ssize_t)sizeof(d) )
{
cerr << myname << "(receive data " << i << "): ret=" << ret << " sizeof=" << sizeof(d) << endl;
break;
} }
cout << myname << "(receive data " << i << "): " << d << endl; return;
}
} }
// else
// {
// cout << "no InputReady.." << endl;
// }
#endif
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
void UDPReceiver::processingMessage(UniSetTypes::VoidMessage *msg) void UDPReceiver::processingMessage(UniSetTypes::VoidMessage *msg)
...@@ -298,6 +305,13 @@ void UDPReceiver::processingMessage(UniSetTypes::VoidMessage *msg) ...@@ -298,6 +305,13 @@ void UDPReceiver::processingMessage(UniSetTypes::VoidMessage *msg)
} }
break; break;
case Message::Timer:
{
TimerMessage tm(msg);
timerInfo(&tm);
}
break;
default: default:
break; break;
} }
...@@ -318,7 +332,7 @@ void UDPReceiver::processingMessage(UniSetTypes::VoidMessage *msg) ...@@ -318,7 +332,7 @@ void UDPReceiver::processingMessage(UniSetTypes::VoidMessage *msg)
} }
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
void UDPReceiver::sysCommand(UniSetTypes::SystemMessage *sm) void UDPReceiver::sysCommand( UniSetTypes::SystemMessage *sm )
{ {
switch( sm->command ) switch( sm->command )
{ {
...@@ -346,6 +360,7 @@ void UDPReceiver::sysCommand(UniSetTypes::SystemMessage *sm) ...@@ -346,6 +360,7 @@ void UDPReceiver::sysCommand(UniSetTypes::SystemMessage *sm)
askSensors(UniversalIO::UIONotify); askSensors(UniversalIO::UIONotify);
} }
thr->start(); thr->start();
askTimer(tmExchange,1000);
} }
case SystemMessage::FoldUp: case SystemMessage::FoldUp:
......
...@@ -3,7 +3,8 @@ ...@@ -3,7 +3,8 @@
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
#include <ostream> #include <ostream>
#include <string> #include <string>
#include <vector> #include <map>
#include <queue>
#include <cc++/socket.h> #include <cc++/socket.h>
#include "UniSetObject_LT.h" #include "UniSetObject_LT.h"
#include "Trigger.h" #include "Trigger.h"
...@@ -26,7 +27,6 @@ class UDPReceiver: ...@@ -26,7 +27,6 @@ class UDPReceiver:
/*! глобальная функция для вывода help-а */ /*! глобальная функция для вывода help-а */
static void help_print( int argc, char* argv[] ); static void help_print( int argc, char* argv[] );
protected: protected:
xmlNode* cnode; xmlNode* cnode;
...@@ -37,11 +37,13 @@ class UDPReceiver: ...@@ -37,11 +37,13 @@ class UDPReceiver:
void poll(); void poll();
void recv(); void recv();
void step(); virtual void step();
void update_data();
virtual void processingMessage( UniSetTypes::VoidMessage *msg ); virtual void processingMessage( UniSetTypes::VoidMessage *msg );
void sysCommand( UniSetTypes::SystemMessage *msg ); void sysCommand( UniSetTypes::SystemMessage *msg );
void sensorInfo( UniSetTypes::SensorMessage*sm ); void sensorInfo( UniSetTypes::SensorMessage*sm );
void timerInfo( UniSetTypes::TimerMessage *tm );
void askSensors( UniversalIO::UIOCommand cmd ); void askSensors( UniversalIO::UIOCommand cmd );
void waitSMReady(); void waitSMReady();
...@@ -52,6 +54,11 @@ class UDPReceiver: ...@@ -52,6 +54,11 @@ class UDPReceiver:
void initIterators(); void initIterators();
enum Timer
{
tmExchange
};
private: private:
UDPReceiver(); UDPReceiver();
bool initPause; bool initPause;
...@@ -76,7 +83,27 @@ class UDPReceiver: ...@@ -76,7 +83,27 @@ class UDPReceiver:
bool activated; bool activated;
int activateTimeout; int activateTimeout;
long pnum;
ThreadCreator<UDPReceiver>* thr; ThreadCreator<UDPReceiver>* thr;
// typedef std::map<unsigned long,UniSetUDP::UDPMessage> QueuePacket;
// QueuePacket qpack;
UniSetUDP::UDPMessage pack;
UniSetTypes::uniset_mutex packMutex;
// функция определения приоритетного сообщения для обработки
struct PacketCompare:
public std::binary_function<UniSetUDP::UDPMessage, UniSetUDP::UDPMessage, bool>
{
bool operator()(const UniSetUDP::UDPMessage& lhs,
const UniSetUDP::UDPMessage& rhs) const;
};
typedef std::priority_queue<UniSetUDP::UDPMessage,std::vector<UniSetUDP::UDPMessage>,PacketCompare> PacketQueue;
PacketQueue qpack;
static const int max_buf_size = 20;
PassiveTimer ptUpdate;
}; };
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
#endif // UDPReceiver_H_ #endif // UDPReceiver_H_
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment