Commit 95dd3042 authored by Pavel Vainerman's avatar Pavel Vainerman

(Backend-OpenTSDB): first prototype

parent 6fdcbce1
...@@ -45,6 +45,8 @@ ...@@ -45,6 +45,8 @@
<logserver name="ses" ip="localhost" port="21107" cmd="-d level8 ALL" description="SES"/> <logserver name="ses" ip="localhost" port="21107" cmd="-d level8 ALL" description="SES"/>
</LogDB> </LogDB>
<BackendOpenTSDB name="BackendOpenTSDB" host="localhost" filter_field="tsdb" filter_value="1" tags="host=localhost uniset=1" prefix="uniset"/>
<settings> <settings>
<LostTestProc name="TestProc1" sensor_s="Input4_S"/> <LostTestProc name="TestProc1" sensor_s="Input4_S"/>
...@@ -320,8 +322,13 @@ ...@@ -320,8 +322,13 @@
<item id="201" iotype="DI" name="UNET_DATA2_S" textname="UNET: data2" ndata="unet"/> <item id="201" iotype="DI" name="UNET_DATA2_S" textname="UNET: data2" ndata="unet"/>
<item id="202" iotype="DI" name="UNET_DATA3_S" textname="UNET: data3" ndata="unet"/> <item id="202" iotype="DI" name="UNET_DATA3_S" textname="UNET: data3" ndata="unet"/>
<!-- test BackendOpenTSDB -->
<item id="210" iotype="AI" name="DATA1_S" textname="UNET: data1" tsdb="1"/>
<item id="211" iotype="AI" name="DATA2_S" textname="UNET: data2" tsdb="1"/>
<!-- IO TEST --> <!-- IO TEST -->
<!-- Датчики системы смазочного масла (сгенерировано из csv) alex: 05.07.2017 --> <!-- Датчики системы смазочного масла (сгенерировано из csv) alex: 05.07.2017 -->
<item id="301" name="01_01_S" iotype="DI" mtype="2" textname="ГДГ1: Аварийно-низкое давление смазочного масла" unet="gdg1" aps="301" mko_aps="1" depend1="D1_State_FS" depend_value="1" arm="arm_gui" arm_mbaddr="0x01" arm_mbfunc="0x05" arm_mbreg="4526" arm_mbtype="rtu" io="1" card="1" subdev="0" channel="0" jardelay="8000"/> <item id="301" name="01_01_S" iotype="DI" mtype="2" textname="ГДГ1: Аварийно-низкое давление смазочного масла" unet="gdg1" aps="301" mko_aps="1" depend1="D1_State_FS" depend_value="1" arm="arm_gui" arm_mbaddr="0x01" arm_mbfunc="0x05" arm_mbreg="4526" arm_mbtype="rtu" io="1" card="1" subdev="0" channel="0" jardelay="8000"/>
<item id="302" name="01_02_S" iotype="DI" mtype="2" textname="ГДГ2: Аварийно-низкое давление смазочного масла" unet="gdg2" aps="301" mko_aps="1" depend1="D2_State_FS" depend_value="1" arm="arm_gui" arm_mbaddr="0x01" arm_mbfunc="0x05" arm_mbreg="4846" arm_mbtype="rtu" io="1" card="1" subdev="0" channel="0" jardelay="8000"/> <item id="302" name="01_02_S" iotype="DI" mtype="2" textname="ГДГ2: Аварийно-низкое давление смазочного масла" unet="gdg2" aps="301" mko_aps="1" depend1="D2_State_FS" depend_value="1" arm="arm_gui" arm_mbaddr="0x01" arm_mbfunc="0x05" arm_mbreg="4846" arm_mbtype="rtu" io="1" card="1" subdev="0" channel="0" jardelay="8000"/>
<item id="303" name="01_04_AS" iotype="AI" textname="ГДГ1: Давление смазочного масла" unet="gdg1" analog_group="1" filtersize="300" iir_thr="2000" arm="arm_gui" arm_mbaddr="0x01" arm_mbfunc="0x06" arm_mbreg="7086" arm_mbtype="rtu" io="1" card="3" subdev="2" channel="0" range="1" aref="2" cmin="0" cmax="300" rmin="820" rmax="4095" precision="1" noprecision="1"/> <item id="303" name="01_04_AS" iotype="AI" textname="ГДГ1: Давление смазочного масла" unet="gdg1" analog_group="1" filtersize="300" iir_thr="2000" arm="arm_gui" arm_mbaddr="0x01" arm_mbfunc="0x06" arm_mbreg="7086" arm_mbtype="rtu" io="1" card="3" subdev="2" channel="0" range="1" aref="2" cmin="0" cmax="300" rmin="820" rmax="4095" precision="1" noprecision="1"/>
...@@ -5492,6 +5499,7 @@ ...@@ -5492,6 +5499,7 @@
<item id="6087" name="MBSlave10"/> <item id="6087" name="MBSlave10"/>
<item id="6088" name="TestGroup"/> <item id="6088" name="TestGroup"/>
<item id="6089" name="MQTTPublisher1"/> <item id="6089" name="MQTTPublisher1"/>
<item id="6090" name="BackendOpenTSDB"/>
<item id="6101" name="MBTCP1"/> <item id="6101" name="MBTCP1"/>
<item id="6102" name="MBTCP2"/> <item id="6102" name="MBTCP2"/>
<item id="6103" name="MBTCP3"/> <item id="6103" name="MBTCP3"/>
......
...@@ -179,6 +179,20 @@ fi ...@@ -179,6 +179,20 @@ fi
AM_CONDITIONAL(DISABLE_RRD, test ${buildrrd} = false) AM_CONDITIONAL(DISABLE_RRD, test ${buildrrd} = false)
#check opentsdb support
AC_MSG_CHECKING([opentsdb support])
buildtsdb=true
AC_ARG_ENABLE(opentsdb, AC_HELP_STRING([--disable-opentsdb], [disable OpenTSDB backend]),
[ if test $enableval = yes; then buildtsdb=true; else buildtsdb=false; fi],[ buildtsdb=true; ])
if test ${buildtsdb} = true; then
AC_MSG_RESULT([enabled])
else
AC_MSG_RESULT([disabled])
fi
AM_CONDITIONAL(DISABLE_OPENTSDB, test ${buildtsdb} = false)
#check io support #check io support
AC_MSG_CHECKING([io support]) AC_MSG_CHECKING([io support])
buildio=true buildio=true
...@@ -557,6 +571,8 @@ AC_CONFIG_FILES([Makefile ...@@ -557,6 +571,8 @@ AC_CONFIG_FILES([Makefile
extensions/RRDServer/libUniSet2RRDServer.pc extensions/RRDServer/libUniSet2RRDServer.pc
extensions/MQTTPublisher/Makefile extensions/MQTTPublisher/Makefile
extensions/MQTTPublisher/libUniSet2MQTTPublisher.pc extensions/MQTTPublisher/libUniSet2MQTTPublisher.pc
extensions/Backend-OpenTSDB/Makefile
extensions/Backend-OpenTSDB/libUniSet2BackendOpenTSDB.pc
extensions/tests/Makefile extensions/tests/Makefile
extensions/tests/SMemoryTest/Makefile extensions/tests/SMemoryTest/Makefile
extensions/tests/MBSlaveTest/Makefile extensions/tests/MBSlaveTest/Makefile
......
/*
* Copyright (c) 2015 Pavel Vainerman.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation, version 2.1.
*
* This program is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Lesser Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
// -------------------------------------------------------------------------
#include <sstream>
#include <iomanip>
#include "Exceptions.h"
#include <Poco/Net/NetException.h>
#include "BackendOpenTSDB.h"
// -----------------------------------------------------------------------------
using namespace std;
using namespace uniset;
using namespace uniset::extensions;
// -----------------------------------------------------------------------------
BackendOpenTSDB::BackendOpenTSDB( uniset::ObjectId objId, xmlNode* cnode,
uniset::ObjectId shmId, const std::shared_ptr<SharedMemory>& ic,
const string& prefix ):
UObject_SK(objId, cnode, string(prefix + "-")),
prefix(prefix)
{
auto conf = uniset_conf();
if( ic )
ic->logAgregator()->add(logAgregator());
shm = make_shared<SMInterface>(shmId, ui, objId, ic);
init(cnode);
if( smTestID == DefaultObjectId && !tsdbParams.empty() )
{
// берём первый датчик из списка
smTestID = tsdbParams.begin()->first;
}
}
// -----------------------------------------------------------------------------
BackendOpenTSDB::~BackendOpenTSDB()
{
}
// -----------------------------------------------------------------------------
void BackendOpenTSDB::init( xmlNode* cnode )
{
UniXML::iterator it(cnode);
auto conf = uniset_conf();
host = conf->getArg2Param("--" + prefix+ "-host", it.getProp("host"), "localhost");
port = conf->getArgPInt("--" + prefix+ "-port", it.getProp("port"), port);
tsdbPrefix = conf->getArg2Param("--" + prefix+ "-prefix", it.getProp("prefix"), "");
tsdbTags = conf->getArg2Param("--" + prefix+ "-tags", it.getProp("tags"), "");
reconnectTime = conf->getArgPInt("--" + prefix+ "-reconnect-time", it.getProp("reconnectTime"), reconnectTime);
bufSize = conf->getArgPInt("--" + prefix+ "-buf-size", it.getProp("bufMaxSize"),bufSize);
bufSyncTime = conf->getArgPInt("--" + prefix+ "-buf-sync-time", it.getProp("bufSyncTimeout"),bufSyncTime);
const string ff = conf->getArg2Param("--" + prefix+ "-filter-field", it.getProp("filter_field"), "" );
const string fv = conf->getArg2Param("--" + prefix+ "-filter-value", it.getProp("filter_value"), "" );
myinfo << myname << "(init): opentsdb host=" << host << ":" << port
<< " " << ff << "='" << fv << "'"
<< " prefix='" << tsdbPrefix << "'"
<< " tags='" << tsdbTags << "'"
<< endl;
// try
{
auto conf = uniset_conf();
xmlNode* snode = conf->getXMLSensorsSection();
if( !snode )
{
ostringstream err;
err << myname << "(init): Not found section <sensors>";
mycrit << err.str() << endl;
throw SystemError(err.str());
}
UniXML::iterator it1(snode);
if( !it1.goChildren() )
{
ostringstream err;
err << myname << "(init): section <sensors> empty?!";
mycrit << err.str() << endl;
throw SystemError(err.str());
}
for(; it1.getCurrent(); it1.goNext() )
{
if( !uniset::check_filter(it1, ff, fv) )
continue;
std::string pname = it1.getProp2("tsdb_name", it1.getProp("name"));
if( pname.empty() )
{
ostringstream err;
err << myname << "(init): Unknown name sensor id = '" << it1.getProp("id") << "'";
mycrit << err.str() << endl;
throw SystemError(err.str());
}
ObjectId sid = conf->getSensorID( it1.getProp("name") );
if( sid == DefaultObjectId )
{
ostringstream err;
err << myname << "(init): Unknown SensorID for '" << it1.getProp("name") << "'";
mycrit << err.str();
throw SystemError(err.str());
}
tsdbParams.emplace( sid, ParamInfo(pname, it1.getProp("tsdb_tags")) );
}
if( tsdbParams.empty() )
{
ostringstream err;
err << myname << "(init): Not found items for send to OpenTSDB...";
mycrit << err.str() << endl;
throw SystemError(err.str());
}
}
}
//--------------------------------------------------------------------------------
void BackendOpenTSDB::help_print( int argc, const char* const* argv )
{
cout << " Default prefix='opentsdb'" << endl;
cout << "--prefix-name - ID. Default: BackendOpenTSDB." << endl;
cout << "--prefix-confnode - configuration section name. Default: <NAME name='NAME'...> " << endl;
cout << endl;
cout << " OpenTSDB: " << endl;
cout << "--prefix-host ip - OpenTSDB: host. Default: localhost" << endl;
cout << "--prefix-port num - OpenTSDB: port. Default: 4242" << endl;
cout << "--prefix-prefix name - OpenTSDB: prefix for data" << endl;
cout << "--prefix-tags 'TAG1=VAL1 TAG2=VAL2...' - OpenTSDB: tags for data" << endl;
cout << "--prefix-reconnect-time msec - Time for attempts to connect to DB. Default: 5 sec" << endl;
cout << endl;
cout << "--prefix-buf-size sz - Buffer before save to DB. Default: 100" << endl;
cout << "--prefix-buf-sync-time msec - Time period for forced data writing to DB. Default: 5 sec" << endl;
cout << endl;
cout << "--prefix-heartbeat-id name - ID for heartbeat sensor." << endl;
cout << "--prefix-heartbeat-max val - max value for heartbeat sensor." << endl;
cout << endl;
cout << " Logs: " << endl;
cout << "--prefix-log-... - log control" << endl;
cout << " add-levels ... " << endl;
cout << " del-levels ... " << endl;
cout << " set-levels ... " << endl;
cout << " logfile filanme " << endl;
cout << " no-debug " << endl;
cout << " Base oprtions: " << endl;
// cout << UObject_SK::help() << endl;
cout << " LogServer: " << endl;
cout << "--prefix-run-logserver - run logserver. Default: localhost:id" << endl;
cout << "--prefix-logserver-host ip - listen ip. Default: localhost" << endl;
cout << "--prefix-logserver-port num - listen port. Default: ID" << endl;
cout << LogServer::help_print("prefix-logserver") << endl;
}
// -----------------------------------------------------------------------------
std::shared_ptr<BackendOpenTSDB> BackendOpenTSDB::init_opendtsdb( int argc,
const char* const* argv,
uniset::ObjectId icID, const std::shared_ptr<SharedMemory>& ic,
const std::string& prefix )
{
auto conf = uniset_conf();
string name = conf->getArgParam("--" + prefix + "-name", "BackendOpenTSDB");
if( name.empty() )
{
dcrit << "(BackendOpenTSDB): Unknown name. Usage: --" << prefix << "-name" << endl;
return 0;
}
ObjectId ID = conf->getObjectID(name);
if( ID == uniset::DefaultObjectId )
{
dcrit << "(BackendOpenTSDB): Not found ID for '" << name
<< " in '" << conf->getObjectsSection() << "' section" << endl;
return 0;
}
string confname = conf->getArgParam("--" + prefix + "-confnode", name);
xmlNode* cnode = conf->getNode(confname);
if( !cnode )
{
dcrit << "(BackendOpenTSDB): " << name << "(init): Not found <" + confname + ">" << endl;
return 0;
}
dinfo << "(BackendOpenTSDB): name = " << name << "(" << ID << ")" << endl;
return make_shared<BackendOpenTSDB>(ID, cnode, icID, ic, prefix);
}
// -----------------------------------------------------------------------------
void BackendOpenTSDB::askSensors( UniversalIO::UIOCommand cmd )
{
UObject_SK::askSensors(cmd);
// прежде чем заказывать датчики, надо убедиться что SM доступна
if( !waitSM(smReadyTimeout) )
{
uterminate();
return;
}
for( const auto& s : tsdbParams )
{
try
{
shm->askSensor(s.first, cmd);
}
catch( const std::exception& ex )
{
mycrit << myname << "(askSensors): " << ex.what() << endl;
}
}
}
// -----------------------------------------------------------------------------
void BackendOpenTSDB::sensorInfo( const uniset::SensorMessage* sm )
{
auto it = tsdbParams.find(sm->id);
if( it != tsdbParams.end() )
{
// если размер буфера стал максимальный
// скидываем сразу в БД
if( buf.size() >= bufSize && !flushBuffer() )
{
mycrit << "buffer overflow. Lost data: sid=" << sm->id << " value=" << sm->value << endl;
return;
}
// put <metric> <timestamp>.msec <value> <tagk1=tagv1[ tagk2=tagv2 ...tagkN=tagvN]>
ostringstream s;
s << "put ";
if( !tsdbPrefix.empty() )
s << tsdbPrefix << ".";
s << it->second.name
<< " " << setw(10) << setfill('0') << sm->sm_tv.tv_sec
<< setw(3) << setfill('0') << std::round( sm->sm_tv.tv_nsec / 10e6 )
<< " "
<< sm->value;
if( !tsdbTags.empty() )
s << " " << tsdbTags;
s << " "
<< it->second.tags
<< endl;
buf.push_back(s.str());
myinfo << myname << "(sensorInfo): " << s.str() << endl;
if( !timerIsOn )
{
timerIsOn = true;
askTimer(tmFlushBuffer, bufSyncTime, 1);
}
}
}
// -----------------------------------------------------------------------------
void BackendOpenTSDB::timerInfo( const uniset::TimerMessage* tm )
{
if( tm->id == tmFlushBuffer )
flushBuffer();
else if( tm->id == tmReconnect )
{
if( reconnect() )
askTimer(tmReconnect, 0);
}
}
// -----------------------------------------------------------------------------
void BackendOpenTSDB::sysCommand(const SystemMessage* sm)
{
if( sm->command == SystemMessage::StartUp )
{
if( !reconnect() )
askTimer(tmReconnect, reconnectTime);
}
}
// -----------------------------------------------------------------------------
bool BackendOpenTSDB::flushBuffer()
{
if( buf.empty() )
{
if( timerIsOn )
{
askTimer(tmFlushBuffer, 0);
timerIsOn = false;
}
return true;
}
if( !tcp || !tcp->isConnected() )
return false;
ostringstream q;
for( const auto& s : buf )
q << s;
try
{
const string s(q.str());
tcp->sendBytes(s.data(), s.size());
buf.clear();
askTimer(tmFlushBuffer, 0);
timerIsOn = false;
return true;
}
catch( std::exception& ex )
{
mywarn << "(flushBuffer): " << ex.what() << endl;
}
return false;
}
//------------------------------------------------------------------------------
bool BackendOpenTSDB::reconnect()
{
if( tcp )
{
tcp->forceDisconnect();
tcp = nullptr;
}
try
{
myinfo << myname << "(reconnect): " << host << ":" << port << endl;
tcp = make_shared<UTCPStream>();
tcp->create(host, port, 500);
// tcp->setReceiveTimeout(UniSetTimer::millisecToPoco(replyTimeOut_ms));
tcp->setKeepAlive(true); // tcp->setKeepAliveParams((replyTimeOut_ms > 1000 ? replyTimeOut_ms / 1000 : 1));
tcp->setNoDelay(true);
return true;
}
catch( Poco::TimeoutException& ex)
{
mycrit << myname << "(connect): " << host << ":" << port << " timeout exception" << endl;
}
catch( Poco::Net::NetException& ex)
{
mycrit << myname << "(connect): " << host << ":" << port << " error: " << ex.displayText() << endl;
}
catch( const std::exception& e )
{
mycrit << myname << "(connect): " << host << ":" << port << " error: " << e.what() << endl;
}
catch( ... )
{
mycrit << myname << "(connect): " << host << ":" << port << " unknown exception..." << endl;
}
tcp = nullptr;
return false;
}
//------------------------------------------------------------------------------
/*
* Copyright (c) 2015 Pavel Vainerman.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation, version 2.1.
*
* This program is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Lesser Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
// -----------------------------------------------------------------------------
#ifndef _BackendOpenTSDB_H_
#define _BackendOpenTSDB_H_
// -----------------------------------------------------------------------------
#include <deque>
#include <memory>
#include <unordered_map>
#include <chrono>
#include "UObject_SK.h"
#include "SMInterface.h"
#include "SharedMemory.h"
#include "extensions/Extensions.h"
#include "UTCPStream.h"
// --------------------------------------------------------------------------
namespace uniset
{
// -----------------------------------------------------------------------------
/*!
\page page_BackendOpenTSDB Реализация шлюза к БД поддерживающей интерфейс OpenTSDB
- \ref sec_OpenTSDB_Comm
- \ref sec_OpenTSDB_Conf
- \ref sec_OpenTSDB_Name
- \ref sec_OpenTSDB_Queue
\section sec_OpenTSDB_Comm Общее описание шлюза к OpenTSDB
"OpenTSDB" - time series database. Специальная БД оптимизированная
для хранения временных рядов (по простому: данных с временными метками).
Класс реализует пересылку указанных (настроенных) дачиков в БД поддерживающую
интерфейс совместимый с OpenTSDB. В текущей реализации используется посылка
строк в формате Telnet
\code
put <metric> <timestamp>.msec <value> <tagk1=tagv1[ tagk2=tagv2 ...tagkN=tagvN]>
\endcode
См. http://opentsdb.net/docs/build/html/user_guide/writing/index.html
\section sec_OpenTSDB_Conf Настройка BackendOpenTSDB
Пример секции конфигурации:
\code
<BackendOpenTSDB name="BackendOpenTSDB1" host="localhost" port="4242"
filter_field="tsdb" filter_value="1"
prefix="uniset"
tags="TAG1=VAL1 TAG2=VAL2 ..."/>
\endcode
Где:
- \b host - host для связи с TSDB
- \b port - port для связи с TSDB. Default: 4242
- \b filter_field - поле у датчика, определяющее, что его нужно сохранять в БД
- \b filter_value - значение \b filter_field, определяющее, что датчик нужно сохранять в БД
- \b prefix - необязательный префикс дописываемый ко всем параметрам (prefix.parameter)
- \b tags - теги которые будут записаны для каждой записи, перечисляемые через пробел.
При этом в секции <sensors> у датчиков можно задать дополнительные теги.
Помимо этого можно переопределить название метрики (tsdb_name="...").
\code
<sensors>
...
<item id="54" iotype="AI" name="AI54_S" textname="AI sensor 54" tsdb="1" tsdb_tags=""/>
<item id="55" iotype="AI" name="AI55_S" textname="AI sensor 55" tsdb="1" tsdb_tags="" tsdb_name="MySpecName"/>
...
</sensors>
\endcode
\section sec_OpenTSDB_Name Имя значения сохраняемое в БД.
По умолчанию в качестве имени берётся name, но при необходимости можно указать определиться специальное имя.
Для этого достаточно задать поле tsdb_name="...".
\section sec_OpenTSDB_Queue Буффер на запись в БД
В данной реализации встроен специальный буффер, который накапливает данные и скидывает их
пачкой в БД. Так же он является защитным механизом на случай если БД временно недоступна.
Параметры буффера задаются аргументами командной строки или в конфигурационном файле.
Доступны следующие параметры:
- \b bufSize - размер буфера, при заполнении которого происходит посылка данных в БД
- \b bufSyncTimeout - период сброса данных в БД
- \b reconnectTime - время на повторную попытку подключения к БД
\todo Нужна ли поддержка авторизации для TSDB (возможно придётся перейти на HTTP REST API)
\todo Доделать возможность задать политику при переволнении буфера (удалять последние или первые, сколько чистить)
\todo Может стоит отделить настройки: размер буфера и сколько за один запрос писать в БД
*/
class BackendOpenTSDB:
public UObject_SK
{
public:
BackendOpenTSDB( uniset::ObjectId objId, xmlNode* cnode, uniset::ObjectId shmID, const std::shared_ptr<SharedMemory>& ic = nullptr,
const std::string& prefix = "opentsdb" );
virtual ~BackendOpenTSDB();
/*! глобальная функция для инициализации объекта */
static std::shared_ptr<BackendOpenTSDB> init_opendtsdb( int argc, const char* const* argv,
uniset::ObjectId shmID, const std::shared_ptr<SharedMemory>& ic = nullptr,
const std::string& prefix = "opentsdb" );
/*! глобальная функция для вывода help-а */
static void help_print( int argc, const char* const* argv );
inline std::shared_ptr<LogAgregator> getLogAggregator()
{
return loga;
}
inline std::shared_ptr<DebugStream> log()
{
return mylog;
}
enum Timers
{
tmFlushBuffer,
tmReconnect,
tmLastNumberOfTimer
};
protected:
BackendOpenTSDB();
virtual void askSensors( UniversalIO::UIOCommand cmd ) override;
virtual void sensorInfo( const uniset::SensorMessage* sm ) override;
virtual void timerInfo( const uniset::TimerMessage* tm ) override;
virtual void sysCommand( const uniset::SystemMessage* sm ) override;
void init( xmlNode* cnode );
bool flushBuffer();
bool reconnect();
std::shared_ptr<SMInterface> shm;
struct ParamInfo
{
const std::string name;
const std::string tags;
ParamInfo( const std::string& _name, const std::string& _tags ):
name(_name), tags(_tags) {}
};
std::string tsdbPrefix;
std::string tsdbTags; // теги в виде строки TAG=VAL TAG2=VAL2 ...
std::unordered_map<uniset::ObjectId, ParamInfo> tsdbParams;
timeout_t bufSyncTime = { 5000 };
size_t bufSize = { 100 };
bool timerIsOn = { false };
timeout_t reconnectTime = { 5000 };
// буфер mutex-ом можно не защищать
// т.к. к нему идёт обращение только из основного потока обработки
// (sensorInfo, timerInfo)
std::deque<std::string> buf;
// работа с OpenTSDB
std::shared_ptr<UTCPStream> tcp;
std::string host = { "localhost" };
int port = { 4242 };
private:
std::string prefix;
};
// --------------------------------------------------------------------------
} // end of namespace uniset
// -----------------------------------------------------------------------------
#endif // _BackendOpenTSDB_H_
// -----------------------------------------------------------------------------
if DISABLE_OPENTSDB
else
bin_PROGRAMS = @PACKAGE@-backend-opentsdb
TSDB_VER=@LIBVER@
lib_LTLIBRARIES = libUniSet2BackendOpenTSDB.la
libUniSet2BackendOpenTSDB_la_LDFLAGS = -version-info $(TSDB_VER)
libUniSet2BackendOpenTSDB_la_LIBADD = $(top_builddir)/lib/libUniSet2.la \
$(top_builddir)/extensions/SharedMemory/libUniSet2SharedMemory.la \
$(top_builddir)/extensions/lib/libUniSet2Extensions.la \
$(SIGC_LIBS)
libUniSet2BackendOpenTSDB_la_CXXFLAGS = -I$(top_builddir)/extensions/include \
-I$(top_builddir)/extensions/SharedMemory \
$(SIGC_CFLAGS)
libUniSet2BackendOpenTSDB_la_SOURCES = BackendOpenTSDB.cc
@PACKAGE@_backend_opentsdb_SOURCES = main.cc
@PACKAGE@_backend_opentsdb_LDADD = libUniSet2BackendOpenTSDB.la $(top_builddir)/lib/libUniSet2.la \
$(top_builddir)/extensions/SharedMemory/libUniSet2SharedMemory.la \
$(top_builddir)/extensions/lib/libUniSet2Extensions.la \
$(SIGC_LIBS)
@PACKAGE@_backend_opentsdb_CXXFLAGS = -I$(top_builddir)/extensions/include \
-I$(top_builddir)/extensions/SharedMemory \
$(SIGC_CFLAGS)
# install
devel_include_HEADERS = *.h
devel_includedir = $(pkgincludedir)/extensions
pkgconfigdir = $(libdir)/pkgconfig
pkgconfig_DATA = libUniSet2BackendOpenTSDB.pc
include $(top_builddir)/include.mk
endif
prefix=@prefix@
exec_prefix=@exec_prefix@
libdir=@libdir@
includedir=@includedir@
Name: libUniSet2BackendOpenTSDB
Description: Support library for UniSet2BackendOpenTSDB
Requires: libUniSet2Extensions
Version: @VERSION@
Libs: -L${libdir} -lUniSet2BackendOpenTSDB
Cflags: -I${includedir}/@PACKAGE@/extensions/backend-opentsdb
#include <sstream>
#include "BackendOpenTSDB.h"
#include "Configuration.h"
#include "Debug.h"
#include "UniSetActivator.h"
#include "Extensions.h"
// -----------------------------------------------------------------------------
using namespace std;
using namespace uniset;
using namespace uniset::extensions;
// -----------------------------------------------------------------------------
int main( int argc, const char** argv )
{
// std::ios::sync_with_stdio(false);
if( argc > 1 && (!strcmp(argv[1], "--help") || !strcmp(argv[1], "-h")) )
{
cout << "--smemory-id objectName - SharedMemory objectID. Default: autodetect" << endl;
cout << "--confile filename - configuration file. Default: configure.xml" << endl;
cout << endl;
BackendOpenTSDB::help_print(argc, argv);
return 0;
}
try
{
auto conf = uniset_init( argc, argv );
ObjectId shmID = DefaultObjectId;
string sID = conf->getArgParam("--smemory-id");
if( !sID.empty() )
shmID = conf->getControllerID(sID);
else
shmID = getSharedMemoryID();
if( shmID == DefaultObjectId )
{
cerr << sID << "? SharedMemoryID not found in " << conf->getControllersSection() << " section" << endl;
return 1;
}
auto db = BackendOpenTSDB::init_opendtsdb(argc, argv, shmID);
if( !db )
{
cerr << "(opendtsdb): init failed..." << endl;
return 1;
}
auto act = UniSetActivator::Instance();
act->add(db);
SystemMessage sm(SystemMessage::StartUp);
act->broadcast( sm.transport_msg() );
act->run(false);
return 0;
}
catch( uniset::Exception& ex )
{
cerr << "(opendtsdb): " << ex << std::endl;
}
catch(...)
{
cerr << "(opendtsdb): catch ..." << std::endl;
}
return 1;
}
#!/bin/sh
uniset2-start.sh -f ./uniset2-backend-opentsdb --confile test.xml \
--opentsdb-name BackendOpenTSDB \
--opentsdb-log-add-levels any $*
../../Utilities/scripts/uniset2-stop.sh
\ No newline at end of file
../../conf/test.xml
\ No newline at end of file
../../Utilities/scripts/uniset2-functions.sh
\ No newline at end of file
../../Utilities/scripts/uniset2-start.sh
\ No newline at end of file
...@@ -6,7 +6,9 @@ if HAVE_EXTENTIONS ...@@ -6,7 +6,9 @@ if HAVE_EXTENTIONS
SUBDIRS = lib include SharedMemory SharedMemory/tests IOControl IOControl/tests LogicProcessor LogicProcessor/tests \ SUBDIRS = lib include SharedMemory SharedMemory/tests IOControl IOControl/tests LogicProcessor LogicProcessor/tests \
ModbusMaster ModbusSlave SMViewer UniNetwork UNetUDP UNetUDP/tests \ ModbusMaster ModbusSlave SMViewer UniNetwork UNetUDP UNetUDP/tests \
DBServer-MySQL DBServer-SQLite DBServer-PostgreSQL MQTTPublisher \ DBServer-MySQL DBServer-SQLite DBServer-PostgreSQL MQTTPublisher \
RRDServer tests ModbusMaster/tests ModbusSlave/tests LogDB LogDB/tests RRDServer tests ModbusMaster/tests ModbusSlave/tests LogDB LogDB/tests \
Backend-OpenTSDB
pkgconfigdir = $(libdir)/pkgconfig pkgconfigdir = $(libdir)/pkgconfig
pkgconfig_DATA = libUniSet2Extensions.pc pkgconfig_DATA = libUniSet2Extensions.pc
......
...@@ -172,6 +172,11 @@ extensions/MQTTPublisher/main.cc ...@@ -172,6 +172,11 @@ extensions/MQTTPublisher/main.cc
extensions/MQTTPublisher/Makefile.am extensions/MQTTPublisher/Makefile.am
extensions/MQTTPublisher/MQTTPublisher.cc extensions/MQTTPublisher/MQTTPublisher.cc
extensions/MQTTPublisher/MQTTPublisher.h extensions/MQTTPublisher/MQTTPublisher.h
extensions/Backend-OpenTSDB/libUniSet2BackendOpenTSDB.pc.in
extensions/Backend-OpenTSDB/main.cc
extensions/Backend-OpenTSDB/Makefile.am
extensions/Backend-OpenTSDB/BackendOpenTSDB.cc
extensions/Backend-OpenTSDB/BackendOpenTSDB.h
extensions/SharedMemory/libUniSet2SharedMemory.pc.in extensions/SharedMemory/libUniSet2SharedMemory.pc.in
extensions/SharedMemory/Makefile.am extensions/SharedMemory/Makefile.am
extensions/SharedMemory/SharedMemory.cc extensions/SharedMemory/SharedMemory.cc
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment