Commit 8b84cbd0 authored by Pavel Vainerman's avatar Pavel Vainerman

(MySQL): первая версия реализации буфера для INSERT-ов одним запросом

parent a042ea14
...@@ -47,13 +47,15 @@ DBInterface::~DBInterface() ...@@ -47,13 +47,15 @@ DBInterface::~DBInterface()
// ----------------------------------------------------------------------------------------- // -----------------------------------------------------------------------------------------
bool DBInterface::connect( const string& host, const string& user, const string& pswd, const string& dbname) bool DBInterface::connect( const string& host, const string& user, const string& pswd, const string& dbname)
{ {
if (!mysql_real_connect(mysql,host.c_str(), user.c_str(),pswd.c_str(),dbname.c_str(),0,NULL,0)) if( !mysql_real_connect(mysql,host.c_str(), user.c_str(),pswd.c_str(),dbname.c_str(),0,NULL,CLIENT_MULTI_STATEMENTS) )
{ {
cout << error() << endl; cout << error() << endl;
mysql_close(mysql); mysql_close(mysql);
connected = false; connected = false;
return false; return false;
} }
// mysql_set_server_option(mysql,MYSQL_OPTION_MULTI_STATEMENTS_ON);
connected = true; connected = true;
return true; return true;
} }
...@@ -81,18 +83,33 @@ bool DBInterface::insert( const string& q ) ...@@ -81,18 +83,33 @@ bool DBInterface::insert( const string& q )
// ----------------------------------------------------------------------------------------- // -----------------------------------------------------------------------------------------
bool DBInterface::query( const string& q ) bool DBInterface::query( const string& q )
{ {
return query( q.c_str(), true );
}
// -----------------------------------------------------------------------------------------
bool DBInterface::query( const char* q, bool noLastQ )
{
if( !mysql ) if( !mysql )
return false; return false;
if( mysql_query(mysql,q.c_str()) ) if( mysql_query(mysql,q) )
{ {
queryok=false; queryok=false;
return false; return false;
} }
lastQ = ( noLastQ ) ? q : "";
lastQ = q;
result = mysql_store_result(mysql); // _use_result - некорректно работает с _num_rows result = mysql_store_result(mysql); // _use_result - некорректно работает с _num_rows
// Если при соединении используется CLIENT_MULTI_STATEMENTS
// то необходимо вынуть все результаты..
while( mysql_more_results(mysql) )
{
cerr << "**** store result..." << endl;
if( mysql_next_result(mysql) >=0 )
mysql_store_result(mysql);
}
if( numRows()==0 ) if( numRows()==0 )
{ {
queryok=false; queryok=false;
......
...@@ -47,6 +47,8 @@ class DBInterface ...@@ -47,6 +47,8 @@ class DBInterface
bool close(); bool close();
bool query(const std::string& q); bool query(const std::string& q);
bool query( const char* q, bool noLastQ=false ); // noLastQ - не запоминать запрос
const std::string lastQuery(); const std::string lastQuery();
bool insert(const std::string& q); bool insert(const std::string& q);
......
...@@ -46,7 +46,8 @@ DBServer_MySQL::DBServer_MySQL(ObjectId id): ...@@ -46,7 +46,8 @@ DBServer_MySQL::DBServer_MySQL(ObjectId id):
connect_ok(false), connect_ok(false),
activate(true), activate(true),
qbufSize(200), qbufSize(200),
lastRemove(false) lastRemove(false),
qbufByteSize(0)
{ {
if( getId() == DefaultObjectId ) if( getId() == DefaultObjectId )
{ {
...@@ -64,6 +65,7 @@ DBServer_MySQL::DBServer_MySQL(): ...@@ -64,6 +65,7 @@ DBServer_MySQL::DBServer_MySQL():
connect_ok(false), connect_ok(false),
activate(true), activate(true),
qbufSize(200), qbufSize(200),
qbufByteSize(0),
lastRemove(false) lastRemove(false)
{ {
// init(); // init();
...@@ -204,6 +206,23 @@ bool DBServer_MySQL::writeToBase( const string& query ) ...@@ -204,6 +206,23 @@ bool DBServer_MySQL::writeToBase( const string& query )
{ {
if( unideb.debugging(DBLogInfoLevel) ) if( unideb.debugging(DBLogInfoLevel) )
unideb[DBLogInfoLevel] << myname << "(writeToBase): " << query << endl; unideb[DBLogInfoLevel] << myname << "(writeToBase): " << query << endl;
// Складываем в очередь
bool fb = false;
{
uniset_mutex_lock l(mqbuf,2000);
qbuf.push(query);
fb = ( qbuf.size() >= qbufSize );
qbufByteSize += query.size();
}
if( fb )
flushBuffer();
return true;
#if 0
// cout << "DBServer_MySQL: " << query << endl; // cout << "DBServer_MySQL: " << query << endl;
if( !db || !connect_ok ) if( !db || !connect_ok )
{ {
...@@ -244,32 +263,89 @@ bool DBServer_MySQL::writeToBase( const string& query ) ...@@ -244,32 +263,89 @@ bool DBServer_MySQL::writeToBase( const string& query )
} }
return false; return false;
#endif
} }
//-------------------------------------------------------------------------------------------- //--------------------------------------------------------------------------------------------
void DBServer_MySQL::flushBuffer() void DBServer_MySQL::flushBuffer()
{ {
uniset_mutex_lock l(mqbuf,400); uniset_mutex_lock l(mqbuf,400);
// Сперва пробуем очистить всё что накопилось в очереди до этого... if( qbuf.empty() )
return;
if( unideb.debugging(DBLEVEL) )
unideb[DBLEVEL] << myname << "(flushBuffer): qbufSize=" << qbufSize << " qbufByteSize=" << qbufByteSize << endl;
// buffer чтобы записать всё одним запросом..
char* cbuf = new char[ qbufByteSize + 1 ];
int i= 0;
if( !cbuf )
{
if( unideb.debugging(Debug::CRIT) )
unideb[Debug::CRIT] << myname << "(flushBuffer): Can`t allocate memory for buffer (sz=" << qbufByteSize + 1 << endl;
return;
}
while( !qbuf.empty() )
{
std::string q(qbuf.front());
int sz = q.size();
if( i + sz > qbufByteSize )
{
if( unideb.debugging(Debug::CRIT) )
unideb[Debug::CRIT] << myname << "(flushBuffer): BUFFER OVERFLOW bytesize=" << qbufByteSize + 1 << " i=" << i << " sz=" << sz<< endl;
break;
}
std::memcpy( &(cbuf[i]), q.c_str(), sz );
i += sz;
qbuf.pop();
}
// последняя точка с запятой, вроде не нужна (по документации и примерам из MySQL)..
if( cbuf[i-1] == ';' )
cbuf[i-1] = '\0';
else
cbuf[i] = '\0';
if( i < qbufByteSize )
{
if( unideb.debugging(Debug::CRIT) )
unideb[Debug::CRIT] << myname << "(writeToBase): BAD qbufByteSize=" << qbufByteSize << " LOST RECORDS..(i=" << i << ")" << endl;
qbufByteSize = 0;
while( !qbuf.empty() ) while( !qbuf.empty() )
qbuf.pop();
}
try
{
if( i > 0 )
{ {
db->query( qbuf.front() ); db->query( (const char*)cbuf, false );
// Дело в том что на INSERT И UPDATE запросы // Дело в том что на INSERT И UPDATE запросы
// db->query() может возвращать false и надо самому // db->query() может возвращать false и надо самому
// отдельно проверять действительно ли произошла ошибка // отдельно проверять действительно ли произошла ошибка
// см. DBInterface::query. // см. DBInterface::queryh.
string err(db->error()); string err(db->error());
if( err.empty() ) if( !err.empty() && unideb.debugging(Debug::CRIT) )
unideb[Debug::CRIT] << myname << "(flushBuffer): error: " << err << endl;
db->freeResult(); db->freeResult();
else if( unideb.debugging(Debug::CRIT) ) }
}
catch( Exception& ex )
{ {
unideb[Debug::CRIT] << myname << "(writeToBase): error: " << err << if( unideb.debugging(Debug::CRIT) )
" lost query: " << qbuf.front() << endl; unideb[Debug::CRIT] << myname << "(flushBuffer): " << ex << endl;
} }
qbuf.pop(); delete[] cbuf;
} qbufByteSize = 0;
} }
//-------------------------------------------------------------------------------------------- //--------------------------------------------------------------------------------------------
void DBServer_MySQL::parse( UniSetTypes::SensorMessage *si ) void DBServer_MySQL::parse( UniSetTypes::SensorMessage *si )
...@@ -295,7 +371,7 @@ void DBServer_MySQL::parse( UniSetTypes::SensorMessage *si ) ...@@ -295,7 +371,7 @@ void DBServer_MySQL::parse( UniSetTypes::SensorMessage *si )
<< si->sm_tv_usec << "','" // time_usec << si->sm_tv_usec << "','" // time_usec
<< si->id << "','" // sensor_id << si->id << "','" // sensor_id
<< val << "','" // value << val << "','" // value
<< si->node << "')"; // node << si->node << "');"; // node
if( unideb.debugging(DBLEVEL) ) if( unideb.debugging(DBLEVEL) )
unideb[DBLEVEL] << myname << "(insert_main_history): " << data.str() << endl; unideb[DBLEVEL] << myname << "(insert_main_history): " << data.str() << endl;
......
...@@ -169,6 +169,7 @@ class DBServer_MySQL: ...@@ -169,6 +169,7 @@ class DBServer_MySQL:
{ {
PingTimer, /*!< таймер на переодическую проверку соединения с сервером БД */ PingTimer, /*!< таймер на переодическую проверку соединения с сервером БД */
ReconnectTimer, /*!< таймер на повторную попытку соединения с сервером БД (или восстановления связи) */ ReconnectTimer, /*!< таймер на повторную попытку соединения с сервером БД (или восстановления связи) */
FlushBufferTimer, /*!< таймер на сброс буфера */
lastNumberOfTimer lastNumberOfTimer
}; };
...@@ -188,6 +189,7 @@ class DBServer_MySQL: ...@@ -188,6 +189,7 @@ class DBServer_MySQL:
void flushBuffer(); void flushBuffer();
UniSetTypes::uniset_mutex mqbuf; UniSetTypes::uniset_mutex mqbuf;
int qbufByteSize; // размер буфера в байтах
private: private:
DBTableMap tblMap; DBTableMap tblMap;
......
...@@ -4,5 +4,5 @@ ulimit -Sc 1000000 ...@@ -4,5 +4,5 @@ ulimit -Sc 1000000
uniset-start.sh -f ./uniset-mysql-dbserver --confile test.xml --name DBServer1 \ uniset-start.sh -f ./uniset-mysql-dbserver --confile test.xml --name DBServer1 \
--unideb-add-levels info,crit,warn,level9,system \ --unideb-add-levels info,crit,warn,level9,system \
--dbserver-buffer-size 100 --dbserver-buffer-size 50
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment