Commit ed84ef14 authored by Pavel Vainerman's avatar Pavel Vainerman

(LogDB): ушёл от использования глобального таймера проверки соединений.

Сделал на уровне отдельного Log-а.
parent cc7c1c60
......@@ -58,11 +58,8 @@ LogDB::LogDB( const string& name , const string& prefix ):
qbufSize = conf->getArgPInt("--" + prefix + "buffer-size", it.getProp("bufferSize"), qbufSize);
tmConnection_sec = tmConnection_msec / 1000.;
connectionTimer.set<LogDB, &LogDB::onTimer>(this);
checkBufferTimer.set<LogDB, &LogDB::onCheckBuffer>(this);
pingWebSockets.set<LogDB, &LogDB::onPingWebSockets>(this);
flushBufferTimer.set<LogDB, &LogDB::onCheckBuffer>(this);
wsactivate.set<LogDB, &LogDB::onActivate>(this);
UniXML::iterator sit(cnode);
......@@ -111,6 +108,7 @@ LogDB::LogDB( const string& name , const string& prefix ):
// l->tcp = make_shared<UTCPStream>();
l->dblog = dblog;
l->signal_on_read().connect(sigc::mem_fun(this, &LogDB::addLog));
// l->set(loop);
logservers.push_back(l);
}
......@@ -190,8 +188,8 @@ void LogDB::flushBuffer()
{
if( !db->insert(qbuf.front()) )
{
dbcrit << myname << "(flushBuffer): error: " << db->error() <<
" lost query: " << qbuf.front() << endl;
dbcrit << myname << "(flushBuffer): error: " << db->error()
<< " lost query: " << qbuf.front() << endl;
}
qbuf.pop();
......@@ -249,53 +247,48 @@ void LogDB::run( bool async )
// -----------------------------------------------------------------------------
void LogDB::evfinish()
{
connectionTimer.stop();
checkBufferTimer.stop();
pingWebSockets.stop();
flushBufferTimer.stop();
wsactivate.stop();
}
// -----------------------------------------------------------------------------
void LogDB::evprepare()
{
connectionTimer.set(loop);
checkBufferTimer.set(loop);
pingWebSockets.set(loop);
flushBufferTimer.set(loop);
flushBufferTimer.start(0, tmFlushBuffer_sec);
if( tmConnection_msec != UniSetTimer::WaitUpTime )
connectionTimer.start(0, tmConnection_sec);
wsactivate.set(loop);
wsactivate.start();
checkBufferTimer.start(0, tmCheckBuffer_sec);
pingWebSockets.start(0, tmPingWebSockets_sec);
for( const auto& s : logservers )
s->set(loop);
}
// -----------------------------------------------------------------------------
void LogDB::onTimer( ev::timer& t, int revents )
void LogDB::onCheckBuffer(ev::timer& t, int revents)
{
if (EV_ERROR & revents)
{
dbcrit << myname << "(LogDB::onTimer): invalid event" << endl;
dbcrit << myname << "(LogDB::onCheckBuffer): invalid event" << endl;
return;
}
// проверяем соединения..
for( const auto& s : logservers )
{
if( !s->isConnected() )
{
if( s->connect() )
s->ioprepare(loop);
}
}
if( qbuf.size() >= qbufSize )
flushBuffer();
}
// -----------------------------------------------------------------------------
void LogDB::onCheckBuffer(ev::timer& t, int revents)
void LogDB::onActivate( ev::async& watcher, int revents )
{
if (EV_ERROR & revents)
{
dbcrit << myname << "(LogDB::onTimer): invalid event" << endl;
dbcrit << myname << "(LogDB::onActivate): invalid event" << endl;
return;
}
if( qbuf.size() >= qbufSize )
flushBuffer();
uniset_rwmutex_rlock lk(wsocksMutex);
for( const auto& s: wsocks )
{
if( !s->isActive() )
s->start(loop);
}
}
// -----------------------------------------------------------------------------
bool LogDB::Log::isConnected() const
......@@ -303,6 +296,26 @@ bool LogDB::Log::isConnected() const
return tcp && tcp->isConnected();
}
// -----------------------------------------------------------------------------
void LogDB::Log::set( ev::dynamic_loop& loop )
{
io.set(loop);
iocheck.set(loop);
iocheck.set<LogDB::Log, &LogDB::Log::check>(this);
iocheck.start(0,checkConnection_sec);
}
// -----------------------------------------------------------------------------
void LogDB::Log::check( ev::timer& t, int revents )
{
if (EV_ERROR & revents)
return;
if( isConnected() )
return;
if( connect() )
ioprepare();
}
// -----------------------------------------------------------------------------
bool LogDB::Log::connect() noexcept
{
if( tcp && tcp->isConnected() )
......@@ -349,12 +362,11 @@ bool LogDB::Log::connect() noexcept
return false;
}
// -----------------------------------------------------------------------------
void LogDB::Log::ioprepare( ev::dynamic_loop& loop )
void LogDB::Log::ioprepare()
{
if( !tcp || !tcp->isConnected() )
return;
io.set(loop);
io.set<LogDB::Log, &LogDB::Log::event>(this);
io.start(tcp->getSocket(), ev::READ);
text.reserve(reservsize);
......@@ -920,9 +932,17 @@ std::shared_ptr<LogDB::LogWebSocket> LogDB::newWebSocket( Poco::Net::HTTPServerR
}
std::shared_ptr<LogWebSocket> ws;
{
uniset_rwmutex_wrlock lock(wsocksMutex);
std::shared_ptr<LogWebSocket> ws = make_shared<LogWebSocket>(req, resp, log, loop);
ws = make_shared<LogWebSocket>(req, resp, log);
ws->dblog = dblog;
wsocks.emplace_back(ws);
}
// wsocksMutex надо отпустить, прежде чем посылать сигнал
// т.е. в обработчике происходит его захват
wsactivate.send();
return ws;
}
// -----------------------------------------------------------------------------
......@@ -942,34 +962,25 @@ void LogDB::delWebSocket( std::shared_ptr<LogWebSocket>& ws )
}
}
// -----------------------------------------------------------------------------
void LogDB::onPingWebSockets( ev::timer& t, int revents )
{
if (EV_ERROR & revents)
{
dbcrit << myname << "(onPingWebSockets): invalid event" << endl;
return;
}
uniset_rwmutex_rlock lk(wsocksMutex);
for( const auto& s : wsocks )
s->ping();
}
// -----------------------------------------------------------------------------
LogDB::LogWebSocket::LogWebSocket(Poco::Net::HTTPServerRequest* _req,
Poco::Net::HTTPServerResponse* _resp,
std::shared_ptr<Log>& _log,
ev::dynamic_loop& loop ):
std::shared_ptr<Log>& _log ):
Poco::Net::WebSocket(*_req, *_resp),
req(_req),
resp(_resp)
// log(_log)
{
setBlocking(false);
cancelled = false;
con = _log->signal_on_read().connect( sigc::mem_fun(*this, &LogWebSocket::add));
io.set(loop);
// т.к. создание websocket-а происходит в другом потоке
// то активация и привязка к loop происходит в функции start()
// вызываемой из eventloop
io.set<LogDB::LogWebSocket, &LogDB::LogWebSocket::event>(this);
io.start();
ioping.set<LogDB::LogWebSocket, &LogDB::LogWebSocket::ping>(this);
}
// -----------------------------------------------------------------------------
LogDB::LogWebSocket::~LogWebSocket()
......@@ -985,6 +996,20 @@ LogDB::LogWebSocket::~LogWebSocket()
}
}
// -----------------------------------------------------------------------------
bool LogDB::LogWebSocket::isActive()
{
return io.is_active();
}
// -----------------------------------------------------------------------------
void LogDB::LogWebSocket::start( ev::dynamic_loop& loop )
{
io.set(loop);
ioping.set(loop);
io.start();
ioping.start(ping_sec,ping_sec);
}
// -----------------------------------------------------------------------------
void LogDB::LogWebSocket::event( ev::io& watcher, int revents )
{
if( EV_ERROR & revents )
......@@ -994,12 +1019,25 @@ void LogDB::LogWebSocket::event( ev::io& watcher, int revents )
write(watcher);
}
// -----------------------------------------------------------------------------
void LogDB::LogWebSocket::ping()
void LogDB::LogWebSocket::ping( ev::timer& t, int revents )
{
if( EV_ERROR & revents )
return;
if( cancelled )
return;
if( !wbuf.empty() )
{
ioping.stop();
return;
}
wbuf.emplace(new UTCPCore::Buffer("."));
if( ioping.is_active() )
ioping.stop();
io.set(ev::WRITE);
}
// -----------------------------------------------------------------------------
......@@ -1009,6 +1047,9 @@ void LogDB::LogWebSocket::add( LogDB::Log* log, const string& txt )
return;
wbuf.emplace(new UTCPCore::Buffer(txt));
if( ioping.is_active() )
ioping.stop();
io.set(ev::WRITE);
}
// -----------------------------------------------------------------------------
......@@ -1016,9 +1057,14 @@ void LogDB::LogWebSocket::write( ev::io& w )
{
UTCPCore::Buffer* msg = 0;
cerr << "write: " << wbuf.size()
<< endl;
if( wbuf.empty() )
{
io.set(EV_NONE);
if( !ioping.is_active() )
ioping.start(ping_sec,ping_sec);
return;
}
......@@ -1043,12 +1089,12 @@ void LogDB::LogWebSocket::write( ev::io& w )
if( ret < 0 )
{
cerr << "(websocket): " << req->clientAddress().toString()
dblog3 << "(websocket): " << req->clientAddress().toString()
<< " write to socket error(" << errno << "): " << strerror(errno) << endl;
if( errno == EPIPE || errno == EBADF )
{
cerr << "(websocket): "
dblog3 << "(websocket): "
<< req->clientAddress().toString()
<< " write error.. terminate session.." << endl;
......@@ -1067,7 +1113,17 @@ void LogDB::LogWebSocket::write( ev::io& w )
}
if( !wbuf.empty() )
{
io.set(EV_WRITE);
if( ioping.is_active() )
ioping.stop();
}
else
{
io.set(EV_NONE);
if( !ioping.is_active() )
ioping.start(ping_sec,ping_sec);
}
return;
}
......@@ -1089,14 +1145,14 @@ void LogDB::LogWebSocket::write( ev::io& w )
}
catch( const Poco::Net::NetException& e )
{
cerr << "(websocket):NetException: "
dblog3 << "(websocket):NetException: "
<< req->clientAddress().toString()
<< " error: " << e.displayText()
<< endl;
}
catch( Poco::IOException& ex )
{
cerr << "(websocket): IOException: "
dblog3 << "(websocket): IOException: "
<< req->clientAddress().toString()
<< " error: " << ex.displayText()
<< endl;
......@@ -1113,6 +1169,7 @@ void LogDB::LogWebSocket::term()
cancelled = true;
con.disconnect();
io.stop();
ioping.stop();
finish.notify_all();
}
// -----------------------------------------------------------------------------
......
......@@ -114,7 +114,6 @@ namespace uniset
\todo Возможно в /logs стоит в ответе сразу возвращать и общее количество в БД (это один лишний запрос, каждый раз).
\todo Возможно в последствии оптимизировать таблицы (нормализовать) если будет тормозить. Сейчас пока прототип.
\todo Пока не очень эффективная работа с датой и временем (заодно подумать всё-таки в чём хранить)
\todo WebSocket: Доделать ограничение на размер буфера для каждого сокета
\todo WebSocket: доделать настройку всевозможных timeout-ов
\todo WebSocket: доделать проверку соединения
\todo WebSocket: сделать ограничение на максимальное количество соединений (websocket)
......@@ -155,8 +154,8 @@ namespace uniset
virtual void evfinish() override;
virtual void evprepare() override;
void onTimer( ev::timer& t, int revents );
void onCheckBuffer( ev::timer& t, int revents );
void onActivate( ev::async& watcher, int revents ) ;
void addLog( Log* log, const std::string& txt );
#ifndef DISABLE_REST_API
......@@ -172,7 +171,6 @@ namespace uniset
std::shared_ptr<LogWebSocket> newWebSocket(Poco::Net::HTTPServerRequest* req, Poco::Net::HTTPServerResponse* resp, const std::string& logname );
void delWebSocket( std::shared_ptr<LogWebSocket>& ws );
void onPingWebSockets( ev::timer& t, int revents );
#endif
std::string myname;
std::unique_ptr<SQLiteInterface> db;
......@@ -183,8 +181,12 @@ namespace uniset
QueryBuffer qbuf;
size_t qbufSize = { 200 }; // размер буфера сообщений.
ev::timer flushBufferTimer;
double tmFlushBuffer_sec = { 1.0 };
void flushBuffer();
ev::async wsactivate; // активация LogWebSocket-ов
class Log
{
public:
......@@ -197,9 +199,10 @@ namespace uniset
std::shared_ptr<DebugStream> dblog;
bool connect() noexcept;
bool isConnected() const;
void ioprepare( ev::dynamic_loop& loop );
void set( ev::dynamic_loop& loop );
void check( ev::timer& t, int revents );
void event( ev::io& watcher, int revents );
void read( ev::io& watcher );
void write( ev::io& io );
......@@ -208,12 +211,20 @@ namespace uniset
typedef sigc::signal<void, Log*, const std::string&> ReadSignal;
ReadSignal signal_on_read();
protected:
void ioprepare();
bool connect() noexcept;
private:
ReadSignal sigRead;
ev::io io;
ev::timer iocheck;
double checkConnection_sec = { 5.0 };
std::shared_ptr<UTCPStream> tcp;
static const int bufsize = { 10001 };
char buf[bufsize];
char buf[bufsize]; // буфер для чтения сообщений
static const size_t reservsize = { 1000 };
std::string text;
......@@ -225,16 +236,6 @@ namespace uniset
std::vector< std::shared_ptr<Log> > logservers;
std::shared_ptr<DebugStream> dblog;
ev::timer connectionTimer;
timeout_t tmConnection_msec = { 5000 }; // пауза между попытками установить соединение
double tmConnection_sec = { 0.0 };
ev::timer checkBufferTimer;
double tmCheckBuffer_sec = { 1.0 };
ev::timer pingWebSockets;
double tmPingWebSockets_sec = { 3.0 };
#ifndef DISABLE_REST_API
std::shared_ptr<Poco::Net::HTTPServer> httpserv;
std::string httpHost = { "" };
......@@ -246,14 +247,18 @@ namespace uniset
public:
LogWebSocket(Poco::Net::HTTPServerRequest* req,
Poco::Net::HTTPServerResponse* resp,
std::shared_ptr<Log>& log,
ev::dynamic_loop& loop );
std::shared_ptr<Log>& log );
virtual ~LogWebSocket();
void event( ev::io& watcher, int revents );
// конечно некрасиво что это в public
std::shared_ptr<DebugStream> dblog;
bool isActive();
void start( ev::dynamic_loop& loop );
void ping();
void event( ev::io& watcher, int revents );
void ping( ev::timer& t, int revents );
void add( Log* log, const std::string& txt );
......@@ -266,6 +271,8 @@ namespace uniset
void write( ev::io& w );
ev::io io;
ev::timer ioping;
double ping_sec = { 3.0 };
std::mutex finishmut;
std::condition_variable finish;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment