Commit 2b89a801 authored by Pavel Vainerman's avatar Pavel Vainerman

Merge remote-tracking branch 'uzum/master'

parents 0dc16760 32852a64
......@@ -41,6 +41,7 @@ BuildRequires: librrd-devel
%if_enabled python
BuildRequires: python-devel
BuildRequires(pre): rpm-build-python
Requires: python-module-libxml2
# swig
# add_findprov_lib_path %python_sitelibdir/%oname
......
......@@ -77,6 +77,7 @@ AC_ARG_ENABLE(mysql, AC_HELP_STRING([--disable-mysql], [disable MySQL support]),
if test ${buildmysql} = true; then
AC_MSG_RESULT([enabled])
AC_CHECK_HEADERS([mysql/mysql.h])
AC_CHECK_LIB([mysqlclient],mysql_init,,exit)
else
AC_MSG_RESULT([disabled])
fi
......
......@@ -56,15 +56,15 @@ sender2(0)
UniXML_iterator n_it(nodes);
string default_ip(n_it.getProp("unet_broadcast_ip"));
string default_ip2(n_it.getProp("unet_broadcast_ip2"));
string default_ip(n_it.getProp(prefix+"_broadcast_ip"));
string default_ip2(n_it.getProp(prefix+"_broadcast_ip2"));
if( !n_it.goChildren() )
throw UniSetTypes::SystemError("(UNetExchange): Items not found for <nodes>");
for( ; n_it.getCurrent(); n_it.goNext() )
{
if( n_it.getIntProp("unet_ignore") )
if( n_it.getIntProp(prefix+"_ignore") )
{
dlog[Debug::INFO] << myname << "(init): unet_ignore.. for " << n_it.getProp("name") << endl;
continue;
......@@ -80,13 +80,13 @@ sender2(0)
string h2("");
if( !default_ip.empty() )
h = default_ip;
if( !n_it.getProp("unet_broadcast_ip").empty() )
h = n_it.getProp("unet_broadcast_ip");
if( !n_it.getProp(prefix+"_broadcast_ip").empty() )
h = n_it.getProp(prefix+"_broadcast_ip");
if( !default_ip2.empty() )
h2 = default_ip2;
if( !n_it.getProp("unet_broadcast_ip2").empty() )
h2 = n_it.getProp("unet_broadcast_ip2");
if( !n_it.getProp(prefix+"_broadcast_ip2").empty() )
h2 = n_it.getProp(prefix+"_broadcast_ip2");
if( h.empty() )
{
......@@ -102,12 +102,12 @@ sender2(0)
// Если указано поле unet_port - используем его
// Иначе port = идентификатору узла
int p = n_it.getIntProp("id");
if( !n_it.getProp("unet_port").empty() )
p = n_it.getIntProp("unet_port");
if( !n_it.getProp(prefix+"_port").empty() )
p = n_it.getIntProp(prefix+"_port");
int p2 = p; // по умолчанию порт на втором канале такой же как на первом
if( !n_it.getProp("unet_port2").empty() )
p2 = n_it.getIntProp("unet_port2");
if( !n_it.getProp(prefix+"_port2").empty() )
p2 = n_it.getIntProp(prefix+"_port2");
string n(n_it.getProp("name"));
if( n == conf->getLocalNodeName() )
......@@ -120,7 +120,7 @@ sender2(0)
}
dlog[Debug::INFO] << myname << "(init): init sender.. my node " << n_it.getProp("name") << endl;
sender = new UNetSender(h,p,shm,s_field,s_fvalue,ic);
sender = create_sender(h,p,shm,s_field,s_fvalue,ic);
sender->setSendPause(sendpause);
try
......@@ -129,7 +129,7 @@ sender2(0)
if( !h2.empty() )
{
dlog[Debug::INFO] << myname << "(init): init sender2.. my node " << n_it.getProp("name") << endl;
sender2 = new UNetSender(h2,p2,shm,s_field,s_fvalue,ic);
sender2 = create_sender(h2,p2,shm,s_field,s_fvalue,ic);
sender2->setSendPause(sendpause);
}
}
......@@ -152,9 +152,9 @@ sender2(0)
continue;
}
bool resp_invert = n_it.getIntProp("unet_respond_invert");
bool resp_invert = n_it.getIntProp(prefix+"_respond_invert");
string s_resp_id(n_it.getProp("unet_respond1_id"));
string s_resp_id(n_it.getProp(prefix+"_respond1_id"));
UniSetTypes::ObjectId resp_id = UniSetTypes::DefaultObjectId;
if( !s_resp_id.empty() )
{
......@@ -168,7 +168,7 @@ sender2(0)
}
}
string s_resp2_id(n_it.getProp("unet_respond2_id"));
string s_resp2_id(n_it.getProp(prefix+"_respond2_id"));
UniSetTypes::ObjectId resp2_id = UniSetTypes::DefaultObjectId;
if( !s_resp2_id.empty() )
{
......@@ -182,7 +182,7 @@ sender2(0)
}
}
string s_lp_id(n_it.getProp("unet_lostpackets1_id"));
string s_lp_id(n_it.getProp(prefix+"_lostpackets1_id"));
UniSetTypes::ObjectId lp_id = UniSetTypes::DefaultObjectId;
if( !s_lp_id.empty() )
{
......@@ -196,7 +196,7 @@ sender2(0)
}
}
string s_lp2_id(n_it.getProp("unet_lostpackets2_id"));
string s_lp2_id(n_it.getProp(prefix+"_lostpackets2_id"));
UniSetTypes::ObjectId lp2_id = UniSetTypes::DefaultObjectId;
if( !s_lp2_id.empty() )
{
......@@ -210,7 +210,7 @@ sender2(0)
}
}
string s_lp_comm_id(n_it.getProp("unet_lostpackets_id"));
string s_lp_comm_id(n_it.getProp(prefix+"_lostpackets_id"));
UniSetTypes::ObjectId lp_comm_id = UniSetTypes::DefaultObjectId;
if( !s_lp_comm_id.empty() )
{
......@@ -224,7 +224,7 @@ sender2(0)
}
}
string s_resp_comm_id(n_it.getProp("unet_respond_id"));
string s_resp_comm_id(n_it.getProp(prefix+"_respond_id"));
UniSetTypes::ObjectId resp_comm_id = UniSetTypes::DefaultObjectId;
if( !s_resp_comm_id.empty() )
{
......@@ -241,7 +241,7 @@ sender2(0)
dlog[Debug::INFO] << myname << "(init): (node='" << n << "') add receiver "
<< h2 << ":" << p2 << endl;
UNetReceiver* r = new UNetReceiver(h,p,shm);
UNetReceiver* r = create_receiver(h,p,shm);
// на всякий принудительно разблокируем,
// чтобы не зависеть от значения по умолчанию
......@@ -266,7 +266,7 @@ sender2(0)
dlog[Debug::INFO] << myname << "(init): (node='" << n << "') add reserv receiver "
<< h2 << ":" << p2 << endl;
r2 = new UNetReceiver(h2,p2,shm);
r2 = create_receiver(h2,p2,shm);
// т.к. это резервный канал (по началу блокируем его)
r2->setLockUpdate(true);
......@@ -351,6 +351,17 @@ UNetExchange::~UNetExchange()
delete shm;
}
// -----------------------------------------------------------------------------
UNetReceiver* UNetExchange::create_receiver( const std::string& h, const ost::tpport_t p, SMInterface* shm )
{
return new UNetReceiver(h,p,shm);
}
// -----------------------------------------------------------------------------
UNetSender* UNetExchange::create_sender( const std::string h, const ost::tpport_t p, SMInterface* shm,
const std::string s_field, const std::string s_fvalue, SharedMemory* ic )
{
return new UNetSender(h,p,shm,s_field,s_fvalue,ic);
}
// -----------------------------------------------------------------------------
bool UNetExchange::checkExistUNetHost( const std::string& addr, ost::tpport_t port )
{
ost::IPV4Address a1(addr.c_str());
......@@ -765,4 +776,24 @@ void UNetExchange::receiverEvent( UNetReceiver* r, UNetReceiver::Event ev )
}
}
// -----------------------------------------------------------------------------
void UNetExchange::ignore_item(UniSetTypes::ObjectId id, bool set)
{
std::list<UNetReceiver*> rList = get_receivers();
std::list<UNetReceiver*>::iterator rIt = rList.begin();
for(; rIt != rList.end(); ++ rIt )
(*rIt)->ignore_item(id, set);
}
// -----------------------------------------------------------------------------
std::list<UNetReceiver*> UNetExchange::get_receivers()
{
std::list<UNetReceiver*> tList;
for( ReceiverList::iterator it=recvlist.begin(); it!=recvlist.end(); ++it )
{
if(it->r1)
tList.push_back(it->r1);
if(it->r2)
tList.push_back(it->r2);
}
return tList;
}
// -----------------------------------------------------------------------------
......@@ -90,8 +90,12 @@ class UNetExchange:
static void help_print( int argc, const char* argv[] );
bool checkExistUNetHost( const std::string& host, ost::tpport_t port );
std::list<UNetReceiver*> get_receivers();
/*! игнорировать запись датчика в SM */
void ignore_item(UniSetTypes::ObjectId id = UniSetTypes::DefaultObjectId, bool set = true);
protected:
UNetExchange();
xmlNode* cnode;
std::string s_field;
......@@ -99,7 +103,10 @@ class UNetExchange:
SMInterface* shm;
void step();
virtual UNetReceiver* create_receiver( const std::string& h, const ost::tpport_t p, SMInterface* shm );
virtual UNetSender* create_sender( const std::string h, const ost::tpport_t p, SMInterface* shm,
const std::string s_field="", const std::string s_fvalue="", SharedMemory* ic=0 );
virtual void processingMessage( UniSetTypes::VoidMessage *msg );
void sysCommand( UniSetTypes::SystemMessage *msg );
void sensorInfo( UniSetTypes::SensorMessage*sm );
......@@ -123,7 +130,6 @@ class UNetExchange:
};
private:
UNetExchange();
bool initPause;
UniSetTypes::uniset_mutex mutex_start;
......
......@@ -297,8 +297,9 @@ void UNetReceiver::real_update()
if( lockUpdate )
continue;
}
if( ii.iotype == UniversalIO::DigitalInput )
if( ii.ignore )
continue;
else if( ii.iotype == UniversalIO::DigitalInput )
shm->localSaveState(ii.dit,id,val,shm->ID());
else if( ii.iotype == UniversalIO::AnalogInput )
shm->localSaveValue(ii.ait,id,val,shm->ID());
......@@ -341,7 +342,9 @@ void UNetReceiver::real_update()
continue;
}
if( ii.iotype == UniversalIO::DigitalInput )
if( ii.ignore )
continue;
else if( ii.iotype == UniversalIO::DigitalInput )
shm->localSaveState(ii.dit,d.id,d.val,shm->ID());
else if( ii.iotype == UniversalIO::AnalogInput )
shm->localSaveValue(ii.ait,d.id,d.val,shm->ID());
......@@ -363,7 +366,60 @@ void UNetReceiver::real_update()
}
}
}
// -----------------------------------------------------------------------------
void UNetReceiver::updateDItem( ItemInfo& ii, const long& id, bool val )
{
try
{
if( ii.iotype == UniversalIO::DigitalInput )
shm->localSaveState(ii.dit,id,val,shm->ID());
else if( ii.iotype == UniversalIO::AnalogInput )
shm->localSaveValue(ii.ait,id,val,shm->ID());
else if( ii.iotype == UniversalIO::AnalogOutput )
shm->localSetValue(ii.ait,id,val,shm->ID());
else if( ii.iotype == UniversalIO::DigitalOutput )
shm->localSetState(ii.dit,id,val,shm->ID());
else
dlog[Debug::CRIT] << myname << "(updateAItem): Unknown iotype for sid=" << id << endl;
}
catch( UniSetTypes::Exception& ex)
{
dlog[Debug::CRIT] << myname << "(updateAItem): " << ex << std::endl;
throw ex;
}
catch(...)
{
dlog[Debug::CRIT] << myname << "(updateAItem): catch ..." << std::endl;
throw;
}
}
// -----------------------------------------------------------------------------
void UNetReceiver::updateAItem( ItemInfo& ii, const UniSetUDP::UDPAData& d )
{
try
{
if( ii.iotype == UniversalIO::DigitalInput )
shm->localSaveState(ii.dit,d.id,d.val,shm->ID());
else if( ii.iotype == UniversalIO::AnalogInput )
shm->localSaveValue(ii.ait,d.id,d.val,shm->ID());
else if( ii.iotype == UniversalIO::AnalogOutput )
shm->localSetValue(ii.ait,d.id,d.val,shm->ID());
else if( ii.iotype == UniversalIO::DigitalOutput )
shm->localSetState(ii.dit,d.id,d.val,shm->ID());
else
dlog[Debug::CRIT] << myname << "(updateAItem): Unknown iotype for sid=" << d.id << endl;
}
catch( UniSetTypes::Exception& ex)
{
dlog[Debug::CRIT] << myname << "(updateAItem): " << ex << std::endl;
throw ex;
}
catch(...)
{
dlog[Debug::CRIT] << myname << "(updateAItem): catch ..." << std::endl;
throw;
}
}
// -----------------------------------------------------------------------------
void UNetReceiver::stop()
{
......@@ -519,6 +575,30 @@ void UNetReceiver::initIterators()
}
}
// -----------------------------------------------------------------------------
void UNetReceiver::ignore_item(UniSetTypes::ObjectId id, bool set)
{
for( ItemVec::iterator it=d_icache.begin(); it!=d_icache.end(); ++it )
{
if( id == UniSetTypes::DefaultObjectId )
it->ignore = set;
else if( id == it->id )
{
it->ignore = set;
return;
}
}
for( ItemVec::iterator it=a_icache.begin(); it!=a_icache.end(); ++it )
{
if( id == UniSetTypes::DefaultObjectId )
it->ignore = set;
else if( id == it->id )
{
it->ignore = set;
return;
}
}
}
// -----------------------------------------------------------------------------
void UNetReceiver::initDCache( UniSetUDP::UDPMessage& pack, bool force )
{
if( !force && pack.dcount == d_icache.size() )
......
......@@ -93,19 +93,45 @@ class UNetReceiver
typedef sigc::slot<void,UNetReceiver*,Event> EventSlot;
void connectEvent( EventSlot sl );
/*! игнорировать запись датчика в SM */
void ignore_item(UniSetTypes::ObjectId id = UniSetTypes::DefaultObjectId, bool set = true);
protected:
UNetReceiver();
SMInterface* shm;
bool recv();
void step();
void real_update();
virtual void real_update();
std::string myname;
struct ItemInfo
{
long id;
IOController::AIOStateList::iterator ait;
IOController::DIOStateList::iterator dit;
UniversalIO::IOTypes iotype;
bool ignore; /*!< флаг игнорирования сохранения в SM*/
ItemInfo():
id(UniSetTypes::DefaultObjectId),
iotype(UniversalIO::UnknownIOType),
ignore(false){}
};
virtual void updateDItem( ItemInfo& ii, const long& id, bool val );
virtual void updateAItem( ItemInfo& ii, const UniSetUDP::UDPAData& d );
void initIterators();
typedef std::vector<ItemInfo> ItemVec;
ItemVec d_icache; /*!< кэш итераторов для булевых */
ItemVec a_icache; /*!< кэш итераторов для аналоговых */
bool d_cache_init_ok;
bool a_cache_init_ok;
private:
UNetReceiver();
int recvpause; /*!< пауза меджду приёмами пакетов, [мсек] */
int updatepause; /*!< переодичность обновления данных в SM, [мсек] */
......@@ -113,7 +139,6 @@ class UNetReceiver
ost::UDPReceive* udp;
ost::IPV4Address addr;
ost::tpport_t port;
std::string myname;
UniSetTypes::uniset_mutex pollMutex;
PassiveTimer ptRecvTimeout;
......@@ -168,27 +193,8 @@ class UNetReceiver
Trigger trTimeout;
UniSetTypes::uniset_mutex tmMutex;
struct ItemInfo
{
long id;
IOController::AIOStateList::iterator ait;
IOController::DIOStateList::iterator dit;
UniversalIO::IOTypes iotype;
ItemInfo():
id(UniSetTypes::DefaultObjectId),
iotype(UniversalIO::UnknownIOType){}
};
typedef std::vector<ItemInfo> ItemVec;
ItemVec d_icache; /*!< кэш итераторов для булевых */
ItemVec a_icache; /*!< кэш итераторов для аналоговых */
bool d_cache_init_ok;
bool a_cache_init_ok;
void initDCache( UniSetUDP::UDPMessage& pack, bool force=false );
void initACache( UniSetUDP::UDPMessage& pack, bool force=false );
virtual void initDCache( UniSetUDP::UDPMessage& pack, bool force=false );
virtual void initACache( UniSetUDP::UDPMessage& pack, bool force=false );
};
// -----------------------------------------------------------------------------
#endif // UNetReceiver_H_
......
......@@ -49,7 +49,7 @@ class UNetSender
void real_send();
/*! (принудительно) обновить все данные (из SM) */
void updateFromSM();
virtual void updateFromSM();
/*! Обновить значение по ID датчика */
void updateSensor( UniSetTypes::ObjectId id, long value );
......@@ -60,39 +60,40 @@ class UNetSender
inline void setSendPause( int msec ){ sendpause = msec; }
/*! заказать датчики */
void askSensors( UniversalIO::UIOCommand cmd );
virtual void askSensors( UniversalIO::UIOCommand cmd );
/*! инициализация итераторов */
void initIterators();
protected:
UNetSender();
std::string myname;
UniSetUDP::UDPMessage mypack;
DMap dlist;
int maxItem;
std::string s_field;
std::string s_fvalue;
SMInterface* shm;
bool initItem( UniXML_iterator& it );
virtual bool initItem( UniXML_iterator& it );
bool readItem( UniXML& xml, UniXML_iterator& it, xmlNode* sec );
void readConfiguration();
virtual void readConfiguration();
private:
UNetSender();
ost::UDPBroadcast* udp;
ost::IPV4Address addr;
ost::tpport_t port;
std::string s_host;
std::string myname;
int sendpause;
bool activated;
UniSetTypes::uniset_mutex pack_mutex;
UniSetUDP::UDPMessage mypack;
DMap dlist;
int maxItem;
unsigned long packetnum;
UniSetUDP::UDPPacket s_msg;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment