Commit 4830c243 authored by Pavel Vainerman's avatar Pavel Vainerman

(LogServer): версия с одни специальным NullSession-потоком при превышении

числа сессий..
parent a73d69c2
......@@ -123,6 +123,7 @@ int main( int argc, char **argv )
unsigned int i=0;
while( true )
// for( int n=0; n<2; n++ )
{
dlog->any() << "[" << ++i << "] Test message for log" << endl;
dlog->info() << ": dlog : INFO message" << endl;
......@@ -135,6 +136,7 @@ int main( int argc, char **argv )
msleep(delay);
}
}
catch( SystemError& err )
{
......
......@@ -11,6 +11,7 @@
#include "ThreadCreator.h"
class LogSession;
class LogAgregator;
class NullLogSession;
// -------------------------------------------------------------------------
/*! \page pgLogServer Лог сервер
Лог сервер предназначен для возможности удалённого чтения логов (DebugStream).
......@@ -87,6 +88,8 @@ class LogServer
ost::TCPSocket* tcp;
std::shared_ptr<DebugStream> elog;
std::ostream* oslog;
std::shared_ptr<NullLogSession> nullsess;
};
// -------------------------------------------------------------------------
#endif // LogServer_H_
......
......@@ -23,6 +23,7 @@ class LogSession:
typedef sigc::slot<void, std::shared_ptr<LogSession>> FinalSlot;
void connectFinalSession( FinalSlot sl );
inline void cancel(){ cancelled = true; }
inline std::string getClientAddress(){ return caddr; }
inline void setSessionLogLevel( Debug::type t ){ slog.level(t); }
......@@ -60,19 +61,30 @@ class LogSession:
// -------------------------------------------------------------------------
/*! Сессия просто заверщающаяся с указанным сообщением */
class NullLogSession:
public LogSession
public ost::Thread
{
public:
NullLogSession( ost::TCPSocket& server, const std::string& _msg );
virtual ~NullLogSession();
NullLogSession( const std::string& _msg );
~NullLogSession();
void add( ost::TCPSocket& server );
void setMessage( const std::string& _msg );
inline void cancel(){ cancelled = true; }
protected:
virtual void run();
virtual void final();
private:
std::string msg;
typedef std::list< std::shared_ptr<ost::TCPStream> > TCPStreamList;
TCPStreamList slist;
UniSetTypes::uniset_rwmutex smutex;
std::atomic_bool cancelled;
};
// -------------------------------------------------------------------------
#endif // LogSession_H_
......
......@@ -10,24 +10,27 @@ using namespace UniSetTypes;
// -------------------------------------------------------------------------
LogServer::~LogServer()
{
cancelled = true;
if( thr )
{
thr->stop();
if( thr->isRunning() )
thr->join();
delete thr;
}
if( nullsess )
nullsess->cancel();
{
// uniset_rwmutex_wrlock l(mutSList);
for( auto& i: slist )
{
if( i->isRunning() )
i.reset();
i->cancel();
}
}
cancelled = true;
if( thr )
{
thr->stop();
if( thr->isRunning() )
thr->join();
delete thr;
}
delete tcp;
}
// -------------------------------------------------------------------------
......@@ -72,6 +75,14 @@ void LogServer::run( const std::string& addr, ost::tpport_t port, bool thread )
{
ost::InetAddress iaddr(addr.c_str());
tcp = new ost::TCPSocket(iaddr,port);
#if 0
if( !nullsess )
{
ostringstream err;
err << "(LOG SERVER): Exceeded the limit on the number of sessions = " << sessMaxCount << endl;
nullsess = make_shared<NullLogSession>(err.str());
}
#endif
}
catch( ost::Socket *socket )
{
......@@ -115,10 +126,17 @@ void LogServer::work()
{
ostringstream err;
err << "(LOG SERVER): Exceeded the limit on the number of sessions = " << sessMaxCount << endl;
auto s = make_shared<NullLogSession>(*tcp,err.str());
slist.push_back(s);
s->connectFinalSession( sigc::mem_fun(this, &LogServer::sessionFinished) );
s->detach();
if( !nullsess )
{
ostringstream err;
err << "(LOG SERVER): Exceeded the limit on the number of sessions = " << sessMaxCount << endl;
nullsess = make_shared<NullLogSession>(err.str());
nullsess->detach(); // start();
}
else
nullsess->setMessage(err.str());
nullsess->add(*tcp);
continue;
}
}
......@@ -155,6 +173,9 @@ void LogServer::work()
// uniset_rwmutex_wrlock l(mutSList);
for( auto& i: slist )
i->disconnect();
if( nullsess )
nullsess->cancel();
}
}
// -------------------------------------------------------------------------
......
......@@ -20,7 +20,8 @@ LogSession::~LogSession()
if( isRunning() )
{
disconnect();
ost::Thread::join();
// if( isRunning() )
// ost::Thread::join();
}
}
// -------------------------------------------------------------------------
......@@ -236,7 +237,17 @@ void LogSession::run()
void LogSession::final()
{
tcp()->sync();
slFin( shared_from_this() );
try
{
auto s = shared_from_this();
if( s )
slFin(s);
}
catch( std::bad_weak_ptr )
{
}
delete this;
}
// -------------------------------------------------------------------------
......@@ -246,37 +257,80 @@ void LogSession::connectFinalSession( FinalSlot sl )
}
// ---------------------------------------------------------------------
// ---------------------------------------------------------------------
NullLogSession::NullLogSession( ost::TCPSocket& server, const std::string& _msg ):
LogSession(server),
msg(_msg)
NullLogSession::NullLogSession( const std::string& _msg ):
msg(_msg),
cancelled(false)
{
}
// ---------------------------------------------------------------------
NullLogSession::~NullLogSession()
{
cancelled = true;
if( isRunning() )
exit(); // terminate();
}
// ---------------------------------------------------------------------
void NullLogSession::add( ost::TCPSocket& sock )
{
uniset_rwmutex_wrlock l(smutex);
auto s = make_shared<ost::TCPStream>();
s->connect(sock);
slist.push_back(s);
}
// ---------------------------------------------------------------------
void NullLogSession::setMessage( const std::string& _msg )
{
uniset_rwmutex_wrlock l(smutex);
msg = _msg;
}
// ---------------------------------------------------------------------
void NullLogSession::run()
{
int i = 0;
while( isConnected() && i++<=3 )
while( !cancelled )
{
if( isPending(Socket::pendingInput, 10) )
{
char buf[10];
// проверяем канал..(если данных нет, значит "клиент отвалился"...
if( peek(buf,sizeof(buf)) <=0 )
break;
}
uniset_rwmutex_wrlock l(smutex);
for( auto i=slist.begin(); !cancelled && i!=slist.end(); ++i )
{
auto s(*i);
if( s->isPending(ost::Socket::pendingInput, 10) )
{
char buf[10];
// проверяем канал..(если данных нет, значит "клиент отвалился"...
if( s->peek(buf,sizeof(buf)) <=0 )
{
i = slist.erase(i);
continue;
}
}
if( isPending(Socket::pendingOutput) )
{
*tcp() << msg << endl;
tcp()->sync();
if( s->isPending(ost::Socket::pendingOutput) )
{
(*s.get()) << msg << endl;
s->sync();
}
}
}
if( cancelled )
break;
msleep(5000);
}
}
// ---------------------------------------------------------------------
void NullLogSession::final()
{
#if 0
{
uniset_rwmutex_wrlock l(smutex);
for( auto i=slist.begin(); i!=slist.end(); ++i )
{
auto s(*i);
if( s )
s->disconnect();
}
}
#endif
}
// ---------------------------------------------------------------------
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment