Commit 51ba4faa authored by Pavel Vainerman's avatar Pavel Vainerman

добавил UNetSender и встроил его в UNetExchange

parent 994c91d5
...@@ -82,7 +82,7 @@ ...@@ -82,7 +82,7 @@
textname - текстовое имя датчика textname - текстовое имя датчика
--> -->
<nodes port="2809"> <nodes port="2809">
<item infserver="InfoServer" ip="127.0.0.1" name="LocalhostNode" textname="Локальный узел" unet_ignore="1"> <item infserver="InfoServer" ip="127.0.0.1" name="LocalhostNode" textname="Локальный узел" unet_ignore="1" unet_port="3000" unet_ip="192.168.56.255">
<iocards> <iocards>
<item card="1" name="DI32"/> <item card="1" name="DI32"/>
<item card="2" name="DO32"/> <item card="2" name="DO32"/>
......
...@@ -11,7 +11,7 @@ libUniSetUDP_la_LIBADD = $(top_builddir)/lib/libUniSet.la \ ...@@ -11,7 +11,7 @@ libUniSetUDP_la_LIBADD = $(top_builddir)/lib/libUniSet.la \
$(top_builddir)/extensions/lib/libUniSetExtensions.la \ $(top_builddir)/extensions/lib/libUniSetExtensions.la \
$(SIGC_LIBS) $(COMCPP_LIBS) $(SIGC_LIBS) $(COMCPP_LIBS)
libUniSetUDP_la_CXXFLAGS = -I$(top_builddir)/extensions/include -I$(top_builddir)/extensions/SharedMemory $(SIGC_CFLAGS) $(COMCPP_CFLAGS) libUniSetUDP_la_CXXFLAGS = -I$(top_builddir)/extensions/include -I$(top_builddir)/extensions/SharedMemory $(SIGC_CFLAGS) $(COMCPP_CFLAGS)
libUniSetUDP_la_SOURCES = UDPPacket.cc UDPExchange.cc UDPNReceiver.cc UDPReceiver.cc UNetReceiver.cc UNetExchange.cc libUniSetUDP_la_SOURCES = UDPPacket.cc UDPExchange.cc UDPNReceiver.cc UDPReceiver.cc UNetReceiver.cc UNetSender.cc UNetExchange.cc
#UDPSender.cc #UDPSender.cc
#UDPSender.cc #UDPSender.cc
......
...@@ -10,7 +10,9 @@ UNetExchange::UNetExchange( UniSetTypes::ObjectId objId, UniSetTypes::ObjectId s ...@@ -10,7 +10,9 @@ UNetExchange::UNetExchange( UniSetTypes::ObjectId objId, UniSetTypes::ObjectId s
UniSetObject_LT(objId), UniSetObject_LT(objId),
shm(0), shm(0),
initPause(0), initPause(0),
activated(false) activated(false),
no_sender(false),
sender(0)
{ {
if( objId == DefaultObjectId ) if( objId == DefaultObjectId )
throw UniSetTypes::SystemError("(UNetExchange): objId=-1?!! Use --unet-name" ); throw UniSetTypes::SystemError("(UNetExchange): objId=-1?!! Use --unet-name" );
...@@ -32,11 +34,14 @@ activated(false) ...@@ -32,11 +34,14 @@ activated(false)
int recvTimeout = conf->getArgPInt("--unet-recv-timeout",it.getProp("recvTimeout"), 5000); int recvTimeout = conf->getArgPInt("--unet-recv-timeout",it.getProp("recvTimeout"), 5000);
int recvpause = conf->getArgPInt("--unet-recvpause",it.getProp("recvpause"), 10); int recvpause = conf->getArgPInt("--unet-recvpause",it.getProp("recvpause"), 10);
int sendpause = conf->getArgPInt("--unet-sendpause",it.getProp("sendpause"), 150);
int updatepause = conf->getArgPInt("--unet-updatepause",it.getProp("updatepause"), 100); int updatepause = conf->getArgPInt("--unet-updatepause",it.getProp("updatepause"), 100);
steptime = conf->getArgPInt("--unet-steptime",it.getProp("steptime"), 1000); steptime = conf->getArgPInt("--unet-steptime",it.getProp("steptime"), 1000);
int minBufSize = conf->getArgPInt("--unet-minbufsize",it.getProp("minBufSize"), 30); int minBufSize = conf->getArgPInt("--unet-minbufsize",it.getProp("minBufSize"), 30);
int maxProcessingCount = conf->getArgPInt("--unet-maxprocessingcount",it.getProp("maxProcessingCount"), 100); int maxProcessingCount = conf->getArgPInt("--unet-maxprocessingcount",it.getProp("maxProcessingCount"), 100);
no_sender = conf->getArgInt("--unet-nosender",it.getProp("nosender"));
xmlNode* nodes = conf->getXMLNodesSection(); xmlNode* nodes = conf->getXMLNodesSection();
if( !nodes ) if( !nodes )
throw UniSetTypes::SystemError("(UNetExchange): Not found <nodes>"); throw UniSetTypes::SystemError("(UNetExchange): Not found <nodes>");
...@@ -47,13 +52,7 @@ activated(false) ...@@ -47,13 +52,7 @@ activated(false)
for( ; n_it.getCurrent(); n_it.goNext() ) for( ; n_it.getCurrent(); n_it.goNext() )
{ {
if( !n_it.getProp("unet_ignore").empty() ) string h(n_it.getProp("ip"));
{
dlog[Debug::INFO] << myname << "(init): unet_ignore.. for " << n_it.getProp("name") << endl;
continue;
}
string h = n_it.getProp("ip");
if( !n_it.getProp("unet_ip").empty() ) if( !n_it.getProp("unet_ip").empty() )
h = n_it.getProp("unet_ip"); h = n_it.getProp("unet_ip");
...@@ -61,6 +60,21 @@ activated(false) ...@@ -61,6 +60,21 @@ activated(false)
if( !n_it.getProp("unet_port").empty() ) if( !n_it.getProp("unet_port").empty() )
p = n_it.getIntProp("unet_port"); p = n_it.getIntProp("unet_port");
string n(n_it.getProp("name"));
if( n == conf->getLocalNodeName() )
{
dlog[Debug::INFO] << myname << "(init): init sender.. my node " << n_it.getProp("name") << endl;
sender = new UNetSender(h,p,shm,s_field,s_fvalue,ic);
sender->setSendPause(sendpause);
continue;
}
if( !n_it.getProp("unet_ignore").empty() )
{
dlog[Debug::INFO] << myname << "(init): unet_ignore.. for " << n_it.getProp("name") << endl;
continue;
}
dlog[Debug::INFO] << myname << "(init): add UNetReceiver for " << h << ":" << p << endl; dlog[Debug::INFO] << myname << "(init): add UNetReceiver for " << h << ":" << p << endl;
if( checkExistUNetHost(h,p) ) if( checkExistUNetHost(h,p) )
...@@ -128,6 +142,7 @@ UNetExchange::~UNetExchange() ...@@ -128,6 +142,7 @@ UNetExchange::~UNetExchange()
for( ReceiverList::iterator it=recvlist.begin(); it!=recvlist.end(); ++it ) for( ReceiverList::iterator it=recvlist.begin(); it!=recvlist.end(); ++it )
delete (*it); delete (*it);
delete sender;
delete shm; delete shm;
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
...@@ -149,6 +164,12 @@ void UNetExchange::startReceivers() ...@@ -149,6 +164,12 @@ void UNetExchange::startReceivers()
(*it)->start(); (*it)->start();
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
void UNetExchange::initSender( const std::string s_host, const ost::tpport_t port, UniXML_iterator& it )
{
if( no_sender )
return;
}
// -----------------------------------------------------------------------------
void UNetExchange::waitSMReady() void UNetExchange::waitSMReady()
{ {
// waiting for SM is ready... // waiting for SM is ready...
...@@ -272,6 +293,8 @@ void UNetExchange::sysCommand( UniSetTypes::SystemMessage *sm ) ...@@ -272,6 +293,8 @@ void UNetExchange::sysCommand( UniSetTypes::SystemMessage *sm )
} }
askTimer(tmStep,steptime); askTimer(tmStep,steptime);
startReceivers(); startReceivers();
if( sender )
sender->start();
} }
case SystemMessage::FoldUp: case SystemMessage::FoldUp:
...@@ -336,6 +359,8 @@ void UNetExchange::askSensors( UniversalIO::UIOCommand cmd ) ...@@ -336,6 +359,8 @@ void UNetExchange::askSensors( UniversalIO::UIOCommand cmd )
// ------------------------------------------------------------------------------------------ // ------------------------------------------------------------------------------------------
void UNetExchange::sensorInfo( UniSetTypes::SensorMessage* sm ) void UNetExchange::sensorInfo( UniSetTypes::SensorMessage* sm )
{ {
if( sender )
sender->update(sm->id,sm->value);
} }
// ------------------------------------------------------------------------------------------ // ------------------------------------------------------------------------------------------
bool UNetExchange::activateObject() bool UNetExchange::activateObject()
......
...@@ -12,6 +12,7 @@ ...@@ -12,6 +12,7 @@
#include "SharedMemory.h" #include "SharedMemory.h"
#include "ThreadCreator.h" #include "ThreadCreator.h"
#include "UNetReceiver.h" #include "UNetReceiver.h"
#include "UNetSender.h"
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
class UNetExchange: class UNetExchange:
public UniSetObject_LT public UniSetObject_LT
...@@ -27,7 +28,6 @@ class UNetExchange: ...@@ -27,7 +28,6 @@ class UNetExchange:
/*! глобальная функция для вывода help-а */ /*! глобальная функция для вывода help-а */
static void help_print( int argc, char* argv[] ); static void help_print( int argc, char* argv[] );
bool checkExistUNetHost( const std::string host, ost::tpport_t port ); bool checkExistUNetHost( const std::string host, ost::tpport_t port );
protected: protected:
...@@ -53,6 +53,7 @@ class UNetExchange: ...@@ -53,6 +53,7 @@ class UNetExchange:
void initIterators(); void initIterators();
void startReceivers(); void startReceivers();
void initSender( const std::string host, const ost::tpport_t port, UniXML_iterator& it );
enum Timer enum Timer
{ {
...@@ -77,6 +78,9 @@ class UNetExchange: ...@@ -77,6 +78,9 @@ class UNetExchange:
typedef std::list<UNetReceiver*> ReceiverList; typedef std::list<UNetReceiver*> ReceiverList;
ReceiverList recvlist; ReceiverList recvlist;
bool no_sender; /*!< флаг отключения посылки сообщений */
UNetSender* sender;
}; };
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
#endif // UNetExchange_H_ #endif // UNetExchange_H_
......
...@@ -30,6 +30,12 @@ maxProcessingCount(100), ...@@ -30,6 +30,12 @@ maxProcessingCount(100),
icache(200), icache(200),
cache_init_ok(false) cache_init_ok(false)
{ {
{
ostringstream s;
s << "(" << s_host << ":" << port << ")";
myname = s.str();
}
try try
{ {
// ost::IPV4Cidr ci(s_host.c_str()); // ost::IPV4Cidr ci(s_host.c_str());
...@@ -50,11 +56,6 @@ cache_init_ok(false) ...@@ -50,11 +56,6 @@ cache_init_ok(false)
r_thr = new ThreadCreator<UNetReceiver>(this, &UNetReceiver::receive); r_thr = new ThreadCreator<UNetReceiver>(this, &UNetReceiver::receive);
u_thr = new ThreadCreator<UNetReceiver>(this, &UNetReceiver::update); u_thr = new ThreadCreator<UNetReceiver>(this, &UNetReceiver::update);
{
ostringstream s;
s << "(" << s_host << ":" << port << ")";
myname = s.str();
}
ptRecvTimeout.setTiming(recvTimeout); ptRecvTimeout.setTiming(recvTimeout);
} }
...@@ -249,8 +250,8 @@ bool UNetReceiver::recv() ...@@ -249,8 +250,8 @@ bool UNetReceiver::recv()
} }
cerr << myname << "(receive): recv DATA OK. ret=" << ret << " sizeof=" << sz // cerr << myname << "(receive): recv DATA OK. ret=" << ret << " sizeof=" << sz
<< " header: " << pack.msg.header << endl; // << " header: " << pack.msg.header << endl;
/* /*
if( labs(pack.msg.header.num - pnum) > 1 ) if( labs(pack.msg.header.num - pnum) > 1 )
{ {
......
...@@ -73,7 +73,7 @@ class UNetReceiver ...@@ -73,7 +73,7 @@ class UNetReceiver
int recvpause; /*!< пауза меджду приёмами пакетов, [мсек] */ int recvpause; /*!< пауза меджду приёмами пакетов, [мсек] */
int updatepause; /*!< переодичность обновления данных в SM, [мсек] */ int updatepause; /*!< переодичность обновления данных в SM, [мсек] */
ost::UDPDuplex* udp; ost::UDPReceive* udp;
ost::IPV4Address addr; ost::IPV4Address addr;
ost::tpport_t port; ost::tpport_t port;
std::string myname; std::string myname;
......
#include <sstream>
#include "Exceptions.h"
#include "Extensions.h"
#include "UNetSender.h"
// -----------------------------------------------------------------------------
using namespace std;
using namespace UniSetTypes;
using namespace UniSetExtensions;
// -----------------------------------------------------------------------------
UNetSender::UNetSender( const std::string s_host, const ost::tpport_t port, SMInterface* smi,
const std::string s_f, const std::string s_val, SharedMemory* ic ):
s_field(s_f),
s_fvalue(s_val),
shm(smi),
s_host(s_host),
sendpause(150),
activated(false),
dlist(100),
maxItem(0),
packetnum(1),
s_thr(0)
{
{
ostringstream s;
s << "(" << s_host << ":" << port << ")";
myname = s.str();
}
// определяем фильтр
// s_field = conf->getArgParam("--udp-filter-field");
// s_fvalue = conf->getArgParam("--udp-filter-value");
dlog[Debug::INFO] << myname << "(init): read fileter-field='" << s_field
<< "' filter-value='" << s_fvalue << "'" << endl;
if( dlog.debugging(Debug::INFO) )
dlog[Debug::INFO] << "(UNetSender): UDP set to " << s_host << ":" << port << endl;
try
{
addr = s_host.c_str();
udp = new ost::UDPBroadcast(addr,port);
}
catch( ost::SockException& e )
{
ostringstream s;
s << e.getString() << ": " << e.getSystemErrorString() << endl;
throw SystemError(s.str());
}
s_thr = new ThreadCreator<UNetSender>(this, &UNetSender::send);
// -------------------------------
if( shm->isLocalwork() )
{
readConfiguration();
dlist.resize(maxItem);
dlog[Debug::INFO] << myname << "(init): dlist size = " << dlist.size() << endl;
}
else
ic->addReadItem( sigc::mem_fun(this,&UNetSender::readItem) );
}
// -----------------------------------------------------------------------------
UNetSender::~UNetSender()
{
delete s_thr;
delete udp;
delete shm;
}
// -----------------------------------------------------------------------------
void UNetSender::update( UniSetTypes::ObjectId id, long value )
{
DMap::iterator it=dlist.begin();
for( ; it!=dlist.end(); ++it )
{
if( it->si.id == id )
{
uniset_spin_lock lock(it->val_lock);
it->val = value;
}
break;
}
}
// -----------------------------------------------------------------------------
void UNetSender::send()
{
dlist.resize(maxItem);
dlog[Debug::INFO] << myname << "(init): dlist size = " << dlist.size() << endl;
/*
ost::IPV4Broadcast h = s_host.c_str();
try
{
udp->setPeer(h,port);
}
catch( ost::SockException& e )
{
ostringstream s;
s << e.getString() << ": " << e.getSystemErrorString();
dlog[Debug::CRIT] << myname << "(poll): " << s.str() << endl;
throw SystemError(s.str());
}
*/
while( activated )
{
try
{
real_send();
}
catch( ost::SockException& e )
{
cerr << e.getString() << ": " << e.getSystemErrorString() << endl;
}
catch( UniSetTypes::Exception& ex)
{
cerr << myname << "(send): " << ex << std::endl;
}
catch(...)
{
cerr << myname << "(send): catch ..." << std::endl;
}
msleep(sendpause);
}
cerr << "************* execute FINISH **********" << endl;
}
// -----------------------------------------------------------------------------
void UNetSender::real_send()
{
// cout << myname << ": send..." << endl;
UniSetUDP::UDPHeader h;
h.nodeID = conf->getLocalNode();
h.procID = shm->ID();
h.dcount = mypack.msg.header.dcount;
h.num = packetnum++;
mypack.msg.header = h;
// cout << "************* send header: " << mypack.msg.header << endl;
int sz = mypack.size() * sizeof(UniSetUDP::UDPHeader);
if( udp->isPending(ost::Socket::pendingOutput) )
{
// ssize_t ret = udp->send( (char*)&(mypack.msg),sizeof(mypack.msg));
// if( ret<sizeof(mypack.msg) )
ssize_t ret = udp->send( (char*)&(mypack.msg),sz);
if( ret < sz )
{
// cerr << myname << "(send data header): ret=" << ret << " sizeof=" << sz << endl;
return;
}
// cout << "send OK. byte count=" << ret << endl;
}
}
// -----------------------------------------------------------------------------
void UNetSender::start()
{
if( !activated )
{
activated = true;
s_thr->start();
}
}
// -----------------------------------------------------------------------------
void UNetSender::readConfiguration()
{
xmlNode* root = conf->getXMLSensorsSection();
if(!root)
{
ostringstream err;
err << myname << "(readConfiguration): not found <sensors>";
throw SystemError(err.str());
}
UniXML_iterator it(root);
if( !it.goChildren() )
{
std::cerr << myname << "(readConfiguration): empty <sensors>?!!" << endl;
return;
}
for( ;it.getCurrent(); it.goNext() )
{
if( check_item(it) )
initItem(it);
}
}
// ------------------------------------------------------------------------------------------
bool UNetSender::check_item( UniXML_iterator& it )
{
if( s_field.empty() )
return true;
// просто проверка на не пустой field
if( s_fvalue.empty() && it.getProp(s_field).empty() )
return false;
// просто проверка что field = value
if( !s_fvalue.empty() && it.getProp(s_field)!=s_fvalue )
return false;
return true;
}
// ------------------------------------------------------------------------------------------
bool UNetSender::readItem( UniXML& xml, UniXML_iterator& it, xmlNode* sec )
{
if( check_item(it) )
initItem(it);
return true;
}
// ------------------------------------------------------------------------------------------
bool UNetSender::initItem( UniXML_iterator& it )
{
string sname( it.getProp("name") );
string tid = it.getProp("id");
ObjectId sid;
if( !tid.empty() )
{
sid = UniSetTypes::uni_atoi(tid);
if( sid <= 0 )
sid = DefaultObjectId;
}
else
sid = conf->getSensorID(sname);
if( sid == DefaultObjectId )
{
if( dlog )
dlog[Debug::CRIT] << myname << "(readItem): ID not found for "
<< sname << endl;
return false;
}
UItem p;
p.si.id = sid;
p.si.node = conf->getLocalNode();
mypack.addData(sid,0);
p.pack_ind = mypack.size()-1;
if( maxItem >= mypack.size() )
dlist.resize(maxItem+10);
dlist[maxItem] = p;
maxItem++;
if( dlog.debugging(Debug::INFO) )
dlog[Debug::INFO] << myname << "(initItem): add " << p << endl;
return true;
}
// ------------------------------------------------------------------------------------------
void UNetSender::initIterators()
{
DMap::iterator it=dlist.begin();
for( ; it!=dlist.end(); it++ )
{
shm->initDIterator(it->dit);
shm->initAIterator(it->ait);
}
}
// -----------------------------------------------------------------------------
std::ostream& operator<<( std::ostream& os, UNetSender::UItem& p )
{
return os << " sid=" << p.si.id;
}
// -----------------------------------------------------------------------------
#ifndef UNetSender_H_
#define UNetSender_H_
// -----------------------------------------------------------------------------
#include <ostream>
#include <string>
#include <vector>
#include <cc++/socket.h>
#include "UniSetObject_LT.h"
#include "Trigger.h"
#include "Mutex.h"
#include "SMInterface.h"
#include "SharedMemory.h"
#include "ThreadCreator.h"
#include "UDPPacket.h"
#include "UDPNReceiver.h"
// -----------------------------------------------------------------------------
class UNetSender
{
public:
UNetSender( const std::string host, const ost::tpport_t port, SMInterface* smi,
const std::string s_field="", const std::string s_fvalue="", SharedMemory* ic=0 );
~UNetSender();
struct UItem
{
UItem():
val(0)
{}
IOController_i::SensorInfo si;
IOController::AIOStateList::iterator ait;
IOController::DIOStateList::iterator dit;
UniSetTypes::uniset_spin_mutex val_lock;
int pack_ind;
long val;
friend std::ostream& operator<<( std::ostream& os, UItem& p );
};
void start();
void send();
void real_send();
void update( UniSetTypes::ObjectId id, long value );
inline void setSendPause( int msec ){ sendpause = msec; }
protected:
std::string s_field;
std::string s_fvalue;
SMInterface* shm;
void initIterators();
bool initItem( UniXML_iterator& it );
bool readItem( UniXML& xml, UniXML_iterator& it, xmlNode* sec );
void readConfiguration();
bool check_item( UniXML_iterator& it );
private:
UNetSender();
ost::UDPBroadcast* udp;
ost::IPV4Address addr;
ost::tpport_t port;
std::string s_host;
std::string myname;
int sendpause;
bool activated;
UniSetUDP::UDPMessage mypack;
typedef std::vector<UItem> DMap;
DMap dlist;
int maxItem;
long packetnum;
ThreadCreator<UNetSender>* s_thr; // send thread
};
// -----------------------------------------------------------------------------
#endif // UNetSender_H_
// -----------------------------------------------------------------------------
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment