Commit 6a099370 authored by Pavel Vainerman's avatar Pavel Vainerman

(LogDB): Исправил причину "подвисаний" при большом потоке сообщений.

Тормозила запись в БД (flushBuffer), помогло добавление транзакции BEGIN..COMMIT. 2) Сделал посылку сообщений в websocket не подряд по мере появления, а периодически по "maxsend" штук.
parent ed84ef14
......@@ -108,7 +108,7 @@ LogDB::LogDB( const string& name , const string& prefix ):
// l->tcp = make_shared<UTCPStream>();
l->dblog = dblog;
l->signal_on_read().connect(sigc::mem_fun(this, &LogDB::addLog));
// l->set(loop);
// l->set(loop);
logservers.push_back(l);
}
......@@ -183,7 +183,10 @@ LogDB::~LogDB()
//--------------------------------------------------------------------------------------------
void LogDB::flushBuffer()
{
// Сперва пробуем очистить всё что накопилось в очереди до этого...
// без BEGIN и COMMIT вставка большого количества данных будет тормозить!
db->query("BEGIN;");
while( !qbuf.empty() )
{
if( !db->insert(qbuf.front()) )
......@@ -194,6 +197,8 @@ void LogDB::flushBuffer()
qbuf.pop();
}
db->query("COMMIT;");
}
//--------------------------------------------------------------------------------------------
void LogDB::addLog( LogDB::Log* log, const string& txt )
......@@ -284,10 +289,11 @@ void LogDB::onActivate( ev::async& watcher, int revents )
}
uniset_rwmutex_rlock lk(wsocksMutex);
for( const auto& s: wsocks )
for( const auto& s : wsocks )
{
if( !s->isActive() )
s->start(loop);
s->set(loop);
}
}
// -----------------------------------------------------------------------------
......@@ -301,7 +307,7 @@ void LogDB::Log::set( ev::dynamic_loop& loop )
io.set(loop);
iocheck.set(loop);
iocheck.set<LogDB::Log, &LogDB::Log::check>(this);
iocheck.start(0,checkConnection_sec);
iocheck.start(0, checkConnection_sec);
}
// -----------------------------------------------------------------------------
void LogDB::Log::check( ev::timer& t, int revents )
......@@ -977,10 +983,12 @@ LogDB::LogWebSocket::LogWebSocket(Poco::Net::HTTPServerRequest* _req,
con = _log->signal_on_read().connect( sigc::mem_fun(*this, &LogWebSocket::add));
// т.к. создание websocket-а происходит в другом потоке
// то активация и привязка к loop происходит в функции start()
// то активация и привязка к loop происходит в функции set()
// вызываемой из eventloop
io.set<LogDB::LogWebSocket, &LogDB::LogWebSocket::event>(this);
ioping.set<LogDB::LogWebSocket, &LogDB::LogWebSocket::ping>(this);
iosend.set<LogDB::LogWebSocket, &LogDB::LogWebSocket::send>(this);
maxsize = maxsend * 10; // пока так
}
// -----------------------------------------------------------------------------
LogDB::LogWebSocket::~LogWebSocket()
......@@ -998,25 +1006,25 @@ LogDB::LogWebSocket::~LogWebSocket()
// -----------------------------------------------------------------------------
bool LogDB::LogWebSocket::isActive()
{
return io.is_active();
return iosend.is_active();
}
// -----------------------------------------------------------------------------
void LogDB::LogWebSocket::start( ev::dynamic_loop& loop )
void LogDB::LogWebSocket::set( ev::dynamic_loop& loop )
{
io.set(loop);
iosend.set(loop);
ioping.set(loop);
io.start();
ioping.start(ping_sec,ping_sec);
iosend.start(0, send_sec);
ioping.start(ping_sec, ping_sec);
}
// -----------------------------------------------------------------------------
void LogDB::LogWebSocket::event( ev::io& watcher, int revents )
void LogDB::LogWebSocket::send( ev::timer& t, int revents )
{
if( EV_ERROR & revents )
return;
if( revents & EV_WRITE )
write(watcher);
for( size_t i = 0; !wbuf.empty() && i < maxsend && !cancelled; i++ )
write();
}
// -----------------------------------------------------------------------------
void LogDB::LogWebSocket::ping( ev::timer& t, int revents )
......@@ -1037,8 +1045,6 @@ void LogDB::LogWebSocket::ping( ev::timer& t, int revents )
if( ioping.is_active() )
ioping.stop();
io.set(ev::WRITE);
}
// -----------------------------------------------------------------------------
void LogDB::LogWebSocket::add( LogDB::Log* log, const string& txt )
......@@ -1046,25 +1052,27 @@ void LogDB::LogWebSocket::add( LogDB::Log* log, const string& txt )
if( cancelled || txt.empty())
return;
if( wbuf.size() > maxsize )
{
dbwarn << req->clientAddress().toString() << " lost messages..." << endl;
return;
}
wbuf.emplace(new UTCPCore::Buffer(txt));
if( ioping.is_active() )
ioping.stop();
io.set(ev::WRITE);
}
// -----------------------------------------------------------------------------
void LogDB::LogWebSocket::write( ev::io& w )
void LogDB::LogWebSocket::write()
{
UTCPCore::Buffer* msg = 0;
cerr << "write: " << wbuf.size()
<< endl;
if( wbuf.empty() )
{
io.set(EV_NONE);
if( !ioping.is_active() )
ioping.start(ping_sec,ping_sec);
ioping.start(ping_sec, ping_sec);
return;
}
......@@ -1114,15 +1122,13 @@ void LogDB::LogWebSocket::write( ev::io& w )
if( !wbuf.empty() )
{
io.set(EV_WRITE);
if( ioping.is_active() )
ioping.stop();
}
else
{
io.set(EV_NONE);
if( !ioping.is_active() )
ioping.start(ping_sec,ping_sec);
ioping.start(ping_sec, ping_sec);
}
return;
......@@ -1168,14 +1174,15 @@ void LogDB::LogWebSocket::term()
cancelled = true;
con.disconnect();
io.stop();
ioping.stop();
iosend.stop();
finish.notify_all();
}
// -----------------------------------------------------------------------------
void LogDB::LogWebSocket::waitCompletion()
{
std::unique_lock<std::mutex> lk(finishmut);
while( !cancelled )
finish.wait(lk);
}
......
......@@ -49,7 +49,10 @@ namespace uniset
- \ref sec_LogDB_Comm
- \ref sec_LogDB_Conf
- \ref sec_LogDB_DB
- \ref sec_LogDB_REST
- \ref sec_LogDB_WEBSOCK
- \ref sec_LogDB_DETAIL
\section sec_LogDB_Comm Общее описание работы LogDB
......@@ -70,6 +73,9 @@ namespace uniset
<logserver name="" ip=".." port=".." cmd=".."/>
</LogDB>
\section sec_LogDB_DB LogDB Работа с БД
Для оптимизации, запись в БД сделана не по каждому сообщению, а через промежуточнй буффер.
Т.е. только после того как в буфере скапливается \a qbufSize сообщений (строк) буфер скидывается в базу.
\section sec_LogDB_REST LogDB REST API
LogDB предоставляет возможность получения логов через REST API. Для этого запускается
......@@ -105,18 +111,20 @@ namespace uniset
\section sec_LogDB_DETAIL LogDB Технические детали
Вся релизация построена на "однопоточном" eventloop. В нём происходит,
чтение данных от логсерверов, посылка сообщений в websockets.
При этом обработка запросов REST API реалиуется отдельными потоками контролируемыми libpoco.
чтение данных от логсерверов, посылка сообщений в websockets, запись в БД.
При этом обработка запросов REST API реализуется отдельными потоками контролируемыми libpoco.
\todo conf: Отвязать конфигурирование от uniset (uniset_conf). Чтобы можно было просто указать xml-файл с настройками
\todo conf: может быть даже добавить поддержку конфигурирования в формате yaml.
\todo Добавить настройки таймаутов, размера буфера, размера для резервирования под строку, количество потоков для http и т.п.
\todo Добавить ротацию БД
\todo Сделать настройку, для формата даты и времени при выгрузке из БД (при формировании json).
\todo Возможно в /logs стоит в ответе сразу возвращать и общее количество в БД (это один лишний запрос, каждый раз).
\todo Возможно в последствии оптимизировать таблицы (нормализовать) если будет тормозить. Сейчас пока прототип.
\todo Пока не очень эффективная работа с датой и временем (заодно подумать всё-таки в чём хранить)
\todo db: Добавить ротацию БД (удаление старых записей и vacuum)
\todo db: Сделать настройку, для формата даты и времени при выгрузке из БД (при формировании json).
\todo rest: Возможно в /logs стоит в ответе сразу возвращать и общее количество в БД (это один лишний запрос, каждый раз).
\todo db: Возможно в последствии оптимизировать таблицы (нормализовать) если будет тормозить. Сейчас пока прототип.
\todo db: Пока не очень эффективная работа с датой и временем (заодно подумать всё-таки в чём хранить)
\todo WebSocket: доделать настройку всевозможных timeout-ов
\todo WebSocket: доделать проверку соединения
\todo WebSocket: сделать ограничение на максимальное количество соединений (websocket)
\todo WebSocket: сделать ограничение на максимальное количество соединений
*/
class LogDB:
public EventLoopServer
......@@ -179,7 +187,7 @@ namespace uniset
typedef std::queue<std::string> QueryBuffer;
QueryBuffer qbuf;
size_t qbufSize = { 200 }; // размер буфера сообщений.
size_t qbufSize = { 1000 }; // размер буфера сообщений.
ev::timer flushBufferTimer;
double tmFlushBuffer_sec = { 1.0 };
......@@ -241,6 +249,13 @@ namespace uniset
std::string httpHost = { "" };
int httpPort = { 0 };
/*! класс реализует работу с websocket через eventloop
* Из-за того, что поступление логов может быть достаточно быстрым
* чтобы не "завалить" браузер кучей сообщений,
* сделана посылка не по факту приёма сообщения, а раз в send_sec,
* не более maxsend сообщений.
* \todo websocket: может стоит объединять сообщения в одну посылку (пока считаю преждевременной оптимизацией)
*/
class LogWebSocket:
public Poco::Net::WebSocket
{
......@@ -255,9 +270,9 @@ namespace uniset
std::shared_ptr<DebugStream> dblog;
bool isActive();
void start( ev::dynamic_loop& loop );
void set( ev::dynamic_loop& loop );
void event( ev::io& watcher, int revents );
void send( ev::timer& t, int revents );
void ping( ev::timer& t, int revents );
void add( Log* log, const std::string& txt );
......@@ -268,9 +283,12 @@ namespace uniset
protected:
void write( ev::io& w );
void write();
ev::timer iosend;
double send_sec = { 0.5 };
size_t maxsend = { 200 };
ev::io io;
ev::timer ioping;
double ping_sec = { 3.0 };
......@@ -286,6 +304,7 @@ namespace uniset
// очередь данных на посылку..
std::queue<UTCPCore::Buffer*> wbuf;
size_t maxsize; // рассчитывается сходя из max_send (см. конструктор)
};
class LogWebSocketGuard
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment