Commit a73d69c2 authored by Pavel Vainerman's avatar Pavel Vainerman

(LogServer): первая версия реализации "ограничения по количеству сессий"

parent 90a08483
...@@ -16,6 +16,7 @@ static struct option longopts[] = { ...@@ -16,6 +16,7 @@ static struct option longopts[] = {
{ "port", required_argument, 0, 'p' }, { "port", required_argument, 0, 'p' },
{ "verbose", no_argument, 0, 'v' }, { "verbose", no_argument, 0, 'v' },
{ "delay", required_argument, 0, 'd' }, { "delay", required_argument, 0, 'd' },
{ "max-sessions", required_argument, 0, 'm' },
{ NULL, 0, 0, 0 } { NULL, 0, 0, 0 }
}; };
// -------------------------------------------------------------------------- // --------------------------------------------------------------------------
...@@ -27,6 +28,7 @@ static void print_help() ...@@ -27,6 +28,7 @@ static void print_help()
printf("[-i|--iaddr] addr - Inet address for listen connections.\n"); printf("[-i|--iaddr] addr - Inet address for listen connections.\n");
printf("[-p|--port] port - Bind port.\n"); printf("[-p|--port] port - Bind port.\n");
printf("[-d|--delay] msec - Delay for generate message. Default 5000.\n"); printf("[-d|--delay] msec - Delay for generate message. Default 5000.\n");
printf("[-m|--max-sessions] num - Maximum count sessions for server. Default: 5\n");
} }
// -------------------------------------------------------------------------- // --------------------------------------------------------------------------
int main( int argc, char **argv ) int main( int argc, char **argv )
...@@ -38,10 +40,11 @@ int main( int argc, char **argv ) ...@@ -38,10 +40,11 @@ int main( int argc, char **argv )
int port = 3333; int port = 3333;
//int tout = 2000; //int tout = 2000;
timeout_t delay = 5000; timeout_t delay = 5000;
int msess = 5;
try try
{ {
while( (opt = getopt_long(argc, argv, "hvi:p:d:",longopts,&optindex)) != -1 ) while( (opt = getopt_long(argc, argv, "hvi:p:d:m:",longopts,&optindex)) != -1 )
{ {
switch (opt) switch (opt)
{ {
...@@ -61,6 +64,10 @@ int main( int argc, char **argv ) ...@@ -61,6 +64,10 @@ int main( int argc, char **argv )
delay = uni_atoi(optarg); delay = uni_atoi(optarg);
break; break;
case 'm':
msess = uni_atoi(optarg);
break;
case 'v': case 'v':
verb = 1; verb = 1;
break; break;
...@@ -106,7 +113,8 @@ int main( int argc, char **argv ) ...@@ -106,7 +113,8 @@ int main( int argc, char **argv )
} }
LogServer ls(la); LogServer ls(la);
// LogServer ls(cout); ls.setMaxSessionCount(msess);
dlog->addLevel(Debug::ANY); dlog->addLevel(Debug::ANY);
dlog2->addLevel(Debug::ANY); dlog2->addLevel(Debug::ANY);
......
...@@ -58,6 +58,7 @@ class LogServer ...@@ -58,6 +58,7 @@ class LogServer
inline void setCmdTimeout( timeout_t msec ){ cmdTimeout = msec; } inline void setCmdTimeout( timeout_t msec ){ cmdTimeout = msec; }
inline void setOutTimeout( timeout_t msec ){ outTimeout = msec; } inline void setOutTimeout( timeout_t msec ){ outTimeout = msec; }
inline void setSessionLog( Debug::type t ){ sessLogLevel = t; } inline void setSessionLog( Debug::type t ){ sessLogLevel = t; }
inline void setMaxSessionCount( int num ){ sessMaxCount = num; }
void run( const std::string& addr, ost::tpport_t port, bool thread=true ); void run( const std::string& addr, ost::tpport_t port, bool thread=true );
...@@ -77,6 +78,7 @@ class LogServer ...@@ -77,6 +78,7 @@ class LogServer
timeout_t cmdTimeout; timeout_t cmdTimeout;
timeout_t outTimeout; timeout_t outTimeout;
Debug::type sessLogLevel; Debug::type sessLogLevel;
int sessMaxCount;
std::atomic_bool cancelled; std::atomic_bool cancelled;
DebugStream mylog; DebugStream mylog;
......
...@@ -30,11 +30,18 @@ class LogSession: ...@@ -30,11 +30,18 @@ class LogSession:
inline void delSessionLogLevel( Debug::type t ){ slog.delLevel(t); } inline void delSessionLogLevel( Debug::type t ){ slog.delLevel(t); }
protected: protected:
LogSession( ost::TCPSocket& server );
virtual void run(); virtual void run();
virtual void final(); virtual void final();
void logOnEvent( const std::string& s ); void logOnEvent( const std::string& s );
void readStream(); void readStream();
timeout_t sessTimeout;
timeout_t cmdTimeout;
timeout_t outTimeout;
timeout_t delayTime;
private: private:
typedef std::deque<std::string> LogBuffer; typedef std::deque<std::string> LogBuffer;
LogBuffer lbuf; LogBuffer lbuf;
...@@ -42,10 +49,6 @@ class LogSession: ...@@ -42,10 +49,6 @@ class LogSession:
std::string caddr; std::string caddr;
std::shared_ptr<DebugStream> log; std::shared_ptr<DebugStream> log;
timeout_t sessTimeout;
timeout_t cmdTimeout;
timeout_t outTimeout;
timeout_t delayTime;
PassiveTimer ptSessionTimeout; PassiveTimer ptSessionTimeout;
FinalSlot slFin; FinalSlot slFin;
...@@ -55,5 +58,22 @@ class LogSession: ...@@ -55,5 +58,22 @@ class LogSession:
DebugStream slog; DebugStream slog;
}; };
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
/*! Сессия просто заверщающаяся с указанным сообщением */
class NullLogSession:
public LogSession
{
public:
NullLogSession( ost::TCPSocket& server, const std::string& _msg );
virtual ~NullLogSession();
protected:
virtual void run();
private:
std::string msg;
};
// -------------------------------------------------------------------------
#endif // LogSession_H_ #endif // LogSession_H_
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
...@@ -43,6 +43,7 @@ sessTimeout(3600000), ...@@ -43,6 +43,7 @@ sessTimeout(3600000),
cmdTimeout(2000), cmdTimeout(2000),
outTimeout(2000), outTimeout(2000),
sessLogLevel(Debug::NONE), sessLogLevel(Debug::NONE),
sessMaxCount(10),
cancelled(false), cancelled(false),
thr(0), thr(0),
tcp(0), tcp(0),
...@@ -57,6 +58,7 @@ sessTimeout(3600000), ...@@ -57,6 +58,7 @@ sessTimeout(3600000),
cmdTimeout(2000), cmdTimeout(2000),
outTimeout(2000), outTimeout(2000),
sessLogLevel(Debug::NONE), sessLogLevel(Debug::NONE),
sessMaxCount(10),
cancelled(false), cancelled(false),
thr(0), thr(0),
tcp(0), tcp(0),
...@@ -106,6 +108,21 @@ void LogServer::work() ...@@ -106,6 +108,21 @@ void LogServer::work()
{ {
while( !cancelled && tcp->isPendingConnection(timeout) ) while( !cancelled && tcp->isPendingConnection(timeout) )
{ {
{
uniset_rwmutex_wrlock l(mutSList);
int sz = slist.size();
if( sz >= sessMaxCount )
{
ostringstream err;
err << "(LOG SERVER): Exceeded the limit on the number of sessions = " << sessMaxCount << endl;
auto s = make_shared<NullLogSession>(*tcp,err.str());
slist.push_back(s);
s->connectFinalSession( sigc::mem_fun(this, &LogServer::sessionFinished) );
s->detach();
continue;
}
}
auto s = make_shared<LogSession>(*tcp, elog, sessTimeout, cmdTimeout, outTimeout); auto s = make_shared<LogSession>(*tcp, elog, sessTimeout, cmdTimeout, outTimeout);
s->setSessionLogLevel(sessLogLevel); s->setSessionLogLevel(sessLogLevel);
{ {
......
...@@ -24,6 +24,12 @@ LogSession::~LogSession() ...@@ -24,6 +24,12 @@ LogSession::~LogSession()
} }
} }
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
LogSession::LogSession( ost::TCPSocket& server ):
TCPSession(server)
{
}
// -------------------------------------------------------------------------
LogSession::LogSession( ost::TCPSocket &server, std::shared_ptr<DebugStream>& _log, timeout_t _sessTimeout, timeout_t _cmdTimeout, timeout_t _outTimeout, timeout_t _delay ): LogSession::LogSession( ost::TCPSocket &server, std::shared_ptr<DebugStream>& _log, timeout_t _sessTimeout, timeout_t _cmdTimeout, timeout_t _outTimeout, timeout_t _delay ):
TCPSession(server), TCPSession(server),
peername(""), peername(""),
...@@ -239,3 +245,38 @@ void LogSession::connectFinalSession( FinalSlot sl ) ...@@ -239,3 +245,38 @@ void LogSession::connectFinalSession( FinalSlot sl )
slFin = sl; slFin = sl;
} }
// --------------------------------------------------------------------- // ---------------------------------------------------------------------
// ---------------------------------------------------------------------
NullLogSession::NullLogSession( ost::TCPSocket& server, const std::string& _msg ):
LogSession(server),
msg(_msg)
{
}
// ---------------------------------------------------------------------
NullLogSession::~NullLogSession()
{
}
// ---------------------------------------------------------------------
void NullLogSession::run()
{
int i = 0;
while( isConnected() && i++<=3 )
{
if( isPending(Socket::pendingInput, 10) )
{
char buf[10];
// проверяем канал..(если данных нет, значит "клиент отвалился"...
if( peek(buf,sizeof(buf)) <=0 )
break;
}
if( isPending(Socket::pendingOutput) )
{
*tcp() << msg << endl;
tcp()->sync();
}
msleep(5000);
}
}
// ---------------------------------------------------------------------
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment