Commit dd3046b2 authored by Pavel Vainerman's avatar Pavel Vainerman

(MQTTPublisher): первая (базовая) реализация MQTT издателя..

(публикация событий по изменению датчиков)
parent fa487750
......@@ -8,6 +8,7 @@
%def_enable logicproc
#%def_enable modbus
%def_disable tests
%def_enable mqtt
%define oname uniset2
......@@ -59,6 +60,10 @@ BuildRequires: libpqxx-devel
BuildRequires: librrd-devel
%endif
%if_enabled mqtt
BuildRequires: mosquitto-devel
%endif
%if_enabled python
BuildRequires: python-devel
BuildRequires(pre): rpm-build-python
......@@ -263,6 +268,17 @@ Requires: %name-extension-common-devel = %version-%release
Libraries needed to develop for uniset IOControl (io)
%endif
%if_enabled mqtt
%package extension-mqtt
Group: Development/C++
Summary: MQTTpublisher from UniSet
Requires: %name-extension-common = %version-%release
%description extension-mqtt
MQTT for %name
%endif
%package extension-smplus
Group: Development/C++
Summary: libUniSet2 SharedMemoryPlus extension ('all in one')
......@@ -271,12 +287,13 @@ Requires: %name-extension-common = %version-%release
%description extension-smplus
SharedMemoryPlus extension ('all in one') for libuniset
%prep
%setup
%build
%autoreconf
%configure %{subst_enable docs} %{subst_enable mysql} %{subst_enable sqlite} %{subst_enable pgsql} %{subst_enable python} %{subst_enable rrd} %{subst_enable io} %{subst_enable logicproc} %{subst_enable tests}
%configure %{subst_enable docs} %{subst_enable mysql} %{subst_enable sqlite} %{subst_enable pgsql} %{subst_enable python} %{subst_enable rrd} %{subst_enable io} %{subst_enable logicproc} %{subst_enable tests} %{subst_enable mqtt}
%make
%install
......
......@@ -177,6 +177,8 @@
</specnet>
<GroupAnalogSensors name="TestGroup" ok_c="D106_S"/>
<MQTTPublisher1 name="MQTTPublisher" />
</settings>
<ObjectsMap idfromfile="1">
<!--
......@@ -272,6 +274,9 @@
<item id="109" iotype="DI" name="MM1_Not_Respond_S" priority="Medium" textname="multimaster test sensor"/>
<item id="110" iotype="DI" name="MM2_Not_Respond_S" priority="Medium" textname="multimaster test sensor"/>
<item id="111" iotype="AI" name="MQTT_AI_AS" textname="MQTT test AI" mqtt="1"/>
<item id="112" iotype="DI" name="MQTT_DI_S" textname="MQTT test DI" mqtt="1"/>
</sensors>
<thresholds name="thresholds">
<sensor iotype="AI" name="AI_AS">
......@@ -380,6 +385,7 @@
<item id="6086" name="MBSlave9"/>
<item id="6087" name="MBSlave10"/>
<item id="6088" name="TestGroup"/>
<item id="6089" name="MQTTPublisher1"/>
</objects>
</ObjectsMap>
<messages idfromfile="1" name="messages">
......
......@@ -185,6 +185,25 @@ fi
AM_CONDITIONAL(DISABLE_PYTHON, test ${buildpython} = false)
#check mqtt support
AC_MSG_CHECKING([mqtt support])
buildmqtt=true
AC_ARG_ENABLE(mqtt, AC_HELP_STRING([--disable-mqtt], [disable MQTT support]),
[ if test $enableval = yes; then buildmqtt=true; else buildmqtt=false; fi],[ buildmqtt=true; ])
if test ${buildmqtt} = true; then
AC_MSG_RESULT([enabled])
AC_CHECK_HEADER(mosquittopp.h)
MQTT_LIBS=-lmosquittopp
MQTT_CFLAGS=
AC_SUBST(MQTT_LIBS)
AC_SUBST(MQTT_CFLAGS)
else
AC_MSG_RESULT([disabled])
fi
AM_CONDITIONAL(DISABLE_MQTT, test ${buildmqtt} = false)
# check Doxygen
DOXYGEN=""
......@@ -380,6 +399,7 @@ AC_CONFIG_FILES([Makefile
extensions/SharedMemoryPlus/Makefile
extensions/RRDServer/Makefile
extensions/RRDServer/libUniSet2RRDServer.pc
extensions/MQTTPublisher/Makefile
extensions/tests/Makefile
extensions/tests/SMemoryTest/Makefile
testsuite/Makefile
......
// -----------------------------------------------------------------------------
#include <sstream>
#include "Exceptions.h"
#include "MQTTPublisher.h"
// -----------------------------------------------------------------------------
using namespace std;
using namespace UniSetTypes;
using namespace UniSetExtensions;
// -----------------------------------------------------------------------------
MQTTPublisher::MQTTPublisher(UniSetTypes::ObjectId objId, xmlNode* cnode, UniSetTypes::ObjectId shmId, const std::shared_ptr<SharedMemory>& ic,
const string& prefix ):
UObject_SK(objId, cnode, string(prefix + "-")),
mosquittopp(NULL),
prefix(prefix)
{
auto conf = uniset_conf();
if( ic )
ic->logAgregator()->add(logAgregator());
shm = make_shared<SMInterface>(shmId, ui, objId, ic);
UniXML::iterator it(cnode);
topicsensors = conf->getRootSection() + "/" + conf->getArg2Param("--" + prefix + "topic-sensors", it.getProp("topicsensors"), "sensors");
string ff = conf->getArg2Param("--" + argprefix + "filter-field", it.getProp("filterField"), "");
string fv = conf->getArg2Param("--" + argprefix + "filter-value", it.getProp("filterValue"), "");
myinfo << myname << "(init): filter-field=" << ff << " filter-value=" << fv << endl;
xmlNode* senssec = conf->getXMLSensorsSection();
if( !senssec )
{
ostringstream err;
err << myname << "(init): Not found sensors section";
mycrit << err.str() << endl;
throw SystemError(err.str());
}
UniXML::iterator sit(senssec);
if( !sit.goChildren() )
{
ostringstream err;
err << myname << "(init): empty sensors section?!";
mycrit << err.str() << endl;
throw SystemError(err.str());
}
ostringstream pubname;
for( ; sit.getCurrent(); sit++ )
{
if( !UniSetTypes::check_filter(sit, ff, fv) )
continue;
std::string sname = sit.getProp("name");
ObjectId sid = conf->getSensorID(sname);
if( sid == DefaultObjectId )
{
ostringstream err;
err << myname << "(init): Unkown ID for sensor '" << sname << "'";
mycrit << err.str() << endl;
throw SystemError(err.str());
}
pubname.str("");
pubname << topicsensors << "/" << sname;
MQTTInfo m(sid,pubname.str());
publist.emplace(sid, std::move(m) );
if( smTestID == DefaultObjectId )
smTestID = sid;
}
if( publist.empty() )
{
ostringstream err;
err << myname << "(init): FAIL! Empty publish list...";
mycrit << err.str() << endl;
throw SystemError(err.str());
}
// Работа с MQTT
mosqpp::lib_init();
host = conf->getArg2Param("--" + argprefix + "mqtt-host", it.getProp("mqttHost"), "localhost");
port = conf->getArgPInt("--" + argprefix + "-mqtt-port", it.getProp("mqttPort"), 1883);
keepalive = conf->getArgPInt("--" + argprefix + "-mqtt-keepalive", it.getProp("mqttKeepAlive"), 60);
// см. sysCommad()
// connect_async(host.c_str(),port,keepalive);
// loop_start();
}
// -----------------------------------------------------------------------------
MQTTPublisher::~MQTTPublisher()
{
loop_stop(); // Kill the thread
mosqpp::lib_cleanup(); // Mosquitto library cleanup
}
// -----------------------------------------------------------------------------
void MQTTPublisher::sigterm(int signo )
{
myinfo << myname << "(sigterm): signo=" << signo << endl;
disconnect();
connectOK = false;
UObject_SK::sigterm(signo);
}
//--------------------------------------------------------------------------------
bool MQTTPublisher::deactivateObject()
{
myinfo << myname << "(deactivateObject): ...disconnect.." << endl;
disconnect();
connectOK = false;
return UObject_SK::deactivateObject();
}
//--------------------------------------------------------------------------------
void MQTTPublisher::sysCommand(const SystemMessage* sm)
{
UObject_SK::sysCommand(sm);
if( sm->command == SystemMessage::StartUp || sm->command == SystemMessage::WatchDog )
{
if( !connectOK )
{
connect_async(host.c_str(),port,keepalive);
loop_start();
}
}
}
//--------------------------------------------------------------------------------
void MQTTPublisher::help_print( int argc, const char* const* argv )
{
cout << " Default prefix='mqtt'" << endl;
cout << "--prefix-name - ID for mqttpublisher. Default: MQTTPublisher1. " << endl;
cout << "--prefix-confnode - configuration section name. Default: <NAME name='NAME'...> " << endl;
cout << "--prefix-heartbeat-id name - ID for heartbeat sensor." << endl;
cout << "--prefix-heartbeat-max val - max value for heartbeat sensor." << endl;
cout << "--prefix-filter-field - Фильтр для загрузки списка датчиков." << endl;
cout << "--prefix-filter-value - Значение фильтра для загрузки списка датчиков." << endl;
cout << endl;
cout << " MQTT: " << endl;
cout << "--prefix-mqtt-host host - host(ip) MQTT Broker (server). Default: localhost" << endl;
cout << "--prefix-mqtt-port port - port for MQTT Broker (server). Default: 1883" << endl;
cout << "--prefix-mqtt-keepalive val - keepalive for connection to MQTT Broker (server). Default: 60" << endl;
cout << endl;
cout << " Logs: " << endl;
cout << "--prefix-log-... - log control" << endl;
cout << " add-levels ... " << endl;
cout << " del-levels ... " << endl;
cout << " set-levels ... " << endl;
cout << " logfile filanme " << endl;
cout << " no-debug " << endl;
cout << " LogServer: " << endl;
cout << "--prefix-run-logserver - run logserver. Default: localhost:id" << endl;
cout << "--prefix-logserver-host ip - listen ip. Default: localhost" << endl;
cout << "--prefix-logserver-port num - listen port. Default: ID" << endl;
cout << LogServer::help_print("prefix-logserver") << endl;
}
// -----------------------------------------------------------------------------
void MQTTPublisher::on_connect(int rc)
{
connectOK = ( rc == 0 );
myinfo << myname << "(on_connect): connect to " << host << ":" << port << " " << ( connectOK ? "OK" : "FAIL" ) << endl;
if( connectOK )
askSensors(UniversalIO::UIONotify);
// else
// {
// askTimer(reconnectTimer,reconnectTime);
// }
}
// -----------------------------------------------------------------------------
void MQTTPublisher::on_message( const mosquitto_message* message )
{
}
// -----------------------------------------------------------------------------
void MQTTPublisher::on_subscribe( int mid, int qos_count, const int* granted_qos )
{
}
// -----------------------------------------------------------------------------
std::shared_ptr<MQTTPublisher> MQTTPublisher::init_mqttpublisher(int argc, const char* const* argv,
UniSetTypes::ObjectId icID, const std::shared_ptr<SharedMemory>& ic,
const std::string& prefix )
{
auto conf = uniset_conf();
string name = conf->getArgParam("--" + prefix + "-name", "MQTTPublisher");
if( name.empty() )
{
dcrit << "(MQTTPublisher): Unknown name. Usage: --" << prefix << "-name" << endl;
return 0;
}
ObjectId ID = conf->getObjectID(name);
if( ID == UniSetTypes::DefaultObjectId )
{
dcrit << "(MQTTPublisher): Not found ID for '" << name
<< " in '" << conf->getObjectsSection() << "' section" << endl;
return 0;
}
string confname = conf->getArgParam("--" + prefix + "-confnode", name);
xmlNode* cnode = conf->getNode(confname);
if( !cnode )
{
dcrit << "(MQTTPublisher): " << name << "(init): Not found <" + confname + ">" << endl;
return 0;
}
dinfo << "(MQTTPublisher): name = " << name << "(" << ID << ")" << endl;
return make_shared<MQTTPublisher>(ID, cnode, icID, ic, prefix);
}
// -----------------------------------------------------------------------------
void MQTTPublisher::askSensors( UniversalIO::UIOCommand cmd )
{
UObject_SK::askSensors(cmd);
for( const auto& i: publist )
{
try
{
shm->askSensor(i.second.sid, cmd);
}
catch( const std::exception& ex )
{
mycrit << myname << "(askSensors): " << ex.what() << endl;
}
}
}
// -----------------------------------------------------------------------------
void MQTTPublisher::sensorInfo( const UniSetTypes::SensorMessage* sm )
{
auto i = publist.find(sm->id);
if( i == publist.end() )
return;
ostringstream m;
m << sm->value;
string tmsg(m.str());
//subscribe(NULL, i.second.pubname.c_str());
myinfo << "(sensorInfo): publish: topic='" << i->second.pubname << "' msg='" << tmsg.c_str() << "'" << endl;
int ret = publish(NULL,i->second.pubname.c_str(),tmsg.size(),tmsg.c_str(),1,false);
if( ret != MOSQ_ERR_SUCCESS )
{
mycrit << myname << "(sensorInfo): PUBLISH FAILED: err(" << ret << "): " << mosqpp::strerror(ret) << endl;
}
}
// -----------------------------------------------------------------------------
#ifndef _MQTTPublisher_H_
#define _MQTTPublisher_H_
// -----------------------------------------------------------------------------
#include <unordered_map>
#include <list>
#include <memory>
#include <mosquittopp.h>
#include "UObject_SK.h"
#include "SMInterface.h"
#include "SharedMemory.h"
#include "extensions/Extensions.h"
// -----------------------------------------------------------------------------
/*!
\page page_MQTTPublisher Реализация MQTT издателя
- \ref sec_MQTT_Comm
- \ref sec_MQTT_Conf
\section sec_MQTT_Comm Общее описание MQTTPublisher
MQTT - это..
Данная реализация построена на использованиие билиотеки mosquitto.
\section sec_MQTT_Conf Настройка MQTTPublisher
\todo Доделать контрольный таймер (контроль наличия соединения с сервером)
*/
class MQTTPublisher:
public UObject_SK,
public mosqpp::mosquittopp
{
public:
MQTTPublisher( UniSetTypes::ObjectId objId, xmlNode* cnode, UniSetTypes::ObjectId shmID, const std::shared_ptr<SharedMemory>& ic = nullptr,
const std::string& prefix = "mqtt" );
virtual ~MQTTPublisher();
/*! глобальная функция для инициализации объекта */
static std::shared_ptr<MQTTPublisher> init_mqttpublisher( int argc, const char* const* argv,
UniSetTypes::ObjectId shmID, const std::shared_ptr<SharedMemory>& ic = nullptr,
const std::string& prefix = "mqtt" );
/*! глобальная функция для вывода help-а */
static void help_print( int argc, const char* const* argv );
inline std::shared_ptr<LogAgregator> getLogAggregator()
{
return loga;
}
inline std::shared_ptr<DebugStream> log()
{
return mylog;
}
virtual void on_connect(int rc) override;
virtual void on_message(const struct mosquitto_message *message) override;
virtual void on_subscribe(int mid, int qos_count, const int *granted_qos) override;
protected:
MQTTPublisher();
virtual void askSensors( UniversalIO::UIOCommand cmd ) override;
virtual void sensorInfo( const UniSetTypes::SensorMessage* sm ) override;
virtual void sigterm( int signo ) override;
virtual bool deactivateObject() override;
virtual void sysCommand( const UniSetTypes::SystemMessage* sm ) override;
std::shared_ptr<SMInterface> shm;
struct MQTTInfo
{
UniSetTypes::ObjectId sid;
std::string pubname;
MQTTInfo( UniSetTypes::ObjectId id, const std::string& name ):
sid(id), pubname(name) {}
};
typedef std::unordered_map<UniSetTypes::ObjectId, MQTTInfo> MQTTMap;
MQTTMap publist;
private:
std::string prefix;
std::string topicsensors; // "топик" для публикации датчиков
bool connectOK = { false };
std::string host = { "localhost" };
int port = { 1883 };
int keepalive = { 60 };
};
// -----------------------------------------------------------------------------
#endif // _MQTTPublisher_H_
// -----------------------------------------------------------------------------
if DISABLE_MQTT
else
bin_PROGRAMS = @PACKAGE@-mqttpublisher
MQTT_VER=@LIBVER@
lib_LTLIBRARIES = libUniSet2MQTTPublisher.la
libUniSet2MQTTPublisher_la_LDFLAGS = -version-info $(MQTT_VER)
libUniSet2MQTTPublisher_la_LIBADD = $(top_builddir)/lib/libUniSet2.la \
$(top_builddir)/extensions/SharedMemory/libUniSet2SharedMemory.la \
$(top_builddir)/extensions/lib/libUniSet2Extensions.la \
$(SIGC_LIBS) $(MQTT_LIBS)
libUniSet2MQTTPublisher_la_CXXFLAGS = -I$(top_builddir)/extensions/include \
-I$(top_builddir)/extensions/SharedMemory \
$(SIGC_CFLAGS) $(MQTT_CFLAGS)
libUniSet2MQTTPublisher_la_SOURCES = MQTTPublisher.cc
@PACKAGE@_mqttpublisher_SOURCES = main.cc
@PACKAGE@_mqttpublisher_LDADD = libUniSet2MQTTPublisher.la $(top_builddir)/lib/libUniSet2.la \
$(top_builddir)/extensions/SharedMemory/libUniSet2SharedMemory.la \
$(top_builddir)/extensions/lib/libUniSet2Extensions.la \
$(SIGC_LIBS) $(MQTT_LIBS)
@PACKAGE@_mqttpublisher_CXXFLAGS = -I$(top_builddir)/extensions/include \
-I$(top_builddir)/extensions/SharedMemory \
$(SIGC_CFLAGS) $(MQTT_CFLAGS)
# install
devel_include_HEADERS = *.h
devel_includedir = $(pkgincludedir)/extensions/mqtt
#pkgconfigdir = $(libdir)/pkgconfig
#pkgconfig_DATA = libUniSet2MQTTPublisher.pc
#all-local:
# ln -sf ../MQTTPublisher/$(devel_include_HEADERS) ../include
include $(top_builddir)/include.mk
endif
#include <sstream>
#include "MQTTPublisher.h"
#include "Configuration.h"
#include "Debug.h"
#include "UniSetActivator.h"
#include "Extensions.h"
// -----------------------------------------------------------------------------
using namespace std;
using namespace UniSetTypes;
using namespace UniSetExtensions;
// -----------------------------------------------------------------------------
int main( int argc, const char** argv )
{
// std::ios::sync_with_stdio(false);
if( argc > 1 && (!strcmp(argv[1], "--help") || !strcmp(argv[1], "-h")) )
{
cout << "--smemory-id objectName - SharedMemory objectID. Default: autodetect" << endl;
cout << "--confile filename - configuration file. Default: configure.xml" << endl;
cout << endl;
MQTTPublisher::help_print(argc, argv);
return 0;
}
try
{
auto conf = uniset_init( argc, argv );
ObjectId shmID = DefaultObjectId;
string sID = conf->getArgParam("--smemory-id");
if( !sID.empty() )
shmID = conf->getControllerID(sID);
else
shmID = getSharedMemoryID();
if( shmID == DefaultObjectId )
{
cerr << sID << "? SharedMemoryID not found in " << conf->getControllersSection() << " section" << endl;
return 1;
}
auto mqtt = MQTTPublisher::init_mqttpublisher(argc, argv, shmID);
if( !mqtt )
{
dcrit << "(mqttpublisher): init failed..." << endl;
return 1;
}
auto act = UniSetActivator::Instance();
act->add(mqtt);
SystemMessage sm(SystemMessage::StartUp);
act->broadcast( sm.transport_msg() );
ulogany << "\n\n\n";
ulogany << "(main): -------------- MQTTPublisher START -------------------------\n\n";
dlogany << "\n\n\n";
dlogany << "(main): -------------- MQTTPublisher START -------------------------\n\n";
act->run(false);
return 0;
}
catch( UniSetTypes::Exception& ex )
{
cerr << "(mqttpublisher): " << ex << std::endl;
}
catch(...)
{
cerr << "(mqttpublisher): catch ..." << std::endl;
}
return 1;
}
#!/bin/sh
uniset2-start.sh -f ./uniset2-mqttpublisher --confile test.xml \
--mqtt-name MQTTPublisher1 \
--mqtt-filter-field mqtt \
--mqtt-filter-value 1 \
--mqtt-log-add-levels any $*
../../Utilities/scripts/uniset2-stop.sh
\ No newline at end of file
../../conf/test.xml
\ No newline at end of file
../../Utilities/scripts/uniset2-start.sh
\ No newline at end of file
......@@ -5,7 +5,7 @@
if HAVE_EXTENTIONS
SUBDIRS = lib include SharedMemory SharedMemory/tests IOControl LogicProcessor LogicProcessor/tests \
ModbusMaster ModbusSlave SMViewer UniNetwork UNetUDP UNetUDP/tests \
DBServer-MySQL DBServer-SQLite DBServer-PostgreSQL \
DBServer-MySQL DBServer-SQLite DBServer-PostgreSQL MQTTPublisher \
RRDServer SharedMemoryPlus tests ModbusMaster/tests ModbusSlave/tests
#SMDBServer
......
......@@ -46,6 +46,7 @@ extensions/include/MTR.h
extensions/include/PassiveLProcessor.h
extensions/include/PID.h
extensions/include/RRDServer.h
extensions/include/MQTTPublisher.h
extensions/include/RTUExchange.h
extensions/include/RTUStorage.h
extensions/include/Schema.h
......@@ -153,6 +154,10 @@ extensions/RRDServer/main.cc
extensions/RRDServer/Makefile.am
extensions/RRDServer/RRDServer.cc
extensions/RRDServer/RRDServer.h
extensions/MQTTPublisher/main.cc
extensions/MQTTPublisher/Makefile.am
extensions/MQTTPublisher/MQTTPublisher.cc
extensions/MQTTPublisher/MQTTPublisher.h
extensions/SharedMemory/libUniSet2SharedMemory.pc.in
extensions/SharedMemory/Makefile.am
extensions/SharedMemory/SharedMemory.cc
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment