Commit ce6827a8 authored by Pavel Vainerman's avatar Pavel Vainerman

(DBServer_MySQL): Сделал буффер для запросов, чтобы по возможности не терять…

(DBServer_MySQL): Сделал буффер для запросов, чтобы по возможности не терять сообщения (eterbug #8520)
parent 5c4cf227
...@@ -4,7 +4,7 @@ ulimit -Sc 1000000 ...@@ -4,7 +4,7 @@ ulimit -Sc 1000000
#for i in `seq 1 20`; #for i in `seq 1 20`;
#do #do
uniset-start.sh -f ./uniset-simitator --confile test.xml --sid 10@localhost,9@3002 uniset-start.sh -f ./uniset-simitator --confile test.xml --sid 34
#done #done
#wait #wait
......
...@@ -3,7 +3,7 @@ ...@@ -3,7 +3,7 @@
Name: libuniset Name: libuniset
Version: 1.4 Version: 1.4
Release: alt2 Release: alt3
Summary: UniSet - library for building distributed industrial control systems Summary: UniSet - library for building distributed industrial control systems
License: GPL License: GPL
Group: Development/C++ Group: Development/C++
...@@ -213,6 +213,9 @@ rm -f %buildroot%_libdir/*.la ...@@ -213,6 +213,9 @@ rm -f %buildroot%_libdir/*.la
%changelog %changelog
* Sun Jun 10 2012 Pavel Vainerman <pv@altlinux.ru> 1.4-alt3
- (DBServer_MySQL): buffer is added to query
* Fri Jun 08 2012 Pavel Vainerman <pv@altlinux.ru> 1.4-alt2 * Fri Jun 08 2012 Pavel Vainerman <pv@altlinux.ru> 1.4-alt2
- added support type 'double' for uniset-codegen (<variables>) - added support type 'double' for uniset-codegen (<variables>)
......
...@@ -85,7 +85,7 @@ ...@@ -85,7 +85,7 @@
textname - текстовое имя датчика textname - текстовое имя датчика
--> -->
<nodes port="2809" unet_broadcast_ip="192.168.1.255" unet_broadcast_ip2="192.168.122.255"> <nodes port="2809" unet_broadcast_ip="192.168.1.255" unet_broadcast_ip2="192.168.122.255">
<item infserver="InfoServer" ip="127.0.0.1" name="LocalhostNode" textname="Локальный узел" unet_ignore="0" unet_port="2048"> <item id="3000" dbserver="DBServer1" infserver="InfoServer" ip="127.0.0.1" name="LocalhostNode" textname="Локальный узел" unet_ignore="0" unet_port="2048">
<iocards> <iocards>
<item card="1" name="DI32"/> <item card="1" name="DI32"/>
<item card="2" name="DO32"/> <item card="2" name="DO32"/>
......
...@@ -22,7 +22,6 @@ ...@@ -22,7 +22,6 @@
* \author Pavel Vainerman * \author Pavel Vainerman
*/ */
// -------------------------------------------------------------------------- // --------------------------------------------------------------------------
#include <sys/time.h> #include <sys/time.h>
#include <sstream> #include <sstream>
#include <iomanip> #include <iomanip>
...@@ -32,11 +31,10 @@ ...@@ -32,11 +31,10 @@
#include "Configuration.h" #include "Configuration.h"
#include "Debug.h" #include "Debug.h"
#include "UniXML.h" #include "UniXML.h"
// --------------------------------------------------------------------------
using namespace UniSetTypes; using namespace UniSetTypes;
using namespace std; using namespace std;
// --------------------------------------------------------------------------
// ------------------------------------------------------------------------------------------
DBServer_MySQL::DBServer_MySQL(ObjectId id): DBServer_MySQL::DBServer_MySQL(ObjectId id):
DBServer(id), DBServer(id),
...@@ -44,13 +42,13 @@ DBServer_MySQL::DBServer_MySQL(ObjectId id): ...@@ -44,13 +42,13 @@ DBServer_MySQL::DBServer_MySQL(ObjectId id):
PingTime(300000), PingTime(300000),
ReconnectTime(180000), ReconnectTime(180000),
connect_ok(false), connect_ok(false),
activate(true) activate(true),
lastRemove(false)
{ {
if( getId() == DefaultObjectId ) if( getId() == DefaultObjectId )
{ {
ostringstream msg; ostringstream msg;
msg << "(DBServer_MySQL): Запуск невозможен! НЕ ОПРЕДЕЛЁН ObjectId !!!!!\n"; msg << "(DBServer_MySQL): init failed! Unknown ID!" << endl;
// unideb[Debug::CRIT] << msg.str() << endl;
throw Exception(msg.str()); throw Exception(msg.str());
} }
} }
...@@ -61,14 +59,14 @@ DBServer_MySQL::DBServer_MySQL(): ...@@ -61,14 +59,14 @@ DBServer_MySQL::DBServer_MySQL():
PingTime(300000), PingTime(300000),
ReconnectTime(180000), ReconnectTime(180000),
connect_ok(false), connect_ok(false),
activate(true) activate(true),
lastRemove(false)
{ {
// init(); // init();
if( getId() == DefaultObjectId ) if( getId() == DefaultObjectId )
{ {
ostringstream msg; ostringstream msg;
msg << "(DBServer_MySQL): Запуск невозможен! Для данного узла НЕ ОПРЕДЕЛЁН ObjectId !!!!!\n"; msg << "(DBServer_MySQL): init failed! Unknown ID!" << endl;
// unideb[Debug::CRIT] << msg.str() << endl;
throw Exception(msg.str()); throw Exception(msg.str());
} }
} }
...@@ -98,28 +96,30 @@ void DBServer_MySQL::processingMessage( UniSetTypes::VoidMessage *msg ) ...@@ -98,28 +96,30 @@ void DBServer_MySQL::processingMessage( UniSetTypes::VoidMessage *msg )
DBServer::processingMessage(msg); DBServer::processingMessage(msg);
break; break;
} }
} }
//-------------------------------------------------------------------------------------------- //--------------------------------------------------------------------------------------------
void DBServer_MySQL::sysCommand( UniSetTypes::SystemMessage *sm ) void DBServer_MySQL::sysCommand( UniSetTypes::SystemMessage *sm )
{ {
switch( sm->command ) switch( sm->command )
{ {
case SystemMessage::StartUp: case SystemMessage::StartUp:
break; break;
case SystemMessage::Finish: case SystemMessage::Finish:
{
activate = false; activate = false;
db->freeResult(); db->freeResult();
db->close(); db->close();
break; }
break;
case SystemMessage::FoldUp: case SystemMessage::FoldUp:
{
activate = false; activate = false;
db->freeResult(); db->freeResult();
db->close(); db->close();
break; }
break;
default: default:
break; break;
...@@ -245,14 +245,30 @@ bool DBServer_MySQL::writeToBase( const string& query ) ...@@ -245,14 +245,30 @@ bool DBServer_MySQL::writeToBase( const string& query )
// cout << "DBServer_MySQL: " << query << endl; // cout << "DBServer_MySQL: " << query << endl;
if( !db || !connect_ok ) if( !db || !connect_ok )
{ {
if( unideb.debugging(Debug::CRIT) ) qbuf.push(query);
unideb[Debug::CRIT] << myname << "(writeToBase): соединение с БД не установлено\n" if( qbuf.size() > qbufSize )
<< myname << "(writeToBase): lost query: " {
<< query << endl; std::string qlost;
if( lastRemove )
qlost = qbuf.back();
else
qlost = qbuf.front();
qbuf.pop();
if( unideb.debugging(Debug::CRIT) )
unideb[Debug::CRIT] << myname << "(writeToBase): DB not connected! buffer(" << qbufSize
<< ") overflow! lost query: " << qlost << endl;
}
return false; return false;
} }
db->query( query ); // На всякий скидываем очередь
flushBuffer();
// А теперь собственно запрос..
db->query(query);
// Дело в том что на INSERT И UPDATE запросы // Дело в том что на INSERT И UPDATE запросы
// db->query() может возвращать false и надо самому // db->query() может возвращать false и надо самому
// отдельно проверять действительно ли произошла ошибка // отдельно проверять действительно ли произошла ошибка
...@@ -267,6 +283,30 @@ bool DBServer_MySQL::writeToBase( const string& query ) ...@@ -267,6 +283,30 @@ bool DBServer_MySQL::writeToBase( const string& query )
return false; return false;
} }
//-------------------------------------------------------------------------------------------- //--------------------------------------------------------------------------------------------
void DBServer_MySQL::flushBuffer()
{
// Сперва пробуем очистить всё что накопилось в очереди до этого...
while( !qbuf.empty() )
{
db->query( qbuf.front() );
// Дело в том что на INSERT И UPDATE запросы
// db->query() может возвращать false и надо самому
// отдельно проверять действительно ли произошла ошибка
// см. DBInterface::query.
string err(db->error());
if( err.empty() )
db->freeResult();
else if( unideb.debugging(Debug::CRIT) )
{
unideb[Debug::CRIT] << myname << "(writeToBase): error: " << err <<
" lost query: " << qbuf.front() << endl;
}
qbuf.pop();
}
}
//--------------------------------------------------------------------------------------------
void DBServer_MySQL::parse( UniSetTypes::SensorMessage *si ) void DBServer_MySQL::parse( UniSetTypes::SensorMessage *si )
{ {
try try
...@@ -309,7 +349,8 @@ void DBServer_MySQL::parse( UniSetTypes::SensorMessage *si ) ...@@ -309,7 +349,8 @@ void DBServer_MySQL::parse( UniSetTypes::SensorMessage *si )
break; break;
default: default:
unideb[Debug::WARN] << myname << "(log sensor): неизвестный тип датчика....(сообщение игнорировано)" << endl; unideb[Debug::WARN] << myname << "(log sensor): Unknown iotype='"
<< si->sensor_type << "'.. ignore SensorMessage..." << endl;
return; return;
} }
...@@ -330,7 +371,7 @@ void DBServer_MySQL::parse( UniSetTypes::SensorMessage *si ) ...@@ -330,7 +371,7 @@ void DBServer_MySQL::parse( UniSetTypes::SensorMessage *si )
catch( ... ) catch( ... )
{ {
if( unideb.debugging(Debug::CRIT) ) if( unideb.debugging(Debug::CRIT) )
unideb[Debug::CRIT] << myname << "(parse SensorMessage): неизвестное исключние..." << endl; unideb[Debug::CRIT] << myname << "(parse SensorMessage): catch..." << endl;
} }
} }
...@@ -351,17 +392,19 @@ void DBServer_MySQL::init_dbserver() ...@@ -351,17 +392,19 @@ void DBServer_MySQL::init_dbserver()
if( conf->getDBServer() == UniSetTypes::DefaultObjectId ) if( conf->getDBServer() == UniSetTypes::DefaultObjectId )
{ {
ostringstream msg; ostringstream msg;
msg << myname << "(init): на данном узле DBServer - отключён." msg << myname << "(init): DBServer OFF for this node.."
<< " В " << conf->getConfFileName() << " In " << conf->getConfFileName()
<< " для данного узла указан параметр dbserver=''"; << " for this node dbserver=''";
throw NameNotFound(msg.str()); throw NameNotFound(msg.str());
} }
xmlNode* node = conf->getNode("LocalDBServer"); xmlNode* node = conf->getNode("LocalDBServer");
if( !node ) if( !node )
throw NameNotFound(string(myname+"(init): в конфигурационном файле не найден раздел LocalDBServer")); throw NameNotFound(string(myname+"(init): section <LocalDBServer> not found.."));
UniXML::iterator it(node);
unideb[DBLogInfoLevel] << myname << "(init): инициализируем соединение" << endl; unideb[DBLogInfoLevel] << myname << "(init): init connection.." << endl;
string dbname(conf->getProp(node,"dbname")); string dbname(conf->getProp(node,"dbname"));
string dbnode(conf->getProp(node,"dbnode")); string dbnode(conf->getProp(node,"dbnode"));
string user(conf->getProp(node,"dbuser")); string user(conf->getProp(node,"dbuser"));
...@@ -373,6 +416,14 @@ void DBServer_MySQL::init_dbserver() ...@@ -373,6 +416,14 @@ void DBServer_MySQL::init_dbserver()
PingTime = conf->getIntProp(node,"pingTime"); PingTime = conf->getIntProp(node,"pingTime");
ReconnectTime = conf->getIntProp(node,"reconnectTime"); ReconnectTime = conf->getIntProp(node,"reconnectTime");
qbufSize = conf->getArgPInt("--dbserver-buffer-size",it.getProp("bufferSize"),200);
if( findArgParam("--dbserver-buffer-last-remove",conf->getArgc(),conf->getArgv()) != -1 )
lastRemove = true;
else if( it.getIntProp("bufferLastRemove" ) !=0 )
lastRemove = true;
else
lastRemove = false;
if( dbnode.empty() ) if( dbnode.empty() )
dbnode = "localhost"; dbnode = "localhost";
...@@ -388,7 +439,7 @@ void DBServer_MySQL::init_dbserver() ...@@ -388,7 +439,7 @@ void DBServer_MySQL::init_dbserver()
// ostringstream err; // ostringstream err;
if( unideb.debugging(Debug::CRIT) ) if( unideb.debugging(Debug::CRIT) )
unideb[Debug::CRIT] << myname unideb[Debug::CRIT] << myname
<< "(init): не смог создать соединение с БД err:\n" << "(init): DB connection error: "
<< db->error() << endl; << db->error() << endl;
// throw Exception( string(myname+"(init): не смогли создать соединение с БД "+db->error()) ); // throw Exception( string(myname+"(init): не смогли создать соединение с БД "+db->error()) );
askTimer(DBServer_MySQL::ReconnectTimer,ReconnectTime); askTimer(DBServer_MySQL::ReconnectTimer,ReconnectTime);
...@@ -396,13 +447,14 @@ void DBServer_MySQL::init_dbserver() ...@@ -396,13 +447,14 @@ void DBServer_MySQL::init_dbserver()
else else
{ {
if( unideb.debugging(DBLogInfoLevel) ) if( unideb.debugging(DBLogInfoLevel) )
unideb[DBLogInfoLevel] << myname << "(init): connect ok\n"; unideb[DBLogInfoLevel] << myname << "(init): connect [OK]" << endl;
connect_ok = true; connect_ok = true;
askTimer(DBServer_MySQL::ReconnectTimer,0); askTimer(DBServer_MySQL::ReconnectTimer,0);
askTimer(DBServer_MySQL::PingTimer,PingTime); askTimer(DBServer_MySQL::PingTimer,PingTime);
// createTables(db); // createTables(db);
initDB(db); initDB(db);
initDBTableMap(tblMap); initDBTableMap(tblMap);
flushBuffer();
} }
} }
//-------------------------------------------------------------------------------------------- //--------------------------------------------------------------------------------------------
...@@ -412,7 +464,7 @@ void DBServer_MySQL::createTables( DBInterface *db ) ...@@ -412,7 +464,7 @@ void DBServer_MySQL::createTables( DBInterface *db )
if(!it) if(!it)
{ {
if( unideb.debugging(Debug::CRIT) ) if( unideb.debugging(Debug::CRIT) )
unideb[Debug::CRIT] << myname << ": не найден раздел Tables...."<< endl; unideb[Debug::CRIT] << myname << ": section <Tables> not found.."<< endl;
throw Exception(); throw Exception();
} }
...@@ -421,7 +473,7 @@ void DBServer_MySQL::createTables( DBInterface *db ) ...@@ -421,7 +473,7 @@ void DBServer_MySQL::createTables( DBInterface *db )
if( it.getName() != "comment" ) if( it.getName() != "comment" )
{ {
if( unideb.debugging(DBLogInfoLevel) ) if( unideb.debugging(DBLogInfoLevel) )
unideb[DBLogInfoLevel] << myname << "(createTables): создаем " << it.getName() << endl; unideb[DBLogInfoLevel] << myname << "(createTables): create " << it.getName() << endl;
ostringstream query; ostringstream query;
query << "CREATE TABLE " << conf->getProp(it,"name") << "(" << conf->getProp(it,"create") << ")"; query << "CREATE TABLE " << conf->getProp(it,"name") << "(" << conf->getProp(it,"create") << ")";
if( !db->query(query.str()) && unideb.debugging(Debug::CRIT) ) if( !db->query(query.str()) && unideb.debugging(Debug::CRIT) )
...@@ -439,7 +491,7 @@ void DBServer_MySQL::timerInfo( UniSetTypes::TimerMessage* tm ) ...@@ -439,7 +491,7 @@ void DBServer_MySQL::timerInfo( UniSetTypes::TimerMessage* tm )
if( !db->ping() ) if( !db->ping() )
{ {
if( unideb.debugging(Debug::WARN) ) if( unideb.debugging(Debug::WARN) )
unideb[Debug::WARN] << myname << "(timerInfo): потеряно соединение с сервером БД" << endl; unideb[Debug::WARN] << myname << "(timerInfo): DB lost connection.." << endl;
connect_ok = false; connect_ok = false;
askTimer(DBServer_MySQL::PingTimer,0); askTimer(DBServer_MySQL::PingTimer,0);
askTimer(DBServer_MySQL::ReconnectTimer,ReconnectTime); askTimer(DBServer_MySQL::ReconnectTimer,ReconnectTime);
...@@ -448,7 +500,7 @@ void DBServer_MySQL::timerInfo( UniSetTypes::TimerMessage* tm ) ...@@ -448,7 +500,7 @@ void DBServer_MySQL::timerInfo( UniSetTypes::TimerMessage* tm )
{ {
connect_ok = true; connect_ok = true;
if( unideb.debugging(DBLogInfoLevel) ) if( unideb.debugging(DBLogInfoLevel) )
unideb[DBLogInfoLevel] << myname << "(timerInfo): DB ping ok\n"; unideb[DBLogInfoLevel] << myname << "(timerInfo): DB ping ok" << endl;
} }
} }
break; break;
...@@ -467,7 +519,7 @@ void DBServer_MySQL::timerInfo( UniSetTypes::TimerMessage* tm ) ...@@ -467,7 +519,7 @@ void DBServer_MySQL::timerInfo( UniSetTypes::TimerMessage* tm )
} }
connect_ok = false; connect_ok = false;
if( unideb.debugging(Debug::WARN) ) if( unideb.debugging(Debug::WARN) )
unideb[Debug::WARN] << myname << "(timerInfo): нет связи с БД" << endl; unideb[Debug::WARN] << myname << "(timerInfo): DB no connection.." << endl;
} }
else else
init_dbserver(); init_dbserver();
...@@ -476,7 +528,7 @@ void DBServer_MySQL::timerInfo( UniSetTypes::TimerMessage* tm ) ...@@ -476,7 +528,7 @@ void DBServer_MySQL::timerInfo( UniSetTypes::TimerMessage* tm )
default: default:
if( unideb.debugging(Debug::WARN) ) if( unideb.debugging(Debug::WARN) )
unideb[Debug::WARN] << myname << "(timerInfo): неизвестный таймер tid=" << tm->id << endl; unideb[Debug::WARN] << myname << "(timerInfo): Unknown TimerID=" << tm->id << endl;
break; break;
} }
} }
......
...@@ -25,6 +25,7 @@ ...@@ -25,6 +25,7 @@
#define DBServer_MySQL_H_ #define DBServer_MySQL_H_
// -------------------------------------------------------------------------- // --------------------------------------------------------------------------
#include <map> #include <map>
#include <queue>
#include "UniSetTypes.h" #include "UniSetTypes.h"
#include "DBInterface.h" #include "DBInterface.h"
#include "DBServer.h" #include "DBServer.h"
...@@ -35,6 +36,7 @@ ...@@ -35,6 +36,7 @@
- \ref sec_DBS_Comm - \ref sec_DBS_Comm
- \ref sec_DBS_Conf - \ref sec_DBS_Conf
- \ref sec_DBS_Tables - \ref sec_DBS_Tables
- \ref sec_DBS_Buffer
\section sec_DBS_Comm Общее описание работы DBServer_MySQL \section sec_DBS_Comm Общее описание работы DBServer_MySQL
...@@ -68,6 +70,15 @@ ...@@ -68,6 +70,15 @@
- \b pingTime - период проверки связи с сервером MySQL - \b pingTime - период проверки связи с сервером MySQL
- \b reconnectTime - время повторной попытки соединения с БД - \b reconnectTime - время повторной попытки соединения с БД
\section sec_DBS_Buffer Защита от потери данных
Для того, чтобы не момент осутствия связи с БД данные по возможности не потерялись,
сделан "кольцевой" буфер. Размер которго можно решулировать параметром "--dbserver-buffer-size"
или параметром \b bufferSize=".." в конфигурационном файле секции "<LocalDBSErver...>".
Механизм построен на том, что если связь с mysql сервером отсутствует или пропала,
то сообщения помещаются в колевой буфер, который "опустошается" как только она восстановится.
Если связь не восстановилась а буфер достиг максимального заданного размера, то удаляются
более ранние сообщения. Эту логику можно сменить, если указать параметр "--dbserver-buffer-last-remove" или \b bufferLastRemove="1", то терятся будут сообщения добавляемые в конец.
\section sec_DBS_Tables Таблицы MySQL \section sec_DBS_Tables Таблицы MySQL
К основным таблицам относятся следующие: К основным таблицам относятся следующие:
...@@ -181,6 +192,14 @@ class DBServer_MySQL: ...@@ -181,6 +192,14 @@ class DBServer_MySQL:
bool activate; bool activate;
typedef std::queue<std::string> QueryBuffer;
QueryBuffer qbuf;
unsigned int qbufSize; // размер буфера сообщений.
bool lastRemove;
void flushBuffer();
private: private:
DBTableMap tblMap; DBTableMap tblMap;
......
...@@ -2,4 +2,6 @@ ...@@ -2,4 +2,6 @@
ulimit -Sc 1000000 ulimit -Sc 1000000
uniset-start.sh -f ./uniset-mysql-dbserver --confile test.xml --name DBServer1 --unideb-add-levels info,crit,warn,level9,system uniset-start.sh -f ./uniset-mysql-dbserver --confile test.xml --name DBServer1 \
--unideb-add-levels info,crit,warn,level9,system \
--dbserver-buffer-size 100
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment