Commit 90a046c4 authored by Pavel Vainerman's avatar Pavel Vainerman

(TSDB): added support 'setMaxSizeOfMessageQueue', added 'bufMaxSize' option, optimization

parent add7ab0f
......@@ -61,9 +61,14 @@ void BackendOpenTSDB::init( xmlNode* cnode )
tsdbPrefix = conf->getArg2Param("--" + prefix + "-prefix", it.getProp("prefix"), "");
tsdbTags = conf->getArg2Param("--" + prefix + "-tags", it.getProp("tags"), "");
reconnectTime = conf->getArgPInt("--" + prefix + "-reconnect-time", it.getProp("reconnectTime"), reconnectTime);
bufMaxSize = conf->getArgPInt("--" + prefix + "-buf-maxsize", it.getProp("bufMaxSize"), bufMaxSize);
bufSize = conf->getArgPInt("--" + prefix + "-buf-size", it.getProp("bufMaxSize"), bufSize);
bufSyncTime = conf->getArgPInt("--" + prefix + "-buf-sync-time", it.getProp("bufSyncTimeout"), bufSyncTime);
int sz = conf->getArgPInt("--" + prefix + "-uniset-object-size-message-queue", it.getProp("sizeOfMessageQueue"), 10000);
if( sz > 0 )
setMaxSizeOfMessageQueue(sz);
const string ff = conf->getArg2Param("--" + prefix + "-filter-field", it.getProp("filter_field"), "" );
const string fv = conf->getArg2Param("--" + prefix + "-filter-value", it.getProp("filter_value"), "" );
......@@ -149,7 +154,8 @@ void BackendOpenTSDB::help_print( int argc, const char* const* argv )
cout << "--prefix-tags 'TAG1=VAL1 TAG2=VAL2...' - OpenTSDB: tags for data" << endl;
cout << "--prefix-reconnect-time msec - Time for attempts to connect to DB. Default: 5 sec" << endl;
cout << endl;
cout << "--prefix-buf-size sz - Buffer before save to DB. Default: 100" << endl;
cout << "--prefix-buf-size sz - Buffer before save to DB. Default: 500" << endl;
cout << "--prefix-buf-maxsize sz - Maximum size for buffer (drop messages). Default: 5000" << endl;
cout << "--prefix-buf-sync-time msec - Time period for forced data writing to DB. Default: 5 sec" << endl;
cout << endl;
cout << "--prefix-heartbeat-id name - ID for heartbeat sensor." << endl;
......@@ -207,6 +213,13 @@ std::shared_ptr<BackendOpenTSDB> BackendOpenTSDB::init_opendtsdb( int argc,
return make_shared<BackendOpenTSDB>(ID, cnode, icID, ic, prefix);
}
// -----------------------------------------------------------------------------
void BackendOpenTSDB::callback() noexcept
{
// используем стандартную "низкоуровневую" реализацию
// т.к. она нас устраивает (обработка очереди сообщений и таймеров)
UniSetObject::callback();
}
// -----------------------------------------------------------------------------
void BackendOpenTSDB::askSensors( UniversalIO::UIOCommand cmd )
{
UObject_SK::askSensors(cmd);
......@@ -237,12 +250,18 @@ void BackendOpenTSDB::sensorInfo( const uniset::SensorMessage* sm )
if( it != tsdbParams.end() )
{
// если размер буфера стал максимальный
// скидываем сразу в БД
if( buf.size() >= bufSize && !flushBuffer() )
// если буфер заполнился, делаем попытку записи в БД
if( buf.size() >= bufSize )
{
mycrit << "buffer overflow. Lost data: sid=" << sm->id << " value=" << sm->value << endl;
return;
if ( !flushBuffer() )
{
// если размер буфера стал максимальный (теряем сообщения)
if( buf.size() >= bufMaxSize && !flushBuffer() )
{
mycrit << "buffer overflow. Lost data: sid=" << sm->id << " value=" << sm->value << endl;
return;
}
}
}
// put <metric> <timestamp>.msec <value> <tagk1=tagv1[ tagk2=tagv2 ...tagkN=tagvN]>
......@@ -396,6 +415,7 @@ std::string BackendOpenTSDB::getMonitInfo() const
inf << "Database: " << host << ":" << port
<< " ["
<< " reconnect=" << reconnectTime
<< " bufSyncTime=" << bufSyncTime
<< " bufSize=" << bufSize
<< " tsdbPrefix: '" << tsdbPrefix << "'"
<< " tsdbTags: '" << tsdbTags << "'"
......
......@@ -89,12 +89,14 @@ namespace uniset
Параметры буфера задаются аргументами командной строки или в конфигурационном файле.
Доступны следующие параметры:
- \b bufSize - размер буфера, при заполнении которого происходит посылка данных в БД
- \b bufMaxSize - максимальный размер буфера, при котором начинают откидываться новые сообщения (потеря сообщений)
- \b bufSyncTimeout - период сброса данных в БД
- \b reconnectTime - время на повторную попытку подключения к БД
- \b sizeOfMessageQueue - Размер очереди сообщений для обработки изменений по датчикам.
При большом количестве отслеживаемых датчиков, размер должен быть достаточным, чтобы не терять изменения.
\todo Нужна ли поддержка авторизации для TSDB (возможно придётся перейти на HTTP REST API)
\todo Доделать возможность задать политику при переполнении буфера (удалять последние или первые, сколько чистить)
\todo Может стоит отделить настройки: размер буфера и сколько за один запрос писать в БД
*/
class BackendOpenTSDB:
public UObject_SK
......@@ -131,6 +133,11 @@ namespace uniset
protected:
BackendOpenTSDB();
// переопределяем callback, чтобы оптимизировать
// обработку большого количества сообщений
// и убрать не нужную в данном процессе обработку (включая sleep_msec)
virtual void callback() noexcept override;
virtual void askSensors( UniversalIO::UIOCommand cmd ) override;
virtual void sensorInfo( const uniset::SensorMessage* sm ) override;
virtual void timerInfo( const uniset::TimerMessage* tm ) override;
......@@ -157,7 +164,8 @@ namespace uniset
std::unordered_map<uniset::ObjectId, ParamInfo> tsdbParams;
timeout_t bufSyncTime = { 5000 };
size_t bufSize = { 100 };
size_t bufSize = { 500 };
size_t bufMaxSize = { 5000 }; // drop messages
bool timerIsOn = { false };
timeout_t reconnectTime = { 5000 };
std::string lastError;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment