Commit 85d1b0f9 authored by Pavel Vainerman's avatar Pavel Vainerman

Rename UNet2 --> UNetUDP (production of new release 1.4)

parent 5d030cda
......@@ -2,8 +2,8 @@
%define oname uniset
Name: libuniset
Version: 1.3
Release: alt19
Version: 1.4
Release: alt1
Summary: UniSet - library for building distributed industrial control systems
License: GPL
Group: Development/C++
......@@ -183,7 +183,7 @@ rm -f %buildroot%_libdir/*.la
%_libdir/libUniSetRT*.so.*
%_libdir/libUniSetShared*.so.*
%_libdir/libUniSetNetwork*.so.*
%_libdir/libUniSetUNet2*.so.*
%_libdir/libUniSetUNetUDP*.so.*
#%_libdir/libUniSetSMDBServer*.so.*
%files extensions-devel
......@@ -195,7 +195,7 @@ rm -f %buildroot%_libdir/*.la
%_libdir/libUniSetRT*.so
%_libdir/libUniSetShared*.so
%_libdir/libUniSetNetwork.so
%_libdir/libUniSetUNet2.so
%_libdir/libUniSetUNetUDP.so
#%_libdir/libUniSetSMDBServer.so
%_pkgconfigdir/*Extensions.pc
%_pkgconfigdir/libUniSetIO*.pc
......@@ -213,6 +213,10 @@ rm -f %buildroot%_libdir/*.la
%changelog
* Thu May 31 2012 Pavel Vainerman <pv@altlinux.ru> 1.4-alt1
- rename unet2 -->unetudp
- release version 1.4
* Thu May 31 2012 Pavel Vainerman <pv@altlinux.ru> 1.3-alt19
- DBServer: set log info level - LEVEL9
- minor fixies for linker errors (new gcc)
......
......@@ -3,7 +3,7 @@
# See doc: http://www.gnu.org/software/hello/manual/autoconf/Generic-Programs.html
# AC_PREREQ(2.59)
AC_INIT([uniset], [1.3.0], pv@etersoft.ru)
AC_INIT([uniset], [1.4.0], pv@etersoft.ru)
AM_INIT_AUTOMAKE(AC_PACKAGE_NAME,AC_PACKAGE_VERSION)
# AC_CONFIG_MACRO_DIR([m4])
......@@ -30,7 +30,7 @@ AC_ENABLE_SHARED(yes)
AC_ENABLE_STATIC(no)
AM_PROG_LIBTOOL
LIBVER=1:3:0
LIBVER=1:4:0
AC_SUBST(LIBVER)
# Checks for libraries.
......@@ -189,8 +189,8 @@ AC_CONFIG_FILES([Makefile
extensions/SMViewer/Makefile
extensions/UniNetwork/Makefile
extensions/UniNetwork/libUniSetNetwork.pc
extensions/UNet2/Makefile
extensions/UNet2/libUniSetUNet2.pc
extensions/UNetUDP/Makefile
extensions/UNetUDP/libUniSetUNetUDP.pc
extensions/SharedMemory/Makefile
extensions/SharedMemory/libUniSetSharedMemory.pc
extensions/SharedMemoryPlus/Makefile
......
bin_PROGRAMS = @PACKAGE@-unetexchange @PACKAGE@-unet2-tester
# не забывайте править версию в pc-файле
UNET2_VER=@LIBVER@
lib_LTLIBRARIES = libUniSetUNet2.la
libUniSetUNet2_la_LDFLAGS = -version-info $(UNET2_VER)
libUniSetUNet2_la_LIBADD = $(top_builddir)/lib/libUniSet.la \
$(top_builddir)/extensions/SharedMemory/libUniSetSharedMemory.la \
$(top_builddir)/extensions/lib/libUniSetExtensions.la \
$(SIGC_LIBS) $(COMCPP_LIBS)
libUniSetUNet2_la_CXXFLAGS = -I$(top_builddir)/extensions/include -I$(top_builddir)/extensions/SharedMemory $(SIGC_CFLAGS) $(COMCPP_CFLAGS)
libUniSetUNet2_la_SOURCES = UDPPacket.cc UNetReceiver.cc UNetSender.cc UNetExchange.cc
@PACKAGE@_unetexchange_SOURCES = unetexchange.cc
@PACKAGE@_unetexchange_LDADD = libUniSetUNet2.la $(top_builddir)/lib/libUniSet.la \
$(top_builddir)/extensions/SharedMemory/libUniSetSharedMemory.la \
$(top_builddir)/extensions/lib/libUniSetExtensions.la \
$(SIGC_LIBS) $(COMCPP_LIBS)
@PACKAGE@_unetexchange_CXXFLAGS = -I$(top_builddir)/extensions/include -I$(top_builddir)/extensions/SharedMemory $(SIGC_CFLAGS) $(COMCPP_CFLAGS)
@PACKAGE@_unet2_tester_SOURCES = UDPPacket.cc unet2-tester.cc
@PACKAGE@_unet2_tester_LDADD = $(COMCPP_LIBS)
@PACKAGE@_unet2_tester_CXXFLAGS = $(COMCPP_CFLAGS)
# install
devel_include_HEADERS = *.h
devel_includedir = $(pkgincludedir)/extensions
pkgconfigdir = $(libdir)/pkgconfig
pkgconfig_DATA = libUniSetUNet2.pc
all-local:
ln -sf ../UNet2/$(devel_include_HEADERS) ../include
#include "UDPPacket.h"
// -----------------------------------------------------------------------------
using namespace std;
using namespace UniSetUDP;
// -----------------------------------------------------------------------------
std::ostream& UniSetUDP::operator<<( std::ostream& os, UniSetUDP::UDPHeader& p )
{
return os << "nodeID=" << p.nodeID
<< " procID=" << p.procID
<< " dcount=" << p.dcount
<< " acount=" << p.acount
<< " pnum=" << p.num;
}
// -----------------------------------------------------------------------------
std::ostream& UniSetUDP::operator<<( std::ostream& os, UniSetUDP::UDPHeader* p )
{
return os << (*p);
}
// -----------------------------------------------------------------------------
std::ostream& UniSetUDP::operator<<( std::ostream& os, UniSetUDP::UDPAData& p )
{
return os << "id=" << p.id << " val=" << p.val;
}
// -----------------------------------------------------------------------------
std::ostream& UniSetUDP::operator<<( std::ostream& os, UniSetUDP::UDPMessage& p )
{
os << (UDPHeader*)(&p) << endl;
os << "DIGITAL:" << endl;
for( size_t i=0; i<p.dcount; i++ )
os << "[" << i << "]={" << p.dID(i) << "," << p.dValue(i) << "}" << endl;
os << "ANALOG:" << endl;
for( size_t i=0; i<p.acount; i++ )
os << "[" << i << "]={" << p.a_dat[i].id << "," << p.a_dat[i].val << "}" << endl;
return os;
}
// -----------------------------------------------------------------------------
UDPMessage::UDPMessage()
{
}
// -----------------------------------------------------------------------------
size_t UDPMessage::addAData( const UniSetUDP::UDPAData& dat )
{
if( acount >= MaxACount )
return MaxACount;
a_dat[acount] = dat;
acount++;
return acount-1;
}
// -----------------------------------------------------------------------------
size_t UDPMessage::addAData( long id, long val)
{
UDPAData d(id,val);
return addAData(d);
}
// -----------------------------------------------------------------------------
bool UDPMessage::setAData( size_t index, long val )
{
if( index < MaxACount )
{
a_dat[index].val = val;
return true;
}
return false;
}
// -----------------------------------------------------------------------------
size_t UDPMessage::addDData( long id, bool val )
{
if( dcount >= MaxDCount )
return MaxDCount;
// сохраняем ID
d_id[dcount] = id;
bool res = setDData( dcount, val );
if( res )
{
dcount++;
return dcount-1;
}
return MaxDCount;
}
// -----------------------------------------------------------------------------
bool UDPMessage::setDData( size_t index, bool val )
{
if( index >= MaxDCount )
return false;
size_t nbyte = index / sizeof(unsigned char);
size_t nbit = index % sizeof(unsigned char);
// выставляем бит
unsigned char d = d_dat[nbyte];
if( val )
d |= (1<<nbit);
else
d &= ~(1<<nbit);
d_dat[nbyte] = d;
return true;
}
// -----------------------------------------------------------------------------
long UDPMessage::dID( size_t index )
{
if( index >= MaxDCount )
return UniSetTypes::DefaultObjectId;
return d_id[index];
}
// -----------------------------------------------------------------------------
bool UDPMessage::dValue( size_t index )
{
if( index >= MaxDCount )
return UniSetTypes::DefaultObjectId;
size_t nbyte = index / sizeof(unsigned char);
size_t nbit = index % sizeof(unsigned char);
return ( d_dat[nbyte] & (1<<nbit) );
}
// -----------------------------------------------------------------------------
size_t UDPMessage::transport_msg( UDPPacket& p )
{
memset(&p,0,sizeof(UDPPacket));
size_t i = 0;
memcpy(&(p.data[i]),this,sizeof(UDPHeader));
i += sizeof(UDPHeader);
// копируем аналоговые данные
size_t sz = acount*sizeof(UDPAData);
memcpy(&(p.data[i]),a_dat,sz);
i += sz;
// копируем булевые индексы
sz = dcount*sizeof(long);
memcpy(&(p.data[i]),d_id,sz);
i += sz;
// копируем булевые данные
size_t nbyte = dcount / sizeof(unsigned char);
size_t nbit = dcount % sizeof(unsigned char);
sz = nbit > 0 ? nbyte + 1 : nbyte;
memcpy(&(p.data[i]),d_dat,sz);
i += sz;
p.len = i;
return i;
}
// -----------------------------------------------------------------------------
UDPMessage::UDPMessage( UDPPacket& p )
{
getMessage(*this,p);
}
// -----------------------------------------------------------------------------
size_t UDPMessage::getMessage( UDPMessage& m, UDPPacket& p )
{
memset(&m,0,sizeof(m));
size_t i = 0;
memcpy(&m,&(p.data[i]),sizeof(UDPHeader));
i += sizeof(UDPHeader);
// копируем аналоговые данные
size_t sz = m.acount*sizeof(UDPAData);
if( sz > sizeof(m.a_dat) )
sz = sizeof(m.a_dat);
memcpy(m.a_dat,&(p.data[i]),sz);
i += sz;
// копируем булевые индексы
sz = m.dcount*sizeof(long);
if( sz > sizeof(m.d_id) )
sz = sizeof(m.d_id);
memcpy(m.d_id,&(p.data[i]),sz);
i += sz;
// копируем булевые данные
size_t nbyte = m.dcount / sizeof(unsigned char);
size_t nbit = m.dcount % sizeof(unsigned char);
sz = nbit > 0 ? nbyte + 1 : nbyte;
if( sz > sizeof(m.d_dat) )
sz = sizeof(m.d_dat);
memcpy(m.d_dat,&(p.data[i]),sz);
return i+sz;
}
// -----------------------------------------------------------------------------
#ifndef UDPPacket_H_
#define UDPPacket_H_
// -----------------------------------------------------------------------------
#include <list>
#include <limits>
#include <ostream>
#include "UniSetTypes.h"
// -----------------------------------------------------------------------------
namespace UniSetUDP
{
/*! Для оптимизации размера передаваемх данных, но с учётом того, что ID могут идти не подряд.
Сделан следующие формат:
Для аналоговых величин передаётся массив пар "id-value".
Для булевых величин - отдельно массив ID и отдельно битовый массив со значениями,
(по количеству битов такого же размера).
*/
struct UDPHeader
{
UDPHeader():num(0),nodeID(0),procID(0),dcount(0),acount(0){}
unsigned long num;
long nodeID;
long procID;
size_t dcount; /*!< количество булевых величин */
size_t acount; /*!< количество аналоговых величин */
friend std::ostream& operator<<( std::ostream& os, UDPHeader& p );
friend std::ostream& operator<<( std::ostream& os, UDPHeader* p );
}__attribute__((packed));
static unsigned long MaxPacketNum = std::numeric_limits<unsigned long>::max();
struct UDPAData
{
UDPAData():id(UniSetTypes::DefaultObjectId),val(0){}
UDPAData(long id, long val):id(id),val(val){}
long id;
long val;
friend std::ostream& operator<<( std::ostream& os, UDPAData& p );
}__attribute__((packed));
static const size_t MaxACount = 200;
static const size_t MaxDCount = 400;
static const size_t MaxDDataCount = MaxDCount / sizeof(unsigned char);
struct UDPPacket
{
UDPPacket():len(0){}
int len;
unsigned char data[ sizeof(UDPHeader) + MaxDCount*sizeof(long) + MaxDDataCount + MaxACount*sizeof(UDPAData) ];
}__attribute__((packed));
static const int MaxDataLen = sizeof(UDPPacket);
struct UDPMessage:
public UDPHeader
{
UDPMessage();
UDPMessage( UDPPacket& p );
size_t transport_msg( UDPPacket& p );
static size_t getMessage( UDPMessage& m, UDPPacket& p );
size_t addDData( long id, bool val );
bool setDData( size_t index, bool val );
long dID( size_t index );
bool dValue( size_t index );
size_t addAData( const UDPAData& dat );
size_t addAData( long id, long val );
bool setAData( size_t index, long val );
inline bool isFull(){ return ((dcount<MaxDCount) && (acount<MaxACount)); }
inline int dsize(){ return dcount; }
inline int asize(){ return acount; }
// inline int byte_size(){ return (dcount*sizeof(long)*UDPDData) + acount*sizeof(UDPAData)); }
// количество байт в пакете с булевыми переменными...
int d_byte(){ return dcount*sizeof(long) + dcount; }
UDPAData a_dat[MaxACount]; /*!< аналоговые величины */
long d_id[MaxDCount]; /*!< список дискретных ID */
unsigned char d_dat[MaxDDataCount]; /*!< битовые значения */
friend std::ostream& operator<<( std::ostream& os, UDPMessage& p );
};
}
// -----------------------------------------------------------------------------
#endif // UDPPacket_H_
// -----------------------------------------------------------------------------
#include <sstream>
#include "Exceptions.h"
#include "Extensions.h"
#include "UNetExchange.h"
// -----------------------------------------------------------------------------
using namespace std;
using namespace UniSetTypes;
using namespace UniSetExtensions;
// -----------------------------------------------------------------------------
UNetExchange::UNetExchange( UniSetTypes::ObjectId objId, UniSetTypes::ObjectId shmId, SharedMemory* ic ):
UniSetObject_LT(objId),
shm(0),
initPause(0),
activated(false),
no_sender(false),
sender(0),
sender2(0)
{
if( objId == DefaultObjectId )
throw UniSetTypes::SystemError("(UNetExchange): objId=-1?!! Use --unet-name" );
cnode = conf->getNode(myname);
if( cnode == NULL )
throw UniSetTypes::SystemError("(UNetExchange): Not found conf-node for " + myname );
shm = new SMInterface(shmId,&ui,objId,ic);
UniXML_iterator it(cnode);
// определяем фильтр
s_field = conf->getArgParam("--unet-filter-field");
s_fvalue = conf->getArgParam("--unet-filter-value");
dlog[Debug::INFO] << myname << "(init): read filter-field='" << s_field
<< "' filter-value='" << s_fvalue << "'" << endl;
const string n_field(conf->getArgParam("--unet-nodes-filter-field"));
const string n_fvalue(conf->getArgParam("--unet-nodes-filter-value"));
dlog[Debug::INFO] << myname << "(init): read nodes-filter-field='" << n_field
<< "' nodes-filter-value='" << n_fvalue << "'" << endl;
int recvTimeout = conf->getArgPInt("--unet-recv-timeout",it.getProp("recvTimeout"), 5000);
int lostTimeout = conf->getArgPInt("--unet-lost-timeout",it.getProp("lostTimeout"), recvTimeout);
int recvpause = conf->getArgPInt("--unet-recvpause",it.getProp("recvpause"), 10);
int sendpause = conf->getArgPInt("--unet-sendpause",it.getProp("sendpause"), 150);
int updatepause = conf->getArgPInt("--unet-updatepause",it.getProp("updatepause"), 100);
steptime = conf->getArgPInt("--unet-steptime",it.getProp("steptime"), 1000);
int maxDiff = conf->getArgPInt("--unet-maxdifferense",it.getProp("maxDifferense"), 1000);
int maxProcessingCount = conf->getArgPInt("--unet-maxprocessingcount",it.getProp("maxProcessingCount"), 100);
no_sender = conf->getArgInt("--unet-nosender",it.getProp("nosender"));
xmlNode* nodes = conf->getXMLNodesSection();
if( !nodes )
throw UniSetTypes::SystemError("(UNetExchange): Not found <nodes>");
UniXML_iterator n_it(nodes);
string default_ip(n_it.getProp("unet_broadcast_ip"));
string default_ip2(n_it.getProp("unet_broadcast_ip2"));
if( !n_it.goChildren() )
throw UniSetTypes::SystemError("(UNetExchange): Items not found for <nodes>");
for( ; n_it.getCurrent(); n_it.goNext() )
{
if( n_it.getIntProp("unet_ignore") )
{
dlog[Debug::INFO] << myname << "(init): unet_ignore.. for " << n_it.getProp("name") << endl;
continue;
}
// проверяем фильтры для подсетей
if( !UniSetTypes::check_filter(n_it,n_field,n_fvalue) )
continue;
// Если указано поле unet_broadcast_ip непосредственно у узла - берём его
// если указано общий broadcast ip для всех узлов - берём его
string h("");
string h2("");
if( !default_ip.empty() )
h = default_ip;
if( !n_it.getProp("unet_broadcast_ip").empty() )
h = n_it.getProp("unet_broadcast_ip");
if( !default_ip2.empty() )
h2 = default_ip2;
if( !n_it.getProp("unet_broadcast_ip2").empty() )
h2 = n_it.getProp("unet_broadcast_ip2");
if( h.empty() )
{
ostringstream err;
err << myname << "(init): Unknown broadcast IP for " << n_it.getProp("name");
dlog[Debug::CRIT] << err.str() << endl;
throw UniSetTypes::SystemError(err.str());
}
if( h2.empty() && dlog.debugging(Debug::INFO) )
dlog[Debug::INFO] << myname << "(init): ip2 not used..." << endl;
// Если указано поле unet_port - используем его
// Иначе port = идентификатору узла
int p = n_it.getIntProp("id");
if( !n_it.getProp("unet_port").empty() )
p = n_it.getIntProp("unet_port");
int p2 = p; // по умолчанию порт на втором канале такой же как на первом
if( !n_it.getProp("unet_port2").empty() )
p2 = n_it.getIntProp("unet_port2");
string n(n_it.getProp("name"));
if( n == conf->getLocalNodeName() )
{
if( no_sender )
{
dlog[Debug::INFO] << myname << "(init): sender OFF for this node...("
<< n_it.getProp("name") << ")" << endl;
continue;
}
dlog[Debug::INFO] << myname << "(init): init sender.. my node " << n_it.getProp("name") << endl;
sender = new UNetSender(h,p,shm,s_field,s_fvalue,ic);
sender->setSendPause(sendpause);
try
{
// создаём "писателя" для второго канала если задан
if( !h2.empty() )
{
dlog[Debug::INFO] << myname << "(init): init sender2.. my node " << n_it.getProp("name") << endl;
sender2 = new UNetSender(h2,p2,shm,s_field,s_fvalue,ic);
sender2->setSendPause(sendpause);
}
}
catch(...)
{
// т.е. это "резервный канал", то игнорируем ошибку его создания
// при запуске "интерфейс" может быть и не доступен...
sender2 = 0;
dlog[Debug::CRIT] << myname << "(ignore): DON`T CREATE 'UNetSender' for " << h2 << ":" << p2 << endl;
}
continue;
}
dlog[Debug::INFO] << myname << "(init): add UNetReceiver for " << h << ":" << p << endl;
if( checkExistUNetHost(h,p) )
{
dlog[Debug::INFO] << myname << "(init): " << h << ":" << p << " already added! Ignore.." << endl;
continue;
}
bool resp_invert = n_it.getIntProp("unet_respond_invert");
string s_resp_id(n_it.getProp("unet_respond1_id"));
UniSetTypes::ObjectId resp_id = UniSetTypes::DefaultObjectId;
if( !s_resp_id.empty() )
{
resp_id = conf->getSensorID(s_resp_id);
if( resp_id == UniSetTypes::DefaultObjectId )
{
ostringstream err;
err << myname << ": Unknown RespondID.. Not found id for '" << s_resp_id << "'" << endl;
dlog[Debug::CRIT] << myname << "(init): " << err.str() << endl;
throw SystemError(err.str());
}
}
string s_resp2_id(n_it.getProp("unet_respond2_id"));
UniSetTypes::ObjectId resp2_id = UniSetTypes::DefaultObjectId;
if( !s_resp2_id.empty() )
{
resp2_id = conf->getSensorID(s_resp2_id);
if( resp2_id == UniSetTypes::DefaultObjectId )
{
ostringstream err;
err << myname << ": Unknown RespondID(2).. Not found id for '" << s_resp2_id << "'" << endl;
dlog[Debug::CRIT] << myname << "(init): " << err.str() << endl;
throw SystemError(err.str());
}
}
string s_lp_id(n_it.getProp("unet_lostpackets1_id"));
UniSetTypes::ObjectId lp_id = UniSetTypes::DefaultObjectId;
if( !s_lp_id.empty() )
{
lp_id = conf->getSensorID(s_lp_id);
if( lp_id == UniSetTypes::DefaultObjectId )
{
ostringstream err;
err << myname << ": Unknown LostPacketsID.. Not found id for '" << s_lp_id << "'" << endl;
dlog[Debug::CRIT] << myname << "(init): " << err.str() << endl;
throw SystemError(err.str());
}
}
string s_lp2_id(n_it.getProp("unet_lostpackets2_id"));
UniSetTypes::ObjectId lp2_id = UniSetTypes::DefaultObjectId;
if( !s_lp2_id.empty() )
{
lp2_id = conf->getSensorID(s_lp2_id);
if( lp2_id == UniSetTypes::DefaultObjectId )
{
ostringstream err;
err << myname << ": Unknown LostPacketsID(2).. Not found id for '" << s_lp2_id << "'" << endl;
dlog[Debug::CRIT] << myname << "(init): " << err.str() << endl;
throw SystemError(err.str());
}
}
string s_lp_comm_id(n_it.getProp("unet_lostpackets_id"));
UniSetTypes::ObjectId lp_comm_id = UniSetTypes::DefaultObjectId;
if( !s_lp_comm_id.empty() )
{
lp_comm_id = conf->getSensorID(s_lp_comm_id);
if( lp_comm_id == UniSetTypes::DefaultObjectId )
{
ostringstream err;
err << myname << ": Unknown LostPacketsID(comm).. Not found id for '" << s_lp_comm_id << "'" << endl;
dlog[Debug::CRIT] << myname << "(init): " << err.str() << endl;
throw SystemError(err.str());
}
}
string s_resp_comm_id(n_it.getProp("unet_respond_id"));
UniSetTypes::ObjectId resp_comm_id = UniSetTypes::DefaultObjectId;
if( !s_resp_comm_id.empty() )
{
resp_comm_id = conf->getSensorID(s_resp_comm_id);
if( resp_comm_id == UniSetTypes::DefaultObjectId )
{
ostringstream err;
err << myname << ": Unknown RespondID(comm).. Not found id for '" << s_resp_comm_id << "'" << endl;
dlog[Debug::CRIT] << myname << "(init): " << err.str() << endl;
throw SystemError(err.str());
}
}
dlog[Debug::INFO] << myname << "(init): (node='" << n << "') add receiver "
<< h2 << ":" << p2 << endl;
UNetReceiver* r = new UNetReceiver(h,p,shm);
// на всякий принудительно разблокируем,
// чтобы не зависеть от значения по умолчанию
r->setLockUpdate(false);
r->setReceiveTimeout(recvTimeout);
r->setLostTimeout(lostTimeout);
r->setReceivePause(recvpause);
r->setUpdatePause(updatepause);
r->setMaxDifferens(maxDiff);
r->setMaxProcessingCount(maxProcessingCount);
r->setRespondID(resp_id,resp_invert);
r->setLostPacketsID(lp_id);
r->connectEvent( sigc::mem_fun(this, &UNetExchange::receiverEvent) );
UNetReceiver* r2 = 0;
try
{
if( !h2.empty() ) // создаём читателя впо второму каналу
{
dlog[Debug::INFO] << myname << "(init): (node='" << n << "') add reserv receiver "
<< h2 << ":" << p2 << endl;
r2 = new UNetReceiver(h2,p2,shm);
// т.к. это резервный канал (по началу блокируем его)
r2->setLockUpdate(true);
r2->setReceiveTimeout(recvTimeout);
r2->setLostTimeout(lostTimeout);
r2->setReceivePause(recvpause);
r2->setUpdatePause(updatepause);
r2->setMaxDifferens(maxDiff);
r2->setMaxProcessingCount(maxProcessingCount);
r2->setRespondID(resp2_id,resp_invert);
r2->setLostPacketsID(lp2_id);
r2->connectEvent( sigc::mem_fun(this, &UNetExchange::receiverEvent) );
}
}
catch(...)
{
// т.е. это "резервный канал", то игнорируем ошибку его создания
// при запуске "интерфейс" может быть и не доступен...
r2 = 0;
dlog[Debug::CRIT] << myname << "(ignore): DON`T CREATE 'UNetReceiver' for " << h2 << ":" << p2 << endl;
}
ReceiverInfo ri(r,r2);
ri.setRespondID(resp_comm_id,resp_invert);
ri.setLostPacketsID(lp_comm_id);
recvlist.push_back(ri);
}
// -------------------------------
// ********** HEARTBEAT *************
string heart = conf->getArgParam("--unet-heartbeat-id",it.getProp("heartbeat_id"));
if( !heart.empty() )
{
sidHeartBeat = conf->getSensorID(heart);
if( sidHeartBeat == DefaultObjectId )
{
ostringstream err;
err << myname << ": не найден идентификатор для датчика 'HeartBeat' " << heart;
dlog[Debug::CRIT] << myname << "(init): " << err.str() << endl;
throw SystemError(err.str());
}
int heartbeatTime = getHeartBeatTime();
if( heartbeatTime )
ptHeartBeat.setTiming(heartbeatTime);
else
ptHeartBeat.setTiming(UniSetTimer::WaitUpTime);
maxHeartBeat = conf->getArgPInt("--unet-heartbeat-max", it.getProp("heartbeat_max"), 10);
test_id = sidHeartBeat;
}
else
{
test_id = conf->getSensorID("TestMode_S");
if( test_id == DefaultObjectId )
{
ostringstream err;
err << myname << "(init): test_id unknown. 'TestMode_S' not found...";
dlog[Debug::CRIT] << myname << "(init): " << err.str() << endl;
throw SystemError(err.str());
}
}
dlog[Debug::INFO] << myname << "(init): test_id=" << test_id << endl;
activateTimeout = conf->getArgPInt("--activate-timeout", 20000);
}
// -----------------------------------------------------------------------------
UNetExchange::~UNetExchange()
{
for( ReceiverList::iterator it=recvlist.begin(); it!=recvlist.end(); ++it )
{
if( it->r1 )
delete it->r1;
if( it->r2 )
delete it->r2;
}
delete sender;
delete sender2;
delete shm;
}
// -----------------------------------------------------------------------------
bool UNetExchange::checkExistUNetHost( const std::string addr, ost::tpport_t port )
{
ost::IPV4Address a1(addr.c_str());
for( ReceiverList::iterator it=recvlist.begin(); it!=recvlist.end(); ++it )
{
if( it->r1->getAddress() == a1.getAddress() && it->r1->getPort() == port )
return true;
}
return false;
}
// -----------------------------------------------------------------------------
void UNetExchange::startReceivers()
{
for( ReceiverList::iterator it=recvlist.begin(); it!=recvlist.end(); ++it )
{
if( it->r1 )
it->r1->start();
if( it->r2 )
it->r2->start();
}
}
// -----------------------------------------------------------------------------
void UNetExchange::waitSMReady()
{
// waiting for SM is ready...
int ready_timeout = conf->getArgInt("--unet-sm-ready-timeout","15000");
if( ready_timeout == 0 )
ready_timeout = 15000;
else if( ready_timeout < 0 )
ready_timeout = UniSetTimer::WaitUpTime;
if( !shm->waitSMready(ready_timeout,50) )
{
ostringstream err;
err << myname << "(waitSMReady): Не дождались готовности SharedMemory к работе в течение " << ready_timeout << " мсек";
dlog[Debug::CRIT] << err.str() << endl;
throw SystemError(err.str());
}
}
// -----------------------------------------------------------------------------
void UNetExchange::timerInfo( TimerMessage *tm )
{
if( !activated )
return;
if( tm->id == tmStep )
step();
}
// -----------------------------------------------------------------------------
void UNetExchange::step()
{
if( !activated )
return;
if( sidHeartBeat!=DefaultObjectId && ptHeartBeat.checkTime() )
{
try
{
shm->localSaveValue(aitHeartBeat,sidHeartBeat,maxHeartBeat,getId());
ptHeartBeat.reset();
}
catch(Exception& ex)
{
dlog[Debug::CRIT] << myname << "(step): (hb) " << ex << std::endl;
}
}
for( ReceiverList::iterator it=recvlist.begin(); it!=recvlist.end(); ++it )
it->step(shm, myname);
}
// -----------------------------------------------------------------------------
void UNetExchange::ReceiverInfo::step( SMInterface* shm, const std::string myname )
{
try
{
if( sidRespond != DefaultObjectId )
{
bool resp = ( (r1 && r1->isRecvOK()) || (r2 && r2->isRecvOK()) );
if( respondInvert )
resp = !resp;
shm->localSaveState(ditRespond,sidRespond,resp,shm->ID());
}
}
catch( Exception& ex )
{
dlog[Debug::CRIT] << myname << "(ReceiverInfo::step): (respond): " << ex << std::endl;
}
try
{
if( sidLostPackets != DefaultObjectId )
{
long l = 0;
if( r1 )
l += r1->getLostPacketsNum();
if( r2 )
l += r2->getLostPacketsNum();
shm->localSaveValue(aitLostPackets,sidLostPackets,l,shm->ID());
}
}
catch( Exception& ex )
{
dlog[Debug::CRIT] << myname << "(ReceiverInfo::step): (lostpackets): " << ex << std::endl;
}
}
// -----------------------------------------------------------------------------
void UNetExchange::processingMessage( UniSetTypes::VoidMessage *msg )
{
try
{
switch(msg->type)
{
case UniSetTypes::Message::SysCommand:
{
UniSetTypes::SystemMessage sm( msg );
sysCommand( &sm );
}
break;
case Message::SensorInfo:
{
SensorMessage sm( msg );
sensorInfo(&sm);
}
break;
case Message::Timer:
{
TimerMessage tm(msg);
timerInfo(&tm);
}
break;
default:
break;
}
}
catch( SystemError& ex )
{
dlog[Debug::CRIT] << myname << "(SystemError): " << ex << std::endl;
// throw SystemError(ex);
raise(SIGTERM);
}
catch( Exception& ex )
{
dlog[Debug::CRIT] << myname << "(processingMessage): " << ex << std::endl;
}
catch(...)
{
dlog[Debug::CRIT] << myname << "(processingMessage): catch ..." << std::endl;
}
}
// -----------------------------------------------------------------------------
void UNetExchange::sysCommand( UniSetTypes::SystemMessage *sm )
{
switch( sm->command )
{
case SystemMessage::StartUp:
{
waitSMReady();
// подождать пока пройдёт инициализация датчиков
// см. activateObject()
msleep(initPause);
PassiveTimer ptAct(activateTimeout);
while( !activated && !ptAct.checkTime() )
{
cout << myname << "(sysCommand): wait activate..." << endl;
msleep(300);
if( activated )
break;
}
if( !activated )
dlog[Debug::CRIT] << myname << "(sysCommand): ************* don`t activate?! ************" << endl;
{
UniSetTypes::uniset_mutex_lock l(mutex_start, 10000);
if( shm->isLocalwork() )
askSensors(UniversalIO::UIONotify);
}
askTimer(tmStep,steptime);
startReceivers();
if( sender )
sender->start();
if( sender2 )
sender2->start();
}
break;
case SystemMessage::FoldUp:
case SystemMessage::Finish:
if( shm->isLocalwork() )
askSensors(UniversalIO::UIODontNotify);
break;
case SystemMessage::WatchDog:
{
// ОПТИМИЗАЦИЯ (защита от двойного перезаказа при старте)
// Если идёт автономная работа, то нужно заказывать датчики
// если запущены в одном процессе с SharedMemory2,
// то обрабатывать WatchDog не надо, т.к. мы и так ждём готовности SM
// при заказе датчиков, а если SM вылетит, то вместе с этим процессом(UNetExchange)
if( shm->isLocalwork() )
askSensors(UniversalIO::UIONotify);
}
break;
case SystemMessage::LogRotate:
{
// переоткрываем логи
unideb << myname << "(sysCommand): logRotate" << std::endl;
string fname = unideb.getLogFile();
if( !fname.empty() )
{
unideb.logFile(fname);
unideb << myname << "(sysCommand): ***************** UNIDEB LOG ROTATE *****************" << std::endl;
}
dlog << myname << "(sysCommand): logRotate" << std::endl;
fname = dlog.getLogFile();
if( !fname.empty() )
{
dlog.logFile(fname);
dlog << myname << "(sysCommand): ***************** dlog LOG ROTATE *****************" << std::endl;
}
}
break;
default:
break;
}
}
// ------------------------------------------------------------------------------------------
void UNetExchange::askSensors( UniversalIO::UIOCommand cmd )
{
if( !shm->waitSMworking(test_id,activateTimeout,50) )
{
ostringstream err;
err << myname
<< "(askSensors): Не дождались готовности(work) SharedMemory к работе в течение "
<< activateTimeout << " мсек";
dlog[Debug::CRIT] << err.str() << endl;
kill(SIGTERM,getpid()); // прерываем (перезапускаем) процесс...
throw SystemError(err.str());
}
if( sender )
sender->askSensors(cmd);
if( sender2 )
sender2->askSensors(cmd);
}
// ------------------------------------------------------------------------------------------
void UNetExchange::sensorInfo( UniSetTypes::SensorMessage* sm )
{
if( sender )
sender->updateSensor( sm->id , sm->value );
if( sender2 )
sender2->updateSensor( sm->id , sm->value );
}
// ------------------------------------------------------------------------------------------
bool UNetExchange::activateObject()
{
// блокирование обработки Starsp
// пока не пройдёт инициализация датчиков
// см. sysCommand()
{
activated = false;
UniSetTypes::uniset_mutex_lock l(mutex_start, 5000);
UniSetObject_LT::activateObject();
initIterators();
activated = true;
}
return true;
}
// ------------------------------------------------------------------------------------------
void UNetExchange::sigterm( int signo )
{
dlog[Debug::INFO] << myname << ": ********* SIGTERM(" << signo <<") ********" << endl;
activated = false;
for( ReceiverList::iterator it=recvlist.begin(); it!=recvlist.end(); ++it )
{
try
{
if( it->r1 )
it->r1->stop();
}
catch(...){}
try
{
if( it->r2 )
it->r2->stop();
}
catch(...){}
}
try
{
if( sender )
sender->stop();
}
catch(...){}
try
{
if( sender2 )
sender2->stop();
}
catch(...){}
UniSetObject_LT::sigterm(signo);
}
// ------------------------------------------------------------------------------------------
void UNetExchange::initIterators()
{
shm->initAIterator(aitHeartBeat);
if( sender )
sender->initIterators();
if( sender2 )
sender2->initIterators();
for( ReceiverList::iterator it=recvlist.begin(); it!=recvlist.end(); it++ )
it->initIterators(shm);
}
// -----------------------------------------------------------------------------
void UNetExchange::help_print( int argc, const char* argv[] )
{
cout << "--unet-name NameID - Идентификтора процесса." << endl;
cout << "--unet-recv-timeout msec - Время для фиксации события 'отсутсвие связи'" << endl;
cout << "--unet-lost-timeout msec - Время ожидания заполнения 'дырки' между пакетами. По умолчанию 5000 мсек." << endl;
cout << "--unet-recvpause msec - Пауза между приёмами. По умолчанию 10" << endl;
cout << "--unet-sendpause msec - Пауза между посылками. По умолчанию 150" << endl;
cout << "--unet-updatepause msec - Время ожидания готовности SM к работе, мсек. (-1 - ждать 'вечно')" << endl;
cout << "--unet-steptime msec - Шаг..." << endl;
cout << "--unet-maxdifferense num - Маскимальная разница в номерах пакетов для фиксации события 'потеря пакетов' " << endl;
cout << "--unet-maxprocessingcount num - время на ожидание старта SM" << endl;
cout << "--unet-nosender [0,1] - Отключить посылку." << endl;
cout << "--unet-sm-ready-timeout msec - Время ожидание я готовности SM к работе. По умолчанию 15000" << endl;
cout << "--unet-filter-field name - Название фильтрующего поля при формировании списка датчиков посылаемых данным узлом" << endl;
cout << "--unet-filter-value name - Значение фильтрующего поля при формировании списка датчиков посылаемых данным узлом" << endl;
}
// -----------------------------------------------------------------------------
UNetExchange* UNetExchange::init_unetexchange( int argc, const char* argv[], UniSetTypes::ObjectId icID, SharedMemory* ic )
{
string name = conf->getArgParam("--unet-name","UNetExchange1");
if( name.empty() )
{
cerr << "(unetexchange): Не задан name'" << endl;
return 0;
}
ObjectId ID = conf->getObjectID(name);
if( ID == UniSetTypes::DefaultObjectId )
{
cerr << "(unetexchange): идентификатор '" << name
<< "' не найден в конф. файле!"
<< " в секции " << conf->getObjectsSection() << endl;
return 0;
}
dlog[Debug::INFO] << "(unetexchange): name = " << name << "(" << ID << ")" << endl;
return new UNetExchange(ID,icID,ic);
}
// -----------------------------------------------------------------------------
void UNetExchange::receiverEvent( UNetReceiver* r, UNetReceiver::Event ev )
{
// пока, что другие события нас не интересуют
if( ev != UNetReceiver::evTimeout )
return;
for( ReceiverList::iterator it=recvlist.begin(); it!=recvlist.end(); ++it )
{
if( it->r1 == r )
{
// если нет второго канала
// то и переключать некуда
if( !it->r2 )
return;
// пропала связь по первому каналу...
// переключаемся на второй
it->r1->setLockUpdate(true);
it->r2->setLockUpdate(false);
it->r2->resetTimeout();
if( dlog.debugging(Debug::INFO) )
dlog[Debug::INFO] << myname << "(event): " << r->getName()
<< ": timeout for channel1.. select channel2" << endl;
return;
}
if( it->r2 == r )
{
// пропала связь по второму каналу...
// переключаемся на первый
it->r1->setLockUpdate(false);
it->r1->resetTimeout();
it->r2->setLockUpdate(true);
if( dlog.debugging(Debug::INFO) )
dlog[Debug::INFO] << myname << "(event): " << r->getName()
<< ": timeout for channel2.. select channel1" << endl;
return;
}
}
}
// -----------------------------------------------------------------------------
#ifndef UNetExchange_H_
#define UNetExchange_H_
// -----------------------------------------------------------------------------
#include <ostream>
#include <string>
#include <queue>
#include <cc++/socket.h>
#include "UniSetObject_LT.h"
#include "Trigger.h"
#include "Mutex.h"
#include "SMInterface.h"
#include "SharedMemory.h"
#include "ThreadCreator.h"
#include "UNetReceiver.h"
#include "UNetSender.h"
// -----------------------------------------------------------------------------
/*!
\page pageUNetExchange2 Сетевой обмен на основе UDP (UNet2)
- \ref pgUnet2_Common
- \ref pgUnet2_Conf
- \ref pgUnet2_Reserv
\section pgUnet2_Common Общее описание
Обмен построен на основе протокола UDP.
Основная идея заключается в том, что каждый узел на порту равном своему ID
посылает в сеть UDP-пакеты содержащие данные считанные из локальной SM. Формат данных - это набор
пар "id - value". Другие узлы принимают их. Помимо этого данный процесс запускает
по потоку приёма для каждого другого узла и ловит пакеты от них, сохраняя данные в SM.
\par
При своём старте процесс считывает из секции \<nodes> список узлов с которыми необходимо
вести обмен, а также параметры своего узла. Открывает по потоку приёма на каждый узел и поток
передачи для своих данных. Помимо этого такие же потоки для резервных каналов, если они включены
(см. \ref pgUnet2_Reserv ).
\section pgUnet2_Conf Пример конфигурирования
По умолчанию при считывании используется \b unet_broadcast_ip (указанный в секции \<nodes>)
и \b id узла - в качестве порта.
Но можно переопределять эти параметры, при помощи указания \b unet_port и/или \b unet_broadcast_ip,
для конкретного узла (\<item>).
\code
<nodes port="2809" unet_broadcast_ip="192.168.56.255">
<item ip="127.0.0.1" name="LocalhostNode" textname="Локальный узел" unet_ignore="1" unet_port="3000" unet_broadcast_ip="192.168.57.255">
<iocards>
...
</iocards>
</item>
<item ip="192.168.56.10" name="Node1" textname="Node1" unet_port="3001"/>
<item ip="192.168.56.11" name="Node2" textname="Node2" unet_port="3002"/>
</nodes>
\endcode
\section pgUnet2_Reserv Настройка резервного канала связи
В текущей реализации поддерживается возможность обмена по двум подсетям (эзернет-каналам).
Она основана на том, что, для каждого узла помимо основного "читателя",
создаётся дополнительный "читатель"(поток) слушающий другой ip-адрес и порт.
А так же, для локального узла создаётся дополнительный "писатель"(поток),
который посылает данные в (указанную) вторую подсеть. Для того, чтобы задействовать
второй канал, достаточно объявить в настройках переменные
\b unet_broadcast_ip2. А также в случае необходимости для конкретного узла
можно указать \b unet_broadcast_ip2 и \b unet_port2.
Переключение между "каналами" происходит по следующей логике:
При старте включается только первый канал. Второй канал работает в режиме "пассивного" чтения.
Т.е. все пакеты принимаются, но данные в SharedMemory не обновляются.
Если во время работы пропадает связь по первому каналу, идёт переключение на второй канал.
Первый канал переводиться в "пассивный" режим, а второй канал, переводится в "нормальный"(активный)
режим. Далее работа ведётся по второму каналу, независимо от того, что связь на первом
канале может восстановиться. Это сделано для защиты от постоянных перескакиваний
с канала на канал. Работа на втором канале будет вестись, пока не пропадёт связь
на нём. Тогда будет попытка переключиться обратно на первый канал и так "по кругу".
В свою очередь "писатели"(если они не отключены) всегда посылают данные в оба канала.
*/
// -----------------------------------------------------------------------------
class UNetExchange:
public UniSetObject_LT
{
public:
UNetExchange( UniSetTypes::ObjectId objId, UniSetTypes::ObjectId shmID, SharedMemory* ic=0 );
virtual ~UNetExchange();
/*! глобальная функция для инициализации объекта */
static UNetExchange* init_unetexchange( int argc, const char* argv[],
UniSetTypes::ObjectId shmID, SharedMemory* ic=0 );
/*! глобальная функция для вывода help-а */
static void help_print( int argc, const char* argv[] );
bool checkExistUNetHost( const std::string host, ost::tpport_t port );
protected:
xmlNode* cnode;
std::string s_field;
std::string s_fvalue;
SMInterface* shm;
void step();
virtual void processingMessage( UniSetTypes::VoidMessage *msg );
void sysCommand( UniSetTypes::SystemMessage *msg );
void sensorInfo( UniSetTypes::SensorMessage*sm );
void timerInfo( UniSetTypes::TimerMessage *tm );
void askSensors( UniversalIO::UIOCommand cmd );
void waitSMReady();
void receiverEvent( UNetReceiver* r, UNetReceiver::Event ev );
virtual bool activateObject();
// действия при завершении работы
virtual void sigterm( int signo );
void initIterators();
void startReceivers();
enum Timer
{
tmStep
};
private:
UNetExchange();
bool initPause;
UniSetTypes::uniset_mutex mutex_start;
PassiveTimer ptHeartBeat;
UniSetTypes::ObjectId sidHeartBeat;
int maxHeartBeat;
IOController::AIOStateList::iterator aitHeartBeat;
UniSetTypes::ObjectId test_id;
int steptime; /*!< периодичность вызова step, [мсек] */
bool activated;
int activateTimeout;
struct ReceiverInfo
{
ReceiverInfo():r1(0),r2(0),
sidRespond(UniSetTypes::DefaultObjectId),
sidLostPackets(UniSetTypes::DefaultObjectId)
{}
ReceiverInfo(UNetReceiver* _r1, UNetReceiver* _r2 ):
r1(_r1),r2(_r2),
sidRespond(UniSetTypes::DefaultObjectId),
respondInvert(false),
sidLostPackets(UniSetTypes::DefaultObjectId)
{}
UNetReceiver* r1; /*!< приём по первому каналу */
UNetReceiver* r2; /*!< приём по второму каналу */
void step( SMInterface* shm, const std::string myname );
inline void setRespondID( UniSetTypes::ObjectId id, bool invert=false )
{
sidRespond = id;
respondInvert = invert;
}
inline void setLostPacketsID( UniSetTypes::ObjectId id ){ sidLostPackets = id; }
inline void initIterators( SMInterface* shm )
{
shm->initAIterator(aitLostPackets);
shm->initDIterator(ditRespond);
}
// Сводная информация по двум каналам
// сумма потерянных пакетов и наличие связи
// хотя бы по одному каналу
// ( реализацию см. ReceiverInfo::step() )
UniSetTypes::ObjectId sidRespond;
IOController::DIOStateList::iterator ditRespond;
bool respondInvert;
UniSetTypes::ObjectId sidLostPackets;
IOController::AIOStateList::iterator aitLostPackets;
};
typedef std::list<ReceiverInfo> ReceiverList;
ReceiverList recvlist;
bool no_sender; /*!< флаг отключения посылки сообщений (создания потока для посылки)*/
UNetSender* sender;
UNetSender* sender2;
};
// -----------------------------------------------------------------------------
#endif // UNetExchange_H_
// -----------------------------------------------------------------------------
#include <sstream>
#include <iomanip>
#include "Exceptions.h"
#include "Extensions.h"
#include "UNetReceiver.h"
// -----------------------------------------------------------------------------
using namespace std;
using namespace UniSetTypes;
using namespace UniSetExtensions;
// -----------------------------------------------------------------------------
/*
bool UNetReceiver::PacketCompare::operator()(const UniSetUDP::UDPMessage& lhs,
const UniSetUDP::UDPMessage& rhs) const
{
// if( lhs.num == rhs.num )
// return (lhs < rhs);
return lhs.num > rhs.num;
}
*/
// ------------------------------------------------------------------------------------------
UNetReceiver::UNetReceiver( const std::string s_host, const ost::tpport_t port, SMInterface* smi ):
shm(smi),
recvpause(10),
updatepause(100),
udp(0),
recvTimeout(5000),
lostTimeout(5000),
lostPackets(0),
sidRespond(UniSetTypes::DefaultObjectId),
respondInvert(false),
sidLostPackets(UniSetTypes::DefaultObjectId),
activated(false),
r_thr(0),
u_thr(0),
pnum(0),
maxDifferens(1000),
waitClean(false),
rnum(0),
maxProcessingCount(100),
lockUpdate(false),
d_icache(UniSetUDP::MaxDCount),
a_icache(UniSetUDP::MaxACount),
d_cache_init_ok(false),
a_cache_init_ok(false)
{
{
ostringstream s;
s << "R(" << setw(15) << s_host << ":" << setw(4) << port << ")";
myname = s.str();
}
ost::Thread::setException(ost::Thread::throwException);
try
{
// ost::IPV4Cidr ci(s_host.c_str());
// addr = ci.getBroadcast();
// cerr << "****************** addr: " << addr << endl;
addr = s_host.c_str();
udp = new ost::UDPDuplex(addr,port);
}
catch( std::exception& e )
{
ostringstream s;
s << myname << ": " << e.what();
dlog[Debug::CRIT] << s.str() << std::endl;
throw SystemError(s.str());
}
catch( ... )
{
ostringstream s;
s << myname << ": catch...";
dlog[Debug::CRIT] << s.str() << std::endl;
throw SystemError(s.str());
}
r_thr = new ThreadCreator<UNetReceiver>(this, &UNetReceiver::receive);
u_thr = new ThreadCreator<UNetReceiver>(this, &UNetReceiver::update);
ptRecvTimeout.setTiming(recvTimeout);
}
// -----------------------------------------------------------------------------
UNetReceiver::~UNetReceiver()
{
delete r_thr;
delete u_thr;
delete udp;
}
// -----------------------------------------------------------------------------
void UNetReceiver::setReceiveTimeout( timeout_t msec )
{
recvTimeout = msec;
ptRecvTimeout.setTiming(msec);
}
// -----------------------------------------------------------------------------
void UNetReceiver::setLostTimeout( timeout_t msec )
{
lostTimeout = msec;
ptLostTimeout.setTiming(msec);
}
// -----------------------------------------------------------------------------
void UNetReceiver::setReceivePause( timeout_t msec )
{
recvpause = msec;
}
// -----------------------------------------------------------------------------
void UNetReceiver::setUpdatePause( timeout_t msec )
{
updatepause = msec;
}
// -----------------------------------------------------------------------------
void UNetReceiver::setMaxProcessingCount( int set )
{
maxProcessingCount = set;
}
// -----------------------------------------------------------------------------
void UNetReceiver::setMaxDifferens( unsigned long set )
{
maxDifferens = set;
}
// -----------------------------------------------------------------------------
void UNetReceiver::setRespondID( UniSetTypes::ObjectId id, bool invert )
{
sidRespond = id;
respondInvert = invert;
shm->initDIterator(ditRespond);
}
// -----------------------------------------------------------------------------
void UNetReceiver::setLostPacketsID( UniSetTypes::ObjectId id )
{
sidLostPackets = id;
shm->initAIterator(aitLostPackets);
}
// -----------------------------------------------------------------------------
void UNetReceiver::setLockUpdate( bool st )
{
uniset_mutex_lock l(lockMutex,200);
lockUpdate = st;
}
// -----------------------------------------------------------------------------
void UNetReceiver::resetTimeout()
{
uniset_mutex_lock l(tmMutex,200);
ptRecvTimeout.reset();
trTimeout.change(false);
}
// -----------------------------------------------------------------------------
void UNetReceiver::start()
{
if( !activated )
{
activated = true;
u_thr->start();
r_thr->start();
}
}
// -----------------------------------------------------------------------------
void UNetReceiver::update()
{
cerr << "******************* udpate start" << endl;
while(activated)
{
try
{
real_update();
}
catch( UniSetTypes::Exception& ex)
{
dlog[Debug::CRIT] << myname << "(update): " << ex << std::endl;
}
catch(...)
{
dlog[Debug::CRIT] << myname << "(update): catch ..." << std::endl;
}
if( sidRespond!=DefaultObjectId )
{
try
{
bool r = respondInvert ? !isRecvOK() : isRecvOK();
shm->localSaveState(ditRespond,sidRespond,r,shm->ID());
}
catch(Exception& ex)
{
dlog[Debug::CRIT] << myname << "(step): (respond) " << ex << std::endl;
}
}
if( sidLostPackets!=DefaultObjectId )
{
try
{
shm->localSaveValue(aitLostPackets,sidLostPackets,getLostPacketsNum(),shm->ID());
}
catch(Exception& ex)
{
dlog[Debug::CRIT] << myname << "(step): (lostPackets) " << ex << std::endl;
}
}
msleep(updatepause);
}
}
// -----------------------------------------------------------------------------
void UNetReceiver::real_update()
{
UniSetUDP::UDPMessage p;
// обрабатываем, пока очередь либо не опустеет,
// либо обнаружится "дырка" в последовательности,
// но при этом обрабатываем не больше maxProcessingCount
// за один раз..
int k = maxProcessingCount;
while( k>0 )
{
{ // lock qpack
uniset_mutex_lock l(packMutex);
if( qpack.empty() )
return;
p = qpack.top();
unsigned long sub = labs(p.num - pnum);
if( pnum > 0 )
{
// если sub > maxDifferens
// значит это просто "разрыв"
// и нам ждать lostTimeout не надо
// сразу начинаем обрабатывать новые пакеты
// а если > 1 && < maxDifferens
// значит это временная "дырка"
// и надо подождать lostTimeout
// чтобы констатировать потерю пакета..
if( sub > 1 && sub < maxDifferens )
{
if( !ptLostTimeout.checkTime() )
return;
lostPackets++;
}
else if( p.num == pnum )
{
/* а что делать если идут повторные пакеты ?!
* для надёжности лучше обрабатывать..
* для "оптимизации".. лучше игнорировать
*/
qpack.pop(); // пока выбрали вариант "оптимизации"
continue;
}
}
ptLostTimeout.reset();
// удаляем из очереди, только если
// всё в порядке с последовательностью..
qpack.pop();
pnum = p.num;
} // unlock qpack
k--;
// cerr << myname << "(update): " << p.msg.header << endl;
initDCache(p, !d_cache_init_ok);
initACache(p, !a_cache_init_ok);
// Обработка дискретных
size_t nbit = 0;
for( size_t i=0; i<p.dcount; i++, nbit++ )
{
try
{
long id = p.dID(i);
bool val = p.dValue(i);
ItemInfo& ii(d_icache[i]);
if( ii.id != id )
{
dlog[Debug::WARN] << myname << "(update): reinit cache for sid=" << id << endl;
ii.id = id;
shm->initAIterator(ii.ait);
shm->initDIterator(ii.dit);
}
// обновление данных в SM (блокировано)
{
uniset_mutex_lock l(lockMutex,100);
if( lockUpdate )
continue;
}
if( ii.iotype == UniversalIO::DigitalInput )
shm->localSaveState(ii.dit,id,val,shm->ID());
else if( ii.iotype == UniversalIO::AnalogInput )
shm->localSaveValue(ii.ait,id,val,shm->ID());
else if( ii.iotype == UniversalIO::AnalogOutput )
shm->localSetValue(ii.ait,id,val,shm->ID());
else if( ii.iotype == UniversalIO::DigitalOutput )
shm->localSetState(ii.dit,id,val,shm->ID());
else
dlog[Debug::CRIT] << myname << "(update): Unknown iotype for sid=" << id << endl;
}
catch( UniSetTypes::Exception& ex)
{
dlog[Debug::CRIT] << myname << "(update): " << ex << std::endl;
}
catch(...)
{
dlog[Debug::CRIT] << myname << "(update): catch ..." << std::endl;
}
}
// Обработка аналоговых
for( size_t i=0; i<p.acount; i++ )
{
try
{
UniSetUDP::UDPAData& d = p.a_dat[i];
ItemInfo& ii(a_icache[i]);
if( ii.id != d.id )
{
dlog[Debug::WARN] << myname << "(update): reinit cache for sid=" << d.id << endl;
ii.id = d.id;
shm->initAIterator(ii.ait);
shm->initDIterator(ii.dit);
}
// обновление данных в SM (блокировано)
{
uniset_mutex_lock l(lockMutex,100);
if( lockUpdate )
continue;
}
if( ii.iotype == UniversalIO::DigitalInput )
shm->localSaveState(ii.dit,d.id,d.val,shm->ID());
else if( ii.iotype == UniversalIO::AnalogInput )
shm->localSaveValue(ii.ait,d.id,d.val,shm->ID());
else if( ii.iotype == UniversalIO::AnalogOutput )
shm->localSetValue(ii.ait,d.id,d.val,shm->ID());
else if( ii.iotype == UniversalIO::DigitalOutput )
shm->localSetState(ii.dit,d.id,d.val,shm->ID());
else
dlog[Debug::CRIT] << myname << "(update): Unknown iotype for sid=" << d.id << endl;
}
catch( UniSetTypes::Exception& ex)
{
dlog[Debug::CRIT] << myname << "(update): " << ex << std::endl;
}
catch(...)
{
dlog[Debug::CRIT] << myname << "(update): catch ..." << std::endl;
}
}
}
}
// -----------------------------------------------------------------------------
void UNetReceiver::stop()
{
activated = false;
// msleep(10);
// u_thr->stop();
// r_thr->stop();
}
// -----------------------------------------------------------------------------
void UNetReceiver::receive()
{
dlog[Debug::INFO] << myname << ": ******************* receive start" << endl;
ptRecvTimeout.setTiming(recvTimeout);
bool tout = false;
while( activated )
{
try
{
if( recv() )
{
uniset_mutex_lock l(tmMutex,100);
ptRecvTimeout.reset();
}
}
catch( UniSetTypes::Exception& ex)
{
dlog[Debug::WARN] << myname << "(receive): " << ex << std::endl;
}
catch( std::exception& e )
{
dlog[Debug::WARN] << myname << "(receive): " << e.what()<< std::endl;
}
catch(...)
{
dlog[Debug::WARN] << myname << "(receive): catch ..." << std::endl;
}
// делаем через промежуточную переменную
// чтобы поскорее освободить mutex
{
uniset_mutex_lock l(tmMutex,100);
tout = ptRecvTimeout.checkTime();
}
if( trTimeout.change(tout) )
{
if( tout )
slEvent(this,evTimeout);
else
slEvent(this,evOK);
}
msleep(recvpause);
}
dlog[Debug::INFO] << myname << ": ************* receive FINISH **********" << endl;
}
// -----------------------------------------------------------------------------
bool UNetReceiver::recv()
{
if( !udp->isInputReady(recvTimeout) )
return false;
size_t ret = udp->UDPReceive::receive((char*)(r_buf.data),sizeof(r_buf.data));
size_t sz = UniSetUDP::UDPMessage::getMessage(pack,r_buf);
if( sz == 0 )
{
dlog[Debug::CRIT] << myname << "(receive): FAILED RECEIVE DATA ret=" << ret << endl;
return false;
}
if( rnum>0 && labs(pack.num - rnum) > maxDifferens )
{
/* А что делать если мы уже ждём и ещё не "разгребли предыдущее".. а тут уже повторный "разрыв"
* Можно откинуть всё.. что сложили во временную очередь и заново "копить" (но тогда теряем информацию)
* А можно породолжать складывать во временную, но тогда есть риск "никогда" не разгрести временную
* очередь, при "частых обрывах". Потому-что update будет на каждом разрыве ждать ещё lostTimeout..
*/
// Пока выбираю.. чистить qtmp. Это будет соотвествовать логике работы с картами у которых ограничен буфер приёма.
// Обычно "кольцевой". Т.е. если не успели обработать и "вынуть" из буфера информацию.. он будет переписан новыми данными
if( waitClean )
{
dlog[Debug::CRIT] << myname << "(receive): reset qtmp.." << endl;
while( !qtmp.empty() )
qtmp.pop();
}
waitClean = true;
}
rnum = pack.num;
#if 0
cerr << myname << "(receive): recv DATA OK. ret=" << ret << " sizeof=" << sz
<< " header: " << pack.msg.header
<< " waitClean=" << waitClean
<< endl;
for( size_t i=0; i<pack.msg.header.dcount; i++ )
{
UniSetUDP::UDPData& d = pack.msg.dat[i];
cerr << "****** save id=" << d.id << " val=" << d.val << endl;
}
#endif
{ // lock qpack
uniset_mutex_lock l(packMutex,2000);
if( !waitClean )
{
qpack.push(pack);
return true;
}
if( !qpack.empty() )
{
// cerr << myname << "(receive): copy to qtmp..."
// << " header: " << pack.msg.header
// << endl;
qtmp.push(pack);
}
else
{
// cerr << myname << "(receive): copy from qtmp..." << endl;
// очередь освободилась..
// то копируем в неё всё что набралось...
while( !qtmp.empty() )
{
qpack.push(qtmp.top());
qtmp.pop();
}
// не забываем и текущий поместить в очередь..
qpack.push(pack);
waitClean = false;
}
} // unlock qpack
return true;
}
// -----------------------------------------------------------------------------
void UNetReceiver::initIterators()
{
for( ItemVec::iterator it=d_icache.begin(); it!=d_icache.end(); ++it )
{
shm->initAIterator(it->ait);
shm->initDIterator(it->dit);
}
for( ItemVec::iterator it=a_icache.begin(); it!=a_icache.end(); ++it )
{
shm->initAIterator(it->ait);
shm->initDIterator(it->dit);
}
}
// -----------------------------------------------------------------------------
void UNetReceiver::initDCache( UniSetUDP::UDPMessage& pack, bool force )
{
if( !force && pack.dcount == d_icache.size() )
return;
dlog[Debug::INFO] << myname << ": init icache.." << endl;
d_cache_init_ok = true;
d_icache.resize(pack.dcount);
for( size_t i=0; i<d_icache.size(); i++ )
{
ItemInfo& d(d_icache[i]);
if( d.id != pack.d_id[i] )
{
d.id = pack.d_id[i];
d.iotype = conf->getIOType(d.id);
shm->initAIterator(d.ait);
shm->initDIterator(d.dit);
}
}
}
// -----------------------------------------------------------------------------
void UNetReceiver::initACache( UniSetUDP::UDPMessage& pack, bool force )
{
if( !force && pack.acount == a_icache.size() )
return;
dlog[Debug::INFO] << myname << ": init icache.." << endl;
a_cache_init_ok = true;
a_icache.resize(pack.acount);
for( size_t i=0; i<a_icache.size(); i++ )
{
ItemInfo& d(a_icache[i]);
if( d.id != pack.a_dat[i].id )
{
d.id = pack.a_dat[i].id;
d.iotype = conf->getIOType(d.id);
shm->initAIterator(d.ait);
shm->initDIterator(d.dit);
}
}
}
// -----------------------------------------------------------------------------
void UNetReceiver::connectEvent( UNetReceiver::EventSlot sl )
{
slEvent = sl;
}
// -----------------------------------------------------------------------------
#ifndef UNetReceiver_H_
#define UNetReceiver_H_
// -----------------------------------------------------------------------------
#include <ostream>
#include <string>
#include <queue>
#include <cc++/socket.h>
#include <sigc++/sigc++.h>
#include "UniSetObject_LT.h"
#include "Trigger.h"
#include "Mutex.h"
#include "SMInterface.h"
#include "SharedMemory.h"
#include "ThreadCreator.h"
#include "UDPPacket.h"
// -----------------------------------------------------------------------------
/* Основная идея: сделать проверку очерёдности пакетов, но при этом использовать UDP.
* ===============
* Собственно реализация сделана так:
* В данных передаётся номер пакета. На случай если несколько пакетов придут не в той последовательности
* что были посланы, сделана очередь с приоритетом. В качестве приориета используется номер пакета
* (чем меньше тем старше). При этом обработка ведётся только тех пакетов, которые идут "подряд",
* как только встречается "дырка" происходит ожидание её "заполения". Если в течение времени (lostTimeout)
* "дырка" не исчезает, увеличивается счётчик потерянных пакетов и обработка продолжается дальше..
* Всё это реализовано в функции UNetReceiver::real_update()
*
* КЭШ
* ===
* Для оптимизации работы с SM, т.к. в пакетах приходят только пары [id,value] сделан кэш итераторов.
* Кэш расчитан на то, что принимаемые пакеты всегда имеют одну и ту же длину и последовательность.
* Идея проста: сделан вектор размером с количество принимаемых данных. В векторе хранятся итераторы (и всё что необходимо).
* Порядковый номер данных в пакете является индексом в кэше.
* Для защиты от изменения поседовательности внутри пакета, в кэше хранится ID сохраняемого датчика, и если он не совпадёт с тем,
* ID который пришёл в пакете - элемент кэша обновляется.
* Если количество пришедших данных не совпадают с размером кэша.. кэш обновляется.
*
* Обработка сбоя или переполнения счётчика пакетов(перехода через максимум)
* =========================================================================
* Для защиты от сбоя счётика сделана следующая логика:
* Если номер очередного пришедшего пакета отличается от последнего обработанного на maxDifferens, то считается,
* что произошёл сбой счётчика и происходит ожидание пока функция update, не обработает основную очередь полностью.
* При этом принимаемые пакеты складываются во временную очередь qtmp. Как только основная очередь пустеет,
* в неё копируется всё накопленное во временной очереди..и опять идёт штатная обработка.
* Если во время "ожидания" опять происходит "разрыв" в номерах пакетов, то временная очередь чиститься
* и данные которые в ней были теряются! Аналог ограниченного буфера (у любых карт), когда новые данные
* затирают старые, если их не успели вынуть и обработать.
* \todo Сделать защиту от бесконечного ожидания "очистки" основной очереди.
*/
// -----------------------------------------------------------------------------
class UNetReceiver
{
public:
UNetReceiver( const std::string host, const ost::tpport_t port, SMInterface* smi );
~UNetReceiver();
void start();
void stop();
void receive();
void update();
inline std::string getName(){ return myname; }
// блокировать сохранение данный в SM
void setLockUpdate( bool st );
void resetTimeout();
inline bool isRecvOK(){ return !ptRecvTimeout.checkTime(); }
inline unsigned long getLostPacketsNum(){ return lostPackets; }
void setReceiveTimeout( timeout_t msec );
void setReceivePause( timeout_t msec );
void setUpdatePause( timeout_t msec );
void setLostTimeout( timeout_t msec );
void setMaxDifferens( unsigned long set );
void setRespondID( UniSetTypes::ObjectId id, bool invert=false );
void setLostPacketsID( UniSetTypes::ObjectId id );
void setMaxProcessingCount( int set );
inline ost::IPV4Address getAddress(){ return addr; }
inline ost::tpport_t getPort(){ return port; }
/*! Коды событий */
enum Event
{
evOK, /*!< связь есть */
evTimeout /*!< потеря связи */
};
typedef sigc::slot<void,UNetReceiver*,Event> EventSlot;
void connectEvent( EventSlot sl );
protected:
SMInterface* shm;
bool recv();
void step();
void real_update();
void initIterators();
private:
UNetReceiver();
int recvpause; /*!< пауза меджду приёмами пакетов, [мсек] */
int updatepause; /*!< переодичность обновления данных в SM, [мсек] */
ost::UDPReceive* udp;
ost::IPV4Address addr;
ost::tpport_t port;
std::string myname;
UniSetTypes::uniset_mutex pollMutex;
PassiveTimer ptRecvTimeout;
timeout_t recvTimeout;
timeout_t lostTimeout;
PassiveTimer ptLostTimeout;
unsigned long lostPackets; /*!< счётчик потерянных пакетов */
UniSetTypes::ObjectId sidRespond;
IOController::DIOStateList::iterator ditRespond;
bool respondInvert;
UniSetTypes::ObjectId sidLostPackets;
IOController::AIOStateList::iterator aitLostPackets;
bool activated;
ThreadCreator<UNetReceiver>* r_thr; // receive thread
ThreadCreator<UNetReceiver>* u_thr; // update thread
// функция определения приоритетного сообщения для обработки
struct PacketCompare:
public std::binary_function<UniSetUDP::UDPMessage, UniSetUDP::UDPMessage, bool>
{
inline bool operator()(const UniSetUDP::UDPMessage& lhs,
const UniSetUDP::UDPMessage& rhs) const
{ return lhs.num > rhs.num; }
};
typedef std::priority_queue<UniSetUDP::UDPMessage,std::vector<UniSetUDP::UDPMessage>,PacketCompare> PacketQueue;
PacketQueue qpack; /*!< очередь принятых пакетов (отсортированных по возрастанию номера пакета) */
UniSetUDP::UDPMessage pack; /*!< просто буфер для получения очередного сообщения */
UniSetUDP::UDPPacket r_buf;
UniSetTypes::uniset_mutex packMutex; /*!< mutex для работы с очередью */
unsigned long pnum; /*!< текущий номер обработанного сообщения, для проверки непрерывности последовательности пакетов */
/*! максимальная разница межд номерами пакетов, при которой считается, что счётчик пакетов
* прошёл через максимум или сбился...
*/
unsigned long maxDifferens;
PacketQueue qtmp; /*!< очередь на время обработки(очистки) основной очереди */
bool waitClean; /*!< флаг означающий, что ждём очистики очереди до конца */
unsigned long rnum; /*!< текущий номер принятого сообщения, для проверки "переполнения" или "сбоя" счётчика */
int maxProcessingCount; /*!< максимальное число обрабатываемых за один раз сообщений */
bool lockUpdate; /*!< флаг блокировки сохранения принятых данных в SM */
UniSetTypes::uniset_mutex lockMutex;
EventSlot slEvent;
Trigger trTimeout;
UniSetTypes::uniset_mutex tmMutex;
struct ItemInfo
{
long id;
IOController::AIOStateList::iterator ait;
IOController::DIOStateList::iterator dit;
UniversalIO::IOTypes iotype;
ItemInfo():
id(UniSetTypes::DefaultObjectId),
iotype(UniversalIO::UnknownIOType){}
};
typedef std::vector<ItemInfo> ItemVec;
ItemVec d_icache; /*!< кэш итераторов для булевых */
ItemVec a_icache; /*!< кэш итераторов для аналоговых */
bool d_cache_init_ok;
bool a_cache_init_ok;
void initDCache( UniSetUDP::UDPMessage& pack, bool force=false );
void initACache( UniSetUDP::UDPMessage& pack, bool force=false );
};
// -----------------------------------------------------------------------------
#endif // UNetReceiver_H_
// -----------------------------------------------------------------------------
#include <sstream>
#include <iomanip>
#include "Exceptions.h"
#include "Extensions.h"
#include "UNetSender.h"
// -----------------------------------------------------------------------------
using namespace std;
using namespace UniSetTypes;
using namespace UniSetExtensions;
// -----------------------------------------------------------------------------
UNetSender::UNetSender( const std::string s_host, const ost::tpport_t port, SMInterface* smi,
const std::string s_f, const std::string s_val, SharedMemory* ic ):
s_field(s_f),
s_fvalue(s_val),
shm(smi),
s_host(s_host),
sendpause(150),
activated(false),
dlist(100),
maxItem(0),
packetnum(1),
s_thr(0)
{
{
ostringstream s;
s << "S(" << setw(15) << s_host << ":" << setw(4) << port << ")";
myname = s.str();
}
// определяем фильтр
// s_field = conf->getArgParam("--udp-filter-field");
// s_fvalue = conf->getArgParam("--udp-filter-value");
dlog[Debug::INFO] << myname << "(init): read filter-field='" << s_field
<< "' filter-value='" << s_fvalue << "'" << endl;
if( dlog.debugging(Debug::INFO) )
dlog[Debug::INFO] << "(UNetSender): UDP set to " << s_host << ":" << port << endl;
ost::Thread::setException(ost::Thread::throwException);
try
{
addr = s_host.c_str();
udp = new ost::UDPBroadcast(addr,port);
}
catch( std::exception& e )
{
ostringstream s;
s << myname << ": " << e.what();
dlog[Debug::CRIT] << s.str() << std::endl;
throw SystemError(s.str());
}
catch( ... )
{
ostringstream s;
s << myname << ": catch...";
dlog[Debug::CRIT] << s.str() << std::endl;
throw SystemError(s.str());
}
s_thr = new ThreadCreator<UNetSender>(this, &UNetSender::send);
// -------------------------------
if( shm->isLocalwork() )
{
readConfiguration();
dlist.resize(maxItem);
dlog[Debug::INFO] << myname << "(init): dlist size = " << dlist.size() << endl;
}
else
ic->addReadItem( sigc::mem_fun(this,&UNetSender::readItem) );
// выставляем поля, которые не меняются
mypack.nodeID = conf->getLocalNode();
mypack.procID = shm->ID();
}
// -----------------------------------------------------------------------------
UNetSender::~UNetSender()
{
delete s_thr;
delete udp;
delete shm;
}
// -----------------------------------------------------------------------------
void UNetSender::updateFromSM()
{
DMap::iterator it=dlist.begin();
for( ; it!=dlist.end(); ++it )
{
long value = 0;
if( it->iotype == UniversalIO::DigitalInput || it->iotype == UniversalIO::DigitalOutput )
value = shm->localGetState(it->dit,it->id) ? 1 : 0;
else if( it->iotype == UniversalIO::AnalogInput || it->iotype == UniversalIO::AnalogOutput )
value = shm->localGetValue(it->ait,it->id);
else
{
dlog[Debug::CRIT] << myname << "(update): Unknown iotype for sid=" << it->id << endl;
continue;
}
updateItem(it,value);
}
}
// -----------------------------------------------------------------------------
void UNetSender::updateSensor( UniSetTypes::ObjectId id, long value )
{
// cerr << myname << ": UPDATE SENSOR id=" << id << " value=" << value << endl;
DMap::iterator it=dlist.begin();
for( ; it!=dlist.end(); ++it )
{
if( it->id == id )
{
updateItem( it, value );
break;
}
}
}
// -----------------------------------------------------------------------------
void UNetSender::updateItem( DMap::iterator& it, long value )
{
if( it == dlist.end() )
return;
if( it->iotype == UniversalIO::DigitalInput || it->iotype == UniversalIO::DigitalOutput )
{
UniSetTypes::uniset_mutex_lock l(pack_mutex,100);
mypack.setDData(it->pack_ind,value);
}
else if( it->iotype == UniversalIO::AnalogInput || it->iotype == UniversalIO::AnalogOutput )
{
UniSetTypes::uniset_mutex_lock l(pack_mutex,100);
mypack.setAData(it->pack_ind,value);
}
}
// -----------------------------------------------------------------------------
void UNetSender::send()
{
dlist.resize(maxItem);
dlog[Debug::INFO] << myname << "(send): dlist size = " << dlist.size() << endl;
/*
ost::IPV4Broadcast h = s_host.c_str();
try
{
udp->setPeer(h,port);
}
catch( ost::SockException& e )
{
ostringstream s;
s << e.getString() << ": " << e.getSystemErrorString();
dlog[Debug::CRIT] << myname << "(poll): " << s.str() << endl;
throw SystemError(s.str());
}
*/
while( activated )
{
try
{
if( !shm->isLocalwork() )
updateFromSM();
real_send();
}
catch( ost::SockException& e )
{
dlog[Debug::WARN] << myname << "(send): " << e.getString() << endl;
}
catch( UniSetTypes::Exception& ex)
{
dlog[Debug::WARN] << myname << "(send): " << ex << std::endl;
}
catch( std::exception& e )
{
dlog[Debug::WARN] << myname << "(send): " << e.what() << std::endl;
}
catch(...)
{
dlog[Debug::WARN] << myname << "(send): catch ..." << std::endl;
}
msleep(sendpause);
}
dlog[Debug::INFO] << "************* execute FINISH **********" << endl;
}
// -----------------------------------------------------------------------------
void UNetSender::real_send()
{
UniSetTypes::uniset_mutex_lock l(pack_mutex,300);
mypack.num = packetnum++;
if( packetnum > UniSetUDP::MaxPacketNum )
packetnum = 1;
if( !udp->isPending(ost::Socket::pendingOutput) )
return;
mypack.transport_msg(s_msg);
size_t ret = udp->send( (char*)s_msg.data, s_msg.len );
if( ret < s_msg.len )
dlog[Debug::CRIT] << myname << "(real_send): FAILED ret=" << ret << " < sizeof=" << s_msg.len << endl;
}
// -----------------------------------------------------------------------------
void UNetSender::stop()
{
activated = false;
// s_thr->stop();
}
// -----------------------------------------------------------------------------
void UNetSender::start()
{
if( !activated )
{
activated = true;
s_thr->start();
}
}
// -----------------------------------------------------------------------------
void UNetSender::readConfiguration()
{
xmlNode* root = conf->getXMLSensorsSection();
if(!root)
{
ostringstream err;
err << myname << "(readConfiguration): not found <sensors>";
throw SystemError(err.str());
}
UniXML_iterator it(root);
if( !it.goChildren() )
{
std::cerr << myname << "(readConfiguration): empty <sensors>?!!" << endl;
return;
}
for( ;it.getCurrent(); it.goNext() )
{
if( check_filter(it,s_field,s_fvalue) )
initItem(it);
}
}
// ------------------------------------------------------------------------------------------
bool UNetSender::readItem( UniXML& xml, UniXML_iterator& it, xmlNode* sec )
{
if( UniSetTypes::check_filter(it,s_field,s_fvalue) )
initItem(it);
return true;
}
// ------------------------------------------------------------------------------------------
bool UNetSender::initItem( UniXML_iterator& it )
{
string sname( it.getProp("name") );
string tid(it.getProp("id"));
ObjectId sid;
if( !tid.empty() )
{
sid = UniSetTypes::uni_atoi(tid);
if( sid <= 0 )
sid = DefaultObjectId;
}
else
sid = conf->getSensorID(sname);
if( sid == DefaultObjectId )
{
if( dlog )
dlog[Debug::CRIT] << myname << "(readItem): ID not found for "
<< sname << endl;
return false;
}
UItem p;
p.iotype = UniSetTypes::getIOType(it.getProp("iotype"));
if( p.iotype == UniversalIO::UnknownIOType )
{
dlog[Debug::CRIT] << myname << "(readItem): Unknown iotype for sid=" << sid << endl;
return false;
}
p.id = sid;
if( p.iotype == UniversalIO::DigitalInput || p.iotype == UniversalIO::DigitalOutput )
{
p.pack_ind = mypack.addDData(sid,0);
if ( p.pack_ind >= UniSetUDP::MaxDCount )
{
dlog[Debug::CRIT] << myname
<< "(readItem): OVERFLOW! MAX UDP DIGITAL DATA LIMIT! max="
<< UniSetUDP::MaxDCount << endl;
return false;
}
}
else if( p.iotype == UniversalIO::AnalogInput || p.iotype == UniversalIO::AnalogOutput )
{
p.pack_ind = mypack.addAData(sid,0);
if ( p.pack_ind >= UniSetUDP::MaxACount )
{
dlog[Debug::CRIT] << myname
<< "(readItem): OVERFLOW! MAX UDP ANALOG DATA LIMIT! max="
<< UniSetUDP::MaxACount << endl;
return false;
}
}
if( maxItem >= dlist.size() )
dlist.resize(maxItem+10);
dlist[maxItem] = p;
maxItem++;
if( dlog.debugging(Debug::INFO) )
dlog[Debug::INFO] << myname << "(initItem): add " << p << endl;
return true;
}
// ------------------------------------------------------------------------------------------
std::ostream& operator<<( std::ostream& os, UNetSender::UItem& p )
{
return os << " sid=" << p.id;
}
// -----------------------------------------------------------------------------
void UNetSender::initIterators()
{
DMap::iterator it=dlist.begin();
for( ; it!=dlist.end(); it++ )
{
shm->initDIterator(it->dit);
shm->initAIterator(it->ait);
}
}
// -----------------------------------------------------------------------------
void UNetSender::askSensors( UniversalIO::UIOCommand cmd )
{
DMap::iterator it=dlist.begin();
for( ; it!=dlist.end(); it++ )
shm->askSensor(it->id,cmd);
}
// -----------------------------------------------------------------------------
#ifndef UNetSender_H_
#define UNetSender_H_
// -----------------------------------------------------------------------------
#include <ostream>
#include <string>
#include <vector>
#include <cc++/socket.h>
#include "UniSetObject_LT.h"
#include "Trigger.h"
#include "Mutex.h"
#include "SMInterface.h"
#include "SharedMemory.h"
#include "ThreadCreator.h"
#include "UDPPacket.h"
// -----------------------------------------------------------------------------
/*
*
*/
class UNetSender
{
public:
UNetSender( const std::string host, const ost::tpport_t port, SMInterface* smi,
const std::string s_field="", const std::string s_fvalue="", SharedMemory* ic=0 );
~UNetSender();
struct UItem
{
UItem():
iotype(UniversalIO::UnknownIOType),
id(UniSetTypes::DefaultObjectId),
pack_ind(-1){}
UniversalIO::IOTypes iotype;
UniSetTypes::ObjectId id;
IOController::AIOStateList::iterator ait;
IOController::DIOStateList::iterator dit;
int pack_ind;
friend std::ostream& operator<<( std::ostream& os, UItem& p );
};
typedef std::vector<UItem> DMap;
void start();
void stop();
void send();
void real_send();
/*! (принудительно) обновить все данные (из SM) */
void updateFromSM();
/*! Обновить значение по ID датчика */
void updateSensor( UniSetTypes::ObjectId id, long value );
/*! Обновить значение по итератору */
void updateItem( DMap::iterator& it, long value );
inline void setSendPause( int msec ){ sendpause = msec; }
/*! заказать датчики */
void askSensors( UniversalIO::UIOCommand cmd );
/*! инициализация итераторов */
void initIterators();
protected:
std::string s_field;
std::string s_fvalue;
SMInterface* shm;
bool initItem( UniXML_iterator& it );
bool readItem( UniXML& xml, UniXML_iterator& it, xmlNode* sec );
void readConfiguration();
private:
UNetSender();
ost::UDPBroadcast* udp;
ost::IPV4Address addr;
ost::tpport_t port;
std::string s_host;
std::string myname;
int sendpause;
bool activated;
UniSetTypes::uniset_mutex pack_mutex;
UniSetUDP::UDPMessage mypack;
DMap dlist;
int maxItem;
unsigned long packetnum;
UniSetUDP::UDPPacket s_msg;
ThreadCreator<UNetSender>* s_thr; // send thread
};
// -----------------------------------------------------------------------------
#endif // UNetSender_H_
// -----------------------------------------------------------------------------
#!/bin/sh
ln -s -f ../../Utilities/scripts/uniset-start.sh
ln -s -f ../../Utilities/scripts/uniset-stop.sh stop.sh
ln -s -f ../../Utilities/scripts/uniset-functions.sh
ln -s -f ../../conf/test.xml test.xml
prefix=@prefix@
exec_prefix=@exec_prefix@
libdir=@libdir@
includedir=@includedir@
Name: libUniSetUNet2
Description: Support library for UniSetUNet2
Requires: libUniSetExtensions
Version: @VERSION@
Libs: -L${libdir} -lUniSetUNet2
Cflags: -I${includedir}/uniset
#!/bin/sh
uniset-start.sh -f ./uniset-unetexchange --unet-name UNetExchange \
--confile test.xml --smemory-id SharedMemory \
--unet-filter-field rs --unet-filter-value 2 --unet-maxdifferense 40 \
--dlog-add-levels info,crit,warn
#include <cstdlib>
#include <errno.h>
#include <getopt.h>
#include <cstring>
#include <iostream>
#include <cc++/socket.h>
#include "UDPPacket.h"
// --------------------------------------------------------------------------
static struct option longopts[] = {
{ "help", no_argument, 0, 'h' },
{ "send", required_argument, 0, 's' },
{ "receive", required_argument, 0, 'r' },
{ "proc-id", required_argument, 0, 'p' },
{ "node-id", required_argument, 0, 'n' },
{ "send-pause", required_argument, 0, 'x' },
{ "timeout", required_argument, 0, 't' },
{ "data-count", required_argument, 0, 'c' },
{ "disable-broadcast", no_argument, 0, 'b' },
{ "show-data", no_argument, 0, 'd' },
{ "check-lost", no_argument, 0, 'l' },
{ "verbode", required_argument, 0, 'v' },
{ "num-cycles", required_argument, 0, 'z' },
{ NULL, 0, 0, 0 }
};
// --------------------------------------------------------------------------
using namespace std;
using namespace UniSetUDP;
// --------------------------------------------------------------------------
enum Command
{
cmdNOP,
cmdSend,
cmdReceive
};
// --------------------------------------------------------------------------
static bool split_addr( const string addr, string& host, ost::tpport_t& port )
{
string::size_type pos = addr.rfind(':');
if( pos != string::npos )
{
host = addr.substr(0,pos);
string s_port(addr.substr(pos+1,addr.size()-1));
port = UniSetTypes::uni_atoi(s_port.c_str());
return true;
}
return false;
}
// --------------------------------------------------------------------------
int main(int argc, char* argv[])
{
int optindex = 0;
int opt = 0;
Command cmd = cmdNOP;
int verb = 0;
std::string addr = "";
ost::tpport_t port=0;
int usecpause = 2000000;
timeout_t tout = TIMEOUT_INF;
bool broadcast = true;
int procID = 1;
int nodeID = 1;
size_t count = 50;
bool lost = false;
bool show = false;
int ncycles = -1;
while( (opt = getopt_long(argc, argv, "hs:c:r:p:n:t:x:blvdz:",longopts,&optindex)) != -1 )
{
switch (opt)
{
case 'h':
cout << "-h|--help - this message" << endl;
cout << "[-s|--send] host:port - Send message." << endl;
cout << "[-c|--data-count] num - Send num count of value. Default: 50." << endl;
cout << "[-r|--receive] host:port - Receive message." << endl;
cout << "[-p|--proc-id] id - Set packet header. From 'procID'. Default: 1" << endl;
cout << "[-n|--node-id] id - Set packet header. From 'nodeID'. Default: 1" << endl;
cout << "[-t|--timeout] msec - timeout for receive. Default: 0 msec (waitup)." << endl;
cout << "[-x|--send-pause] msec - pause for send packets. Default: 200 msec." << endl;
cout << "[-b|--disable-broadcast] - Disable broadcast mode." << endl;
cout << "[-l|--check-lost] - Check the lost packets." << endl;
cout << "[-v|--verbose] - verbose mode." << endl;
cout << "[-d|--show-data] - show receive data." << endl;
cout << "[-z|--num-cycles] num - Number of cycles of exchange. Default: -1 - infinitely." << endl;
cout << endl;
return 0;
case 'r':
cmd = cmdReceive;
addr = string(optarg);
break;
case 's':
addr = string(optarg);
cmd = cmdSend;
break;
case 't':
tout = UniSetTypes::uni_atoi(optarg);
break;
case 'x':
usecpause = UniSetTypes::uni_atoi(optarg)*1000;
break;
case 'c':
count = UniSetTypes::uni_atoi(optarg);
break;
case 'p':
procID = UniSetTypes::uni_atoi(optarg);
break;
case 'n':
nodeID = UniSetTypes::uni_atoi(optarg);
break;
case 'b':
broadcast = false;
break;
case 'd':
show = true;
break;
case 'l':
lost = true;
break;
case 'v':
verb = 1;
break;
case 'z':
ncycles = UniSetTypes::uni_atoi(optarg);
break;
case '?':
default:
cerr << "? argumnet" << endl;
return 0;
}
}
if( cmd == cmdNOP )
{
cerr << "No command... Use -h for help" << endl;
return -1;
}
if( tout < 0 )
tout = TIMEOUT_INF;
ost::Thread::setException(ost::Thread::throwException);
try
{
string s_host;
if( !split_addr(addr,s_host,port) )
{
cerr << "(main): Unknown 'host:port' for '" << addr << "'" << endl;
return 1;
}
if( verb )
{
cout << " host=" << s_host
<< " port=" << port
<< " timeout=";
if( tout == TIMEOUT_INF )
cout << "Waitup";
else
cout << tout;
cout << " msecpause=" << usecpause/1000
<< endl;
}
ost::IPV4Host host(s_host.c_str());
// udp.UDPTransmit::setBroadcast(broadcast);
switch( cmd )
{
case cmdReceive:
{
ost::UDPDuplex udp(host,port);
// char buf[UniSetUDP::MaxDataLen];
UniSetUDP::UDPMessage pack;
UniSetUDP::UDPPacket buf;
unsigned long prev_num=1;
int nc = 1;
if( ncycles > 0 )
nc = ncycles;
while( nc )
{
try
{
if( !udp.isInputReady(tout) )
{
cout << "(recv): Timeout.." << endl;
continue;
}
size_t ret = udp.UDPReceive::receive( &(buf.data), sizeof(buf.data) );
size_t sz = UniSetUDP::UDPMessage::getMessage(pack,buf);
if( sz == 0 )
{
cerr << "(recv): FAILED header ret=" << ret
<< " sizeof=" << sz<< endl;
continue;
}
if( lost )
{
if( prev_num != (pack.num-1) )
cerr << "WARNING! Incorrect sequence of packets! current=" << pack.num
<< " prev=" << prev_num << endl;
prev_num = pack.num;
}
if( verb )
cout << "receive OK: "
<< " bytes: " << ret << endl;
if( show )
cout << "receive data: " << pack << endl;
}
catch( ost::SockException& e )
{
cerr << "(recv): " << e.getString() << " (" << addr << ")" << endl;
}
catch( ... )
{
cerr << "(recv): catch ..." << endl;
}
if( ncycles > 0 )
{
nc--;
if( nc <=0 )
break;
}
}
}
break;
case cmdSend:
{
ost::UDPSocket* udp;
if( !broadcast )
udp = new ost::UDPSocket();
else
udp = new ost::UDPBroadcast(host,port);
UniSetUDP::UDPMessage mypack;
mypack.nodeID = nodeID;
mypack.procID = procID;
for( size_t i=0; i < count; i++ )
{
UDPAData d(i,i);
mypack.addAData(d);
}
for( unsigned int i=0; i < count; i++ )
mypack.addDData(i,i);
udp->setPeer(host,port);
unsigned long packetnum = 0;
UniSetUDP::UDPPacket s_buf;
int nc = 1;
if( ncycles > 0 )
nc = ncycles;
while( nc )
{
mypack.num = packetnum++;
if( packetnum > UniSetUDP::MaxPacketNum )
packetnum = 1;
try
{
if( udp->isPending(ost::Socket::pendingOutput,tout) )
{
mypack.transport_msg(s_buf);
if( verb )
cout << "(send): to addr=" << addr << " d_count=" << mypack.dcount
<< " a_count=" << mypack.acount << " bytes=" << s_buf.len << endl;
size_t ret = udp->send((char*)&s_buf.data, s_buf.len);
if( ret < s_buf.len )
cerr << "(send): FAILED ret=" << ret << " < sizeof=" << s_buf.len << endl;
}
}
catch( ost::SockException& e )
{
cerr << "(send): " << e.getString() << " (" << addr << ")" << endl;
}
catch( ... )
{
cerr << "(send): catch ..." << endl;
}
if( ncycles > 0 )
{
nc--;
if( nc <=0 )
break;
}
usleep(usecpause);
}
}
break;
default:
cerr << endl << "Unknown command: '" << cmd << "'. Use -h for help" << endl;
return -1;
break;
}
}
catch( std::exception& e )
{
cerr << "(main): " << e.what() << endl;
}
catch( ... )
{
cerr << "(main): catch ..." << endl;
return 1;
}
return 0;
}
// --------------------------------------------------------------------------
#include <sstream>
#include <sys/wait.h>
#include "ObjectsActivator.h"
#include "Extensions.h"
#include "UNetExchange.h"
// -----------------------------------------------------------------------------
using namespace std;
using namespace UniSetTypes;
using namespace UniSetExtensions;
// -----------------------------------------------------------------------------
int main( int argc, const char** argv )
{
try
{
if( argc>1 && (!strcmp(argv[1],"--help") || !strcmp(argv[1],"-h")) )
{
cout << "--smemory-id objectName - SharedMemory objectID. Default: read from <SharedMemory>" << endl;
cout << "--confile filename - configuration file. Default: configure.xml" << endl;
cout << "--unet-logfile filename - logfilename. Default: udpexchange.log" << endl;
cout << endl;
UNetExchange::help_print(argc,argv);
return 0;
}
uniset_init(argc,argv);
string logfilename(conf->getArgParam("--unet-logfile"));
if( logfilename.empty() )
logfilename = "udpexchange.log";
std::ostringstream logname;
string dir(conf->getLogDir());
logname << dir << logfilename;
unideb.logFile( logname.str() );
UniSetExtensions::dlog.logFile( logname.str() );
conf->initDebug(UniSetExtensions::dlog,"dlog");
ObjectId shmID = DefaultObjectId;
string sID = conf->getArgParam("--smemory-id");
if( !sID.empty() )
shmID = conf->getControllerID(sID);
else
shmID = getSharedMemoryID();
if( shmID == DefaultObjectId )
{
cerr << sID << "? SharedMemoryID not found in " << conf->getControllersSection() << " section" << endl;
return 1;
}
UNetExchange* unet = UNetExchange::init_unetexchange(argc,argv,shmID);
if( !unet )
{
dlog[Debug::CRIT] << "(unetexchange): init failed.." << endl;
return 1;
}
ObjectsActivator act;
act.addObject(static_cast<class UniSetObject*>(unet));
SystemMessage sm(SystemMessage::StartUp);
act.broadcast( sm.transport_msg() );
unideb(Debug::ANY) << "\n\n\n";
unideb[Debug::ANY] << "(main): -------------- UDPRecevier START -------------------------\n\n";
dlog(Debug::ANY) << "\n\n\n";
dlog[Debug::ANY] << "(main): -------------- UDPReceiver START -------------------------\n\n";
act.run(false);
while( waitpid(-1, 0, 0) > 0 );
}
catch( Exception& ex )
{
dlog[Debug::CRIT] << "(unetexchange): " << ex << std::endl;
}
catch(...)
{
dlog[Debug::CRIT] << "(unetexchange): catch ..." << std::endl;
}
while( waitpid(-1, 0, 0) > 0 );
return 0;
}
......@@ -92,8 +92,8 @@ bool UDPMessage::setDData( size_t index, bool val )
if( index >= MaxDCount )
return false;
size_t nbyte = index / 8*sizeof(unsigned char);
size_t nbit = index % 8*sizeof(unsigned char);
size_t nbyte = index / sizeof(unsigned char);
size_t nbit = index % sizeof(unsigned char);
// выставляем бит
unsigned char d = d_dat[nbyte];
......@@ -119,8 +119,8 @@ bool UDPMessage::dValue( size_t index )
if( index >= MaxDCount )
return UniSetTypes::DefaultObjectId;
size_t nbyte = index / 8*sizeof(unsigned char);
size_t nbit = index % 8*sizeof(unsigned char);
size_t nbyte = index / sizeof(unsigned char);
size_t nbit = index % sizeof(unsigned char);
return ( d_dat[nbyte] & (1<<nbit) );
}
......@@ -144,8 +144,8 @@ size_t UDPMessage::transport_msg( UDPPacket& p )
i += sz;
// копируем булевые данные
size_t nbyte = dcount / 8*sizeof(unsigned char);
size_t nbit = dcount % 8*sizeof(unsigned char);
size_t nbyte = dcount / sizeof(unsigned char);
size_t nbit = dcount % sizeof(unsigned char);
sz = nbit > 0 ? nbyte + 1 : nbyte;
memcpy(&(p.data[i]),d_dat,sz);
i += sz;
......@@ -184,8 +184,8 @@ size_t UDPMessage::getMessage( UDPMessage& m, UDPPacket& p )
i += sz;
// копируем булевые данные
size_t nbyte = m.dcount / 8*sizeof(unsigned char);
size_t nbit = m.dcount % 8*sizeof(unsigned char);
size_t nbyte = m.dcount / sizeof(unsigned char);
size_t nbit = m.dcount % sizeof(unsigned char);
sz = nbit > 0 ? nbyte + 1 : nbyte;
if( sz > sizeof(m.d_dat) )
......
......@@ -41,11 +41,9 @@ namespace UniSetUDP
friend std::ostream& operator<<( std::ostream& os, UDPAData& p );
}__attribute__((packed));
// Хотелось бы не вылезать за общий размер посылаемых пакетов 8192. (550,900 --> 8133)
static const size_t MaxACount = 550;
static const size_t MaxDCount = 900;
static const size_t MaxDDataCount = 1 + MaxDCount / 8*sizeof(unsigned char);
static const size_t MaxACount = 200;
static const size_t MaxDCount = 400;
static const size_t MaxDDataCount = MaxDCount / sizeof(unsigned char);
struct UDPPacket
{
......
......@@ -7,7 +7,7 @@ using namespace std;
using namespace UniSetTypes;
using namespace UniSetExtensions;
// -----------------------------------------------------------------------------
UNetExchange::UNetExchange( UniSetTypes::ObjectId objId, UniSetTypes::ObjectId shmId, SharedMemory* ic, const std::string& prefix ):
UNetExchange::UNetExchange( UniSetTypes::ObjectId objId, UniSetTypes::ObjectId shmId, SharedMemory* ic ):
UniSetObject_LT(objId),
shm(0),
initPause(0),
......@@ -17,7 +17,7 @@ sender(0),
sender2(0)
{
if( objId == DefaultObjectId )
throw UniSetTypes::SystemError("(UNetExchange): objId=-1?!! Use --" + prefix +"-unet-name" );
throw UniSetTypes::SystemError("(UNetExchange): objId=-1?!! Use --unet-name" );
cnode = conf->getNode(myname);
if( cnode == NULL )
......@@ -28,27 +28,26 @@ sender2(0)
UniXML_iterator it(cnode);
// определяем фильтр
s_field = conf->getArgParam("--" + prefix + "-filter-field");
s_fvalue = conf->getArgParam("--" + prefix + "-filter-value");
s_field = conf->getArgParam("--unet-filter-field");
s_fvalue = conf->getArgParam("--unet-filter-value");
dlog[Debug::INFO] << myname << "(init): read filter-field='" << s_field
<< "' filter-value='" << s_fvalue << "'" << endl;
const string n_field(conf->getArgParam("--" + prefix + "-nodes-filter-field"));
const string n_fvalue(conf->getArgParam("--" + prefix + "-nodes-filter-value"));
const string n_field(conf->getArgParam("--unet-nodes-filter-field"));
const string n_fvalue(conf->getArgParam("--unet-nodes-filter-value"));
dlog[Debug::INFO] << myname << "(init): read nodes-filter-field='" << n_field
<< "' nodes-filter-value='" << n_fvalue << "'" << endl;
int recvTimeout = conf->getArgPInt("--" + prefix + "-recv-timeout",it.getProp("recvTimeout"), 5000);
int prepareTime = conf->getArgPInt("--" + prefix + "-preapre-time",it.getProp("prepareTime"), 2000);
int lostTimeout = conf->getArgPInt("--" + prefix + "-lost-timeout",it.getProp("lostTimeout"), recvTimeout);
int recvpause = conf->getArgPInt("--" + prefix + "-recvpause",it.getProp("recvpause"), 10);
int sendpause = conf->getArgPInt("--" + prefix + "-sendpause",it.getProp("sendpause"), 100);
int updatepause = conf->getArgPInt("--" + prefix + "-updatepause",it.getProp("updatepause"), 100);
steptime = conf->getArgPInt("--" + prefix + "-steptime",it.getProp("steptime"), 1000);
int maxDiff = conf->getArgPInt("--" + prefix + "-maxdifferense",it.getProp("maxDifferense"), 1000);
int maxProcessingCount = conf->getArgPInt("--" + prefix + "-maxprocessingcount",it.getProp("maxProcessingCount"), 100);
int recvTimeout = conf->getArgPInt("--unet-recv-timeout",it.getProp("recvTimeout"), 5000);
int lostTimeout = conf->getArgPInt("--unet-lost-timeout",it.getProp("lostTimeout"), recvTimeout);
int recvpause = conf->getArgPInt("--unet-recvpause",it.getProp("recvpause"), 10);
int sendpause = conf->getArgPInt("--unet-sendpause",it.getProp("sendpause"), 150);
int updatepause = conf->getArgPInt("--unet-updatepause",it.getProp("updatepause"), 100);
steptime = conf->getArgPInt("--unet-steptime",it.getProp("steptime"), 1000);
int maxDiff = conf->getArgPInt("--unet-maxdifferense",it.getProp("maxDifferense"), 1000);
int maxProcessingCount = conf->getArgPInt("--unet-maxprocessingcount",it.getProp("maxProcessingCount"), 100);
no_sender = conf->getArgInt("--" + prefix + "-nosender",it.getProp("nosender"));
no_sender = conf->getArgInt("--unet-nosender",it.getProp("nosender"));
xmlNode* nodes = conf->getXMLNodesSection();
if( !nodes )
......@@ -248,7 +247,6 @@ sender2(0)
r->setLockUpdate(false);
r->setReceiveTimeout(recvTimeout);
r->setPrepareTime(prepareTime);
r->setLostTimeout(lostTimeout);
r->setReceivePause(recvpause);
r->setUpdatePause(updatepause);
......@@ -298,7 +296,7 @@ sender2(0)
// -------------------------------
// ********** HEARTBEAT *************
string heart = conf->getArgParam("--" + prefix + "-heartbeat-id",it.getProp("heartbeat_id"));
string heart = conf->getArgParam("--unet-heartbeat-id",it.getProp("heartbeat_id"));
if( !heart.empty() )
{
sidHeartBeat = conf->getSensorID(heart);
......@@ -310,13 +308,13 @@ sender2(0)
throw SystemError(err.str());
}
int heartbeatTime = conf->getArgPInt("--" + prefix + "-heartbeat-time",it.getProp("heartbeatTime"),conf->getHeartBeatTime());
int heartbeatTime = getHeartBeatTime();
if( heartbeatTime )
ptHeartBeat.setTiming(heartbeatTime);
else
ptHeartBeat.setTiming(UniSetTimer::WaitUpTime);
maxHeartBeat = conf->getArgPInt("--" + prefix + "-heartbeat-max", it.getProp("heartbeat_max"), 10);
maxHeartBeat = conf->getArgPInt("--unet-heartbeat-max", it.getProp("heartbeat_max"), 10);
test_id = sidHeartBeat;
}
else
......@@ -333,7 +331,7 @@ sender2(0)
dlog[Debug::INFO] << myname << "(init): test_id=" << test_id << endl;
activateTimeout = conf->getArgPInt("--" + prefix + "-activate-timeout", 20000);
activateTimeout = conf->getArgPInt("--activate-timeout", 20000);
}
// -----------------------------------------------------------------------------
UNetExchange::~UNetExchange()
......@@ -351,7 +349,7 @@ UNetExchange::~UNetExchange()
delete shm;
}
// -----------------------------------------------------------------------------
bool UNetExchange::checkExistUNetHost( const std::string& addr, ost::tpport_t port )
bool UNetExchange::checkExistUNetHost( const std::string addr, ost::tpport_t port )
{
ost::IPV4Address a1(addr.c_str());
for( ReceiverList::iterator it=recvlist.begin(); it!=recvlist.end(); ++it )
......@@ -424,7 +422,7 @@ void UNetExchange::step()
}
// -----------------------------------------------------------------------------
void UNetExchange::ReceiverInfo::step( SMInterface* shm, const std::string& myname )
void UNetExchange::ReceiverInfo::step( SMInterface* shm, const std::string myname )
{
try
{
......@@ -684,28 +682,25 @@ void UNetExchange::initIterators()
// -----------------------------------------------------------------------------
void UNetExchange::help_print( int argc, const char* argv[] )
{
cout << "Default prefix='unet'" << endl;
cout << "--prefix-name NameID - Идентификтора процесса." << endl;
cout << "--prefix-recv-timeout msec - Время для фиксации события 'отсутсвие связи'" << endl;
cout << "--prefix-prepare-time msec - Время необходимое на подготовку (восстановление связи) при переключении на другой канал" << endl;
cout << "--prefix-lost-timeout msec - Время ожидания заполнения 'дырки' между пакетами. По умолчанию 5000 мсек." << endl;
cout << "--prefix-recvpause msec - Пауза между приёмами. По умолчанию 10" << endl;
cout << "--prefix-sendpause msec - Пауза между посылками. По умолчанию 100" << endl;
cout << "--prefix-updatepause msec - Пауза между обновлением информации в SM (Корелирует с recvpause и sendpause). По умолчанию 100" << endl;
cout << "--prefix-steptime msec - Пауза между обновлением информации о связи с узлами." << endl;
cout << "--prefix-maxdifferense num - Маскимальная разница в номерах пакетов для фиксации события 'потеря пакетов' " << endl;
cout << "--prefix-maxprocessingcount num - время на ожидание старта SM" << endl;
cout << "--prefix-nosender [0,1] - Отключить посылку." << endl;
cout << "--prefix-sm-ready-timeout msec - Время ожидание я готовности SM к работе. По умолчанию 15000" << endl;
cout << "--prefix-filter-field name - Название фильтрующего поля при формировании списка датчиков посылаемых данным узлом" << endl;
cout << "--prefix-filter-value name - Значение фильтрующего поля при формировании списка датчиков посылаемых данным узлом" << endl;
cout << "--unet-name NameID - Идентификтора процесса." << endl;
cout << "--unet-recv-timeout msec - Время для фиксации события 'отсутсвие связи'" << endl;
cout << "--unet-lost-timeout msec - Время ожидания заполнения 'дырки' между пакетами. По умолчанию 5000 мсек." << endl;
cout << "--unet-recvpause msec - Пауза между приёмами. По умолчанию 10" << endl;
cout << "--unet-sendpause msec - Пауза между посылками. По умолчанию 150" << endl;
cout << "--unet-updatepause msec - Время ожидания готовности SM к работе, мсек. (-1 - ждать 'вечно')" << endl;
cout << "--unet-steptime msec - Шаг..." << endl;
cout << "--unet-maxdifferense num - Маскимальная разница в номерах пакетов для фиксации события 'потеря пакетов' " << endl;
cout << "--unet-maxprocessingcount num - время на ожидание старта SM" << endl;
cout << "--unet-nosender [0,1] - Отключить посылку." << endl;
cout << "--unet-sm-ready-timeout msec - Время ожидание я готовности SM к работе. По умолчанию 15000" << endl;
cout << "--unet-filter-field name - Название фильтрующего поля при формировании списка датчиков посылаемых данным узлом" << endl;
cout << "--unet-filter-value name - Значение фильтрующего поля при формировании списка датчиков посылаемых данным узлом" << endl;
}
// -----------------------------------------------------------------------------
UNetExchange* UNetExchange::init_unetexchange( int argc, const char* argv[], UniSetTypes::ObjectId icID,
SharedMemory* ic, const std::string& prefix )
UNetExchange* UNetExchange::init_unetexchange( int argc, const char* argv[], UniSetTypes::ObjectId icID, SharedMemory* ic )
{
string p("--" + prefix + "-name");
string name = conf->getArgParam(p,"UNetExchange1");
string name = conf->getArgParam("--unet-name","UNetExchange1");
if( name.empty() )
{
cerr << "(unetexchange): Не задан name'" << endl;
......@@ -715,13 +710,14 @@ UNetExchange* UNetExchange::init_unetexchange( int argc, const char* argv[], Uni
ObjectId ID = conf->getObjectID(name);
if( ID == UniSetTypes::DefaultObjectId )
{
cerr << "(unetexchange): Not found ObjectID for '" << name
<< " in section '" << conf->getObjectsSection() << "'" << endl;
cerr << "(unetexchange): идентификатор '" << name
<< "' не найден в конф. файле!"
<< " в секции " << conf->getObjectsSection() << endl;
return 0;
}
dlog[Debug::INFO] << "(unetexchange): name = " << name << "(" << ID << ")" << endl;
return new UNetExchange(ID,icID,ic,prefix);
return new UNetExchange(ID,icID,ic);
}
// -----------------------------------------------------------------------------
void UNetExchange::receiverEvent( UNetReceiver* r, UNetReceiver::Event ev )
......@@ -743,26 +739,25 @@ void UNetExchange::receiverEvent( UNetReceiver* r, UNetReceiver::Event ev )
// переключаемся на второй
it->r1->setLockUpdate(true);
it->r2->setLockUpdate(false);
it->r2->resetTimeout();
if( dlog.debugging(Debug::INFO) )
dlog[Debug::INFO] << myname << "(event): " << r->getName()
<< ": timeout for channel1.. select channel 2" << endl;
<< ": timeout for channel1.. select channel2" << endl;
return;
}
if( it->r2 == r )
{
// пропала связь по второму каналу...
// переключаемся на первый
it->r1->setLockUpdate(false);
it->r1->resetTimeout();
it->r2->setLockUpdate(true);
if( dlog.debugging(Debug::INFO) )
dlog[Debug::INFO] << myname << "(event): " << r->getName()
<< ": timeout for channel2.. select channel 1" << endl;
<< ": timeout for channel2.. select channel1" << endl;
return;
}
}
}
// -----------------------------------------------------------------------------
......@@ -15,26 +15,26 @@
#include "UNetSender.h"
// -----------------------------------------------------------------------------
/*!
\page pageUNetExchangeUDP Сетевой обмен на основе UDP (UNetUDP)
\page pageUNetExchange2 Сетевой обмен на основе UDP (UNet2)
- \ref pgUNetUDP_Common
- \ref pgUNetUDP_Conf
- \ref pgUNetUDP_Reserv
- \ref pgUnet2_Common
- \ref pgUnet2_Conf
- \ref pgUnet2_Reserv
\section pgUNetUDP_Common Общее описание
\section pgUnet2_Common Общее описание
Обмен построен на основе протокола UDP.
Основная идея заключается в том, что каждый узел на порту равном своему ID
посылает в сеть UDP-пакеты содержащие данные считанные из локальной SM. Формат данных - это набор
пар [id,value]. Другие узлы принимают их. Помимо этого данный процесс запускает
пар "id - value". Другие узлы принимают их. Помимо этого данный процесс запускает
по потоку приёма для каждого другого узла и ловит пакеты от них, сохраняя данные в SM.
\par
При своём старте процесс считывает из секции \<nodes> список узлов которые необходимо "слушать",
а также параметры своего узла. Открывает по потоку приёма на каждый узел и поток
При своём старте процесс считывает из секции \<nodes> список узлов с которыми необходимо
вести обмен, а также параметры своего узла. Открывает по потоку приёма на каждый узел и поток
передачи для своих данных. Помимо этого такие же потоки для резервных каналов, если они включены
(см. \ref pgUNetUDP_Reserv ).
(см. \ref pgUnet2_Reserv ).
\section pgUNetUDP_Conf Пример конфигурирования
\section pgUnet2_Conf Пример конфигурирования
По умолчанию при считывании используется \b unet_broadcast_ip (указанный в секции \<nodes>)
и \b id узла - в качестве порта.
Но можно переопределять эти параметры, при помощи указания \b unet_port и/или \b unet_broadcast_ip,
......@@ -51,7 +51,7 @@
</nodes>
\endcode
\section pgUNetUDP_Reserv Настройка резервного канала связи
\section pgUnet2_Reserv Настройка резервного канала связи
В текущей реализации поддерживается возможность обмена по двум подсетям (эзернет-каналам).
Она основана на том, что, для каждого узла помимо основного "читателя",
создаётся дополнительный "читатель"(поток) слушающий другой ip-адрес и порт.
......@@ -64,7 +64,7 @@
Переключение между "каналами" происходит по следующей логике:
При старте включается только первый канал. Второй канал работает в режиме "пассивного" чтения.
Т.е. все пакеты принимаются, но данные в SharedMemory не сохраняются.
Т.е. все пакеты принимаются, но данные в SharedMemory не обновляются.
Если во время работы пропадает связь по первому каналу, идёт переключение на второй канал.
Первый канал переводиться в "пассивный" режим, а второй канал, переводится в "нормальный"(активный)
режим. Далее работа ведётся по второму каналу, независимо от того, что связь на первом
......@@ -79,17 +79,17 @@ class UNetExchange:
public UniSetObject_LT
{
public:
UNetExchange( UniSetTypes::ObjectId objId, UniSetTypes::ObjectId shmID, SharedMemory* ic=0, const std::string& prefix="unet" );
UNetExchange( UniSetTypes::ObjectId objId, UniSetTypes::ObjectId shmID, SharedMemory* ic=0 );
virtual ~UNetExchange();
/*! глобальная функция для инициализации объекта */
static UNetExchange* init_unetexchange( int argc, const char* argv[],
UniSetTypes::ObjectId shmID, SharedMemory* ic=0, const std::string& prefix="unet" );
UniSetTypes::ObjectId shmID, SharedMemory* ic=0 );
/*! глобальная функция для вывода help-а */
static void help_print( int argc, const char* argv[] );
bool checkExistUNetHost( const std::string& host, ost::tpport_t port );
bool checkExistUNetHost( const std::string host, ost::tpport_t port );
protected:
......@@ -142,7 +142,6 @@ class UNetExchange:
{
ReceiverInfo():r1(0),r2(0),
sidRespond(UniSetTypes::DefaultObjectId),
respondInvert(false),
sidLostPackets(UniSetTypes::DefaultObjectId)
{}
......@@ -156,7 +155,7 @@ class UNetExchange:
UNetReceiver* r1; /*!< приём по первому каналу */
UNetReceiver* r2; /*!< приём по второму каналу */
void step( SMInterface* shm, const std::string& myname );
void step( SMInterface* shm, const std::string myname );
inline void setRespondID( UniSetTypes::ObjectId id, bool invert=false )
{
......
......@@ -19,13 +19,12 @@ bool UNetReceiver::PacketCompare::operator()(const UniSetUDP::UDPMessage& lhs,
}
*/
// ------------------------------------------------------------------------------------------
UNetReceiver::UNetReceiver( const std::string& s_host, const ost::tpport_t port, SMInterface* smi ):
UNetReceiver::UNetReceiver( const std::string s_host, const ost::tpport_t port, SMInterface* smi ):
shm(smi),
recvpause(10),
updatepause(100),
udp(0),
recvTimeout(5000),
prepareTime(2000),
lostTimeout(5000),
lostPackets(0),
sidRespond(UniSetTypes::DefaultObjectId),
......@@ -79,7 +78,6 @@ a_cache_init_ok(false)
u_thr = new ThreadCreator<UNetReceiver>(this, &UNetReceiver::update);
ptRecvTimeout.setTiming(recvTimeout);
ptPrepare.setTiming(prepareTime);
}
// -----------------------------------------------------------------------------
UNetReceiver::~UNetReceiver()
......@@ -95,12 +93,6 @@ void UNetReceiver::setReceiveTimeout( timeout_t msec )
ptRecvTimeout.setTiming(msec);
}
// -----------------------------------------------------------------------------
void UNetReceiver::setPrepareTime( timeout_t msec )
{
prepareTime = msec;
ptPrepare.setTiming(msec);
}
// -----------------------------------------------------------------------------
void UNetReceiver::setLostTimeout( timeout_t msec )
{
lostTimeout = msec;
......@@ -144,8 +136,6 @@ void UNetReceiver::setLockUpdate( bool st )
{
uniset_mutex_lock l(lockMutex,200);
lockUpdate = st;
if( !st )
ptPrepare.reset();
}
// -----------------------------------------------------------------------------
void UNetReceiver::resetTimeout()
......@@ -167,7 +157,7 @@ void UNetReceiver::start()
// -----------------------------------------------------------------------------
void UNetReceiver::update()
{
cerr << "******************* update start" << endl;
cerr << "******************* udpate start" << endl;
while(activated)
{
try
......@@ -408,8 +398,7 @@ void UNetReceiver::receive()
tout = ptRecvTimeout.checkTime();
}
// только если "режим подготовки закончился, то можем генерировать "события"
if( ptPrepare.checkTime() && trTimeout.change(tout) )
if( trTimeout.change(tout) )
{
if( tout )
slEvent(this,evTimeout);
......
......@@ -50,7 +50,7 @@
class UNetReceiver
{
public:
UNetReceiver( const std::string& host, const ost::tpport_t port, SMInterface* smi );
UNetReceiver( const std::string host, const ost::tpport_t port, SMInterface* smi );
~UNetReceiver();
void start();
......@@ -73,7 +73,6 @@ class UNetReceiver
void setReceivePause( timeout_t msec );
void setUpdatePause( timeout_t msec );
void setLostTimeout( timeout_t msec );
void setPrepareTime( timeout_t msec );
void setMaxDifferens( unsigned long set );
void setRespondID( UniSetTypes::ObjectId id, bool invert=false );
......@@ -117,9 +116,7 @@ class UNetReceiver
UniSetTypes::uniset_mutex pollMutex;
PassiveTimer ptRecvTimeout;
PassiveTimer ptPrepare;
timeout_t recvTimeout;
timeout_t prepareTime;
timeout_t lostTimeout;
PassiveTimer ptLostTimeout;
unsigned long lostPackets; /*!< счётчик потерянных пакетов */
......
......@@ -106,9 +106,6 @@ void UNetSender::updateFromSM()
// -----------------------------------------------------------------------------
void UNetSender::updateSensor( UniSetTypes::ObjectId id, long value )
{
if( !shm->isLocalwork() )
return;
// cerr << myname << ": UPDATE SENSOR id=" << id << " value=" << value << endl;
DMap::iterator it=dlist.begin();
for( ; it!=dlist.end(); ++it )
......@@ -295,8 +292,6 @@ bool UNetSender::initItem( UniXML_iterator& it )
dlog[Debug::CRIT] << myname
<< "(readItem): OVERFLOW! MAX UDP DIGITAL DATA LIMIT! max="
<< UniSetUDP::MaxDCount << endl;
raise(SIGTERM);
return false;
}
}
......@@ -308,7 +303,6 @@ bool UNetSender::initItem( UniXML_iterator& it )
dlog[Debug::CRIT] << myname
<< "(readItem): OVERFLOW! MAX UDP ANALOG DATA LIMIT! max="
<< UniSetUDP::MaxACount << endl;
raise(SIGTERM);
return false;
}
}
......
......@@ -4,4 +4,3 @@ uniset-start.sh -f ./uniset-unetexchange --unet-name UNetExchange \
--confile test.xml --smemory-id SharedMemory \
--unet-filter-field rs --unet-filter-value 2 --unet-maxdifferense 40 \
--dlog-add-levels info,crit,warn
......@@ -33,7 +33,7 @@ enum Command
cmdReceive
};
// --------------------------------------------------------------------------
static bool split_addr( const string& addr, string& host, ost::tpport_t& port )
static bool split_addr( const string addr, string& host, ost::tpport_t& port )
{
string::size_type pos = addr.rfind(':');
if( pos != string::npos )
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment