Commit 8001fb12 authored by Pavel Vainerman's avatar Pavel Vainerman

backported to p8 as 2.7-alt11.M80P.12 (with rpmbph script)

parents 7bda9575 33316397
......@@ -248,6 +248,13 @@
\param showLinkName - TRUE - выводить SensorName, FALSE - не выводить
*/
std::string strval( uniset::ObjectId id, bool showLinkName=true ) const;
/*! Вывод в строку названия датчика-сообщения: xxx(SensorName)
\param id - идентификатор датчика
\param showLinkName - TRUE - выводить SensorName, FALSE - не выводить
*/
std::string msgstr( uniset::ObjectId id, bool showLinkName=true ) const;
/*! Вывод состояния внутренних переменных */
inline std::string dumpVars(){ return vmon.pretty_str(); }
......@@ -1561,6 +1568,20 @@ std::string <xsl:value-of select="$CLASSNAME"/>_SK::dumpIO()
return s.str();
}
// ----------------------------------------------------------------------------
std::string <xsl:value-of select="$CLASSNAME"/>_SK::msgstr( uniset::ObjectId id, bool showLinkName ) const
{
ostringstream s;
<xsl:for-each select="//msgmap/item">
if( id == <xsl:value-of select="@name"/> )
{
s &lt;&lt; "<xsl:value-of select="@name"/>";
if( showLinkName ) s &lt;&lt; "(" &lt;&lt; ORepHelpers::getShortName( uniset_conf()->oind->getMapName(<xsl:value-of select="@name"/>)) &lt;&lt; ")";
return s.str();
}
</xsl:for-each>
return "";
}
// ----------------------------------------------------------------------------
std::string <xsl:value-of select="$CLASSNAME"/>_SK::str( uniset::ObjectId id, bool showLinkName ) const
{
ostringstream s;
......
......@@ -14,13 +14,19 @@
%def_disable netdata
%def_enable api
%def_enable logdb
%def_enable opentsdb
%ifarch %ix86
%def_enable com485f
%else
%def_disable com485f
%endif
%define oname uniset2
Name: libuniset2
Version: 2.7
Release: alt6.M80P.7
Release: alt11.M80P.12
Summary: UniSet - library for building distributed industrial control systems
License: LGPL
......@@ -48,7 +54,6 @@ BuildRequires: libcomedi-devel
%endif
%if_enabled mysql
# Using old package name instead of libmysqlclient-devel it absent in branch 5.0 for yauza
BuildRequires: libmysqlclient-devel
%endif
......@@ -225,6 +230,25 @@ Database (sqlite) for logs for %name
%endif
%endif
%if_enabled opentsdb
%package extension-opentsdb
Group: Development/C++
Summary: backend for OpenTSDB
Requires: %name-extension-common = %version-%release
%description extension-opentsdb
Backend for OpenTSDB
%package extension-opentsdb-devel
Group: Development/Databases
Summary: Libraries needed to develop for uniset OpenTSDB backend
Requires: %name-extension-common-devel = %version-%release
%description extension-opentsdb-devel
Libraries needed to develop for backend for OpenTSDB
%endif
%if_enabled pgsql
%package extension-pgsql
Group: Development/Databases
......@@ -319,23 +343,18 @@ Libraries needed to develop for uniset MQTT extension
%build
%autoreconf
%configure %{subst_enable docs} %{subst_enable mysql} %{subst_enable sqlite} %{subst_enable pgsql} %{subst_enable python} %{subst_enable rrd} %{subst_enable io} %{subst_enable logicproc} %{subst_enable tests} %{subst_enable mqtt} %{subst_enable api} %{subst_enable netdata} %{subst_enable logdb} %{subst_enable com485f}
%configure %{subst_enable docs} %{subst_enable mysql} %{subst_enable sqlite} %{subst_enable pgsql} %{subst_enable python} %{subst_enable rrd} %{subst_enable io} %{subst_enable logicproc} %{subst_enable tests} %{subst_enable mqtt} %{subst_enable api} %{subst_enable netdata} %{subst_enable logdb} %{subst_enable com485f} %{subst_enable opentsdb}
%make_build
# fix for ALTLinux build (noarch)
%if_enabled docs
cd docs/html
PNGFILES=`find ./ -name '*.png' -type f`
for F in ${PNGFILES}; do
# echo "$F"
convert ${F} -flatten +matte ${F}
done
%endif
%install
%makeinstall_std
rm -f %buildroot%_libdir/*.la
%if_enabled docs
rm -f %buildroot%_docdir/%oname/html/*.map
rm -f %buildroot%_docdir/%oname/html/*.md5
%endif
%files utils
%_bindir/%oname-admin
%_bindir/%oname-mb*
......@@ -384,11 +403,12 @@ rm -f %buildroot%_libdir/*.la
%if_enabled sqlite
%files extension-sqlite
%_bindir/%oname-sqlite-*dbserver
%_libdir/*-sqlite.so*
%_libdir/*-sqlite.so.*
%files extension-sqlite-devel
%_pkgconfigdir/libUniSet2SQLite.pc
%_includedir/%oname/extensions/sqlite/
%_libdir/*-sqlite.so
%if_enabled logdb
%files extension-logdb
......@@ -396,20 +416,33 @@ rm -f %buildroot%_libdir/*.la
%endif
%endif
%if_enabled opentsdb
%files extension-opentsdb
%_bindir/%oname-backend-opentsdb*
%_libdir/libUniSet2BackendOpenTSDB.so.*
%files extension-opentsdb-devel
%_pkgconfigdir/libUniSet2BackendOpenTSDB.pc
%_includedir/%oname/extensions/BackendOpenTSDB.h
%_libdir/libUniSet2BackendOpenTSDB.so
%endif
%if_enabled pgsql
%files extension-pgsql
%_bindir/%oname-pgsql-*dbserver
%_libdir/*-pgsql.so*
%_libdir/*-pgsql.so.*
%files extension-pgsql-devel
%_pkgconfigdir/libUniSet2PostgreSQL.pc
%_includedir/%oname/extensions/pgsql/
%_libdir/*-pgsql.so
%endif
%if_enabled python
%files -n python-module-%oname
%python_sitelibdir/*
%python_sitelibdir_noarch/%oname/*
%dir %python_sitelibdir_noarch/%oname
%endif
%if_enabled netdata
......@@ -492,6 +525,7 @@ rm -f %buildroot%_libdir/*.la
%files extension-common-devel
%dir %_includedir/%oname/extensions
%_includedir/%oname/extensions/*.*
%exclude %_includedir/%oname/extensions/BackendOpenTSDB.h
%_libdir/libUniSet2Extensions.so
%_libdir/libUniSet2MB*.so
%_libdir/libUniSet2RT*.so
......@@ -511,9 +545,25 @@ rm -f %buildroot%_libdir/*.la
# history of current unpublished changes
%changelog
* Wed May 09 2018 Pavel Vainerman <pv@altlinux.ru> 2.7-alt6.M80P.7
* Sun Jun 10 2018 Pavel Vainerman <pv@altlinux.ru> 2.7-alt11.M80P.12
- backport to ALTLinux p8 (by rpmbph script)
* Sun Jun 10 2018 Pavel Vainerman <pv@altlinux.ru> 2.7-alt12
- spec cleanup
* Mon Jun 04 2018 Alexey Shabalin <shaba@altlinux.ru> 2.7-alt11
- rebuild with libmariadb
* Thu May 24 2018 Pavel Vainerman <pv@altlinux.ru> 2.7-alt10
- fix pack opentsdb backend
* Thu May 24 2018 Pavel Vainerman <pv@altlinux.ru> 2.7-alt9
- added opentsdb backend
* Fri May 18 2018 Pavel Vainerman <pv@altlinux.ru> 2.7-alt8
- (codegen): added msgstr() function
- make style
* Tue May 08 2018 Pavel Vainerman <pv@altlinux.ru> 2.7-alt7
- (http): added support "CORS" (Access-Control-Allow-Origin)
- (logdb): added --logdb-httpserver-reply-addr host:port
......
......@@ -45,6 +45,8 @@
<logserver name="ses" ip="localhost" port="21107" cmd="-d level8 ALL" description="SES"/>
</LogDB>
<BackendOpenTSDB name="BackendOpenTSDB" host="localhost" filter_field="tsdb" filter_value="1" tags="host=localhost uniset=1" prefix="uniset"/>
<settings>
<LostTestProc name="TestProc1" sensor_s="Input4_S"/>
......@@ -320,8 +322,13 @@
<item id="201" iotype="DI" name="UNET_DATA2_S" textname="UNET: data2" ndata="unet"/>
<item id="202" iotype="DI" name="UNET_DATA3_S" textname="UNET: data3" ndata="unet"/>
<!-- test BackendOpenTSDB -->
<item id="210" iotype="AI" name="DATA1_S" textname="UNET: data1" tsdb="1"/>
<item id="211" iotype="AI" name="DATA2_S" textname="UNET: data2" tsdb="1"/>
<!-- IO TEST -->
<!-- Датчики системы смазочного масла (сгенерировано из csv) alex: 05.07.2017 -->
<!-- Датчики системы смазочного масла (сгенерировано из csv) alex: 05.07.2017 -->
<item id="301" name="01_01_S" iotype="DI" mtype="2" textname="ГДГ1: Аварийно-низкое давление смазочного масла" unet="gdg1" aps="301" mko_aps="1" depend1="D1_State_FS" depend_value="1" arm="arm_gui" arm_mbaddr="0x01" arm_mbfunc="0x05" arm_mbreg="4526" arm_mbtype="rtu" io="1" card="1" subdev="0" channel="0" jardelay="8000"/>
<item id="302" name="01_02_S" iotype="DI" mtype="2" textname="ГДГ2: Аварийно-низкое давление смазочного масла" unet="gdg2" aps="301" mko_aps="1" depend1="D2_State_FS" depend_value="1" arm="arm_gui" arm_mbaddr="0x01" arm_mbfunc="0x05" arm_mbreg="4846" arm_mbtype="rtu" io="1" card="1" subdev="0" channel="0" jardelay="8000"/>
<item id="303" name="01_04_AS" iotype="AI" textname="ГДГ1: Давление смазочного масла" unet="gdg1" analog_group="1" filtersize="300" iir_thr="2000" arm="arm_gui" arm_mbaddr="0x01" arm_mbfunc="0x06" arm_mbreg="7086" arm_mbtype="rtu" io="1" card="3" subdev="2" channel="0" range="1" aref="2" cmin="0" cmax="300" rmin="820" rmax="4095" precision="1" noprecision="1"/>
......@@ -5492,6 +5499,7 @@
<item id="6087" name="MBSlave10"/>
<item id="6088" name="TestGroup"/>
<item id="6089" name="MQTTPublisher1"/>
<item id="6090" name="BackendOpenTSDB"/>
<item id="6101" name="MBTCP1"/>
<item id="6102" name="MBTCP2"/>
<item id="6103" name="MBTCP3"/>
......
......@@ -179,6 +179,20 @@ fi
AM_CONDITIONAL(DISABLE_RRD, test ${buildrrd} = false)
#check opentsdb support
AC_MSG_CHECKING([OpenTSDB backend])
buildtsdb=true
AC_ARG_ENABLE(opentsdb, AC_HELP_STRING([--disable-opentsdb], [disable OpenTSDB backend]),
[ if test $enableval = yes; then buildtsdb=true; else buildtsdb=false; fi],[ buildtsdb=true; ])
if test ${buildtsdb} = true; then
AC_MSG_RESULT([enabled])
else
AC_MSG_RESULT([disabled])
fi
AM_CONDITIONAL(DISABLE_OPENTSDB, test ${buildtsdb} = false)
#check io support
AC_MSG_CHECKING([io support])
buildio=true
......@@ -557,6 +571,8 @@ AC_CONFIG_FILES([Makefile
extensions/RRDServer/libUniSet2RRDServer.pc
extensions/MQTTPublisher/Makefile
extensions/MQTTPublisher/libUniSet2MQTTPublisher.pc
extensions/Backend-OpenTSDB/Makefile
extensions/Backend-OpenTSDB/libUniSet2BackendOpenTSDB.pc
extensions/tests/Makefile
extensions/tests/SMemoryTest/Makefile
extensions/tests/MBSlaveTest/Makefile
......
This source diff could not be displayed because it is too large. You can view the blob instead.
This source diff could not be displayed because it is too large. You can view the blob instead.
/*
* Copyright (c) 2015 Pavel Vainerman.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation, version 2.1.
*
* This program is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Lesser Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
// -----------------------------------------------------------------------------
#ifndef _BackendOpenTSDB_H_
#define _BackendOpenTSDB_H_
// -----------------------------------------------------------------------------
#include <deque>
#include <memory>
#include <unordered_map>
#include <chrono>
#include "UObject_SK.h"
#include "SMInterface.h"
#include "SharedMemory.h"
#include "extensions/Extensions.h"
#include "UTCPStream.h"
// --------------------------------------------------------------------------
namespace uniset
{
// -----------------------------------------------------------------------------
/*!
\page page_BackendOpenTSDB Реализация шлюза к БД поддерживающей интерфейс OpenTSDB
- \ref sec_OpenTSDB_Comm
- \ref sec_OpenTSDB_Conf
- \ref sec_OpenTSDB_Name
- \ref sec_OpenTSDB_Queue
\section sec_OpenTSDB_Comm Общее описание шлюза к OpenTSDB
"OpenTSDB" - time series database. Специальная БД оптимизированная
для хранения временных рядов (по простому: данных с временными метками).
Класс реализует пересылку указанных (настроенных) дачиков в БД поддерживающую
интерфейс совместимый с OpenTSDB. В текущей реализации используется посылка
строк в формате Telnet
\code
put <metric> <timestamp>.msec <value> <tagk1=tagv1[ tagk2=tagv2 ...tagkN=tagvN]>
\endcode
См. http://opentsdb.net/docs/build/html/user_guide/writing/index.html
\section sec_OpenTSDB_Conf Настройка BackendOpenTSDB
Пример секции конфигурации:
\code
<BackendOpenTSDB name="BackendOpenTSDB1" host="localhost" port="4242"
filter_field="tsdb" filter_value="1"
prefix="uniset"
tags="TAG1=VAL1 TAG2=VAL2 ..."/>
\endcode
Где:
- \b host - host для связи с TSDB
- \b port - port для связи с TSDB. Default: 4242
- \b filter_field - поле у датчика, определяющее, что его нужно сохранять в БД
- \b filter_value - значение \b filter_field, определяющее, что датчик нужно сохранять в БД
- \b prefix - необязательный префикс дописываемый ко всем параметрам (prefix.parameter)
- \b tags - теги которые будут записаны для каждой записи, перечисляемые через пробел.
При этом в секции <sensors> у датчиков можно задать дополнительные теги.
Помимо этого можно переопределить название метрики (tsdb_name="...").
\code
<sensors>
...
<item id="54" iotype="AI" name="AI54_S" textname="AI sensor 54" tsdb="1" tsdb_tags=""/>
<item id="55" iotype="AI" name="AI55_S" textname="AI sensor 55" tsdb="1" tsdb_tags="" tsdb_name="MySpecName"/>
...
</sensors>
\endcode
\section sec_OpenTSDB_Name Имя значения сохраняемое в БД.
По умолчанию в качестве имени берётся name, но при необходимости можно указать определиться специальное имя.
Для этого достаточно задать поле tsdb_name="...".
\section sec_OpenTSDB_Queue Буффер на запись в БД
В данной реализации встроен специальный буффер, который накапливает данные и скидывает их
пачкой в БД. Так же он является защитным механизом на случай если БД временно недоступна.
Параметры буффера задаются аргументами командной строки или в конфигурационном файле.
Доступны следующие параметры:
- \b bufSize - размер буфера, при заполнении которого происходит посылка данных в БД
- \b bufSyncTimeout - период сброса данных в БД
- \b reconnectTime - время на повторную попытку подключения к БД
\todo Нужна ли поддержка авторизации для TSDB (возможно придётся перейти на HTTP REST API)
\todo Доделать возможность задать политику при переполнении буфера (удалять последние или первые, сколько чистить)
\todo Может стоит отделить настройки: размер буфера и сколько за один запрос писать в БД
*/
class BackendOpenTSDB:
public UObject_SK
{
public:
BackendOpenTSDB( uniset::ObjectId objId, xmlNode* cnode, uniset::ObjectId shmID, const std::shared_ptr<SharedMemory>& ic = nullptr,
const std::string& prefix = "opentsdb" );
virtual ~BackendOpenTSDB();
/*! глобальная функция для инициализации объекта */
static std::shared_ptr<BackendOpenTSDB> init_opendtsdb( int argc, const char* const* argv,
uniset::ObjectId shmID, const std::shared_ptr<SharedMemory>& ic = nullptr,
const std::string& prefix = "opentsdb" );
/*! глобальная функция для вывода help-а */
static void help_print( int argc, const char* const* argv );
inline std::shared_ptr<LogAgregator> getLogAggregator()
{
return loga;
}
inline std::shared_ptr<DebugStream> log()
{
return mylog;
}
enum Timers
{
tmFlushBuffer,
tmReconnect,
tmLastNumberOfTimer
};
protected:
BackendOpenTSDB();
virtual void askSensors( UniversalIO::UIOCommand cmd ) override;
virtual void sensorInfo( const uniset::SensorMessage* sm ) override;
virtual void timerInfo( const uniset::TimerMessage* tm ) override;
virtual void sysCommand( const uniset::SystemMessage* sm ) override;
virtual std::string getMonitInfo() const override;
void init( xmlNode* cnode );
bool flushBuffer();
bool reconnect();
std::shared_ptr<SMInterface> shm;
struct ParamInfo
{
const std::string name;
const std::string tags;
ParamInfo( const std::string& _name, const std::string& _tags ):
name(_name), tags(_tags) {}
};
std::string tsdbPrefix;
std::string tsdbTags; // теги в виде строки TAG=VAL TAG2=VAL2 ...
std::unordered_map<uniset::ObjectId, ParamInfo> tsdbParams;
timeout_t bufSyncTime = { 5000 };
size_t bufSize = { 100 };
bool timerIsOn = { false };
timeout_t reconnectTime = { 5000 };
std::string lastError;
// буфер mutex-ом можно не защищать
// т.к. к нему идёт обращение только из основного потока обработки
// (sensorInfo, timerInfo)
std::deque<std::string> buf;
// работа с OpenTSDB
std::shared_ptr<UTCPStream> tcp;
std::string host = { "localhost" };
int port = { 4242 };
private:
std::string prefix;
};
// --------------------------------------------------------------------------
} // end of namespace uniset
// -----------------------------------------------------------------------------
#endif // _BackendOpenTSDB_H_
// -----------------------------------------------------------------------------
if DISABLE_OPENTSDB
else
bin_PROGRAMS = @PACKAGE@-backend-opentsdb
TSDB_VER=@LIBVER@
lib_LTLIBRARIES = libUniSet2BackendOpenTSDB.la
libUniSet2BackendOpenTSDB_la_LDFLAGS = -version-info $(TSDB_VER)
libUniSet2BackendOpenTSDB_la_LIBADD = $(top_builddir)/lib/libUniSet2.la \
$(top_builddir)/extensions/SharedMemory/libUniSet2SharedMemory.la \
$(top_builddir)/extensions/lib/libUniSet2Extensions.la \
$(SIGC_LIBS)
libUniSet2BackendOpenTSDB_la_CXXFLAGS = -I$(top_builddir)/extensions/include \
-I$(top_builddir)/extensions/SharedMemory \
$(SIGC_CFLAGS)
libUniSet2BackendOpenTSDB_la_SOURCES = BackendOpenTSDB.cc
@PACKAGE@_backend_opentsdb_SOURCES = main.cc
@PACKAGE@_backend_opentsdb_LDADD = libUniSet2BackendOpenTSDB.la $(top_builddir)/lib/libUniSet2.la \
$(top_builddir)/extensions/SharedMemory/libUniSet2SharedMemory.la \
$(top_builddir)/extensions/lib/libUniSet2Extensions.la \
$(SIGC_LIBS)
@PACKAGE@_backend_opentsdb_CXXFLAGS = -I$(top_builddir)/extensions/include \
-I$(top_builddir)/extensions/SharedMemory \
$(SIGC_CFLAGS)
# install
devel_include_HEADERS = *.h
devel_includedir = $(pkgincludedir)/extensions
pkgconfigdir = $(libdir)/pkgconfig
pkgconfig_DATA = libUniSet2BackendOpenTSDB.pc
include $(top_builddir)/include.mk
endif
prefix=@prefix@
exec_prefix=@exec_prefix@
libdir=@libdir@
includedir=@includedir@
Name: libUniSet2BackendOpenTSDB
Description: Support library for UniSet2BackendOpenTSDB
Requires: libUniSet2Extensions
Version: @VERSION@
Libs: -L${libdir} -lUniSet2BackendOpenTSDB
Cflags: -I${includedir}/@PACKAGE@/extensions
#include <sstream>
#include "BackendOpenTSDB.h"
#include "Configuration.h"
#include "Debug.h"
#include "UniSetActivator.h"
#include "Extensions.h"
// -----------------------------------------------------------------------------
using namespace std;
using namespace uniset;
using namespace uniset::extensions;
// -----------------------------------------------------------------------------
int main( int argc, const char** argv )
{
// std::ios::sync_with_stdio(false);
if( argc > 1 && (!strcmp(argv[1], "--help") || !strcmp(argv[1], "-h")) )
{
cout << "--smemory-id objectName - SharedMemory objectID. Default: autodetect" << endl;
cout << "--confile filename - configuration file. Default: configure.xml" << endl;
cout << endl;
BackendOpenTSDB::help_print(argc, argv);
return 0;
}
try
{
auto conf = uniset_init( argc, argv );
ObjectId shmID = DefaultObjectId;
string sID = conf->getArgParam("--smemory-id");
if( !sID.empty() )
shmID = conf->getControllerID(sID);
else
shmID = getSharedMemoryID();
if( shmID == DefaultObjectId )
{
cerr << sID << "? SharedMemoryID not found in " << conf->getControllersSection() << " section" << endl;
return 1;
}
auto db = BackendOpenTSDB::init_opendtsdb(argc, argv, shmID);
if( !db )
{
cerr << "(opendtsdb): init failed..." << endl;
return 1;
}
auto act = UniSetActivator::Instance();
act->add(db);
SystemMessage sm(SystemMessage::StartUp);
act->broadcast( sm.transport_msg() );
act->run(false);
return 0;
}
catch( uniset::Exception& ex )
{
cerr << "(opendtsdb): " << ex << std::endl;
}
catch(...)
{
cerr << "(opendtsdb): catch ..." << std::endl;
}
return 1;
}
#!/bin/sh
uniset2-start.sh -f ./uniset2-backend-opentsdb --confile test.xml \
--opentsdb-name BackendOpenTSDB \
--opentsdb-log-add-levels any $*
../../Utilities/scripts/uniset2-stop.sh
\ No newline at end of file
../../conf/test.xml
\ No newline at end of file
../../Utilities/scripts/uniset2-functions.sh
\ No newline at end of file
../../Utilities/scripts/uniset2-start.sh
\ No newline at end of file
......@@ -427,3 +427,22 @@ void DBServer_MySQL::help_print( int argc, const char* const* argv )
cout << DBServer::help_print() << endl;
}
// -----------------------------------------------------------------------------
std::string DBServer_MySQL::getMonitInfo( const std::string& params )
{
ostringstream inf;
inf << "Database: "
<< "[ ping=" << PingTime
<< " reconnect=" << ReconnectTime
<< " qbufSize=" << qbufSize
<< " ]" << endl
<< " connection: " << (connect_ok ? "OK" : "FAILED") << endl;
{
uniset_rwmutex_rlock lock(mqbuf);
inf << " buffer size: " << qbuf.size() << endl;
}
inf << " lastError: " << db->error() << endl;
return inf.str();
}
// -----------------------------------------------------------------------------
......@@ -168,6 +168,7 @@ namespace uniset
virtual void sysCommand( const uniset::SystemMessage* sm ) override;
virtual void sensorInfo( const uniset::SensorMessage* sm ) override;
virtual void confirmInfo( const uniset::ConfirmMessage* cmsg ) override;
virtual std::string getMonitInfo( const std::string& params ) override;
bool writeToBase( const std::string& query );
void createTables( MySQLInterface* db );
......@@ -184,7 +185,6 @@ namespace uniset
lastNumberOfTimer
};
std::unique_ptr<MySQLInterface> db;
int PingTime = { 15000 };
int ReconnectTime = { 30000 };
......
......@@ -74,17 +74,11 @@ void DBServer_PostgreSQL::sysCommand( const uniset::SystemMessage* sm )
break;
case SystemMessage::Finish:
{
activate = false;
db->close();
}
break;
case SystemMessage::FoldUp:
{
activate = false;
db->close();
}
break;
default:
......@@ -151,6 +145,11 @@ bool DBServer_PostgreSQL::writeToBase( const string& query )
<< ") overflow! lost query: " << qlost << endl;
}
// Надо подумать, что тут лучше возвращать true или false
// вроде бы фактически запись не сделали, но запрос поместили в буфер
// А вызывающая сторона может посчитать, что раз вернули false, значит
// можно/нужно потом запрос повторить..
return false;
}
......@@ -171,7 +170,7 @@ void DBServer_PostgreSQL::flushBuffer()
// Сперва пробуем очистить всё что накопилось в очереди до этого...
while( !qbuf.empty() )
{
if(!db->insert( qbuf.front() ))
if( !db->insert(qbuf.front()) )
{
dbcrit << myname << "(writeToBase): error: " << db->error() << " lost query: " << qbuf.front() << endl;
}
......@@ -222,7 +221,7 @@ void DBServer_PostgreSQL::flushInsertBuffer()
dbinfo << myname << "(flushInsertBuffer): write insert buffer[" << ibufSize << "] to DB.." << endl;
if( !db->copy("main_history", tblcols, ibuf) )
if( !writeInsertBufferToDB("main_history", tblcols, ibuf) )
{
dbcrit << myname << "(flushInsertBuffer): error: " << db->error() << endl;
}
......@@ -233,6 +232,22 @@ void DBServer_PostgreSQL::flushInsertBuffer()
}
}
//--------------------------------------------------------------------------------------------
void DBServer_PostgreSQL::addRecord( const PostgreSQLInterface::Record&& rec )
{
ibuf.emplace_back( std::move(rec) );
ibufSize++;
if( ibufSize >= ibufMaxSize )
flushInsertBuffer();
}
//--------------------------------------------------------------------------------------------
bool DBServer_PostgreSQL::writeInsertBufferToDB( const std::string& tableName
, const std::vector<std::string>& colNames
, const InsertBuffer& wbuf )
{
return db->copy(tableName, colNames, wbuf);
}
//--------------------------------------------------------------------------------------------
void DBServer_PostgreSQL::sensorInfo( const uniset::SensorMessage* si )
{
try
......@@ -258,11 +273,7 @@ void DBServer_PostgreSQL::sensorInfo( const uniset::SensorMessage* si )
std::to_string(si->node),
};
ibuf.emplace_back(std::move(rec));
ibufSize++;
if( ibufSize >= ibufMaxSize )
flushInsertBuffer();
addRecord( std::move(rec) );
}
catch( const uniset::Exception& ex )
{
......@@ -360,7 +371,7 @@ void DBServer_PostgreSQL::initDBServer()
}
}
//--------------------------------------------------------------------------------------------
void DBServer_PostgreSQL::createTables( std::shared_ptr<PostgreSQLInterface>& db )
void DBServer_PostgreSQL::createTables( const std::shared_ptr<PostgreSQLInterface>& db )
{
auto conf = uniset_conf();
......@@ -464,6 +475,31 @@ bool DBServer_PostgreSQL::deactivateObject()
return DBServer::deactivateObject();
}
//--------------------------------------------------------------------------------------------
string DBServer_PostgreSQL::getMonitInfo( const string& params )
{
ostringstream inf;
inf << "Database: "
<< "[ ping=" << PingTime
<< " reconnect=" << ReconnectTime
<< " qbufSize=" << qbufSize
<< " ]" << endl
<< " connection: " << (connect_ok ? "OK" : "FAILED") << endl;
{
std::lock_guard<std::mutex> lock(mqbuf);
inf << " buffer size: " << qbuf.size() << endl;
}
inf << " lastError: " << db->error() << endl;
inf << "Insert buffer: "
<< "[ ibufMaxSize=" << ibufMaxSize
<< " ibufSize=" << ibufSize
<< " ibufSyncTimeout=" << ibufSyncTimeout
<< " ]" << endl;
return inf.str();
}
//--------------------------------------------------------------------------------------------
std::shared_ptr<DBServer_PostgreSQL> DBServer_PostgreSQL::init_dbserver( int argc, const char* const* argv,
const std::string& prefix )
{
......@@ -516,3 +552,8 @@ void DBServer_PostgreSQL::help_print( int argc, const char* const* argv )
cout << DBServer::help_print() << endl;
}
// -----------------------------------------------------------------------------
bool DBServer_PostgreSQL::isConnectOk() const
{
return connect_ok;
}
// -----------------------------------------------------------------------------
......@@ -74,21 +74,24 @@ namespace uniset
return dblog;
}
bool isConnectOk() const;
protected:
typedef std::unordered_map<int, std::string> DBTableMap;
virtual void initDBServer() override;
virtual void initDB( std::unique_ptr<PostgreSQLInterface>& db ) {};
virtual void initDBTableMap(DBTableMap& tblMap) {};
virtual void initDBTableMap( DBTableMap& tblMap ) {};
virtual void timerInfo( const uniset::TimerMessage* tm ) override;
virtual void sysCommand( const uniset::SystemMessage* sm ) override;
virtual void sensorInfo( const uniset::SensorMessage* sm ) override;
virtual void confirmInfo( const uniset::ConfirmMessage* cmsg ) override;
virtual bool deactivateObject() override;
virtual std::string getMonitInfo( const std::string& params ) override;
bool writeToBase( const std::string& query );
void createTables( std::shared_ptr<PostgreSQLInterface>& db );
void createTables( const std::shared_ptr<PostgreSQLInterface>& db );
inline std::string tblName(int key)
{
......@@ -104,35 +107,38 @@ namespace uniset
};
std::unique_ptr<PostgreSQLInterface> db;
typedef std::queue<std::string> QueryBuffer;
void flushBuffer();
// writeBuffer
const std::vector<std::string> tblcols = { "date", "time", "time_usec", "sensor_id", "value", "node" };
typedef std::vector<PostgreSQLInterface::Record> InsertBuffer;
void flushInsertBuffer();
virtual void addRecord( const PostgreSQLInterface::Record&& rec );
virtual bool writeInsertBufferToDB( const std::string& table
, const std::vector<std::string>& colname
, const InsertBuffer& ibuf );
private:
DBTableMap tblMap;
int PingTime = { 15000 };
int ReconnectTime = { 30000 };
bool connect_ok = { false }; /*! признак наличия соеднинения с сервером БД */
bool activate = { false };
typedef std::queue<std::string> QueryBuffer;
bool connect_ok = { false }; /*! признак наличия соеднинения с сервером БД */
QueryBuffer qbuf;
size_t qbufSize = { 200 }; // размер буфера сообщений.
bool lastRemove = { false };
void flushBuffer();
std::mutex mqbuf;
// writeBuffer
const std::vector<std::string> tblcols = { "date", "time", "time_usec", "sensor_id", "value", "node" };
typedef std::vector<PostgreSQLInterface::Record> InsertBuffer;
InsertBuffer ibuf;
size_t ibufSize = { 0 };
size_t ibufMaxSize = { 2000 };
timeout_t ibufSyncTimeout = { 15000 };
void flushInsertBuffer();
float ibufOverflowCleanFactor = { 0.5 }; // коэфициент {0...1} чистки буфера при переполнении
private:
DBTableMap tblMap;
};
// ----------------------------------------------------------------------------------
} // end of namespace uniset
......
......@@ -27,4 +27,6 @@ devel_includedir = $(includedir)/@PACKAGE@/extensions/pgsql
pkgconfigdir = $(libdir)/pkgconfig
pkgconfig_DATA = libUniSet2PostgreSQL.pc
include $(top_builddir)/include.mk
endif
......@@ -401,3 +401,22 @@ void DBServer_SQLite::help_print( int argc, const char* const* argv )
cout << DBServer::help_print() << endl;
}
// -----------------------------------------------------------------------------
std::string DBServer_SQLite::getMonitInfo( const std::string& params )
{
ostringstream inf;
inf << "Database: "
<< "[ ping=" << PingTime
<< " reconnect=" << ReconnectTime
<< " qbufSize=" << qbufSize
<< " ]" << endl
<< " connection: " << (connect_ok ? "OK" : "FAILED") << endl;
{
uniset_rwmutex_rlock lock(mqbuf);
inf << " buffer size: " << qbuf.size() << endl;
}
inf << " lastError: " << db->error() << endl;
return inf.str();
}
// -----------------------------------------------------------------------------
......@@ -168,6 +168,7 @@ namespace uniset
virtual void sysCommand( const uniset::SystemMessage* sm ) override;
virtual void sensorInfo( const uniset::SensorMessage* sm ) override;
virtual void confirmInfo( const uniset::ConfirmMessage* cmsg ) override;
virtual std::string getMonitInfo( const std::string& params ) override;
bool writeToBase( const std::string& query );
void createTables( SQLiteInterface* db );
......
......@@ -892,7 +892,7 @@ namespace uniset
<< ". Must be aref=[0..3]" << endl;
return false;
}
inf->adelay = IOBase::initIntProp(it, "adelay", prop_prefix, false);
}
......
......@@ -276,7 +276,7 @@ namespace uniset
3 - analog ref = other (undefined)
*/
int aref;
int adelay; /*! Задержка на чтение аналоговых входов с мультиплексированием ( в мкс ) */
/*! Измерительный диапазон
......
......@@ -90,7 +90,8 @@ LogDB::LogDB( const string& name, int argc, const char* const* argv, const strin
else
{
// инициализируем сами, т.к. conf нету..
const std::string loglevels = uniset::getArg2Param("--" + prefix + "log-add-levels", argc, argv, it.getProp("log"),"");
const std::string loglevels = uniset::getArg2Param("--" + prefix + "log-add-levels", argc, argv, it.getProp("log"), "");
if( !loglevels.empty() )
dblog->level(Debug::value(loglevels));
}
......@@ -105,6 +106,11 @@ LogDB::LogDB( const string& name, int argc, const char* const* argv, const strin
if( tformat == "localtime" || tformat == "utc" )
tmsFormat = tformat;
bgColor = uniset::getArg2Param("--" + prefix + "bg-color", argc, argv, it.getProp("bgColor"), bgColor);
fgColor = uniset::getArg2Param("--" + prefix + "fg-color", argc, argv, it.getProp("fgColor"), fgColor);
fgColorTitle = uniset::getArg2Param("--" + prefix + "fg-color-title", argc, argv, it.getProp("fgColorTitle"), fgColorTitle);
bgColorTitle = uniset::getArg2Param("--" + prefix + "bg-color-title", argc, argv, it.getProp("bgColorTitle"), bgColorTitle);
double checkConnection_sec = atof( uniset::getArg2Param("--" + prefix + "ls-check-connection-sec", argc, argv, it.getProp("lsCheckConnectionSec"), "5").c_str());
int bufSize = uniset::getArgPInt("--" + prefix + "ls-read-buffer-size", argc, argv, it.getProp("lsReadBufferSize"), 10001);
......@@ -449,9 +455,13 @@ void LogDB::help_print()
cout << "websockets: " << endl;
cout << "--prefix-ws-max num - Максимальное количество websocket-ов" << endl;
cout << "--prefix-ws-heartbeat-time msec - Период сердцебиения в соединении. По умолчанию: 3000 мсек" << endl;
cout << "--prefix-ws-send-time msec - Период посылки сообщений. По умолчанию: 500 мсек" << endl;
cout << "--prefix-ws-heartbeat-time msec - Период сердцебиения в соединении. По умолчанию: 3000 мсек" << endl;
cout << "--prefix-ws-send-time msec - Период посылки сообщений. По умолчанию: 500 мсек" << endl;
cout << "--prefix-ws-max num - Максимальное число сообщений посылаемых за один раз. По умолчанию: 200" << endl;
cout << "--prefix-bg-color clr - Цвет фона при выводе логов. По умолчанию: #111111" << endl;
cout << "--prefix-fg-color clr - Цвет текста при выводе логов. По умолчанию: #c4c4c4" << endl;
cout << "--prefix-bg-color-title clr - Цвет фона заголовка окна. По умолчанию: green" << endl;
cout << "--prefix-fg-color-title clr - Цвет текста заголовка окна. По умолчанию: #ececec" << endl;
cout << "logservers: " << endl;
cout << "--prefix-ls-check-connection-sec sec - Период проверки соединения с логсервером" << endl;
......@@ -1627,14 +1637,15 @@ void LogDB::httpWebSocketConnectPage( ostream& ostr,
ostr << " padding: 10px;" << endl;
ostr << " width: 100%;" << endl;
ostr << " height: 25px;" << endl;
ostr << " background-color: green;" << endl;
ostr << " background-color: " << bgColorTitle << ";" << endl;
ostr << " color: " << fgColorTitle << ";" << endl;
ostr << " border-top: 2px solid;" << endl;
ostr << " border-bottom: 2px solid;" << endl;
ostr << " border-color: white;" << endl;
ostr << "}" << endl;
ostr << "</style>" << endl;
ostr << "</head>" << endl;
ostr << "<body style='background: #111111; color: #ececec;' onload=\"javascript:WebSocketCreate('" << logname << "')\">" << endl;
ostr << "<body style='background: " << bgColor << "; color: " << fgColor << ";' onload=\"javascript:WebSocketCreate('" << logname << "')\">" << endl;
ostr << "<h4><div onclick='javascritpt:clickScroll()' id='logname' class='logtitle'></div></h4>" << endl;
ostr << "<div id='logs' class='logs'></div>" << endl;
ostr << "<p><div id='end' style='display: hidden;'>&nbsp;</div></p>" << endl;
......
......@@ -322,6 +322,11 @@ namespace uniset
double wsSendTime_sec = { 0.5 };
size_t wsMaxSend = { 200 };
std::string fgColor = { "#c4c4c4" };
std::string bgColor = { "#111111" };
std::string bgColorTitle = { "green" };
std::string fgColorTitle = { "#ececec" };
/*! класс реализует работу с websocket через eventloop
* Из-за того, что поступление логов может быть достаточно быстрым
* чтобы не "завалить" браузер кучей сообщений,
......
......@@ -6,7 +6,9 @@ if HAVE_EXTENTIONS
SUBDIRS = lib include SharedMemory SharedMemory/tests IOControl IOControl/tests LogicProcessor LogicProcessor/tests \
ModbusMaster ModbusSlave SMViewer UniNetwork UNetUDP UNetUDP/tests \
DBServer-MySQL DBServer-SQLite DBServer-PostgreSQL MQTTPublisher \
RRDServer tests ModbusMaster/tests ModbusSlave/tests LogDB LogDB/tests
RRDServer tests ModbusMaster/tests ModbusSlave/tests LogDB LogDB/tests \
Backend-OpenTSDB
pkgconfigdir = $(libdir)/pkgconfig
pkgconfig_DATA = libUniSet2Extensions.pc
......
......@@ -537,9 +537,7 @@ namespace uniset
else
{
dinfo << "(smemory): init from configure: " << conf->getConfFileName() << endl;
UniXML::iterator it(conf->getXMLSensorsSection());
it.goChildren();
ioconf = make_shared<IOConfig_XML>(conf->getConfXML(), conf, it);
ioconf = make_shared<IOConfig_XML>(conf->getConfXML(), conf, conf->getXMLSensorsSection());
}
uniset::ObjectId ID = conf->getControllerID(conf->getArgParam("--smemory-id", "SharedMemory"));
......
......@@ -8,7 +8,7 @@
ВСЕ ВАШИ ИЗМЕНЕНИЯ БУДУТ ПОТЕРЯНЫ.
*/
// --------------------------------------------------------------------------
// generate timestamp: 2017-09-01+03:00
// generate timestamp: 2018-05-18+03:00
// -----------------------------------------------------------------------------
#ifndef UObject_SK_H_
#define UObject_SK_H_
......@@ -121,6 +121,13 @@ class UObject_SK:
*/
std::string strval( uniset::ObjectId id, bool showLinkName = true ) const;
/*! Вывод в строку названия датчика-сообщения: xxx(SensorName)
\param id - идентификатор датчика
\param showLinkName - TRUE - выводить SensorName, FALSE - не выводить
*/
std::string msgstr( uniset::ObjectId id, bool showLinkName = true ) const;
/*! Вывод состояния внутренних переменных */
inline std::string dumpVars()
{
......
......@@ -11,7 +11,7 @@
ВСЕ ВАШИ ИЗМЕНЕНИЯ БУДУТ ПОТЕРЯНЫ.
*/
// --------------------------------------------------------------------------
// generate timestamp: 2017-09-01+03:00
// generate timestamp: 2018-05-18+03:00
// -----------------------------------------------------------------------------
#include <memory>
#include <iomanip>
......@@ -356,6 +356,13 @@ std::string UObject_SK::dumpIO()
return s.str();
}
// ----------------------------------------------------------------------------
std::string UObject_SK::msgstr( uniset::ObjectId id, bool showLinkName ) const
{
ostringstream s;
return "";
}
// ----------------------------------------------------------------------------
std::string UObject_SK::str( uniset::ObjectId id, bool showLinkName ) const
{
ostringstream s;
......
......@@ -58,6 +58,8 @@ namespace uniset
static std::string help_print();
virtual uniset::SimpleInfo* getInfo( const char* userparam = "" ) override;
protected:
virtual void processingMessage( const uniset::VoidMessage* msg ) override;
......@@ -65,7 +67,8 @@ namespace uniset
virtual void confirmInfo( const uniset::ConfirmMessage* cmsg ) {}
virtual bool activateObject() override;
virtual void initDBServer() {};
virtual void initDBServer() {}
virtual std::string getMonitInfo( const std::string& params ){ return ""; }
std::shared_ptr<LogAgregator> loga;
......
......@@ -51,7 +51,7 @@ namespace uniset
/*!
* \param xml - xml формата uniset-project
* \param conf - конфигурация
* \param root - узел с датчиками (если не задан, ищется "sensors")
* \param root - узел со списком датчиков (если не задан, ищется "sensors")
*/
IOConfig_XML( const std::shared_ptr<UniXML>& _xml, const std::shared_ptr<Configuration>& conf, xmlNode* root = nullptr );
......
......@@ -272,6 +272,7 @@ namespace uniset
uniset::uniset_rwmutex ioMutex; /*!< замок для блокирования совместного доступа к ioList */
bool isPingDBServer; // флаг связи с DBServer-ом
uniset::ObjectId dbserverID = { uniset::DefaultObjectId };
std::mutex loggingMutex; /*!< logging info mutex */
......
......@@ -18,6 +18,7 @@
#define UHelpers_H_
// --------------------------------------------------------------------------
#include "UniSetTypes.h"
#include "Exceptions.h"
#include "Configuration.h"
// --------------------------------------------------------------------------
namespace uniset
......
......@@ -43,6 +43,7 @@ namespace uniset
resp.set("Access-Control-Allow-Methods", "GET");
resp.set("Access-Control-Allow-Request-Method", "*");
resp.set("Access-Control-Allow-Origin", httpCORS_allow /* req.get("Origin") */);
if( !registry )
{
resp.setStatus(HTTPResponse::HTTP_INTERNAL_SERVER_ERROR);
......
......@@ -308,6 +308,7 @@ namespace uniset
if( res < 0 )
{
int errnum = errno;
if( errnum != EAGAIN && dlog->is_warn() )
dlog->warn() << peername << "(getNextData): read from socket error(" << errnum << "): " << strerror(errnum) << endl;
......
......@@ -372,6 +372,9 @@ std::list<uniset::ConsumerInfo> uniset::getObjectsList( const string& str, std::
if( item.id == DefaultObjectId )
item.id = conf->getControllerID(s_id);
if( item.id == DefaultObjectId )
item.id = conf->getServiceID(s_id);
}
item.node = DefaultObjectId;
......@@ -389,6 +392,9 @@ std::list<uniset::ConsumerInfo> uniset::getObjectsList( const string& str, std::
if( item.id == DefaultObjectId )
item.id = conf->getControllerID(s_id);
if( item.id == DefaultObjectId )
item.id = conf->getServiceID(s_id);
}
if( is_digit(s_node) )
......
......@@ -55,26 +55,25 @@ namespace uniset
IOController::IOStateList lst;
if( !root )
{
root = uxml->findNode( uxml->getFirstNode(), "sensors");
UniXML::iterator it(root);
it.goChildren();
root = it.getCurrent();
}
if( !root )
return lst;
if( root )
{
lst = read_list(root);
// только после чтения всех датчиков и формирования списка IOList
// можно инициализировать списки зависимостей
init_depends_signals(lst);
UniXML::iterator it(root);
xmlNode* tnode = uxml->findNode(uxml->getFirstNode(), "thresholds");
if( !it.goChildren() )
return lst;
if( tnode )
init_thresholds(tnode, lst);
}
lst = read_list(it);
// только после чтения всех датчиков и формирования списка IOList
// можно инициализировать списки зависимостей
init_depends_signals(lst);
xmlNode* tnode = uxml->findNode(uxml->getFirstNode(), "thresholds");
if( tnode )
init_thresholds(tnode, lst);
return lst;
}
......
......@@ -43,6 +43,9 @@ IOController::IOController(const string& name, const string& section):
ioMutex(name + "_ioMutex"),
isPingDBServer(true)
{
auto conf = uniset_conf();
if( conf )
dbserverID = conf->getDBServer();
}
IOController::IOController(ObjectId id):
......@@ -50,6 +53,9 @@ IOController::IOController(ObjectId id):
ioMutex(string(uniset_conf()->oind->getMapName(id)) + "_ioMutex"),
isPingDBServer(true)
{
auto conf = uniset_conf();
if( conf )
dbserverID = conf->getDBServer();
}
// ------------------------------------------------------------------------------------------
......@@ -412,16 +418,14 @@ void IOController::logging( uniset::SensorMessage& sm )
try
{
ObjectId dbID = uniset_conf()->getDBServer();
// значит на этом узле нет DBServer-а
if( dbID == uniset::DefaultObjectId )
if( dbserverID == uniset::DefaultObjectId )
{
isPingDBServer = false;
return;
}
sm.consumer = dbID;
sm.consumer = dbserverID;
TransportMessage tm(std::move(sm.transport_msg()));
ui->send( sm.consumer, std::move(tm) );
isPingDBServer = true;
......@@ -439,7 +443,7 @@ void IOController::logging( uniset::SensorMessage& sm )
void IOController::dumpToDB()
{
// значит на этом узле нет DBServer-а
if( uniset_conf()->getDBServer() == uniset::DefaultObjectId )
if( dbserverID == uniset::DefaultObjectId )
return;
{
......
......@@ -141,3 +141,12 @@ std::string DBServer::help_print()
return h.str();
}
//--------------------------------------------------------------------------------------------
SimpleInfo* DBServer::getInfo( const char* userparam )
{
uniset::SimpleInfo_var i = UniSetObject::getInfo(userparam);
const std::string inf = getMonitInfo( std::string(userparam) );
i->info = inf.c_str();
return i._retn();
}
//--------------------------------------------------------------------------------------------
......@@ -598,6 +598,9 @@ bool UniXML_iterator::findName( const std::string& nodename, const std::string&
{
fnode = this->findX(fnode, nodename, deepfind);
if( !fnode )
return false;
if ( searchname == UniXML::getProp(fnode, "name") )
{
curNode = fnode;
......
......@@ -97,6 +97,10 @@ TEST_CASE("UniXML::iterator", "[unixml][iterator][basic]" )
--it;
CHECK( it.getName() == "UserData" );
UniXML::iterator it_bad = uxml.begin();
REQUIRE_FALSE( it_bad.findName("UnknownNode","NOTFOUND") );
REQUIRE_FALSE( !it_bad );
it = uxml.begin();
CHECK( it.findName("TestNode", "TestNode1") != 0 );
it = uxml.begin();
......
......@@ -172,6 +172,11 @@ extensions/MQTTPublisher/main.cc
extensions/MQTTPublisher/Makefile.am
extensions/MQTTPublisher/MQTTPublisher.cc
extensions/MQTTPublisher/MQTTPublisher.h
extensions/Backend-OpenTSDB/libUniSet2BackendOpenTSDB.pc.in
extensions/Backend-OpenTSDB/main.cc
extensions/Backend-OpenTSDB/Makefile.am
extensions/Backend-OpenTSDB/BackendOpenTSDB.cc
extensions/Backend-OpenTSDB/BackendOpenTSDB.h
extensions/SharedMemory/libUniSet2SharedMemory.pc.in
extensions/SharedMemory/Makefile.am
extensions/SharedMemory/SharedMemory.cc
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment