Commit 5512dac2 authored by Pavel Vainerman's avatar Pavel Vainerman

(DBServer_MySQL): Сделал буффер для запросов, чтобы по возможности не

терять сообщения (eterbug #8520)
parent ed49677f
......@@ -4,7 +4,7 @@ ulimit -Sc 1000000
#for i in `seq 1 20`;
#do
uniset-start.sh -f ./uniset-simitator --confile test.xml --sid 10@localhost,9@3002
uniset-start.sh -f ./uniset-simitator --confile test.xml --sid 34
#done
#wait
......
......@@ -3,7 +3,7 @@
Name: libuniset
Version: 1.4
Release: alt2
Release: alt3
Summary: UniSet - library for building distributed industrial control systems
License: GPL
Group: Development/C++
......@@ -213,6 +213,9 @@ rm -f %buildroot%_libdir/*.la
%changelog
* Sun Jun 10 2012 Pavel Vainerman <pv@altlinux.ru> 1.4-alt3
- (DBServer_MySQL): buffer is added to query
* Fri Jun 08 2012 Pavel Vainerman <pv@altlinux.ru> 1.4-alt2
- added support type 'double' for uniset-codegen (<variables>)
......
......@@ -85,7 +85,7 @@
textname - текстовое имя датчика
-->
<nodes port="2809" unet_broadcast_ip="192.168.1.255" unet_broadcast_ip2="192.168.122.255">
<item id="3000" infserver="InfoServer" ip="127.0.0.1" name="LocalhostNode" textname="Локальный узел" unet_ignore="0" unet_port="2048">
<item id="3000" dbserver="DBServer1" infserver="InfoServer" ip="127.0.0.1" name="LocalhostNode" textname="Локальный узел" unet_ignore="0" unet_port="2048">
<iocards>
<item card="1" name="DI32"/>
<item card="2" name="DO32"/>
......
......@@ -22,7 +22,6 @@
* \author Pavel Vainerman
*/
// --------------------------------------------------------------------------
#include <sys/time.h>
#include <sstream>
#include <iomanip>
......@@ -32,11 +31,10 @@
#include "Configuration.h"
#include "Debug.h"
#include "UniXML.h"
// --------------------------------------------------------------------------
using namespace UniSetTypes;
using namespace std;
// ------------------------------------------------------------------------------------------
// --------------------------------------------------------------------------
DBServer_MySQL::DBServer_MySQL(ObjectId id):
DBServer(id),
......@@ -44,13 +42,13 @@ DBServer_MySQL::DBServer_MySQL(ObjectId id):
PingTime(300000),
ReconnectTime(180000),
connect_ok(false),
activate(true)
activate(true),
lastRemove(false)
{
if( getId() == DefaultObjectId )
{
ostringstream msg;
msg << "(DBServer_MySQL): Запуск невозможен! НЕ ОПРЕДЕЛЁН ObjectId !!!!!\n";
// unideb[Debug::CRIT] << msg.str() << endl;
msg << "(DBServer_MySQL): init failed! Unknown ID!" << endl;
throw Exception(msg.str());
}
}
......@@ -61,14 +59,14 @@ DBServer_MySQL::DBServer_MySQL():
PingTime(300000),
ReconnectTime(180000),
connect_ok(false),
activate(true)
activate(true),
lastRemove(false)
{
// init();
if( getId() == DefaultObjectId )
{
ostringstream msg;
msg << "(DBServer_MySQL): Запуск невозможен! Для данного узла НЕ ОПРЕДЕЛЁН ObjectId !!!!!\n";
// unideb[Debug::CRIT] << msg.str() << endl;
msg << "(DBServer_MySQL): init failed! Unknown ID!" << endl;
throw Exception(msg.str());
}
}
......@@ -98,28 +96,30 @@ void DBServer_MySQL::processingMessage( UniSetTypes::VoidMessage *msg )
DBServer::processingMessage(msg);
break;
}
}
//--------------------------------------------------------------------------------------------
void DBServer_MySQL::sysCommand( UniSetTypes::SystemMessage *sm )
{
switch( sm->command )
{
case SystemMessage::StartUp:
break;
case SystemMessage::Finish:
{
activate = false;
db->freeResult();
db->close();
break;
}
break;
case SystemMessage::FoldUp:
{
activate = false;
db->freeResult();
db->close();
break;
}
break;
default:
break;
......@@ -245,14 +245,30 @@ bool DBServer_MySQL::writeToBase( const string& query )
// cout << "DBServer_MySQL: " << query << endl;
if( !db || !connect_ok )
{
if( unideb.debugging(Debug::CRIT) )
unideb[Debug::CRIT] << myname << "(writeToBase): соединение с БД не установлено\n"
<< myname << "(writeToBase): lost query: "
<< query << endl;
qbuf.push(query);
if( qbuf.size() > qbufSize )
{
std::string qlost;
if( lastRemove )
qlost = qbuf.back();
else
qlost = qbuf.front();
qbuf.pop();
if( unideb.debugging(Debug::CRIT) )
unideb[Debug::CRIT] << myname << "(writeToBase): DB not connected! buffer(" << qbufSize
<< ") overflow! lost query: " << qlost << endl;
}
return false;
}
db->query( query );
// На всякий скидываем очередь
flushBuffer();
// А теперь собственно запрос..
db->query(query);
// Дело в том что на INSERT И UPDATE запросы
// db->query() может возвращать false и надо самому
// отдельно проверять действительно ли произошла ошибка
......@@ -267,6 +283,30 @@ bool DBServer_MySQL::writeToBase( const string& query )
return false;
}
//--------------------------------------------------------------------------------------------
void DBServer_MySQL::flushBuffer()
{
// Сперва пробуем очистить всё что накопилось в очереди до этого...
while( !qbuf.empty() )
{
db->query( qbuf.front() );
// Дело в том что на INSERT И UPDATE запросы
// db->query() может возвращать false и надо самому
// отдельно проверять действительно ли произошла ошибка
// см. DBInterface::query.
string err(db->error());
if( err.empty() )
db->freeResult();
else if( unideb.debugging(Debug::CRIT) )
{
unideb[Debug::CRIT] << myname << "(writeToBase): error: " << err <<
" lost query: " << qbuf.front() << endl;
}
qbuf.pop();
}
}
//--------------------------------------------------------------------------------------------
void DBServer_MySQL::parse( UniSetTypes::SensorMessage *si )
{
try
......@@ -309,7 +349,8 @@ void DBServer_MySQL::parse( UniSetTypes::SensorMessage *si )
break;
default:
unideb[Debug::WARN] << myname << "(log sensor): неизвестный тип датчика....(сообщение игнорировано)" << endl;
unideb[Debug::WARN] << myname << "(log sensor): Unknown iotype='"
<< si->sensor_type << "'.. ignore SensorMessage..." << endl;
return;
}
......@@ -330,7 +371,7 @@ void DBServer_MySQL::parse( UniSetTypes::SensorMessage *si )
catch( ... )
{
if( unideb.debugging(Debug::CRIT) )
unideb[Debug::CRIT] << myname << "(parse SensorMessage): неизвестное исключние..." << endl;
unideb[Debug::CRIT] << myname << "(parse SensorMessage): catch..." << endl;
}
}
......@@ -351,17 +392,19 @@ void DBServer_MySQL::init_dbserver()
if( conf->getDBServer() == UniSetTypes::DefaultObjectId )
{
ostringstream msg;
msg << myname << "(init): на данном узле DBServer - отключён."
<< " В " << conf->getConfFileName()
<< " для данного узла указан параметр dbserver=''";
msg << myname << "(init): DBServer OFF for this node.."
<< " In " << conf->getConfFileName()
<< " for this node dbserver=''";
throw NameNotFound(msg.str());
}
xmlNode* node = conf->getNode("LocalDBServer");
if( !node )
throw NameNotFound(string(myname+"(init): в конфигурационном файле не найден раздел LocalDBServer"));
throw NameNotFound(string(myname+"(init): section <LocalDBServer> not found.."));
UniXML::iterator it(node);
unideb[DBLogInfoLevel] << myname << "(init): инициализируем соединение" << endl;
unideb[DBLogInfoLevel] << myname << "(init): init connection.." << endl;
string dbname(conf->getProp(node,"dbname"));
string dbnode(conf->getProp(node,"dbnode"));
string user(conf->getProp(node,"dbuser"));
......@@ -373,6 +416,14 @@ void DBServer_MySQL::init_dbserver()
PingTime = conf->getIntProp(node,"pingTime");
ReconnectTime = conf->getIntProp(node,"reconnectTime");
qbufSize = conf->getArgPInt("--dbserver-buffer-size",it.getProp("bufferSize"),200);
if( findArgParam("--dbserver-buffer-last-remove",conf->getArgc(),conf->getArgv()) != -1 )
lastRemove = true;
else if( it.getIntProp("bufferLastRemove" ) !=0 )
lastRemove = true;
else
lastRemove = false;
if( dbnode.empty() )
dbnode = "localhost";
......@@ -388,7 +439,7 @@ void DBServer_MySQL::init_dbserver()
// ostringstream err;
if( unideb.debugging(Debug::CRIT) )
unideb[Debug::CRIT] << myname
<< "(init): не смог создать соединение с БД err:\n"
<< "(init): DB connection error: "
<< db->error() << endl;
// throw Exception( string(myname+"(init): не смогли создать соединение с БД "+db->error()) );
askTimer(DBServer_MySQL::ReconnectTimer,ReconnectTime);
......@@ -396,13 +447,14 @@ void DBServer_MySQL::init_dbserver()
else
{
if( unideb.debugging(DBLogInfoLevel) )
unideb[DBLogInfoLevel] << myname << "(init): connect ok\n";
unideb[DBLogInfoLevel] << myname << "(init): connect [OK]" << endl;
connect_ok = true;
askTimer(DBServer_MySQL::ReconnectTimer,0);
askTimer(DBServer_MySQL::PingTimer,PingTime);
// createTables(db);
initDB(db);
initDBTableMap(tblMap);
flushBuffer();
}
}
//--------------------------------------------------------------------------------------------
......@@ -412,7 +464,7 @@ void DBServer_MySQL::createTables( DBInterface *db )
if(!it)
{
if( unideb.debugging(Debug::CRIT) )
unideb[Debug::CRIT] << myname << ": не найден раздел Tables...."<< endl;
unideb[Debug::CRIT] << myname << ": section <Tables> not found.."<< endl;
throw Exception();
}
......@@ -421,7 +473,7 @@ void DBServer_MySQL::createTables( DBInterface *db )
if( it.getName() != "comment" )
{
if( unideb.debugging(DBLogInfoLevel) )
unideb[DBLogInfoLevel] << myname << "(createTables): создаем " << it.getName() << endl;
unideb[DBLogInfoLevel] << myname << "(createTables): create " << it.getName() << endl;
ostringstream query;
query << "CREATE TABLE " << conf->getProp(it,"name") << "(" << conf->getProp(it,"create") << ")";
if( !db->query(query.str()) && unideb.debugging(Debug::CRIT) )
......@@ -439,7 +491,7 @@ void DBServer_MySQL::timerInfo( UniSetTypes::TimerMessage* tm )
if( !db->ping() )
{
if( unideb.debugging(Debug::WARN) )
unideb[Debug::WARN] << myname << "(timerInfo): потеряно соединение с сервером БД" << endl;
unideb[Debug::WARN] << myname << "(timerInfo): DB lost connection.." << endl;
connect_ok = false;
askTimer(DBServer_MySQL::PingTimer,0);
askTimer(DBServer_MySQL::ReconnectTimer,ReconnectTime);
......@@ -448,7 +500,7 @@ void DBServer_MySQL::timerInfo( UniSetTypes::TimerMessage* tm )
{
connect_ok = true;
if( unideb.debugging(DBLogInfoLevel) )
unideb[DBLogInfoLevel] << myname << "(timerInfo): DB ping ok\n";
unideb[DBLogInfoLevel] << myname << "(timerInfo): DB ping ok" << endl;
}
}
break;
......@@ -467,7 +519,7 @@ void DBServer_MySQL::timerInfo( UniSetTypes::TimerMessage* tm )
}
connect_ok = false;
if( unideb.debugging(Debug::WARN) )
unideb[Debug::WARN] << myname << "(timerInfo): нет связи с БД" << endl;
unideb[Debug::WARN] << myname << "(timerInfo): DB no connection.." << endl;
}
else
init_dbserver();
......@@ -476,7 +528,7 @@ void DBServer_MySQL::timerInfo( UniSetTypes::TimerMessage* tm )
default:
if( unideb.debugging(Debug::WARN) )
unideb[Debug::WARN] << myname << "(timerInfo): неизвестный таймер tid=" << tm->id << endl;
unideb[Debug::WARN] << myname << "(timerInfo): Unknown TimerID=" << tm->id << endl;
break;
}
}
......
......@@ -25,6 +25,7 @@
#define DBServer_MySQL_H_
// --------------------------------------------------------------------------
#include <map>
#include <queue>
#include "UniSetTypes.h"
#include "DBInterface.h"
#include "DBServer.h"
......@@ -35,6 +36,7 @@
- \ref sec_DBS_Comm
- \ref sec_DBS_Conf
- \ref sec_DBS_Tables
- \ref sec_DBS_Buffer
\section sec_DBS_Comm Общее описание работы DBServer_MySQL
......@@ -68,7 +70,16 @@
- \b pingTime - период проверки связи с сервером MySQL
- \b reconnectTime - время повторной попытки соединения с БД
\section sec_DBS_Buffer Защита от потери данных
Для того, чтобы не момент осутствия связи с БД данные по возможности не потерялись,
сделан "кольцевой" буфер. Размер которго можно решулировать параметром "--dbserver-buffer-size"
или параметром \b bufferSize=".." в конфигурационном файле секции "<LocalDBSErver...>".
Механизм построен на том, что если связь с mysql сервером отсутствует или пропала,
то сообщения помещаются в колевой буфер, который "опустошается" как только она восстановится.
Если связь не восстановилась а буфер достиг максимального заданного размера, то удаляются
более ранние сообщения. Эту логику можно сменить, если указать параметр "--dbserver-buffer-last-remove" или \b bufferLastRemove="1", то терятся будут сообщения добавляемые в конец.
\section sec_DBS_Tables Таблицы MySQL
К основным таблицам относятся следующие:
\code
......@@ -181,6 +192,14 @@ class DBServer_MySQL:
bool activate;
typedef std::queue<std::string> QueryBuffer;
QueryBuffer qbuf;
unsigned int qbufSize; // размер буфера сообщений.
bool lastRemove;
void flushBuffer();
private:
DBTableMap tblMap;
......
......@@ -2,4 +2,7 @@
ulimit -Sc 1000000
uniset-start.sh -f ./uniset-mysql-dbserver --confile test.xml --name DBServer1 --unideb-add-levels info,crit,warn,level9,system
uniset-start.sh -f ./uniset-mysql-dbserver --confile test.xml --name DBServer1 \
--unideb-add-levels info,crit,warn,level9,system \
--dbserver-buffer-size 100
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment