Commit b6692ee6 authored by Pavel Vainerman's avatar Pavel Vainerman

(UNet): сделал возможность задавать стратегию обновления данных в SM

'thread' - поток у каждого Receiver 'evloop' - использование общего event loop (libev). По умолчанию - evloop
parent 2f6a0230
......@@ -85,6 +85,8 @@ UNetExchange::UNetExchange(UniSetTypes::ObjectId objId, UniSetTypes::ObjectId sh
int maxProcessingCount = conf->getArgPInt("--" + prefix + "-maxprocessingcount", it.getProp("maxProcessingCount"), 100);
int checkConnectionPause = conf->getArgPInt("--" + prefix + "-checkconnection-pause", it.getProp("checkConnectionPause"), 10000);
std::string updateStrategy = conf->getArg2Param("--" + prefix + "-update-strategy", it.getProp("updateStartegy"),"evloop");
no_sender = conf->getArgInt("--" + prefix + "-nosender", it.getProp("nosender"));
std::string nconfname = conf->getArg2Param("--" + prefix + "-nodes-confnode", it.getProp("nodesConfNode"), "nodes");
......@@ -310,6 +312,15 @@ UNetExchange::UNetExchange(UniSetTypes::ObjectId objId, UniSetTypes::ObjectId sh
}
}
UNetReceiver::UpdateStrategy r_upStrategy = UNetReceiver::strToUpdateStrategy( n_it.getProp2("unet_update_strategy",updateStrategy) );
if( r_upStrategy == UNetReceiver::useUpdateUnknown )
{
ostringstream err;
err << myname << ": Unknown update strategy!!! '" << n_it.getProp2("unet_update_startegy",updateStrategy) << "'" << endl;
unetcrit << myname << "(init): " << err.str() << endl;
throw SystemError(err.str());
}
unetinfo << myname << "(init): (node='" << n << "') add basic receiver "
<< h << ":" << p << endl;
auto r = make_shared<UNetReceiver>(h, p, shm);
......@@ -331,6 +342,7 @@ UNetExchange::UNetExchange(UniSetTypes::ObjectId objId, UniSetTypes::ObjectId sh
r->setRespondID(resp_id, resp_invert);
r->setLostPacketsID(lp_id);
r->connectEvent( sigc::mem_fun(this, &UNetExchange::receiverEvent) );
r->setUpdateStrategy(r_upStrategy);
shared_ptr<UNetReceiver> r2(nullptr);
......@@ -358,6 +370,7 @@ UNetExchange::UNetExchange(UniSetTypes::ObjectId objId, UniSetTypes::ObjectId sh
r2->setRespondID(resp2_id, resp_invert);
r2->setLostPacketsID(lp2_id);
r2->connectEvent( sigc::mem_fun(this, &UNetExchange::receiverEvent) );
r2->setUpdateStrategy(r_upStrategy);
}
}
catch(...)
......@@ -791,6 +804,11 @@ void UNetExchange::help_print( int argc, const char* argv[] )
cout << "--prefix-maxdifferense num - Маскимальная разница в номерах пакетов для фиксации события 'потеря пакетов' " << endl;
cout << "--prefix-maxprocessingcount num - время на ожидание старта SM" << endl;
cout << "--prefix-nosender [0,1] - Отключить посылку." << endl;
cout << "--prefix-update-strategy [thread,evloop] - Стратегия обновления данных в SM. " << endl;
cout << " 'thread' - у каждого UNetReceiver отдельный поток" << endl;
cout << " 'evloop' - используется общий (с приёмом сообщений) event loop" << endl;
cout << " По умолчанию: evloop" << endl;
cout << "--prefix-sm-ready-timeout msec - Время ожидание я готовности SM к работе. По умолчанию 15000" << endl;
cout << "--prefix-filter-field name - Название фильтрующего поля при формировании списка датчиков посылаемых данным узлом" << endl;
cout << "--prefix-filter-value name - Значение фильтрующего поля при формировании списка датчиков посылаемых данным узлом" << endl;
......
......@@ -50,7 +50,7 @@
\section pgUNetUDP_Common Общее описание
Обмен построен на основе протокола UDP.
Основная идея заключается в том, что каждый узел на порту равном своему ID
посылает в сеть UDP-пакеты содержащие данные считанные из локальной SM. Формат данных - это набор
посылает в сеть UDP-пакеты содержащие данные считанные из локальной SM. Формат данных - это набор
пар [id,value]. Другие узлы принимают их. Помимо этого данный процесс запускает
"получателей" по одному на каждый (другой) узел и ловит пакеты от них, сохраняя данные в SM.
При этом "получатели" работают на одном(!) потоке с использованием событий libev (см. UNetReceiver).
......@@ -73,11 +73,16 @@
...
</iocards>
</item>
<item ip="192.168.56.10" name="Node1" textname="Node1" unet_port="3001"/>
<item ip="192.168.56.10" name="Node1" textname="Node1" unet_port="3001" unet_update_strategy="evloop"/>
<item ip="192.168.56.11" name="Node2" textname="Node2" unet_port="3002"/>
</nodes>
\endcode
* \b unet_update_strategy - задаёт стратегию обновления данных в SM.
Поддерживается два варианта:
- 'thread' - отдельный поток обновления
- 'evloop' - использование общего с приёмом event loop (libev)
\note Имеется возможность задавать отдельную настроечную секцию для "списка узлов" при помощи параметра
--prefix-nodes-confnode name. По умолчанию настройка ведётся по секции <nodes>
......
......@@ -82,6 +82,7 @@ UNetReceiver::UNetReceiver(const std::string& s_host, int _port, const std::shar
evCheckConnection.set<UNetReceiver, &UNetReceiver::checkConnectionEvent>(this);
evStatistic.set<UNetReceiver, &UNetReceiver::statisticsEvent>(this);
evUpdate.set<UNetReceiver, &UNetReceiver::updateEvent>(this);
}
// -----------------------------------------------------------------------------
UNetReceiver::~UNetReceiver()
......@@ -123,6 +124,9 @@ void UNetReceiver::setReceivePause( timeout_t msec )
void UNetReceiver::setUpdatePause( timeout_t msec )
{
updatepause = msec;
if( upStrategy == useUpdateEventLoop && evUpdate.is_active() )
evUpdate.start(0, (float)updatepause/1000.);
}
// -----------------------------------------------------------------------------
void UNetReceiver::setMaxProcessingCount( int set )
......@@ -174,6 +178,8 @@ bool UNetReceiver::createConnection( bool throwEx )
udp = make_shared<UDPReceiveU>(addr, port);
udp->setBlocking(false); // делаем неблокирующее чтение (нужно для libev)
evReceive.set<UNetReceiver, &UNetReceiver::callback>(this);
if( upStrategy == useUpdateEventLoop )
evUpdate.set<UNetReceiver, &UNetReceiver::updateEvent>(this);
if( evCheckConnection.is_active() )
evCheckConnection.stop();
......@@ -219,7 +225,7 @@ void UNetReceiver::start()
activated = true;
loop.evrun(this, true);
if( !upThread->isRunning() )
if( upStrategy == useUpdateThread && !upThread->isRunning() )
upThread->start();
}
else
......@@ -229,9 +235,15 @@ void UNetReceiver::start()
void UNetReceiver::evprepare( const ev::loop_ref& eloop )
{
evStatistic.set(eloop);
evStatistic.start(0, 1.0); // раз в сек
if( upStrategy == useUpdateEventLoop )
{
evUpdate.set(eloop);
evUpdate.start();
evUpdate.start( 0, ((float)updatepause/1000.) );
}
if( !udp )
{
evCheckConnection.set(eloop);
......@@ -261,13 +273,16 @@ void UNetReceiver::evfinish( const ev::loop_ref& eloop )
if( evStatistic.is_active() )
evStatistic.stop();
if( evUpdate.is_active() )
evUpdate.stop();
//udp->disconnect();
udp = nullptr;
}
// -----------------------------------------------------------------------------
void UNetReceiver::forceUpdate()
{
std::lock_guard<std::mutex> l(packMutex);
pack_guard l(packMutex,upStrategy);
pnum = 0; // сбрасываем запомненый номер последнего обработанного пакета
// и тем самым заставляем обновить данные в SM (см. update)
}
......@@ -306,7 +321,7 @@ void UNetReceiver::update()
{
{
// lock qpack
std::lock_guard<std::mutex> l(packMutex);
pack_guard l(packMutex,upStrategy);
if( qpack.empty() )
return;
......@@ -705,7 +720,7 @@ bool UNetReceiver::receive()
{
// lock qpack
std::lock_guard<std::mutex> l(packMutex);
pack_guard l(packMutex,upStrategy);
if( !waitClean )
{
......@@ -847,6 +862,48 @@ void UNetReceiver::connectEvent( UNetReceiver::EventSlot sl )
{
slEvent = sl;
}
UNetReceiver::UpdateStrategy UNetReceiver::strToUpdateStrategy(const string& s)
{
if( s == "thread" || s == "THREAD" )
return useUpdateThread;
if( s == "evloop" || s == "EVLOOP" )
return useUpdateEventLoop;
return useUpdateUnknown;
}
// -----------------------------------------------------------------------------
string UNetReceiver::to_string(UNetReceiver::UpdateStrategy s)
{
if( s == useUpdateThread )
return "thread";
if( s == useUpdateEventLoop )
return "evloop";
return "";
}
// -----------------------------------------------------------------------------
void UNetReceiver::setUpdateStrategy( UNetReceiver::UpdateStrategy set )
{
if( set == useUpdateEventLoop && upThread->isRunning() )
{
ostringstream err;
err << myname << "(setUpdateStrategy): set 'useUpdateEventLoop' strategy but updateThread is running!";
unetcrit << err.str() << endl;
throw SystemError(err.str());
}
if( set == useUpdateThread && evUpdate.is_active() )
{
ostringstream err;
err << myname << "(setUpdateStrategy): set 'useUpdateThread' strategy but update event loop is running!";
unetcrit << err.str() << endl;
throw SystemError(err.str());
}
upStrategy = set;
}
// -----------------------------------------------------------------------------
const std::string UNetReceiver::getShortInfo() const
{
......@@ -860,6 +917,7 @@ const std::string UNetReceiver::getShortInfo() const
<< " recvOK=" << isRecvOK()
<< " receivepack=" << rnum
<< " lostPackets=" << setw(6) << getLostPacketsNum()
<< " updateStartegy=" << to_string(upStrategy)
<< endl
<< "\t["
<< " recvTimeout=" << setw(6) << recvTimeout
......@@ -877,3 +935,17 @@ const std::string UNetReceiver::getShortInfo() const
return std::move(s.str());
}
// -----------------------------------------------------------------------------
UNetReceiver::pack_guard::pack_guard( mutex& _m, UNetReceiver::UpdateStrategy _s ):
m(_m),
s(_s)
{
if( s == useUpdateThread )
m.lock();
}
// -----------------------------------------------------------------------------
UNetReceiver::pack_guard::~pack_guard()
{
if( s == useUpdateThread )
m.unlock();
}
// -----------------------------------------------------------------------------
......@@ -84,6 +84,13 @@
* Если такая логика не требуется, то можно задать в конструкторе
* последним аргументом флаг nocheckconnection=true, тогда при создании объекта UNetReceiver, в конструкторе будет
* выкинуто исключение при неудачной попытке создания соединения.
*
* Стратегия обновления данных в SM
* ==================================
* При помощи функции setUpdateStrategy() можно выбрать стратегию обновления данных в SM.
* Поддерживается два варианта:
* 'thread' - отдельный поток обновления
* 'evloop' - использование общего с приёмом event loop (libev)
*/
// -----------------------------------------------------------------------------
class UNetReceiver:
......@@ -154,6 +161,36 @@ class UNetReceiver:
typedef sigc::slot<void, const std::shared_ptr<UNetReceiver>&, Event> EventSlot;
void connectEvent( EventSlot sl );
// --------------------------------------------------------------------
/*! Стратегия обработки сообщений */
enum UpdateStrategy
{
useUpdateUnknown,
useUpdateThread, /*!< использовать отдельный поток */
useUpdateEventLoop /*!< использовать event loop (т.е. совместно с receive) */
};
static UpdateStrategy strToUpdateStrategy( const std::string& s );
static std::string to_string( UpdateStrategy s);
//! функция должна вызываться до первого вызова start()
void setUpdateStrategy( UpdateStrategy set );
// специальная обёртка, захватывающая или нет mutex в зависимости от стратегии
// (т.к. при evloop mutex захватытвать не нужно)
class pack_guard
{
public:
pack_guard( std::mutex& m, UpdateStrategy s );
~pack_guard();
protected:
std::mutex& m;
UpdateStrategy s;
};
// --------------------------------------------------------------------
inline std::shared_ptr<DebugStream> getLog()
{
return unetlog;
......@@ -197,6 +234,7 @@ class UNetReceiver:
return lhs.num > rhs.num;
}
};
typedef std::priority_queue<UniSetUDP::UDPMessage, std::vector<UniSetUDP::UDPMessage>, PacketCompare> PacketQueue;
private:
......@@ -213,6 +251,9 @@ class UNetReceiver:
ev::io evReceive;
ev::periodic evCheckConnection;
ev::periodic evStatistic;
ev::periodic evUpdate;
UpdateStrategy upStrategy = { useUpdateEventLoop };
// счётчики для подсчёта статистики
size_t recvCount = { 0 };
......
......@@ -10,5 +10,5 @@ cd -
./uniset2-start.sh -f ./tests-with-sm $* -- --confile unetudp-test-configure.xml --e-startup-pause 10 \
--unet-name UNetExchange --unet-filter-field unet --unet-filter-value 1 --unet-maxdifferense 5 \
--unet-recv-timeout 1000 --unet-sendpause 500
--unet-recv-timeout 1000 --unet-sendpause 500 --unet-update-strategy evloop
#--unet-log-add-levels any
#!/bin/sh
# '--' - нужен для отделения аргументов catch, от наших..
cd ../../../Utilities/Admin/
./uniset2-start.sh -f ./create_links.sh
./uniset2-start.sh -f ./create
./uniset2-start.sh -f ./exist | grep -q UNISET_PLC/Controllers || exit 1
cd -
./uniset2-start.sh -f ./tests-with-sm $* -- --confile unetudp-test-configure.xml --e-startup-pause 10 \
--unet-name UNetExchange --unet-filter-field unet --unet-filter-value 1 --unet-maxdifferense 5 \
--unet-recv-timeout 1000 --unet-sendpause 500 --unet-update-strategy thread
#--unet-log-add-levels any
AT_SETUP([UNetUDP tests (with SM)])
AT_CHECK([$abs_top_builddir/testsuite/at-test-launch.sh $abs_top_builddir/extensions/UNetUDP/tests tests_with_sm.sh],[0],[ignore],[ignore])
AT_SETUP([UNetUDP tests (with SM)(thread)])
AT_CHECK([$abs_top_builddir/testsuite/at-test-launch.sh $abs_top_builddir/extensions/UNetUDP/tests tests_with_sm_thread.sh],[0],[ignore],[ignore])
AT_CLEANUP
AT_SETUP([UNetUDP tests (with SM)(evloop)])
AT_CHECK([$abs_top_builddir/testsuite/at-test-launch.sh $abs_top_builddir/extensions/UNetUDP/tests tests_with_sm_evloop.sh],[0],[ignore],[ignore])
AT_CLEANUP
# AT_SETUP([UNetUDP tests (Apart)])
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment