Commit 00eb5a61 authored by Pavel Vainerman's avatar Pavel Vainerman

(LogDB): предварительная реализация посылки комад (cmd)

parent bd6cde6f
...@@ -38,10 +38,10 @@ ...@@ -38,10 +38,10 @@
</UniSet> </UniSet>
<dlog name="dlog"/> <dlog name="dlog"/>
<LogServer name="smplus" port="3333" host="localhost" /> <LogServer name="smplus" port="3333" host="localhost" />
<LogDB name="LogDB"> <LogDB name="LogDB">
<logserver name="logserver1" ip="localhost" port="3333" cmd=""/> <logserver name="logserver1" ip="localhost" port="3333" cmd="-s level1"/>
<logserver name="logserver2" ip="localhost" port="3333" cmd=""/> <!-- <logserver name="logserver2" ip="localhost" port="3333" cmd=""/> -->
</LogDB> </LogDB>
<settings> <settings>
......
...@@ -274,6 +274,9 @@ bool LogDB::Log::connect() noexcept ...@@ -274,6 +274,9 @@ bool LogDB::Log::connect() noexcept
// dbinfo << name << "(connect): connect " << ip << ":" << port << "..." << endl; // dbinfo << name << "(connect): connect " << ip << ":" << port << "..." << endl;
if( peername.empty() )
peername = ip + ":" + std::to_string(port);
try try
{ {
tcp = make_shared<UTCPStream>(); tcp = make_shared<UTCPStream>();
...@@ -287,20 +290,20 @@ bool LogDB::Log::connect() noexcept ...@@ -287,20 +290,20 @@ bool LogDB::Log::connect() noexcept
} }
catch( const Poco::TimeoutException& e ) catch( const Poco::TimeoutException& e )
{ {
dbwarn << name << "(connect): connection " << ip << ":" << port << " timeout.." << endl; dbwarn << name << "(connect): connection " << peername << " timeout.." << endl;
} }
catch( const Poco::Net::NetException& e ) catch( const Poco::Net::NetException& e )
{ {
dbwarn << name << "(connect): connection " << ip << ":" << port << " error: " << e.what() << endl; dbwarn << name << "(connect): connection " << peername << " error: " << e.what() << endl;
} }
catch( const std::exception& e ) catch( const std::exception& e )
{ {
dbwarn << name << "(connect): connection " << ip << ":" << port << " error: " << e.what() << endl; dbwarn << name << "(connect): connection " << peername << " error: " << e.what() << endl;
} }
catch( ... ) catch( ... )
{ {
std::exception_ptr p = std::current_exception(); std::exception_ptr p = std::current_exception();
dbwarn << name << "(connect): connection " << ip << ":" << port << " error: " dbwarn << name << "(connect): connection " << peername << " error: "
<< (p ? p.__cxa_exception_type()->name() : "null") << endl; << (p ? p.__cxa_exception_type()->name() : "null") << endl;
} }
...@@ -319,6 +322,18 @@ void LogDB::Log::ioprepare( ev::dynamic_loop& loop ) ...@@ -319,6 +322,18 @@ void LogDB::Log::ioprepare( ev::dynamic_loop& loop )
io.set<LogDB::Log, &LogDB::Log::event>(this); io.set<LogDB::Log, &LogDB::Log::event>(this);
io.start(tcp->getSocket(), ev::READ); io.start(tcp->getSocket(), ev::READ);
text.reserve(reservsize); text.reserve(reservsize);
// первый раз при подключении надо послать команды
//! \todo Пока закрываем глаза на не оптимальность, того, что парсим строку каждый раз
auto cmdlist = LogServerTypes::getCommands(cmd);
if( !cmdlist.empty() )
{
for( const auto& msg: cmdlist )
wbuf.emplace(new UTCPCore::Buffer((unsigned char*)&msg, sizeof(msg)));
io.set(ev::WRITE);
}
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
void LogDB::Log::event( ev::io& watcher, int revents ) void LogDB::Log::event( ev::io& watcher, int revents )
...@@ -330,12 +345,10 @@ void LogDB::Log::event( ev::io& watcher, int revents ) ...@@ -330,12 +345,10 @@ void LogDB::Log::event( ev::io& watcher, int revents )
} }
if( revents & EV_READ ) if( revents & EV_READ )
read(); read(watcher);
if( revents & EV_WRITE ) if( revents & EV_WRITE )
{ write(watcher);
dbinfo << name << "(event): ..write event.." << endl;
}
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
LogDB::Log::ReadSignal LogDB::Log::signal_on_read() LogDB::Log::ReadSignal LogDB::Log::signal_on_read()
...@@ -343,8 +356,11 @@ LogDB::Log::ReadSignal LogDB::Log::signal_on_read() ...@@ -343,8 +356,11 @@ LogDB::Log::ReadSignal LogDB::Log::signal_on_read()
return sigRead; return sigRead;
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
void LogDB::Log::read() void LogDB::Log::read( ev::io& watcher )
{ {
if( !tcp )
return;
int n = tcp->available(); int n = tcp->available();
n = std::min(n,bufsize); n = std::min(n,bufsize);
...@@ -370,7 +386,6 @@ void LogDB::Log::read() ...@@ -370,7 +386,6 @@ void LogDB::Log::read()
else if( n == 0 ) else if( n == 0 )
{ {
dbinfo << name << ": " << ip << ":" << port << " connection is closed.." << endl; dbinfo << name << ": " << ip << ":" << port << " connection is closed.." << endl;
tcp->disconnect();
if( !text.empty() ) if( !text.empty() )
{ {
sigRead.emit(this,text); sigRead.emit(this,text);
...@@ -378,11 +393,57 @@ void LogDB::Log::read() ...@@ -378,11 +393,57 @@ void LogDB::Log::read()
if( text.capacity() < reservsize ) if( text.capacity() < reservsize )
text.reserve(reservsize); text.reserve(reservsize);
} }
close();
} }
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
void LogDB::Log::write() void LogDB::Log::write( ev::io& io )
{ {
UTCPCore::Buffer* buffer = 0;
if( wbuf.empty() )
{
io.set(EV_READ);
return;
}
buffer = wbuf.front();
if( !buffer )
return;
ssize_t ret = ::write(io.fd, buffer->dpos(), buffer->nbytes());
if( ret < 0 )
{
dbwarn << peername << "(write): write to socket error(" << errno << "): " << strerror(errno) << endl;
if( errno == EPIPE || errno == EBADF )
{
dbwarn << peername << "(write): write error.. terminate session.." << endl;
io.set(EV_NONE);
close();
}
return;
}
buffer->pos += ret;
if( buffer->nbytes() == 0 )
{
wbuf.pop();
delete buffer;
}
if( wbuf.empty() )
io.set(EV_READ);
else
io.set(EV_WRITE);
}
// -----------------------------------------------------------------------------
void LogDB::Log::close()
{
tcp->disconnect();
//tcp = nullptr;
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
...@@ -33,6 +33,9 @@ ...@@ -33,6 +33,9 @@
#include "EventLoopServer.h" #include "EventLoopServer.h"
#include "UTCPStream.h" #include "UTCPStream.h"
#include "LogReader.h" #include "LogReader.h"
#include "UHttpRequestHandler.h"
#include "UHttpServer.h"
#include "UTCPCore.h"
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
namespace uniset namespace uniset
{ {
...@@ -63,7 +66,9 @@ namespace uniset ...@@ -63,7 +66,9 @@ namespace uniset
\section sec_LogDB_REST LogDB REST API \section sec_LogDB_REST LogDB REST API
\todo Добавить настройки таймаутов, размера буфера, размера для резервирования под строку,... \todo Добавить настройки таймаутов, размера буфера, размера для резервирования под строку,...
\todo Реализовать посылку команд
\todo Добавить ротацию БД \todo Добавить ротацию БД
\todo REST API: продумать команды и реализовать
\todo Продумать поддержку websocket \todo Продумать поддержку websocket
*/ */
class LogDB: class LogDB:
...@@ -114,14 +119,17 @@ namespace uniset ...@@ -114,14 +119,17 @@ namespace uniset
std::string ip; std::string ip;
int port = { 0 }; int port = { 0 };
std::string cmd; std::string cmd;
std::string peername;
std::shared_ptr<DebugStream> dblog; std::shared_ptr<DebugStream> dblog;
bool connect() noexcept; bool connect() noexcept;
bool isConnected() const; bool isConnected() const;
void ioprepare( ev::dynamic_loop& loop ); void ioprepare( ev::dynamic_loop& loop );
void read();
void write();
void event( ev::io& watcher, int revents ); void event( ev::io& watcher, int revents );
void read( ev::io& watcher);
void write(ev::io& io );
void close();
typedef sigc::signal<void, Log*, const std::string&> ReadSignal; typedef sigc::signal<void, Log*, const std::string&> ReadSignal;
ReadSignal signal_on_read(); ReadSignal signal_on_read();
...@@ -135,6 +143,9 @@ namespace uniset ...@@ -135,6 +143,9 @@ namespace uniset
static const size_t reservsize = { 1000 }; static const size_t reservsize = { 1000 };
std::string text; std::string text;
// буфер для посылаемых данных (write buffer)
std::queue<UTCPCore::Buffer*> wbuf;
}; };
std::vector< std::shared_ptr<Log> > logservers; std::vector< std::shared_ptr<Log> > logservers;
......
...@@ -19,6 +19,7 @@ ...@@ -19,6 +19,7 @@
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
#include <ostream> #include <ostream>
#include <cstring> #include <cstring>
#include <vector>
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
namespace uniset namespace uniset
{ {
...@@ -56,6 +57,12 @@ namespace uniset ...@@ -56,6 +57,12 @@ namespace uniset
std::memset(logname, 0, sizeof(logname)); std::memset(logname, 0, sizeof(logname));
} }
explicit lsMessage( Command c, uint d, const std::string& logname ):
magic(MAGICNUM),cmd(c),data(d)
{
setLogName(logname);
}
uint magic; uint magic;
Command cmd; Command cmd;
uint data; uint data;
...@@ -70,7 +77,22 @@ namespace uniset ...@@ -70,7 +77,22 @@ namespace uniset
// char logfile[MAXLOGFILENAME]; // char logfile[MAXLOGFILENAME];
} __attribute__((packed)); } __attribute__((packed));
std::ostream& operator<<(std::ostream& os, lsMessage& m ); std::ostream& operator<<(std::ostream& os, const lsMessage& m );
/*! Разбор строки на команды:
*
* [-a | --add] info,warn,crit,... [logfilter] - Add log levels.
* [-d | --del] info,warn,crit,... [logfilter] - Delete log levels.
* [-s | --set] info,warn,crit,... [logfilter] - Set log levels.
* [-o | --off] [logfilter] - Off the write log file (if enabled).
* [-e | --on] [logfilter] - On(enable) the write log file (if before disabled).
* [-r | --rotate] [logfilter] - rotate log file.
* [-u | --save-loglevels] [logfilter] - save log levels (disable restore after disconnected).
* [-y | --restore-loglevels] [logfilter] - restore default log levels.
*
* 'logfilter' - regexp for name of log. Default: ALL logs
*/
std::vector<lsMessage> getCommands( const std::string& cmd );
} }
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
} // end of uniset namespace } // end of uniset namespace
......
...@@ -14,7 +14,11 @@ ...@@ -14,7 +14,11 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
#include <getopt.h> // не хорошо завязыватся на getopt.. но пока так удобнее
#include "UniSetTypes.h"
#include "LogServerTypes.h" #include "LogServerTypes.h"
#include "Debug.h"
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
namespace uniset namespace uniset
{ {
...@@ -68,7 +72,7 @@ namespace uniset ...@@ -68,7 +72,7 @@ namespace uniset
return os; return os;
} }
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
std::ostream& LogServerTypes::operator<<(std::ostream& os, LogServerTypes::lsMessage& m ) std::ostream& LogServerTypes::operator<<(std::ostream& os, const LogServerTypes::lsMessage& m )
{ {
return os << " magic=" << m.magic << " cmd=" << m.cmd << " data=" << m.data; return os << " magic=" << m.magic << " cmd=" << m.cmd << " data=" << m.data;
} }
...@@ -80,4 +84,187 @@ namespace uniset ...@@ -80,4 +84,187 @@ namespace uniset
logname[s] = '\0'; logname[s] = '\0';
} }
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
static struct option longopts[] =
{
{ "add", required_argument, 0, 'a' },
{ "del", required_argument, 0, 'd' },
{ "set", required_argument, 0, 's' },
{ "off", required_argument, 0, 'o' },
{ "on", required_argument, 0, 'e' },
{ "save-loglevels", required_argument, 0, 'u' },
{ "restore-loglevels", required_argument, 0, 'y' },
{ "rotate", optional_argument, 0, 'r' },
{ "logfilter", required_argument, 0, 'n' },
{ "timeout", required_argument, 0, 't' },
{ "reconnect-delay", required_argument, 0, 'x' },
{ NULL, 0, 0, 0 }
};
// --------------------------------------------------------------------------
static const char* checkArg( int i, int argc, const char* argv[] )
{
if( i < argc && (argv[i])[0] != '-' )
return argv[i];
return 0;
}
// --------------------------------------------------------------------------
std::vector<LogServerTypes::lsMessage> LogServerTypes::getCommands( const std::string& cmd )
{
// формируем argc, argv и проходим getopt-ом
// пока это самый простой способ..
auto v = uniset::explode_str(cmd,' ');
const size_t argc = v.size()+1;
const char** argv = new const char*[argc];
argv[0] = " ";
for( size_t i=1; i<argc; i++ )
argv[i] = v[i-1].c_str(); // use strdup?
int optindex = 0;
int opt = 0;
vector<lsMessage> vcmd;
while(1)
{
opt = getopt_long(argc, (char**)argv, "la:d:s:n:eorx:t:uby:", longopts, &optindex);
if( opt == -1 )
break;
switch (opt)
{
case 'a':
{
LogServerTypes::Command cmd = LogServerTypes::cmdAddLevel;
std::string filter("");
std::string d = string(optarg);
const char* arg2 = checkArg(optind, argc, argv);
if( arg2 )
filter = string(arg2);
vcmd.emplace_back(cmd, (int)Debug::value(d), filter);
}
break;
case 'd':
{
LogServerTypes::Command cmd = LogServerTypes::cmdDelLevel;
std::string filter("");
std::string d = string(optarg);
const char* arg2 = checkArg(optind, argc, argv);
if( arg2 )
filter = string(arg2);
vcmd.emplace_back(cmd, (int)Debug::value(d), filter );
}
break;
case 's':
{
LogServerTypes::Command cmd = LogServerTypes::cmdSetLevel;
std::string filter("");
std::string d = string(optarg);
const char* arg2 = checkArg(optind, argc, argv);
if( arg2 )
filter = string(arg2);
vcmd.emplace_back(cmd, (int)Debug::value(d), filter );
}
break;
case 'l':
{
std::string filter("");
const char* arg2 = checkArg(optind, argc, argv);
if( arg2 )
filter = string(arg2);
vcmd.emplace_back(LogServerTypes::cmdList, 0, filter);
}
break;
case 'o':
{
LogServerTypes::Command cmd = LogServerTypes::cmdOffLogFile;
std::string filter("");
const char* arg2 = checkArg(optind, argc, argv);
if( arg2 )
filter = string(arg2);
vcmd.emplace_back(cmd, 0, filter);
}
break;
case 'u': // --save-loglevels
{
LogServerTypes::Command cmd = LogServerTypes::cmdSaveLogLevel;
std::string filter("");
const char* arg2 = checkArg(optind, argc, argv);
if( arg2 )
filter = string(arg2);
vcmd.emplace_back(cmd, 0, filter);
}
break;
case 'y': // --restore-loglevels
{
LogServerTypes::Command cmd = LogServerTypes::cmdRestoreLogLevel;
std::string filter("");
const char* arg2 = checkArg(optind, argc, argv);
if( arg2 )
filter = string(arg2);
vcmd.emplace_back(cmd, 0, filter);
}
break;
case 'e':
{
LogServerTypes::Command cmd = LogServerTypes::cmdOnLogFile;
std::string filter("");
const char* arg2 = checkArg(optind, argc, argv);
if( arg2 )
filter = string(arg2);
vcmd.emplace_back(cmd, 0, filter);
}
break;
case 'r':
{
LogServerTypes::Command cmd = LogServerTypes::cmdRotate;
std::string filter("");
const char* arg2 = checkArg(optind, argc, argv);
if( arg2 )
filter = string(arg2);
vcmd.emplace_back(cmd, 0, filter);
}
break;
case '?':
default:
break;
}
}
delete[] argv;
return vcmd;
}
// -------------------------------------------------------------------------
// -------------------------------------------------------------------------
// -------------------------------------------------------------------------
} // end of namespace uniset } // end of namespace uniset
...@@ -396,7 +396,18 @@ namespace uniset ...@@ -396,7 +396,18 @@ namespace uniset
if( ret != sizeof(msg) || msg.magic != LogServerTypes::MAGICNUM ) if( ret != sizeof(msg) || msg.magic != LogServerTypes::MAGICNUM )
{ {
if( mylog.is_warn() ) if( mylog.is_warn() )
mylog.warn() << peername << "(LogSession::readEvent): BAD MESSAGE..." << endl; {
ostringstream err;
err << peername << "(LogSession::readEvent): MESSAGE ERROR: ";
if( msg.magic != LogServerTypes::MAGICNUM )
err << "BAD MAGICNUM";
if( ret != sizeof(msg) )
err << "BAD soze of message (" << ret << ")";
mylog.warn() << err.str() << endl;
}
return; return;
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment