Commit 2cc0f0c0 authored by Pavel Vainerman's avatar Pavel Vainerman

(LogDB): заложил реализацию чтения на основе libev.

parent 149149bd
......@@ -31,7 +31,7 @@ static void print_help()
// printf("[-t|--timeout] msec - Timeout. Default: 2000.\n");
printf("[-v|--verbose] - Print all messages to stdout\n");
printf("[-i|--iaddr] addr - Inet address for listen connections.\n");
printf("[-p|--port] port - Bind port.\n");
printf("[-p|--port] port - Bind port. Default: 3333\n");
printf("[-d|--delay] msec - Delay for generate message. Default 5000.\n");
printf("[-m|--max-sessions] num - Maximum count sessions for server. Default: 5\n");
printf("[-s|--silent] - Silent mode. Not write logs..\n");
......
......@@ -39,6 +39,11 @@
<dlog name="dlog"/>
<LogServer name="smplus" port="3333" host="localhost" />
<LogDB name="LogDB">
<logserver name="logserver1" ip="localhost" port="3333" cmd=""/>
<logserver name="logserver2" ip="localhost" port="3333" cmd=""/>
</LogDB>
<settings>
<LostTestProc name="TestProc1" sensor_s="Input4_S"/>
......
......@@ -22,6 +22,7 @@
#include <iomanip>
#include "unisetstd.h"
#include <Poco/Net/NetException.h>
#include "LogDB.h"
#include "Configuration.h"
#include "Debug.h"
......@@ -39,6 +40,8 @@ LogDB::LogDB( const string& name , const string& prefix ):
auto conf = uniset_conf();
auto xml = conf->getConfXML();
conf->initLogStream(dblog, prefix + "-log" );
xmlNode* cnode = conf->findNode(xml->getFirstNode(), "LogDB", name);
if( !cnode )
......@@ -52,7 +55,9 @@ LogDB::LogDB( const string& name , const string& prefix ):
UniXML::iterator it(cnode);
qbufSize = conf->getArgPInt("--" + prefix + "-buffer-size", it.getProp("bufferSize"), qbufSize);
tmConnection_sec = tmConnection_msec / 1000.;
connectionTimer.set<LogDB, &LogDB::onTimer>(this);
UniXML::iterator sit(cnode);
if( !sit.goChildren() )
......@@ -65,13 +70,14 @@ LogDB::LogDB( const string& name , const string& prefix ):
for( ;sit.getCurrent(); sit++ )
{
Log l;
l.name = sit.getProp("name");
l.ip = sit.getProp("ip");
l.port = sit.getIntProp("port");
l.cmd = sit.getProp("cmd");
auto l = make_shared<Log>();
l->name = sit.getProp("name");
l->ip = sit.getProp("ip");
l->port = sit.getIntProp("port");
l->cmd = sit.getProp("cmd");
if( l.name.empty() )
if( l->name.empty() )
{
ostringstream err;
err << name << "(init): Unknown name for logserver..";
......@@ -79,31 +85,35 @@ LogDB::LogDB( const string& name , const string& prefix ):
throw uniset::SystemError(err.str());
}
if( l.ip.empty() )
if( l->ip.empty() )
{
ostringstream err;
err << name << "(init): Unknown 'ip' for '" << l.name << "'..";
err << name << "(init): Unknown 'ip' for '" << l->name << "'..";
dbcrit << err.str() << endl;
throw uniset::SystemError(err.str());
}
if( l.port == 0 )
if( l->port == 0 )
{
ostringstream err;
err << name << "(init): Unknown 'port' for '" << l.name << "'..";
err << name << "(init): Unknown 'port' for '" << l->name << "'..";
dbcrit << err.str() << endl;
throw uniset::SystemError(err.str());
}
if( l.cmd.empty() )
{
ostringstream err;
err << name << "(init): Unknown 'cmd' for '" << l.name << "'..";
dbcrit << err.str() << endl;
throw uniset::SystemError(err.str());
}
// if( l->cmd.empty() )
// {
// ostringstream err;
// err << name << "(init): Unknown 'cmd' for '" << l->name << "'..";
// dbcrit << err.str() << endl;
// throw uniset::SystemError(err.str());
// }
logservers.emplace_back(std::move(l));
// l->tcp = make_shared<UTCPStream>();
l->dblog = dblog;
l->signal_on_read().connect(sigc::mem_fun(this, &LogDB::onRead));
logservers.push_back(l);
}
if( logservers.empty() )
......@@ -115,8 +125,6 @@ LogDB::LogDB( const string& name , const string& prefix ):
}
db = unisetstd::make_unique<SQLiteInterface>();
#if 0
......@@ -207,6 +215,11 @@ void LogDB::flushBuffer()
}
}
//--------------------------------------------------------------------------------------------
void LogDB::onRead( LogDB::Log* log, const string& txt )
{
cout << txt << endl;
}
//--------------------------------------------------------------------------------------------
std::shared_ptr<LogDB> LogDB::init_logdb( int argc, const char* const* argv, const std::string& prefix )
{
auto conf = uniset_conf();
......@@ -231,6 +244,190 @@ void LogDB::help_print()
// -----------------------------------------------------------------------------
void LogDB::run( bool async )
{
if( async )
async_evrun();
else
evrun();
}
// -----------------------------------------------------------------------------
void LogDB::evfinish()
{
connectionTimer.stop();
}
// -----------------------------------------------------------------------------
void LogDB::evprepare()
{
connectionTimer.set(loop);
if( tmConnection_msec != UniSetTimer::WaitUpTime )
connectionTimer.start(0, tmConnection_sec);
#if 0
try
{
sock = make_shared<UTCPSocket>(iaddr, port);
}
catch( const Poco::Net::NetException& ex )
{
ostringstream err;
err << "(ModbusTCPServer::evprepare): connect " << iaddr << ":" << port << " err: " << ex.what();
dlog->crit() << err.str() << endl;
throw uniset::SystemError(err.str());
}
catch( const std::exception& ex )
{
ostringstream err;
err << "(ModbusTCPServer::evprepare): connect " << iaddr << ":" << port << " err: " << ex.what();
dlog->crit() << err.str() << endl;
throw uniset::SystemError(err.str());
}
sock->setBlocking(false);
io.set(loop);
io.start(sock->getSocket(), ev::READ);
#endif
}
// -----------------------------------------------------------------------------
void LogDB::onTimer( ev::timer& t, int revents )
{
if (EV_ERROR & revents)
{
dbcrit << myname << "(LogDB::onTimer): invalid event" << endl;
return;
}
// проверяем соединения..
for( const auto& s: logservers )
{
if( !s->isConnected() )
{
if( s->connect() )
s->ioprepare(loop);
}
}
}
// -----------------------------------------------------------------------------
bool LogDB::Log::isConnected() const
{
return tcp && tcp->isConnected();
}
// -----------------------------------------------------------------------------
bool LogDB::Log::connect() noexcept
{
if( tcp && tcp->isConnected() )
return true;
// dbinfo << name << "(connect): connect " << ip << ":" << port << "..." << endl;
try
{
tcp = make_shared<UTCPStream>();
tcp->create(ip, port);
// tcp->setReceiveTimeout( UniSetTimer::millisecToPoco(inTimeout) );
// tcp->setSendTimeout( UniSetTimer::millisecToPoco(outTimeout) );
tcp->setKeepAlive(true);
tcp->setBlocking(false);
dbinfo << name << "(connect): connect OK to " << ip << ":" << port << endl;
return true;
}
catch( const Poco::TimeoutException& e )
{
dbwarn << name << "(connect): connection " << ip << ":" << port << " timeout.." << endl;
}
catch( const Poco::Net::NetException& e )
{
dbwarn << name << "(connect): connection " << ip << ":" << port << " error: " << e.what() << endl;
}
catch( const std::exception& e )
{
dbwarn << name << "(connect): connection " << ip << ":" << port << " error: " << e.what() << endl;
}
catch( ... )
{
std::exception_ptr p = std::current_exception();
dbwarn << name << "(connect): connection " << ip << ":" << port << " error: "
<< (p ? p.__cxa_exception_type()->name() : "null") << endl;
}
tcp->disconnect();
tcp = nullptr;
return false;
}
// -----------------------------------------------------------------------------
void LogDB::Log::ioprepare( ev::dynamic_loop& loop )
{
if( !tcp || !tcp->isConnected() )
return;
io.set(loop);
io.set<LogDB::Log, &LogDB::Log::event>(this);
io.start(tcp->getSocket(), ev::READ);
text.reserve(reservsize);
}
// -----------------------------------------------------------------------------
void LogDB::Log::event( ev::io& watcher, int revents )
{
if( EV_ERROR & revents )
{
dbcrit << name << "(event): invalid event" << endl;
return;
}
if( revents & EV_READ )
read();
if( revents & EV_WRITE )
{
dbinfo << name << "(event): ..write event.." << endl;
}
}
// -----------------------------------------------------------------------------
LogDB::Log::ReadSignal LogDB::Log::signal_on_read()
{
return sigRead;
}
// -----------------------------------------------------------------------------
void LogDB::Log::read()
{
int n = tcp->available();
n = std::min(n,bufsize);
if( n > 0 )
{
tcp->receiveBytes(buf, n);
// нарезаем на строки
for( size_t i=0; i<n; i++ )
{
if( buf[i] != '\n' )
text += buf[i];
else
{
sigRead.emit(this,text);
text = "";
if( text.capacity() < reservsize )
text.reserve(reservsize);
}
}
}
else if( n == 0 )
{
dbinfo << name << ": " << ip << ":" << port << " connection is closed.." << endl;
tcp->disconnect();
if( !text.empty() )
{
sigRead.emit(this,text);
text = "";
if( text.capacity() < reservsize )
text.reserve(reservsize);
}
}
}
// -----------------------------------------------------------------------------
void LogDB::Log::write()
{
}
// -----------------------------------------------------------------------------
......@@ -23,10 +23,14 @@
// --------------------------------------------------------------------------
#include <queue>
#include <memory>
#include <ev++.h>
#include <sigc++/sigc++.h>
#include "UniSetTypes.h"
#include "LogAgregator.h"
#include "DebugStream.h"
#include "SQLiteInterface.h"
#include "EventLoopServer.h"
#include "UTCPStream.h"
#include "LogReader.h"
// -------------------------------------------------------------------------
namespace uniset
......@@ -56,8 +60,11 @@ namespace uniset
\section sec_LogDB_REST LogDB REST API
\todo Продумать поддержку websocket
*/
class LogDB
class LogDB:
public EventLoopServer
{
public:
LogDB( const std::string& name, const std::string& prefix = "" );
......@@ -78,6 +85,10 @@ namespace uniset
protected:
virtual void evfinish() override;
virtual void evprepare() override;
void onTimer( ev::timer& t, int revents );
std::string myname;
std::unique_ptr<SQLiteInterface> db;
......@@ -92,15 +103,43 @@ namespace uniset
std::shared_ptr<DebugStream> dblog;
struct Log
class Log
{
public:
std::string name;
std::string ip;
int port = { 0 };
std::string cmd;
std::shared_ptr<DebugStream> dblog;
bool connect() noexcept;
bool isConnected() const;
void ioprepare( ev::dynamic_loop& loop );
void read();
void write();
void event( ev::io& watcher, int revents );
typedef sigc::signal<void, Log*, const std::string&> ReadSignal;
ReadSignal signal_on_read();
private:
ReadSignal sigRead;
ev::io io;
std::shared_ptr<UTCPStream> tcp;
static const int bufsize = { 10001 };
char buf[bufsize];
static const size_t reservsize = { 1000 };
std::string text;
};
std::vector<Log> logservers;
void onRead(Log* log, const std::string& txt );
std::vector< std::shared_ptr<Log> > logservers;
ev::timer connectionTimer;
timeout_t tmConnection_msec = { 5000 }; // пауза между попытками установить соединение
double tmConnection_sec = { 0.0 };
private:
};
......
......@@ -2,4 +2,5 @@
ulimit -Sc 1000000
uniset2-start.sh -f ./uniset2-logdb --confile test.xml --logdb-name LogDB
#uniset2-start.sh -f
./uniset2-logdb --confile test.xml --logdb-name LogDB --logdb-log-add-levels any
......@@ -52,7 +52,7 @@ namespace uniset
bool isConnection() const;
void setReadCount( unsigned int n );
void setReadCount( size_t n );
void setCommandOnlyMode( bool s );
void setinTimeout( timeout_t msec );
......
......@@ -102,7 +102,7 @@ void LogReader::connect( const std::string& _addr, int _port, timeout_t msec )
if( rlog.debugging(Debug::CRIT) )
{
ostringstream s;
s << "(LogReader): connection " << s.str() << " error: " << e.what();
s << "(LogReader): connection " << iaddr << ":" << port << " error: " << e.what();
rlog.crit() << s.str() << std::endl;
}
}
......@@ -111,7 +111,7 @@ void LogReader::connect( const std::string& _addr, int _port, timeout_t msec )
if( rlog.debugging(Debug::CRIT) )
{
ostringstream s;
s << "(LogReader): connection " << s.str() << " error: " << e.what();
s << "(LogReader): connection " << iaddr << ":" << port << " error: " << e.what();
rlog.crit() << s.str() << std::endl;
}
}
......@@ -120,7 +120,7 @@ void LogReader::connect( const std::string& _addr, int _port, timeout_t msec )
if( rlog.debugging(Debug::CRIT) )
{
ostringstream s;
s << "(LogReader): connection " << s.str() << " error: catch ...";
s << "(LogReader): connection " << iaddr << ":" << port << " error: catch ...";
rlog.crit() << s.str() << std::endl;
}
}
......@@ -153,7 +153,7 @@ bool LogReader::isConnection() const
return (tcp && tcp->isConnected() );
}
// -------------------------------------------------------------------------
void LogReader::setReadCount(unsigned int n)
void LogReader::setReadCount( size_t n )
{
readcount = n;
}
......
......@@ -152,7 +152,9 @@ namespace uniset
}
catch( ... )
{
cerr << "(EventLoopServer::defaultLoop): UNKNOWN EXCEPTION.." << endl;
std::exception_ptr p = std::current_exception();
cerr << "(EventLoopServer::defaultLoop): error: "
<< (p ? p.__cxa_exception_type()->name() : "null") << endl;
}
{
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment