Commit 04b387b3 authored by Pavel Vainerman's avatar Pavel Vainerman

(UNetReceiver): перевёл все receiver-ы на использование CommonEventLoop

parent fc665a71
......@@ -698,11 +698,21 @@ bool UNetExchange::activateObject()
return true;
}
// ------------------------------------------------------------------------------------------
void UNetExchange::sigterm( int signo )
bool UNetExchange::deactivateObject()
{
unetinfo << myname << ": ********* SIGTERM(" << signo << ") ********" << endl;
activated = false;
if( activated )
{
unetinfo << myname << "(deactivateObject): disactivate.." << endl;
activated = false;
termReceivers();
termSenders();
}
return UniSetObject::deactivateObject();
}
// ------------------------------------------------------------------------------------------
void UNetExchange::termReceivers()
{
for( const auto& it : recvlist )
{
try
......@@ -719,7 +729,10 @@ void UNetExchange::sigterm( int signo )
}
catch(...) {}
}
}
// ------------------------------------------------------------------------------------------
void UNetExchange::termSenders()
{
try
{
if( sender )
......@@ -733,6 +746,15 @@ void UNetExchange::sigterm( int signo )
sender2->stop();
}
catch(...) {}
}
// ------------------------------------------------------------------------------------------
void UNetExchange::sigterm( int signo )
{
unetinfo << myname << ": ********* SIGTERM(" << signo << ") ********" << endl;
activated = false;
termReceivers();
termSenders();
UniSetObject::sigterm(signo);
}
......
......@@ -157,10 +157,13 @@ class UNetExchange:
void waitSMReady();
void receiverEvent( const std::shared_ptr<UNetReceiver>& r, UNetReceiver::Event ev );
virtual bool activateObject();
virtual bool activateObject() override;
virtual bool deactivateObject() override;
// действия при завершении работы
virtual void sigterm( int signo );
void termSenders();
void termReceivers();
void initIterators();
void startReceivers();
......@@ -170,7 +173,7 @@ class UNetExchange:
tmStep
};
private:
private:
UNetExchange();
timeout_t initPause;
UniSetTypes::uniset_rwmutex mutex_start;
......
......@@ -25,6 +25,8 @@ using namespace std;
using namespace UniSetTypes;
using namespace UniSetExtensions;
// -----------------------------------------------------------------------------
CommonEventLoop UNetReceiver::loop;
// -----------------------------------------------------------------------------
/*
bool UNetReceiver::PacketCompare::operator()(const UniSetUDP::UDPMessage& lhs,
const UniSetUDP::UDPMessage& rhs) const
......@@ -101,8 +103,6 @@ UNetReceiver::UNetReceiver( const std::string& s_host, const ost::tpport_t port,
// -----------------------------------------------------------------------------
UNetReceiver::~UNetReceiver()
{
evReceive.stop();
evUpdate.stop();
}
// -----------------------------------------------------------------------------
void UNetReceiver::setReceiveTimeout( timeout_t msec )
......@@ -182,25 +182,32 @@ void UNetReceiver::start()
if( !activated )
{
activated = true;
evrun(true);
loop.evrun(this,true);
}
else
forceUpdate();
}
// -----------------------------------------------------------------------------
void UNetReceiver::evprepare()
void UNetReceiver::evprepare( const ev::loop_ref& eloop )
{
evReceive.set(loop);
evReceive.set(eloop);
evReceive.start(udp->getSocket(),ev::READ);
evUpdate.set(loop);
evUpdate.set(eloop);
evUpdate.start( updateTime );
}
// -----------------------------------------------------------------------------
void UNetReceiver::evfinish()
void UNetReceiver::evfinish( const ev::loop_ref& eloop )
{
evReceive.stop();
evUpdate.stop();
activated = false;
if( evReceive.is_active() )
evReceive.stop();
if( evUpdate.is_active() )
evUpdate.stop();
//udp->disconnect();
udp = nullptr;
}
// -----------------------------------------------------------------------------
void UNetReceiver::forceUpdate()
......@@ -419,10 +426,14 @@ void UNetReceiver::readEvent( ev::io& watcher )
// только если "режим подготовки закончился, то можем генерировать "события"
if( ptPrepare.checkTime() && trTimeout.change(tout) )
{
if( tout )
slEvent(shared_from_this(), evTimeout);
else
slEvent(shared_from_this(), evOK);
auto w = shared_from_this();
if( w )
{
if( tout )
slEvent(w, evTimeout);
else
slEvent(w, evOK);
}
}
}
// -----------------------------------------------------------------------------
......@@ -480,7 +491,7 @@ void UNetReceiver::stop()
{
unetinfo << myname << ": stop.." << endl;
activated = false;
evstop();
loop.evstop(this);
}
// -----------------------------------------------------------------------------
bool UNetReceiver::receive()
......
......@@ -31,7 +31,7 @@
#include "SMInterface.h"
#include "SharedMemory.h"
#include "UDPPacket.h"
#include "EventLoopServer.h"
#include "CommonEventLoop.h"
#include "UDPCore.h"
// -----------------------------------------------------------------------------
/* Основная идея: сделать проверку очерёдности пакетов, но при этом использовать UDP.
......@@ -77,8 +77,8 @@
*/
// -----------------------------------------------------------------------------
class UNetReceiver:
public std::enable_shared_from_this<UNetReceiver>,
public EventLoopServer
protected EvWatcher,
public std::enable_shared_from_this<UNetReceiver>
{
public:
UNetReceiver( const std::string& host, const ost::tpport_t port, const std::shared_ptr<SMInterface>& smi );
......@@ -161,8 +161,9 @@ class UNetReceiver:
void callback( ev::io& watcher, int revents );
void readEvent( ev::io& watcher );
void updateEvent( ev::periodic& watcher, int revents );
virtual void evfinish() override;
virtual void evprepare() override;
virtual void evprepare( const ev::loop_ref& eloop ) override;
virtual void evfinish(const ev::loop_ref& eloop ) override;
virtual std::string wname(){ return myname; }
void initIterators();
......@@ -192,6 +193,10 @@ class UNetReceiver:
std::string myname;
ev::io evReceive;
ev::periodic evUpdate;
// делаем loop общим.. одним на всех!
static CommonEventLoop loop;
double updateTime = { 0.0 };
UniSetTypes::uniset_rwmutex pollMutex;
......
......@@ -207,11 +207,17 @@ void UNetSender::send()
if( it.first > 1 && (ncycle % it.first) != 0 )
continue;
if( !activated )
break;
auto& pk = it.second;
int size = pk.size();
for(int i = 0; i < size; ++i)
{
if( !activated )
break;
real_send(pk[i]);
msleep(packsendpause);
}
......@@ -236,6 +242,9 @@ void UNetSender::send()
unetwarn << myname << "(send): catch ..." << std::endl;
}
if( !activated )
break;
msleep(sendpause);
}
......@@ -279,6 +288,8 @@ void UNetSender::stop()
{
activated = false;
// s_thr->stop();
if( s_thr )
s_thr->join();
}
// -----------------------------------------------------------------------------
void UNetSender::start()
......
......@@ -3,6 +3,6 @@
uniset2-start.sh -f ./uniset2-unetexchange --unet-name UNetExchange --unet-run-logserver \
--confile test.xml --smemory-id SharedMemory \
--unet-filter-field rs --unet-filter-value 2 --unet-maxdifferense 40 --unet-sendpause 1000 \
--dlog-add-levels info,crit,warn --unet-log-add-levels info,crit,warn $*
--dlog-add-levels info,crit,warn --unet-log-add-levels info,crit,warn,any $*
#--unet-nodes-confnode specnet
......@@ -50,7 +50,7 @@ class CommonEventLoop
bool evrun( EvWatcher* w, bool thread = true );
/*! \return TRUE - если это был последний EvWatcher и loop остановлен */
bool evstop( EvWatcher* s );
bool evstop( EvWatcher* w );
inline const ev::loop_ref evloop(){ return loop; }
......@@ -76,7 +76,8 @@ class CommonEventLoop
std::mutex wlist_mutex;
std::list<EvWatcher*> wlist;
// готовящийся Watcher..(он может быть только один, единицу времени)
// готовящийся Watcher..он может быть только один в единицу времени
// это гарантирует prep_mutex
EvWatcher* wprep = { nullptr };
ev::async evprep;
std::condition_variable prep_event;
......
......@@ -207,7 +207,6 @@ void LogServer::ioAccept( ev::io& watcher, int revents )
void LogServer::sessionFinished( LogSession* s )
{
uniset_rwmutex_wrlock l(mutSList);
for( SessionList::iterator i = slist.begin(); i != slist.end(); ++i )
{
if( i->get() == s )
......
......@@ -462,22 +462,6 @@ void UniSetActivator::stop()
deactivate();
try
{
deactivateObject();
}
catch( const omniORB::fatalException& fe )
{
ucrit << myname << "(stop): : поймали omniORB::fatalException:" << endl;
ucrit << myname << "(stop): file: " << fe.file() << endl;
ucrit << myname << "(stop): line: " << fe.line() << endl;
ucrit << myname << "(stop): mesg: " << fe.errmsg() << endl;
}
catch( const std::exception& ex )
{
ucrit << myname << "(stop): " << ex.what() << endl;
}
ulogsys << myname << "(stop): deactivate ok. " << endl;
ulogsys << myname << "(stop): discard request..." << endl;
......
......@@ -27,7 +27,7 @@ CommonEventLoop::~CommonEventLoop()
}
}
// ---------------------------------------------------------------------------
bool CommonEventLoop::evrun( EvWatcher* w, bool thread )
bool CommonEventLoop::evrun(EvWatcher* w, bool thread )
{
if( !w )
return false;
......@@ -80,25 +80,21 @@ bool CommonEventLoop::evIsActive()
// -------------------------------------------------------------------------
bool CommonEventLoop::evstop( EvWatcher* w )
{
if( !w )
return false;
std::unique_lock<std::mutex> l(wlist_mutex);
for( auto i = wlist.begin(); i!=wlist.end(); i++ )
try
{
if( (*i) == w )
{
try
{
w->evfinish(loop); // для этого Watcher это уже finish..
}
catch( std::exception& ex )
{
cerr << "(CommonEventLoop::evfinish): evfinish err: " << ex.what() << endl;
}
wlist.erase(i);
break;
}
w->evfinish(loop); // для этого Watcher это уже finish..
}
catch( std::exception& ex )
{
cerr << "(CommonEventLoop::evfinish): evfinish err: " << ex.what() << endl;
}
wlist.remove(w);
if( !wlist.empty() )
return false;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment