Commit dcc56de5 authored by Pavel Vainerman's avatar Pavel Vainerman

(LogServer): реализовал поддержку обработки нескольких команд.

(LogReader): рефакторинг
parent 1f406d70
// --------------------------------------------------------------------------
#include <string>
#include <vector>
#include <getopt.h>
#include "Debug.h"
#include "UniSetTypes.h"
......@@ -25,7 +26,7 @@ static struct option longopts[] =
{ "list", optional_argument, 0, 'l' },
{ "rotate", optional_argument, 0, 'r' },
{ "logfilter", required_argument, 0, 'n' },
{ "command-only", no_argument, 0, 'b' },
{ "command-only", no_argument, 0, 'c' },
{ "timeout", required_argument, 0, 'w' },
{ "reconnect-delay", required_argument, 0, 'x' },
{ NULL, 0, 0, 0 }
......@@ -37,7 +38,7 @@ static void print_help()
printf("-v, --verbose - Print all messages to stdout\n");
printf("[-i|--iaddr] addr - LogServer ip or hostname.\n");
printf("[-p|--port] port - LogServer port.\n");
printf("[-b|--command-only] - Send command and break. (No read logs).\n");
printf("[-c|--command-only] - Send command and break. (No read logs).\n");
printf("[-w|--timeout] msec - Timeout for wait data. Default: 0 - endless waiting\n");
printf("[-x|--reconnect-delay] msec - Pause for repeat connect to LogServer. Default: 5000 msec.\n");
......@@ -58,6 +59,7 @@ static void print_help()
// --------------------------------------------------------------------------
static char* checkArg( int i, int argc, char* argv[] );
// --------------------------------------------------------------------------
// --------------------------------------------------------------------------
int main( int argc, char** argv )
{
// std::ios::sync_with_stdio(false); // нельзя отключать.. тогда "обмен с сервером" рассинхронизируется
......@@ -68,11 +70,10 @@ int main( int argc, char** argv )
string addr("localhost");
int port = 3333;
DebugStream dlog;
int cmd = LogServerTypes::cmdNOP;
int data = 0;
string sdata("");
int cmdonly = 0;
vector<LogReader::Command> vcmd;
string logfilter("");
LogServerTypes::Command cmd = LogServerTypes::cmdNOP;
int cmdonly = 0;
timeout_t tout = 0;
timeout_t rdelay = 5000;
......@@ -88,96 +89,109 @@ int main( int argc, char** argv )
case 'a':
{
cmd = LogServerTypes::cmdAddLevel;
sdata = string(optarg);
LogServerTypes::Command cmd = LogServerTypes::cmdAddLevel;
std::string filter("");
std::string d = string(optarg);
char* arg2 = checkArg(optind, argc, argv);
if( arg2 )
logfilter = string(arg2);
else
logfilter = "";
filter = string(arg2);
vcmd.push_back( LogReader::Command(cmd, (int)Debug::value(d), filter) );
}
break;
case 'd':
{
cmd = LogServerTypes::cmdDelLevel;
sdata = string(optarg);
LogServerTypes::Command cmd = LogServerTypes::cmdDelLevel;
std::string filter("");
std::string d = string(optarg);
char* arg2 = checkArg(optind, argc, argv);
if( arg2 )
logfilter = string(arg2);
else
logfilter = "";
filter = string(arg2);
vcmd.push_back( LogReader::Command(cmd, (int)Debug::value(d), filter) );
}
break;
case 's':
{
cmd = LogServerTypes::cmdSetLevel;
sdata = string(optarg);
LogServerTypes::Command cmd = LogServerTypes::cmdSetLevel;
std::string filter("");
std::string d = string(optarg);
char* arg2 = checkArg(optind, argc, argv);
if( arg2 )
logfilter = string(arg2);
else
logfilter = "";
filter = string(arg2);
vcmd.push_back( LogReader::Command(cmd, (int)Debug::value(d), filter) );
}
break;
case 'l':
{
cmd = LogServerTypes::cmdList;
cmdonly = 1;
cmd = LogServerTypes::cmdList;
std::string filter("");
char* arg2 = checkArg(optind, argc, argv);
if( arg2 )
logfilter = string(arg2);
else
logfilter = "";
filter = string(arg2);
logfilter = filter;
}
break;
case 'o':
{
cmd = LogServerTypes::cmdOffLogFile;
LogServerTypes::Command cmd = LogServerTypes::cmdOffLogFile;
std::string filter("");
char* arg2 = checkArg(optind, argc, argv);
if( arg2 )
logfilter = string(arg2);
else
logfilter = "";
filter = string(arg2);
vcmd.push_back( LogReader::Command(cmd, 0, filter) );
}
break;
case 'f':
case 'e':
{
cmd = LogServerTypes::cmdFilterMode;
logfilter = string(optarg);
LogServerTypes::Command cmd = LogServerTypes::cmdOnLogFile;
std::string filter("");
char* arg2 = checkArg(optind, argc, argv);
if( arg2 )
filter = string(arg2);
vcmd.push_back( LogReader::Command(cmd, 0, filter) );
}
break;
case 'e':
case 'f':
{
cmd = LogServerTypes::cmdOnLogFile;
cmd = LogServerTypes::cmdFilterMode;
std::string filter("");
char* arg2 = checkArg(optind, argc, argv);
if( arg2 )
logfilter = string(arg2);
else
logfilter = "";
filter = string(arg2);
logfilter = filter;
}
break;
case 'r':
{
cmd = LogServerTypes::cmdRotate;
LogServerTypes::Command cmd = LogServerTypes::cmdRotate;
std::string filter("");
char* arg2 = checkArg(optind, argc, argv);
if( arg2 )
logfilter = string(arg2);
else
logfilter = "";
filter = string(arg2);
vcmd.push_back( LogReader::Command(cmd, 0, filter) );
}
break;
......@@ -185,7 +199,7 @@ int main( int argc, char** argv )
addr = string(optarg);
break;
case 'b':
case 'c':
cmdonly = 1;
break;
......@@ -223,16 +237,12 @@ int main( int argc, char** argv )
lr.setCommandOnlyMode(cmdonly);
lr.setinTimeout(tout);
lr.setReconnectDelay(rdelay);
if( !vcmd.empty() )
lr.sendCommand(addr, port, vcmd, cmdonly, verb);
if( !sdata.empty() )
{
data = (int)Debug::value(sdata);
if( verb )
cout << "SEND COMMAND: '" << (LogServerTypes::Command)cmd << " data='" << sdata << "'(" << (int)data << ")" << endl;
}
lr.readlogs( addr, port, (LogServerTypes::Command)cmd, data, logfilter, verb );
if( !cmdonly )
lr.readlogs( addr, port, cmd, logfilter, verb );
}
catch( const SystemError& err )
{
......
......@@ -3,6 +3,7 @@
// -------------------------------------------------------------------------
#include <string>
#include <queue>
#include <vector>
#include <cc++/socket.h>
#include "UTCPStream.h"
#include "DebugStream.h"
......@@ -15,13 +16,20 @@ class LogReader
LogReader();
~LogReader();
void readlogs( const std::string& addr, ost::tpport_t port,
LogServerTypes::Command c = LogServerTypes::cmdNOP,
int data = 0,
const std::string& logname = "",
bool verbose = false );
struct Command
{
Command( LogServerTypes::Command c, unsigned int d, const std::string& f = "" ): cmd(c), data(d), logfilter(f) {}
LogServerTypes::Command cmd = { LogServerTypes::cmdNOP };
unsigned int data = {0};
std::string logfilter = { "" };
};
void sendCommand( const std::string& addr, ost::tpport_t port,
std::vector<Command>& vcmd, bool cmd_only = true,
bool verbose = false );
void readlogs( const std::string& addr, ost::tpport_t port, LogServerTypes::lsMessage& m, bool verbose = false );
void readlogs( const std::string& addr, ost::tpport_t port, LogServerTypes::Command c = LogServerTypes::cmdNOP, const std::string logfilter = "", bool verbose = false );
bool isConnection();
......@@ -58,6 +66,7 @@ class LogReader
void connect( ost::InetAddress addr, ost::tpport_t port, timeout_t tout = TIMEOUT_INF );
void disconnect();
void logOnEvent( const std::string& s );
void sendCommand(LogServerTypes::lsMessage& msg, bool verbose = false );
timeout_t inTimeout;
timeout_t outTimeout;
......
......@@ -17,6 +17,8 @@ namespace LogServerTypes
cmdRotate, /*!< пересоздать файл с логами */
cmdOffLogFile, /*!< отключить запись файла логов (если включена) */
cmdOnLogFile, /*!< включить запись файла логов (если была отключена) */
// команды требующий ответа..
cmdList, /*!< вывести список контролируемых логов */
cmdFilterMode /*!< включить режим работы "фильтр" - вывод только от интересующих логов, заданных в lognmae (regexp) */
// cmdSetLogFile
......
......@@ -119,36 +119,154 @@ bool LogReader::isConnection()
return tcp && tcp->isConnected();
}
// -------------------------------------------------------------------------
void LogReader::readlogs( const std::string& _addr, ost::tpport_t _port, LogServerTypes::Command cmd, int data, const std::string& logname, bool verbose )
void LogReader::sendCommand( const std::string& _addr, ost::tpport_t _port, std::vector<Command>& vcmd, bool cmd_only, bool verbose )
{
if( vcmd.empty() )
return;
char buf[100001];
if( verbose )
rlog.addLevel(Debug::ANY);
if( outTimeout == 0 )
outTimeout = TIMEOUT_INF;
for( const auto& c : vcmd )
{
if( c.cmd == LogServerTypes::cmdNOP )
continue;
if( c.cmd == LogServerTypes::cmdFilterMode )
{
if( verbose )
cerr << "WARNING: sendCommand() ignore '" << c.cmd << "'..." << endl;
continue;
}
LogServerTypes::lsMessage msg;
msg.cmd = c.cmd;
msg.data = c.data;
msg.setLogName(c.logfilter);
unsigned int n = 2; // две попытки на посылку
while( n > 0 )
{
try
{
if( !isConnection() )
connect(_addr, _port, reconDelay);
if( !isConnection() )
{
rlog.warn() << "(LogReader): **** connection timeout.." << endl;
if( cmdonly )
return;
n--;
if( n == 0 )
break;
msleep(reconDelay);
continue;
}
sendCommand(msg, verbose);
break;
}
catch( const ost::SockException& e )
{
cerr << "(LogReader): " << e.getString() << " (" << _addr << ")" << endl;
}
catch( const std::exception& ex )
{
cerr << "(LogReader): " << ex.what() << endl;
}
n--;
}
} // end for send all command
// после команд.. выводим список текущий..
timeout_t reply_timeout = 4000; // TIMEOUT_INF;
LogServerTypes::lsMessage msg;
msg.cmd = cmd;
msg.data = data;
msg.setLogName(logname);
readlogs(_addr, _port, msg, verbose );
msg.cmd = LogServerTypes::cmdList;
msg.data = 0;
msg.setLogName("ALL");
sendCommand(msg, verbose);
// теперь ждём ответ..
try
{
int a = 3;
while( a > 0 && tcp->isPending(ost::Socket::pendingInput, reply_timeout) )
{
int n = 0;
do
{
n = tcp->peek( buf, sizeof(buf) - 1 );
if( n > 0 )
{
tcp->read(buf, n);
buf[n] = '\0';
log << buf;
a--;
}
}
while( n > 0 );
}
// rlog.warn() << "(LogReader): ...wait reply timeout..." << endl;
}
catch( const ost::SockException& e )
{
cerr << "(LogReader): " << e.getString() << " (" << _addr << ")" << endl;
}
catch( const std::exception& ex )
{
cerr << "(LogReader): " << ex.what() << endl;
}
if( cmdonly && isConnection() )
disconnect();
}
// -------------------------------------------------------------------------
void LogReader::readlogs( const std::string& _addr, ost::tpport_t _port, LogServerTypes::lsMessage& msg, bool verbose )
void LogReader::readlogs( const std::string& _addr, ost::tpport_t _port, LogServerTypes::Command cmd, const std::string logfilter, bool verbose )
{
char buf[100001];
if( verbose )
rlog.addLevel(Debug::ANY);
bool send_ok = false;
if( inTimeout == 0 )
inTimeout = TIMEOUT_INF;
if( outTimeout == 0 )
outTimeout = TIMEOUT_INF;
unsigned int n = 1;
unsigned int rcount = 1;
if( readcount > 0 )
n = readcount;
rcount = readcount;
LogServerTypes::lsMessage msg;
msg.cmd = cmd;
msg.data = 0;
msg.setLogName(logfilter);
while( n > 0 )
bool send_ok = cmd == LogServerTypes::cmdNOP ? true : false;
while( rcount > 0 )
{
try
{
......@@ -159,61 +277,45 @@ void LogReader::readlogs( const std::string& _addr, ost::tpport_t _port, LogServ
{
rlog.warn() << "(LogReader): **** connection timeout.." << endl;
if( cmdonly )
return;
if( readcount > 0 )
n--;
if( rcount>0 && readcount > 0 )
rcount--;
if( n < 0 )
if( rcount == 0 )
break;
msleep(reconDelay);
continue;
}
if( !send_ok && msg.cmd != LogServerTypes::cmdNOP )
if( !send_ok )
{
if( tcp->isPending(ost::Socket::pendingOutput, outTimeout) )
{
rlog.info() << "(LogReader): ** send command: cmd='" << msg.cmd << "' logname='" << msg.logname << "' data='" << msg.data << "'" << endl;
for( size_t i = 0; i < sizeof(msg); i++ )
(*tcp) << ((unsigned char*)(&msg))[i];
tcp->sync();
send_ok = true;
}
else
rlog.warn() << "(LogReader): **** SEND COMMAND ('" << msg.cmd << "' FAILED!" << endl;
if( cmdonly && msg.cmd != LogServerTypes::cmdList )
{
disconnect();
return;
}
sendCommand(msg, verbose);
send_ok = true;
}
while( (!cmdonly || msg.cmd == LogServerTypes::cmdList) && tcp->isPending(ost::Socket::pendingInput, inTimeout) )
while( tcp->isPending(ost::Socket::pendingInput, inTimeout) )
{
int n = tcp->peek( buf, sizeof(buf) - 1 );
if( n > 0 )
{
tcp->read(buf, n);
buf[n] = '\0';
log << buf;
// if( msg.cmd == LogServerTypes::cmdList )
// break;
}
else
if( rcount>0 && readcount > 0 )
rcount--;
if( readcount>0 && rcount==0 )
break;
}
rlog.warn() << "(LogReader): ...connection timeout..." << endl;
send_ok = false; // ??!! делать ли?
if( rcount>0 && readcount > 0 )
rcount--;
if( rcount != 0 )
rlog.warn() << "(LogReader): ...connection timeout..." << endl;
disconnect();
}
catch( const ost::SockException& e )
......@@ -225,11 +327,8 @@ void LogReader::readlogs( const std::string& _addr, ost::tpport_t _port, LogServ
cerr << "(LogReader): " << ex.what() << endl;
}
if( cmdonly )
if( rcount==0 && readcount > 0 )
break;
if( readcount > 0 )
n--;
}
if( isConnection() )
......@@ -241,3 +340,29 @@ void LogReader::logOnEvent( const std::string& s )
m_logsig.emit(s);
}
// -------------------------------------------------------------------------
void LogReader::sendCommand(LogServerTypes::lsMessage& msg, bool verbose )
{
try
{
if( tcp->isPending(ost::Socket::pendingOutput, outTimeout) )
{
rlog.info() << "(LogReader): ** send command: cmd='" << msg.cmd << "' logname='" << msg.logname << "' data='" << msg.data << "'" << endl;
for( size_t i = 0; i < sizeof(msg); i++ )
(*tcp) << ((unsigned char*)(&msg))[i];
tcp->sync();
}
else
rlog.warn() << "(LogReader): **** SEND COMMAND ('" << msg.cmd << "' FAILED!" << endl;
}
catch( const ost::SockException& e )
{
cerr << "(LogReader): " << e.getString() << endl; // " (" << _addr << ")" << endl;
}
catch( const std::exception& ex )
{
cerr << "(LogReader): " << ex.what() << endl;
}
}
// -------------------------------------------------------------------------
......@@ -31,6 +31,9 @@ std::ostream& LogServerTypes::operator<<(std::ostream& os, LogServerTypes::Comma
case LogServerTypes::cmdFilterMode:
return os << "cmdFilterMode";
case LogServerTypes::cmdNOP:
return os << "No command(NOP)";
default:
return os << "Unknown";
}
......
......@@ -95,7 +95,7 @@ void LogSession::run()
setKeepAlive(true);
// Команды могут посылаться только в начале сессии..
if( isPending(Socket::pendingInput, cmdTimeout) )
while( isPending(Socket::pendingInput, cmdTimeout) )
{
LogServerTypes::lsMessage msg;
......@@ -105,137 +105,142 @@ void LogSession::run()
ssize_t ret = readData( &msg, sizeof(msg) );
if( ret != sizeof(msg) || msg.magic != LogServerTypes::MAGICNUM )
slog.warn() << peername << "(run): BAD MESSAGE..." << endl;
else
{
slog.info() << peername << "(run): receive command: '" << msg.cmd << "'" << endl;
string cmdLogName(msg.logname);
slog.warn() << peername << "(run): BAD MESSAGE..." << endl;
continue;
}
std::list<LogAgregator::iLog> loglist;
slog.info() << peername << "(run): receive command: '" << msg.cmd << "'" << endl;
string cmdLogName(msg.logname);
if( alog ) // если у нас "агрегатор", то работаем с его списком потоков
{
if( cmdLogName.empty() || cmdLogName == "ALL" )
loglist = alog->getLogList();
else
loglist = alog->getLogList(cmdLogName);
}
else
{
if( cmdLogName.empty() || cmdLogName == "ALL" || log->getLogFile() == cmdLogName )
{
LogAgregator::iLog llog(log, log->getLogName());
loglist.push_back(llog);
}
}
std::list<LogAgregator::iLog> loglist;
// если команда "вывести список"
// выводим и завершаем работу
if( msg.cmd == LogServerTypes::cmdList )
if( alog ) // если у нас "агрегатор", то работаем с его списком потоков
{
if( cmdLogName.empty() || cmdLogName == "ALL" )
loglist = alog->getLogList();
else
loglist = alog->getLogList(cmdLogName);
}
else
{
if( cmdLogName.empty() || cmdLogName == "ALL" || log->getLogFile() == cmdLogName )
{
ostringstream s;
s << "List of managed logs(filter='" << cmdLogName << "'):" << endl;
s << "=====================" << endl;
LogAgregator::printLogList(s,loglist);
s << "=====================" << endl << endl;
if( isPending(Socket::pendingOutput, cmdTimeout) )
{
*tcp() << s.str();
tcp()->sync();
}
// вывели список и завершили работу..
cancelled = true;
disconnect();
return;
LogAgregator::iLog llog(log, log->getLogName());
loglist.push_back(llog);
}
}
if( msg.cmd == LogServerTypes::cmdFilterMode )
{
// отлючаем старый обработчик
if( conn )
conn.disconnect();
}
if( msg.cmd == LogServerTypes::cmdFilterMode )
{
// отлючаем старый обработчик
if( conn )
conn.disconnect();
}
// обрабатываем команды только если нашли подходящие логи
for( auto && l : loglist )
// обрабатываем команды только если нашли подходящие логи
for( auto && l : loglist )
{
// Обработка команд..
// \warning Работа с логом ведётся без mutex-а, хотя он разделяется отдельными потоками
switch( msg.cmd )
{
// Обработка команд..
// \warning Работа с логом ведётся без mutex-а, хотя он разделяется отдельными потоками
switch( msg.cmd )
{
case LogServerTypes::cmdSetLevel:
l.log->level( (Debug::type)msg.data );
break;
case LogServerTypes::cmdSetLevel:
l.log->level( (Debug::type)msg.data );
break;
case LogServerTypes::cmdAddLevel:
l.log->addLevel( (Debug::type)msg.data );
break;
case LogServerTypes::cmdAddLevel:
l.log->addLevel( (Debug::type)msg.data );
break;
case LogServerTypes::cmdDelLevel:
l.log->delLevel( (Debug::type)msg.data );
break;
case LogServerTypes::cmdDelLevel:
l.log->delLevel( (Debug::type)msg.data );
break;
case LogServerTypes::cmdRotate:
l.log->onLogFile(true);
break;
case LogServerTypes::cmdRotate:
l.log->onLogFile(true);
break;
case LogServerTypes::cmdList: // обработали выше (в начале)
break;
case LogServerTypes::cmdList: // обработали выше (в начале)
break;
case LogServerTypes::cmdOffLogFile:
l.log->offLogFile();
break;
case LogServerTypes::cmdOffLogFile:
l.log->offLogFile();
break;
case LogServerTypes::cmdOnLogFile:
l.log->onLogFile();
break;
case LogServerTypes::cmdOnLogFile:
l.log->onLogFile();
break;
case LogServerTypes::cmdFilterMode:
l.log->signal_stream_event().connect( sigc::mem_fun(this, &LogSession::logOnEvent) );
break;
case LogServerTypes::cmdFilterMode:
l.log->signal_stream_event().connect( sigc::mem_fun(this, &LogSession::logOnEvent) );
break;
default:
slog.warn() << peername << "(run): Unknown command '" << msg.cmd << "'" << endl;
break;
}
default:
slog.warn() << peername << "(run): Unknown command '" << msg.cmd << "'" << endl;
break;
}
} // end if for
// Выводим итоговый получившийся список (с учётом выполненных команд)
// если команда "вывести список"
// выводим и завершаем работу
if( msg.cmd == LogServerTypes::cmdList )
{
ostringstream s;
if( msg.cmd == LogServerTypes::cmdFilterMode )
{
s << "List of managed logs(filter='" << cmdLogName << "'):" << endl;
s << "=====================" << endl;
LogAgregator::printLogList(s,loglist);
s << "=====================" << endl << endl;
}
else
{
s << "List of managed logs:" << endl;
s << "=====================" << endl;
// выводим полный список
if( alog )
{
auto lst = alog->getLogList();
LogAgregator::printLogList(s,lst);
}
else
s << log->getLogName() << " [" << Debug::str(log->level()) << " ]" << endl;
s << "=====================" << endl << endl;
}
s << "List of managed logs(filter='" << cmdLogName << "'):" << endl;
s << "=====================" << endl;
LogAgregator::printLogList(s, loglist);
s << "=====================" << endl << endl;
if( isPending(Socket::pendingOutput, cmdTimeout) )
{
*tcp() << s.str();
tcp()->sync();
}
// вывели список и завершили работу..
// cancelled = true;
// disconnect();
// return;
}
}
} // end of while pending input (cmd processing)..
#if 0
// Выводим итоговый получившийся список (с учётом выполненных команд)
ostringstream s;
if( msg.cmd == LogServerTypes::cmdFilterMode )
{
s << "List of managed logs(filter='" << cmdLogName << "'):" << endl;
s << "=====================" << endl;
LogAgregator::printLogList(s, loglist);
s << "=====================" << endl << endl;
}
else
{
s << "List of managed logs:" << endl;
s << "=====================" << endl;
// выводим полный список
if( alog )
{
auto lst = alog->getLogList();
LogAgregator::printLogList(s, lst);
}
else
s << log->getLogName() << " [" << Debug::str(log->level()) << " ]" << endl;
s << "=====================" << endl << endl;
}
if( isPending(Socket::pendingOutput, cmdTimeout) )
{
*tcp() << s.str();
tcp()->sync();
}
#endif
cancelled = false;
......
......@@ -94,18 +94,16 @@ TEST_CASE("LogAgregator", "[LogServer][LogAgregator]" )
log2->any() << test_msg2;
REQUIRE( la_msg.str() == test_msg2 );
#if 0
auto lst = la->getLogList();
REQUIRE( lst.size() == 2 );
// Проверка поиска по регулярным выражениям
auto lst2 = la->getLogList("lo.*");
auto lst2 = la->getLogList("/lo.*");
REQUIRE( lst2.size() == 2 );
auto lst3 = la->getLogList("log\\d{1}");
auto lst3 = la->getLogList("/log\\d{1}");
REQUIRE( lst3.size() == 2 );
#endif
}
// --------------------------------------------------------------------------
TEST_CASE("LogServer", "[LogServer]" )
......@@ -170,7 +168,7 @@ TEST_CASE("LogServer", "[LogServer]" )
}
g_read_cancel = true;
msleep(readTimeout);
if( r_thr->joinable() )
r_thr->join();
}
......@@ -236,6 +234,7 @@ TEST_CASE("MaxSessions", "[LogServer]" )
}
g_read_cancel = true;
msleep(readTimeout);
if( r1_thr->joinable() )
r1_thr->join();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment