Commit 4b00be19 authored by Pavel Vainerman's avatar Pavel Vainerman

(UNetReceiver): вторая версия механизма "паузы на инициализацию"

на основе ev::timer, вместо PasiveTimer
parent fc266eae
...@@ -83,8 +83,7 @@ UNetReceiver::UNetReceiver(const std::string& s_host, int _port, const std::shar ...@@ -83,8 +83,7 @@ UNetReceiver::UNetReceiver(const std::string& s_host, int _port, const std::shar
evStatistic.set<UNetReceiver, &UNetReceiver::statisticsEvent>(this); evStatistic.set<UNetReceiver, &UNetReceiver::statisticsEvent>(this);
evUpdate.set<UNetReceiver, &UNetReceiver::updateEvent>(this); evUpdate.set<UNetReceiver, &UNetReceiver::updateEvent>(this);
evInitPause.set<UNetReceiver, &UNetReceiver::initEvent>(this);
ptInitOK.setTiming(initPause);
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
UNetReceiver::~UNetReceiver() UNetReceiver::~UNetReceiver()
...@@ -148,8 +147,7 @@ void UNetReceiver::setEvrunTimeout( timeout_t msec ) noexcept ...@@ -148,8 +147,7 @@ void UNetReceiver::setEvrunTimeout( timeout_t msec ) noexcept
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
void UNetReceiver::setInitPause( timeout_t msec ) noexcept void UNetReceiver::setInitPause( timeout_t msec ) noexcept
{ {
initPause = msec; initPause = (msec / 1000.0);
ptInitOK.setTiming(initPause);
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
void UNetReceiver::setRespondID( uniset::ObjectId id, bool invert ) noexcept void UNetReceiver::setRespondID( uniset::ObjectId id, bool invert ) noexcept
...@@ -174,9 +172,14 @@ void UNetReceiver::setLockUpdate( bool st ) noexcept ...@@ -174,9 +172,14 @@ void UNetReceiver::setLockUpdate( bool st ) noexcept
ptPrepare.reset(); ptPrepare.reset();
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
bool UNetReceiver::isLockUpdate() const noexcept
{
return lockUpdate;
}
// -----------------------------------------------------------------------------
bool UNetReceiver::isInitOK() const noexcept bool UNetReceiver::isInitOK() const noexcept
{ {
return ptInitOK.checkTime(); return initOK.load();
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
void UNetReceiver::resetTimeout() noexcept void UNetReceiver::resetTimeout() noexcept
...@@ -186,6 +189,16 @@ void UNetReceiver::resetTimeout() noexcept ...@@ -186,6 +189,16 @@ void UNetReceiver::resetTimeout() noexcept
trTimeout.change(false); trTimeout.change(false);
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
bool UNetReceiver::isRecvOK() const noexcept
{
return !ptRecvTimeout.checkTime();
}
// -----------------------------------------------------------------------------
size_t UNetReceiver::getLostPacketsNum() const noexcept
{
return lostPackets;
}
// -----------------------------------------------------------------------------
bool UNetReceiver::createConnection( bool throwEx ) bool UNetReceiver::createConnection( bool throwEx )
{ {
if( !activated ) if( !activated )
...@@ -251,10 +264,7 @@ void UNetReceiver::start() ...@@ -251,10 +264,7 @@ void UNetReceiver::start()
} }
if( upStrategy == useUpdateThread && !upThread->isRunning() ) if( upStrategy == useUpdateThread && !upThread->isRunning() )
{
ptInitOK.reset();
upThread->start(); upThread->start();
}
} }
else else
forceUpdate(); forceUpdate();
...@@ -264,7 +274,8 @@ void UNetReceiver::evprepare( const ev::loop_ref& eloop ) noexcept ...@@ -264,7 +274,8 @@ void UNetReceiver::evprepare( const ev::loop_ref& eloop ) noexcept
{ {
evStatistic.set(eloop); evStatistic.set(eloop);
evStatistic.start(0, 1.0); // раз в сек evStatistic.start(0, 1.0); // раз в сек
ptInitOK.reset();
evInitPause.set(eloop);
if( upStrategy == useUpdateEventLoop ) if( upStrategy == useUpdateEventLoop )
{ {
...@@ -276,11 +287,13 @@ void UNetReceiver::evprepare( const ev::loop_ref& eloop ) noexcept ...@@ -276,11 +287,13 @@ void UNetReceiver::evprepare( const ev::loop_ref& eloop ) noexcept
{ {
evCheckConnection.set(eloop); evCheckConnection.set(eloop);
evCheckConnection.start(0, checkConnectionTime); evCheckConnection.start(0, checkConnectionTime);
evInitPause.stop();
} }
else else
{ {
evReceive.set(eloop); evReceive.set(eloop);
evReceive.start(udp->getSocket(), ev::READ); evReceive.start(udp->getSocket(), ev::READ);
evInitPause.start(0);
} }
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
...@@ -336,6 +349,18 @@ void UNetReceiver::statisticsEvent(ev::periodic& tm, int revents) noexcept ...@@ -336,6 +349,18 @@ void UNetReceiver::statisticsEvent(ev::periodic& tm, int revents) noexcept
tm.again(); tm.again();
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
void UNetReceiver::initEvent( ev::timer& tmr, int revents ) noexcept
{
if( EV_ERROR & revents )
{
unetcrit << myname << "(initEvent): EVENT ERROR.." << endl;
return;
}
initOK.store(true);
tmr.stop();
}
// -----------------------------------------------------------------------------
void UNetReceiver::update() noexcept void UNetReceiver::update() noexcept
{ {
UniSetUDP::UDPMessage p; UniSetUDP::UDPMessage p;
...@@ -513,8 +538,11 @@ void UNetReceiver::updateThread() noexcept ...@@ -513,8 +538,11 @@ void UNetReceiver::updateThread() noexcept
{ {
try try
{ {
bool r = respondInvert ? !isRecvOK() : isRecvOK(); if( isInitOK() )
shm->localSetValue(itRespond, sidRespond, ( r ? 1 : 0 ), shm->ID()); {
bool r = respondInvert ? !isRecvOK() : isRecvOK();
shm->localSetValue(itRespond, sidRespond, ( r ? 1 : 0 ), shm->ID());
}
} }
catch( const std::exception& ex ) catch( const std::exception& ex )
{ {
...@@ -630,8 +658,11 @@ void UNetReceiver::updateEvent( ev::periodic& tm, int revents ) noexcept ...@@ -630,8 +658,11 @@ void UNetReceiver::updateEvent( ev::periodic& tm, int revents ) noexcept
{ {
try try
{ {
bool r = respondInvert ? !isRecvOK() : isRecvOK(); if( isInitOK() )
shm->localSetValue(itRespond, sidRespond, ( r ? 1 : 0 ), shm->ID()); {
bool r = respondInvert ? !isRecvOK() : isRecvOK();
shm->localSetValue(itRespond, sidRespond, ( r ? 1 : 0 ), shm->ID());
}
} }
catch( const std::exception& ex ) catch( const std::exception& ex )
{ {
......
...@@ -114,23 +114,13 @@ namespace uniset ...@@ -114,23 +114,13 @@ namespace uniset
// блокировать сохранение данных в SM // блокировать сохранение данных в SM
void setLockUpdate( bool st ) noexcept; void setLockUpdate( bool st ) noexcept;
inline bool isLockUpdate() const noexcept bool isLockUpdate() const noexcept;
{
return lockUpdate;
}
bool isInitOK() const noexcept;
void resetTimeout() noexcept; void resetTimeout() noexcept;
inline bool isRecvOK() const noexcept bool isInitOK() const noexcept;
{ bool isRecvOK() const noexcept;
return !ptRecvTimeout.checkTime(); size_t getLostPacketsNum() const noexcept;
}
inline size_t getLostPacketsNum() const noexcept
{
return lostPackets;
}
void setReceiveTimeout( timeout_t msec ) noexcept; void setReceiveTimeout( timeout_t msec ) noexcept;
void setReceivePause( timeout_t msec ) noexcept; void setReceivePause( timeout_t msec ) noexcept;
...@@ -219,6 +209,7 @@ namespace uniset ...@@ -219,6 +209,7 @@ namespace uniset
void updateEvent( ev::periodic& watcher, int revents ) noexcept; void updateEvent( ev::periodic& watcher, int revents ) noexcept;
void checkConnectionEvent( ev::periodic& watcher, int revents ) noexcept; void checkConnectionEvent( ev::periodic& watcher, int revents ) noexcept;
void statisticsEvent( ev::periodic& watcher, int revents ) noexcept; void statisticsEvent( ev::periodic& watcher, int revents ) noexcept;
void initEvent( ev::timer& watcher, int revents ) noexcept;
virtual void evprepare( const ev::loop_ref& eloop ) noexcept override; virtual void evprepare( const ev::loop_ref& eloop ) noexcept override;
virtual void evfinish(const ev::loop_ref& eloop ) noexcept override; virtual void evfinish(const ev::loop_ref& eloop ) noexcept override;
virtual std::string wname() const noexcept override virtual std::string wname() const noexcept override
...@@ -260,6 +251,7 @@ namespace uniset ...@@ -260,6 +251,7 @@ namespace uniset
ev::periodic evCheckConnection; ev::periodic evCheckConnection;
ev::periodic evStatistic; ev::periodic evStatistic;
ev::periodic evUpdate; ev::periodic evUpdate;
ev::timer evInitPause;
UpdateStrategy upStrategy = { useUpdateEventLoop }; UpdateStrategy upStrategy = { useUpdateEventLoop };
...@@ -281,12 +273,14 @@ namespace uniset ...@@ -281,12 +273,14 @@ namespace uniset
PassiveTimer ptRecvTimeout; PassiveTimer ptRecvTimeout;
PassiveTimer ptPrepare; PassiveTimer ptPrepare;
PassiveTimer ptInitOK;
timeout_t recvTimeout = { 5000 }; // msec timeout_t recvTimeout = { 5000 }; // msec
timeout_t prepareTime = { 2000 }; timeout_t prepareTime = { 2000 };
timeout_t evrunTimeout = { 15000 }; timeout_t evrunTimeout = { 15000 };
timeout_t lostTimeout = { 200 }; timeout_t lostTimeout = { 200 };
timeout_t initPause = { 5000 }; // пауза на начальную инициализацию
double initPause = { 5.0 }; // пауза на начальную инициализацию (сек)
std::atomic_bool initOK = { false };
PassiveTimer ptLostTimeout; PassiveTimer ptLostTimeout;
size_t lostPackets = { 0 }; /*!< счётчик потерянных пакетов */ size_t lostPackets = { 0 }; /*!< счётчик потерянных пакетов */
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment