Commit fc665a71 authored by Pavel Vainerman's avatar Pavel Vainerman

(CommonEventLoop): отладил корректность завершения

parent 5526846b
......@@ -171,23 +171,15 @@ int main( int argc, char** argv )
LogServer ls(la);
ls.setMaxSessionCount(msess);
LogServer ls2(la);
ls2.setMaxSessionCount(msess);
dlog->addLevel(Debug::ANY);
dlog2->addLevel(Debug::ANY);
dlog3->addLevel(Debug::ANY);
dlog4->addLevel(Debug::ANY);
ls.run( addr, port, true );
ls2.run( addr, 4444, true );
if( verb )
{
ls.setSessionLog(Debug::ANY);
ls2.setSessionLog(Debug::ANY);
}
unsigned int i = 0;
......
......@@ -30,7 +30,12 @@ class EvWatcher
* \brief The CommonEventLoop class
* Реализация общего eventloop для всех использующих libev.
* Каждый класс который хочет подключиться к "потоку", должен наследоваться от класса Watcher
* и при необходимости переопределить функции prepare и finish
* и при необходимости переопределить функции evprepare и evfinish
*
* Т.к. evprepare необходимо вызывать из потока в котором крутится event loop (иначе libev не работает),
* а функция run() в общем случае вызывается "откуда угодно" и может быть вызвана в том числе уже после
* запуска event loop, то задействован механизм асинхронного уведомления (см. evprep, onPrepapre) и ожидания
* на condition_variable, когда произойдёт инициализация (см. реализацию evrun()).
*/
class CommonEventLoop
{
......@@ -41,7 +46,8 @@ class CommonEventLoop
bool evIsActive();
void evrun( EvWatcher* w, bool thread = true );
/*! \return TRUE - если всё удалось. return актуален только для случая когда thread = true */
bool evrun( EvWatcher* w, bool thread = true );
/*! \return TRUE - если это был последний EvWatcher и loop остановлен */
bool evstop( EvWatcher* s );
......@@ -53,6 +59,7 @@ class CommonEventLoop
private:
void onStop();
void onPrepare();
void defaultLoop();
std::atomic_bool cancelled = { false };
......@@ -68,6 +75,13 @@ class CommonEventLoop
std::mutex wlist_mutex;
std::list<EvWatcher*> wlist;
// готовящийся Watcher..(он может быть только один, единицу времени)
EvWatcher* wprep = { nullptr };
ev::async evprep;
std::condition_variable prep_event;
std::mutex prep_mutex;
std::atomic_bool prep_notify = { false };
};
// -------------------------------------------------------------------------
#endif // CommonEventLoop_H_
......
......@@ -29,6 +29,8 @@ CommonEventLoop LogServer::loop;
// -------------------------------------------------------------------------
LogServer::~LogServer()
{
if( isrunning )
loop.evstop(this);
}
// -------------------------------------------------------------------------
LogServer::LogServer( std::shared_ptr<LogAgregator> log ):
......@@ -55,10 +57,13 @@ LogServer::LogServer():
{
}
// -------------------------------------------------------------------------
void LogServer::evfinish(const ev::loop_ref& loop )
void LogServer::evfinish( const ev::loop_ref& loop )
{
if( !isrunning )
return;
if( mylog.is_info() )
mylog.info() << myname << "(LogServer): terminate..." << endl;
mylog.info() << myname << "(evfinish): terminate..." << endl;
auto lst(slist);
......@@ -77,7 +82,11 @@ void LogServer::evfinish(const ev::loop_ref& loop )
io.stop();
isrunning = false;
cerr << "LOGServer: finished..." << endl;
sock.reset();
if( mylog.is_info() )
mylog.info() << myname << "(LogServer): finished." << endl;
}
// -------------------------------------------------------------------------
void LogServer::run( const std::string& _addr, ost::tpport_t _port, bool thread )
......@@ -101,6 +110,17 @@ void LogServer::terminate()
// -------------------------------------------------------------------------
void LogServer::evprepare( const ev::loop_ref& eloop )
{
if( sock )
{
ostringstream err;
err << myname << "(evprepare): socket ALREADY BINDINNG..";
if( mylog.is_crit() )
mylog.crit() << err.str() << endl;
throw SystemError( err.str() );
}
try
{
ost::InetAddress iaddr(addr.c_str());
......@@ -144,7 +164,7 @@ void LogServer::ioAccept( ev::io& watcher, int revents )
return;
}
if( !loop.evIsActive() )
if( !isrunning )
{
if( mylog.is_crit() )
mylog.crit() << myname << "(LogServer::ioAccept): terminate work.." << endl;
......@@ -176,7 +196,7 @@ void LogServer::ioAccept( ev::io& watcher, int revents )
slist.push_back(s);
}
s->run(loop.evloop());
s->run(watcher.loop);
}
catch( const std::exception& ex )
{
......
......@@ -184,6 +184,7 @@ void LogSession::callback( ev::io& watcher, int revents )
std::unique_lock<std::mutex> lk(io_mutex);
io.stop();
cmdTimer.stop();
{
std::unique_lock<std::mutex> lk(logbuf_mutex);
asyncEvent.stop();
......
......@@ -8,6 +8,9 @@ CommonEventLoop::CommonEventLoop()
{
evterm.set(loop);
evterm.set<CommonEventLoop, &CommonEventLoop::onStop>(this);
evprep.set(loop);
evprep.set<CommonEventLoop, &CommonEventLoop::onPrepare>(this);
}
// -------------------------------------------------------------------------
CommonEventLoop::~CommonEventLoop()
......@@ -24,11 +27,12 @@ CommonEventLoop::~CommonEventLoop()
}
}
// ---------------------------------------------------------------------------
void CommonEventLoop::evrun( EvWatcher* w, bool thread )
bool CommonEventLoop::evrun( EvWatcher* w, bool thread )
{
if( !w )
return;
return false;
bool ret = false;
{
std::unique_lock<std::mutex> l(wlist_mutex);
wlist.push_back(w);
......@@ -39,11 +43,25 @@ void CommonEventLoop::evrun( EvWatcher* w, bool thread )
std::this_thread::sleep_for(std::chrono::milliseconds(30));
}
w->evprepare(loop);
// ожидаем обработки evprepare (которая будет в defaultLoop)
wprep = w;
evprep.send(); // будим default loop
// ждём..
std::unique_lock<std::mutex> locker(prep_mutex);
while( !prep_notify )
prep_event.wait(locker);
prep_notify = false;
// если стал nullptr - значит evprepare отработал нормально
ret = ( wprep == nullptr );
}
if( !thread )
{
// если ждать завершения не надо (thread=true)
// или evprepare не удалось.. выходим..
if( thread || !ret )
return ret;
// ожидаем завершения основного потока..
std::unique_lock<std::mutex> locker(term_mutex);
while( !term_notify )
......@@ -51,7 +69,8 @@ void CommonEventLoop::evrun( EvWatcher* w, bool thread )
if( thr && thr->joinable() )
thr->join();
}
return true;
}
// ---------------------------------------------------------------------------
bool CommonEventLoop::evIsActive()
......@@ -66,28 +85,61 @@ bool CommonEventLoop::evstop( EvWatcher* w )
{
if( (*i) == w )
{
try
{
w->evfinish(loop); // для этого Watcher это уже finish..
}
catch( std::exception& ex )
{
cerr << "(CommonEventLoop::evfinish): evfinish err: " << ex.what() << endl;
}
wlist.erase(i);
break;
}
}
if( !wlist.empty() )
{
w->evfinish(loop); // для этого Watcher это уже finish..
return false;
}
if( isrunning || !cancelled )
{
cancelled = true;
evterm.send();
if( thr )
{
if( thr->joinable() )
thr->join();
thr = nullptr;
}
thr = nullptr; // после этого можно уже запускать заново поток..
cancelled = false;
}
}
return true;
}
// -------------------------------------------------------------------------
void CommonEventLoop::onPrepare()
{
if( wprep )
{
try
{
wprep->evprepare(loop);
}
catch( std::exception& ex )
{
cerr << "(CommonEventLoop::onPrepare): evfinish err: " << ex.what() << endl;
}
wprep = nullptr;
}
// будим всех ожидающих..
prep_notify = true;
prep_event.notify_all();
}
// -------------------------------------------------------------------------
void CommonEventLoop::onStop()
{
// здесь список не защищаем wlist_mutex
......@@ -107,6 +159,7 @@ void CommonEventLoop::onStop()
}
evterm.stop();
evprep.stop();
loop.break_loop(ev::ALL);
}
// -------------------------------------------------------------------------
......@@ -116,7 +169,8 @@ void CommonEventLoop::defaultLoop()
isrunning = true;
evterm.start();
cerr << "************* CommonEventLoop::defaultLoop() *************" << endl;
evprep.start();
while( !cancelled )
{
try
......@@ -129,11 +183,12 @@ void CommonEventLoop::defaultLoop()
}
}
cerr << "************* CommonEventLoop::defaultLoop() EXIT *************" << endl;
cancelled = true;
isrunning = false;
// будим всех ожидающих..
evterm.stop();
evprep.stop();
term_notify = true;
term_event.notify_all();
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment