Commit 3426e8bd authored by Pavel Vainerman's avatar Pavel Vainerman

(IOC): переработка заказа датчиков. Попытка уйти от расинхронизации

сообщений SensorMessage (в момент заказа датчика и в момент изменения его состояния).
parent e61596e3
......@@ -16,7 +16,7 @@
Name: libuniset2
Version: 2.6
Release: alt15
Release: alt16
Summary: UniSet - library for building distributed industrial control systems
License: LGPL
......@@ -508,6 +508,9 @@ rm -f %buildroot%_libdir/*.la
# history of current unpublished changes
%changelog
* Wed May 03 2017 Pavel Vainerman <pv@altlinux.ru> 2.6-alt16
- IOC:askSensor() refactoring
* Mon May 01 2017 Pavel Vainerman <pv@altlinux.ru> 2.6-alt15
- (codegen): add msg statistic for getInfo()
- (http): minor fixes format for help
......
......@@ -285,8 +285,8 @@ namespace uniset
virtual bool activateObject() override;
virtual void initItem(std::shared_ptr<USensorInfo>& usi, IOController* ic );
//! посылка информации об изменении состояния датчика
virtual void send( ConsumerListInfo& lst, const uniset::SensorMessage& sm );
//! посылка информации об изменении состояния датчика (всем или указанному заказчику)
virtual void send( ConsumerListInfo& lst, const uniset::SensorMessage& sm, const uniset::ConsumerInfo* ci = nullptr );
//! проверка срабатывания пороговых датчиков
virtual void checkThreshold( std::shared_ptr<USensorInfo>& usi, bool send = true );
......@@ -295,10 +295,6 @@ namespace uniset
//! поиск информации о пороговом датчике
ThresholdInfoExt* findThreshold( AskThresholdMap& tmap, const uniset::ObjectId sid, const uniset::ThresholdId tid );
// обновление статистики
bool updateThresholdStat( AskThresholdMap& tmap, uniset::ObjectId sid, uniset::ThresholdId tid, const uniset::ConsumerInfo& ci, size_t stat_smCount );
bool updateSensorStat( IOController::IOStateList::iterator& it, const uniset::ConsumerInfo& ci, size_t stat_smCount );
/*! сохранение списка заказчиков
По умолчанию делает dump, если объявлен dumper.
*/
......
......@@ -504,41 +504,32 @@ void IONotifyController::askSensor(const uniset::ObjectId sid,
<< " на аналоговый датчик "
<< uniset_conf()->oind->getNameById(sid) << endl;
// если такого аналогового датчика нет, здесь сработает исключение...
auto li = myioEnd();
try
{
// если такого аналогового датчика нет, здесь сработает исключение...
localGetValue(li, sid);
}
catch( IOController_i::Undefined& ex ) {}
{
// lock
uniset_rwmutex_wrlock lock(askIOMutex);
// а раз есть обрабатываем
ask(askIOList, sid, ci, cmd);
} // unlock
// Чтобы не было гонки между текущей функцией (askSensor)
// и setValue(), которая может происходить параллельно с этой
// держим askIOMutex до конца функции, а также для посылки пользуемся функцией send()...
uniset_rwmutex_wrlock lock(askIOMutex);
ask(askIOList, sid, ci, cmd);
auto usi = li->second;
// посылка первый раз состояния
if( cmd == UniversalIO::UIONotify || (cmd == UIONotifyFirstNotNull && li->second->value) )
if( cmd == UniversalIO::UIONotify || (cmd == UIONotifyFirstNotNull && usi->value) )
{
SensorMessage smsg( std::move(li->second->makeSensorMessage()) );
try
{
ui->send(ci.id, std::move(smsg.transport_msg()), ci.node);
updateSensorStat(li,ci,1);
}
catch( const uniset::Exception& ex )
{
uwarn << myname << "(askSensor): " << uniset_conf()->oind->getNameById(sid) << " error: " << ex << endl;
}
catch( const CORBA::SystemException& ex )
if( usi->userdata[udataConsumerList] != nullptr )
{
uwarn << myname << "(askSensor): " << uniset_conf()->oind->getNameById(ci.id) << "@" << ci.node
<< " недоступен!!(CORBA::SystemException): "
<< ex.NP_minorString() << endl;
ConsumerListInfo& lst = *(static_cast<ConsumerListInfo*>(usi->userdata[udataConsumerList]));
SensorMessage smsg( std::move(usi->makeSensorMessage()) );
send(lst,smsg,&ci);
}
}
}
......@@ -667,25 +658,7 @@ long IONotifyController::localSetValue( std::shared_ptr<IOController::USensorInf
if( prevValue == curValue )
return curValue;
SensorMessage sm(1); // <-- вызываем dummy конструктор т.к. потом все поля всё-равно сами инициализируем
sm.id = usi->si.id;
sm.node = usi->si.node; // uniset_conf()->getLocalNode();
sm.value = curValue;
sm.priority = (Message::Priority)usi->priority;
sm.supplier = sup_id; // owner_id
sm.sensor_type = usi->type;
// копируем под lock только изменяемую часть
// lock
{
uniset_rwmutex_rlock lock(usi->val_lock);
sm.undefined = usi->undefined;
sm.sm_tv.tv_sec = usi->tv_sec;
sm.sm_tv.tv_nsec = usi->tv_nsec;
sm.ci = usi->ci;
} // unlock value
sm.tm = sm.sm_tv;
SensorMessage sm(std::move(usi->makeSensorMessage()));
try
{
......@@ -720,7 +693,7 @@ long IONotifyController::localSetValue( std::shared_ptr<IOController::USensorInf
Возможно нужно ввести своего агента на удалённой стороне, который будет заниматься
только приёмом сообщений и локальной рассылкой. Lav
*/
void IONotifyController::send( ConsumerListInfo& lst, const uniset::SensorMessage& sm )
void IONotifyController::send( ConsumerListInfo& lst, const uniset::SensorMessage& sm, const uniset::ConsumerInfo* ci )
{
TransportMessage tmsg(sm.transport_msg());
......@@ -728,6 +701,12 @@ void IONotifyController::send( ConsumerListInfo& lst, const uniset::SensorMessag
for( ConsumerList::iterator li = lst.clst.begin(); li != lst.clst.end(); ++li )
{
if( ci )
{
if( ci->id != li->id || ci->node != li->node )
continue;
}
for( int i = 0; i < sendAttemtps; i++ ) // на каждый объект по две попытки послать
{
try
......@@ -887,13 +866,13 @@ void IONotifyController::askThreshold(uniset::ObjectId sid, const uniset::Consum
if( lowLimit > hiLimit )
throw IONotifyController_i::BadRange();
// если такого дискретного датчика нет сдесь сработает исключение...
auto li = myioEnd();
CORBA::Long val = 0;
try
{
// если такого дискретного датчика нет сдесь сработает исключение...
val = localGetValue(li, sid);
}
catch( const IOController_i::Undefined& ex ) {}
......@@ -956,60 +935,31 @@ void IONotifyController::askThreshold(uniset::ObjectId sid, const uniset::Consum
if( cmd == UniversalIO::UIONotifyChange )
break;
// посылка первый раз состояния
try
{
SensorMessage sm;
sm.id = sid;
sm.node = uniset_conf()->getLocalNode();
sm.value = val;
sm.undefined = li->second->undefined;
sm.sensor_type = li->second->type;
sm.priority = (Message::Priority)li->second->priority;
sm.consumer = ci.id;
sm.tid = tid;
sm.ci = li->second->ci;
// Проверка нижнего предела
if( val <= lowLimit )
{
sm.threshold = false;
CORBA::Object_var op = ui->resolve(ci.id, ci.node);
UniSetObject_i_var ref = UniSetObject_i::_narrow(op);
it = askTMap.find(sid);
if(!CORBA::is_nil(ref))
{
ref->push( std::move(sm.transport_msg()) );
// посылка первый раз состояния
SensorMessage sm(std::move(li->second->makeSensorMessage()));
sm.consumer = ci.id;
sm.tid = tid;
// Проверка нижнего предела
if( val <= lowLimit )
sm.threshold = false;
// Проверка верхнего предела
else if( val >= hiLimit )
sm.threshold = true;
// askTMap уже залочен trshMutex
updateThresholdStat(askTMap, sid, tid, ci, 1);
}
}
// Проверка верхнего предела
else if( val >= hiLimit )
if( it != askTMap.end() )
{
uniset_rwmutex_rlock l(it->second.mut);
for( auto i = it->second.list.begin(); i != it->second.list.end(); ++i )
{
sm.threshold = true;
CORBA::Object_var op = ui->resolve(ci.id, ci.node);
UniSetObject_i_var ref = UniSetObject_i::_narrow(op);
if(!CORBA::is_nil(ref))
if( i->id == tid )
{
ref->push( std::move(sm.transport_msg()) );
// askTMap уже залочен trshMutex
updateThresholdStat(askTMap, sid, tid, ci, 1);
send(i->clst,sm,&ci);
break;
}
}
}
catch( const uniset::Exception& ex )
{
uwarn << myname << "(askThreshod): " << ex << endl;
}
catch( const CORBA::SystemException& ex )
{
uwarn << myname << "(askThreshod): CORBA::SystemException: "
<< ex.NP_minorString() << endl;
}
}
break;
......@@ -1037,7 +987,8 @@ void IONotifyController::askThreshold(uniset::ObjectId sid, const uniset::Consum
}
// askTMap уже залочен trshMutex
it = askTMap.find(sid);
if( it == askTMap.end() )
it = askTMap.find(sid);
if( li != myioEnd() )
{
......@@ -1051,60 +1002,6 @@ void IONotifyController::askThreshold(uniset::ObjectId sid, const uniset::Consum
} // unlock
}
// --------------------------------------------------------------------------------------------------------------
bool IONotifyController::updateThresholdStat( AskThresholdMap& tmap, uniset::ObjectId sid, uniset::ThresholdId tid, const uniset::ConsumerInfo& ci, size_t smCount )
{
auto ti = findThreshold(tmap, sid, tid);
if( !ti )
return false;
uniset::uniset_rwmutex_rlock lock(ti->clst.mut);
for( auto&& c: ti->clst.clst )
{
if( c.id == ci.id && c.node == ci.node )
{
c.smCount += smCount;
return true;
}
}
return false;
}
// --------------------------------------------------------------------------------------------------------------
bool IONotifyController::updateSensorStat( IOController::IOStateList::iterator& it, const ConsumerInfo& ci, size_t smCount )
{
if( it == myioEnd() )
return false;
ConsumerListInfo* clist = nullptr;
if( it->second->userdata[udataConsumerList] )
clist = static_cast<ConsumerListInfo*>(it->second->userdata[udataConsumerList]);
if( !clist )
{
uniset_rwmutex_wrlock lock(askIOMutex);
auto i = askIOList.find(it->second->si.id);
if( i == askIOList.end() )
return false;
}
if( !clist )
return false;
uniset_rwmutex_wrlock lock(clist->mut);
for( auto&& c: clist->clst )
{
if( c.id == ci.id && c.node == ci.node )
{
c.smCount += smCount;
return true;
}
}
return false;
}
// --------------------------------------------------------------------------------------------------------------
bool IONotifyController::addThreshold( ThresholdExtList& lst, ThresholdInfoExt&& ti, const uniset::ConsumerInfo& ci )
{
for( auto it = lst.begin(); it != lst.end(); ++it)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment