Commit 5e90512f authored by Pavel Vainerman's avatar Pavel Vainerman

Merge branch 'mysql-buffer'

parents 60282166 8db0746f
...@@ -47,13 +47,14 @@ DBInterface::~DBInterface() ...@@ -47,13 +47,14 @@ DBInterface::~DBInterface()
// ----------------------------------------------------------------------------------------- // -----------------------------------------------------------------------------------------
bool DBInterface::connect( const string& host, const string& user, const string& pswd, const string& dbname) bool DBInterface::connect( const string& host, const string& user, const string& pswd, const string& dbname)
{ {
if (!mysql_real_connect(mysql,host.c_str(), user.c_str(),pswd.c_str(),dbname.c_str(),0,NULL,0)) if( !mysql_real_connect(mysql,host.c_str(), user.c_str(),pswd.c_str(),dbname.c_str(),0,NULL,0) ) // CLIENT_MULTI_STATEMENTS) )
{ {
cout << error() << endl; cout << error() << endl;
mysql_close(mysql); mysql_close(mysql);
connected = false; connected = false;
return false; return false;
} }
connected = true; connected = true;
return true; return true;
} }
...@@ -78,28 +79,45 @@ bool DBInterface::insert( const string& q ) ...@@ -78,28 +79,45 @@ bool DBInterface::insert( const string& q )
queryok=true; queryok=true;
return true; return true;
} }
// ----------------------------------------------------------------------------------------- // -----------------------------------------------------------------------------------------
bool DBInterface::query( const string& q ) bool DBInterface::query( const string& q )
{ {
return query( q.c_str(), true );
}
// -----------------------------------------------------------------------------------------
bool DBInterface::query( const char* q, bool noLastQ )
{
if( !mysql ) if( !mysql )
return false; return false;
if( mysql_query(mysql,q.c_str()) ) if( mysql_query(mysql,q) )
{ {
queryok=false; queryok=false;
return false; return false;
} }
lastQ = ( noLastQ ) ? q : "";
lastQ = q;
result = mysql_store_result(mysql); // _use_result - некорректно работает с _num_rows result = mysql_store_result(mysql); // _use_result - некорректно работает с _num_rows
#if 0
// Если при соединении используется CLIENT_MULTI_STATEMENTS
// то необходимо вынуть все результаты..
while( mysql_more_results(mysql) )
{
cerr << "**** store result..." << endl;
if( mysql_next_result(mysql) >=0 )
mysql_store_result(mysql);
}
#endif
if( numRows()==0 ) if( numRows()==0 )
{ {
queryok=false; queryok=false;
return false; return false;
} }
queryok=true; queryok=true;
return true; return true;
} }
// ----------------------------------------------------------------------------------------- // -----------------------------------------------------------------------------------------
......
...@@ -47,6 +47,8 @@ class DBInterface ...@@ -47,6 +47,8 @@ class DBInterface
bool close(); bool close();
bool query(const std::string& q); bool query(const std::string& q);
bool query( const char* q, bool noLastQ=false ); // noLastQ - не запоминать запрос
const std::string lastQuery(); const std::string lastQuery();
bool insert(const std::string& q); bool insert(const std::string& q);
......
...@@ -37,7 +37,7 @@ ...@@ -37,7 +37,7 @@
- \ref sec_DBS_Conf - \ref sec_DBS_Conf
- \ref sec_DBS_Tables - \ref sec_DBS_Tables
- \ref sec_DBS_Buffer - \ref sec_DBS_Buffer
- \ref sec_DBS_InsertBuffer
\section sec_DBS_Comm Общее описание работы DBServer_MySQL \section sec_DBS_Comm Общее описание работы DBServer_MySQL
Сервис предназначен для работы с БД MySQL. В его задачи входит Сервис предназначен для работы с БД MySQL. В его задачи входит
...@@ -73,8 +73,8 @@ ...@@ -73,8 +73,8 @@
\section sec_DBS_Buffer Защита от потери данных \section sec_DBS_Buffer Защита от потери данных
Для того, чтобы на момент отсутствия связи с БД данные по возможности не потерялись, Для того, чтобы на момент отсутствия связи с БД данные по возможности не потерялись,
сделан "кольцевой" буфер. Размер которго можно регулировать параметром "--dbserver-buffer-size" сделан "кольцевой" буфер. Размер которго можно регулировать параметром "--dbserver-lost-buffer-size"
или параметром \b bufferSize=".." в конфигурационном файле секции "<LocalDBSErver...>". или параметром \b lostBufferSize=".." в конфигурационном файле секции "<LocalDBServer...>".
Механизм построен на том, что если связь с mysql сервером отсутствует или пропала, Механизм построен на том, что если связь с mysql сервером отсутствует или пропала,
то сообщения помещаются в колевой буфер, который "опустошается" как только она восстановится. то сообщения помещаются в колевой буфер, который "опустошается" как только она восстановится.
...@@ -82,6 +82,20 @@ ...@@ -82,6 +82,20 @@
более ранние сообщения. Эту логику можно сменить, если указать параметр "--dbserver-buffer-last-remove" более ранние сообщения. Эту логику можно сменить, если указать параметр "--dbserver-buffer-last-remove"
или \b bufferLastRemove="1", то терятся будут сообщения добавляемые в конец. или \b bufferLastRemove="1", то терятся будут сообщения добавляемые в конец.
\section sec_DBS_InsertBuffer Добавление данных (буферизованный INSERT)
Для оптимизации работы с БД, для каждой таблицы сделан отдельный буфер INSERT-запросов (см. TableInfo).
Размер буфера (в количестве сообщений) задаётся параметром "--dbserver-insert-buffer-size"
или параметром \b insertBufferSize=".." в конфигурационном файле секции "<LocalDBServer...>".
Данные буферы (для каждой таблицы!) скидываются в БД в двух случаях:
-# если буфер заполнился (т.е. по заполнению)
-# если истекло заданное время (т.е. принудительно). Время скидывания insert-буферов задаётся параметром "--dbserver-flush-buffer-time" или параметром \b flushBufferTime=".." в конфигурационном файле секции "<LocalDBServer...>".
\note Следует иметь ввиду, что размер буфера (и период принудительного скидывания) потенциально влияют на то, сколько данных
будет ПОТЕРЯНО если произодёт отключение питания или вылет процесса. С другой стороны, постоянная запись в БД без промежуточного клэширования (буферизации) приводит к большой нагрузке на БД (диск и т.п.).
\note Для примерного расчёта размера буфера, можно считать, что размер одной записи в буфере ~ 50 байт.
\note Следует иметь ввиду, что итоговый "INSERT" (когда буфер будет скидываться в БД) не должен по размеру превышать \b max_allowed_packet, которая задаётся в настройках MySQL (см. документацию).
\section sec_DBS_Tables Таблицы MySQL \section sec_DBS_Tables Таблицы MySQL
К основным таблицам относятся следующие: К основным таблицам относятся следующие:
\code \code
...@@ -142,7 +156,19 @@ class DBServer_MySQL: ...@@ -142,7 +156,19 @@ class DBServer_MySQL:
static const Debug::type DBLogInfoLevel = Debug::LEVEL9; static const Debug::type DBLogInfoLevel = Debug::LEVEL9;
protected: protected:
typedef std::map<int, std::string> DBTableMap;
typedef std::queue<std::string> QueryBuffer;
struct TableInfo
{
QueryBuffer qbuf; // буфер для данных
unsigned int qbufSize; // размер буфера сообщений.
unsigned int qbufByteSize; // размер буфера в байтах
std::string tblname; // название таблицы
std::string insHeader; // заголовок запроса (например: INSERT INTO tblname(date, time, time_usec, sensor_id, value, node) VALUES( '"
};
typedef std::map<int, TableInfo> DBTableMap;
virtual void initDB(DBInterface *db){}; virtual void initDB(DBInterface *db){};
virtual void initDBTableMap(DBTableMap& tblMap){}; virtual void initDBTableMap(DBTableMap& tblMap){};
...@@ -156,23 +182,28 @@ class DBServer_MySQL: ...@@ -156,23 +182,28 @@ class DBServer_MySQL:
virtual void parse( UniSetTypes::DBMessage* dbmsg ); virtual void parse( UniSetTypes::DBMessage* dbmsg );
virtual void parse( UniSetTypes::ConfirmMessage* cmsg ); virtual void parse( UniSetTypes::ConfirmMessage* cmsg );
/*! запись в БД */
bool writeToBase( const string& query ); bool writeToBase( const string& query );
/*! поместить в буфер для INSERT запросов к таблице 'key' */
bool insertToBuffer( int key, const string& values );
virtual void init_dbserver(); virtual void init_dbserver();
void createTables( DBInterface* db ); void createTables( DBInterface* db );
inline const char* tblName(int key) inline const char* tblName( int key )
{ {
return tblMap[key].c_str(); return tblMap[key].tblname.c_str();
} }
enum Timers enum Timers
{ {
PingTimer, /*!< таймер на переодическую проверку соединения с сервером БД */ PingTimer, /*!< таймер на переодическую проверку соединения с сервером БД */
ReconnectTimer, /*!< таймер на повторную попытку соединения с сервером БД (или восстановления связи) */ ReconnectTimer, /*!< таймер на повторную попытку соединения с сервером БД (или восстановления связи) */
FlushBufferTimer, /*!< таймер на сброс буфера */
lastNumberOfTimer lastNumberOfTimer
}; };
DBInterface *db; DBInterface *db;
int PingTime; int PingTime;
int ReconnectTime; int ReconnectTime;
...@@ -180,18 +211,21 @@ class DBServer_MySQL: ...@@ -180,18 +211,21 @@ class DBServer_MySQL:
bool activate; bool activate;
typedef std::queue<std::string> QueryBuffer;
QueryBuffer qbuf;
unsigned int qbufSize; // размер буфера сообщений.
bool lastRemove; bool lastRemove;
void flushBuffer(); int flushBufferTime;
UniSetTypes::uniset_mutex mqbuf;
/*! сброс insert-буфера для конкретной таблицы */
void flushTableBuffer( DBTableMap::iterator& i );
void flushQBuffer();
private: private:
DBTableMap tblMap;
DBTableMap tblMap;
UniSetTypes::uniset_mutex mqbuf;
QueryBuffer qbuf; // юуфер для запросов, пока нет связи с БД
unsigned int qbufSize;
}; };
//------------------------------------------------------------------------------------------ //------------------------------------------------------------------------------------------
#endif #endif
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment