Commit 571a1c02 authored by Pavel Vainerman's avatar Pavel Vainerman

(UNET): версия UNetReceiver с отдельным потоком для update

(иметь отдельные потоки на update, безопаснее, если какой-то поток обновления застрянет, но затратно и по памяти и на "переключение между потоками")
parent 018cd873
......@@ -74,10 +74,11 @@ UNetReceiver::UNetReceiver( const std::string& s_host, const ost::tpport_t _port
auto conf = uniset_conf();
conf->initLogStream(unetlog, myname);
upThread = make_shared< ThreadCreator<UNetReceiver> >(this, &UNetReceiver::updateThread);
if( !createConnection(nocheckConnection /* <-- это флаг throwEx */) )
evCheckConnection.set<UNetReceiver, &UNetReceiver::checkConnectionEvent>(this);
evForceUpdate.set<UNetReceiver, &UNetReceiver::forceUpdateEvent>(this);
evStatistic.set<UNetReceiver, &UNetReceiver::statisticsEvent>(this);
}
// -----------------------------------------------------------------------------
......@@ -87,6 +88,7 @@ UNetReceiver::~UNetReceiver()
// -----------------------------------------------------------------------------
void UNetReceiver::setReceiveTimeout( timeout_t msec )
{
std::lock_guard<std::mutex> l(tmMutex);
recvTimeout = msec;
ptRecvTimeout.setTiming(msec);
}
......@@ -119,10 +121,6 @@ void UNetReceiver::setReceivePause( timeout_t msec )
void UNetReceiver::setUpdatePause( timeout_t msec )
{
updatepause = msec;
updateTime = (double)updatepause / 1000.0;
if( evUpdate.is_active() )
evUpdate.start(0, updateTime);
}
// -----------------------------------------------------------------------------
void UNetReceiver::setMaxProcessingCount( int set )
......@@ -150,6 +148,7 @@ void UNetReceiver::setLostPacketsID( UniSetTypes::ObjectId id )
// -----------------------------------------------------------------------------
void UNetReceiver::setLockUpdate( bool st )
{
lockUpdate = st;
if( !st )
......@@ -158,6 +157,7 @@ void UNetReceiver::setLockUpdate( bool st )
// -----------------------------------------------------------------------------
void UNetReceiver::resetTimeout()
{
std::lock_guard<std::mutex> l(tmMutex);
ptRecvTimeout.reset();
trTimeout.change(false);
}
......@@ -174,7 +174,6 @@ bool UNetReceiver::createConnection( bool throwEx )
udp = make_shared<UDPReceiveU>(addr, port);
udp->setCompletion(false); // делаем неблокирующее чтение (нужно для libev)
evReceive.set<UNetReceiver, &UNetReceiver::callback>(this);
evUpdate.set<UNetReceiver, &UNetReceiver::updateEvent>(this);
if( evCheckConnection.is_active() )
evCheckConnection.stop();
......@@ -219,6 +218,8 @@ void UNetReceiver::start()
{
activated = true;
loop.evrun(this, true);
if( !upThread->isRunning() )
upThread->start();
}
else
forceUpdate();
......@@ -226,7 +227,6 @@ void UNetReceiver::start()
// -----------------------------------------------------------------------------
void UNetReceiver::evprepare( const ev::loop_ref& eloop )
{
evForceUpdate.set(eloop);
evStatistic.set(eloop);
evStatistic.start(0, 1.0); // раз в сек
......@@ -240,9 +240,6 @@ void UNetReceiver::evprepare( const ev::loop_ref& eloop )
{
evReceive.set(eloop);
evReceive.start(udp->getSocket(), ev::READ);
evUpdate.set(eloop);
evUpdate.start( 0, updateTime );
}
}
// -----------------------------------------------------------------------------
......@@ -250,18 +247,16 @@ void UNetReceiver::evfinish( const ev::loop_ref& eloop )
{
activated = false;
if( evCheckConnection.is_active() )
evCheckConnection.stop();
{
std::lock_guard<std::mutex> l(checkConnMutex);
if( evCheckConnection.is_active() )
evCheckConnection.stop();
}
if( evReceive.is_active() )
evReceive.stop();
if( evUpdate.is_active() )
evUpdate.stop();
if( evForceUpdate.is_active() )
evForceUpdate.stop();
if( evStatistic.is_active() )
evStatistic.stop();
......@@ -271,17 +266,7 @@ void UNetReceiver::evfinish( const ev::loop_ref& eloop )
// -----------------------------------------------------------------------------
void UNetReceiver::forceUpdate()
{
evForceUpdate.start(0, 0.001);
}
// -----------------------------------------------------------------------------
void UNetReceiver::forceUpdateEvent( ev::timer& watcher, int revents )
{
if( EV_ERROR & revents )
{
unetcrit << myname << "(forceUpdateEvent): EVENT ERROR.." << endl;
return;
}
std::lock_guard<std::mutex> l(packMutex);
pnum = 0; // сбрасываем запомненый номер последнего обработанного пакета
// и тем самым заставляем обновить данные в SM (см. update)
}
......@@ -316,6 +301,9 @@ void UNetReceiver::update()
while( k > 0 )
{
{
// lock qpack
std::lock_guard<std::mutex> l(packMutex);
if( qpack.empty() )
return;
......@@ -458,6 +446,48 @@ void UNetReceiver::update()
}
}
// -----------------------------------------------------------------------------
void UNetReceiver::updateThread()
{
while( activated )
{
try
{
update();
}
catch( std::exception& ex )
{
unetcrit << myname << "(update_thread): " << ex.what() << endl;
}
if( sidRespond != DefaultObjectId )
{
try
{
bool r = respondInvert ? !isRecvOK() : isRecvOK();
shm->localSetValue(itRespond, sidRespond, ( r ? 1 : 0 ), shm->ID());
}
catch( const std::exception& ex )
{
unetcrit << myname << "(update_thread): (respond) " << ex.what() << std::endl;
}
}
if( sidLostPackets != DefaultObjectId )
{
try
{
shm->localSetValue(itLostPackets, sidLostPackets, getLostPacketsNum(), shm->ID());
}
catch( const std::exception& ex )
{
unetcrit << myname << "(update_thread): (lostPackets) " << ex.what() << std::endl;
}
}
msleep(updatepause);
}
}
// -----------------------------------------------------------------------------
void UNetReceiver::callback( ev::io& watcher, int revents )
{
if( EV_ERROR & revents )
......@@ -475,10 +505,15 @@ void UNetReceiver::readEvent( ev::io& watcher )
if( !activated )
return;
bool tout = false;
try
{
if( receive() )
{
std::lock_guard<std::mutex> l(tmMutex);
ptRecvTimeout.reset();
}
}
catch( UniSetTypes::Exception& ex)
{
......@@ -489,6 +524,13 @@ void UNetReceiver::readEvent( ev::io& watcher )
unetwarn << myname << "(receive): " << e.what() << std::endl;
}
// делаем через промежуточную переменную
// чтобы поскорее освободить mutex
{
std::lock_guard<std::mutex> l(tmMutex);
tout = ptRecvTimeout.checkTime();
}
// только если "режим подготовки закончился, то можем генерировать "события"
if( ptPrepare.checkTime() && trTimeout.change(ptRecvTimeout.checkTime()) )
{
......@@ -496,7 +538,7 @@ void UNetReceiver::readEvent( ev::io& watcher )
if( w )
{
if( trTimeout.get() )
if( tout )
slEvent(w, evTimeout);
else
slEvent(w, evOK);
......@@ -508,7 +550,7 @@ void UNetReceiver::updateEvent( ev::periodic& tm, int revents )
{
if( EV_ERROR & revents )
{
unetcrit << myname << "(callback): EVENT ERROR.." << endl;
unetcrit << myname << "(updateEvent): EVENT ERROR.." << endl;
return;
}
......@@ -525,7 +567,7 @@ void UNetReceiver::updateEvent( ev::periodic& tm, int revents )
}
catch( std::exception& ex )
{
unetcrit << myname << "(update): " << ex.what() << std::endl;
unetcrit << myname << "(updateEvent): " << ex.what() << std::endl;
}
if( sidRespond != DefaultObjectId )
......@@ -537,7 +579,7 @@ void UNetReceiver::updateEvent( ev::periodic& tm, int revents )
}
catch( const std::exception& ex )
{
unetcrit << myname << "(step): (respond) " << ex.what() << std::endl;
unetcrit << myname << "(updateEvent): (respond) " << ex.what() << std::endl;
}
}
......@@ -549,7 +591,7 @@ void UNetReceiver::updateEvent( ev::periodic& tm, int revents )
}
catch( const std::exception& ex )
{
unetcrit << myname << "(step): (lostPackets) " << ex.what() << std::endl;
unetcrit << myname << "(updateEvent): (lostPackets) " << ex.what() << std::endl;
}
}
}
......@@ -567,6 +609,8 @@ void UNetReceiver::checkConnectionEvent( ev::periodic& tm, int revents )
unetinfo << myname << "(checkConnectionEvent): check connection event.." << endl;
std::lock_guard<std::mutex> l(checkConnMutex);
if( !createConnection(false) )
tm.again();
}
......@@ -575,6 +619,7 @@ void UNetReceiver::stop()
{
unetinfo << myname << ": stop.." << endl;
activated = false;
upThread->join();
loop.evstop(this);
}
// -----------------------------------------------------------------------------
......@@ -646,6 +691,9 @@ bool UNetReceiver::receive()
#endif
{
// lock qpack
std::lock_guard<std::mutex> l(packMutex);
if( !waitClean )
{
qpack.push(pack);
......
......@@ -170,11 +170,11 @@ class UNetReceiver:
bool receive();
void step();
void update();
void updateThread();
void callback( ev::io& watcher, int revents );
void readEvent( ev::io& watcher );
void updateEvent( ev::periodic& watcher, int revents );
void checkConnectionEvent( ev::periodic& watcher, int revents );
void forceUpdateEvent( ev::timer& watcher, int revents );
void statisticsEvent( ev::periodic& watcher, int revents );
virtual void evprepare( const ev::loop_ref& eloop ) override;
virtual void evfinish(const ev::loop_ref& eloop ) override;
......@@ -211,19 +211,19 @@ class UNetReceiver:
ost::tpport_t port = { 0 };
std::string myname;
ev::io evReceive;
ev::periodic evUpdate;
ev::periodic evCheckConnection;
ev::timer evForceUpdate;
ev::periodic evStatistic;
size_t recvCount = { 0 };
size_t upCount = { 0 };
std::shared_ptr< ThreadCreator<UNetReceiver> > upThread; // update thread
// делаем loop общим.. одним на всех!
static CommonEventLoop loop;
double updateTime = { 0.01 };
double checkConnectionTime = { 10.0 }; // sec
std::mutex checkConnMutex;
PassiveTimer ptRecvTimeout;
PassiveTimer ptPrepare;
......@@ -244,6 +244,7 @@ class UNetReceiver:
PacketQueue qpack; /*!< очередь принятых пакетов (отсортированных по возрастанию номера пакета) */
UniSetUDP::UDPMessage pack; /*!< просто буфер для получения очередного сообщения */
UniSetUDP::UDPPacket r_buf;
std::mutex packMutex; /*!< mutex для работы с очередью */
size_t pnum = { 0 }; /*!< текущий номер обработанного сообщения, для проверки непрерывности последовательности пакетов */
/*! максимальная разница межд номерами пакетов, при которой считается, что счётчик пакетов
......@@ -261,6 +262,7 @@ class UNetReceiver:
EventSlot slEvent;
Trigger trTimeout;
std::mutex tmMutex;
struct CacheItem
{
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment