Commit 1c0113f4 authored by Pavel Vainerman's avatar Pavel Vainerman

(LogServer): реализовал удалённое чтение логов (сервер и клиент).

parent f7613d10
...@@ -20,7 +20,6 @@ static struct option longopts[] = { ...@@ -20,7 +20,6 @@ static struct option longopts[] = {
static void print_help() static void print_help()
{ {
printf("-h|--help - this message\n"); printf("-h|--help - this message\n");
// printf("[-t|--timeout] msec - Timeout. Default: 2000.\n");
printf("[-v|--verbose] - Print all messages to stdout\n"); printf("[-v|--verbose] - Print all messages to stdout\n");
printf("[-a|--iaddr] addr - Inet address for listen connections.\n"); printf("[-a|--iaddr] addr - Inet address for listen connections.\n");
printf("[-p|--port] port - Bind port.\n"); printf("[-p|--port] port - Bind port.\n");
...@@ -33,7 +32,6 @@ int main( int argc, char **argv ) ...@@ -33,7 +32,6 @@ int main( int argc, char **argv )
int verb = 0; int verb = 0;
string addr("localhost"); string addr("localhost");
int port = 3333; int port = 3333;
int tout = 2000;
DebugStream dlog; DebugStream dlog;
try try
...@@ -67,15 +65,13 @@ int main( int argc, char **argv ) ...@@ -67,15 +65,13 @@ int main( int argc, char **argv )
if( verb ) if( verb )
{ {
cout << "(init): read from " << addr << ":" << port cout << "(init): read from " << addr << ":" << port << endl;
// << " timeout=" << tout << " msec "
<< endl;
dlog.addLevel( Debug::type(Debug::CRIT | Debug::WARN | Debug::INFO) ); dlog.addLevel( Debug::type(Debug::CRIT | Debug::WARN | Debug::INFO) );
} }
LogReader lr; LogReader lr;
lr.readlogs( addr, port, TIMEOUT_INF ); lr.readlogs( addr, port, verb );
} }
catch( SystemError& err ) catch( SystemError& err )
{ {
......
...@@ -14,6 +14,7 @@ static struct option longopts[] = { ...@@ -14,6 +14,7 @@ static struct option longopts[] = {
{ "iaddr", required_argument, 0, 'a' }, { "iaddr", required_argument, 0, 'a' },
{ "port", required_argument, 0, 'p' }, { "port", required_argument, 0, 'p' },
{ "verbose", no_argument, 0, 'v' }, { "verbose", no_argument, 0, 'v' },
{ "delay", required_argument, 0, 'd' },
{ NULL, 0, 0, 0 } { NULL, 0, 0, 0 }
}; };
// -------------------------------------------------------------------------- // --------------------------------------------------------------------------
...@@ -24,6 +25,7 @@ static void print_help() ...@@ -24,6 +25,7 @@ static void print_help()
printf("[-v|--verbose] - Print all messages to stdout\n"); printf("[-v|--verbose] - Print all messages to stdout\n");
printf("[-a|--iaddr] addr - Inet address for listen connections.\n"); printf("[-a|--iaddr] addr - Inet address for listen connections.\n");
printf("[-p|--port] port - Bind port.\n"); printf("[-p|--port] port - Bind port.\n");
printf("[-d|--delay] msec - Delay for generate message. Default 5000.\n");
} }
// -------------------------------------------------------------------------- // --------------------------------------------------------------------------
int main( int argc, char **argv ) int main( int argc, char **argv )
...@@ -35,10 +37,11 @@ int main( int argc, char **argv ) ...@@ -35,10 +37,11 @@ int main( int argc, char **argv )
int port = 3333; int port = 3333;
int tout = 2000; int tout = 2000;
DebugStream dlog; DebugStream dlog;
timeout_t delay = 5000;
try try
{ {
while( (opt = getopt_long(argc, argv, "hva:p:",longopts,&optindex)) != -1 ) while( (opt = getopt_long(argc, argv, "hva:p:d:",longopts,&optindex)) != -1 )
{ {
switch (opt) switch (opt)
{ {
...@@ -54,6 +57,10 @@ int main( int argc, char **argv ) ...@@ -54,6 +57,10 @@ int main( int argc, char **argv )
port = uni_atoi(optarg); port = uni_atoi(optarg);
break; break;
case 'd':
delay = uni_atoi(optarg);
break;
case 'v': case 'v':
verb = 1; verb = 1;
break; break;
...@@ -74,8 +81,20 @@ int main( int argc, char **argv ) ...@@ -74,8 +81,20 @@ int main( int argc, char **argv )
dlog.addLevel( Debug::type(Debug::CRIT | Debug::WARN | Debug::INFO) ); dlog.addLevel( Debug::type(Debug::CRIT | Debug::WARN | Debug::INFO) );
} }
LogServer ls; LogServer ls(dlog);
ls.run( addr, port, TIMEOUT_INF, false ); dlog.addLevel(Debug::ANY);
ls.run( addr, port, true );
unsigned int i=0;
while( true )
{
dlog << "[" << ++i << "] Test message for log" << endl;
dlog.info() << ": INFO message" << endl;
dlog.warn() << ": WARN message" << endl;
dlog.crit() << ": CRIT message" << endl;
msleep(delay);
}
} }
catch( SystemError& err ) catch( SystemError& err )
{ {
......
...@@ -14,7 +14,7 @@ class LogReader ...@@ -14,7 +14,7 @@ class LogReader
LogReader(); LogReader();
~LogReader(); ~LogReader();
void readlogs( const std::string& addr, ost::tpport_t port, timeout_t tout=TIMEOUT_INF ); void readlogs( const std::string& addr, ost::tpport_t port, bool verbose = false );
bool isConnection(); bool isConnection();
......
...@@ -14,12 +14,13 @@ class LogServer ...@@ -14,12 +14,13 @@ class LogServer
{ {
public: public:
LogServer(); LogServer( DebugStream& log );
~LogServer(); ~LogServer();
void run( const std::string& addr, ost::tpport_t port, timeout_t msec=60000, bool thread=true ); void run( const std::string& addr, ost::tpport_t port, bool thread=true );
protected: protected:
LogServer();
void work(); void work();
void sessionFinished( LogSession* s ); void sessionFinished( LogSession* s );
...@@ -36,6 +37,7 @@ class LogServer ...@@ -36,6 +37,7 @@ class LogServer
ThreadCreator<LogServer>* thr; ThreadCreator<LogServer>* thr;
ost::TCPSocket* tcp; ost::TCPSocket* tcp;
DebugStream* elog;
}; };
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
#endif // LogServer_H_ #endif // LogServer_H_
......
...@@ -7,13 +7,14 @@ ...@@ -7,13 +7,14 @@
#include <cc++/socket.h> #include <cc++/socket.h>
#include "Mutex.h" #include "Mutex.h"
#include "DebugStream.h" #include "DebugStream.h"
#include "PassiveTimer.h"
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
class LogSession: class LogSession:
public ost::TCPSession public ost::TCPSession
{ {
public: public:
LogSession( ost::TCPSocket &server, timeout_t timeout ); LogSession( ost::TCPSocket &server, DebugStream* log, timeout_t timeout );
virtual ~LogSession(); virtual ~LogSession();
typedef sigc::slot<void, LogSession*> FinalSlot; typedef sigc::slot<void, LogSession*> FinalSlot;
...@@ -24,6 +25,7 @@ class LogSession: ...@@ -24,6 +25,7 @@ class LogSession:
protected: protected:
virtual void run(); virtual void run();
virtual void final(); virtual void final();
void logOnEvent( const std::string& s );
private: private:
typedef std::deque<std::string> LogBuffer; typedef std::deque<std::string> LogBuffer;
...@@ -32,12 +34,14 @@ class LogSession: ...@@ -32,12 +34,14 @@ class LogSession:
std::string caddr; std::string caddr;
timeout_t timeout; timeout_t timeout;
PassiveTimer ptSessionTimeout;
FinalSlot slFin; FinalSlot slFin;
std::atomic_bool cancelled; std::atomic_bool cancelled;
UniSetTypes::uniset_rwmutex mLBuf; UniSetTypes::uniset_rwmutex mLBuf;
DebugStream slog; DebugStream slog;
}; };
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
#endif // LogSession_H_ #endif // LogSession_H_
......
...@@ -4,6 +4,7 @@ ...@@ -4,6 +4,7 @@
#include <sstream> #include <sstream>
#include "Exceptions.h" #include "Exceptions.h"
#include "LogReader.h" #include "LogReader.h"
#include "UniSetTypes.h"
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
using namespace std; using namespace std;
using namespace UniSetTypes; using namespace UniSetTypes;
...@@ -18,7 +19,10 @@ iaddr("") ...@@ -18,7 +19,10 @@ iaddr("")
LogReader::~LogReader() LogReader::~LogReader()
{ {
if( isConnection() ) if( isConnection() )
{
(*tcp) << endl;
disconnect(); disconnect();
}
} }
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
void LogReader::connect( const std::string& addr, ost::tpport_t _port, timeout_t msec ) void LogReader::connect( const std::string& addr, ost::tpport_t _port, timeout_t msec )
...@@ -31,6 +35,7 @@ void LogReader::connect( ost::InetAddress addr, ost::tpport_t _port, timeout_t m ...@@ -31,6 +35,7 @@ void LogReader::connect( ost::InetAddress addr, ost::tpport_t _port, timeout_t m
{ {
if( tcp ) if( tcp )
{ {
(*tcp) << endl;
disconnect(); disconnect();
delete tcp; delete tcp;
tcp = 0; tcp = 0;
...@@ -53,6 +58,7 @@ void LogReader::connect( ost::InetAddress addr, ost::tpport_t _port, timeout_t m ...@@ -53,6 +58,7 @@ void LogReader::connect( ost::InetAddress addr, ost::tpport_t _port, timeout_t m
tcp = new UTCPStream(); tcp = new UTCPStream();
tcp->create(iaddr,port,true,500); tcp->create(iaddr,port,true,500);
tcp->setTimeout(msec); tcp->setTimeout(msec);
tcp->setKeepAlive(true);
} }
catch( std::exception& e ) catch( std::exception& e )
{ {
...@@ -62,6 +68,9 @@ void LogReader::connect( ost::InetAddress addr, ost::tpport_t _port, timeout_t m ...@@ -62,6 +68,9 @@ void LogReader::connect( ost::InetAddress addr, ost::tpport_t _port, timeout_t m
s << "(LogReader): connection " << s.str() << " error: " << e.what(); s << "(LogReader): connection " << s.str() << " error: " << e.what();
rlog.crit() << s.str() << std::endl; rlog.crit() << s.str() << std::endl;
} }
delete tcp;
tcp = 0;
} }
catch( ... ) catch( ... )
{ {
...@@ -77,12 +86,12 @@ void LogReader::connect( ost::InetAddress addr, ost::tpport_t _port, timeout_t m ...@@ -77,12 +86,12 @@ void LogReader::connect( ost::InetAddress addr, ost::tpport_t _port, timeout_t m
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
void LogReader::disconnect() void LogReader::disconnect()
{ {
if( rlog.is_info() )
rlog.info() << iaddr << "(LogReader): disconnect." << endl;
if( !tcp ) if( !tcp )
return; return;
if( rlog.is_info() )
rlog.info() << iaddr << "(LogReader): disconnect." << endl;
tcp->disconnect(); tcp->disconnect();
delete tcp; delete tcp;
tcp = 0; tcp = 0;
...@@ -93,32 +102,48 @@ bool LogReader::isConnection() ...@@ -93,32 +102,48 @@ bool LogReader::isConnection()
return tcp && tcp->isConnected(); return tcp && tcp->isConnected();
} }
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
void LogReader::readlogs( const std::string& _addr, ost::tpport_t _port, timeout_t msec ) void LogReader::readlogs( const std::string& _addr, ost::tpport_t _port, bool verbose )
{ {
timeout_t inTimeout = 10000;
timeout_t reconDelay = 5000;
char buf[100001];
if( verbose )
rlog.addLevel(Debug::ANY);
while( true )
{
if( !isConnection() ) if( !isConnection() )
connect(_addr,_port,msec); connect(_addr,_port,reconDelay);
if( !isConnection() ) if( !isConnection() )
throw TimeOut(); {
rlog.warn() << "**** connection timeout.." << endl;
msleep(reconDelay);
continue;
}
char buf[1000]; while( tcp->isPending(ost::Socket::pendingInput,inTimeout) )
while( tcp->isPending(ost::Socket::pendingInput,msec) )
{ {
int n = tcp->peek( buf,sizeof(buf) ); int n = tcp->peek( buf,sizeof(buf)-1 );
if( n > 0 ) if( n > 0 )
{ {
tcp->read(buf,n); tcp->read(buf,n);
buf[n] = '\0';
cout << buf; cout << buf;
} }
#if 0 else
if( tcp->gcount() > 0 )
break; break;
}
int n = tcp->peek( buf,sizeof(buf) ); rlog.warn() << "...connection timeout..." << endl;
tcp->read(buf,sizeof(buf)); disconnect();
for( int i=0; i<n; i++ ) }
cout << buf[i];
#endif if( isConnection() )
{
(*tcp) << endl;
disconnect();
} }
} }
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
...@@ -13,8 +13,8 @@ LogServer::~LogServer() ...@@ -13,8 +13,8 @@ LogServer::~LogServer()
{ {
uniset_rwmutex_wrlock l(mutSList); uniset_rwmutex_wrlock l(mutSList);
for( SessionList::iterator i=slist.begin(); i!=slist.end(); ++i ) for( auto& i: slist )
delete (*i); delete i;
} }
if( thr ) if( thr )
...@@ -24,16 +24,27 @@ LogServer::~LogServer() ...@@ -24,16 +24,27 @@ LogServer::~LogServer()
} }
} }
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
LogServer::LogServer( DebugStream& log ):
timeout(TIMEOUT_INF),
session_timeout(3600000),
cancelled(false),
thr(0),
tcp(0),
elog(&log)
{
}
// -------------------------------------------------------------------------
LogServer::LogServer(): LogServer::LogServer():
timeout(600000), timeout(TIMEOUT_INF),
session_timeout(3600000), session_timeout(3600000),
cancelled(false), cancelled(false),
thr(0), thr(0),
tcp(0) tcp(0),
elog(0)
{ {
} }
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
void LogServer::run( const std::string& addr, ost::tpport_t port, timeout_t msec, bool thread ) void LogServer::run( const std::string& addr, ost::tpport_t port, bool thread )
{ {
try try
{ {
...@@ -75,7 +86,7 @@ void LogServer::work() ...@@ -75,7 +86,7 @@ void LogServer::work()
{ {
while( tcp->isPendingConnection(timeout) ) while( tcp->isPendingConnection(timeout) )
{ {
LogSession* s = new LogSession(*tcp, session_timeout); LogSession* s = new LogSession(*tcp, elog, session_timeout);
{ {
uniset_rwmutex_wrlock l(mutSList); uniset_rwmutex_wrlock l(mutSList);
slist.push_back(s); slist.push_back(s);
...@@ -98,8 +109,6 @@ void LogServer::work() ...@@ -98,8 +109,6 @@ void LogServer::work()
else else
cerr << "client socket failed" << endl; cerr << "client socket failed" << endl;
} }
cout << "timeout after 30 seconds inactivity, exiting" << endl;
} }
{ {
...@@ -116,6 +125,7 @@ void LogServer::sessionFinished( LogSession* s ) ...@@ -116,6 +125,7 @@ void LogServer::sessionFinished( LogSession* s )
{ {
if( (*i) == s ) if( (*i) == s )
{ {
// cerr << "session '" << s->getClientAddress() << "' closed.." << endl;
slist.erase(i); slist.erase(i);
return; return;
} }
......
...@@ -20,13 +20,21 @@ LogSession::~LogSession() ...@@ -20,13 +20,21 @@ LogSession::~LogSession()
} }
} }
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
LogSession::LogSession( ost::TCPSocket &server, timeout_t msec ): LogSession::LogSession( ost::TCPSocket &server, DebugStream* log, timeout_t msec ):
TCPSession(server), TCPSession(server),
peername(""), peername(""),
caddr(""), caddr(""),
timeout(msec), timeout(msec),
cancelled(false) cancelled(false)
{ {
// slog.addLevel(Debug::ANY);
log->signal_stream_event().connect( sigc::mem_fun(this, &LogSession::logOnEvent) );
}
// -------------------------------------------------------------------------
void LogSession::logOnEvent( const std::string& s )
{
uniset_rwmutex_wrlock l(mLBuf);
lbuf.push_back(s);
} }
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
void LogSession::run() void LogSession::run()
...@@ -49,13 +57,40 @@ void LogSession::run() ...@@ -49,13 +57,40 @@ void LogSession::run()
if( slog.debugging(Debug::INFO) ) if( slog.debugging(Debug::INFO) )
slog[Debug::INFO] << peername << "(run): run thread of sessions.." << endl; slog[Debug::INFO] << peername << "(run): run thread of sessions.." << endl;
ptSessionTimeout.setTiming(10000);
timeout_t inTimeout = 2000;
timeout_t outTimeout = 2000;
cancelled = false; cancelled = false;
while( !cancelled && isPending(Socket::pendingOutput, timeout) ) while( !cancelled && !ptSessionTimeout.checkTime() )
{
if( isPending(Socket::pendingInput, inTimeout) )
{
char buf[100];
// проверяем канал..(если данных нет, значит "клиент отвалился"...
if( peek(buf,sizeof(buf)) <=0 )
break;
ptSessionTimeout.reset();
slog.warn() << peername << "(run): receive command.." << endl;
// Обработка команд..
}
if( isPending(Socket::pendingOutput, outTimeout) )
{ {
// char rbuf[100]; // slog.warn() << peername << "(run): send.." << endl;
// int ret = readData(&rbuf,sizeof(rbuf)); ptSessionTimeout.reset();
*tcp() << "test log... test log" << endl; uniset_rwmutex_wrlock l(mLBuf);
sleep(8000);
if( !lbuf.empty() )
slog.info() << peername << "(run): send messages.." << endl;
while( !lbuf.empty() )
{
*tcp() << lbuf.front();
lbuf.pop_front();
}
}
} }
if( slog.debugging(Debug::INFO) ) if( slog.debugging(Debug::INFO) )
...@@ -69,6 +104,7 @@ void LogSession::run() ...@@ -69,6 +104,7 @@ void LogSession::run()
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
void LogSession::final() void LogSession::final()
{ {
*tcp() << endl;
slFin(this); slFin(this);
delete this; delete this;
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment