Commit f9b2e2b9 authored by Pavel Vainerman's avatar Pavel Vainerman

(DBServer_PostgreSQL): refactoring interface

parent 4a18b249
...@@ -74,17 +74,11 @@ void DBServer_PostgreSQL::sysCommand( const uniset::SystemMessage* sm ) ...@@ -74,17 +74,11 @@ void DBServer_PostgreSQL::sysCommand( const uniset::SystemMessage* sm )
break; break;
case SystemMessage::Finish: case SystemMessage::Finish:
{
activate = false;
db->close(); db->close();
}
break; break;
case SystemMessage::FoldUp: case SystemMessage::FoldUp:
{
activate = false;
db->close(); db->close();
}
break; break;
default: default:
...@@ -171,7 +165,7 @@ void DBServer_PostgreSQL::flushBuffer() ...@@ -171,7 +165,7 @@ void DBServer_PostgreSQL::flushBuffer()
// Сперва пробуем очистить всё что накопилось в очереди до этого... // Сперва пробуем очистить всё что накопилось в очереди до этого...
while( !qbuf.empty() ) while( !qbuf.empty() )
{ {
if(!db->insert( qbuf.front() )) if( !db->insert(qbuf.front()) )
{ {
dbcrit << myname << "(writeToBase): error: " << db->error() << " lost query: " << qbuf.front() << endl; dbcrit << myname << "(writeToBase): error: " << db->error() << " lost query: " << qbuf.front() << endl;
} }
...@@ -222,7 +216,7 @@ void DBServer_PostgreSQL::flushInsertBuffer() ...@@ -222,7 +216,7 @@ void DBServer_PostgreSQL::flushInsertBuffer()
dbinfo << myname << "(flushInsertBuffer): write insert buffer[" << ibufSize << "] to DB.." << endl; dbinfo << myname << "(flushInsertBuffer): write insert buffer[" << ibufSize << "] to DB.." << endl;
if( !writeBufferToDB("main_history", tblcols, ibuf) ) if( !writeInsertBufferToDB("main_history", tblcols, ibuf) )
{ {
dbcrit << myname << "(flushInsertBuffer): error: " << db->error() << endl; dbcrit << myname << "(flushInsertBuffer): error: " << db->error() << endl;
} }
...@@ -233,9 +227,18 @@ void DBServer_PostgreSQL::flushInsertBuffer() ...@@ -233,9 +227,18 @@ void DBServer_PostgreSQL::flushInsertBuffer()
} }
} }
//-------------------------------------------------------------------------------------------- //--------------------------------------------------------------------------------------------
bool DBServer_PostgreSQL::writeBufferToDB( const std::string& tableName void DBServer_PostgreSQL::addRecord( const PostgreSQLInterface::Record&& rec )
{
ibuf.emplace_back( std::move(rec) );
ibufSize++;
if( ibufSize >= ibufMaxSize )
flushInsertBuffer();
}
//--------------------------------------------------------------------------------------------
bool DBServer_PostgreSQL::writeInsertBufferToDB( const std::string& tableName
, const std::vector<std::string>& colNames , const std::vector<std::string>& colNames
, DBServer_PostgreSQL::InsertBuffer& wbuf ) , const InsertBuffer& wbuf )
{ {
return db->copy(tableName, colNames, wbuf); return db->copy(tableName, colNames, wbuf);
} }
...@@ -265,11 +268,7 @@ void DBServer_PostgreSQL::sensorInfo( const uniset::SensorMessage* si ) ...@@ -265,11 +268,7 @@ void DBServer_PostgreSQL::sensorInfo( const uniset::SensorMessage* si )
std::to_string(si->node), std::to_string(si->node),
}; };
ibuf.emplace_back(std::move(rec)); addRecord( std::move(rec) );
ibufSize++;
if( ibufSize >= ibufMaxSize )
flushInsertBuffer();
} }
catch( const uniset::Exception& ex ) catch( const uniset::Exception& ex )
{ {
...@@ -367,7 +366,7 @@ void DBServer_PostgreSQL::initDBServer() ...@@ -367,7 +366,7 @@ void DBServer_PostgreSQL::initDBServer()
} }
} }
//-------------------------------------------------------------------------------------------- //--------------------------------------------------------------------------------------------
void DBServer_PostgreSQL::createTables( std::shared_ptr<PostgreSQLInterface>& db ) void DBServer_PostgreSQL::createTables( const std::shared_ptr<PostgreSQLInterface>& db )
{ {
auto conf = uniset_conf(); auto conf = uniset_conf();
...@@ -548,3 +547,8 @@ void DBServer_PostgreSQL::help_print( int argc, const char* const* argv ) ...@@ -548,3 +547,8 @@ void DBServer_PostgreSQL::help_print( int argc, const char* const* argv )
cout << DBServer::help_print() << endl; cout << DBServer::help_print() << endl;
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
bool DBServer_PostgreSQL::isConnectOk() const
{
return connect_ok;
}
// -----------------------------------------------------------------------------
...@@ -74,12 +74,14 @@ namespace uniset ...@@ -74,12 +74,14 @@ namespace uniset
return dblog; return dblog;
} }
bool isConnectOk() const;
protected: protected:
typedef std::unordered_map<int, std::string> DBTableMap; typedef std::unordered_map<int, std::string> DBTableMap;
virtual void initDBServer() override; virtual void initDBServer() override;
virtual void initDB( std::unique_ptr<PostgreSQLInterface>& db ) {}; virtual void initDB( std::unique_ptr<PostgreSQLInterface>& db ) {};
virtual void initDBTableMap(DBTableMap& tblMap) {}; virtual void initDBTableMap( DBTableMap& tblMap ) {};
virtual void timerInfo( const uniset::TimerMessage* tm ) override; virtual void timerInfo( const uniset::TimerMessage* tm ) override;
virtual void sysCommand( const uniset::SystemMessage* sm ) override; virtual void sysCommand( const uniset::SystemMessage* sm ) override;
...@@ -89,7 +91,7 @@ namespace uniset ...@@ -89,7 +91,7 @@ namespace uniset
virtual std::string getMonitInfo( const std::string& params ) override; virtual std::string getMonitInfo( const std::string& params ) override;
bool writeToBase( const std::string& query ); bool writeToBase( const std::string& query );
void createTables( std::shared_ptr<PostgreSQLInterface>& db ); void createTables( const std::shared_ptr<PostgreSQLInterface>& db );
inline std::string tblName(int key) inline std::string tblName(int key)
{ {
...@@ -105,39 +107,38 @@ namespace uniset ...@@ -105,39 +107,38 @@ namespace uniset
}; };
std::unique_ptr<PostgreSQLInterface> db; std::unique_ptr<PostgreSQLInterface> db;
int PingTime = { 15000 };
int ReconnectTime = { 30000 };
bool connect_ok = { false }; /*! признак наличия соеднинения с сервером БД */
bool activate = { false };
typedef std::queue<std::string> QueryBuffer; typedef std::queue<std::string> QueryBuffer;
QueryBuffer qbuf;
size_t qbufSize = { 200 }; // размер буфера сообщений.
bool lastRemove = { false };
void flushBuffer(); void flushBuffer();
std::mutex mqbuf;
// writeBuffer // writeBuffer
const std::vector<std::string> tblcols = { "date", "time", "time_usec", "sensor_id", "value", "node" }; const std::vector<std::string> tblcols = { "date", "time", "time_usec", "sensor_id", "value", "node" };
typedef std::vector<PostgreSQLInterface::Record> InsertBuffer; typedef std::vector<PostgreSQLInterface::Record> InsertBuffer;
InsertBuffer ibuf;
size_t ibufSize = { 0 };
size_t ibufMaxSize = { 2000 };
timeout_t ibufSyncTimeout = { 15000 };
void flushInsertBuffer(); void flushInsertBuffer();
virtual bool writeBufferToDB( const std::string& table virtual void addRecord( const PostgreSQLInterface::Record&& rec );
virtual bool writeInsertBufferToDB( const std::string& table
, const std::vector<std::string>& colname , const std::vector<std::string>& colname
, InsertBuffer& ibuf ); , const InsertBuffer& ibuf );
float ibufOverflowCleanFactor = { 0.5 }; // коэфициент {0...1} чистки буфера при переполнении
private: private:
DBTableMap tblMap; DBTableMap tblMap;
int PingTime = { 15000 };
int ReconnectTime = { 30000 };
bool connect_ok = { false }; /*! признак наличия соеднинения с сервером БД */
QueryBuffer qbuf;
size_t qbufSize = { 200 }; // размер буфера сообщений.
bool lastRemove = { false };
std::mutex mqbuf;
InsertBuffer ibuf;
size_t ibufSize = { 0 };
size_t ibufMaxSize = { 2000 };
timeout_t ibufSyncTimeout = { 15000 };
float ibufOverflowCleanFactor = { 0.5 }; // коэфициент {0...1} чистки буфера при переполнении
}; };
// ---------------------------------------------------------------------------------- // ----------------------------------------------------------------------------------
} // end of namespace uniset } // end of namespace uniset
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment