Commit f51b2455 authored by Pavel Vainerman's avatar Pavel Vainerman

(UNet2): перестроил структуру пакета. Убрал лишние поля.

parent f448dc12
......@@ -81,16 +81,16 @@
priority - приоритет сообщения об изменении данного датчика
textname - текстовое имя датчика
-->
<nodes port="2809">
<item infserver="InfoServer" ip="127.0.0.1" name="LocalhostNode" textname="Локальный узел" unet_ignore="1" unet_port="3000" unet_ip="192.168.56.255">
<nodes port="2809" unet_broadcast_ip="192.168.1.255">
<item infserver="InfoServer" ip="127.0.0.1" name="LocalhostNode" textname="Локальный узел" unet_ignore="0" unet_port="2048">
<iocards>
<item card="1" name="DI32"/>
<item card="2" name="DO32"/>
<item baddr="0x110" card="3" dev="/dev/comedi1" name="UNIO48" subdev1="TBI24_0" subdev2="TBI16_8"/>
</iocards>
</item>
<item ip="192.168.56.10" name="Node1" textname="Node1" unet_port="3001" unet_ip="192.168.56.255"/>
<item ip="192.168.56.11" name="Node2" textname="Node2" unet_port="3002" unet_ip="192.168.56.255"/>
<item ip="192.168.56.10" name="Node1" textname="Node1" unet_port="3001" unet_ip="192.168.56.255" unet_ignore="1"/>
<item ip="192.168.56.11" name="Node2" textname="Node2" unet_port="3002" unet_ip="192.168.56.255" unet_ignore="1"/>
</nodes>
<!-- ************************ Датчики ********************** -->
<sensors name="Sensors">
......
......@@ -12,6 +12,12 @@ std::ostream& UniSetUDP::operator<<( std::ostream& os, UniSetUDP::UDPHeader& p )
<< " pnum=" << p.num;
}
// -----------------------------------------------------------------------------
std::ostream& UniSetUDP::operator<<( std::ostream& os, UniSetUDP::UDPHeader* p )
{
return os << (*p);
}
// -----------------------------------------------------------------------------
std::ostream& UniSetUDP::operator<<( std::ostream& os, UniSetUDP::UDPAData& p )
{
return os << "id=" << p.id << " val=" << p.val;
......@@ -19,6 +25,16 @@ std::ostream& UniSetUDP::operator<<( std::ostream& os, UniSetUDP::UDPAData& p )
// -----------------------------------------------------------------------------
std::ostream& UniSetUDP::operator<<( std::ostream& os, UniSetUDP::UDPMessage& p )
{
os << (UDPHeader*)(&p) << endl;
os << "DIGITAL:" << endl;
for( size_t i=0; i<p.dcount; i++ )
os << "[" << i << "]={" << p.dID(i) << "," << p.dValue(i) << "}" << endl;
os << "ANALOG:" << endl;
for( size_t i=0; i<p.acount; i++ )
os << "[" << i << "]={" << p.a_dat[i].id << "," << p.a_dat[i].val << "}" << endl;
return os;
}
// -----------------------------------------------------------------------------
......@@ -28,12 +44,12 @@ UDPMessage::UDPMessage()
// -----------------------------------------------------------------------------
size_t UDPMessage::addAData( const UniSetUDP::UDPAData& dat )
{
if( msg.header.acount >= MaxACount )
if( acount >= MaxACount )
return MaxACount;
msg.a_dat[msg.header.acount] = dat;
msg.header.acount++;
return msg.header.acount-1;
a_dat[acount] = dat;
acount++;
return acount-1;
}
// -----------------------------------------------------------------------------
size_t UDPMessage::addAData( long id, long val)
......@@ -46,7 +62,7 @@ bool UDPMessage::setAData( size_t index, long val )
{
if( index < MaxACount )
{
msg.a_dat[index].val = val;
a_dat[index].val = val;
return true;
}
......@@ -55,17 +71,17 @@ bool UDPMessage::setAData( size_t index, long val )
// -----------------------------------------------------------------------------
size_t UDPMessage::addDData( long id, bool val )
{
if( msg.header.dcount >= MaxDCount )
if( dcount >= MaxDCount )
return MaxDCount;
// сохраняем ID
msg.d_id[msg.header.dcount] = id;
d_id[dcount] = id;
bool res = setDData( msg.header.dcount, val );
bool res = setDData( dcount, val );
if( res )
{
msg.header.dcount++;
return msg.header.dcount-1;
dcount++;
return dcount-1;
}
return MaxDCount;
......@@ -80,13 +96,13 @@ bool UDPMessage::setDData( size_t index, bool val )
size_t nbit = index % sizeof(unsigned char);
// выставляем бит
unsigned char d = msg.d_dat[nbyte];
unsigned char d = d_dat[nbyte];
if( val )
d |= (1<<nbit);
else
d &= ~(1<<nbit);
msg.d_dat[nbyte] = d;
d_dat[nbyte] = d;
return true;
}
// -----------------------------------------------------------------------------
......@@ -95,7 +111,7 @@ long UDPMessage::dID( size_t index )
if( index >= MaxDCount )
return UniSetTypes::DefaultObjectId;
return msg.d_id[index];
return d_id[index];
}
// -----------------------------------------------------------------------------
bool UDPMessage::dValue( size_t index )
......@@ -106,7 +122,7 @@ bool UDPMessage::dValue( size_t index )
size_t nbyte = index / sizeof(unsigned char);
size_t nbit = index % sizeof(unsigned char);
return ( msg.d_dat[nbyte] & (1<<nbit) );
return ( d_dat[nbyte] & (1<<nbit) );
}
// -----------------------------------------------------------------------------
......@@ -25,6 +25,7 @@ namespace UniSetUDP
size_t acount; /*!< количество аналоговых величин */
friend std::ostream& operator<<( std::ostream& os, UDPHeader& p );
friend std::ostream& operator<<( std::ostream& os, UDPHeader* p );
}__attribute__((packed));
static unsigned long MaxPacketNum = std::numeric_limits<unsigned long>::max();
......@@ -44,17 +45,6 @@ namespace UniSetUDP
static const size_t MaxDDataCount = MaxDCount / sizeof(unsigned char);
static const size_t MaxACount = 100;
struct DataPacket
{
UDPHeader header;
UDPAData a_dat[MaxACount]; /*!< аналоговые величины */
long d_id[MaxDCount]; /*!< список дискретных ID */
unsigned char d_dat[MaxDDataCount]; /*!< битовые значения */
}__attribute__((packed));
static const int MaxDataLen = sizeof(DataPacket);
struct UDPMessage:
public UDPHeader
{
......@@ -74,10 +64,15 @@ namespace UniSetUDP
inline int asize(){ return acount; }
// inline int byte_size(){ return (dcount*sizeof(long)*UDPDData) + acount*sizeof(UDPAData)); }
DataPacket msg;
UDPAData a_dat[MaxACount]; /*!< аналоговые величины */
long d_id[MaxDCount]; /*!< список дискретных ID */
unsigned char d_dat[MaxDDataCount]; /*!< битовые значения */
friend std::ostream& operator<<( std::ostream& os, UDPMessage& p );
};
}__attribute__((packed));
static const int MaxDataLen = sizeof(UDPMessage);
}
// -----------------------------------------------------------------------------
#endif // UDPPacket_H_
......
......@@ -49,21 +49,34 @@ sender(0)
UniXML_iterator n_it(nodes);
string default_ip(n_it.getProp("unet_ip"));
string default_ip(n_it.getProp("unet_broadcast_ip"));
if( !n_it.goChildren() )
throw UniSetTypes::SystemError("(UNetExchange): Items not found for <nodes>");
for( ; n_it.getCurrent(); n_it.goNext() )
{
if( n_it.getIntProp("unet_ignore") )
{
dlog[Debug::INFO] << myname << "(init): unet_ignore.. for " << n_it.getProp("name") << endl;
continue;
}
// Если указано поле unet_ip непосредственно у узла - берём его
// если указано общий broadcast ip для всех узлов - берём его
// Иначе берём из поля "ip"
string h(n_it.getProp("ip"));
string h("");
if( !default_ip.empty() )
h = default_ip;
if( !n_it.getProp("unet_ip").empty() )
h = n_it.getProp("unet_ip");
if( !n_it.getProp("unet_broadcast_ip").empty() )
h = n_it.getProp("unet_broadcast_ip");
if( h.empty() )
{
ostringstream err;
err << myname << "(init): Unkown broadcast IP for " << n_it.getProp("name");
dlog[Debug::CRIT] << err.str() << endl;
throw UniSetTypes::SystemError(err.str());
}
// Если указано поле unet_port - используем его
// Иначе port = идентификатору узла
......@@ -80,11 +93,6 @@ sender(0)
continue;
}
if( n_it.getIntProp("unet_ignore") )
{
dlog[Debug::INFO] << myname << "(init): unet_ignore.. for " << n_it.getProp("name") << endl;
continue;
}
dlog[Debug::INFO] << myname << "(init): add UNetReceiver for " << h << ":" << p << endl;
......
......@@ -10,10 +10,10 @@ using namespace UniSetExtensions;
bool UNetReceiver::PacketCompare::operator()(const UniSetUDP::UDPMessage& lhs,
const UniSetUDP::UDPMessage& rhs) const
{
// if( lhs.msg.header.num == rhs.msg.header.num )
// return (lhs.msg < rhs.msg);
// if( lhs.num == rhs.num )
// return (lhs < rhs);
return lhs.msg.header.num > rhs.msg.header.num;
return lhs.num > rhs.num;
}
// ------------------------------------------------------------------------------------------
UNetReceiver::UNetReceiver( const std::string s_host, const ost::tpport_t port, SMInterface* smi ):
......@@ -161,7 +161,7 @@ void UNetReceiver::real_update()
return;
p = qpack.top();
unsigned long sub = labs(p.msg.header.num - pnum);
unsigned long sub = labs(p.num - pnum);
if( pnum > 0 )
{
// если sub > maxDifferens
......@@ -179,7 +179,7 @@ void UNetReceiver::real_update()
lostPackets++;
}
else if( p.msg.header.num == pnum )
else if( p.num == pnum )
{
/* а что делать если идут повторные пакеты ?!
* для надёжности лучше обрабатывать..
......@@ -195,7 +195,7 @@ void UNetReceiver::real_update()
// удаляем из очереди, только если
// всё в порядке с последовательностью..
qpack.pop();
pnum = p.msg.header.num;
pnum = p.num;
} // unlock qpack
k--;
......@@ -207,7 +207,7 @@ void UNetReceiver::real_update()
// Обработка дискретных
size_t nbit = 0;
for( size_t i=0; i<p.msg.header.dcount; i++, nbit++ )
for( size_t i=0; i<p.dcount; i++, nbit++ )
{
try
{
......@@ -246,11 +246,11 @@ void UNetReceiver::real_update()
}
// Обрабока аналоговых
for( size_t i=0; i<p.msg.header.acount; i++ )
for( size_t i=0; i<p.acount; i++ )
{
try
{
UniSetUDP::UDPAData& d = p.msg.a_dat[i];
UniSetUDP::UDPAData& d = p.a_dat[i];
ItemInfo& ii(a_icache[i]);
if( ii.id != d.id )
{
......@@ -327,7 +327,7 @@ bool UNetReceiver::recv()
if( !udp->isInputReady(recvTimeout) )
return false;
size_t ret = udp->UDPReceive::receive(&(pack.msg),sizeof(pack.msg));
size_t ret = udp->UDPReceive::receive(&pack,sizeof(pack));
if( ret < sizeof(UniSetUDP::UDPHeader) )
{
dlog[Debug::CRIT] << myname << "(receive): FAILED header ret=" << ret << " sizeof=" << sizeof(UniSetUDP::UDPHeader) << endl;
......@@ -339,12 +339,12 @@ bool UNetReceiver::recv()
if( ret < sz )
{
dlog[Debug::CRIT] << myname << "(receive): FAILED data ret=" << ret << " sizeof=" << sz
<< " packnum=" << pack.msg.header.num << endl;
<< " packnum=" << pack.num << endl;
return false;
}
if( rnum>0 && labs(pack.msg.header.num - rnum) > maxDifferens )
if( rnum>0 && labs(pack.num - rnum) > maxDifferens )
{
/* А что делать если мы уже ждём и ещё не "разгребли предыдущее".. а тут уже повторный "разрыв"
* Можно откинуть всё.. что сложили во временную очередь и заново "копить" (но тогда теряем информацию)
......@@ -363,7 +363,7 @@ bool UNetReceiver::recv()
waitClean = true;
}
rnum = pack.msg.header.num;
rnum = pack.num;
#if 0
cerr << myname << "(receive): recv DATA OK. ret=" << ret << " sizeof=" << sz
......@@ -428,20 +428,20 @@ void UNetReceiver::initIterators()
// -----------------------------------------------------------------------------
void UNetReceiver::initDCache( UniSetUDP::UDPMessage& pack, bool force )
{
if( !force && pack.msg.header.dcount == d_icache.size() )
if( !force && pack.dcount == d_icache.size() )
return;
dlog[Debug::INFO] << myname << ": init icache.." << endl;
d_cache_init_ok = true;
d_icache.resize(pack.msg.header.dcount);
d_icache.resize(pack.dcount);
for( size_t i=0; i<d_icache.size(); i++ )
{
ItemInfo& d(d_icache[i]);
if( d.id != pack.msg.d_id[i] )
if( d.id != pack.d_id[i] )
{
d.id = pack.msg.d_id[i];
d.id = pack.d_id[i];
d.iotype = conf->getIOType(d.id);
shm->initAIterator(d.ait);
shm->initDIterator(d.dit);
......@@ -451,19 +451,19 @@ void UNetReceiver::initDCache( UniSetUDP::UDPMessage& pack, bool force )
// -----------------------------------------------------------------------------
void UNetReceiver::initACache( UniSetUDP::UDPMessage& pack, bool force )
{
if( !force && pack.msg.header.acount == a_icache.size() )
if( !force && pack.acount == a_icache.size() )
return;
dlog[Debug::INFO] << myname << ": init icache.." << endl;
a_cache_init_ok = true;
a_icache.resize(pack.msg.header.acount);
a_icache.resize(pack.acount);
for( size_t i=0; i<a_icache.size(); i++ )
{
ItemInfo& d(a_icache[i]);
if( d.id != pack.msg.a_dat[i].id )
if( d.id != pack.a_dat[i].id )
{
d.id = pack.msg.a_dat[i].id;
d.id = pack.a_dat[i].id;
d.iotype = conf->getIOType(d.id);
shm->initAIterator(d.ait);
shm->initDIterator(d.dit);
......
......@@ -71,8 +71,8 @@ s_thr(0)
// выставляем поля, которые не меняются
mypack.msg.header.nodeID = conf->getLocalNode();
mypack.msg.header.procID = shm->ID();
mypack.nodeID = conf->getLocalNode();
mypack.procID = shm->ID();
}
// -----------------------------------------------------------------------------
UNetSender::~UNetSender()
......@@ -187,7 +187,7 @@ void UNetSender::send()
void UNetSender::real_send()
{
UniSetTypes::uniset_mutex_lock l(pack_mutex,300);
mypack.msg.header.num = packetnum++;
mypack.num = packetnum++;
if( packetnum > UniSetUDP::MaxPacketNum )
packetnum = 1;
......@@ -198,7 +198,7 @@ void UNetSender::real_send()
if( !udp->isPending(ost::Socket::pendingOutput) )
return;
size_t ret = udp->send( (char*)&(mypack.msg),sz);
size_t ret = udp->send( (char*)(&mypack),sz);
if( ret < sz )
dlog[Debug::CRIT] << myname << "(real_send): FAILED ret=" << ret << " < sizeof=" << sz << endl;
}
......
......@@ -16,6 +16,7 @@ static struct option longopts[] = {
{ "timeout", required_argument, 0, 't' },
{ "data-count", required_argument, 0, 'c' },
{ "disable-broadcast", no_argument, 0, 'b' },
{ "show-data", no_argument, 0, 'd' },
{ "check-lost", no_argument, 0, 'l' },
{ "verbode", required_argument, 0, 'v' },
{ NULL, 0, 0, 0 }
......@@ -58,10 +59,11 @@ int main(int argc, char* argv[])
bool broadcast = true;
int procID = 1;
int nodeID = 1;
int count = 50;
size_t count = 50;
bool lost = false;
bool show = false;
while( (opt = getopt_long(argc, argv, "hs:c:r:p:n:t:x:blv",longopts,&optindex)) != -1 )
while( (opt = getopt_long(argc, argv, "hs:c:r:p:n:t:x:blvd",longopts,&optindex)) != -1 )
{
switch (opt)
{
......@@ -77,6 +79,7 @@ int main(int argc, char* argv[])
cout << "[-b|--disable-broadcast] - Disable broadcast mode." << endl;
cout << "[-l|--check-lost] - Check the lost packets." << endl;
cout << "[-v|--verbose] - verbose mode." << endl;
cout << "[-d|--show-data] - show receive data." << endl;
cout << endl;
return 0;
......@@ -113,6 +116,10 @@ int main(int argc, char* argv[])
case 'b':
broadcast = false;
break;
case 'd':
show = true;
break;
case 'l':
lost = true;
......@@ -199,22 +206,25 @@ int main(int argc, char* argv[])
if( ret < sz )
{
cerr << "(recv): FAILED data ret=" << ret
<< " sizeof=" << sz << " packnum=" << pack.msg.header.num << endl;
<< " sizeof=" << sz << " packnum=" << pack.num << endl;
continue;
}
if( lost )
{
if( prev_num != (pack.msg.header.num-1) )
cerr << "WARNING! Incorrect sequence of packets! current=" << pack.msg.header.num
if( prev_num != (pack.num-1) )
cerr << "WARNING! Incorrect sequence of packets! current=" << pack.num
<< " prev=" << prev_num << endl;
prev_num = pack.msg.header.num;
prev_num = pack.num;
}
if( verb )
cout << "receive OK. header: " << pack.msg.header
<< " bytes: " << ret << endl;
// if( verb )
// cout << "receive OK. header: " << pack.msg.header
// << " bytes: " << ret << endl;
if( show )
cout << "receive data: " << pack << endl;
}
catch( ost::SockException& e )
{
......@@ -237,16 +247,16 @@ int main(int argc, char* argv[])
udp = new ost::UDPBroadcast(host,port);
UniSetUDP::UDPMessage mypack;
mypack.msg.header.nodeID = nodeID;
mypack.msg.header.procID = procID;
mypack.nodeID = nodeID;
mypack.procID = procID;
for( size_t i=0; i < count && i < UniSetUDP::MaxACount; i++ )
for( size_t i=0; i < count; i++ )
{
UDPAData d(i,i);
mypack.addAData(d);
}
for( int i=0; i<count; i++ )
for( int i=0; i < count; i++ )
mypack.addDData(i,i);
//size_t sz = mypack.byte_size() + sizeof(UniSetUDP::UDPHeader);
......@@ -257,7 +267,7 @@ int main(int argc, char* argv[])
while(1)
{
mypack.msg.header.num = packetnum++;
mypack.num = packetnum++;
if( packetnum > UniSetUDP::MaxPacketNum )
packetnum = 1;
......@@ -268,7 +278,7 @@ int main(int argc, char* argv[])
if( verb )
cout << "(send): to addr=" << addr << " count=" << count << " bytes=" << sz << endl;
size_t ret = udp->send((char*)&(mypack.msg), sz);
size_t ret = udp->send((char*)&mypack, sz);
if( ret < sz )
cerr << "(send): FAILED ret=" << ret << " < sizeof=" << sz << endl;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment