Commit 1bf3a239 authored by Pavel Vainerman's avatar Pavel Vainerman

(UNetSender): сделал так, чтобы процесс не вылетал если не удалось создать

соединение, а периодически пытался его создать..
parent 333cf17c
...@@ -160,8 +160,9 @@ UNetExchange::UNetExchange(UniSetTypes::ObjectId objId, UniSetTypes::ObjectId sh ...@@ -160,8 +160,9 @@ UNetExchange::UNetExchange(UniSetTypes::ObjectId objId, UniSetTypes::ObjectId sh
} }
unetinfo << myname << "(init): init sender.. my node " << n_it.getProp("name") << endl; unetinfo << myname << "(init): init sender.. my node " << n_it.getProp("name") << endl;
sender = make_shared<UNetSender>(h, p, shm, s_field, s_fvalue, prefix); sender = make_shared<UNetSender>(h, p, shm, false, s_field, s_fvalue, prefix);
sender->setSendPause(sendpause); sender->setSendPause(sendpause);
sender->setCheckConnectionPause(checkConnectionPause);
loga->add(sender->getLog()); loga->add(sender->getLog());
try try
...@@ -170,8 +171,9 @@ UNetExchange::UNetExchange(UniSetTypes::ObjectId objId, UniSetTypes::ObjectId sh ...@@ -170,8 +171,9 @@ UNetExchange::UNetExchange(UniSetTypes::ObjectId objId, UniSetTypes::ObjectId sh
if( !h2.empty() ) if( !h2.empty() )
{ {
unetinfo << myname << "(init): init sender2.. my node " << n_it.getProp("name") << endl; unetinfo << myname << "(init): init sender2.. my node " << n_it.getProp("name") << endl;
sender2 = make_shared<UNetSender>(h2, p2, shm, s_field, s_fvalue, prefix); sender2 = make_shared<UNetSender>(h2, p2, shm, false, s_field, s_fvalue, prefix);
sender2->setSendPause(sendpause); sender2->setSendPause(sendpause);
sender2->setCheckConnectionPause(checkConnectionPause);
loga->add(sender2->getLog()); loga->add(sender2->getLog());
} }
} }
......
...@@ -546,7 +546,7 @@ void UNetReceiver::checkConnectionEvent( ev::periodic& tm, int revents ) ...@@ -546,7 +546,7 @@ void UNetReceiver::checkConnectionEvent( ev::periodic& tm, int revents )
if( !activated ) if( !activated )
return; return;
unetinfo << myname << "(checkConnectionEvent): check connection..(checkConnectionTime=" << checkConnectionTime << ")" << endl; unetinfo << myname << "(checkConnectionEvent): check connection event.." << endl;
uniset_mutex_lock l(checkConnMutex); uniset_mutex_lock l(checkConnMutex);
if( !createConnection(false) ) if( !createConnection(false) )
......
...@@ -25,9 +25,9 @@ using namespace std; ...@@ -25,9 +25,9 @@ using namespace std;
using namespace UniSetTypes; using namespace UniSetTypes;
using namespace UniSetExtensions; using namespace UniSetExtensions;
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
UNetSender::UNetSender( const std::string& s_host, const ost::tpport_t port, const std::shared_ptr<SMInterface>& smi, UNetSender::UNetSender(const std::string& s_host, const ost::tpport_t port, const std::shared_ptr<SMInterface>& smi,
const std::string& s_f, const std::string& s_val, const std::string& s_prefix, bool nocheckConnection, const std::string& s_f, const std::string& s_val,
size_t maxDCount, size_t maxACount ): const std::string& s_prefix, size_t maxDCount, size_t maxACount ):
s_field(s_f), s_field(s_f),
s_fvalue(s_val), s_fvalue(s_val),
prefix(s_prefix), prefix(s_prefix),
...@@ -64,27 +64,10 @@ UNetSender::UNetSender( const std::string& s_host, const ost::tpport_t port, con ...@@ -64,27 +64,10 @@ UNetSender::UNetSender( const std::string& s_host, const ost::tpport_t port, con
unetinfo << "(UNetSender): UDP set to " << s_host << ":" << port << endl; unetinfo << "(UNetSender): UDP set to " << s_host << ":" << port << endl;
ost::Thread::setException(ost::Thread::throwException); addr = s_host.c_str();
try ptCheckConnection.setTiming(10000); // default 10 сек
{ createConnection(nocheckConnection);
addr = s_host.c_str();
udp = make_shared<ost::UDPBroadcast>(addr, port);
}
catch( const std::exception& e )
{
ostringstream s;
s << myname << ": " << e.what();
unetcrit << s.str() << std::endl;
throw SystemError(s.str());
}
catch( ... )
{
ostringstream s;
s << myname << ": catch...";
unetcrit << s.str() << std::endl;
throw SystemError(s.str());
}
s_thr = make_shared< ThreadCreator<UNetSender> >(this, &UNetSender::send); s_thr = make_shared< ThreadCreator<UNetSender> >(this, &UNetSender::send);
...@@ -123,6 +106,40 @@ UNetSender::~UNetSender() ...@@ -123,6 +106,40 @@ UNetSender::~UNetSender()
{ {
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
bool UNetSender::createConnection( bool throwEx )
{
ost::Thread::setException(ost::Thread::throwException);
unetinfo << myname << "(createConnection): .." << endl;
try
{
udp = make_shared<ost::UDPBroadcast>(addr, port);
}
catch( const std::exception& e )
{
ostringstream s;
s << myname << "(createConnection): " << e.what();
unetcrit << s.str() << std::endl;
if( throwEx )
throw SystemError(s.str());
udp = nullptr;
}
catch( ... )
{
ostringstream s;
s << myname << "(createConnection): catch...";
unetcrit << s.str() << std::endl;
if( throwEx )
throw SystemError(s.str());
udp = nullptr;
}
return (udp != nullptr);
}
// -----------------------------------------------------------------------------
void UNetSender::updateFromSM() void UNetSender::updateFromSM()
{ {
auto it = dlist.begin(); auto it = dlist.begin();
...@@ -173,30 +190,40 @@ void UNetSender::updateItem( DMap::iterator& it, long value ) ...@@ -173,30 +190,40 @@ void UNetSender::updateItem( DMap::iterator& it, long value )
} }
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
void UNetSender::setCheckConnectionPause( int msec )
{
if( msec > 0 )
ptCheckConnection.setTiming(msec);
}
// -----------------------------------------------------------------------------
void UNetSender::send() void UNetSender::send()
{ {
dlist.resize(maxItem); dlist.resize(maxItem);
unetinfo << myname << "(send): dlist size = " << dlist.size() << endl; unetinfo << myname << "(send): dlist size = " << dlist.size() << endl;
/*
ost::IPV4Broadcast h = s_host.c_str();
try
{
udp->setPeer(h,port);
}
catch( ost::SockException& e )
{
ostringstream s;
s << e.getString() << ": " << e.getSystemErrorString();
unetcrit << myname << "(poll): " << s.str() << endl;
throw SystemError(s.str());
}
*/
ncycle = 0; ncycle = 0;
ptCheckConnection.reset();
while( activated ) while( activated )
{ {
if( !udp )
{
if( !ptCheckConnection.checkTime() )
{
msleep(sendpause);
continue;
}
unetinfo << myname << "(send): check connection event.." << endl;
if( !createConnection(false) )
{
ptCheckConnection.reset();
msleep(sendpause);
continue;
}
}
try try
{ {
if( !shm->isLocalwork() ) if( !shm->isLocalwork() )
...@@ -274,7 +301,7 @@ void UNetSender::real_send(UniSetUDP::UDPMessage& mypack) ...@@ -274,7 +301,7 @@ void UNetSender::real_send(UniSetUDP::UDPMessage& mypack)
if( packetnum == 0 ) if( packetnum == 0 )
packetnum = 1; packetnum = 1;
if( !udp->isPending(ost::Socket::pendingOutput) ) if( !udp || !udp->isPending(ost::Socket::pendingOutput) )
return; return;
mypack.transport_msg(s_msg); mypack.transport_msg(s_msg);
......
...@@ -47,11 +47,24 @@ ...@@ -47,11 +47,24 @@
* ОПТИМИЗАЦИЯ N1: Для оптимизации обработки посылаемых пакетов (на стороне UNetReceiver) сделана следующая логика: * ОПТИМИЗАЦИЯ N1: Для оптимизации обработки посылаемых пакетов (на стороне UNetReceiver) сделана следующая логика:
* Номер очередного посылаемого пакета меняется (увеличивается) только, если изменились данные с момента * Номер очередного посылаемого пакета меняется (увеличивается) только, если изменились данные с момента
последней посылки. Для этого по данным каждый раз производится расчёт UNetUDP::makeCRC() и сравнивается с последним.. последней посылки. Для этого по данным каждый раз производится расчёт UNetUDP::makeCRC() и сравнивается с последним..
*
*
* Создание соединения
* ======================================
* Попытка создать соединение производиться сразу в конструкторе, если это не получается,
* то в потоке "посылки", с заданным периодом (checkConnectionTime) идёт попытка создать соединение..
* и так бесконечно, пока не получиться. Это важно для систем, где в момент загрузки программы
* (в момент создания объекта UNetSender) ещё может быть не поднята сеть или какой-то сбой с сетью и требуется
* ожидание (без вылета программы) пока "внешняя система мониторинга" не поднимет сеть).
* Если такая логика не требуется, то можно задать в конструкторе флаг nocheckconnection=true,
* тогда при создании объекта UNetSender, в конструкторе будет
* выкинуто исключение при неудачной попытке создания соединения.
* \warning setCheckConnectionPause(msec) должно быть кратно sendpause!
*/ */
class UNetSender class UNetSender
{ {
public: public:
UNetSender( const std::string& host, const ost::tpport_t port, const std::shared_ptr<SMInterface>& smi, UNetSender( const std::string& host, const ost::tpport_t port, const std::shared_ptr<SMInterface>& smi, bool nocheckConnection=false,
const std::string& s_field = "", const std::string& s_fvalue = "", const std::string& prefix = "unet", const std::string& s_field = "", const std::string& s_fvalue = "", const std::string& prefix = "unet",
size_t maxDCount = UniSetUDP::MaxDCount, size_t maxACount = UniSetUDP::MaxACount ); size_t maxDCount = UniSetUDP::MaxDCount, size_t maxACount = UniSetUDP::MaxACount );
...@@ -106,6 +119,8 @@ class UNetSender ...@@ -106,6 +119,8 @@ class UNetSender
packsendpause = msec; packsendpause = msec;
} }
void setCheckConnectionPause( int msec );
/*! заказать датчики */ /*! заказать датчики */
void askSensors( UniversalIO::UIOCommand cmd ); void askSensors( UniversalIO::UIOCommand cmd );
...@@ -151,6 +166,8 @@ class UNetSender ...@@ -151,6 +166,8 @@ class UNetSender
void readConfiguration(); void readConfiguration();
bool createConnection( bool throwEx );
private: private:
UNetSender(); UNetSender();
...@@ -163,6 +180,7 @@ class UNetSender ...@@ -163,6 +180,7 @@ class UNetSender
timeout_t sendpause = { 150 }; timeout_t sendpause = { 150 };
timeout_t packsendpause = { 5 }; timeout_t packsendpause = { 5 };
std::atomic_bool activated = { false }; std::atomic_bool activated = { false };
PassiveTimer ptCheckConnection;
UniSetTypes::uniset_rwmutex pack_mutex; UniSetTypes::uniset_rwmutex pack_mutex;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment