Commit c71233bb authored by Pavel Vainerman's avatar Pavel Vainerman

(DBServer_MySQL): вернул версию "без буфера", т.к. не заработала как надо

parent cb0c2342
...@@ -25,7 +25,6 @@ ...@@ -25,7 +25,6 @@
#include <sys/time.h> #include <sys/time.h>
#include <sstream> #include <sstream>
#include <iomanip> #include <iomanip>
#include <cmath>
#include "ORepHelpers.h" #include "ORepHelpers.h"
#include "DBServer_MySQL.h" #include "DBServer_MySQL.h"
...@@ -45,9 +44,8 @@ DBServer_MySQL::DBServer_MySQL(ObjectId id): ...@@ -45,9 +44,8 @@ DBServer_MySQL::DBServer_MySQL(ObjectId id):
ReconnectTime(180000), ReconnectTime(180000),
connect_ok(false), connect_ok(false),
activate(true), activate(true),
lastRemove(false), qbufSize(200),
flushBufferTime(120000), lastRemove(false)
qbufSize(200)
{ {
if( getId() == DefaultObjectId ) if( getId() == DefaultObjectId )
{ {
...@@ -64,6 +62,7 @@ DBServer_MySQL::DBServer_MySQL(): ...@@ -64,6 +62,7 @@ DBServer_MySQL::DBServer_MySQL():
ReconnectTime(180000), ReconnectTime(180000),
connect_ok(false), connect_ok(false),
activate(true), activate(true),
qbufSize(200),
lastRemove(false) lastRemove(false)
{ {
// init(); // init();
...@@ -200,36 +199,11 @@ void DBServer_MySQL::parse( UniSetTypes::ConfirmMessage* cem ) ...@@ -200,36 +199,11 @@ void DBServer_MySQL::parse( UniSetTypes::ConfirmMessage* cem )
} }
} }
//-------------------------------------------------------------------------------------------- //--------------------------------------------------------------------------------------------
bool DBServer_MySQL::insertToBuffer( int key, const string& values )
{
// Складываем в очередь
DBTableMap::iterator it = tblMap.find(key);
if( it == tblMap.end() )
{
if( unideb.debugging(DBLogInfoLevel) )
unideb[DBLogInfoLevel] << myname << "(insertToBuffer): table key=" << key << " not found.." << endl;
return false;
}
if( unideb.debugging(DBLogInfoLevel) )
unideb[DBLogInfoLevel] << myname << "(insertToBuffer): " << it->second.tblname << " val: " << values << endl;
TableInfo* ti = &it->second;
ti->qbuf.push(values);
ti->qbufByteSize += values.size();
if( ti->qbuf.size() >= ti->qbufSize )
flushTableBuffer(it);
return true;
}
//--------------------------------------------------------------------------------------------
bool DBServer_MySQL::writeToBase( const string& query ) bool DBServer_MySQL::writeToBase( const string& query )
{ {
if( unideb.debugging(DBLogInfoLevel) ) if( unideb.debugging(DBLogInfoLevel) )
unideb[DBLogInfoLevel] << myname << "(writeToBase): " << query << endl; unideb[DBLogInfoLevel] << myname << "(writeToBase): " << query << endl;
// cout << "DBServer_MySQL: " << query << endl;
if( !db || !connect_ok ) if( !db || !connect_ok )
{ {
uniset_mutex_lock l(mqbuf,200); uniset_mutex_lock l(mqbuf,200);
...@@ -243,16 +217,16 @@ bool DBServer_MySQL::writeToBase( const string& query ) ...@@ -243,16 +217,16 @@ bool DBServer_MySQL::writeToBase( const string& query )
qlost = qbuf.front(); qlost = qbuf.front();
qbuf.pop(); qbuf.pop();
if( unideb.debugging(Debug::CRIT) ) if( unideb.debugging(Debug::CRIT) )
unideb[Debug::CRIT] << myname << "(writeToBase): DB not connected! buffer(" << qbufSize unideb[Debug::CRIT] << myname << "(writeToBase): DB not connected! buffer(" << qbufSize
<< ") overflow! lost query: " << qlost << endl; << ") overflow! lost query: " << qlost << endl;
} }
return false; return false;
} }
// На всякий скидываем очередь // На всякий скидываем очередь
flushQBuffer(); flushBuffer();
// А теперь собственно запрос.. // А теперь собственно запрос..
db->query(query); db->query(query);
...@@ -270,128 +244,31 @@ bool DBServer_MySQL::writeToBase( const string& query ) ...@@ -270,128 +244,31 @@ bool DBServer_MySQL::writeToBase( const string& query )
return false; return false;
} }
//-------------------------------------------------------------------------------------------
void DBServer_MySQL::flushQBuffer()
{
uniset_mutex_lock l(mqbuf);
// Сперва пробуем очистить всё что накопилось в очереди до этого...
while( !qbuf.empty() )
{
db->query( qbuf.front() );
// Дело в том что на INSERT И UPDATE запросы
// db->query() может возвращать false и надо самому
// отдельно проверять действительно ли произошла ошибка
// см. MySQLInterface::query.
string err(db->error());
if( !err.empty() && unideb.debugging(Debug::CRIT) )
unideb[Debug::CRIT] << myname << "(writeToBase): error: " << err <<
" lost query: " << qbuf.front() << endl;
qbuf.pop();
}
}
//-------------------------------------------------------------------------------------------- //--------------------------------------------------------------------------------------------
void DBServer_MySQL::flushTableBuffer( DBTableMap::iterator& it ) void DBServer_MySQL::flushBuffer()
{ {
if( !db || !connect_ok ) uniset_mutex_lock l(mqbuf,400);
return;
if( it == tblMap.end() )
return;
TableInfo* ti(&it->second);
if( ti->qbuf.empty() ) // Сперва пробуем очистить всё что накопилось в очереди до этого...
return; while( !qbuf.empty() )
if( unideb.debugging(DBLEVEL) )
unideb[DBLEVEL] << myname << "(flushBuffer): '" << ti->tblname << "' qbufSize=" << ti->qbufSize << " qbufByteSize=" << ti->qbufByteSize << endl;
// buffer чтобы записать всё одним запросом..
// "ti->qbuf.size() - 1" - это количество добавляемых запятых..
unsigned int cbufSize = ti->insHeader.size() + ti->qbuf.size() - 1 + ti->qbufByteSize + 1;
char* cbuf = new char[cbufSize];
unsigned int i= 0;
if( !cbuf )
{
if( unideb.debugging(Debug::CRIT) )
unideb[Debug::CRIT] << myname << "(flushBuffer): Can`t allocate memory for buffer (sz=" << ti->qbufByteSize + 1 << endl;
return;
}
std::memcpy( &(cbuf[i]), ti->insHeader.c_str(), ti->insHeader.size() );
i+=ti->insHeader.size();
int n = 1;
while( !ti->qbuf.empty() )
{ {
std::string q(ti->qbuf.front()); db->query( qbuf.front() );
unsigned int sz = q.size(); // Дело в том что на INSERT И UPDATE запросы
if( i + sz > cbufSize ) // db->query() может возвращать false и надо самому
// отдельно проверять действительно ли произошла ошибка
// см. DBInterface::query.
string err(db->error());
if( err.empty() )
db->freeResult();
else if( unideb.debugging(Debug::CRIT) )
{ {
if( unideb.debugging(Debug::CRIT) ) unideb[Debug::CRIT] << myname << "(writeToBase): error: " << err <<
unideb[Debug::CRIT] << myname << "(flushBuffer): BUFFER OVERFLOW bytesize=" << cbufSize << " i=" << i << " sz=" << sz<< endl; " lost query: " << qbuf.front() << endl;
break;
} }
if( n > 1 ) // если запись не первая.. добавляем ',' в конец предыдущей записи qbuf.pop();
cbuf[i++] = ',';
std::memcpy( &(cbuf[i]), q.c_str(), sz );
i += sz;
ti->qbuf.pop();
n++;
} }
// последняя точка с запятой, вроде не нужна (по документации и примерам из MySQL)..
if( cbuf[i-1] == ';' )
cbuf[i-1] = '\0';
else
cbuf[i] = '\0';
if( i < cbufSize )
{
if( unideb.debugging(Debug::CRIT) )
unideb[Debug::CRIT] << myname << "(writeToBase): BAD qbufByteSize=" << ti->qbufByteSize << " LOST RECORDS..(i=" << i << ")" << endl;
ti->qbufByteSize = 0;
while( !ti->qbuf.empty() )
ti->qbuf.pop();
}
try
{
if( i > 0 )
{
db->query( (const char*)cbuf, false );
// Дело в том что на INSERT И UPDATE запросы
// db->query() может возвращать false и надо самому
// отдельно проверять действительно ли произошла ошибка
// см. DBInterface::queryh.
string err(db->error());
if( !err.empty() && unideb.debugging(Debug::CRIT) )
unideb[Debug::CRIT] << myname << "(flushBuffer): error: " << err << endl;
db->freeResult();
}
}
catch( Exception& ex )
{
if( unideb.debugging(Debug::CRIT) )
unideb[Debug::CRIT] << myname << "(flushBuffer): " << ex << endl;
}
delete[] cbuf;
ti->qbufByteSize = 0;
} }
//-------------------------------------------------------------------------------------------- //--------------------------------------------------------------------------------------------
void DBServer_MySQL::parse( UniSetTypes::SensorMessage *si ) void DBServer_MySQL::parse( UniSetTypes::SensorMessage *si )
...@@ -404,24 +281,28 @@ void DBServer_MySQL::parse( UniSetTypes::SensorMessage *si ) ...@@ -404,24 +281,28 @@ void DBServer_MySQL::parse( UniSetTypes::SensorMessage *si )
struct timezone tz; struct timezone tz;
gettimeofday(&si->tm,&tz); gettimeofday(&si->tm,&tz);
} }
float val = (float)si->value / (float)pow10(si->ci.precision); // см. main_history
// см. DBTABLE AnalogSensors, DigitalSensors
ostringstream data; ostringstream data;
data << "('" data << "INSERT INTO " << tblName(si->type)
<< "(date, time, time_usec, sensor_id, value, node) VALUES( '"
// Поля таблицы // Поля таблицы
<< ui.dateToString(si->sm_tv_sec,"-") << "','" // date << ui.dateToString(si->sm_tv_sec,"-") << "','" // date
<< ui.timeToString(si->sm_tv_sec,":") << "','" // time << ui.timeToString(si->sm_tv_sec,":") << "','" // time
<< si->sm_tv_usec << "','" // time_usec << si->sm_tv_usec << "'," // time_usec
<< si->id << "','" // sensor_id << si->id << "," // sensor_id
<< val << "','" // value << si->value << "," // value
<< si->node << "')"; // node << si->node << ")"; // node
if( unideb.debugging(DBLEVEL) ) if( unideb.debugging(DBLEVEL) )
unideb[DBLEVEL] << myname << "(insert_main_history): " << data.str() << endl; unideb[DBLEVEL] << myname << "(insert_main_history): " << data.str() << endl;
insertToBuffer(si->type, data.str()); if( !writeToBase(data.str()) )
{
if( unideb.debugging(Debug::CRIT) )
unideb[Debug::CRIT] << myname << "(insert) sensor msg error: "<< db->error() << endl;
db->freeResult();
}
} }
catch( Exception& ex ) catch( Exception& ex )
{ {
...@@ -467,22 +348,12 @@ void DBServer_MySQL::init_dbserver() ...@@ -467,22 +348,12 @@ void DBServer_MySQL::init_dbserver()
string user(conf->getProp(node,"dbuser")); string user(conf->getProp(node,"dbuser"));
string password(conf->getProp(node,"dbpass")); string password(conf->getProp(node,"dbpass"));
tblMap[UniSetTypes::Message::SensorInfo] = "main_history";
tblMap[UniSetTypes::Message::Confirm] = "main_history";
PingTime = conf->getIntProp(node,"pingTime"); PingTime = conf->getIntProp(node,"pingTime");
ReconnectTime = conf->getIntProp(node,"reconnectTime"); ReconnectTime = conf->getIntProp(node,"reconnectTime");
flushBufferTime = conf->getArgPInt("--dbserver-flush-buffer-time",it.getProp("flushBufferTime"),120000); qbufSize = conf->getArgPInt("--dbserver-buffer-size",it.getProp("bufferSize"),200);
qbufSize = conf->getArgPInt("--dbserver-lost-buffer-size",it.getProp("lostBufferSize"),200);
int insbufSize = conf->getArgPInt("--dbserver-insert-buffer-size",it.getProp("insertBufferSize"),100000);
TableInfo ti;
ti.tblname = "main_history";
ti.qbufSize = insbufSize;
ti.qbufByteSize = 0;
ti.insHeader = "INSERT INTO main_history(date, time, time_usec, sensor_id, value, node) VALUES";
tblMap[UniSetTypes::Message::SensorInfo] = ti;
tblMap[UniSetTypes::Message::Confirm] = ti;
if( findArgParam("--dbserver-buffer-last-remove",conf->getArgc(),conf->getArgv()) != -1 ) if( findArgParam("--dbserver-buffer-last-remove",conf->getArgc(),conf->getArgv()) != -1 )
lastRemove = true; lastRemove = true;
...@@ -509,7 +380,6 @@ void DBServer_MySQL::init_dbserver() ...@@ -509,7 +380,6 @@ void DBServer_MySQL::init_dbserver()
<< db->error() << endl; << db->error() << endl;
// throw Exception( string(myname+"(init): не смогли создать соединение с БД "+db->error()) ); // throw Exception( string(myname+"(init): не смогли создать соединение с БД "+db->error()) );
askTimer(DBServer_MySQL::ReconnectTimer,ReconnectTime); askTimer(DBServer_MySQL::ReconnectTimer,ReconnectTime);
askTimer(DBServer_MySQL::FlushBufferTimer,0);
} }
else else
{ {
...@@ -518,12 +388,10 @@ void DBServer_MySQL::init_dbserver() ...@@ -518,12 +388,10 @@ void DBServer_MySQL::init_dbserver()
connect_ok = true; connect_ok = true;
askTimer(DBServer_MySQL::ReconnectTimer,0); askTimer(DBServer_MySQL::ReconnectTimer,0);
askTimer(DBServer_MySQL::PingTimer,PingTime); askTimer(DBServer_MySQL::PingTimer,PingTime);
askTimer(DBServer_MySQL::FlushBufferTimer,flushBufferTime);
// createTables(db); // createTables(db);
initDB(db); initDB(db);
initDBTableMap(tblMap); initDBTableMap(tblMap);
flushQBuffer(); flushBuffer();
} }
} }
//-------------------------------------------------------------------------------------------- //--------------------------------------------------------------------------------------------
...@@ -564,7 +432,6 @@ void DBServer_MySQL::timerInfo( UniSetTypes::TimerMessage* tm ) ...@@ -564,7 +432,6 @@ void DBServer_MySQL::timerInfo( UniSetTypes::TimerMessage* tm )
connect_ok = false; connect_ok = false;
askTimer(DBServer_MySQL::PingTimer,0); askTimer(DBServer_MySQL::PingTimer,0);
askTimer(DBServer_MySQL::ReconnectTimer,ReconnectTime); askTimer(DBServer_MySQL::ReconnectTimer,ReconnectTime);
askTimer(DBServer_MySQL::FlushBufferTimer,0);
} }
else else
{ {
...@@ -586,7 +453,6 @@ void DBServer_MySQL::timerInfo( UniSetTypes::TimerMessage* tm ) ...@@ -586,7 +453,6 @@ void DBServer_MySQL::timerInfo( UniSetTypes::TimerMessage* tm )
connect_ok = true; connect_ok = true;
askTimer(DBServer_MySQL::ReconnectTimer,0); askTimer(DBServer_MySQL::ReconnectTimer,0);
askTimer(DBServer_MySQL::PingTimer,PingTime); askTimer(DBServer_MySQL::PingTimer,PingTime);
askTimer(DBServer_MySQL::FlushBufferTimer,flushBufferTime);
} }
connect_ok = false; connect_ok = false;
if( unideb.debugging(Debug::WARN) ) if( unideb.debugging(Debug::WARN) )
...@@ -597,13 +463,6 @@ void DBServer_MySQL::timerInfo( UniSetTypes::TimerMessage* tm ) ...@@ -597,13 +463,6 @@ void DBServer_MySQL::timerInfo( UniSetTypes::TimerMessage* tm )
} }
break; break;
case FlushBufferTimer:
{
for( DBTableMap::iterator it=tblMap.begin(); it!=tblMap.end(); ++it )
flushTableBuffer(it);
}
break;
default: default:
if( unideb.debugging(Debug::WARN) ) if( unideb.debugging(Debug::WARN) )
unideb[Debug::WARN] << myname << "(timerInfo): Unknown TimerID=" << tm->id << endl; unideb[Debug::WARN] << myname << "(timerInfo): Unknown TimerID=" << tm->id << endl;
...@@ -611,22 +470,3 @@ void DBServer_MySQL::timerInfo( UniSetTypes::TimerMessage* tm ) ...@@ -611,22 +470,3 @@ void DBServer_MySQL::timerInfo( UniSetTypes::TimerMessage* tm )
} }
} }
//-------------------------------------------------------------------------------------------- //--------------------------------------------------------------------------------------------
void DBServer_MySQL::sigterm( int signo )
{
for( DBTableMap::iterator it=tblMap.begin(); it!=tblMap.end(); ++it )
{
try {
flushTableBuffer(it);
}
catch(...){}
}
try
{
flushQBuffer();
}
catch(...){}
DBServer::sigterm(signo);
}
//--------------------------------------------------------------------------------------------
...@@ -37,7 +37,7 @@ ...@@ -37,7 +37,7 @@
- \ref sec_DBS_Conf - \ref sec_DBS_Conf
- \ref sec_DBS_Tables - \ref sec_DBS_Tables
- \ref sec_DBS_Buffer - \ref sec_DBS_Buffer
- \ref sec_DBS_InsertBuffer
\section sec_DBS_Comm Общее описание работы DBServer_MySQL \section sec_DBS_Comm Общее описание работы DBServer_MySQL
Сервис предназначен для работы с БД MySQL. В его задачи входит Сервис предназначен для работы с БД MySQL. В его задачи входит
...@@ -73,8 +73,8 @@ ...@@ -73,8 +73,8 @@
\section sec_DBS_Buffer Защита от потери данных \section sec_DBS_Buffer Защита от потери данных
Для того, чтобы на момент отсутствия связи с БД данные по возможности не потерялись, Для того, чтобы на момент отсутствия связи с БД данные по возможности не потерялись,
сделан "кольцевой" буфер. Размер которго можно регулировать параметром "--dbserver-lost-buffer-size" сделан "кольцевой" буфер. Размер которго можно регулировать параметром "--dbserver-buffer-size"
или параметром \b lostBufferSize=".." в конфигурационном файле секции "<LocalDBServer...>". или параметром \b bufferSize=".." в конфигурационном файле секции "<LocalDBSErver...>".
Механизм построен на том, что если связь с mysql сервером отсутствует или пропала, Механизм построен на том, что если связь с mysql сервером отсутствует или пропала,
то сообщения помещаются в колевой буфер, который "опустошается" как только она восстановится. то сообщения помещаются в колевой буфер, который "опустошается" как только она восстановится.
...@@ -82,20 +82,6 @@ ...@@ -82,20 +82,6 @@
более ранние сообщения. Эту логику можно сменить, если указать параметр "--dbserver-buffer-last-remove" более ранние сообщения. Эту логику можно сменить, если указать параметр "--dbserver-buffer-last-remove"
или \b bufferLastRemove="1", то терятся будут сообщения добавляемые в конец. или \b bufferLastRemove="1", то терятся будут сообщения добавляемые в конец.
\section sec_DBS_InsertBuffer Добавление данных (буферизованный INSERT)
Для оптимизации работы с БД, для каждой таблицы сделан отдельный буфер INSERT-запросов (см. TableInfo).
Размер буфера (в количестве сообщений) задаётся параметром "--dbserver-insert-buffer-size"
или параметром \b insertBufferSize=".." в конфигурационном файле секции "<LocalDBServer...>".
Данные буферы (для каждой таблицы!) скидываются в БД в двух случаях:
-# если буфер заполнился (т.е. по заполнению)
-# если истекло заданное время (т.е. принудительно). Время скидывания insert-буферов задаётся параметром "--dbserver-flush-buffer-time" или параметром \b flushBufferTime=".." в конфигурационном файле секции "<LocalDBServer...>".
\note Следует иметь ввиду, что размер буфера (и период принудительного скидывания) потенциально влияют на то, сколько данных
будет ПОТЕРЯНО если произодёт отключение питания или вылет процесса. С другой стороны, постоянная запись в БД без промежуточного клэширования (буферизации) приводит к большой нагрузке на БД (диск и т.п.).
\note Для примерного расчёта размера буфера, можно считать, что размер одной записи в буфере ~ 50 байт.
\note Следует иметь ввиду, что итоговый "INSERT" (когда буфер будет скидываться в БД) не должен по размеру превышать \b max_allowed_packet, которая задаётся в настройках MySQL (см. документацию).
\section sec_DBS_Tables Таблицы MySQL \section sec_DBS_Tables Таблицы MySQL
К основным таблицам относятся следующие: К основным таблицам относятся следующие:
\code \code
...@@ -156,19 +142,7 @@ class DBServer_MySQL: ...@@ -156,19 +142,7 @@ class DBServer_MySQL:
static const Debug::type DBLogInfoLevel = Debug::LEVEL9; static const Debug::type DBLogInfoLevel = Debug::LEVEL9;
protected: protected:
typedef std::map<int, std::string> DBTableMap;
typedef std::queue<std::string> QueryBuffer;
struct TableInfo
{
QueryBuffer qbuf; // буфер для данных
unsigned int qbufSize; // размер буфера сообщений.
unsigned int qbufByteSize; // размер буфера в байтах
std::string tblname; // название таблицы
std::string insHeader; // заголовок запроса (например: INSERT INTO tblname(date, time, time_usec, sensor_id, value, node) VALUES( '"
};
typedef std::map<int, TableInfo> DBTableMap;
virtual void initDB(DBInterface *db){}; virtual void initDB(DBInterface *db){};
virtual void initDBTableMap(DBTableMap& tblMap){}; virtual void initDBTableMap(DBTableMap& tblMap){};
...@@ -182,28 +156,23 @@ class DBServer_MySQL: ...@@ -182,28 +156,23 @@ class DBServer_MySQL:
virtual void parse( UniSetTypes::DBMessage* dbmsg ); virtual void parse( UniSetTypes::DBMessage* dbmsg );
virtual void parse( UniSetTypes::ConfirmMessage* cmsg ); virtual void parse( UniSetTypes::ConfirmMessage* cmsg );
/*! запись в БД */
bool writeToBase( const string& query ); bool writeToBase( const string& query );
/*! поместить в буфер для INSERT запросов к таблице 'key' */
bool insertToBuffer( int key, const string& values );
virtual void init_dbserver(); virtual void init_dbserver();
void createTables( DBInterface* db ); void createTables( DBInterface* db );
inline const char* tblName( int key ) inline const char* tblName(int key)
{ {
return tblMap[key].tblname.c_str(); return tblMap[key].c_str();
} }
enum Timers enum Timers
{ {
PingTimer, /*!< таймер на переодическую проверку соединения с сервером БД */ PingTimer, /*!< таймер на переодическую проверку соединения с сервером БД */
ReconnectTimer, /*!< таймер на повторную попытку соединения с сервером БД (или восстановления связи) */ ReconnectTimer, /*!< таймер на повторную попытку соединения с сервером БД (или восстановления связи) */
FlushBufferTimer, /*!< таймер на сброс буфера */
lastNumberOfTimer lastNumberOfTimer
}; };
DBInterface *db; DBInterface *db;
int PingTime; int PingTime;
int ReconnectTime; int ReconnectTime;
...@@ -211,23 +180,18 @@ class DBServer_MySQL: ...@@ -211,23 +180,18 @@ class DBServer_MySQL:
bool activate; bool activate;
bool lastRemove; typedef std::queue<std::string> QueryBuffer;
int flushBufferTime;
/*! сброс insert-буфера для конкретной таблицы */
void flushTableBuffer( DBTableMap::iterator& i );
void flushQBuffer(); QueryBuffer qbuf;
unsigned int qbufSize; // размер буфера сообщений.
bool lastRemove;
virtual void sigterm( int signo ); void flushBuffer();
UniSetTypes::uniset_mutex mqbuf;
private: private:
DBTableMap tblMap; DBTableMap tblMap;
UniSetTypes::uniset_mutex mqbuf;
QueryBuffer qbuf; // юуфер для запросов, пока нет связи с БД
unsigned int qbufSize;
}; };
//------------------------------------------------------------------------------------------ //------------------------------------------------------------------------------------------
#endif #endif
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment