Commit 2f800c5f authored by Pavel Vainerman's avatar Pavel Vainerman

(DBServer_PGSQL): буфер переделан на vector, добавил немного описания.

parent cddcc629
...@@ -69,6 +69,7 @@ Version 2.5 ...@@ -69,6 +69,7 @@ Version 2.5
- совместимость между 64bit и 32bit - для этого нужно отказаться от использования сырого long и перейти либо на int32_t либо CORBA::Long. - совместимость между 64bit и 32bit - для этого нужно отказаться от использования сырого long и перейти либо на int32_t либо CORBA::Long.
- timeout_t getSharedMemoryReadyTimeout()!
libev libev
======= =======
...@@ -76,9 +77,12 @@ libev ...@@ -76,9 +77,12 @@ libev
- UniSetActivator (обработка сигналов и возможно вынести сюда DefaultEventLoop) (на подумать) - UniSetActivator (обработка сигналов и возможно вынести сюда DefaultEventLoop) (на подумать)
SM: подумать насчёт асинхронности публикации событий и посылки уведомления (setValue/push) через очередь.. SM: подумать насчёт асинхронности публикации событий и посылки уведомления (setValue/push) через очередь..
// worker-ы рассылающие сообщения, очередь заданий, конфигурирование количества worker-ов, основной поток ждёт если нет свободных.
SM: Подумать насчёт применения https://github.com/efficient/libcuckoo SM: Подумать насчёт применения https://github.com/efficient/libcuckoo
DB: Сделать регулируемый буфер на INSERT-ы БД, чтобы поберечь винт (DBServer_PGSQL, DBServer_MySQL...) // по времени или по количеству
version 3 version 3
========= =========
- подумать нужен ли нам где-то ZeroMQ (zerorpc) (вместо omniORB?) - подумать нужен ли нам где-то ZeroMQ (zerorpc) (вместо omniORB?)
...@@ -98,3 +102,5 @@ UResolver (или ObjectRepository) позволяющего манипулир ...@@ -98,3 +102,5 @@ UResolver (или ObjectRepository) позволяющего манипулир
- вместо commoncpp,libxml2,DebugStream и т.п. перейти всё-таки на boost? - вместо commoncpp,libxml2,DebugStream и т.п. перейти всё-таки на boost?
- подумать возможно стоит переходить на lockfree-библиотеку libcds..(актуально для многопроцессорных систем) - подумать возможно стоит переходить на lockfree-библиотеку libcds..(актуально для многопроцессорных систем)
- Ввести namespace uniset:: ust:: uns:: ?
...@@ -84,7 +84,7 @@ There are set of base components to construct this kind of systems: ...@@ -84,7 +84,7 @@ There are set of base components to construct this kind of systems:
* base interfaces for your implementation of control algorithms. * base interfaces for your implementation of control algorithms.
* algorithms for the discrete and analog input/output based on COMEDI interface. * algorithms for the discrete and analog input/output based on COMEDI interface.
* IPC mechanism based on CORBA (omniORB). * IPC mechanism based on CORBA (omniORB).
* logging system based on MySQL database. * logging system based on MySQL,SQLite,PostgreSQL databases.
* Web interface to display logging and statistic information. * Web interface to display logging and statistic information.
* utilities for system's configuration based on XML. * utilities for system's configuration based on XML.
......
...@@ -203,13 +203,13 @@ void DBServer_PostgreSQL::flushInsertBuffer() ...@@ -203,13 +203,13 @@ void DBServer_PostgreSQL::flushInsertBuffer()
// Чистим заданное число // Чистим заданное число
size_t delnum = lroundf(ibufSize * ibufOverflowCleanFactor); size_t delnum = lroundf(ibufSize * ibufOverflowCleanFactor);
InsertBuffer::iterator end = ibuf.end(); auto end = ibuf.end();
InsertBuffer::iterator beg = ibuf.end(); auto beg = ibuf.end();
// Удаляем последние (новые) // Удаляем последние (новые)
if( lastRemove ) if( lastRemove )
{ {
std::advance(end, -delnum); std::advance(beg, -delnum);
} }
else else
{ {
...@@ -220,12 +220,18 @@ void DBServer_PostgreSQL::flushInsertBuffer() ...@@ -220,12 +220,18 @@ void DBServer_PostgreSQL::flushInsertBuffer()
} }
ibuf.erase(beg,end); ibuf.erase(beg,end);
ibufSize -= delnum;
if( ibufSize < 0 )
ibufSize = 0;
dbwarn << myname << "(flushInsertBuffer): overflow: clear data " << delnum << " records." << endl;
return;
} }
if( ibufSize == 0 ) if( ibufSize == 0 )
return; return;
dbinfo << myname << "(flushInsertBuffer): write insert buffer to DB.." << endl; dbinfo << myname << "(flushInsertBuffer): write insert buffer[" << ibufSize << "] to DB.." << endl;
if( !db->copy("main_history",tblcols,ibuf) ) if( !db->copy("main_history",tblcols,ibuf) )
{ {
...@@ -254,18 +260,20 @@ void DBServer_PostgreSQL::sensorInfo( const UniSetTypes::SensorMessage* si ) ...@@ -254,18 +260,20 @@ void DBServer_PostgreSQL::sensorInfo( const UniSetTypes::SensorMessage* si )
#endif #endif
// (date, time, time_usec, sensor_id, value, node) // (date, time, time_usec, sensor_id, value, node)
PostgreSQLInterface::Record rec; PostgreSQLInterface::Record rec =
rec.push_back(dateToString(si->sm_tv_sec, "-")); // date {
rec.push_back(timeToString(si->sm_tv_sec, ":")); // time dateToString(si->sm_tv_sec, "-"), // date
rec.push_back(std::to_string(si->sm_tv_usec)); timeToString(si->sm_tv_sec, ":"), // time
rec.push_back(std::to_string(si->id)); std::to_string(si->sm_tv_usec),
rec.push_back(std::to_string(si->value)); std::to_string(si->id),
rec.push_back(std::to_string(si->node)); std::to_string(si->value),
std::to_string(si->node),
};
ibuf.push_back(std::move(rec)); ibuf.push_back(std::move(rec));
ibufSize++; ibufSize++;
if( ibufSize > ibufMaxSize ) if( ibufSize >= ibufMaxSize )
flushInsertBuffer(); flushInsertBuffer();
} }
catch( const Exception& ex ) catch( const Exception& ex )
...@@ -316,9 +324,11 @@ void DBServer_PostgreSQL::initDBServer() ...@@ -316,9 +324,11 @@ void DBServer_PostgreSQL::initDBServer()
string dbpass( conf->getArgParam("--" + prefix + "-dbpass", it.getProp("dbpass"))); string dbpass( conf->getArgParam("--" + prefix + "-dbpass", it.getProp("dbpass")));
unsigned int dbport = conf->getArgPInt("--" + prefix + "-dbport", it.getProp("dbport"),5432); unsigned int dbport = conf->getArgPInt("--" + prefix + "-dbport", it.getProp("dbport"),5432);
ibufMaxSize = conf->getArgPInt("--" + prefix + "-ibufMaxSize", it.getProp("ibufMaxSize"),5000); ibufMaxSize = conf->getArgPInt("--" + prefix + "-ibuf-maxsize", it.getProp("ibufMaxSize"),2000);
ibufSyncTimeout = conf->getArgPInt("--" + prefix + "-ibufSyncTimeout", it.getProp("ibufSyncTimeout"),15000); ibuf.reserve(ibufMaxSize);
std::string sfactor = conf->getArg2Param("--" + prefix + "-ibufOverflowCleanFactor", it.getProp("ibufOverflowCleanFactor"),"0.5");
ibufSyncTimeout = conf->getArgPInt("--" + prefix + "-ibuf-sync-timeout", it.getProp("ibufSyncTimeout"),15000);
std::string sfactor = conf->getArg2Param("--" + prefix + "-ibuf-overflow-cleanfactor", it.getProp("ibufOverflowCleanFactor"),"0.5");
ibufOverflowCleanFactor = atof(sfactor.c_str()); ibufOverflowCleanFactor = atof(sfactor.c_str());
tblMap[UniSetTypes::Message::SensorInfo] = "main_history"; tblMap[UniSetTypes::Message::SensorInfo] = "main_history";
...@@ -505,9 +515,9 @@ void DBServer_PostgreSQL::help_print( int argc, const char* const* argv ) ...@@ -505,9 +515,9 @@ void DBServer_PostgreSQL::help_print( int argc, const char* const* argv )
cout << "--prefix-reconnectTime msec - reconnect time. Default: 30000 msec " << endl; cout << "--prefix-reconnectTime msec - reconnect time. Default: 30000 msec " << endl;
cout << "Insert buffer:" << endl; cout << "Insert buffer:" << endl;
cout << "--prefix-ibufMaxSize sz - INSERT-buffer size. Default: 5000" << endl; cout << "--prefix-ibuf-maxsize sz - INSERT-buffer size. Default: 2000" << endl;
cout << "--prefix-ibufSyncTimeout msec - INSERT-buffer sync timeout. Default: 15000 msec" << endl; cout << "--prefix-ibuf-sync-timeout msec - INSERT-buffer sync timeout. Default: 15000 msec" << endl;
cout << "--prefix-ibufOverflowCleanFactor [0...1] - INSERT-buffer overflow clean factor. Default: 0.5" << endl; cout << "--prefix-ibuf-overflow-cleanfactor [0...1] - INSERT-buffer overflow clean factor. Default: 0.5" << endl;
cout << "Query buffer:" << endl; cout << "Query buffer:" << endl;
cout << "--prefix-buffer-size sz - The buffer in case the database is unavailable. Default: 200" << endl; cout << "--prefix-buffer-size sz - The buffer in case the database is unavailable. Default: 200" << endl;
......
...@@ -27,15 +27,23 @@ ...@@ -27,15 +27,23 @@
* \brief The DBServer_PostgreSQL class * \brief The DBServer_PostgreSQL class
* Реализация работы с PostgreSQL. * Реализация работы с PostgreSQL.
* *
* Т.к. основная работа DBServer-а это частая запись данных, то сделана следующая оптимизация: * Т.к. основная работа сервера - это частая запись данных, то сделана следующая оптимизация:
* Создаётся insert-буфер настраиваемого размера (ibufMaxSize). * Создаётся insert-буфер настраиваемого размера (ibufMaxSize).
* Как только буфер заполняется, он пишется в БД одним "оптимизированным" запросом. * Как только буфер заполняется, он пишется в БД одним "оптимизированным" запросом.
* Помимо этого буфер скидывается, если прошло ibufSyncTimeout мсек или если пришёл запрос * Помимо этого буфер скидывается, если прошло ibufSyncTimeout мсек или если пришёл запрос
* на UPDATE данных. * на UPDATE данных.
* *
* В случае если буфер переполняется (например нет связи с БД), то он чистится. При этом сколько * В случае если буфер переполняется (например нет связи с БД), то он чистится. При этом сколько
* записей удалять определяется коэффициентом wbufOverflowCleanFactor={0...1}. * записей удалять определяется коэффициентом ibufOverflowCleanFactor={0...1}.
* А также флаг lastRemove определяет удалять с конца или начала очереди. * А также флаг lastRemove определяет удалять с конца или начала очереди.
*
* \warning Следует иметь ввиду, что чтобы не было постоянных "перевыделений памяти" буфер сделан на основе
* vector и в начале работы в памяти сразу(!) резервируется место под буфер.
* Во первых надо иметь ввиду, что буфер - это то, что потеряется если вдруг произойдёт сбой
* по питанию или программа вылетит. Поэтому если он большой, то будет потеряно много данных.
* И второе, т.к. это vector - то идёт выделение "непрерывного куска памяти", поэтому у ОС могут
* быть проблеммы найти "большой непрерывный кусок".
* Тем не менее реализация сделана на vector-е чтобы избежать лишних "перевыделений" (и сегментации) памяти во время работы.
*/ */
class DBServer_PostgreSQL: class DBServer_PostgreSQL:
public DBServer public DBServer
...@@ -108,10 +116,10 @@ class DBServer_PostgreSQL: ...@@ -108,10 +116,10 @@ class DBServer_PostgreSQL:
// writeBuffer // writeBuffer
const std::list<std::string> tblcols = { "date", "time","time_usec","sensor_id","value","node" }; const std::list<std::string> tblcols = { "date", "time","time_usec","sensor_id","value","node" };
typedef std::list<PostgreSQLInterface::Record> InsertBuffer; typedef std::vector<PostgreSQLInterface::Record> InsertBuffer;
InsertBuffer ibuf; InsertBuffer ibuf;
size_t ibufSize = { 0 }; size_t ibufSize = { 0 };
size_t ibufMaxSize = { 5000 }; size_t ibufMaxSize = { 2000 };
timeout_t ibufSyncTimeout = { 15000 }; timeout_t ibufSyncTimeout = { 15000 };
void flushInsertBuffer(); void flushInsertBuffer();
float ibufOverflowCleanFactor = { 0.5 }; // коэфициент {0...1} чистки буфера при переполнении float ibufOverflowCleanFactor = { 0.5 }; // коэфициент {0...1} чистки буфера при переполнении
......
...@@ -86,10 +86,14 @@ bool PostgreSQLInterface::close() ...@@ -86,10 +86,14 @@ bool PostgreSQLInterface::close()
return true; return true;
} }
// ----------------------------------------------------------------------------------------- // -----------------------------------------------------------------------------------------
bool PostgreSQLInterface::copy( const std::string& tblname, const std::list<std::string>& cols, const std::list<Record>& data ) bool PostgreSQLInterface::copy( const std::string& tblname, const std::list<std::string>& cols,
const PostgreSQLInterface::Data& data )
{ {
if( !db ) if( !db )
{
lastE = "no connection";
return false; return false;
}
try try
{ {
...@@ -116,7 +120,10 @@ bool PostgreSQLInterface::copy( const std::string& tblname, const std::list<std: ...@@ -116,7 +120,10 @@ bool PostgreSQLInterface::copy( const std::string& tblname, const std::list<std:
bool PostgreSQLInterface::insert( const string& q ) bool PostgreSQLInterface::insert( const string& q )
{ {
if( !db ) if( !db )
{
lastE = "no connection";
return false; return false;
}
try try
{ {
...@@ -137,7 +144,10 @@ bool PostgreSQLInterface::insert( const string& q ) ...@@ -137,7 +144,10 @@ bool PostgreSQLInterface::insert( const string& q )
bool PostgreSQLInterface::insertAndSaveRowid( const string& q ) bool PostgreSQLInterface::insertAndSaveRowid( const string& q )
{ {
if( !db ) if( !db )
{
lastE = "no connection";
return false; return false;
}
std::string qplus = q + " RETURNING id"; std::string qplus = q + " RETURNING id";
......
...@@ -50,9 +50,10 @@ class PostgreSQLInterface: ...@@ -50,9 +50,10 @@ class PostgreSQLInterface:
void save_inserted_id( const pqxx::result& res ); void save_inserted_id( const pqxx::result& res );
typedef std::list<std::string> Record; typedef std::list<std::string> Record;
typedef std::vector<Record> Data;
// fast insert: Use COPY..from SDTIN.. // fast insert: Use COPY..from SDTIN..
virtual bool copy( const std::string& tblname, const std::list<std::string>& cols, const std::list<Record>& data ); bool copy( const std::string& tblname, const std::list<std::string>& cols, const Data& data );
virtual const std::string error() override; virtual const std::string error() override;
......
...@@ -61,7 +61,7 @@ int main(int argc, char** argv) ...@@ -61,7 +61,7 @@ int main(int argc, char** argv)
if( ver == 3 ) if( ver == 3 )
{ {
std::list<std::string> cols = { "date", "time","time_usec","sensor_id","value","node" }; std::list<std::string> cols = { "date", "time","time_usec","sensor_id","value","node" };
std::list<PostgreSQLInterface::Record> data; PostgreSQLInterface::Data data;
for( size_t i=0; i<num; i++ ) for( size_t i=0; i<num; i++ )
{ {
PostgreSQLInterface::Record d = { "now()","now()","0","7","1","3000" }; PostgreSQLInterface::Record d = { "now()","now()","0","7","1","3000" };
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment