Commit a20dcb5f authored by Pavel Vainerman's avatar Pavel Vainerman

(LogServer): решение проблеммы с закрытием соединения на стороне сервера

parent 7e0f414b
......@@ -20,10 +20,7 @@ cmdonly(false)
LogReader::~LogReader()
{
if( isConnection() )
{
(*tcp) << endl;
disconnect();
}
}
// -------------------------------------------------------------------------
void LogReader::connect( const std::string& addr, ost::tpport_t _port, timeout_t msec )
......@@ -155,7 +152,10 @@ void LogReader::readlogs( const std::string& _addr, ost::tpport_t _port, LogServ
rlog.warn() << "**** SEND COMMAND ('" << msg.cmd << "' FAILED!" << endl;
if( cmdonly )
{
disconnect();
return;
}
}
while( !cmdonly && tcp->isPending(ost::Socket::pendingInput,inTimeout) )
......@@ -167,7 +167,7 @@ void LogReader::readlogs( const std::string& _addr, ost::tpport_t _port, LogServ
buf[n] = '\0';
cout << buf;
}
else
else
break;
}
......@@ -177,9 +177,6 @@ void LogReader::readlogs( const std::string& _addr, ost::tpport_t _port, LogServ
}
if( isConnection() )
{
(*tcp) << endl;
disconnect();
}
}
// -------------------------------------------------------------------------
......@@ -10,18 +10,24 @@ using namespace UniSetTypes;
LogServer::~LogServer()
{
cancelled = true;
{
uniset_rwmutex_wrlock l(mutSList);
for( auto& i: slist )
delete i;
}
if( thr )
{
thr->stop();
if( thr->isRunning() )
thr->join();
delete thr;
}
{
// uniset_rwmutex_wrlock l(mutSList);
for( auto& i: slist )
{
if( i->isRunning() )
delete i;
}
}
delete tcp;
}
// -------------------------------------------------------------------------
LogServer::LogServer( std::ostream& os ):
......@@ -102,7 +108,7 @@ void LogServer::work()
{
try
{
while( tcp->isPendingConnection(timeout) )
while( !cancelled && tcp->isPendingConnection(timeout) )
{
LogSession* s = new LogSession(*tcp, elog, sessTimeout, cmdTimeout, outTimeout);
{
......@@ -129,10 +135,12 @@ void LogServer::work()
}
}
cerr << "*** LOG SERVER THREAD STOPPED.." << endl;
{
uniset_rwmutex_wrlock l(mutSList);
for( SessionList::iterator i=slist.begin(); i!=slist.end(); ++i )
(*i)->disconnect();
// uniset_rwmutex_wrlock l(mutSList);
for( auto& i: slist )
i->disconnect();
}
}
// -------------------------------------------------------------------------
......@@ -143,7 +151,6 @@ void LogServer::sessionFinished( LogSession* s )
{
if( (*i) == s )
{
// cerr << "session '" << s->getClientAddress() << "' closed.." << endl;
slist.erase(i);
return;
}
......
......@@ -39,8 +39,8 @@ std::ostream& LogServerTypes::operator<<(std::ostream& os, LogServerTypes::lsMes
// -------------------------------------------------------------------------
void LogServerTypes::lsMessage::setLogName( const std::string& name )
{
size_t s = name.size()> MAXLOGNAME ? MAXLOGNAME : name.size();
memcpy( &logname, name.c_str(), s );
size_t s = name.size()> MAXLOGNAME ? MAXLOGNAME : name.size();
memcpy( &logname, name.c_str(), s );
logname[s] = '\0';
}
// -------------------------------------------------------------------------
......@@ -68,6 +68,9 @@ void LogSession::run()
string oldLogFile( log->getLogFile() );
setKeepAlive(true);
// setTimeout(sessTimeout);
// Команды могут посылаться только в начале сессии..
if( isPending(Socket::pendingInput, cmdTimeout) )
{
......@@ -84,65 +87,65 @@ void LogSession::run()
slog.info() << peername << "(run): receive command: '" << msg.cmd << "'" << endl;
string cmdLogName(msg.logname);
DebugStream* cmdlog = 0;
DebugStream* cmdlog = 0;
if( !cmdLogName.empty () )
{
LogAgregator* lag = dynamic_cast<LogAgregator*>(log);
cmdlog = lag ? lag->getLog( cmdLogName ) : log;
}
if( !cmdLogName.empty () )
{
LogAgregator* lag = dynamic_cast<LogAgregator*>(log);
cmdlog = lag ? lag->getLog( cmdLogName ) : log;
}
// обрабатываем команды только если нашли log
// обрабатываем команды только если нашли log
if( cmdlog )
{
// Обработка команд..
// \warning Работа с логом ведётся без mutex-а, хотя он разделяется отдельными потоками
switch( msg.cmd )
{
case LogServerTypes::cmdSetLevel:
cmdlog->level( (Debug::type)msg.data );
break;
case LogServerTypes::cmdAddLevel:
cmdlog->addLevel((Debug::type)msg.data );
break;
case LogServerTypes::cmdDelLevel:
cmdlog->delLevel( (Debug::type)msg.data );
break;
case LogServerTypes::cmdRotate:
{
string lfile( cmdlog->getLogFile() );
if( !lfile.empty() )
cmdlog->logFile(lfile);
}
break;
case LogServerTypes::cmdOffLogFile:
{
string lfile( cmdlog->getLogFile() );
if( !lfile.empty() )
cmdlog->logFile("");
}
break;
case LogServerTypes::cmdOnLogFile:
{
if( !oldLogFile.empty() && oldLogFile != cmdlog->getLogFile() )
cmdlog->logFile(oldLogFile);
}
break;
default:
slog.warn() << peername << "(run): Unknown command '" << msg.cmd << "'" << endl;
break;
}
}
// Обработка команд..
// \warning Работа с логом ведётся без mutex-а, хотя он разделяется отдельными потоками
switch( msg.cmd )
{
case LogServerTypes::cmdSetLevel:
cmdlog->level( (Debug::type)msg.data );
break;
case LogServerTypes::cmdAddLevel:
cmdlog->addLevel((Debug::type)msg.data );
break;
case LogServerTypes::cmdDelLevel:
cmdlog->delLevel( (Debug::type)msg.data );
break;
case LogServerTypes::cmdRotate:
{
string lfile( cmdlog->getLogFile() );
if( !lfile.empty() )
cmdlog->logFile(lfile);
}
break;
case LogServerTypes::cmdOffLogFile:
{
string lfile( cmdlog->getLogFile() );
if( !lfile.empty() )
cmdlog->logFile("");
}
break;
case LogServerTypes::cmdOnLogFile:
{
if( !oldLogFile.empty() && oldLogFile != cmdlog->getLogFile() )
cmdlog->logFile(oldLogFile);
}
break;
default:
slog.warn() << peername << "(run): Unknown command '" << msg.cmd << "'" << endl;
break;
}
}
}
}
}
cancelled = false;
while( !cancelled && !ptSessionTimeout.checkTime() )
while( !cancelled && isConnected() ) // !ptSessionTimeout.checkTime()
{
// проверка только ради проверки "целостности" соединения
if( isPending(Socket::pendingInput, 10) )
......@@ -155,37 +158,35 @@ void LogSession::run()
if( isPending(Socket::pendingOutput, outTimeout) )
{
// slog.warn() << peername << "(run): send.." << endl;
ptSessionTimeout.reset();
//slog.info() << peername << "(run): send.." << endl;
// ptSessionTimeout.reset();
// чтобы не застревать на посылке в сеть..
// делаем через промежуточный буффер (stringstream)
ostringstream sbuf;
bool send = false;
{
uniset_rwmutex_wrlock l(mLBuf);
if( !lbuf.empty() )
{
slog.info() << peername << "(run): send messages.." << endl;
while( !lbuf.empty() )
{
sbuf << lbuf.front();
lbuf.pop_front();
}
send = true;
}
}
if( log )
if( send )
{
// чтобы не застревать на посылке в сеть..
// делаем через промежуточный буффер (stringstream)
ostringstream sbuf;
bool send = false;
{
uniset_rwmutex_wrlock l(mLBuf);
if( !lbuf.empty() )
{
slog.info() << peername << "(run): send messages.." << endl;
while( !lbuf.empty() )
{
sbuf << lbuf.front();
lbuf.pop_front();
}
send = true;
}
}
if( send )
{
*tcp() << sbuf.str();
tcp()->sync();
}
// чтобы постоянно не проверять... (надо переделать на condition)
sleep(delayTime);
}
*tcp() << sbuf.str();
tcp()->sync();
}
// чтобы постоянно не проверять... (надо переделать на condition)
sleep(delayTime);
}
}
......@@ -200,9 +201,9 @@ void LogSession::run()
// -------------------------------------------------------------------------
void LogSession::final()
{
tcp()->sync();
slFin(this);
delete this;
tcp()->sync();
slFin(this);
delete this;
}
// -------------------------------------------------------------------------
void LogSession::connectFinalSession( FinalSlot sl )
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment