Commit bd6cde6f authored by Pavel Vainerman's avatar Pavel Vainerman

(LogDB): сделал скелет релизации сохранения в БД.

parent 2cc0f0c0
...@@ -55,9 +55,11 @@ LogDB::LogDB( const string& name , const string& prefix ): ...@@ -55,9 +55,11 @@ LogDB::LogDB( const string& name , const string& prefix ):
UniXML::iterator it(cnode); UniXML::iterator it(cnode);
qbufSize = conf->getArgPInt("--" + prefix + "-buffer-size", it.getProp("bufferSize"), qbufSize); qbufSize = conf->getArgPInt("--" + prefix + "-buffer-size", it.getProp("bufferSize"), qbufSize);
tmConnection_sec = tmConnection_msec / 1000.; tmConnection_sec = tmConnection_msec / 1000.;
connectionTimer.set<LogDB, &LogDB::onTimer>(this); connectionTimer.set<LogDB, &LogDB::onTimer>(this);
checkBufferTimer.set<LogDB, &LogDB::onCheckBuffer>(this);
UniXML::iterator sit(cnode); UniXML::iterator sit(cnode);
if( !sit.goChildren() ) if( !sit.goChildren() )
...@@ -111,7 +113,7 @@ LogDB::LogDB( const string& name , const string& prefix ): ...@@ -111,7 +113,7 @@ LogDB::LogDB( const string& name , const string& prefix ):
// l->tcp = make_shared<UTCPStream>(); // l->tcp = make_shared<UTCPStream>();
l->dblog = dblog; l->dblog = dblog;
l->signal_on_read().connect(sigc::mem_fun(this, &LogDB::onRead)); l->signal_on_read().connect(sigc::mem_fun(this, &LogDB::addLog));
logservers.push_back(l); logservers.push_back(l);
} }
...@@ -125,31 +127,25 @@ LogDB::LogDB( const string& name , const string& prefix ): ...@@ -125,31 +127,25 @@ LogDB::LogDB( const string& name , const string& prefix ):
} }
db = unisetstd::make_unique<SQLiteInterface>(); std::string dbfile = conf->getArgParam("--" + prefix + "-dbfile", it.getProp("dbfile"));
if( dbfile.empty() )
{
ostringstream err;
err << name << "(init): dbfile (sqlite) not defined. Use: <LogDB name='" << name << "' dbfile='..' ...>";
dbcrit << err.str() << endl;
throw uniset::SystemError(err.str());
}
#if 0 db = unisetstd::make_unique<SQLiteInterface>();
if( !db->connect(dbfile, false) ) if( !db->connect(dbfile, false) )
{ {
// ostringstream err; ostringstream err;
dbcrit << myname err << myname
<< "(init): DB connection error: " << "(init): DB connection error: "
<< db->error() << endl; << db->error();
// throw Exception( string(myname+"(init): не смогли создать соединение с БД "+db->error()) ); dbcrit << err.str() << endl;
askTimer(LogDB::ReconnectTimer, ReconnectTime); throw uniset::SystemError(err.str());
}
else
{
dbinfo << myname << "(init): connect [OK]" << endl;
connect_ok = true;
askTimer(LogDB::ReconnectTimer, 0);
askTimer(LogDB::PingTimer, PingTime);
// createTables(db);
initDB(db);
initDBTableMap(tblMap);
flushBuffer();
} }
#endif
} }
//-------------------------------------------------------------------------------------------- //--------------------------------------------------------------------------------------------
LogDB::~LogDB() LogDB::~LogDB()
...@@ -158,50 +154,8 @@ LogDB::~LogDB() ...@@ -158,50 +154,8 @@ LogDB::~LogDB()
db->close(); db->close();
} }
//-------------------------------------------------------------------------------------------- //--------------------------------------------------------------------------------------------
#if 0
bool LogDB::writeToBase( const string& query )
{
dbinfo << myname << "(writeToBase): " << query << endl;
// cout << "LogDB: " << query << endl;
if( !db || !connect_ok )
{
uniset_rwmutex_wrlock l(mqbuf);
qbuf.push(query);
if( qbuf.size() > qbufSize )
{
std::string qlost;
if( lastRemove )
qlost = qbuf.back();
else
qlost = qbuf.front();
qbuf.pop();
dbcrit << myname << "(writeToBase): DB not connected! buffer(" << qbufSize
<< ") overflow! lost query: " << qlost << endl;
}
return false;
}
// На всякий скидываем очередь
flushBuffer();
// А теперь собственно запрос..
if( db->insert(query) )
return true;
return false;
}
#endif
//--------------------------------------------------------------------------------------------
void LogDB::flushBuffer() void LogDB::flushBuffer()
{ {
uniset_rwmutex_wrlock l(mqbuf);
// Сперва пробуем очистить всё что накопилось в очереди до этого... // Сперва пробуем очистить всё что накопилось в очереди до этого...
while( !qbuf.empty() ) while( !qbuf.empty() )
{ {
...@@ -215,9 +169,19 @@ void LogDB::flushBuffer() ...@@ -215,9 +169,19 @@ void LogDB::flushBuffer()
} }
} }
//-------------------------------------------------------------------------------------------- //--------------------------------------------------------------------------------------------
void LogDB::onRead( LogDB::Log* log, const string& txt ) void LogDB::addLog( LogDB::Log* log, const string& txt )
{ {
cout << txt << endl; auto tm = uniset::now_to_timespec();
ostringstream q;
q << "INSERT INTO log(tms,usec,name,text) VALUES('"
<< tm.tv_sec << "','" // timestamp
<< tm.tv_nsec << "','" // usec
<< log->name << "','"
<< txt << "');";
qbuf.emplace(q.str());
} }
//-------------------------------------------------------------------------------------------- //--------------------------------------------------------------------------------------------
std::shared_ptr<LogDB> LogDB::init_logdb( int argc, const char* const* argv, const std::string& prefix ) std::shared_ptr<LogDB> LogDB::init_logdb( int argc, const char* const* argv, const std::string& prefix )
...@@ -253,39 +217,18 @@ void LogDB::run( bool async ) ...@@ -253,39 +217,18 @@ void LogDB::run( bool async )
void LogDB::evfinish() void LogDB::evfinish()
{ {
connectionTimer.stop(); connectionTimer.stop();
checkBufferTimer.stop();
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
void LogDB::evprepare() void LogDB::evprepare()
{ {
connectionTimer.set(loop); connectionTimer.set(loop);
checkBufferTimer.set(loop);
if( tmConnection_msec != UniSetTimer::WaitUpTime ) if( tmConnection_msec != UniSetTimer::WaitUpTime )
connectionTimer.start(0, tmConnection_sec); connectionTimer.start(0, tmConnection_sec);
#if 0 checkBufferTimer.start(0, tmCheckBuffer_sec);
try
{
sock = make_shared<UTCPSocket>(iaddr, port);
}
catch( const Poco::Net::NetException& ex )
{
ostringstream err;
err << "(ModbusTCPServer::evprepare): connect " << iaddr << ":" << port << " err: " << ex.what();
dlog->crit() << err.str() << endl;
throw uniset::SystemError(err.str());
}
catch( const std::exception& ex )
{
ostringstream err;
err << "(ModbusTCPServer::evprepare): connect " << iaddr << ":" << port << " err: " << ex.what();
dlog->crit() << err.str() << endl;
throw uniset::SystemError(err.str());
}
sock->setBlocking(false);
io.set(loop);
io.start(sock->getSocket(), ev::READ);
#endif
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
void LogDB::onTimer( ev::timer& t, int revents ) void LogDB::onTimer( ev::timer& t, int revents )
...@@ -307,6 +250,18 @@ void LogDB::onTimer( ev::timer& t, int revents ) ...@@ -307,6 +250,18 @@ void LogDB::onTimer( ev::timer& t, int revents )
} }
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
void LogDB::onCheckBuffer(ev::timer& t, int revents)
{
if (EV_ERROR & revents)
{
dbcrit << myname << "(LogDB::onTimer): invalid event" << endl;
return;
}
if( qbuf.size() >= qbufSize )
flushBuffer();
}
// -----------------------------------------------------------------------------
bool LogDB::Log::isConnected() const bool LogDB::Log::isConnected() const
{ {
return tcp && tcp->isConnected(); return tcp && tcp->isConnected();
......
...@@ -23,6 +23,7 @@ ...@@ -23,6 +23,7 @@
// -------------------------------------------------------------------------- // --------------------------------------------------------------------------
#include <queue> #include <queue>
#include <memory> #include <memory>
#include <chrono>
#include <ev++.h> #include <ev++.h>
#include <sigc++/sigc++.h> #include <sigc++/sigc++.h>
#include "UniSetTypes.h" #include "UniSetTypes.h"
...@@ -61,6 +62,8 @@ namespace uniset ...@@ -61,6 +62,8 @@ namespace uniset
\section sec_LogDB_REST LogDB REST API \section sec_LogDB_REST LogDB REST API
\todo Добавить настройки таймаутов, размера буфера, размера для резервирования под строку,...
\todo Добавить ротацию БД
\todo Продумать поддержку websocket \todo Продумать поддержку websocket
*/ */
class LogDB: class LogDB:
...@@ -85,9 +88,13 @@ namespace uniset ...@@ -85,9 +88,13 @@ namespace uniset
protected: protected:
class Log;
virtual void evfinish() override; virtual void evfinish() override;
virtual void evprepare() override; virtual void evprepare() override;
void onTimer( ev::timer& t, int revents ); void onTimer( ev::timer& t, int revents );
void onCheckBuffer( ev::timer& t, int revents );
void addLog( Log* log, const std::string& txt );
std::string myname; std::string myname;
std::unique_ptr<SQLiteInterface> db; std::unique_ptr<SQLiteInterface> db;
...@@ -99,9 +106,6 @@ namespace uniset ...@@ -99,9 +106,6 @@ namespace uniset
size_t qbufSize = { 200 }; // размер буфера сообщений. size_t qbufSize = { 200 }; // размер буфера сообщений.
void flushBuffer(); void flushBuffer();
uniset::uniset_rwmutex mqbuf;
std::shared_ptr<DebugStream> dblog;
class Log class Log
{ {
...@@ -133,14 +137,16 @@ namespace uniset ...@@ -133,14 +137,16 @@ namespace uniset
std::string text; std::string text;
}; };
void onRead(Log* log, const std::string& txt );
std::vector< std::shared_ptr<Log> > logservers; std::vector< std::shared_ptr<Log> > logservers;
std::shared_ptr<DebugStream> dblog;
ev::timer connectionTimer; ev::timer connectionTimer;
timeout_t tmConnection_msec = { 5000 }; // пауза между попытками установить соединение timeout_t tmConnection_msec = { 5000 }; // пауза между попытками установить соединение
double tmConnection_sec = { 0.0 }; double tmConnection_sec = { 0.0 };
ev::timer checkBufferTimer;
double tmCheckBuffer_sec = { 1.0 };
private: private:
}; };
// ---------------------------------------------------------------------------------- // ----------------------------------------------------------------------------------
......
#!/bin/sh
dbname=test.db
[ -n "$1" ] && dbname="$1"
sqlite3 $dbname <<"_EOF_"
PRAGMA foreign_keys=ON;
DROP TABLE IF EXISTS log;
CREATE TABLE log (
id INTEGER PRIMARY KEY AUTOINCREMENT,
tms timestamp KEY default (strftime('%s', 'now')),
usec INTEGER(5) NOT NULL,
name TEXT KEY NOT NULL,
text TEXT
);
_EOF_
...@@ -3,4 +3,7 @@ ...@@ -3,4 +3,7 @@
ulimit -Sc 1000000 ulimit -Sc 1000000
#uniset2-start.sh -f #uniset2-start.sh -f
./uniset2-logdb --confile test.xml --logdb-name LogDB --logdb-log-add-levels any ./uniset2-logdb --confile test.xml --logdb-name LogDB \
--logdb-log-add-levels any \
--logdb-dbfile ./test.db \
--logdb-buffer-size 5
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment