Commit 7520b17c authored by Pavel Vainerman's avatar Pavel Vainerman

(EventLoop): переписал механизм инициализации Watcher-ов с использованием promise/future

parent 7589cb87
...@@ -176,10 +176,14 @@ int main( int argc, char** argv ) ...@@ -176,10 +176,14 @@ int main( int argc, char** argv )
dlog4->addLevel(Debug::ANY); dlog4->addLevel(Debug::ANY);
ls.run( addr, port, true ); ls.run( addr, port, true );
if( verb ) if( verb )
ls.setSessionLog(Debug::ANY); ls.setSessionLog(Debug::ANY);
if( !ls.isRunning() )
cerr << "LOG SERVER NOT RUNNING!!" << endl;
unsigned int i = 0; unsigned int i = 0;
while( true ) while( true )
......
...@@ -8,6 +8,8 @@ ...@@ -8,6 +8,8 @@
#include <mutex> #include <mutex>
#include <condition_variable> #include <condition_variable>
#include <vector> #include <vector>
#include <queue>
#include <future>
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
namespace uniset namespace uniset
{ {
...@@ -47,7 +49,7 @@ namespace uniset ...@@ -47,7 +49,7 @@ namespace uniset
* Т.к. evprepare необходимо вызывать из потока в котором крутится event loop (иначе libev не работает), * Т.к. evprepare необходимо вызывать из потока в котором крутится event loop (иначе libev не работает),
* а функция run() в общем случае вызывается "откуда угодно" и может быть вызвана в том числе уже после * а функция run() в общем случае вызывается "откуда угодно" и может быть вызвана в том числе уже после
* запуска event loop, то задействован механизм асинхронного уведомления (см. evprep, onPrepapre) и ожидания * запуска event loop, то задействован механизм асинхронного уведомления (см. evprep, onPrepapre) и ожидания
* на condition_variable, когда произойдёт инициализация (см. реализацию evrun()). * когда произойдёт инициализация при помощи promise/future (см. реализацию evrun()).
*/ */
class CommonEventLoop class CommonEventLoop
{ {
...@@ -84,7 +86,9 @@ namespace uniset ...@@ -84,7 +86,9 @@ namespace uniset
void onStop( ev::async& w, int revents ) noexcept; void onStop( ev::async& w, int revents ) noexcept;
void onPrepare( ev::async& w, int revents ) noexcept; void onPrepare( ev::async& w, int revents ) noexcept;
void defaultLoop() noexcept; void defaultLoop( std::promise<bool>& runOK ) noexcept;
bool runDefaultLoop( size_t waitTimeout_msec );
bool activateWatcher( EvWatcher* w, size_t waitTimeout_msec );
std::atomic_bool cancelled = { false }; std::atomic_bool cancelled = { false };
std::atomic_bool isrunning = { false }; std::atomic_bool isrunning = { false };
...@@ -101,13 +105,19 @@ namespace uniset ...@@ -101,13 +105,19 @@ namespace uniset
std::mutex wlist_mutex; std::mutex wlist_mutex;
std::vector<EvWatcher*> wlist; std::vector<EvWatcher*> wlist;
// готовящийся Watcher..он может быть только один в единицу времени // очередь wather-ов для инициализации (добавления в обработку)
// это гарантирует prep_mutex struct WatcherInfo
EvWatcher* wprep = { nullptr }; {
WatcherInfo( EvWatcher* w, std::promise<bool>& p ):
watcher(w),result(p){}
EvWatcher* watcher;
std::promise<bool>& result;
};
std::queue<WatcherInfo> wactlist;
std::mutex wact_mutex;
ev::async evprep; ev::async evprep;
std::condition_variable prep_event;
std::mutex prep_mutex;
std::atomic_bool prep_notify = { false };
}; };
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
} // end of uniset namespace } // end of uniset namespace
......
...@@ -32,77 +32,97 @@ namespace uniset ...@@ -32,77 +32,97 @@ namespace uniset
} }
} }
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
bool CommonEventLoop::evrun( EvWatcher* w, bool thread, size_t waitTimeout_msec ) bool CommonEventLoop::runDefaultLoop( size_t waitTimeout_msec )
{ {
if( w == nullptr ) std::lock_guard<std::mutex> lock(thr_mutex);
return false; if( thr )
return true;
bool ret = false; bool defaultLoopOK = true;
{
{
std::lock_guard<std::mutex> lck(wlist_mutex);
if( std::find(wlist.begin(), wlist.end(), w) != wlist.end() ) std::promise<bool> pRun;
{ auto runOK = pRun.get_future();
cerr << "(CommonEventLoop::evrun): " << w->wname() << " ALREADY ADDED.." << endl;
return false;
}
wlist.push_back(w); thr = make_shared<std::thread>( [ &pRun, this ] { CommonEventLoop::defaultLoop(pRun); } );
}
// ожидание старта потока
while( true )
{
auto status = runOK.wait_for(std::chrono::milliseconds(waitTimeout_msec));
if( status == future_status::timeout )
{ {
std::lock_guard<std::mutex> lock(thr_mutex); defaultLoopOK = false;
break;
}
if( !thr ) if( status == future_status::ready )
{ {
thr = make_shared<std::thread>( [ = ] { CommonEventLoop::defaultLoop(); } ); defaultLoopOK = runOK.get();
break;
std::unique_lock<std::mutex> locker(prep_mutex);
// ожидаем запуска loop
// иначе evprep.send() улетит в никуда
prep_event.wait_for(locker, std::chrono::milliseconds(waitTimeout_msec), [ = ]()
{
return ( isrunning == true );
} );
if( !isrunning )
{
cerr << "(CommonEventLoop::evrun): " << w->wname() << " evloop NOT RUN!.." << endl;
return false;
}
// небольшая пауза после запуск event loop
// чтобы "надёжнее" сработал evprep.send() (см. ниже)
std::this_thread::sleep_for(std::chrono::milliseconds(50));
}
} }
}
return defaultLoopOK;
}
// ---------------------------------------------------------------------------
bool CommonEventLoop::activateWatcher( EvWatcher* w, size_t waitTimeout_msec )
{
std::promise<bool> p;
WatcherInfo winfo(w,p);
auto result = p.get_future();
// готовим "указатель" на объект требующий активации {
std::unique_lock<std::mutex> locker(prep_mutex); std::unique_lock<std::mutex> l(wact_mutex);
wprep = w; wactlist.push(winfo);
}
// взводим флаг bool ret = true;
prep_notify = false;
// посылаем сигнал для обработки // посылаем сигнал для обработки
evprep.send(); // будим default loop evprep.send(); // будим default loop
// ожидаем обработки evprepare (которая будет в defaultLoop) // ждём инициализации
prep_event.wait_for(locker, std::chrono::milliseconds(waitTimeout_msec), [ = ]() while( true )
{
auto status = result.wait_for(std::chrono::milliseconds(waitTimeout_msec));
if( status == future_status::timeout )
{ {
return ( prep_notify == true ); ret = false;
} ); break;
}
if( status == future_status::ready )
{
ret = result.get();
break;
}
}
return ret;
}
// ---------------------------------------------------------------------------
bool CommonEventLoop::evrun( EvWatcher* w, bool thread, size_t waitTimeout_msec )
{
if( w == nullptr )
return false;
// сбрасываем флаг {
prep_notify = false; std::lock_guard<std::mutex> lck(wlist_mutex);
if( std::find(wlist.begin(), wlist.end(), w) != wlist.end() )
{
cerr << "(CommonEventLoop::evrun): " << w->wname() << " ALREADY ADDED.." << endl;
return false;
}
// если wprep стал nullptr - значит evprepare отработал нормально wlist.push_back(w);
ret = ( wprep == nullptr );
} }
bool defaultLoopOK = runDefaultLoop(waitTimeout_msec);
bool ret = defaultLoopOK && activateWatcher(w, waitTimeout_msec);
// если ждать завершения не надо (thread=true) // если ждать завершения не надо (thread=true)
// или evprepare не удалось.. выходим.. // или activateWatcher не удалось.. выходим..
if( thread || !ret ) if( thread || !ret )
return ret; return ret;
...@@ -176,28 +196,27 @@ namespace uniset ...@@ -176,28 +196,27 @@ namespace uniset
return; return;
} }
prep_notify = false;
{ {
std::lock_guard<std::mutex> lock(prep_mutex); std::lock_guard<std::mutex> lock(wact_mutex);
if( wprep ) while( !wactlist.empty() )
{ {
auto&& winf = wactlist.front();
try try
{ {
wprep->evprepare(loop); winf.watcher->evprepare(loop);
winf.result.set_value(true);
} }
catch( std::exception& ex ) catch( std::exception& ex )
{ {
cerr << "(CommonEventLoop::onPrepare): evprepare err: " << ex.what() << endl; cerr << "(CommonEventLoop::onPrepare): evprepare err: " << ex.what() << endl;
winf.result.set_value(false);
} }
wprep = nullptr; wactlist.pop();
} }
} }
// будим всех ожидающих..
prep_notify = true;
prep_event.notify_all();
} }
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
void CommonEventLoop::onStop( ev::async& aw, int revents ) noexcept void CommonEventLoop::onStop( ev::async& aw, int revents ) noexcept
...@@ -230,13 +249,15 @@ namespace uniset ...@@ -230,13 +249,15 @@ namespace uniset
} }
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
void CommonEventLoop::defaultLoop() noexcept void CommonEventLoop::defaultLoop( std::promise<bool>& runOK ) noexcept
{ {
evterm.start(); evterm.start();
evprep.start(); evprep.start();
isrunning = true; isrunning = true;
runOK.set_value(true);
while( !cancelled ) while( !cancelled )
{ {
try try
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment