Commit e533def1 authored by Pavel Vainerman's avatar Pavel Vainerman

UDP: сделал пакет постоянной длины..

parent eadf8b95
bin_PROGRAMS = @PACKAGE@-udpexchange @PACKAGE@-udpsender @PACKAGE@-udpreceiver
bin_PROGRAMS = @PACKAGE@-udpexchange @PACKAGE@-udpreceiver
#@PACKAGE@-udpsender
lib_LTLIBRARIES = libUniSetUDP.la
libUniSetUDP_la_LIBADD = $(top_builddir)/lib/libUniSet.la \
......@@ -6,8 +7,9 @@ libUniSetUDP_la_LIBADD = $(top_builddir)/lib/libUniSet.la \
$(top_builddir)/extensions/lib/libUniSetExtensions.la \
$(SIGC_LIBS) $(COMCPP_LIBS)
libUniSetUDP_la_CXXFLAGS = -I$(top_builddir)/extensions/include -I$(top_builddir)/extensions/SharedMemory $(SIGC_CFLAGS) $(COMCPP_CFLAGS)
libUniSetUDP_la_SOURCES = UDPPacket.cc UDPExchange.cc UDPSender.cc UDPNReceiver.cc UDPReceiver.cc
libUniSetUDP_la_SOURCES = UDPPacket.cc UDPExchange.cc UDPNReceiver.cc UDPReceiver.cc
#UDPSender.cc
#UDPSender.cc
@PACKAGE@_udpexchange_SOURCES = udpexchange.cc
......
......@@ -231,35 +231,28 @@ void UDPExchange::send()
h.dcount = mypack.size();
h.num = packetnum++;
mypack.msg.header = h;
/*
int ind = 0;
memcpy(udpbuf,(char*)(&h),sizeof(h));
ind += sizeof(h);
UniSetUDP::UDPMessage::UDPDataList::iterator it = mypack.dlist.begin();
for( ; it!=mypack.dlist.end(); ++it )
{
memcpy( &(udpbuf[ind]),&(*it),sizeof(UniSetUDP::UDPData));
ind +=sizeof(UniSetUDP::UDPData);
if( ind >= MaxDataLen )
{
cerr << myname << "(send data): data len > " << MaxDataLen << "!!!!" << endl;
return;
}
}
memcpy( &(udpbuf[ind]),&(mypack.data),mypack.size());
ind += mypack.size();
*/
if( udp->isPending(ost::Socket::pendingOutput) )
{
ssize_t ret = udp->send(udpbuf,ind);
if( ret<(ssize_t)ind )
ssize_t ret = udp->send( (char*)&(mypack.msg),sizeof(mypack.msg));
if( ret<sizeof(mypack.msg) )
{
cerr << myname << "(send data header): ret=" << ret << " byte count=" << ind << endl;
cerr << myname << "(send data header): ret=" << ret << " sizeof=" << sizeof(mypack.msg) << endl;
return;
}
cout << "send OK. byte count=" << ret << endl;
}
#if 0
/*
if( udp->isPending(ost::Socket::pendingOutput) )
......@@ -443,8 +436,10 @@ void UDPExchange::sensorInfo( UniSetTypes::SensorMessage* sm )
{
uniset_spin_lock lock(it->val_lock);
it->val = sm->value;
if( it->pack_it != mypack.dlist.end() )
it->pack_it->val = sm->value;
/*
if( it->pack_ind != -1 )
mypack.it->pack_it->val = sm->value;
*/
}
break;
}
......@@ -557,9 +552,9 @@ bool UDPExchange::initItem( UniXML_iterator& it )
p.si.id = sid;
p.si.node = conf->getLocalNode();
mypack.addData(sid,0);
p.pack_it = (mypack.dlist.end()--);
p.pack_ind = mypack.size()-1;
if( maxItem >= dlist.size() )
if( maxItem >= mypack.size() )
dlist.resize(maxItem+10);
dlist[maxItem] = p;
......
......@@ -38,7 +38,8 @@ class UDPExchange:
IOController::AIOStateList::iterator ait;
IOController::DIOStateList::iterator dit;
UniSetTypes::uniset_spin_mutex val_lock;
UniSetUDP::UDPMessage::UDPDataList::iterator pack_it;
// UniSetUDP::UDPMessage::UDPDataList::iterator pack_it;
int pack_ind;
long val;
friend std::ostream& operator<<( std::ostream& os, UItem& p );
......@@ -113,9 +114,6 @@ class UDPExchange:
ReceiverList rlist;
ThreadCreator<UDPExchange>* thr;
static const int MaxDataLen = 8192;
char udpbuf[MaxDataLen];
long packetnum;
};
// -----------------------------------------------------------------------------
......
......@@ -86,25 +86,30 @@ void UDPNReceiver::recv()
cout << myname << ": recv....(timeout=" << recvTimeout << ")" << endl;
// UniSetUDP::UDPHeader h;
// receive
UniSetUDP::UDPMessage pack;
if( udp->isInputReady(recvTimeout) )
{
/*
ssize_t ret = udp->UDPReceive::receive(&h,sizeof(h));
if( ret<(ssize_t)sizeof(h) )
ssize_t ret = udp->UDPReceive::receive(&(pack.msg),sizeof(pack.msg));
if( ret<(ssize_t)sizeof(pack.msg) )
{
cerr << myname << "(receive): ret=" << ret << " sizeof=" << sizeof(h) << endl;
cerr << myname << "(receive): ret=" << ret << " sizeof=" << sizeof(pack.msg) << endl;
return;
}
cerr << myname << "(receive): OK. ret=" << ret << " sizeof=" << sizeof(pack.msg) << endl;
}
/*
cout << myname << "(receive): header: " << h << endl;
if( h.dcount <=0 )
{
cout << " data=0" << endl;
return;
}
*/
UniSetUDP::UDPData d;
// ignore echo...
*/
#if 0
if( h.nodeID == conf->getLocalNode() && h.procID == getId() )
{
......@@ -116,7 +121,8 @@ void UDPNReceiver::recv()
}
return;
}
#endif
#endif
#if 0
//cout << "***** request: " << udp->UDPSocket::getIPV4Peer() << endl;
for( int i=0; i<100;i++ )
{
......@@ -134,5 +140,31 @@ void UDPNReceiver::recv()
// {
// cout << "no InputReady.." << endl;
// }
#endif
}
// -----------------------------------------------------------------------------
UDPNReceiver* UDPNReceiver::init_udpreceiver( int argc, char* argv[], UniSetTypes::ObjectId icID, SharedMemory* ic )
{
string name = conf->getArgParam("--udp-name","UDPReceiver1");
if( name.empty() )
{
cerr << "(udpexchange): Не задан name'" << endl;
return 0;
}
ObjectId ID = conf->getObjectID(name);
if( ID == UniSetTypes::DefaultObjectId )
{
cerr << "(udpexchange): идентификатор '" << name
<< "' не найден в конф. файле!"
<< " в секции " << conf->getObjectsSection() << endl;
return 0;
}
dlog[Debug::INFO] << "(rsexchange): name = " << name << "(" << ID << ")" << endl;
//return new UDPNReceiver(ID,icID,ic);
//return new UDPNReceiver(ID,icID,ic);
return 0;
}
// -----------------------------------------------------------------------------
......@@ -25,7 +25,10 @@ class UDPNReceiver
inline void stop(){ activate = false; }
inline void setReceiveTimeout( int t ){ recvTimeout = t; }
inline std::string getName(){ return myname; }
/*! глобальная функция для инициализации объекта */
static UDPNReceiver* init_udpreceiver( int argc, char* argv[],
UniSetTypes::ObjectId shmID, SharedMemory* ic=0 );
protected:
SMInterface* shm;
......
......@@ -20,19 +20,24 @@ std::ostream& UniSetUDP::operator<<( std::ostream& os, UniSetUDP::UDPMessage& p
return os;
}
// -----------------------------------------------------------------------------
UDPMessage::UDPMessage()
UDPMessage::UDPMessage():
count(0)
{
}
// -----------------------------------------------------------------------------
void UDPMessage::addData( const UniSetUDP::UDPData& dat )
bool UDPMessage::addData( const UniSetUDP::UDPData& dat )
{
dlist.push_back(dat);
if( count >= MaxDataCount )
return false;
msg.dat[sizeof(UniSetUDP::UDPHeader)+count] = dat;
count++;
return true;
}
// -----------------------------------------------------------------------------
void UDPMessage::addData( long id, long val)
bool UDPMessage::addData( long id, long val)
{
UDPData d(id,val);
addData(d);
return addData(d);
}
// -----------------------------------------------------------------------------
......@@ -29,22 +29,35 @@ namespace UniSetUDP
friend std::ostream& operator<<( std::ostream& os, UDPData& p );
}__attribute__((packed));
static const int MaxDataLen = 8192;
static const int MaxDataCount = ( MaxDataLen - sizeof(UniSetUDP::UDPHeader) ) / sizeof(UDPData);
struct DataPacket
{
UDPHeader header;
UDPData dat[MaxDataCount];
}__attribute__((packed));
struct UDPMessage:
public UDPHeader
{
UDPMessage();
void addData( const UDPData& dat );
void addData( long id, long val );
inline int size(){ return dlist.size(); }
bool addData( const UDPData& dat );
bool addData( long id, long val );
inline bool isFull(){ return count<MaxDataCount; }
inline int size(){ return count; }
typedef std::list<UDPData> UDPDataList;
UDPDataList dlist;
DataPacket msg;
int count;
friend std::ostream& operator<<( std::ostream& os, UDPMessage& p );
};
}
// -----------------------------------------------------------------------------
#endif // UDPPacket_H_
......
......@@ -206,7 +206,23 @@ void UDPReceiver::poll()
}
// -----------------------------------------------------------------------------
void UDPReceiver::recv()
{
{
cout << myname << ": recv....(timeout=" << recvTimeout << ")" << endl;
// UniSetUDP::UDPHeader h;
// receive
UniSetUDP::UDPMessage pack;
if( udp->isInputReady(recvTimeout) )
{
ssize_t ret = udp->UDPReceive::receive(&(pack.msg),sizeof(pack.msg));
if( ret<(ssize_t)sizeof(pack.msg) )
{
cerr << myname << "(receive): FAILED ret=" << ret << " sizeof=" << sizeof(pack.msg) << endl;
return;
}
cerr << myname << "(receive): OK. ret=" << ret << " sizeof=" << sizeof(pack.msg) << endl;
}
/*
cout << myname << ": recv....(timeout=" << recvTimeout << ")" << endl;
UniSetUDP::UDPHeader h;
// receive
......@@ -228,6 +244,7 @@ void UDPReceiver::recv()
UniSetUDP::UDPData d;
// ignore echo...
*/
#if 0
if( h.nodeID == conf->getLocalNode() && h.procID == getId() )
{
......@@ -239,7 +256,8 @@ void UDPReceiver::recv()
}
return;
}
#endif
#endif
#if 0
for( int i=0; i<h.dcount;i++ )
{
ssize_t ret = udp->UDPReceive::receive(&d,sizeof(d));
......@@ -256,6 +274,7 @@ void UDPReceiver::recv()
// {
// cout << "no InputReady.." << endl;
// }
#endif
}
// -----------------------------------------------------------------------------
void UDPReceiver::processingMessage(UniSetTypes::VoidMessage *msg)
......
......@@ -21,7 +21,7 @@ class UDPReceiver:
virtual ~UDPReceiver();
/*! глобальная функция для инициализации объекта */
static UDPReceiver* init_udpreceiver( int argc, char* argv[],
static UDPReceiver* init_udpreceiver( int argc, char* argv[],
UniSetTypes::ObjectId shmID, SharedMemory* ic=0 );
/*! глобальная функция для вывода help-а */
......
#!/bin/sh
uniset-start.sh -f ./uniset-udpexchange --udp-name UDPExchange2 --udp-host localhost --udp-port 2049 \
uniset-start.sh -f ./uniset-udpexchange --udp-name UDPExchange2 --udp-host localhost --udp-port 3001 \
--confile test.xml \
--udp-filter-field udp --udp-filter-value 2 \
--udp-filter-field udp --udp-filter-value 2we \
--udp-ip \
--dlog-add-levels info,crit,warn
#!/bin/sh
uniset-start.sh -f ./uniset-udpreceiver --udp-name UDPExchange \
--udp-host 192.168.56.255 --udp-port 3000 \
--udp-host 192.168.1.255 --udp-port 3000 \
--confile test.xml \
--udp-filter-field udp --udp-filter-value 1 \
--dlog-add-levels info,crit,warn
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment