Commit efca7b8b authored by Pavel Vainerman's avatar Pavel Vainerman

Перенёс UNet2 в каталог UNet2

parent 9f10c57d
...@@ -154,6 +154,7 @@ rm -f %buildroot%_libdir/*.la ...@@ -154,6 +154,7 @@ rm -f %buildroot%_libdir/*.la
%_bindir/%oname-smemory %_bindir/%oname-smemory
%_bindir/%oname-smviewer %_bindir/%oname-smviewer
%_bindir/%oname-network %_bindir/%oname-network
%_bindir/%oname-unet*
#%_bindir/%oname-smdbserver #%_bindir/%oname-smdbserver
%_libdir/*Extensions.so.* %_libdir/*Extensions.so.*
...@@ -163,6 +164,7 @@ rm -f %buildroot%_libdir/*.la ...@@ -163,6 +164,7 @@ rm -f %buildroot%_libdir/*.la
%_libdir/libUniSetRT*.so.* %_libdir/libUniSetRT*.so.*
%_libdir/libUniSetShared*.so.* %_libdir/libUniSetShared*.so.*
%_libdir/libUniSetNetwork*.so.* %_libdir/libUniSetNetwork*.so.*
%_libdir/libUniSetUNet2*.so.*
#%_libdir/libUniSetSMDBServer*.so.* #%_libdir/libUniSetSMDBServer*.so.*
%files extensions-devel %files extensions-devel
...@@ -182,6 +184,7 @@ rm -f %buildroot%_libdir/*.la ...@@ -182,6 +184,7 @@ rm -f %buildroot%_libdir/*.la
%_pkgconfigdir/libUniSetRT*.pc %_pkgconfigdir/libUniSetRT*.pc
%_pkgconfigdir/libUniSetShared*.pc %_pkgconfigdir/libUniSetShared*.pc
%_pkgconfigdir/libUniSetNetwork*.pc %_pkgconfigdir/libUniSetNetwork*.pc
%_pkgconfigdir/libUniSetUNet*.pc
#%_pkgconfigdir/libUniSetSMDBServer.pc #%_pkgconfigdir/libUniSetSMDBServer.pc
#%_pkgconfigdir/libUniSet*.pc #%_pkgconfigdir/libUniSet*.pc
%exclude %_pkgconfigdir/libUniSet.pc %exclude %_pkgconfigdir/libUniSet.pc
......
...@@ -182,6 +182,7 @@ AC_CONFIG_FILES([Makefile ...@@ -182,6 +182,7 @@ AC_CONFIG_FILES([Makefile
extensions/UDPExchange/Makefile extensions/UDPExchange/Makefile
extensions/UDPExchange/libUniSetUDP.pc extensions/UDPExchange/libUniSetUDP.pc
extensions/UNet2/Makefile extensions/UNet2/Makefile
extensions/UNet2/libUniSetUNet2.pc
extensions/ModbusSlave/Makefile extensions/ModbusSlave/Makefile
extensions/ModbusSlave/libUniSetMBSlave.pc extensions/ModbusSlave/libUniSetMBSlave.pc
extensions/MBTCPMaster/Makefile extensions/MBTCPMaster/Makefile
......
bin_PROGRAMS = @PACKAGE@-unet-sender @PACKAGE@-unet-receiver bin_PROGRAMS = @PACKAGE@-unetexchange
#@PACKAGE@-unetexchange
lib_LTLIBRARIES = libUniSetUNet2.la lib_LTLIBRARIES = libUniSetUNet2.la
libUniSetUNet2_la_LIBADD = $(top_builddir)/lib/libUniSet.la \ libUniSetUNet2_la_LIBADD = $(top_builddir)/lib/libUniSet.la \
...@@ -7,39 +6,21 @@ libUniSetUNet2_la_LIBADD = $(top_builddir)/lib/libUniSet.la \ ...@@ -7,39 +6,21 @@ libUniSetUNet2_la_LIBADD = $(top_builddir)/lib/libUniSet.la \
$(top_builddir)/extensions/lib/libUniSetExtensions.la \ $(top_builddir)/extensions/lib/libUniSetExtensions.la \
$(SIGC_LIBS) $(COMCPP_LIBS) $(SIGC_LIBS) $(COMCPP_LIBS)
libUniSetUNet2_la_CXXFLAGS = -I$(top_builddir)/extensions/include -I$(top_builddir)/extensions/SharedMemory $(SIGC_CFLAGS) $(COMCPP_CFLAGS) libUniSetUNet2_la_CXXFLAGS = -I$(top_builddir)/extensions/include -I$(top_builddir)/extensions/SharedMemory $(SIGC_CFLAGS) $(COMCPP_CFLAGS)
libUniSetUNet2_la_SOURCES = UNetPacket.cc UNetSender.cc UNetReceiver.cc libUniSetUNet2_la_SOURCES = UDPPacket.cc UNetReceiver.cc UNetSender.cc UNetExchange.cc
#UNetExchange.cc
#UNetSender.cc @PACKAGE@_unetexchange_SOURCES = unetexchange.cc
@PACKAGE@_unetexchange_LDADD = libUniSetUNet2.la $(top_builddir)/lib/libUniSet.la \
#@PACKAGE@_unetexchange_SOURCES = unetexchange.cc
#@PACKAGE@_unetexchange_LDADD = libUniSetUNet.la $(top_builddir)/lib/libUniSet.la \
# $(top_builddir)/extensions/SharedMemory/libUniSetSharedMemory.la \
# $(top_builddir)/extensions/lib/libUniSetExtensions.la \
# $(SIGC_LIBS) $(COMCPP_LIBS)
#@PACKAGE@_unetexchange_CXXFLAGS = -I$(top_builddir)/extensions/include -I$(top_builddir)/extensions/SharedMemory $(SIGC_CFLAGS) $(COMCPP_CFLAGS)
@PACKAGE@_unet_sender_SOURCES = unetsender.cc
@PACKAGE@_unet_sender_LDADD = libUniSetUNet2.la $(top_builddir)/lib/libUniSet.la \
$(top_builddir)/extensions/SharedMemory/libUniSetSharedMemory.la \ $(top_builddir)/extensions/SharedMemory/libUniSetSharedMemory.la \
$(top_builddir)/extensions/lib/libUniSetExtensions.la \ $(top_builddir)/extensions/lib/libUniSetExtensions.la \
$(SIGC_LIBS) $(COMCPP_LIBS) $(SIGC_LIBS) $(COMCPP_LIBS)
@PACKAGE@_unet_sender_CXXFLAGS = -I$(top_builddir)/extensions/include -I$(top_builddir)/extensions/SharedMemory $(SIGC_CFLAGS) $(COMCPP_CFLAGS) @PACKAGE@_unetexchange_CXXFLAGS = -I$(top_builddir)/extensions/include -I$(top_builddir)/extensions/SharedMemory $(SIGC_CFLAGS) $(COMCPP_CFLAGS)
@PACKAGE@_unet_receiver_SOURCES = unetreceiver.cc
@PACKAGE@_unet_receiver_LDADD = libUniSetUNet2.la $(top_builddir)/lib/libUniSet.la \
$(top_builddir)/extensions/SharedMemory/libUniSetSharedMemory.la \
$(top_builddir)/extensions/lib/libUniSetExtensions.la \
$(SIGC_LIBS) $(COMCPP_LIBS)
@PACKAGE@_unet_receiver_CXXFLAGS = -I$(top_builddir)/extensions/include -I$(top_builddir)/extensions/SharedMemory $(SIGC_CFLAGS) $(COMCPP_CFLAGS)
# install # install
#devel_include_HEADERS = *.h devel_include_HEADERS = *.h
#devel_includedir = $(pkgincludedir)/extensions devel_includedir = $(pkgincludedir)/extensions
#pkgconfigdir = $(libdir)/pkgconfig pkgconfigdir = $(libdir)/pkgconfig
#pkgconfig_DATA = libUniSetUNet.pc pkgconfig_DATA = libUniSetUNet2.pc
all-local: all-local:
ln -sf ../UNetExchange/$(devel_include_HEADERS) ../include ln -sf ../UDPExchange/$(devel_include_HEADERS) ../include
...@@ -6,80 +6,98 @@ ...@@ -6,80 +6,98 @@
using namespace std; using namespace std;
using namespace UniSetTypes; using namespace UniSetTypes;
using namespace UniSetExtensions; using namespace UniSetExtensions;
// -----------------------------------------------------------------------------
UNetExchange::UNetExchange( UniSetTypes::ObjectId objId, UniSetTypes::ObjectId shmId, SharedMemory* ic ): UNetExchange::UNetExchange( UniSetTypes::ObjectId objId, UniSetTypes::ObjectId shmId, SharedMemory* ic ):
UniSetObject_LT(objId), UniSetObject_LT(objId),
shm(0), shm(0),
initPause(0), initPause(0),
UNet(0),
activated(false), activated(false),
dlist(100), no_sender(false),
maxItem(0) sender(0)
{ {
if( objId == DefaultObjectId ) if( objId == DefaultObjectId )
throw UniSetTypes::SystemError("(UNetExchange): objId=-1?!! Use --UNet-name" ); throw UniSetTypes::SystemError("(UNetExchange): objId=-1?!! Use --unet-name" );
// xmlNode* cnode = conf->getNode(myname); // xmlNode* cnode = conf->getNode(myname);
cnode = conf->getNode(myname); cnode = conf->getNode(myname);
if( cnode == NULL ) if( cnode == NULL )
throw UniSetTypes::SystemError("(UNetExchange): Not find conf-node for " + myname ); throw UniSetTypes::SystemError("(UNetExchange): Not found conf-node for " + myname );
shm = new SMInterface(shmId,&ui,objId,ic); shm = new SMInterface(shmId,&ui,objId,ic);
UniXML_iterator it(cnode); UniXML_iterator it(cnode);
// определяем фильтр // определяем фильтр
s_field = conf->getArgParam("--UNet-filter-field"); s_field = conf->getArgParam("--unet-filter-field");
s_fvalue = conf->getArgParam("--UNet-filter-value"); s_fvalue = conf->getArgParam("--unet-filter-value");
dlog[Debug::INFO] << myname << "(init): read fileter-field='" << s_field dlog[Debug::INFO] << myname << "(init): read fileter-field='" << s_field
<< "' filter-value='" << s_fvalue << "'" << endl; << "' filter-value='" << s_fvalue << "'" << endl;
// ---------- init RS ---------- int recvTimeout = conf->getArgPInt("--unet-recv-timeout",it.getProp("recvTimeout"), 5000);
// UniXML_iterator it(cnode); int lostTimeout = conf->getArgPInt("--unet-lost-timeout",it.getProp("lostTimeout"), recvTimeout);
s_host = conf->getArgParam("--UNet-host",it.getProp("host")); int recvpause = conf->getArgPInt("--unet-recvpause",it.getProp("recvpause"), 10);
if( s_host.empty() ) int sendpause = conf->getArgPInt("--unet-sendpause",it.getProp("sendpause"), 150);
throw UniSetTypes::SystemError(myname+"(UNetExchange): Unknown host. Use --UNet-host" ); int updatepause = conf->getArgPInt("--unet-updatepause",it.getProp("updatepause"), 100);
steptime = conf->getArgPInt("--unet-steptime",it.getProp("steptime"), 1000);
int maxDiff = conf->getArgPInt("--unet-maxdifferense",it.getProp("maxDifferense"), 1000);
int maxProcessingCount = conf->getArgPInt("--unet-maxprocessingcount",it.getProp("maxProcessingCount"), 100);
host = s_host.c_str(); no_sender = conf->getArgInt("--unet-nosender",it.getProp("nosender"));
buildReceiverList(); xmlNode* nodes = conf->getXMLNodesSection();
if( !nodes )
throw UniSetTypes::SystemError("(UNetExchange): Not found <nodes>");
// port = conf->getArgInt("--UNet-port",it.getProp("port")); UniXML_iterator n_it(nodes);
if( port <= 0 || port == DefaultObjectId ) if( !n_it.goChildren() )
throw UniSetTypes::SystemError(myname+"(UNetExchange): Unknown port address" ); throw UniSetTypes::SystemError("(UNetExchange): Items not found for <nodes>");
if( dlog.debugging(Debug::INFO) ) for( ; n_it.getCurrent(); n_it.goNext() )
dlog[Debug::INFO] << "(UNetExchange): UNet set to " << s_host << ":" << port << endl;
try
{ {
UNet = new ost::UNetBroadcast(host,port); string h(n_it.getProp("ip"));
} if( !n_it.getProp("unet_ip").empty() )
catch( ost::SockException& e ) h = n_it.getProp("unet_ip");
{
ostringstream s;
s << e.getString() << ": " << e.getSystemErrorString() << endl;
throw SystemError(s.str());
}
thr = new ThreadCreator<UNetExchange>(this, &UNetExchange::poll); int p = n_it.getIntProp("id");
if( !n_it.getProp("unet_port").empty() )
p = n_it.getIntProp("unet_port");
recvTimeout = conf->getArgPInt("--UNet-recv-timeout",it.getProp("recvTimeout"), 5000); string n(n_it.getProp("name"));
sendTimeout = conf->getArgPInt("--UNet-send-timeout",it.getProp("sendTimeout"), 5000); if( n == conf->getLocalNodeName() )
polltime = conf->getArgPInt("--UNet-polltime",it.getProp("polltime"), 100); {
dlog[Debug::INFO] << myname << "(init): init sender.. my node " << n_it.getProp("name") << endl;
sender = new UNetSender(h,p,shm,s_field,s_fvalue,ic);
sender->setSendPause(sendpause);
continue;
}
// ------------------------------- if( !n_it.getProp("unet_ignore").empty() )
if( shm->isLocalwork() ) {
{ dlog[Debug::INFO] << myname << "(init): unet_ignore.. for " << n_it.getProp("name") << endl;
readConfiguration(); continue;
dlist.resize(maxItem); }
dlog[Debug::INFO] << myname << "(init): dlist size = " << dlist.size() << endl;
} dlog[Debug::INFO] << myname << "(init): add UNetReceiver for " << h << ":" << p << endl;
else
ic->addReadItem( sigc::mem_fun(this,&UNetExchange::readItem) );
if( checkExistUNetHost(h,p) )
{
dlog[Debug::INFO] << myname << "(init): " << h << ":" << p << " already added! Ignore.." << endl;
continue;
}
UNetReceiver* r = new UNetReceiver(h,p,shm);
r->setReceiveTimeout(recvTimeout);
r->setLostTimeout(lostTimeout);
r->setReceivePause(recvpause);
r->setUpdatePause(updatepause);
r->setMaxDifferens(maxDiff);
r->setMaxProcessingCount(maxProcessingCount);
recvlist.push_back(r);
}
// -------------------------------
// ********** HEARTBEAT ************* // ********** HEARTBEAT *************
string heart = conf->getArgParam("--UNet-heartbeat-id",it.getProp("heartbeat_id")); string heart = conf->getArgParam("--unet-heartbeat-id",it.getProp("heartbeat_id"));
if( !heart.empty() ) if( !heart.empty() )
{ {
sidHeartBeat = conf->getSensorID(heart); sidHeartBeat = conf->getSensorID(heart);
...@@ -97,7 +115,7 @@ maxItem(0) ...@@ -97,7 +115,7 @@ maxItem(0)
else else
ptHeartBeat.setTiming(UniSetTimer::WaitUpTime); ptHeartBeat.setTiming(UniSetTimer::WaitUpTime);
maxHeartBeat = conf->getArgPInt("--UNet-heartbeat-max", it.getProp("heartbeat_max"), 10); maxHeartBeat = conf->getArgPInt("--unet-heartbeat-max", it.getProp("heartbeat_max"), 10);
test_id = sidHeartBeat; test_id = sidHeartBeat;
} }
else else
...@@ -116,25 +134,48 @@ maxItem(0) ...@@ -116,25 +134,48 @@ maxItem(0)
activateTimeout = conf->getArgPInt("--activate-timeout", 20000); activateTimeout = conf->getArgPInt("--activate-timeout", 20000);
timeout_t msec = conf->getArgPInt("--UNet-timeout",it.getProp("timeout"), 3000); timeout_t msec = conf->getArgPInt("--unet-timeout",it.getProp("timeout"), 3000);
dlog[Debug::INFO] << myname << "(init): UNet-timeout=" << msec << " msec" << endl; dlog[Debug::INFO] << myname << "(init): udp-timeout=" << msec << " msec" << endl;
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
UNetExchange::~UNetExchange() UNetExchange::~UNetExchange()
{ {
for( ReceiverList::iterator it=rlist.begin(); it!=rlist.end(); it++ ) for( ReceiverList::iterator it=recvlist.begin(); it!=recvlist.end(); ++it )
delete (*it); delete (*it);
delete UNet; delete sender;
delete shm; delete shm;
delete thr; }
// -----------------------------------------------------------------------------
bool UNetExchange::checkExistUNetHost( const std::string addr, ost::tpport_t port )
{
ost::IPV4Address a1(addr.c_str());
for( ReceiverList::iterator it=recvlist.begin(); it!=recvlist.end(); ++it )
{
if( (*it)->getAddress() == a1.getAddress() && (*it)->getPort() == port )
return true;
}
return false;
}
// -----------------------------------------------------------------------------
void UNetExchange::startReceivers()
{
for( ReceiverList::iterator it=recvlist.begin(); it!=recvlist.end(); ++it )
(*it)->start();
}
// -----------------------------------------------------------------------------
void UNetExchange::initSender( const std::string s_host, const ost::tpport_t port, UniXML_iterator& it )
{
if( no_sender )
return;
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
void UNetExchange::waitSMReady() void UNetExchange::waitSMReady()
{ {
// waiting for SM is ready... // waiting for SM is ready...
int ready_timeout = conf->getArgInt("--UNet-sm-ready-timeout","15000"); int ready_timeout = conf->getArgInt("--unet-sm-ready-timeout","15000");
if( ready_timeout == 0 ) if( ready_timeout == 0 )
ready_timeout = 15000; ready_timeout = 15000;
else if( ready_timeout < 0 ) else if( ready_timeout < 0 )
...@@ -149,6 +190,15 @@ void UNetExchange::waitSMReady() ...@@ -149,6 +190,15 @@ void UNetExchange::waitSMReady()
} }
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
void UNetExchange::timerInfo( TimerMessage *tm )
{
if( !activated )
return;
if( tm->id == tmStep )
step();
}
// -----------------------------------------------------------------------------
void UNetExchange::step() void UNetExchange::step()
{ {
if( !activated ) if( !activated )
...@@ -163,100 +213,12 @@ void UNetExchange::step() ...@@ -163,100 +213,12 @@ void UNetExchange::step()
} }
catch(Exception& ex) catch(Exception& ex)
{ {
dlog[Debug::CRIT] << myname dlog[Debug::CRIT] << myname << "(step): (hb) " << ex << std::endl;
<< "(step): (hb) " << ex << std::endl;
} }
} }
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
void UNetExchange::poll()
{
dlist.resize(maxItem);
dlog[Debug::INFO] << myname << "(init): dlist size = " << dlist.size() << endl;
for( ReceiverList::iterator it=rlist.begin(); it!=rlist.end(); it++ )
{
(*it)->setReceiveTimeout(recvTimeout);
if( dlog.debugging(Debug::INFO) )
dlog[Debug::INFO] << myname << "(poll): start exchange for " << (*it)->getName() << endl;
(*it)->start();
}
ost::IPV4Broadcast h = s_host.c_str();
try
{
UNet->setPeer(h,port);
}
catch( ost::SockException& e )
{
ostringstream s;
s << e.getString() << ": " << e.getSystemErrorString();
dlog[Debug::CRIT] << myname << "(poll): " << s.str() << endl;
throw SystemError(s.str());
}
while( activated )
{
try
{
send();
}
catch( ost::SockException& e )
{
cerr << e.getString() << ": " << e.getSystemErrorString() << endl;
}
catch( UniSetTypes::Exception& ex)
{
cerr << myname << "(step): " << ex << std::endl;
}
catch(...)
{
cerr << myname << "(step): catch ..." << std::endl;
}
msleep(polltime);
}
cerr << "************* execute FINISH **********" << endl;
}
// -----------------------------------------------------------------------------
void UNetExchange::send()
{
cout << myname << ": send..." << endl;
/*
UniSetUNet::UNetHeader h;
h.nodeID = conf->getLocalNode();
h.procID = getId();
h.dcount = mypack.size();
if( UNet->isPending(ost::Socket::pendingOutput) )
{
ssize_t ret = UNet->send((char*)(&h),sizeof(h));
if( ret<(ssize_t)sizeof(h) )
{
cerr << myname << "(send data header): ret=" << ret << " sizeof=" << sizeof(h) << endl;
return;
}
*/
#warning use mutex for list!!!
UniSetUNet::UNetMessage::UNetDataList::iterator it = mypack.dlist.begin();
for( ; it!=mypack.dlist.end(); ++it )
{
// while( !UNet->isPending(ost::Socket::pendingOutput) )
// msleep(30);
cout << myname << "(send): " << (*it) << endl;
ssize_t ret = UNet->send((char*)(&(*it)),sizeof(UniSetUNet::UNetData));
if( ret<(ssize_t)sizeof(UniSetUNet::UNetData) )
{
cerr << myname << "(send data): ret=" << ret << " sizeof=" << sizeof(UniSetUNet::UNetData) << endl;
break;
}
}
// }
}
// -----------------------------------------------------------------------------
void UNetExchange::processingMessage(UniSetTypes::VoidMessage *msg) void UNetExchange::processingMessage(UniSetTypes::VoidMessage *msg)
{ {
try try
...@@ -277,6 +239,13 @@ void UNetExchange::processingMessage(UniSetTypes::VoidMessage *msg) ...@@ -277,6 +239,13 @@ void UNetExchange::processingMessage(UniSetTypes::VoidMessage *msg)
} }
break; break;
case Message::Timer:
{
TimerMessage tm(msg);
timerInfo(&tm);
}
break;
default: default:
break; break;
} }
...@@ -297,7 +266,7 @@ void UNetExchange::processingMessage(UniSetTypes::VoidMessage *msg) ...@@ -297,7 +266,7 @@ void UNetExchange::processingMessage(UniSetTypes::VoidMessage *msg)
} }
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
void UNetExchange::sysCommand(UniSetTypes::SystemMessage *sm) void UNetExchange::sysCommand( UniSetTypes::SystemMessage *sm )
{ {
switch( sm->command ) switch( sm->command )
{ {
...@@ -324,7 +293,10 @@ void UNetExchange::sysCommand(UniSetTypes::SystemMessage *sm) ...@@ -324,7 +293,10 @@ void UNetExchange::sysCommand(UniSetTypes::SystemMessage *sm)
UniSetTypes::uniset_mutex_lock l(mutex_start, 10000); UniSetTypes::uniset_mutex_lock l(mutex_start, 10000);
askSensors(UniversalIO::UIONotify); askSensors(UniversalIO::UIONotify);
} }
thr->start(); askTimer(tmStep,steptime);
startReceivers();
if( sender )
sender->start();
} }
case SystemMessage::FoldUp: case SystemMessage::FoldUp:
...@@ -385,36 +357,12 @@ void UNetExchange::askSensors( UniversalIO::UIOCommand cmd ) ...@@ -385,36 +357,12 @@ void UNetExchange::askSensors( UniversalIO::UIOCommand cmd )
kill(SIGTERM,getpid()); // прерываем (перезапускаем) процесс... kill(SIGTERM,getpid()); // прерываем (перезапускаем) процесс...
throw SystemError(err.str()); throw SystemError(err.str());
} }
DMap::iterator it=dlist.begin();
for( ; it!=dlist.end(); ++it )
{
try
{
shm->askSensor(it->si.id,cmd);
}
catch( UniSetTypes::Exception& ex )
{
dlog[Debug::WARN] << myname << "(askSensors): " << ex << std::endl;
}
catch(...){}
}
} }
// ------------------------------------------------------------------------------------------ // ------------------------------------------------------------------------------------------
void UNetExchange::sensorInfo( UniSetTypes::SensorMessage* sm ) void UNetExchange::sensorInfo( UniSetTypes::SensorMessage* sm )
{ {
DMap::iterator it=dlist.begin(); if( sender )
for( ; it!=dlist.end(); ++it ) sender->update(sm->id,sm->value);
{
if( it->si.id == sm->id )
{
uniset_spin_lock lock(it->val_lock);
it->val = sm->value;
if( it->pack_it != mypack.dlist.end() )
it->pack_it->val = sm->value;
}
break;
}
} }
// ------------------------------------------------------------------------------------------ // ------------------------------------------------------------------------------------------
bool UNetExchange::activateObject() bool UNetExchange::activateObject()
...@@ -437,201 +385,46 @@ void UNetExchange::sigterm( int signo ) ...@@ -437,201 +385,46 @@ void UNetExchange::sigterm( int signo )
{ {
cerr << myname << ": ********* SIGTERM(" << signo <<") ********" << endl; cerr << myname << ": ********* SIGTERM(" << signo <<") ********" << endl;
activated = false; activated = false;
UNet->disconnect();
for( ReceiverList::iterator it=rlist.begin(); it!=rlist.end(); it++ )
(*it)->stop();
UniSetObject_LT::sigterm(signo); UniSetObject_LT::sigterm(signo);
} }
// ------------------------------------------------------------------------------------------ // ------------------------------------------------------------------------------------------
void UNetExchange::readConfiguration()
{
#warning Сделать сортировку по диапазонам адресов!!!
// чтобы запрашивать одним запросом, сразу несколько входов...
// readconf_ok = false;
xmlNode* root = conf->getXMLSensorsSection();
if(!root)
{
ostringstream err;
err << myname << "(readConfiguration): не нашли корневого раздела <sensors>";
throw SystemError(err.str());
}
UniXML_iterator it(root);
if( !it.goChildren() )
{
std::cerr << myname << "(readConfiguration): раздел <sensors> не содержит секций ?!!\n";
return;
}
for( ;it.getCurrent(); it.goNext() )
{
if( check_item(it) )
initItem(it);
}
// readconf_ok = true;
}
// ------------------------------------------------------------------------------------------
bool UNetExchange::check_item( UniXML_iterator& it )
{
if( s_field.empty() )
return true;
// просто проверка на не пустой field
if( s_fvalue.empty() && it.getProp(s_field).empty() )
return false;
// просто проверка что field = value
if( !s_fvalue.empty() && it.getProp(s_field)!=s_fvalue )
return false;
return true;
}
// ------------------------------------------------------------------------------------------
bool UNetExchange::readItem( UniXML& xml, UniXML_iterator& it, xmlNode* sec )
{
if( check_item(it) )
initItem(it);
return true;
}
// ------------------------------------------------------------------------------------------
bool UNetExchange::initItem( UniXML_iterator& it )
{
string sname( it.getProp("name") );
string tid = it.getProp("id");
ObjectId sid;
if( !tid.empty() )
{
sid = UniSetTypes::uni_atoi(tid);
if( sid <= 0 )
sid = DefaultObjectId;
}
else
sid = conf->getSensorID(sname);
if( sid == DefaultObjectId )
{
if( dlog )
dlog[Debug::CRIT] << myname << "(readItem): ID not found for "
<< sname << endl;
return false;
}
UItem p;
p.si.id = sid;
p.si.node = conf->getLocalNode();
mypack.addData(sid,0);
p.pack_it = (mypack.dlist.end()--);
if( maxItem >= dlist.size() )
dlist.resize(maxItem+10);
dlist[maxItem] = p;
maxItem++;
if( dlog.debugging(Debug::INFO) )
dlog[Debug::INFO] << myname << "(initItem): add " << p << endl;
return true;
}
// ------------------------------------------------------------------------------------------
void UNetExchange::initIterators() void UNetExchange::initIterators()
{ {
DMap::iterator it=dlist.begin();
for( ; it!=dlist.end(); it++ )
{
shm->initDIterator(it->dit);
shm->initAIterator(it->ait);
}
shm->initAIterator(aitHeartBeat); shm->initAIterator(aitHeartBeat);
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
void UNetExchange::help_print( int argc, char* argv[] ) void UNetExchange::help_print( int argc, char* argv[] )
{ {
cout << "--UNet-polltime msec - Пауза между опросаом карт. По умолчанию 200 мсек." << endl; cout << "--unet-recvpause msec - Пауза между получением пакетов. По умолчанию 10 мсек." << endl;
cout << "--UNet-heartbeat-id - Данный процесс связан с указанным аналоговым heartbeat-дачиком." << endl; cout << "--unet-updatepause msec - Пауза между обновлением данных в SM. По умолчанию 100 мсек." << endl;
cout << "--UNet-heartbeat-max - Максимальное значение heartbeat-счётчика для данного процесса. По умолчанию 10." << endl; cout << "--unet-heartbeat-id - Данный процесс связан с указанным аналоговым heartbeat-дачиком." << endl;
cout << "--UNet-ready-timeout - Время ожидания готовности SM к работе, мсек. (-1 - ждать 'вечно')" << endl; cout << "--unet-heartbeat-max - Максимальное значение heartbeat-счётчика для данного процесса. По умолчанию 10." << endl;
cout << "--UNet-initPause - Задержка перед инициализацией (время на активизация процесса)" << endl; cout << "--unet-ready-timeout - Время ожидания готовности SM к работе, мсек. (-1 - ждать 'вечно')" << endl;
cout << "--UNet-notRespondSensor - датчик связи для данного процесса " << endl; cout << "--unet-initPause - Задержка перед инициализацией (время на активизация процесса)" << endl;
cout << "--UNet-sm-ready-timeout - время на ожидание старта SM" << endl; cout << "--unet-notRespondSensor - датчик связи для данного процесса " << endl;
cout << " Настройки протокола UNet: " << endl; cout << "--unet-sm-ready-timeout - время на ожидание старта SM" << endl;
cout << "--UNet-host [ip|hostname] - Адрес сервера" << endl; cout << " Настройки протокола RS: " << endl;
cout << "--UNet-send-timeout - Таймаут на посылку ответа." << endl;
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
UNetExchange* UNetExchange::init_UNetexchange( int argc, char* argv[], UniSetTypes::ObjectId icID, SharedMemory* ic ) UNetExchange* UNetExchange::init_unetexchange( int argc, char* argv[], UniSetTypes::ObjectId icID, SharedMemory* ic )
{ {
string name = conf->getArgParam("--UNet-name","UNetExchange1"); string name = conf->getArgParam("--unet-name","UNetExchange1");
if( name.empty() ) if( name.empty() )
{ {
cerr << "(UNetexchange): Не задан name'" << endl; cerr << "(unetexchange): Не задан name'" << endl;
return 0; return 0;
} }
ObjectId ID = conf->getObjectID(name); ObjectId ID = conf->getObjectID(name);
if( ID == UniSetTypes::DefaultObjectId ) if( ID == UniSetTypes::DefaultObjectId )
{ {
cerr << "(UNetexchange): идентификатор '" << name cerr << "(unetexchange): идентификатор '" << name
<< "' не найден в конф. файле!" << "' не найден в конф. файле!"
<< " в секции " << conf->getObjectsSection() << endl; << " в секции " << conf->getObjectsSection() << endl;
return 0; return 0;
} }
dlog[Debug::INFO] << "(rsexchange): name = " << name << "(" << ID << ")" << endl; dlog[Debug::INFO] << "(unetexchange): name = " << name << "(" << ID << ")" << endl;
return new UNetExchange(ID,icID,ic); return new UNetExchange(ID,icID,ic);
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
std::ostream& operator<<( std::ostream& os, UNetExchange::UItem& p )
{
return os << " sid=" << p.si.id;
}
// -----------------------------------------------------------------------------
void UNetExchange::buildReceiverList()
{
xmlNode* n = conf->getXMLNodesSection();
if( !n )
{
dlog[Debug::WARN] << myname << "(buildReceiverList): <nodes> not found! ignore..." << endl;
return;
}
UniXML_iterator it(n);
if( !it.goChildren() )
{
dlog[Debug::WARN] << myname << "(buildReceiverList): <nodes> is empty?! ignore..." << endl;
return;
}
for( ; it.getCurrent(); it.goNext() )
{
ObjectId n_id = conf->getNodeID( it.getProp("name") );
if( n_id == conf->getLocalNode() )
{
port = it.getIntProp("UNet_port");
if( port<=0 )
port = n_id;
dlog[Debug::INFO] << myname << "(buildReceiverList): init myport port=" << port << endl;
continue;
}
int p = it.getIntProp("UNet_port");
if( p <=0 )
p = n_id;
if( p == DefaultObjectId )
{
dlog[Debug::WARN] << myname << "(buildReceiverList): node=" << it.getProp("name") << " unknown port. ignore..." << endl;
continue;
}
UNetNReceiver* r = new UNetNReceiver(p,host,shm->getSMID(),shm->SM());
rlist.push_back(r);
}
}
// ------------------------------------------------------------------------------------------
...@@ -3,7 +3,7 @@ ...@@ -3,7 +3,7 @@
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
#include <ostream> #include <ostream>
#include <string> #include <string>
#include <vector> #include <queue>
#include <cc++/socket.h> #include <cc++/socket.h>
#include "UniSetObject_LT.h" #include "UniSetObject_LT.h"
#include "Trigger.h" #include "Trigger.h"
...@@ -11,8 +11,8 @@ ...@@ -11,8 +11,8 @@
#include "SMInterface.h" #include "SMInterface.h"
#include "SharedMemory.h" #include "SharedMemory.h"
#include "ThreadCreator.h" #include "ThreadCreator.h"
#include "UNetPacket.h" #include "UNetReceiver.h"
#include "UNetNReceiver.h" #include "UNetSender.h"
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
class UNetExchange: class UNetExchange:
public UniSetObject_LT public UniSetObject_LT
...@@ -22,27 +22,13 @@ class UNetExchange: ...@@ -22,27 +22,13 @@ class UNetExchange:
virtual ~UNetExchange(); virtual ~UNetExchange();
/*! глобальная функция для инициализации объекта */ /*! глобальная функция для инициализации объекта */
static UNetExchange* init_UNetexchange( int argc, char* argv[], static UNetExchange* init_unetexchange( int argc, char* argv[],
UniSetTypes::ObjectId shmID, SharedMemory* ic=0 ); UniSetTypes::ObjectId shmID, SharedMemory* ic=0 );
/*! глобальная функция для вывода help-а */ /*! глобальная функция для вывода help-а */
static void help_print( int argc, char* argv[] ); static void help_print( int argc, char* argv[] );
struct UItem bool checkExistUNetHost( const std::string host, ost::tpport_t port );
{
UItem():
val(0)
{}
IOController_i::SensorInfo si;
IOController::AIOStateList::iterator ait;
IOController::DIOStateList::iterator dit;
UniSetTypes::uniset_spin_mutex val_lock;
UniSetUNet::UNetMessage::UNetDataList::iterator pack_it;
long val;
friend std::ostream& operator<<( std::ostream& os, UItem& p );
};
protected: protected:
...@@ -51,16 +37,12 @@ class UNetExchange: ...@@ -51,16 +37,12 @@ class UNetExchange:
std::string s_fvalue; std::string s_fvalue;
SMInterface* shm; SMInterface* shm;
void poll();
void recv();
void send();
void step(); void step();
virtual void processingMessage( UniSetTypes::VoidMessage *msg ); virtual void processingMessage( UniSetTypes::VoidMessage *msg );
void sysCommand( UniSetTypes::SystemMessage *msg ); void sysCommand( UniSetTypes::SystemMessage *msg );
void sensorInfo( UniSetTypes::SensorMessage*sm ); void sensorInfo( UniSetTypes::SensorMessage*sm );
void timerInfo( UniSetTypes::TimerMessage *tm );
void askSensors( UniversalIO::UIOCommand cmd ); void askSensors( UniversalIO::UIOCommand cmd );
void waitSMReady(); void waitSMReady();
...@@ -70,12 +52,13 @@ class UNetExchange: ...@@ -70,12 +52,13 @@ class UNetExchange:
virtual void sigterm( int signo ); virtual void sigterm( int signo );
void initIterators(); void initIterators();
bool initItem( UniXML_iterator& it ); void startReceivers();
bool readItem( UniXML& xml, UniXML_iterator& it, xmlNode* sec ); void initSender( const std::string host, const ost::tpport_t port, UniXML_iterator& it );
void readConfiguration(); enum Timer
bool check_item( UniXML_iterator& it ); {
void buildReceiverList(); tmStep
};
private: private:
UNetExchange(); UNetExchange();
...@@ -88,31 +71,16 @@ class UNetExchange: ...@@ -88,31 +71,16 @@ class UNetExchange:
IOController::AIOStateList::iterator aitHeartBeat; IOController::AIOStateList::iterator aitHeartBeat;
UniSetTypes::ObjectId test_id; UniSetTypes::ObjectId test_id;
int polltime; /*!< переодичность обновления данных, [мсек] */ int steptime; /*!< периодичность вызова step, [мсек] */
ost::UNetBroadcast* UNet;
ost::IPV4Host host;
ost::tpport_t port;
std::string s_host;
UniSetTypes::uniset_mutex pollMutex;
Trigger trTimeout;
int recvTimeout;
int sendTimeout;
bool activated; bool activated;
int activateTimeout; int activateTimeout;
UniSetUNet::UNetMessage mypack;
typedef std::vector<UItem> DMap;
DMap dlist;
int maxItem;
typedef std::list<UNetNReceiver*> ReceiverList; typedef std::list<UNetReceiver*> ReceiverList;
ReceiverList rlist; ReceiverList recvlist;
ThreadCreator<UNetExchange>* thr; bool no_sender; /*!< флаг отключения посылки сообщений */
UNetSender* sender;
}; };
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
#endif // UNetExchange_H_ #endif // UNetExchange_H_
......
#include "UNetPacket.h"
// -----------------------------------------------------------------------------
using namespace std;
using namespace UniSetUNet;
// -----------------------------------------------------------------------------
std::ostream& UniSetUNet::operator<<( std::ostream& os, UniSetUNet::UNetHeader& p )
{
return os << "nodeID=" << p.nodeID
<< " procID=" << p.procID
<< " dcount=" << p.dcount;
}
// -----------------------------------------------------------------------------
std::ostream& UniSetUNet::operator<<( std::ostream& os, UniSetUNet::UNetData& p )
{
return os << "id=" << p.id << " val=" << p.val;
}
// -----------------------------------------------------------------------------
std::ostream& UniSetUNet::operator<<( std::ostream& os, UniSetUNet::UNetMessage& p )
{
return os;
}
// -----------------------------------------------------------------------------
UNetMessage::UNetMessage()
{
}
// -----------------------------------------------------------------------------
void UNetMessage::addData( const UniSetUNet::UNetData& dat )
{
dlist.push_back(dat);
}
// -----------------------------------------------------------------------------
void UNetMessage::addData( long id, long val)
{
UNetData d(id,val);
addData(d);
}
// -----------------------------------------------------------------------------
// $Id: UNetPacket.h,v 1.1 2009/02/10 20:38:27 vpashka Exp $
// -----------------------------------------------------------------------------
#ifndef UNetPacket_H_
#define UNetPacket_H_
// -----------------------------------------------------------------------------
#include <list>
#include <ostream>
#include "UniSetTypes.h"
// -----------------------------------------------------------------------------
namespace UniSetUNet
{
struct UNetHeader
{
long nodeID;
long procID;
long dcount;
friend std::ostream& operator<<( std::ostream& os, UNetHeader& p );
}__attribute__((packed));
struct UNetData
{
UNetData():id(UniSetTypes::DefaultObjectId),val(0){}
UNetData(long id, long val):id(id),val(val){}
long id;
long val;
friend std::ostream& operator<<( std::ostream& os, UNetData& p );
}__attribute__((packed));
struct UNetMessage:
public UNetHeader
{
UNetMessage();
void addData( const UNetData& dat );
void addData( long id, long val );
inline int size(){ return dlist.size(); }
typedef std::list<UNetData> UNetDataList;
UNetDataList dlist;
friend std::ostream& operator<<( std::ostream& os, UNetMessage& p );
};
}
// -----------------------------------------------------------------------------
#endif // UNetPacket_H_
// -----------------------------------------------------------------------------
...@@ -7,184 +7,242 @@ using namespace std; ...@@ -7,184 +7,242 @@ using namespace std;
using namespace UniSetTypes; using namespace UniSetTypes;
using namespace UniSetExtensions; using namespace UniSetExtensions;
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
UNetReceiver::UNetReceiver( UniSetTypes::ObjectId objId, UniSetTypes::ObjectId shmId, SharedMemory* ic ): bool UNetReceiver::PacketCompare::operator()(const UniSetUDP::UDPMessage& lhs,
UniSetObject_LT(objId), const UniSetUDP::UDPMessage& rhs) const
shm(0),
initPause(0),
udp(0),
activated(false)
{ {
if( objId == DefaultObjectId ) // if( lhs.msg.header.num == rhs.msg.header.num )
throw UniSetTypes::SystemError("(UNetReceiver): objId=-1?!! Use --udp-name" ); // return (lhs.msg < rhs.msg);
// xmlNode* cnode = conf->getNode(myname);
cnode = conf->getNode(myname);
if( cnode == NULL )
throw UniSetTypes::SystemError("(UNetReceiver): Not find conf-node for " + myname );
shm = new SMInterface(shmId,&ui,objId,ic);
UniXML_iterator it(cnode);
// определяем фильтр
s_field = conf->getArgParam("--udp-filter-field");
s_fvalue = conf->getArgParam("--udp-filter-value");
dlog[Debug::INFO] << myname << "(init): read fileter-field='" << s_field
<< "' filter-value='" << s_fvalue << "'" << endl;
// ---------- init RS ----------
// UniXML_iterator it(cnode);
string s_host = conf->getArgParam("--udp-host",it.getProp("host"));
if( s_host.empty() )
throw UniSetTypes::SystemError(myname+"(UNetReceiver): Unknown host. Use --udp-host" );
port = conf->getArgInt("--udp-port",it.getProp("port")); return lhs.msg.header.num > rhs.msg.header.num;
if( port <= 0 ) }
throw UniSetTypes::SystemError(myname+"(UNetReceiver): Unknown port address. Use --udp-port" ); // ------------------------------------------------------------------------------------------
UNetReceiver::UNetReceiver( const std::string s_host, const ost::tpport_t port, SMInterface* smi ):
shm(smi),
recvpause(10),
updatepause(100),
udp(0),
recvTimeout(5000),
lostTimeout(5000),
lostPackets(0),
activated(false),
r_thr(0),
u_thr(0),
pnum(0),
maxDifferens(1000),
waitClean(false),
rnum(0),
maxProcessingCount(100),
icache(200),
cache_init_ok(false)
{
{
ostringstream s;
s << "(" << s_host << ":" << port << ")";
myname = s.str();
}
if( dlog.debugging(Debug::INFO) )
dlog[Debug::INFO] << "(UNetReceiver): UNet set to " << s_host << ":" << port << endl;
host = s_host.c_str();
try try
{ {
udp = new ost::UNetDuplex(host,port); // ost::IPV4Cidr ci(s_host.c_str());
// addr = ci.getBroadcast();
// cerr << "****************** addr: " << addr << endl;
addr = s_host.c_str();
udp = new ost::UDPDuplex(addr,port);
} }
catch( ost::SockException& e ) catch( ost::SockException& e )
{ {
ostringstream s; ostringstream s;
s << e.getString() << ": " << e.getSystemErrorString() << endl; s << e.getString() << ": " << e.getSystemErrorString();
throw SystemError(s.str()); dlog[Debug::CRIT] << myname << "(init): " << s.str() << std::endl;
}
thr = new ThreadCreator<UNetReceiver>(this, &UNetReceiver::poll);
recvTimeout = conf->getArgPInt("--udp-recv-timeout",it.getProp("recvTimeout"), 5000);
polltime = conf->getArgPInt("--udp-polltime",it.getProp("polltime"), 100);
// -------------------------------
// ********** HEARTBEAT *************
string heart = conf->getArgParam("--udp-heartbeat-id",it.getProp("heartbeat_id"));
if( !heart.empty() )
{
sidHeartBeat = conf->getSensorID(heart);
if( sidHeartBeat == DefaultObjectId )
{
ostringstream err;
err << myname << ": не найден идентификатор для датчика 'HeartBeat' " << heart;
dlog[Debug::CRIT] << myname << "(init): " << err.str() << endl;
throw SystemError(err.str());
}
int heartbeatTime = getHeartBeatTime(); throw SystemError(s.str());
if( heartbeatTime )
ptHeartBeat.setTiming(heartbeatTime);
else
ptHeartBeat.setTiming(UniSetTimer::WaitUpTime);
maxHeartBeat = conf->getArgPInt("--udp-heartbeat-max", it.getProp("heartbeat_max"), 10);
test_id = sidHeartBeat;
}
else
{
test_id = conf->getSensorID("TestMode_S");
if( test_id == DefaultObjectId )
{
ostringstream err;
err << myname << "(init): test_id unknown. 'TestMode_S' not found...";
dlog[Debug::CRIT] << myname << "(init): " << err.str() << endl;
throw SystemError(err.str());
}
} }
dlog[Debug::INFO] << myname << "(init): test_id=" << test_id << endl; r_thr = new ThreadCreator<UNetReceiver>(this, &UNetReceiver::receive);
u_thr = new ThreadCreator<UNetReceiver>(this, &UNetReceiver::update);
activateTimeout = conf->getArgPInt("--activate-timeout", 20000);
timeout_t msec = conf->getArgPInt("--udp-timeout",it.getProp("timeout"), 3000);
dlog[Debug::INFO] << myname << "(init): udp-timeout=" << msec << " msec" << endl; ptRecvTimeout.setTiming(recvTimeout);
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
UNetReceiver::~UNetReceiver() UNetReceiver::~UNetReceiver()
{ {
delete r_thr;
delete u_thr;
delete udp; delete udp;
delete shm;
delete thr;
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
void UNetReceiver::waitSMReady() void UNetReceiver::setReceiveTimeout( timeout_t msec )
{ {
// waiting for SM is ready... recvTimeout = msec;
int ready_timeout = conf->getArgInt("--udp-sm-ready-timeout","15000"); ptRecvTimeout.setTiming(msec);
if( ready_timeout == 0 )
ready_timeout = 15000;
else if( ready_timeout < 0 )
ready_timeout = UniSetTimer::WaitUpTime;
if( !shm->waitSMready(ready_timeout,50) )
{
ostringstream err;
err << myname << "(waitSMReady): Не дождались готовности SharedMemory к работе в течение " << ready_timeout << " мсек";
dlog[Debug::CRIT] << err.str() << endl;
throw SystemError(err.str());
}
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
/* void UNetReceiver::setLostTimeout( timeout_t msec )
void UNetReceiver::timerInfo( TimerMessage *tm )
{ {
if( tm->id == tmExchange ) lostTimeout = msec;
step(); ptLostTimeout.setTiming(msec);
} }
*/
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
void UNetReceiver::step() void UNetReceiver::setReceivePause( timeout_t msec )
{
recvpause = msec;
}
// -----------------------------------------------------------------------------
void UNetReceiver::setUpdatePause( timeout_t msec )
{
updatepause = msec;
}
// -----------------------------------------------------------------------------
void UNetReceiver::setMaxProcessingCount( int set )
{
maxProcessingCount = set;
}
// -----------------------------------------------------------------------------
void UNetReceiver::setMaxDifferens( unsigned long set )
{
maxDifferens = set;
}
// -----------------------------------------------------------------------------
void UNetReceiver::start()
{ {
// {
// uniset_mutex_lock l(pollMutex,2000);
// poll();
// }
if( !activated ) if( !activated )
return; {
activated = true;
if( sidHeartBeat!=DefaultObjectId && ptHeartBeat.checkTime() ) u_thr->start();
r_thr->start();
}
}
// -----------------------------------------------------------------------------
void UNetReceiver::update()
{
cerr << "******************* udpate start" << endl;
while(activated)
{ {
try try
{ {
shm->localSaveValue(aitHeartBeat,sidHeartBeat,maxHeartBeat,getId()); real_update();
ptHeartBeat.reset();
} }
catch(Exception& ex) catch( UniSetTypes::Exception& ex)
{ {
dlog[Debug::CRIT] << myname dlog[Debug::CRIT] << myname << "(update): " << ex << std::endl;
<< "(step): (hb) " << ex << std::endl;
} }
catch(...)
{
dlog[Debug::CRIT] << myname << "(update): catch ..." << std::endl;
}
msleep(updatepause);
} }
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
void UNetReceiver::poll() void UNetReceiver::real_update()
{ {
try UniSetUDP::UDPMessage p;
// обрабатываем пока, очередь либо не опустеет
// либо обнаружится "дырка" в последовательности
// но при этом обрабатываем не больше maxProcessingCount
// за один раз..
int k = maxProcessingCount;
while( k>0 )
{ {
// udp->connect(host,port); { // lock qpack
// udp->UNetSocket::setPeer(host,port); uniset_mutex_lock l(packMutex);
} if( qpack.empty() )
catch( UniSetTypes::Exception& ex) return;
{
cerr << myname << "(step): " << ex << std::endl; p = qpack.top();
// reise(SIGTERM); unsigned long sub = labs(p.msg.header.num - pnum);
return; if( pnum > 0 )
{
// если sub > maxDifferens
// значит это просто "разрыв"
// и нам ждать lostTimeout не надо
// сразу начинаем обрабатывать новые пакеты
// а если > 1 && < maxDifferens
// значит это временная "дырка"
// и надо подождать lostTimeout
// чтобы констатировать потерю пакета..
if( sub > 1 && sub < maxDifferens )
{
if( !ptLostTimeout.checkTime() )
return;
lostPackets++;
}
else if( p.msg.header.num == pnum )
{
/* а что делать если идут повторные пакеты ?!
* для надёжности лучше обрабатывать..
* для "оптимизации".. лучше игнорировать
*/
qpack.pop(); // пока выбрали вариант "оптимизации"
continue;
}
}
ptLostTimeout.reset();
// удаляем из очереди, только если
// всё в порядке с последовательностью..
qpack.pop();
pnum = p.msg.header.num;
} // unlock qpack
k--;
// cerr << myname << "(update): " << p.msg.header << endl;
initCache(p, !cache_init_ok);
for( size_t i=0; i<p.msg.header.dcount; i++ )
{
try
{
UniSetUDP::UDPData& d = p.msg.dat[i];
ItemInfo& ii(icache[i]);
if( ii.id != d.id )
{
dlog[Debug::WARN] << myname << "(update): reinit cache for sid=" << d.id << endl;
ii.id = d.id;
shm->initAIterator(ii.ait);
shm->initDIterator(ii.dit);
}
if( ii.iotype == UniversalIO::DigitalInput )
shm->localSaveState(ii.dit,d.id,d.val,shm->ID());
else if( ii.iotype == UniversalIO::AnalogInput )
shm->localSaveValue(ii.ait,d.id,d.val,shm->ID());
else if( ii.iotype == UniversalIO::AnalogOutput )
shm->localSetValue(ii.ait,d.id,d.val,shm->ID());
else if( ii.iotype == UniversalIO::DigitalOutput )
shm->localSetState(ii.dit,d.id,d.val,shm->ID());
else
dlog[Debug::CRIT] << myname << "(update): Unknown iotype for sid=" << d.id << endl;
}
catch( UniSetTypes::Exception& ex)
{
dlog[Debug::CRIT] << myname << "(update): " << ex << std::endl;
}
catch(...)
{
dlog[Debug::CRIT] << myname << "(update): catch ..." << std::endl;
}
}
} }
}
// -----------------------------------------------------------------------------
void UNetReceiver::receive()
{
cerr << "******************* receive start" << endl;
ptRecvTimeout.setTiming(recvTimeout);
while( activated ) while( activated )
{ {
try try
{ {
recv(); if( recv() )
// send(); ptRecvTimeout.reset();
} }
catch( ost::SockException& e ) catch( ost::SockException& e )
{ {
...@@ -192,270 +250,129 @@ void UNetReceiver::poll() ...@@ -192,270 +250,129 @@ void UNetReceiver::poll()
} }
catch( UniSetTypes::Exception& ex) catch( UniSetTypes::Exception& ex)
{ {
cerr << myname << "(step): " << ex << std::endl; cerr << myname << "(poll): " << ex << std::endl;
} }
catch(...) catch(...)
{ {
cerr << myname << "(step): catch ..." << std::endl; cerr << myname << "(poll): catch ..." << std::endl;
} }
msleep(polltime); msleep(recvpause);
} }
cerr << "************* execute FINISH **********" << endl; cerr << "************* execute FINISH **********" << endl;
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
void UNetReceiver::recv() bool UNetReceiver::recv()
{
cout << myname << ": recv....(timeout=" << recvTimeout << ")" << endl;
UniSetUNet::UNetHeader h;
// receive
if( udp->isInputReady(recvTimeout) )
{
ssize_t ret = udp->UNetReceive::receive(&h,sizeof(h));
if( ret<(ssize_t)sizeof(h) )
{
cerr << myname << "(receive): ret=" << ret << " sizeof=" << sizeof(h) << endl;
return;
}
cout << myname << "(receive): header: " << h << endl;
if( h.dcount <=0 )
{
cout << " data=0" << endl;
return;
}
UniSetUNet::UNetData d;
// ignore echo...
#if 0
if( h.nodeID == conf->getLocalNode() && h.procID == getId() )
{
for( int i=0; i<h.dcount;i++ )
{
ssize_t ret = udp->UNetReceive::receive(&d,sizeof(d));
if( ret < (ssize_t)sizeof(d) )
return;
}
return;
}
#endif
for( int i=0; i<h.dcount;i++ )
{
ssize_t ret = udp->UNetReceive::receive(&d,sizeof(d));
if( ret<(ssize_t)sizeof(d) )
{
cerr << myname << "(receive data " << i << "): ret=" << ret << " sizeof=" << sizeof(d) << endl;
break;
}
cout << myname << "(receive data " << i << "): " << d << endl;
}
}
// else
// {
// cout << "no InputReady.." << endl;
// }
}
// -----------------------------------------------------------------------------
void UNetReceiver::processingMessage(UniSetTypes::VoidMessage *msg)
{ {
try if( !udp->isInputReady(recvTimeout) )
{ return false;
switch(msg->type)
{
case UniSetTypes::Message::SysCommand:
{
UniSetTypes::SystemMessage sm( msg );
sysCommand( &sm );
}
break;
case Message::SensorInfo: ssize_t ret = udp->UDPReceive::receive(&(pack.msg),sizeof(pack.msg));
{ if( ret < sizeof(UniSetUDP::UDPHeader) )
SensorMessage sm( msg );
sensorInfo(&sm);
}
break;
default:
break;
}
}
catch( SystemError& ex )
{ {
dlog[Debug::CRIT] << myname << "(SystemError): " << ex << std::endl; dlog[Debug::CRIT] << myname << "(receive): FAILED header ret=" << ret << " sizeof=" << sizeof(UniSetUDP::UDPHeader) << endl;
// throw SystemError(ex); return false;
raise(SIGTERM);
} }
catch( Exception& ex )
{ ssize_t sz = pack.msg.header.dcount * sizeof(UniSetUDP::UDPData) + sizeof(UniSetUDP::UDPHeader);
dlog[Debug::CRIT] << myname << "(processingMessage): " << ex << std::endl; if( ret < sz )
}
catch(...)
{ {
dlog[Debug::CRIT] << myname << "(processingMessage): catch ...\n"; dlog[Debug::CRIT] << myname << "(receive): FAILED data ret=" << ret << " sizeof=" << sz
<< " packnum=" << pack.msg.header.num << endl;
return false;
} }
}
// -----------------------------------------------------------------------------
void UNetReceiver::sysCommand(UniSetTypes::SystemMessage *sm) if( rnum>0 && labs(pack.msg.header.num - rnum) > maxDifferens )
{
switch( sm->command )
{ {
case SystemMessage::StartUp: /* А что делать если мы уже ждём и ещё не "разгребли предыдущее".. а тут уже повторный "разрыв"
* Можно откинуть всё.. что сложили во временную очередь и заново "копить" (но тогда теряем информацию)
* А можно породолжать складывать во временную, но тогда есть риск "никогда" не разгрести временную
* очередь, при "частых обрывах". Потому-что update будет на каждом разрыве ждать ещё lostTimeout..
*/
// Пока выбираю.. чистить qtmp. Это будет соотвествовать логике работы с картами у которых ограничен буфер приёма.
// Обычно "кольцевой". Т.е. если не успели обработать и "вынуть" из буфера информацию.. он будет переписан новыми данными
if( waitClean )
{ {
waitSMReady(); dlog[Debug::CRIT] << myname << "(receive): reset qtmp.." << endl;
while( !qtmp.empty() )
// подождать пока пройдёт инициализация датчиков qtmp.pop();
// см. activateObject()
msleep(initPause);
PassiveTimer ptAct(activateTimeout);
while( !activated && !ptAct.checkTime() )
{
cout << myname << "(sysCommand): wait activate..." << endl;
msleep(300);
if( activated )
break;
}
if( !activated )
dlog[Debug::CRIT] << myname << "(sysCommand): ************* don`t activate?! ************" << endl;
{
UniSetTypes::uniset_mutex_lock l(mutex_start, 10000);
askSensors(UniversalIO::UIONotify);
}
thr->start();
} }
case SystemMessage::FoldUp:
case SystemMessage::Finish:
askSensors(UniversalIO::UIODontNotify);
break;
case SystemMessage::WatchDog: waitClean = true;
{ }
// ОПТИМИЗАЦИЯ (защита от двойного перезаказа при старте)
// Если идёт локальная работа
// (т.е. UNetReceiver запущен в одном процессе с SharedMemory2)
// то обрабатывать WatchDog не надо, т.к. мы и так ждём готовности SM
// при заказе датчиков, а если SM вылетит, то вместе с этим процессом(UNetReceiver)
if( shm->isLocalwork() )
break;
askSensors(UniversalIO::UIONotify); rnum = pack.msg.header.num;
// cerr << myname << "(receive): recv DATA OK. ret=" << ret << " sizeof=" << sz
// << " header: " << pack.msg.header
// << " waitClean=" << waitClean
// << endl;
{ // lock qpack
uniset_mutex_lock l(packMutex,500);
if( !waitClean )
{
qpack.push(pack);
return true;
} }
break;
case SystemMessage::LogRotate: if( !qpack.empty() )
{
// cerr << myname << "(receive): copy to qtmp..."
// << " header: " << pack.msg.header
// << endl;
qtmp.push(pack);
}
else
{ {
// переоткрываем логи // cerr << myname << "(receive): copy from qtmp..." << endl;
unideb << myname << "(sysCommand): logRotate" << std::endl; // очередь освободилась..
string fname = unideb.getLogFile(); // то копируем в неё всё что набралось...
if( !fname.empty() ) while( !qtmp.empty() )
{ {
unideb.logFile(fname); qpack.push(qtmp.top());
unideb << myname << "(sysCommand): ***************** UNIDEB LOG ROTATE *****************" << std::endl; qtmp.pop();
} }
dlog << myname << "(sysCommand): logRotate" << std::endl; // не забываем и текущий поместить в очередь..
fname = dlog.getLogFile(); qpack.push(pack);
if( !fname.empty() ) waitClean = false;
{
dlog.logFile(fname);
dlog << myname << "(sysCommand): ***************** dlog LOG ROTATE *****************" << std::endl;
}
} }
break; } // unlock qpack
default:
break;
}
}
// ------------------------------------------------------------------------------------------
void UNetReceiver::askSensors( UniversalIO::UIOCommand cmd )
{
if( !shm->waitSMworking(test_id,activateTimeout,50) )
{
ostringstream err;
err << myname
<< "(askSensors): Не дождались готовности(work) SharedMemory к работе в течение "
<< activateTimeout << " мсек";
dlog[Debug::CRIT] << err.str() << endl;
kill(SIGTERM,getpid()); // прерываем (перезапускаем) процесс...
throw SystemError(err.str());
}
}
// ------------------------------------------------------------------------------------------
void UNetReceiver::sensorInfo( UniSetTypes::SensorMessage* sm )
{
}
// ------------------------------------------------------------------------------------------
bool UNetReceiver::activateObject()
{
// блокирование обработки Starsp
// пока не пройдёт инициализация датчиков
// см. sysCommand()
{
activated = false;
UniSetTypes::uniset_mutex_lock l(mutex_start, 5000);
UniSetObject_LT::activateObject();
initIterators();
activated = true;
}
return true; return true;
} }
// ------------------------------------------------------------------------------------------
void UNetReceiver::sigterm( int signo )
{
cerr << myname << ": ********* SIGTERM(" << signo <<") ********" << endl;
activated = false;
udp->disconnect();
UniSetObject_LT::sigterm(signo);
}
// ------------------------------------------------------------------------------------------
void UNetReceiver::initIterators()
{
shm->initAIterator(aitHeartBeat);
}
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
void UNetReceiver::help_print( int argc, char* argv[] ) void UNetReceiver::initIterators()
{ {
cout << "--udp-polltime msec - Пауза между опросаом карт. По умолчанию 200 мсек." << endl; for( ItemVec::iterator it=icache.begin(); it!=icache.end(); ++it )
cout << "--udp-heartbeat-id - Данный процесс связан с указанным аналоговым heartbeat-дачиком." << endl; {
cout << "--udp-heartbeat-max - Максимальное значение heartbeat-счётчика для данного процесса. По умолчанию 10." << endl; shm->initAIterator(it->ait);
cout << "--udp-ready-timeout - Время ожидания готовности SM к работе, мсек. (-1 - ждать 'вечно')" << endl; shm->initDIterator(it->dit);
cout << "--udp-initPause - Задержка перед инициализацией (время на активизация процесса)" << endl; }
cout << "--udp-notRespondSensor - датчик связи для данного процесса " << endl;
cout << "--udp-sm-ready-timeout - время на ожидание старта SM" << endl;
cout << " Настройки протокола RS: " << endl;
cout << "--udp-dev devname - файл устройства" << endl;
cout << "--udp-speed - Скорость обмена (9600,19920,38400,57600,115200)." << endl;
cout << "--udp-my-addr - адрес текущего узла" << endl;
cout << "--udp-recv-timeout - Таймаут на ожидание ответа." << endl;
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
UNetReceiver* UNetReceiver::init_udpreceiver( int argc, char* argv[], UniSetTypes::ObjectId icID, SharedMemory* ic ) void UNetReceiver::initCache( UniSetUDP::UDPMessage& pack, bool force )
{ {
string name = conf->getArgParam("--udp-name","UNetReceiver1"); if( !force && pack.msg.header.dcount == icache.size() )
if( name.empty() ) return;
{
cerr << "(udpexchange): Не задан name'" << endl; dlog[Debug::INFO] << myname << ": init icache.." << endl;
return 0; cache_init_ok = true;
}
icache.resize(pack.msg.header.dcount);
ObjectId ID = conf->getObjectID(name); for( size_t i=0; i<icache.size(); i++ )
if( ID == UniSetTypes::DefaultObjectId ) {
{ ItemInfo& d(icache[i]);
cerr << "(udpexchange): идентификатор '" << name if( d.id != pack.msg.dat[i].id )
<< "' не найден в конф. файле!" {
<< " в секции " << conf->getObjectsSection() << endl; d.id = pack.msg.dat[i].id;
return 0; d.iotype = conf->getIOType(d.id);
} shm->initAIterator(d.ait);
shm->initDIterator(d.dit);
dlog[Debug::INFO] << "(rsexchange): name = " << name << "(" << ID << ")" << endl; }
return new UNetReceiver(ID,icID,ic); }
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
...@@ -3,7 +3,7 @@ ...@@ -3,7 +3,7 @@
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
#include <ostream> #include <ostream>
#include <string> #include <string>
#include <vector> #include <queue>
#include <cc++/socket.h> #include <cc++/socket.h>
#include "UniSetObject_LT.h" #include "UniSetObject_LT.h"
#include "Trigger.h" #include "Trigger.h"
...@@ -11,72 +11,136 @@ ...@@ -11,72 +11,136 @@
#include "SMInterface.h" #include "SMInterface.h"
#include "SharedMemory.h" #include "SharedMemory.h"
#include "ThreadCreator.h" #include "ThreadCreator.h"
#include "UNetPacket.h" #include "UDPPacket.h"
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
class UNetReceiver: /* Основная идея: сделать проверку очерёдности пакетов, но при этом использовать UDP.
public UniSetObject_LT * ===============
* Собственно реализация сделана так:
* В данных передаётся номер пакета. На случай если несколько пакетов придут не в той последовательности
* что были посланы, сделана очередь с приоритетом. В качестве приориета используется номер пакета
* (чем меньше тем старше). При этом обработка ведётся только тех пакетов, которые идут "подряд",
* как только встречается "дырка" происходит ожидание её "заполения". Если в течение времени (lostTimeout)
* "дырка" не исчезает, увеличивается счётчик потерянных пакетов и обработка продолжается дальше..
* Всё это реализовано в функции UNetReceiver::real_update()
*
* КЭШ
* ===
* Для оптимизации работы с SM, т.к. в пакетах приходят только пары [id,value] сделан кэш итераторов.
* Кэш расчитан на то, что принимаемые пакеты всегда имеют одну и ту же длину и последовательность.
* Идея проста: сделан вектор размером с количество принимаемых данных. В векторе хранятся итераторы (и всё что необходимо).
* Порядокый номер данных в пакете является индексом в кэше.
* Для защиты от изменения поседовательности внутри пакета, в кэше хранится ID сохраняемого датчика, и если он не совпадёт с тем,
* ID который пришёл в пакете - элемент кэша обновляется.
* Если количество пришедших данных не совпадают с размером кэша.. кэш обновляется.
*
* Обработка сбоя или переполнения счётчика пакетов(перехода через максимум)
* =========================================================================
* Для защиты от сбоя счётика сделана следующая логика:
* Если номер очередного пришедшего пакета отличается от последнего обработанного на maxDifferens, то считается,
* что произошёл сбой счётчика и происходит ожидание пока функция update, не обработает основную очередь полностью.
* При этом принимаемые пакеты складываются во временную очередь qtmp. Как только основная очередь пустеет,
* в неё копируется всё накопленное во временной очереди..и опять идёт штатная обработка.
* Если во время "ожидания" опять происходит "разрыв" в номерах пакетов, то временная очередь чиститься
* и данные которые в ней были теряются! Аналог ограниченного буфера (у любых карт), когда новые данные
* затирают старые, если их не успели вынуть и обработать.
* \todo Сделать защиту от бесконечного ожидания "очистки" основной очереди.
*/
// -----------------------------------------------------------------------------
class UNetReceiver
{ {
public: public:
UNetReceiver( UniSetTypes::ObjectId objId, UniSetTypes::ObjectId shmID, SharedMemory* ic=0 ); UNetReceiver( const std::string host, const ost::tpport_t port, SMInterface* smi );
virtual ~UNetReceiver(); ~UNetReceiver();
/*! глобальная функция для инициализации объекта */
static UNetReceiver* init_udpreceiver( int argc, char* argv[],
UniSetTypes::ObjectId shmID, SharedMemory* ic=0 );
/*! глобальная функция для вывода help-а */ void start();
static void help_print( int argc, char* argv[] );
protected: void receive();
void update();
xmlNode* cnode; inline bool isRecvOK(){ return ptRecvTimeout.checkTime(); }
std::string s_field; inline unsigned long getLostPacketsNum(){ return lostPackets; }
std::string s_fvalue;
SMInterface* shm; void setReceiveTimeout( timeout_t msec );
void setReceivePause( timeout_t msec );
void setUpdatePause( timeout_t msec );
void setLostTimeout( timeout_t msec );
void setMaxDifferens( unsigned long set );
void poll(); void setMaxProcessingCount( int set );
void recv();
void step();
virtual void processingMessage( UniSetTypes::VoidMessage *msg ); inline ost::IPV4Address getAddress(){ return addr; }
void sysCommand( UniSetTypes::SystemMessage *msg ); inline ost::tpport_t getPort(){ return port; }
void sensorInfo( UniSetTypes::SensorMessage*sm );
void askSensors( UniversalIO::UIOCommand cmd );
void waitSMReady();
virtual bool activateObject(); protected:
// действия при завершении работы SMInterface* shm;
virtual void sigterm( int signo );
bool recv();
void step();
void real_update();
void initIterators(); void initIterators();
private: private:
UNetReceiver(); UNetReceiver();
bool initPause;
UniSetTypes::uniset_mutex mutex_start;
PassiveTimer ptHeartBeat; int recvpause; /*!< пауза меджду приёмами пакетов, [мсек] */
UniSetTypes::ObjectId sidHeartBeat; int updatepause; /*!< переодичность обновления данных в SM, [мсек] */
int maxHeartBeat;
IOController::AIOStateList::iterator aitHeartBeat;
UniSetTypes::ObjectId test_id;
int polltime; /*!< переодичность обновления данных, [мсек] */ ost::UDPReceive* udp;
ost::IPV4Address addr;
ost::UNetDuplex* udp;
ost::IPV4Host host;
ost::tpport_t port; ost::tpport_t port;
std::string myname;
UniSetTypes::uniset_mutex pollMutex; UniSetTypes::uniset_mutex pollMutex;
Trigger trTimeout; PassiveTimer ptRecvTimeout;
int recvTimeout; timeout_t recvTimeout;
timeout_t lostTimeout;
PassiveTimer ptLostTimeout;
unsigned long lostPackets; /*!< счётчик потерянных пакетов */
bool activated; bool activated;
int activateTimeout;
ThreadCreator<UNetReceiver>* thr; ThreadCreator<UNetReceiver>* r_thr; // receive thread
ThreadCreator<UNetReceiver>* u_thr; // update thread
// функция определения приоритетного сообщения для обработки
struct PacketCompare:
public std::binary_function<UniSetUDP::UDPMessage, UniSetUDP::UDPMessage, bool>
{
bool operator()(const UniSetUDP::UDPMessage& lhs,
const UniSetUDP::UDPMessage& rhs) const;
};
typedef std::priority_queue<UniSetUDP::UDPMessage,std::vector<UniSetUDP::UDPMessage>,PacketCompare> PacketQueue;
PacketQueue qpack; /*!< очередь принятых пакетов (отсортированных по возрастанию номера пакета) */
UniSetUDP::UDPMessage pack; /*!< просто буфер для получения очереlного сообщения */
UniSetTypes::uniset_mutex packMutex; /*!< mutex для работы с очередью */
unsigned long pnum; /*!< текущий номер обработанного сообщения, для проверки непрерывности последовательности пакетов */
/*! максимальная разница межд номерами пакетов, при которой считается, что счётчик пакетов
* прошёл через максимум или сбился...
*/
unsigned long maxDifferens;
PacketQueue qtmp; /*!< очередь на время обработки(очистки) основной очереди */
bool waitClean; /*!< флаг означающий, что ждём очистики очереди до конца */
unsigned long rnum; /*!< текущий номер принятого сообщения, для проверки "переполнения" или "сбоя" счётчика */
int maxProcessingCount; /*! максимальное число обрабатываемых за один раз сообщений */
struct ItemInfo
{
long id;
IOController::AIOStateList::iterator ait;
IOController::DIOStateList::iterator dit;
UniversalIO::IOTypes iotype;
};
typedef std::vector<ItemInfo> ItemVec;
ItemVec icache; /*!< кэш итераторов */
bool cache_init_ok;
void initCache( UniSetUDP::UDPMessage& pack, bool force=false );
}; };
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
#endif // UNetReceiver_H_ #endif // UNetReceiver_H_
......
...@@ -7,71 +7,49 @@ using namespace std; ...@@ -7,71 +7,49 @@ using namespace std;
using namespace UniSetTypes; using namespace UniSetTypes;
using namespace UniSetExtensions; using namespace UniSetExtensions;
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
UNetSender::UNetSender( UniSetTypes::ObjectId objId, UniSetTypes::ObjectId shmId, SharedMemory* ic ): UNetSender::UNetSender( const std::string s_host, const ost::tpport_t port, SMInterface* smi,
UniSetObject_LT(objId), const std::string s_f, const std::string s_val, SharedMemory* ic ):
shm(0), s_field(s_f),
initPause(0), s_fvalue(s_val),
tcp(0), shm(smi),
s_host(s_host),
sendpause(150),
activated(false), activated(false),
dlist(100), dlist(100),
maxItem(0) maxItem(0),
packetnum(1),
s_thr(0)
{ {
if( objId == DefaultObjectId )
throw UniSetTypes::SystemError("(UNetSender): objId=-1?!! Use --unet-name" );
// xmlNode* cnode = conf->getNode(myname); {
cnode = conf->getNode(myname); ostringstream s;
if( cnode == NULL ) s << "(" << s_host << ":" << port << ")";
throw UniSetTypes::SystemError("(UNetSender): Not find conf-node for " + myname ); myname = s.str();
}
shm = new SMInterface(shmId,&ui,objId,ic);
UniXML_iterator it(cnode);
// определяем фильтр // определяем фильтр
s_field = conf->getArgParam("--unet-filter-field"); // s_field = conf->getArgParam("--udp-filter-field");
s_fvalue = conf->getArgParam("--unet-filter-value"); // s_fvalue = conf->getArgParam("--udp-filter-value");
dlog[Debug::INFO] << myname << "(init): read fileter-field='" << s_field dlog[Debug::INFO] << myname << "(init): read fileter-field='" << s_field
<< "' filter-value='" << s_fvalue << "'" << endl; << "' filter-value='" << s_fvalue << "'" << endl;
// ---------- init RS ----------
// UniXML_iterator it(cnode);
string s_host = conf->getArgParam("--unet-host",it.getProp("host"));
if( s_host.empty() )
throw UniSetTypes::SystemError(myname+"(UNetSender): Unknown host. Use --unet-host" );
port = conf->getArgInt("--unet-port",it.getProp("port"));
if( port <= 0 )
throw UniSetTypes::SystemError(myname+"(UNetSender): Unknown port address. Use --unet-port" );
bool no_broadcast = conf->getArgInt("--unet-nobroadcast",it.getProp("no_broadcast"));
host = s_host.c_str();
if( dlog.debugging(Debug::INFO) ) if( dlog.debugging(Debug::INFO) )
dlog[Debug::INFO] << "(UNetSender): UNet set to " << s_host << ":" << port dlog[Debug::INFO] << "(UNetSender): UDP set to " << s_host << ":" << port << endl;
<< " broadcast=" << broadcast
<< endl;
try try
{ {
if( no_broadcast ) addr = s_host.c_str();
tcp = new ost::TCPSocket(); udp = new ost::UDPBroadcast(addr,port);
else
tcp = new ost::TCPBroadcast(host,port);
} }
catch( ost::SockException& e ) catch( ost::SockException& e )
{ {
ostringstream s; ostringstream s;
s << e.getString() << ": " << e.getSystemErrorString(); s << e.getString() << ": " << e.getSystemErrorString() << endl;
dlog[Debug::CRIT] << myname << "(init): " << s.str() << endl;
throw SystemError(s.str()); throw SystemError(s.str());
} }
thr = new ThreadCreator<UNetSender>(this, &UNetSender::poll); s_thr = new ThreadCreator<UNetSender>(this, &UNetSender::send);
sendTimeout = conf->getArgPInt("--unet-send-timeout",it.getProp("sendTimeout"), 5000);
sendtime = conf->getArgPInt("--unet-sendtime",it.getProp("sendtime"), 100);
// ------------------------------- // -------------------------------
if( shm->isLocalwork() ) if( shm->isLocalwork() )
...@@ -83,107 +61,42 @@ maxItem(0) ...@@ -83,107 +61,42 @@ maxItem(0)
else else
ic->addReadItem( sigc::mem_fun(this,&UNetSender::readItem) ); ic->addReadItem( sigc::mem_fun(this,&UNetSender::readItem) );
// ********** HEARTBEAT *************
string heart = conf->getArgParam("--unet-heartbeat-id",it.getProp("heartbeat_id"));
if( !heart.empty() )
{
sidHeartBeat = conf->getSensorID(heart);
if( sidHeartBeat == DefaultObjectId )
{
ostringstream err;
err << myname << ": не найден идентификатор для датчика 'HeartBeat' " << heart;
dlog[Debug::CRIT] << myname << "(init): " << err.str() << endl;
throw SystemError(err.str());
}
int heartbeatTime = getHeartBeatTime();
if( heartbeatTime )
ptHeartBeat.setTiming(heartbeatTime);
else
ptHeartBeat.setTiming(UniSetTimer::WaitUpTime);
maxHeartBeat = conf->getArgPInt("--unet-heartbeat-max", it.getProp("heartbeat_max"), 10);
test_id = sidHeartBeat;
}
else
{
test_id = conf->getSensorID("TestMode_S");
if( test_id == DefaultObjectId )
{
ostringstream err;
err << myname << "(init): test_id unknown. 'TestMode_S' not found...";
dlog[Debug::CRIT] << myname << "(init): " << err.str() << endl;
throw SystemError(err.str());
}
}
dlog[Debug::INFO] << myname << "(init): test_id=" << test_id << endl;
activateTimeout = conf->getArgPInt("--activate-timeout", 20000);
timeout_t msec = conf->getArgPInt("--unet-timeout",it.getProp("timeout"), 3000); // выставляем поля, которые не меняются
mypack.msg.header.nodeID = conf->getLocalNode();
dlog[Debug::INFO] << myname << "(init): udp-timeout=" << msec << " msec" << endl; mypack.msg.header.procID = shm->ID();
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
UNetSender::~UNetSender() UNetSender::~UNetSender()
{ {
delete tcp; delete s_thr;
delete udp;
delete shm; delete shm;
delete thr;
}
// -----------------------------------------------------------------------------
void UNetSender::waitSMReady()
{
// waiting for SM is ready...
int ready_timeout = conf->getArgInt("--unet-sm-ready-timeout","15000");
if( ready_timeout == 0 )
ready_timeout = 15000;
else if( ready_timeout < 0 )
ready_timeout = UniSetTimer::WaitUpTime;
if( !shm->waitSMready(ready_timeout,50) )
{
ostringstream err;
err << myname << "(waitSMReady): Не дождались готовности SharedMemory к работе в течение " << ready_timeout << " мсек";
dlog[Debug::CRIT] << err.str() << endl;
throw SystemError(err.str());
}
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
void UNetSender::step() void UNetSender::update( UniSetTypes::ObjectId id, long value )
{ {
if( !activated ) DMap::iterator it=dlist.begin();
return; for( ; it!=dlist.end(); ++it )
if( sidHeartBeat!=DefaultObjectId && ptHeartBeat.checkTime() )
{ {
try if( it->si.id == id )
{ {
shm->localSaveValue(aitHeartBeat,sidHeartBeat,maxHeartBeat,getId()); uniset_spin_lock lock(it->val_lock);
ptHeartBeat.reset(); it->val = value;
}
catch(Exception& ex)
{
dlog[Debug::CRIT] << myname
<< "(step): (hb) " << ex << std::endl;
} }
break;
} }
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
void UNetSender::poll() void UNetSender::send()
{ {
dlist.resize(maxItem); dlist.resize(maxItem);
dlog[Debug::INFO] << myname << "(init): dlist size = " << dlist.size() << endl; dlog[Debug::INFO] << myname << "(init): dlist size = " << dlist.size() << endl;
/*
ost::Thread::setException(ost::Thread::throwException); ost::IPV4Broadcast h = s_host.c_str();
// cerr << "create new tcp..." << endl;
tcp = new ost::TCPStream(iaddr.c_str());
tcp->setTimeout(500);
try try
{ {
tcp->setPeer(host,port); udp->setPeer(h,port);
} }
catch( ost::SockException& e ) catch( ost::SockException& e )
{ {
...@@ -192,13 +105,12 @@ void UNetSender::poll() ...@@ -192,13 +105,12 @@ void UNetSender::poll()
dlog[Debug::CRIT] << myname << "(poll): " << s.str() << endl; dlog[Debug::CRIT] << myname << "(poll): " << s.str() << endl;
throw SystemError(s.str()); throw SystemError(s.str());
} }
*/
while( activated ) while( activated )
{ {
try try
{ {
send(); real_send();
} }
catch( ost::SockException& e ) catch( ost::SockException& e )
{ {
...@@ -206,251 +118,59 @@ void UNetSender::poll() ...@@ -206,251 +118,59 @@ void UNetSender::poll()
} }
catch( UniSetTypes::Exception& ex) catch( UniSetTypes::Exception& ex)
{ {
cerr << myname << "(step): " << ex << std::endl; cerr << myname << "(send): " << ex << std::endl;
} }
catch(...) catch(...)
{ {
cerr << myname << "(step): catch ..." << std::endl; cerr << myname << "(send): catch ..." << std::endl;
} }
msleep(sendtime); msleep(sendpause);
} }
cerr << "************* execute FINISH **********" << endl; cerr << "************* execute FINISH **********" << endl;
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
void UNetSender::send() void UNetSender::real_send()
{ {
cout << myname << ": send..." << endl; mypack.msg.header.num = packetnum++;
UniSetUNet::UNetHeader h; if( packetnum > UniSetUDP::MaxPacketNum )
h.nodeID = conf->getLocalNode(); packetnum = 1;
h.procID = getId();
h.dcount = mypack.size();
// receive
ssize_t ret = tcp->send((char*)(&h),sizeof(h));
if( ret<(ssize_t)sizeof(h) )
{
cerr << myname << "(send data header): ret=" << ret << " sizeof=" << sizeof(h) << endl;
return;
}
#warning use mutex for list!!! // cout << "************* send header: " << mypack.msg.header << endl;
UniSetUNet::UNetMessage::UNetDataList::iterator it = mypack.dlist.begin(); int sz = mypack.byte_size() + sizeof(UniSetUDP::UDPHeader);
for( ; it!=mypack.dlist.end(); ++it ) if( !udp->isPending(ost::Socket::pendingOutput) )
{ return;
cout << myname << "(send): " << (*it) << endl;
ssize_t ret = tcp->send((char*)(&(*it)),sizeof(*it));
if( ret<(ssize_t)sizeof(*it) )
{
cerr << myname << "(send data): ret=" << ret << " sizeof=" << sizeof(*it) << endl;
break;
}
}
}
// -----------------------------------------------------------------------------
void UNetSender::processingMessage(UniSetTypes::VoidMessage *msg)
{
try
{
switch(msg->type)
{
case UniSetTypes::Message::SysCommand:
{
UniSetTypes::SystemMessage sm( msg );
sysCommand( &sm );
}
break;
case Message::SensorInfo:
{
SensorMessage sm( msg );
sensorInfo(&sm);
}
break;
default: ssize_t ret = udp->send( (char*)&(mypack.msg),sz);
break; if( ret < sz )
} dlog[Debug::CRIT] << myname << "(send): FAILED ret=" << ret << " < sizeof=" << sz << endl;
}
catch( SystemError& ex )
{
dlog[Debug::CRIT] << myname << "(SystemError): " << ex << std::endl;
// throw SystemError(ex);
raise(SIGTERM);
}
catch( Exception& ex )
{
dlog[Debug::CRIT] << myname << "(processingMessage): " << ex << std::endl;
}
catch(...)
{
dlog[Debug::CRIT] << myname << "(processingMessage): catch ...\n";
}
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
void UNetSender::sysCommand(UniSetTypes::SystemMessage *sm) void UNetSender::start()
{ {
switch( sm->command ) if( !activated )
{
case SystemMessage::StartUp:
{
waitSMReady();
// подождать пока пройдёт инициализация датчиков
// см. activateObject()
msleep(initPause);
PassiveTimer ptAct(activateTimeout);
while( !activated && !ptAct.checkTime() )
{
cout << myname << "(sysCommand): wait activate..." << endl;
msleep(300);
if( activated )
break;
}
if( !activated )
dlog[Debug::CRIT] << myname << "(sysCommand): ************* don`t activate?! ************" << endl;
{
UniSetTypes::uniset_mutex_lock l(mutex_start, 10000);
askSensors(UniversalIO::UIONotify);
}
thr->start();
}
case SystemMessage::FoldUp:
case SystemMessage::Finish:
askSensors(UniversalIO::UIODontNotify);
break;
case SystemMessage::WatchDog:
{
// ОПТИМИЗАЦИЯ (защита от двойного перезаказа при старте)
// Если идёт локальная работа
// (т.е. UNetSender запущен в одном процессе с SharedMemory2)
// то обрабатывать WatchDog не надо, т.к. мы и так ждём готовности SM
// при заказе датчиков, а если SM вылетит, то вместе с этим процессом(UNetSender)
if( shm->isLocalwork() )
break;
askSensors(UniversalIO::UIONotify);
}
break;
case SystemMessage::LogRotate:
{
// переоткрываем логи
unideb << myname << "(sysCommand): logRotate" << std::endl;
string fname = unideb.getLogFile();
if( !fname.empty() )
{
unideb.logFile(fname);
unideb << myname << "(sysCommand): ***************** UNIDEB LOG ROTATE *****************" << std::endl;
}
dlog << myname << "(sysCommand): logRotate" << std::endl;
fname = dlog.getLogFile();
if( !fname.empty() )
{
dlog.logFile(fname);
dlog << myname << "(sysCommand): ***************** dlog LOG ROTATE *****************" << std::endl;
}
}
break;
default:
break;
}
}
// ------------------------------------------------------------------------------------------
void UNetSender::askSensors( UniversalIO::UIOCommand cmd )
{
if( !shm->waitSMworking(test_id,activateTimeout,50) )
{
ostringstream err;
err << myname
<< "(askSensors): Не дождались готовности(work) SharedMemory к работе в течение "
<< activateTimeout << " мсек";
dlog[Debug::CRIT] << err.str() << endl;
kill(SIGTERM,getpid()); // прерываем (перезапускаем) процесс...
throw SystemError(err.str());
}
DMap::iterator it=dlist.begin();
for( ; it!=dlist.end(); ++it )
{
try
{
shm->askSensor(it->si.id,cmd);
}
catch( UniSetTypes::Exception& ex )
{
dlog[Debug::WARN] << myname << "(askSensors): " << ex << std::endl;
}
catch(...){}
}
}
// ------------------------------------------------------------------------------------------
void UNetSender::sensorInfo( UniSetTypes::SensorMessage* sm )
{
DMap::iterator it=dlist.begin();
for( ; it!=dlist.end(); ++it )
{
if( it->si.id == sm->id )
{
uniset_spin_lock lock(it->val_lock);
it->val = sm->value;
if( it->pack_it != mypack.dlist.end() )
it->pack_it->val = sm->value;
}
break;
}
}
// ------------------------------------------------------------------------------------------
bool UNetSender::activateObject()
{
// блокирование обработки StarUp
// пока не пройдёт инициализация датчиков
// см. sysCommand()
{ {
activated = false;
UniSetTypes::uniset_mutex_lock l(mutex_start, 5000);
UniSetObject_LT::activateObject();
initIterators();
activated = true; activated = true;
s_thr->start();
} }
return true;
} }
// ------------------------------------------------------------------------------------------ // -----------------------------------------------------------------------------
void UNetSender::sigterm( int signo )
{
cerr << myname << ": ********* SIGTERM(" << signo <<") ********" << endl;
activated = false;
tcp->disconnect();
UniSetObject_LT::sigterm(signo);
}
// ------------------------------------------------------------------------------------------
void UNetSender::readConfiguration() void UNetSender::readConfiguration()
{ {
#warning Сделать сортировку по диапазонам адресов!!!
// чтобы запрашивать одним запросом, сразу несколько входов...
// readconf_ok = false;
xmlNode* root = conf->getXMLSensorsSection(); xmlNode* root = conf->getXMLSensorsSection();
if(!root) if(!root)
{ {
ostringstream err; ostringstream err;
err << myname << "(readConfiguration): не нашли корневого раздела <sensors>"; err << myname << "(readConfiguration): not found <sensors>";
throw SystemError(err.str()); throw SystemError(err.str());
} }
UniXML_iterator it(root); UniXML_iterator it(root);
if( !it.goChildren() ) if( !it.goChildren() )
{ {
std::cerr << myname << "(readConfiguration): раздел <sensors> не содержит секций ?!!\n"; std::cerr << myname << "(readConfiguration): empty <sensors>?!!" << endl;
return; return;
} }
...@@ -459,8 +179,6 @@ void UNetSender::readConfiguration() ...@@ -459,8 +179,6 @@ void UNetSender::readConfiguration()
if( check_item(it) ) if( check_item(it) )
initItem(it); initItem(it);
} }
// readconf_ok = true;
} }
// ------------------------------------------------------------------------------------------ // ------------------------------------------------------------------------------------------
bool UNetSender::check_item( UniXML_iterator& it ) bool UNetSender::check_item( UniXML_iterator& it )
...@@ -514,9 +232,9 @@ bool UNetSender::initItem( UniXML_iterator& it ) ...@@ -514,9 +232,9 @@ bool UNetSender::initItem( UniXML_iterator& it )
p.si.id = sid; p.si.id = sid;
p.si.node = conf->getLocalNode(); p.si.node = conf->getLocalNode();
mypack.addData(sid,0); mypack.addData(sid,0);
p.pack_it = (mypack.dlist.end()--); p.pack_ind = mypack.size()-1;
if( maxItem >= dlist.size() ) if( maxItem >= mypack.size() )
dlist.resize(maxItem+10); dlist.resize(maxItem+10);
dlist[maxItem] = p; dlist[maxItem] = p;
...@@ -537,44 +255,6 @@ void UNetSender::initIterators() ...@@ -537,44 +255,6 @@ void UNetSender::initIterators()
shm->initDIterator(it->dit); shm->initDIterator(it->dit);
shm->initAIterator(it->ait); shm->initAIterator(it->ait);
} }
shm->initAIterator(aitHeartBeat);
}
// -----------------------------------------------------------------------------
void UNetSender::help_print( int argc, char* argv[] )
{
cout << "--unet-sendtime msec - Пауза между опросами. По умолчанию 200 мсек." << endl;
cout << "--unet-heartbeat-id - Данный процесс связан с указанным аналоговым heartbeat-дачиком." << endl;
cout << "--unet-heartbeat-max - Максимальное значение heartbeat-счётчика для данного процесса. По умолчанию 10." << endl;
cout << "--unet-ready-timeout - Время ожидания готовности SM к работе, мсек. (-1 - ждать 'вечно')" << endl;
cout << "--unet-initPause - Задержка перед инициализацией (время на активизация процесса)" << endl;
cout << "--unet-sm-ready-timeout - время на ожидание старта SM" << endl;
cout << " Настройки протокола UNet: " << endl;
cout << "--unet-host [ip|hostname] - Адрес сервера" << endl;
cout << "--unet-port - Порт." << endl;
cout << "--unet-send-timeout - Таймаут на посылку ответа." << endl;
}
// -----------------------------------------------------------------------------
UNetSender* UNetSender::init_udpsender( int argc, char* argv[], UniSetTypes::ObjectId icID, SharedMemory* ic )
{
string name = conf->getArgParam("--unet-name","UNetSender1");
if( name.empty() )
{
cerr << "(UNetSender): Не задан name'" << endl;
return 0;
}
ObjectId ID = conf->getObjectID(name);
if( ID == UniSetTypes::DefaultObjectId )
{
cerr << "(UNetSender): идентификатор '" << name
<< "' не найден в конф. файле!"
<< " в секции " << conf->getObjectsSection() << endl;
return 0;
}
dlog[Debug::INFO] << "(rsexchange): name = " << name << "(" << ID << ")" << endl;
return new UNetSender(ID,icID,ic);
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
std::ostream& operator<<( std::ostream& os, UNetSender::UItem& p ) std::ostream& operator<<( std::ostream& os, UNetSender::UItem& p )
......
...@@ -11,22 +11,22 @@ ...@@ -11,22 +11,22 @@
#include "SMInterface.h" #include "SMInterface.h"
#include "SharedMemory.h" #include "SharedMemory.h"
#include "ThreadCreator.h" #include "ThreadCreator.h"
#include "UNetPacket.h" #include "UDPPacket.h"
#include "UDPNReceiver.h"
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
class UNetSender: /*
public UniSetObject_LT * Для защиты от потери пакета при переполнении "номера пакета".
* UNetReceiver при обнаружении "разрыва" в последовательнности, просто игнорирует пакет, обновляет счётчик
* и начинает обработку пока буфер опять не заполнится..
*/
class UNetSender
{ {
public: public:
UNetSender( UniSetTypes::ObjectId objId, UniSetTypes::ObjectId shmID, SharedMemory* ic=0 ); UNetSender( const std::string host, const ost::tpport_t port, SMInterface* smi,
virtual ~UNetSender(); const std::string s_field="", const std::string s_fvalue="", SharedMemory* ic=0 );
/*! глобальная функция для инициализации объекта */
static UNetSender* init_udpsender( int argc, char* argv[],
UniSetTypes::ObjectId shmID, SharedMemory* ic=0 );
/*! глобальная функция для вывода help-а */
static void help_print( int argc, char* argv[] );
~UNetSender();
struct UItem struct UItem
{ {
UItem(): UItem():
...@@ -37,37 +37,27 @@ class UNetSender: ...@@ -37,37 +37,27 @@ class UNetSender:
IOController::AIOStateList::iterator ait; IOController::AIOStateList::iterator ait;
IOController::DIOStateList::iterator dit; IOController::DIOStateList::iterator dit;
UniSetTypes::uniset_spin_mutex val_lock; UniSetTypes::uniset_spin_mutex val_lock;
UniSetUNet::UNetMessage::UNetDataList::iterator pack_it; int pack_ind;
long val; long val;
friend std::ostream& operator<<( std::ostream& os, UItem& p ); friend std::ostream& operator<<( std::ostream& os, UItem& p );
}; };
void start();
void send();
void real_send();
void update( UniSetTypes::ObjectId id, long value );
inline void setSendPause( int msec ){ sendpause = msec; }
protected: protected:
xmlNode* cnode;
std::string s_field; std::string s_field;
std::string s_fvalue; std::string s_fvalue;
SMInterface* shm; SMInterface* shm;
void poll();
void recv();
void send();
void step();
virtual void processingMessage( UniSetTypes::VoidMessage *msg );
void sysCommand( UniSetTypes::SystemMessage *msg );
void sensorInfo( UniSetTypes::SensorMessage*sm );
void askSensors( UniversalIO::UIOCommand cmd );
void waitSMReady();
virtual bool activateObject();
// действия при завершении работы
virtual void sigterm( int signo );
void initIterators(); void initIterators();
bool initItem( UniXML_iterator& it ); bool initItem( UniXML_iterator& it );
bool readItem( UniXML& xml, UniXML_iterator& it, xmlNode* sec ); bool readItem( UniXML& xml, UniXML_iterator& it, xmlNode* sec );
...@@ -77,36 +67,23 @@ class UNetSender: ...@@ -77,36 +67,23 @@ class UNetSender:
private: private:
UNetSender(); UNetSender();
bool initPause;
UniSetTypes::uniset_mutex mutex_start;
PassiveTimer ptHeartBeat;
UniSetTypes::ObjectId sidHeartBeat;
int maxHeartBeat;
IOController::AIOStateList::iterator aitHeartBeat;
UniSetTypes::ObjectId test_id;
int sendtime; /*!< переодичность посылки данных, [мсек] */
ost::TCPStream* tcp; ost::UDPBroadcast* udp;
// ost::TCPSocket* tcp; ost::IPV4Address addr;
ost::IPV4Host host;
ost::tpport_t port; ost::tpport_t port;
std::string s_host;
UniSetTypes::uniset_mutex sendMutex; std::string myname;
Trigger trTimeout; int sendpause;
int sendTimeout;
bool activated; bool activated;
int activateTimeout;
UniSetUNet::UNetMessage mypack; UniSetUDP::UDPMessage mypack;
typedef std::vector<UItem> DMap; typedef std::vector<UItem> DMap;
DMap dlist; DMap dlist;
int maxItem; int maxItem;
unsigned long packetnum;
ThreadCreator<UNetSender>* thr; ThreadCreator<UNetSender>* s_thr; // send thread
}; };
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
#endif // UNetSender_H_ #endif // UNetSender_H_
......
#!/bin/sh #!/bin/sh
uniset-start.sh -f ./uniset-udpexchange --udp-name UDPExchange --udp-host 192.168.56.255 \ uniset-start.sh -f ./uniset-unetexchange --unet-name UNetExchange \
--udp-broadcast 1 --udp-polltime 1000 \
--confile test.xml \ --confile test.xml \
--unet-filter-field rs --unet-filter-value 2 --unet-maxdifferense 40 \
--dlog-add-levels info,crit,warn --dlog-add-levels info,crit,warn
# --udp-filter-field udp --udp-filter-value 1 \
#include <sstream> #include <sstream>
#include "ObjectsActivator.h" #include "ObjectsActivator.h"
#include "Extensions.h" #include "Extensions.h"
#include "UDPExchange.h" #include "UNetExchange.h"
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
using namespace std; using namespace std;
using namespace UniSetTypes; using namespace UniSetTypes;
...@@ -15,16 +15,16 @@ int main( int argc, char** argv ) ...@@ -15,16 +15,16 @@ int main( int argc, char** argv )
{ {
cout << "--smemory-id objectName - SharedMemory objectID. Default: read from <SharedMemory>" << endl; cout << "--smemory-id objectName - SharedMemory objectID. Default: read from <SharedMemory>" << endl;
cout << "--confile filename - configuration file. Default: configure.xml" << endl; cout << "--confile filename - configuration file. Default: configure.xml" << endl;
cout << "--udp-logfile filename - logfilename. Default: udpexchange.log" << endl; cout << "--unet-logfile filename - logfilename. Default: udpexchange.log" << endl;
cout << endl; cout << endl;
UDPExchange::help_print(argc,argv); UNetExchange::help_print(argc,argv);
return 0; return 0;
} }
string confile=UniSetTypes::getArgParam("--confile",argc,argv,"configure.xml"); string confile=UniSetTypes::getArgParam("--confile",argc,argv,"configure.xml");
conf = new Configuration( argc, argv, confile ); conf = new Configuration( argc, argv, confile );
string logfilename(conf->getArgParam("--udp-logfile")); string logfilename(conf->getArgParam("--unet-logfile"));
if( logfilename.empty() ) if( logfilename.empty() )
logfilename = "udpexchange.log"; logfilename = "udpexchange.log";
...@@ -49,35 +49,39 @@ int main( int argc, char** argv ) ...@@ -49,35 +49,39 @@ int main( int argc, char** argv )
return 1; return 1;
} }
UDPExchange* rs = UDPExchange::init_udpexchange(argc,argv,shmID); UNetExchange* unet = UNetExchange::init_unetexchange(argc,argv,shmID);
if( !rs ) if( !unet )
{ {
dlog[Debug::CRIT] << "(udpexchange): init не прошёл..." << endl; dlog[Debug::CRIT] << "(unetexchange): init failed.." << endl;
return 1; return 1;
} }
ObjectsActivator act; ObjectsActivator act;
act.addObject(static_cast<class UniSetObject*>(rs)); act.addObject(static_cast<class UniSetObject*>(unet));
SystemMessage sm(SystemMessage::StartUp); SystemMessage sm(SystemMessage::StartUp);
act.broadcast( sm.transport_msg() ); act.broadcast( sm.transport_msg() );
unideb(Debug::ANY) << "\n\n\n"; unideb(Debug::ANY) << "\n\n\n";
unideb[Debug::ANY] << "(main): -------------- UDP Exchange START -------------------------\n\n"; unideb[Debug::ANY] << "(main): -------------- UDPRecevier START -------------------------\n\n";
dlog(Debug::ANY) << "\n\n\n"; dlog(Debug::ANY) << "\n\n\n";
dlog[Debug::ANY] << "(main): -------------- UDP Exchange START -------------------------\n\n"; dlog[Debug::ANY] << "(main): -------------- UDPReceiver START -------------------------\n\n";
act.run(false); act.run(false);
// msleep(500);
// rs->execute();
} }
catch( Exception& ex ) catch( Exception& ex )
{ {
dlog[Debug::CRIT] << "(udpexchange): " << ex << std::endl; dlog[Debug::CRIT] << "(unetexchange): " << ex << std::endl;
}
catch( ost::SockException& e )
{
ostringstream s;
s << e.getString() << ": " << e.getSystemErrorString();
dlog[Debug::CRIT] << s.str() << endl;
} }
catch(...) catch(...)
{ {
dlog[Debug::CRIT] << "(udpexchange): catch ..." << std::endl; dlog[Debug::CRIT] << "(unetexchange): catch ..." << std::endl;
} }
return 0; return 0;
......
#include <sstream>
#include "ObjectsActivator.h"
#include "Extensions.h"
#include "UDPReceiver.h"
// -----------------------------------------------------------------------------
using namespace std;
using namespace UniSetTypes;
using namespace UniSetExtensions;
// -----------------------------------------------------------------------------
int main( int argc, char** argv )
{
try
{
if( argc>1 && (!strcmp(argv[1],"--help") || !strcmp(argv[1],"-h")) )
{
cout << "--smemory-id objectName - SharedMemory objectID. Default: read from <SharedMemory>" << endl;
cout << "--confile filename - configuration file. Default: configure.xml" << endl;
cout << "--udp-logfile filename - logfilename. Default: udpexchange.log" << endl;
cout << endl;
UDPReceiver::help_print(argc,argv);
return 0;
}
string confile=UniSetTypes::getArgParam("--confile",argc,argv,"configure.xml");
conf = new Configuration( argc, argv, confile );
string logfilename(conf->getArgParam("--udp-logfile"));
if( logfilename.empty() )
logfilename = "udpexchange.log";
conf->initDebug(dlog,"dlog");
std::ostringstream logname;
string dir(conf->getLogDir());
logname << dir << logfilename;
unideb.logFile( logname.str() );
dlog.logFile( logname.str() );
ObjectId shmID = DefaultObjectId;
string sID = conf->getArgParam("--smemory-id");
if( !sID.empty() )
shmID = conf->getControllerID(sID);
else
shmID = getSharedMemoryID();
if( shmID == DefaultObjectId )
{
cerr << sID << "? SharedMemoryID not found in " << conf->getControllersSection() << " section" << endl;
return 1;
}
UDPReceiver* udp = UDPReceiver::init_udpreceiver(argc,argv,shmID);
if( !udp )
{
dlog[Debug::CRIT] << "(udpreceiver): init не прошёл..." << endl;
return 1;
}
ObjectsActivator act;
act.addObject(static_cast<class UniSetObject*>(udp));
SystemMessage sm(SystemMessage::StartUp);
act.broadcast( sm.transport_msg() );
unideb(Debug::ANY) << "\n\n\n";
unideb[Debug::ANY] << "(main): -------------- UDPRecevier START -------------------------\n\n";
dlog(Debug::ANY) << "\n\n\n";
dlog[Debug::ANY] << "(main): -------------- UDPReceiver START -------------------------\n\n";
act.run(false);
// msleep(500);
// rs->execute();
}
catch( Exception& ex )
{
dlog[Debug::CRIT] << "(udpexchange): " << ex << std::endl;
}
catch( ost::SockException& e )
{
ostringstream s;
s << e.getString() << ": " << e.getSystemErrorString();
dlog[Debug::CRIT] << s.str() << endl;
}
catch(...)
{
dlog[Debug::CRIT] << "(udpexchange): catch ..." << std::endl;
}
return 0;
}
#include <sstream>
#include "ObjectsActivator.h"
#include "Extensions.h"
#include "UDPSender.h"
// -----------------------------------------------------------------------------
using namespace std;
using namespace UniSetTypes;
using namespace UniSetExtensions;
// -----------------------------------------------------------------------------
int main( int argc, char** argv )
{
try
{
if( argc>1 && (!strcmp(argv[1],"--help") || !strcmp(argv[1],"-h")) )
{
cout << "--smemory-id objectName - SharedMemory objectID. Default: read from <SharedMemory>" << endl;
cout << "--confile filename - configuration file. Default: configure.xml" << endl;
cout << "--udp-logfile filename - logfilename. Default: udpexchange.log" << endl;
cout << endl;
UDPSender::help_print(argc,argv);
return 0;
}
string confile=UniSetTypes::getArgParam("--confile",argc,argv,"configure.xml");
conf = new Configuration( argc, argv, confile );
string logfilename(conf->getArgParam("--udp-logfile"));
if( logfilename.empty() )
logfilename = "udpexchange.log";
conf->initDebug(dlog,"dlog");
std::ostringstream logname;
string dir(conf->getLogDir());
logname << dir << logfilename;
unideb.logFile( logname.str() );
dlog.logFile( logname.str() );
ObjectId shmID = DefaultObjectId;
string sID = conf->getArgParam("--smemory-id");
if( !sID.empty() )
shmID = conf->getControllerID(sID);
else
shmID = getSharedMemoryID();
if( shmID == DefaultObjectId )
{
cerr << sID << "? SharedMemoryID not found in " << conf->getControllersSection() << " section" << endl;
return 1;
}
UDPSender* udp = UDPSender::init_udpsender(argc,argv,shmID);
if( !udp )
{
dlog[Debug::CRIT] << "(udpsender): init не прошёл..." << endl;
return 1;
}
ObjectsActivator act;
act.addObject(static_cast<class UniSetObject*>(udp));
SystemMessage sm(SystemMessage::StartUp);
act.broadcast( sm.transport_msg() );
unideb(Debug::ANY) << "\n\n\n";
unideb[Debug::ANY] << "(main): -------------- UDPSender START -------------------------\n\n";
dlog(Debug::ANY) << "\n\n\n";
dlog[Debug::ANY] << "(main): -------------- UDPSender START -------------------------\n\n";
act.run(false);
// msleep(500);
// rs->execute();
}
catch( Exception& ex )
{
dlog[Debug::CRIT] << "(udpsender): " << ex << std::endl;
}
catch( ost::SockException& e )
{
dlog[Debug::CRIT] << "(udpsender): " << e.getSystemErrorString() << endl;
}
catch(...)
{
dlog[Debug::CRIT] << "(udpsender): catch ..." << std::endl;
}
return 0;
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment