Commit 008d58dd authored by Pavel Vainerman's avatar Pavel Vainerman

(LogDB): сделал механизм удаления старых записей (защиты от переполнения).

parent bd42e765
......@@ -202,7 +202,10 @@ bool SQLiteInterface::wait( sqlite3_stmt* stmt, int result )
const string SQLiteInterface::error()
{
if( db )
lastE = sqlite3_errmsg(db);
{
int errcode = sqlite3_errcode(db);
lastE = ( errcode == SQLITE_OK ) ? "" : sqlite3_errmsg(db);
}
return lastE;
}
......@@ -212,6 +215,11 @@ const string SQLiteInterface::lastQuery()
return lastQ;
}
// -----------------------------------------------------------------------------------------
bool SQLiteInterface::lastQueryOK() const
{
return queryok;
}
// -----------------------------------------------------------------------------------------
double SQLiteInterface::insert_id()
{
if( !db )
......
......@@ -113,6 +113,7 @@ namespace uniset
virtual DBResult query( const std::string& q ) override;
virtual const std::string lastQuery() override;
bool lastQueryOK() const;
virtual bool insert( const std::string& q ) override;
virtual double insert_id() override;
......
......@@ -20,6 +20,7 @@
// --------------------------------------------------------------------------
#include <sstream>
#include <iomanip>
#include <unistd.h>
#include "unisetstd.h"
#include <Poco/Net/NetException.h>
......@@ -57,6 +58,16 @@ LogDB::LogDB( const string& name , const string& prefix ):
UniXML::iterator it(cnode);
qbufSize = conf->getArgPInt("--" + prefix + "buffer-size", it.getProp("bufferSize"), qbufSize);
maxdbRecords = conf->getArgPInt("--" + prefix + "max-records", it.getProp("maxRecords"), qbufSize);
std::string s_overflow = conf->getArg2Param("--" + prefix + "overflow-factor", it.getProp("overflowFactor"), "1.3");
float ovf = atof(s_overflow.c_str());
numOverflow = lroundf( (float)maxdbRecords*ovf );
if( numOverflow == 0 )
numOverflow = maxdbRecords;
dbinfo << myname << "(init) maxdbRecords=" << maxdbRecords << " numOverflow=" << numOverflow << endl;
flushBufferTimer.set<LogDB, &LogDB::onCheckBuffer>(this);
wsactivate.set<LogDB, &LogDB::onActivate>(this);
......@@ -122,7 +133,7 @@ LogDB::LogDB( const string& name , const string& prefix ):
}
std::string dbfile = conf->getArgParam("--" + prefix + "dbfile", it.getProp("dbfile"));
const std::string dbfile = conf->getArgParam("--" + prefix + "dbfile", it.getProp("dbfile"));
if( dbfile.empty() )
{
......@@ -183,8 +194,10 @@ LogDB::~LogDB()
//--------------------------------------------------------------------------------------------
void LogDB::flushBuffer()
{
// без BEGIN и COMMIT вставка большого количества данных будет тормозить!
if( qbuf.empty() || !db->isConnection() )
return;
// без BEGIN и COMMIT вставка большого количества данных будет тормозить!
db->query("BEGIN;");
while( !qbuf.empty() )
......@@ -199,6 +212,44 @@ void LogDB::flushBuffer()
}
db->query("COMMIT;");
if( !db->error().empty() )
{
dbcrit << myname << "(flushBuffer): error: " << db->error() << endl;
}
// вызываем каждый раз, для отслеживания переполнения..
rotateDB();
}
//--------------------------------------------------------------------------------------------
void LogDB::rotateDB()
{
// ротация отключена
if( maxdbRecords == 0 )
return;
size_t num = getCountOfRecords();
if( num <= numOverflow )
return;
dblog2 << myname << "(rotateDB): num=" << num << " > " << numOverflow << endl;
size_t firstOldID = getFirstOfOldRecord(numOverflow);
DBResult ret = db->query("DELETE FROM logs WHERE id <= " + std::to_string(firstOldID) + ";");
if( !db->lastQueryOK() )
{
dbwarn << myname << "(rotateDB): delete error: " << db->error() << endl;
}
ret = db->query("VACUUM;");
if( !db->lastQueryOK() )
{
dbwarn << myname << "(rotateDB): vacuum error: " << db->error() << endl;
}
// dblog3 << myname << "(rotateDB): after rotate: " << getCountOfRecords() << " records" << endl;
}
//--------------------------------------------------------------------------------------------
void LogDB::addLog( LogDB::Log* log, const string& txt )
......@@ -215,7 +266,35 @@ void LogDB::addLog( LogDB::Log* log, const string& txt )
qbuf.emplace(q.str());
}
//--------------------------------------------------------------------------------------------
size_t LogDB::getCountOfRecords( const std::string& logname )
{
ostringstream q;
q << "SELECT count(*) FROM logs";
if( !logname.empty() )
q << " WHERE name='" << logname << "'";
DBResult ret = db->query(q.str());
if( !ret )
return 0;
return (size_t) DBResult::as_int(ret.begin(),0);
}
//--------------------------------------------------------------------------------------------
size_t LogDB::getFirstOfOldRecord( size_t maxnum )
{
ostringstream q;
q << "SELECT id FROM logs order by id DESC limit " << maxnum << ",1";
DBResult ret = db->query(q.str());
if( !ret )
return 0;
return (size_t) DBResult::as_int(ret.begin(),0);
}
//--------------------------------------------------------------------------------------------
std::shared_ptr<LogDB> LogDB::init_logdb( int argc, const char* const* argv, const std::string& prefix )
{
......@@ -235,8 +314,10 @@ std::shared_ptr<LogDB> LogDB::init_logdb( int argc, const char* const* argv, con
void LogDB::help_print()
{
cout << "Default: prefix='logdb'" << endl;
cout << "--prefix-name name - Имя. Для поиска настроечной секции в configure.xml" << endl;
cout << "--prefix-buffer-size sz - Размер буфера (до скидывания в БД)." << endl;
cout << "--prefix-name name - Имя. Для поиска настроечной секции в configure.xml" << endl;
cout << "--prefix-buffer-size sz - Размер буфера (до скидывания в БД)." << endl;
cout << "--prefix-max-records sz - Максимальное количество записей в БД. При превышении, старые удаляются. 0 - не удалять" << endl;
cout << "--prefix-overflow-factor float - Коэффициент переполнения, после которого запускается удаление старых записей. По умолчанию: 1.3" << endl;
}
// -----------------------------------------------------------------------------
void LogDB::run( bool async )
......@@ -800,23 +881,9 @@ Poco::JSON::Object::Ptr LogDB::httpGetCount( const Poco::URI::QueryParameters& p
throw uniset::SystemError(err.str());
}
ostringstream q;
q << "SELECT count(*) FROM logs WHERE name='" << logname << "'";
DBResult ret = db->query(q.str());
if( !ret )
{
jdata->set("name", logname);
jdata->set("count", 0);
return jdata;
}
auto it = ret.begin();
size_t count = getCountOfRecords(logname);
jdata->set("name", logname);
jdata->set("count", it.as_int(0));
jdata->set("count",count);
return jdata;
}
// -----------------------------------------------------------------------------
......
......@@ -76,6 +76,10 @@ namespace uniset
\section sec_LogDB_DB LogDB Работа с БД
Для оптимизации, запись в БД сделана не по каждому сообщению, а через промежуточнй буффер.
Т.е. только после того как в буфере скапливается \a qbufSize сообщений (строк) буфер скидывается в базу.
Помимо этого, встроен механизм "ротации БД". Если задан параметр maxRecords (--prefix-max-records),
то в БД будет поддерживаться ограниченное количество записей. При этом введён "гистерезис",
т.е. фактически удаление старых записей начинается при переполнении БД определяемом коэффициентом
переполнения overflowFactor (--prefix-overflow-factor). По умолчанию 1.3.
\section sec_LogDB_REST LogDB REST API
LogDB предоставляет возможность получения логов через REST API. Для этого запускается
......@@ -117,7 +121,6 @@ namespace uniset
\todo conf: Отвязать конфигурирование от uniset (uniset_conf). Чтобы можно было просто указать xml-файл с настройками
\todo conf: может быть даже добавить поддержку конфигурирования в формате yaml.
\todo Добавить настройки таймаутов, размера буфера, размера для резервирования под строку, количество потоков для http и т.п.
\todo db: Добавить ротацию БД (удаление старых записей и vacuum)
\todo db: Сделать настройку, для формата даты и времени при выгрузке из БД (при формировании json).
\todo rest: Возможно в /logs стоит в ответе сразу возвращать и общее количество в БД (это один лишний запрос, каждый раз).
\todo db: Возможно в последствии оптимизировать таблицы (нормализовать) если будет тормозить. Сейчас пока прототип.
......@@ -166,6 +169,9 @@ namespace uniset
void onActivate( ev::async& watcher, int revents ) ;
void addLog( Log* log, const std::string& txt );
size_t getCountOfRecords( const std::string& logname="" );
size_t getFirstOfOldRecord( size_t maxnum );
#ifndef DISABLE_REST_API
Poco::JSON::Object::Ptr respError( Poco::Net::HTTPServerResponse& resp, Poco::Net::HTTPResponse::HTTPStatus s, const std::string& message );
Poco::JSON::Object::Ptr httpGetRequest( const std::string& cmd, const Poco::URI::QueryParameters& p );
......@@ -192,6 +198,10 @@ namespace uniset
ev::timer flushBufferTimer;
double tmFlushBuffer_sec = { 1.0 };
void flushBuffer();
void rotateDB();
size_t maxdbRecords = { 200*1000 };
size_t numOverflow = { 0 }; // вычисляется из параметра "overflow factor"(float)
ev::async wsactivate; // активация LogWebSocket-ов
......
......@@ -8,5 +8,6 @@ ulimit -Sc 1000000
--logdb-dbfile ./test.db \
--logdb-buffer-size 5 \
--logdb-httpserver-port 8888 \
--logdb-max-records 20000 \
$*
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment