Commit 333cf17c authored by Pavel Vainerman's avatar Pavel Vainerman

(UNetReceiver): доработал, чтобы в случае если не удалось создать сокет,

процесс с заданным периодом пытался это сделать, но не вылетал. Сделал такую логику умолчательной.
parent 18b435f3
...@@ -65,9 +65,6 @@ Version 2.5 ...@@ -65,9 +65,6 @@ Version 2.5
- Использовать PIMPL - Использовать PIMPL
- подумать.. это серьёзная переделка кода.. - подумать.. это серьёзная переделка кода..
UNET: сделать чтобы "канал"(UReceiver, USender) не вылетал если не удалось создать сокет, а собственно,
делал периодические попытки пока не получиться...(т.к. сеть может подняться позже внешними системами мониторинга)
libev libev
======= =======
- переписать UNetUDP (на подумать) - переписать UNetUDP (на подумать)
......
...@@ -83,6 +83,7 @@ UNetExchange::UNetExchange(UniSetTypes::ObjectId objId, UniSetTypes::ObjectId sh ...@@ -83,6 +83,7 @@ UNetExchange::UNetExchange(UniSetTypes::ObjectId objId, UniSetTypes::ObjectId sh
steptime = conf->getArgPInt("--" + prefix + "-steptime", it.getProp("steptime"), 1000); steptime = conf->getArgPInt("--" + prefix + "-steptime", it.getProp("steptime"), 1000);
int maxDiff = conf->getArgPInt("--" + prefix + "-maxdifferense", it.getProp("maxDifferense"), 100); int maxDiff = conf->getArgPInt("--" + prefix + "-maxdifferense", it.getProp("maxDifferense"), 100);
int maxProcessingCount = conf->getArgPInt("--" + prefix + "-maxprocessingcount", it.getProp("maxProcessingCount"), 100); int maxProcessingCount = conf->getArgPInt("--" + prefix + "-maxprocessingcount", it.getProp("maxProcessingCount"), 100);
int checkConnectionPause = conf->getArgPInt("--" + prefix + "-checkconnection-pause", it.getProp("checkConnectionPause"), 10000);
no_sender = conf->getArgInt("--" + prefix + "-nosender", it.getProp("nosender")); no_sender = conf->getArgInt("--" + prefix + "-nosender", it.getProp("nosender"));
...@@ -322,6 +323,7 @@ UNetExchange::UNetExchange(UniSetTypes::ObjectId objId, UniSetTypes::ObjectId sh ...@@ -322,6 +323,7 @@ UNetExchange::UNetExchange(UniSetTypes::ObjectId objId, UniSetTypes::ObjectId sh
r->setLostTimeout(lostTimeout); r->setLostTimeout(lostTimeout);
r->setReceivePause(recvpause); r->setReceivePause(recvpause);
r->setUpdatePause(updatepause); r->setUpdatePause(updatepause);
r->setCheckConnectionPause(checkConnectionPause);
r->setMaxDifferens(maxDiff); r->setMaxDifferens(maxDiff);
r->setMaxProcessingCount(maxProcessingCount); r->setMaxProcessingCount(maxProcessingCount);
r->setRespondID(resp_id, resp_invert); r->setRespondID(resp_id, resp_invert);
...@@ -348,6 +350,7 @@ UNetExchange::UNetExchange(UniSetTypes::ObjectId objId, UniSetTypes::ObjectId sh ...@@ -348,6 +350,7 @@ UNetExchange::UNetExchange(UniSetTypes::ObjectId objId, UniSetTypes::ObjectId sh
r2->setLostTimeout(lostTimeout); r2->setLostTimeout(lostTimeout);
r2->setReceivePause(recvpause); r2->setReceivePause(recvpause);
r2->setUpdatePause(updatepause); r2->setUpdatePause(updatepause);
r2->setCheckConnectionPause(checkConnectionPause);
r2->setMaxDifferens(maxDiff); r2->setMaxDifferens(maxDiff);
r2->setMaxProcessingCount(maxProcessingCount); r2->setMaxProcessingCount(maxProcessingCount);
r2->setRespondID(resp2_id, resp_invert); r2->setRespondID(resp2_id, resp_invert);
...@@ -784,6 +787,7 @@ void UNetExchange::help_print( int argc, const char* argv[] ) ...@@ -784,6 +787,7 @@ void UNetExchange::help_print( int argc, const char* argv[] )
cout << "--prefix-sendpause msec - Пауза между посылками. По умолчанию 100" << endl; cout << "--prefix-sendpause msec - Пауза между посылками. По умолчанию 100" << endl;
cout << "--prefix-updatepause msec - Пауза между обновлением информации в SM (Корелирует с recvpause и sendpause). По умолчанию 100" << endl; cout << "--prefix-updatepause msec - Пауза между обновлением информации в SM (Корелирует с recvpause и sendpause). По умолчанию 100" << endl;
cout << "--prefix-steptime msec - Пауза между обновлением информации о связи с узлами." << endl; cout << "--prefix-steptime msec - Пауза между обновлением информации о связи с узлами." << endl;
cout << "--prefix-checkconnection-pause msec - Пауза между попытками открыть соединение (если это не удалось до этого). По умолчанию: 10000 (10 сек)" << endl;
cout << "--prefix-maxdifferense num - Маскимальная разница в номерах пакетов для фиксации события 'потеря пакетов' " << endl; cout << "--prefix-maxdifferense num - Маскимальная разница в номерах пакетов для фиксации события 'потеря пакетов' " << endl;
cout << "--prefix-maxprocessingcount num - время на ожидание старта SM" << endl; cout << "--prefix-maxprocessingcount num - время на ожидание старта SM" << endl;
cout << "--prefix-nosender [0,1] - Отключить посылку." << endl; cout << "--prefix-nosender [0,1] - Отключить посылку." << endl;
......
...@@ -54,7 +54,7 @@ ...@@ -54,7 +54,7 @@
посылает в сеть UDP-пакеты содержащие данные считанные из локальной SM. Формат данных - это набор посылает в сеть UDP-пакеты содержащие данные считанные из локальной SM. Формат данных - это набор
пар [id,value]. Другие узлы принимают их. Помимо этого данный процесс запускает пар [id,value]. Другие узлы принимают их. Помимо этого данный процесс запускает
"получателей" по одному на каждый (другой) узел и ловит пакеты от них, сохраняя данные в SM. "получателей" по одному на каждый (другой) узел и ловит пакеты от них, сохраняя данные в SM.
При этом "получатели" работют на одном(!) потоке с использованием событий libev (см. UNetReceiver). При этом "получатели" работают на одном(!) потоке с использованием событий libev (см. UNetReceiver).
\par \par
При своём старте процесс считывает из секции \<nodes> список узлов которые необходимо "слушать", При своём старте процесс считывает из секции \<nodes> список узлов которые необходимо "слушать",
......
...@@ -38,7 +38,7 @@ bool UNetReceiver::PacketCompare::operator()(const UniSetUDP::UDPMessage& lhs, ...@@ -38,7 +38,7 @@ bool UNetReceiver::PacketCompare::operator()(const UniSetUDP::UDPMessage& lhs,
} }
*/ */
// ------------------------------------------------------------------------------------------ // ------------------------------------------------------------------------------------------
UNetReceiver::UNetReceiver( const std::string& s_host, const ost::tpport_t port, const std::shared_ptr<SMInterface>& smi ): UNetReceiver::UNetReceiver( const std::string& s_host, const ost::tpport_t port, const std::shared_ptr<SMInterface>& smi, bool nocheckConnection ):
shm(smi), shm(smi),
recvpause(10), recvpause(10),
updatepause(100), updatepause(100),
...@@ -65,40 +65,16 @@ UNetReceiver::UNetReceiver( const std::string& s_host, const ost::tpport_t port, ...@@ -65,40 +65,16 @@ UNetReceiver::UNetReceiver( const std::string& s_host, const ost::tpport_t port,
myname = s.str(); myname = s.str();
} }
addr = s_host.c_str();
unetlog = make_shared<DebugStream>(); unetlog = make_shared<DebugStream>();
unetlog->setLogName(myname); unetlog->setLogName(myname);
auto conf = uniset_conf(); auto conf = uniset_conf();
conf->initLogStream(unetlog, myname); conf->initLogStream(unetlog, myname);
ost::Thread::setException(ost::Thread::throwException); if( !createConnection(nocheckConnection /* <-- это флаг throwEx */) )
evCheckConnection.set<UNetReceiver, &UNetReceiver::checkConnectionEvent>(this);
try
{
addr = s_host.c_str();
udp = make_shared<UDPReceiveU>(addr, port);
udp->setCompletion(false); // делаем неблокирующее чтение (нужно для libev)
}
catch( const std::exception& e )
{
ostringstream s;
s << myname << ": " << e.what();
unetcrit << s.str() << std::endl;
throw SystemError(s.str());
}
catch( ... )
{
ostringstream s;
s << myname << ": catch...";
unetcrit << s.str() << std::endl;
throw SystemError(s.str());
}
evReceive.set<UNetReceiver, &UNetReceiver::callback>(this);
evUpdate.set<UNetReceiver, &UNetReceiver::updateEvent>(this);
ptRecvTimeout.setTiming(recvTimeout);
ptPrepare.setTiming(prepareTime);
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
UNetReceiver::~UNetReceiver() UNetReceiver::~UNetReceiver()
...@@ -118,6 +94,14 @@ void UNetReceiver::setPrepareTime( timeout_t msec ) ...@@ -118,6 +94,14 @@ void UNetReceiver::setPrepareTime( timeout_t msec )
ptPrepare.setTiming(msec); ptPrepare.setTiming(msec);
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
void UNetReceiver::setCheckConnectionPause( timeout_t msec )
{
checkConnectionTime = (double)msec / 1000.0;
if( evCheckConnection.is_active() )
evCheckConnection.start(0,checkConnectionTime);
}
// -----------------------------------------------------------------------------
void UNetReceiver::setLostTimeout( timeout_t msec ) void UNetReceiver::setLostTimeout( timeout_t msec )
{ {
lostTimeout = msec; lostTimeout = msec;
...@@ -135,7 +119,7 @@ void UNetReceiver::setUpdatePause( timeout_t msec ) ...@@ -135,7 +119,7 @@ void UNetReceiver::setUpdatePause( timeout_t msec )
updateTime = (double)updatepause / 1000.0; updateTime = (double)updatepause / 1000.0;
if( evUpdate.is_active() ) if( evUpdate.is_active() )
evUpdate.start(updateTime); evUpdate.start(0,updateTime);
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
void UNetReceiver::setMaxProcessingCount( int set ) void UNetReceiver::setMaxProcessingCount( int set )
...@@ -177,6 +161,51 @@ void UNetReceiver::resetTimeout() ...@@ -177,6 +161,51 @@ void UNetReceiver::resetTimeout()
trTimeout.change(false); trTimeout.change(false);
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
bool UNetReceiver::createConnection( bool throwEx )
{
if( !activated )
return false;
ost::Thread::setException(ost::Thread::throwException);
try
{
udp = make_shared<UDPReceiveU>(addr, port);
udp->setCompletion(false); // делаем неблокирующее чтение (нужно для libev)
evReceive.set<UNetReceiver, &UNetReceiver::callback>(this);
evUpdate.set<UNetReceiver, &UNetReceiver::updateEvent>(this);
if( evCheckConnection.is_active() )
evCheckConnection.stop();
ptRecvTimeout.setTiming(recvTimeout);
ptPrepare.setTiming(prepareTime);
if( activated )
evprepare(loop.evloop());
}
catch( const std::exception& e )
{
ostringstream s;
s << myname << "(createConnection): " << e.what();
unetcrit << s.str() << std::endl;
if( throwEx )
throw SystemError(s.str());
udp = nullptr;
}
catch( ... )
{
ostringstream s;
s << myname << "(createConnection): catch...";
unetcrit << s.str() << std::endl;
if( throwEx )
throw SystemError(s.str());
udp = nullptr;
}
return ( udp!=nullptr );
}
// -----------------------------------------------------------------------------
void UNetReceiver::start() void UNetReceiver::start()
{ {
unetinfo << myname << ":... start... " << endl; unetinfo << myname << ":... start... " << endl;
...@@ -192,17 +221,31 @@ void UNetReceiver::start() ...@@ -192,17 +221,31 @@ void UNetReceiver::start()
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
void UNetReceiver::evprepare( const ev::loop_ref& eloop ) void UNetReceiver::evprepare( const ev::loop_ref& eloop )
{ {
evReceive.set(eloop); if( !udp )
evReceive.start(udp->getSocket(), ev::READ); {
evCheckConnection.set(eloop);
evCheckConnection.start(0,checkConnectionTime);
}
else
{
evReceive.set(eloop);
evReceive.start(udp->getSocket(), ev::READ);
evUpdate.set(eloop); evUpdate.set(eloop);
evUpdate.start( updateTime ); evUpdate.start( 0, updateTime );
}
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
void UNetReceiver::evfinish( const ev::loop_ref& eloop ) void UNetReceiver::evfinish( const ev::loop_ref& eloop )
{ {
activated = false; activated = false;
{
uniset_mutex_lock l(checkConnMutex);
if( evCheckConnection.is_active() )
evCheckConnection.stop();
}
if( evReceive.is_active() ) if( evReceive.is_active() )
evReceive.stop(); evReceive.stop();
...@@ -442,7 +485,7 @@ void UNetReceiver::readEvent( ev::io& watcher ) ...@@ -442,7 +485,7 @@ void UNetReceiver::readEvent( ev::io& watcher )
} }
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
void UNetReceiver::updateEvent(ev::periodic& tm, int revents ) void UNetReceiver::updateEvent( ev::periodic& tm, int revents )
{ {
if( EV_ERROR & revents ) if( EV_ERROR & revents )
{ {
...@@ -453,8 +496,8 @@ void UNetReceiver::updateEvent(ev::periodic& tm, int revents ) ...@@ -453,8 +496,8 @@ void UNetReceiver::updateEvent(ev::periodic& tm, int revents )
if( !activated ) if( !activated )
return; return;
// взводим // взводим таймер опять..
tm.start(updateTime); tm.again();
// собственно обработка события // собственно обработка события
try try
...@@ -492,6 +535,24 @@ void UNetReceiver::updateEvent(ev::periodic& tm, int revents ) ...@@ -492,6 +535,24 @@ void UNetReceiver::updateEvent(ev::periodic& tm, int revents )
} }
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
void UNetReceiver::checkConnectionEvent( ev::periodic& tm, int revents )
{
if( EV_ERROR & revents )
{
unetcrit << myname << "(checkConnectionEvent): EVENT ERROR.." << endl;
return;
}
if( !activated )
return;
unetinfo << myname << "(checkConnectionEvent): check connection..(checkConnectionTime=" << checkConnectionTime << ")" << endl;
uniset_mutex_lock l(checkConnMutex);
if( !createConnection(false) )
tm.again();
}
// -----------------------------------------------------------------------------
void UNetReceiver::stop() void UNetReceiver::stop()
{ {
unetinfo << myname << ": stop.." << endl; unetinfo << myname << ": stop.." << endl;
......
...@@ -74,6 +74,17 @@ ...@@ -74,6 +74,17 @@
* \todo Сделать защиту от бесконечного ожидания "очистки" основной очереди. * \todo Сделать защиту от бесконечного ожидания "очистки" основной очереди.
* ========================================================================= * =========================================================================
* ОПТИМИЗАЦИЯ N1: см. UNetSender.h. Если номер последнего принятого пакета не менялся.. не обрабатываем.. * ОПТИМИЗАЦИЯ N1: см. UNetSender.h. Если номер последнего принятого пакета не менялся.. не обрабатываем..
*
* Создание соединения (открытие сокета)
* ======================================
* Попытка создать сокет производиться сразу в конструкторе, если это не получается,
* то создаётся таймер (evCheckConnection), который периодически (checkConnectionTime) пытается вновь
* открыть сокет.. и так бесконечно, пока не получиться. Это важно для систем, где в момент загрузки программы
* (в момент создания объекта UNetReceiver) ещё может быть не поднята сеть или какой-то сбой с сетью и требуется
* ожидание (без вылета программы) пока "внешняя система мониторинга" не поднимет сеть).
* Если такая логика не требуется, то можно задать в конструкторе
* последним аргументом флаг nocheckconnection=true, тогда при создании объекта UNetReceiver, в конструкторе будет
* выкинуто исключение при неудачной попытке создания соединения.
*/ */
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
class UNetReceiver: class UNetReceiver:
...@@ -81,7 +92,7 @@ class UNetReceiver: ...@@ -81,7 +92,7 @@ class UNetReceiver:
public std::enable_shared_from_this<UNetReceiver> public std::enable_shared_from_this<UNetReceiver>
{ {
public: public:
UNetReceiver( const std::string& host, const ost::tpport_t port, const std::shared_ptr<SMInterface>& smi ); UNetReceiver( const std::string& host, const ost::tpport_t port, const std::shared_ptr<SMInterface>& smi, bool nocheckConnection=false );
virtual ~UNetReceiver(); virtual ~UNetReceiver();
void start(); void start();
...@@ -115,6 +126,7 @@ class UNetReceiver: ...@@ -115,6 +126,7 @@ class UNetReceiver:
void setUpdatePause( timeout_t msec ); void setUpdatePause( timeout_t msec );
void setLostTimeout( timeout_t msec ); void setLostTimeout( timeout_t msec );
void setPrepareTime( timeout_t msec ); void setPrepareTime( timeout_t msec );
void setCheckConnectionPause( timeout_t msec );
void setMaxDifferens( unsigned long set ); void setMaxDifferens( unsigned long set );
void setRespondID( UniSetTypes::ObjectId id, bool invert = false ); void setRespondID( UniSetTypes::ObjectId id, bool invert = false );
...@@ -161,14 +173,16 @@ class UNetReceiver: ...@@ -161,14 +173,16 @@ class UNetReceiver:
void callback( ev::io& watcher, int revents ); void callback( ev::io& watcher, int revents );
void readEvent( ev::io& watcher ); void readEvent( ev::io& watcher );
void updateEvent( ev::periodic& watcher, int revents ); void updateEvent( ev::periodic& watcher, int revents );
void checkConnectionEvent( ev::periodic& watcher, int revents );
virtual void evprepare( const ev::loop_ref& eloop ) override; virtual void evprepare( const ev::loop_ref& eloop ) override;
virtual void evfinish(const ev::loop_ref& eloop ) override; virtual void evfinish(const ev::loop_ref& eloop ) override;
virtual std::string wname() virtual std::string wname() override
{ {
return myname; return myname;
} }
void initIterators(); void initIterators();
bool createConnection( bool throwEx = false );
public: public:
...@@ -196,11 +210,14 @@ class UNetReceiver: ...@@ -196,11 +210,14 @@ class UNetReceiver:
std::string myname; std::string myname;
ev::io evReceive; ev::io evReceive;
ev::periodic evUpdate; ev::periodic evUpdate;
ev::periodic evCheckConnection;
// делаем loop общим.. одним на всех! // делаем loop общим.. одним на всех!
static CommonEventLoop loop; static CommonEventLoop loop;
double updateTime = { 0.0 }; double updateTime = { 0.01 };
double checkConnectionTime = { 10.0 }; // sec
UniSetTypes::uniset_mutex checkConnMutex;
UniSetTypes::uniset_rwmutex pollMutex; UniSetTypes::uniset_rwmutex pollMutex;
PassiveTimer ptRecvTimeout; PassiveTimer ptRecvTimeout;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment