Commit 5526846b authored by Pavel Vainerman's avatar Pavel Vainerman

Заложил CommonEventLoop

parent eb476a0e
...@@ -171,6 +171,9 @@ int main( int argc, char** argv ) ...@@ -171,6 +171,9 @@ int main( int argc, char** argv )
LogServer ls(la); LogServer ls(la);
ls.setMaxSessionCount(msess); ls.setMaxSessionCount(msess);
LogServer ls2(la);
ls2.setMaxSessionCount(msess);
dlog->addLevel(Debug::ANY); dlog->addLevel(Debug::ANY);
dlog2->addLevel(Debug::ANY); dlog2->addLevel(Debug::ANY);
...@@ -178,9 +181,13 @@ int main( int argc, char** argv ) ...@@ -178,9 +181,13 @@ int main( int argc, char** argv )
dlog4->addLevel(Debug::ANY); dlog4->addLevel(Debug::ANY);
ls.run( addr, port, true ); ls.run( addr, port, true );
ls2.run( addr, 4444, true );
if( verb ) if( verb )
{
ls.setSessionLog(Debug::ANY); ls.setSessionLog(Debug::ANY);
ls2.setSessionLog(Debug::ANY);
}
unsigned int i = 0; unsigned int i = 0;
......
// -------------------------------------------------------------------------
#ifndef CommonEventLoop_H_
#define CommonEventLoop_H_
// -------------------------------------------------------------------------
#include <ev++.h>
#include <atomic>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <list>
// -------------------------------------------------------------------------
class EvWatcher
{
public:
EvWatcher(){}
virtual ~EvWatcher(){}
// подготовка перед запуском loop
// запуск своих ev::xxx.start()
virtual void evprepare( const ev::loop_ref& ){}
// действия при завершении
// завершение своих ev::xxx.stop()
virtual void evfinish( const ev::loop_ref& ){}
virtual std::string wname(){ return ""; }
};
// -------------------------------------------------------------------------
/*!
* \brief The CommonEventLoop class
* Реализация общего eventloop для всех использующих libev.
* Каждый класс который хочет подключиться к "потоку", должен наследоваться от класса Watcher
* и при необходимости переопределить функции prepare и finish
*/
class CommonEventLoop
{
public:
CommonEventLoop();
~CommonEventLoop();
bool evIsActive();
void evrun( EvWatcher* w, bool thread = true );
/*! \return TRUE - если это был последний EvWatcher и loop остановлен */
bool evstop( EvWatcher* s );
inline const ev::loop_ref evloop(){ return loop; }
protected:
private:
void onStop();
void defaultLoop();
std::atomic_bool cancelled = { false };
std::atomic_bool isrunning = { false };
ev::dynamic_loop loop;
ev::async evterm;
std::shared_ptr<std::thread> thr;
std::mutex term_mutex;
std::condition_variable term_event;
std::atomic_bool term_notify = { false };
std::mutex wlist_mutex;
std::list<EvWatcher*> wlist;
};
// -------------------------------------------------------------------------
#endif // CommonEventLoop_H_
// -------------------------------------------------------------------------
...@@ -27,7 +27,7 @@ ...@@ -27,7 +27,7 @@
#include "DebugStream.h" #include "DebugStream.h"
#include "ThreadCreator.h" #include "ThreadCreator.h"
#include "UTCPSocket.h" #include "UTCPSocket.h"
#include "EventLoopServer.h" #include "CommonEventLoop.h"
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
class LogSession; class LogSession;
class LogAgregator; class LogAgregator;
...@@ -68,16 +68,19 @@ LogReader. Читающих клиентов может быть скольуг ...@@ -68,16 +68,19 @@ LogReader. Читающих клиентов может быть скольуг
\warning Логи отдаются "клиентам" только целоиком строкой. Т.е. по сети информация передаваться не будет пока не будет записан "endl". \warning Логи отдаются "клиентам" только целоиком строкой. Т.е. по сети информация передаваться не будет пока не будет записан "endl".
Это сделано для "оптимизации передачи" (чтобы не передавать каждый байт) Это сделано для "оптимизации передачи" (чтобы не передавать каждый байт)
\warning Т.к. LogServer в основном только отдаёт "клиентам" логи, то он реализован с использованием CommonEventLoop,
т.е. у всех LogServer будет ОДИН ОБЩИЙ event loop.
*/ */
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
class LogServer: class LogServer:
public EventLoopServer protected EvWatcher
{ {
public: public:
LogServer( std::shared_ptr<DebugStream> log ); LogServer( std::shared_ptr<DebugStream> log );
LogServer( std::shared_ptr<LogAgregator> log ); LogServer( std::shared_ptr<LogAgregator> log );
~LogServer(); virtual ~LogServer();
inline void setCmdTimeout( timeout_t msec ) inline void setCmdTimeout( timeout_t msec )
{ {
...@@ -98,7 +101,7 @@ class LogServer: ...@@ -98,7 +101,7 @@ class LogServer:
inline bool isRunning() inline bool isRunning()
{ {
return evIsActive(); return isrunning;
} }
void init( const std::string& prefix, xmlNode* cnode = 0 ); void init( const std::string& prefix, xmlNode* cnode = 0 );
...@@ -108,8 +111,9 @@ class LogServer: ...@@ -108,8 +111,9 @@ class LogServer:
protected: protected:
LogServer(); LogServer();
virtual void evprepare() override; virtual void evprepare( const ev::loop_ref& loop ) override;
virtual void evfinish() override; virtual void evfinish( const ev::loop_ref& loop ) override;
virtual std::string wname(){ return myname; }
void ioAccept( ev::io& watcher, int revents ); void ioAccept( ev::io& watcher, int revents );
void sessionFinished( LogSession* s ); void sessionFinished( LogSession* s );
...@@ -127,12 +131,17 @@ class LogServer: ...@@ -127,12 +131,17 @@ class LogServer:
DebugStream mylog; DebugStream mylog;
ev::io io; ev::io io;
// делаем loop общим.. одним на всех!
static CommonEventLoop loop;
std::shared_ptr<UTCPSocket> sock; std::shared_ptr<UTCPSocket> sock;
std::shared_ptr<DebugStream> elog; // eventlog.. std::shared_ptr<DebugStream> elog; // eventlog..
std::string myname = { "LogServer" }; std::string myname = { "LogServer" };
std::string addr; std::string addr;
ost::tpport_t port; ost::tpport_t port;
std::atomic_bool isrunning = { false };
}; };
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
#endif // LogServer_H_ #endif // LogServer_H_
......
...@@ -62,7 +62,7 @@ class LogSession ...@@ -62,7 +62,7 @@ class LogSession
} }
// запуск обработки входящих запросов // запуск обработки входящих запросов
void run( ev::loop_ref& loop ); void run( const ev::loop_ref& loop );
void terminate(); void terminate();
bool isAcive(); bool isAcive();
......
...@@ -25,6 +25,8 @@ ...@@ -25,6 +25,8 @@
using namespace std; using namespace std;
using namespace UniSetTypes; using namespace UniSetTypes;
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
CommonEventLoop LogServer::loop;
// -------------------------------------------------------------------------
LogServer::~LogServer() LogServer::~LogServer()
{ {
} }
...@@ -53,7 +55,7 @@ LogServer::LogServer(): ...@@ -53,7 +55,7 @@ LogServer::LogServer():
{ {
} }
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
void LogServer::evfinish() void LogServer::evfinish(const ev::loop_ref& loop )
{ {
if( mylog.is_info() ) if( mylog.is_info() )
mylog.info() << myname << "(LogServer): terminate..." << endl; mylog.info() << myname << "(LogServer): terminate..." << endl;
...@@ -74,6 +76,7 @@ void LogServer::evfinish() ...@@ -74,6 +76,7 @@ void LogServer::evfinish()
} }
io.stop(); io.stop();
isrunning = false;
cerr << "LOGServer: finished..." << endl; cerr << "LOGServer: finished..." << endl;
} }
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
...@@ -81,15 +84,22 @@ void LogServer::run( const std::string& _addr, ost::tpport_t _port, bool thread ...@@ -81,15 +84,22 @@ void LogServer::run( const std::string& _addr, ost::tpport_t _port, bool thread
{ {
addr = _addr; addr = _addr;
port = _port; port = _port;
evrun(thread);
{
ostringstream s;
s << _addr << ":" << _port;
myname = s.str();
}
loop.evrun(this, thread);
} }
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
void LogServer::terminate() void LogServer::terminate()
{ {
evstop(); loop.evstop(this);
} }
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
void LogServer::evprepare() void LogServer::evprepare( const ev::loop_ref& eloop )
{ {
try try
{ {
...@@ -103,7 +113,7 @@ void LogServer::evprepare() ...@@ -103,7 +113,7 @@ void LogServer::evprepare()
ost::InetAddress saddr = (ost::InetAddress)socket->getPeer(&port); ost::InetAddress saddr = (ost::InetAddress)socket->getPeer(&port);
ostringstream err; ostringstream err;
err << "socket error " << saddr.getHostname() << ":" << port << " = " << errnum; err << myname << "(evprepare): socket error(" << errnum << "): ";
if( errnum == ost::Socket::errBindingFailed ) if( errnum == ost::Socket::errBindingFailed )
err << "bind failed; port busy" << endl; err << "bind failed; port busy" << endl;
...@@ -111,7 +121,7 @@ void LogServer::evprepare() ...@@ -111,7 +121,7 @@ void LogServer::evprepare()
err << "client socket failed" << endl; err << "client socket failed" << endl;
if( mylog.is_crit() ) if( mylog.is_crit() )
mylog.crit() << myname << "(LogServer): " << err.str() << endl; mylog.crit() << err.str() << endl;
throw SystemError( err.str() ); throw SystemError( err.str() );
} }
...@@ -119,8 +129,9 @@ void LogServer::evprepare() ...@@ -119,8 +129,9 @@ void LogServer::evprepare()
sock->setCompletion(false); sock->setCompletion(false);
io.set<LogServer, &LogServer::ioAccept>(this); io.set<LogServer, &LogServer::ioAccept>(this);
io.set(loop); io.set( eloop );
io.start(sock->getSocket(), ev::READ); io.start(sock->getSocket(), ev::READ);
isrunning = true;
} }
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
void LogServer::ioAccept( ev::io& watcher, int revents ) void LogServer::ioAccept( ev::io& watcher, int revents )
...@@ -133,7 +144,7 @@ void LogServer::ioAccept( ev::io& watcher, int revents ) ...@@ -133,7 +144,7 @@ void LogServer::ioAccept( ev::io& watcher, int revents )
return; return;
} }
if( !evIsActive() ) if( !loop.evIsActive() )
{ {
if( mylog.is_crit() ) if( mylog.is_crit() )
mylog.crit() << myname << "(LogServer::ioAccept): terminate work.." << endl; mylog.crit() << myname << "(LogServer::ioAccept): terminate work.." << endl;
...@@ -164,7 +175,8 @@ void LogServer::ioAccept( ev::io& watcher, int revents ) ...@@ -164,7 +175,8 @@ void LogServer::ioAccept( ev::io& watcher, int revents )
uniset_rwmutex_wrlock l(mutSList); uniset_rwmutex_wrlock l(mutSList);
slist.push_back(s); slist.push_back(s);
} }
s->run(loop);
s->run(loop.evloop());
} }
catch( const std::exception& ex ) catch( const std::exception& ex )
{ {
......
...@@ -105,7 +105,7 @@ void LogSession::logOnEvent( const std::string& s ) ...@@ -105,7 +105,7 @@ void LogSession::logOnEvent( const std::string& s )
asyncEvent.send(); asyncEvent.send();
} }
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
void LogSession::run( ev::loop_ref& loop ) void LogSession::run( const ev::loop_ref& loop )
{ {
setSessionLogLevel(Debug::ANY); setSessionLogLevel(Debug::ANY);
......
#include <iostream>
#include <chrono>
#include "CommonEventLoop.h"
// -------------------------------------------------------------------------
using namespace std;
// -------------------------------------------------------------------------
CommonEventLoop::CommonEventLoop()
{
evterm.set(loop);
evterm.set<CommonEventLoop, &CommonEventLoop::onStop>(this);
}
// -------------------------------------------------------------------------
CommonEventLoop::~CommonEventLoop()
{
if( !cancelled )
{
cancelled = true;
evterm.send();
if( thr )
{
thr->join();
thr = nullptr;
}
}
}
// ---------------------------------------------------------------------------
void CommonEventLoop::evrun( EvWatcher* w, bool thread )
{
if( !w )
return;
{
std::unique_lock<std::mutex> l(wlist_mutex);
wlist.push_back(w);
if( !thr )
{
thr = make_shared<std::thread>( [ = ] { CommonEventLoop::defaultLoop(); } );
std::this_thread::sleep_for(std::chrono::milliseconds(30));
}
w->evprepare(loop);
}
if( !thread )
{
// ожидаем завершения основного потока..
std::unique_lock<std::mutex> locker(term_mutex);
while( !term_notify )
term_event.wait(locker);
if( thr && thr->joinable() )
thr->join();
}
}
// ---------------------------------------------------------------------------
bool CommonEventLoop::evIsActive()
{
return isrunning;
}
// -------------------------------------------------------------------------
bool CommonEventLoop::evstop( EvWatcher* w )
{
std::unique_lock<std::mutex> l(wlist_mutex);
for( auto i = wlist.begin(); i!=wlist.end(); i++ )
{
if( (*i) == w )
{
wlist.erase(i);
break;
}
}
if( !wlist.empty() )
{
w->evfinish(loop); // для этого Watcher это уже finish..
return false;
}
cancelled = true;
evterm.send();
if( thr )
{
thr->join();
thr = nullptr;
}
return true;
}
// -------------------------------------------------------------------------
void CommonEventLoop::onStop()
{
// здесь список не защищаем wlist_mutex
// потому-что onStop будет вызываться
// из evstop, где он уже будет под "блокировкой"
// т.е. чтобы не получить deadlock
for( const auto& w: wlist )
{
try
{
w->evfinish(loop);
}
catch( std::exception& ex )
{
cerr << "(CommonEventLoop::onStop): evfinish err: " << ex.what() << endl;
}
}
evterm.stop();
loop.break_loop(ev::ALL);
}
// -------------------------------------------------------------------------
void CommonEventLoop::defaultLoop()
{
isrunning = true;
evterm.start();
cerr << "************* CommonEventLoop::defaultLoop() *************" << endl;
while( !cancelled )
{
try
{
loop.run(0);
}
catch( std::exception& ex )
{
cerr << "(CommonEventLoop::defaultLoop): " << ex.what() << endl;
}
}
cerr << "************* CommonEventLoop::defaultLoop() EXIT *************" << endl;
cancelled = true;
isrunning = false;
// будим всех ожидающих..
term_notify = true;
term_event.notify_all();
}
// -------------------------------------------------------------------------
...@@ -6,7 +6,7 @@ noinst_LTLIBRARIES = libProcesses.la ...@@ -6,7 +6,7 @@ noinst_LTLIBRARIES = libProcesses.la
libProcesses_la_CXXFLAGS = $(SIGC_CFLAGS) libProcesses_la_CXXFLAGS = $(SIGC_CFLAGS)
libProcesses_la_LIBADD = $(SIGC_LIBS) libProcesses_la_LIBADD = $(SIGC_LIBS)
libProcesses_la_SOURCES = IOController_iSK.cc IOController.cc IONotifyController.cc \ libProcesses_la_SOURCES = IOController_iSK.cc IOController.cc IONotifyController.cc \
NCRestorer.cc NCRestorer_XML.cc EventLoopServer.cc NCRestorer.cc NCRestorer_XML.cc EventLoopServer.cc CommonEventLoop.cc
local-clean: local-clean:
rm -rf *iSK.cc rm -rf *iSK.cc
......
...@@ -269,7 +269,10 @@ include/HourGlass.h ...@@ -269,7 +269,10 @@ include/HourGlass.h
include/IOController.h include/IOController.h
include/IONotifyController.h include/IONotifyController.h
include/IORFile.h include/IORFile.h
include/CommonEventLoop.h
include/EventLoopServer.h include/EventLoopServer.h
include/SingletonEventLoopServer.h
include/SingletonEventLoopServer.tcc
include/LogAgregator.h include/LogAgregator.h
include/LogReader.h include/LogReader.h
include/LogServer.h include/LogServer.h
...@@ -389,6 +392,7 @@ src/Processes/IONotifyController.cc ...@@ -389,6 +392,7 @@ src/Processes/IONotifyController.cc
src/Processes/Makefile.am src/Processes/Makefile.am
src/Processes/NCRestorer.cc src/Processes/NCRestorer.cc
src/Processes/NCRestorer_XML.cc src/Processes/NCRestorer_XML.cc
src/Processes/CommonEventLoop.cc
src/Processes/EventLoopServer.cc src/Processes/EventLoopServer.cc
src/Services/DBServer.cc src/Services/DBServer.cc
src/Services/Makefile.am src/Services/Makefile.am
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment