Commit cddcc629 authored by Pavel Vainerman's avatar Pavel Vainerman

(DBSErver_PostgreSQL): первая версия реализации буферизации insert-ов

parent 97db46bc
......@@ -81,6 +81,7 @@ void DBServer_PostgreSQL::sysCommand( const UniSetTypes::SystemMessage* sm )
switch( sm->command )
{
case SystemMessage::StartUp:
askTimer(FlushInsertBuffer,ibufSyncTimeout);
break;
case SystemMessage::Finish:
......@@ -120,6 +121,9 @@ void DBServer_PostgreSQL::confirmInfo( const UniSetTypes::ConfirmMessage* cem )
dbinfo << myname << "(update_confirm): " << data.str() << endl;
// перед UPDATE обязательно скинуть insertBuffer
flushInsertBuffer();
if( !writeToBase( std::move(data.str())) )
{
dbcrit << myname << "(update_confirm): db error: " << db->error() << endl;
......@@ -187,6 +191,53 @@ void DBServer_PostgreSQL::flushBuffer()
}
}
//--------------------------------------------------------------------------------------------
void DBServer_PostgreSQL::flushInsertBuffer()
{
if( !db || !connect_ok )
{
if( ibufSize < ibufMaxSize )
return;
dbcrit << myname << "(flushWriteBuffer): DB not connected!"
<< " buffer[" << ibufSize << "] overflow! LOST DATA..." << endl;
// Чистим заданное число
size_t delnum = lroundf(ibufSize * ibufOverflowCleanFactor);
InsertBuffer::iterator end = ibuf.end();
InsertBuffer::iterator beg = ibuf.end();
// Удаляем последние (новые)
if( lastRemove )
{
std::advance(end, -delnum);
}
else
{
// Удаляем первые (старые)
beg = ibuf.begin();
end = ibuf.begin();
std::advance(end, delnum);
}
ibuf.erase(beg,end);
}
if( ibufSize == 0 )
return;
dbinfo << myname << "(flushInsertBuffer): write insert buffer to DB.." << endl;
if( !db->copy("main_history",tblcols,ibuf) )
{
dbcrit << myname << "(flushInsertBuffer): error: " << db->error() << endl;
}
else
{
ibuf.clear();
ibufSize = 0;
}
}
//--------------------------------------------------------------------------------------------
void DBServer_PostgreSQL::sensorInfo( const UniSetTypes::SensorMessage* si )
{
try
......@@ -202,24 +253,20 @@ void DBServer_PostgreSQL::sensorInfo( const UniSetTypes::SensorMessage* si )
}
#endif
// см. main_history
ostringstream data;
data << "INSERT INTO " << tblName(si->type)
<< "(date, time, time_usec, sensor_id, value, node) VALUES( '"
// Поля таблицы
<< dateToString(si->sm_tv_sec, "-") << "','" // date
<< timeToString(si->sm_tv_sec, ":") << "','" // time
<< si->sm_tv_usec << "'," // time_usec
<< si->id << "," // sensor_id
<< si->value << "," // value
<< si->node << ")"; // node
dbinfo << myname << "(insert_main_history): " << data.str() << endl;
if( !writeToBase(std::move(data.str())) )
{
dbcrit << myname << "(insert) sensor msg error: " << db->error() << endl;
}
// (date, time, time_usec, sensor_id, value, node)
PostgreSQLInterface::Record rec;
rec.push_back(dateToString(si->sm_tv_sec, "-")); // date
rec.push_back(timeToString(si->sm_tv_sec, ":")); // time
rec.push_back(std::to_string(si->sm_tv_usec));
rec.push_back(std::to_string(si->id));
rec.push_back(std::to_string(si->value));
rec.push_back(std::to_string(si->node));
ibuf.push_back(std::move(rec));
ibufSize++;
if( ibufSize > ibufMaxSize )
flushInsertBuffer();
}
catch( const Exception& ex )
{
......@@ -262,20 +309,27 @@ void DBServer_PostgreSQL::initDBServer()
UniXML::iterator it(node);
dbinfo << myname << "(init): init connection.." << endl;
string dbname(conf->getProp(node, "dbname"));
string dbnode(conf->getProp(node, "dbnode"));
string user(conf->getProp(node, "dbuser"));
string password(conf->getProp(node, "dbpass"));
unsigned int dbport = conf->getPIntProp(node, "dbport",5432);
string dbname( conf->getArgParam("--" + prefix + "-dbname", it.getProp("dbname")));
string dbnode( conf->getArgParam("--" + prefix + "-dbnode", it.getProp("dbnode")));
string dbuser( conf->getArgParam("--" + prefix + "-dbuser", it.getProp("dbuser")));
string dbpass( conf->getArgParam("--" + prefix + "-dbpass", it.getProp("dbpass")));
unsigned int dbport = conf->getArgPInt("--" + prefix + "-dbport", it.getProp("dbport"),5432);
ibufMaxSize = conf->getArgPInt("--" + prefix + "-ibufMaxSize", it.getProp("ibufMaxSize"),5000);
ibufSyncTimeout = conf->getArgPInt("--" + prefix + "-ibufSyncTimeout", it.getProp("ibufSyncTimeout"),15000);
std::string sfactor = conf->getArg2Param("--" + prefix + "-ibufOverflowCleanFactor", it.getProp("ibufOverflowCleanFactor"),"0.5");
ibufOverflowCleanFactor = atof(sfactor.c_str());
tblMap[UniSetTypes::Message::SensorInfo] = "main_history";
tblMap[UniSetTypes::Message::Confirm] = "main_history";
PingTime = conf->getIntProp(node, "pingTime");
ReconnectTime = conf->getIntProp(node, "reconnectTime");
qbufSize = conf->getArgPInt("--dbserver-buffer-size", it.getProp("bufferSize"), 200);
PingTime = conf->getArgPInt("--" + prefix + "-pingTime", it.getProp("pingTime"),15000);
ReconnectTime = conf->getArgPInt("--" + prefix + "-reconnectTime", it.getProp("reconnectTime"),30000);
qbufSize = conf->getArgPInt("--" + prefix + "-buffer-size", it.getProp("bufferSize"), 200);
if( findArgParam("--dbserver-buffer-last-remove", conf->getArgc(), conf->getArgv()) != -1 )
if( findArgParam("--" + prefix + "-buffer-last-remove", conf->getArgc(), conf->getArgv()) != -1 )
lastRemove = true;
else if( it.getIntProp("bufferLastRemove" ) != 0 )
lastRemove = true;
......@@ -290,7 +344,7 @@ void DBServer_PostgreSQL::initDBServer()
<< " pingTime=" << PingTime
<< " ReconnectTime=" << ReconnectTime << endl;
if( !db->nconnect(dbnode, user, password, dbname, dbport) )
if( !db->nconnect(dbnode, dbuser, dbpass, dbname, dbport) )
{
dbwarn << myname << "(init): DB connection error: " << db->error() << endl;
askTimer(DBServer_PostgreSQL::ReconnectTimer, ReconnectTime);
......@@ -383,6 +437,13 @@ void DBServer_PostgreSQL::timerInfo( const UniSetTypes::TimerMessage* tm )
}
break;
case FlushInsertBuffer:
{
dbinfo << myname << "(timerInfo): insert flush timer.." << endl;
flushInsertBuffer();
}
break;
default:
dbwarn << myname << "(timerInfo): Unknown TimerID=" << tm->id << endl;
break;
......@@ -431,6 +492,27 @@ void DBServer_PostgreSQL::help_print( int argc, const char* const* argv )
{
cout << "Default: prefix='pgsql'" << endl;
cout << "--prefix-name objectID - ObjectID. Default: 'conf->getDBServer()'" << endl;
cout << "Connection: " << endl;
cout << "--prefix-dbname name - database name" << endl;
cout << "--prefix-dbnode host - database host" << endl;
cout << "--prefix-dbuser user - database user" << endl;
cout << "--prefix-dbpass pass - database password" << endl;
cout << "--prefix-dbport port - database port. Default: 5432" << endl;
cout << "Check connection: " << endl;
cout << "--prefix-pingTime msec - check connetcion time. Default: 15000 msec" << endl;
cout << "--prefix-reconnectTime msec - reconnect time. Default: 30000 msec " << endl;
cout << "Insert buffer:" << endl;
cout << "--prefix-ibufMaxSize sz - INSERT-buffer size. Default: 5000" << endl;
cout << "--prefix-ibufSyncTimeout msec - INSERT-buffer sync timeout. Default: 15000 msec" << endl;
cout << "--prefix-ibufOverflowCleanFactor [0...1] - INSERT-buffer overflow clean factor. Default: 0.5" << endl;
cout << "Query buffer:" << endl;
cout << "--prefix-buffer-size sz - The buffer in case the database is unavailable. Default: 200" << endl;
cout << "--prefix-buffer-last-remove - Delete the last recording buffer overflow." << endl;
cout << DBServer::help_print() << endl;
}
// -----------------------------------------------------------------------------
......@@ -27,6 +27,15 @@
* \brief The DBServer_PostgreSQL class
* Реализация работы с PostgreSQL.
*
* Т.к. основная работа DBServer-а это частая запись данных, то сделана следующая оптимизация:
* Создаётся insert-буфер настраиваемого размера (ibufMaxSize).
* Как только буфер заполняется, он пишется в БД одним "оптимизированным" запросом.
* Помимо этого буфер скидывается, если прошло ibufSyncTimeout мсек или если пришёл запрос
* на UPDATE данных.
*
* В случае если буфер переполняется (например нет связи с БД), то он чистится. При этом сколько
* записей удалять определяется коэффициентом wbufOverflowCleanFactor={0...1}.
* А также флаг lastRemove определяет удалять с конца или начала очереди.
*/
class DBServer_PostgreSQL:
public DBServer
......@@ -76,11 +85,12 @@ class DBServer_PostgreSQL:
{
PingTimer, /*!< таймер на переодическую проверку соединения с сервером БД */
ReconnectTimer, /*!< таймер на повторную попытку соединения с сервером БД (или восстановления связи) */
FlushInsertBuffer, /*!< таймер на сброс Insert-буфера */
lastNumberOfTimer
};
std::shared_ptr<PostgreSQLInterface> db;
int PingTime;
int PingTime = { 15000 };
int ReconnectTime;
bool connect_ok; /*! признак наличия соеднинения с сервером БД */
......@@ -90,11 +100,22 @@ class DBServer_PostgreSQL:
QueryBuffer qbuf;
unsigned int qbufSize; // размер буфера сообщений.
bool lastRemove;
bool lastRemove = { false };
void flushBuffer();
UniSetTypes::uniset_mutex mqbuf;
// writeBuffer
const std::list<std::string> tblcols = { "date", "time","time_usec","sensor_id","value","node" };
typedef std::list<PostgreSQLInterface::Record> InsertBuffer;
InsertBuffer ibuf;
size_t ibufSize = { 0 };
size_t ibufMaxSize = { 5000 };
timeout_t ibufSyncTimeout = { 15000 };
void flushInsertBuffer();
float ibufOverflowCleanFactor = { 0.5 }; // коэфициент {0...1} чистки буфера при переполнении
private:
DBTableMap tblMap;
......
......@@ -86,7 +86,7 @@ bool PostgreSQLInterface::close()
return true;
}
// -----------------------------------------------------------------------------------------
bool PostgreSQLInterface::copy( const std::string& tblname, const std::list<std::string>& cols, const std::list<std::list<std::string>>& data )
bool PostgreSQLInterface::copy( const std::string& tblname, const std::list<std::string>& cols, const std::list<Record>& data )
{
if( !db )
return false;
......@@ -96,7 +96,7 @@ bool PostgreSQLInterface::copy( const std::string& tblname, const std::list<std:
work w( *(db.get()) );
tablewriter t(w,tblname,cols.begin(),cols.end());
t.reserve(data.size());
t.reserve(data.size()); // size() не дорогая операция для list?
for( const auto& d: data )
t.push_back(d.begin(),d.end());
......
......@@ -20,6 +20,7 @@
#include <string>
#include <list>
#include <vector>
#include <queue>
#include <iostream>
#include <pqxx/pqxx>
#include <PassiveTimer.h>
......@@ -48,8 +49,10 @@ class PostgreSQLInterface:
virtual double insert_id() override;
void save_inserted_id( const pqxx::result& res );
typedef std::list<std::string> Record;
// fast insert: Use COPY..from SDTIN..
virtual bool copy( const std::string& tblname, const std::list<std::string>& cols, const std::list<std::list<std::string>>& data );
virtual bool copy( const std::string& tblname, const std::list<std::string>& cols, const std::list<Record>& data );
virtual const std::string error() override;
......
......@@ -3,5 +3,4 @@
ulimit -Sc 1000000
uniset2-start.sh -f ./uniset2-pgsql-dbserver --confile test.xml --name DBServer1 \
--ulog-add-levels info,crit,warn,level9,system \
--dbserver-buffer-size 100
--pgsql-dbserver-buffer-size 100 $*
......@@ -13,6 +13,14 @@ int main(int argc, char** argv)
if( argc > 1 )
dbname = string(argv[1]);
size_t ver = 1;
if( argc > 2 )
ver = atoi(argv[2]);
size_t num = 10000;
if( argc > 3 )
num = atoi(argv[3]);
try
{
PostgreSQLInterface db;
......@@ -22,33 +30,68 @@ int main(int argc, char** argv)
if( !db.nconnect("localhost", "dbadmin", "dbadmin", dbname) )
{
cerr << "[FAILED] connect error: " << db.error() << endl;
db.close();
return 1;
}
cout << "connect to '" << dbname << "' [OK]" << endl;
stringstream q;
q << "SELECT * from main_history";
DBResult r = db.query(q.str());
if( !r )
if( ver == 1 )
{
cerr << "query error: " << db.error() << endl;
return 1;
for( size_t i=0; i<num; i++ )
{
q << "INSERT INTO main_history(date,time,time_usec,sensor_id,value,node)"
<< " VALUES(now(),now(),0,7,1,3000);";
}
}
for( DBResult::iterator it = r.begin(); it != r.end(); it++ )
else if( ver == 2 )
{
cout << "ROW: ";
DBResult::COL col(*it);
q << "INSERT INTO main_history(date,time,time_usec,sensor_id,value,node) VALUES";
for( size_t i=0; i<num; i++ )
{
q << "(now(),now(),0,7,1,3000),";
}
q << "(now(),now(),0,7,1,3000);";
}
for( DBResult::COL::iterator cit = it->begin(); cit != it->end(); cit++ )
cout << DBResult::as_string(cit) << "(" << DBResult::as_double(cit) << ") | ";
std::chrono::time_point<std::chrono::system_clock> start, end;
if( ver == 3 )
{
std::list<std::string> cols = { "date", "time","time_usec","sensor_id","value","node" };
std::list<PostgreSQLInterface::Record> data;
for( size_t i=0; i<num; i++ )
{
PostgreSQLInterface::Record d = { "now()","now()","0","7","1","3000" };
data.push_back(std::move(d));
}
cout << endl;
start = std::chrono::system_clock::now();
if( !db.copy("main_history",cols,data) )
{
cerr << "query error: " << db.error() << endl;
db.close();
return 1;
}
}
else
{
start = std::chrono::system_clock::now();
if( !db.insert( std::move(q.str())) )
{
cerr << "query error: " << db.error() << endl;
db.close();
return 1;
}
}
end = std::chrono::system_clock::now();
int elapsed_seconds = std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count();
std::cerr << "INSERT " << num << " records elasped time: " << elapsed_seconds << " ms\n";
db.close();
}
catch( const Exception& ex )
......
......@@ -72,6 +72,8 @@ class DBServer:
std::string logserv_host = {""};
int logserv_port = {0};
const std::string prefix = { "db" };
private:
};
//------------------------------------------------------------------------------------------
......
......@@ -34,8 +34,9 @@
using namespace UniSetTypes;
using namespace std;
// ------------------------------------------------------------------------------------------
DBServer::DBServer( ObjectId id, const std::string& prefix ):
UniSetObject(id)
DBServer::DBServer( ObjectId id, const std::string& _prefix ):
UniSetObject(id),
prefix(_prefix)
{
if( getId() == DefaultObjectId )
{
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment