Commit 707026a4 authored by Pavel Vainerman's avatar Pavel Vainerman

(LogServer): добавил ограничение на размер буфера для сообщений (200), добавил…

(LogServer): добавил ограничение на размер буфера для сообщений (200), добавил статистику использования
parent d5bdd661
...@@ -14,7 +14,7 @@ ...@@ -14,7 +14,7 @@
Name: libuniset2 Name: libuniset2
Version: 2.2 Version: 2.2
Release: alt31.3 Release: alt31.4
Summary: UniSet - library for building distributed industrial control systems Summary: UniSet - library for building distributed industrial control systems
License: LGPL License: LGPL
...@@ -486,6 +486,9 @@ mv -f %buildroot%python_sitelibdir_noarch/* %buildroot%python_sitelibdir/%oname ...@@ -486,6 +486,9 @@ mv -f %buildroot%python_sitelibdir_noarch/* %buildroot%python_sitelibdir/%oname
# .. # ..
%changelog %changelog
* Sun Apr 10 2016 Pavel Vainerman <pv@altlinux.ru> 2.2-alt31.4
- LogSession: add buffer limit.. (200 messages, ~30kB)
* Sun Apr 03 2016 Pavel Vainerman <pv@altlinux.ru> 2.2-alt31.3 * Sun Apr 03 2016 Pavel Vainerman <pv@altlinux.ru> 2.2-alt31.3
- ModbusSlave: iowait optimization - ModbusSlave: iowait optimization
......
...@@ -984,7 +984,10 @@ UniSetTypes::SimpleInfo* SharedMemory::getInfo( CORBA::Long userparam ) ...@@ -984,7 +984,10 @@ UniSetTypes::SimpleInfo* SharedMemory::getInfo( CORBA::Long userparam )
inf << i->info << endl; inf << i->info << endl;
inf << vmon.pretty_str() << endl; inf << vmon.pretty_str() << endl;
inf << "LogServer: " << logserv_host << ":" << logserv_port << endl; if( logserv )
inf << logserv->getShortInfo() << endl;
else
inf << "No logserver running." << endl;
i->info = inf.str().c_str(); i->info = inf.str().c_str();
return i._retn(); return i._retn();
......
...@@ -11,8 +11,7 @@ smemory_test_SOURCES = TestProc_SK.cc TestProc.cc smemory-test.cc ...@@ -11,8 +11,7 @@ smemory_test_SOURCES = TestProc_SK.cc TestProc.cc smemory-test.cc
BUILT_SOURCES=TestProc_SK.cc TestProc_SK.h BUILT_SOURCES=TestProc_SK.cc TestProc_SK.h
TestProc_SK.cc: testproc.src.xml TestProc_SK.cc TestProc_SK.h: testproc.src.xml
# ./uniset2-codegen --ask -n TestProc --local-include --topdir $(top_builddir)/ --no-main testproc.src.xml
$(top_builddir)/Utilities/codegen/@PACKAGE@-codegen -l $(top_builddir)/Utilities/codegen --ask -n TestProc --local-include --topdir $(top_builddir)/ --no-main testproc.src.xml $(top_builddir)/Utilities/codegen/@PACKAGE@-codegen -l $(top_builddir)/Utilities/codegen --ask -n TestProc --local-include --topdir $(top_builddir)/ --no-main testproc.src.xml
clean-local: clean-local:
......
...@@ -108,6 +108,8 @@ class LogServer: ...@@ -108,6 +108,8 @@ class LogServer:
static std::string help_print( const std::string& prefix ); static std::string help_print( const std::string& prefix );
std::string getShortInfo();
protected: protected:
LogServer(); LogServer();
......
...@@ -61,12 +61,22 @@ class LogSession ...@@ -61,12 +61,22 @@ class LogSession
mylog.delLevel(t); mylog.delLevel(t);
} }
//! Установить размер буфера для сообщений (количество записей. Не размер в байтах!!)
void setMaxBufSize( size_t num );
inline size_t getMaxBufSize()
{
return maxRecordsNum;
}
// запуск обработки входящих запросов // запуск обработки входящих запросов
void run( const ev::loop_ref& loop ); void run( const ev::loop_ref& loop );
void terminate(); void terminate();
bool isAcive(); bool isAcive();
std::string getShortInfo();
protected: protected:
LogSession( ost::TCPSocket& server ); LogSession( ost::TCPSocket& server );
...@@ -83,9 +93,25 @@ class LogSession ...@@ -83,9 +93,25 @@ class LogSession
timeout_t cmdTimeout = { 2000 }; timeout_t cmdTimeout = { 2000 };
// Т.к. сообщений может быть ОЧЕНЬ МНОГО.. сеть медленная
// очередь может не успевать рассасываться,
// то потенциально может "скушаться" вся память.
// Поэтому приходиться ограничить доступное количество записей.
// Рассчитываем, что средний размер одного сообщения 150 символов (байт)
// тогда выделяем буфер на 200 сообщений (~ 30кB)
// На самом деле сообщения могут быть совершенно разные..
size_t maxRecordsNum = { 30000 }; // максимальное количество сообщение в очереди
private: private:
std::queue<UTCPCore::Buffer*> logbuf; std::queue<UTCPCore::Buffer*> logbuf;
std::mutex logbuf_mutex; std::mutex logbuf_mutex;
bool lostMsg = { false };
// статистика по использованию буфера
size_t maxCount = { 0 }; // максимальное количество побывавшее в очереди
size_t minSizeMsg = { 0 }; // минимальная встретившаяся длинна сообщения
size_t maxSizeMsg = { 0 }; // максимальная встретившаяся длинна сообщения
size_t numLostMsg = { 0 }; // количество потерянных сообщений
std::string peername = { "" }; std::string peername = { "" };
std::string caddr = { "" }; std::string caddr = { "" };
......
...@@ -236,3 +236,17 @@ std::string LogServer::help_print( const std::string& prefix ) ...@@ -236,3 +236,17 @@ std::string LogServer::help_print( const std::string& prefix )
return std::move( h.str() ); return std::move( h.str() );
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
string LogServer::getShortInfo()
{
ostringstream inf;
inf << "LogServer: " << myname << endl;
{
uniset_rwmutex_wrlock l(mutSList);
for( const auto& s: slist )
inf << " " << s->getShortInfo() << endl;
}
return std::move(inf.str());
}
// -----------------------------------------------------------------------------
...@@ -55,11 +55,6 @@ LogSession::LogSession( int sfd, std::shared_ptr<DebugStream>& _log, timeout_t _ ...@@ -55,11 +55,6 @@ LogSession::LogSession( int sfd, std::shared_ptr<DebugStream>& _log, timeout_t _
caddr(""), caddr(""),
log(_log) log(_log)
{ {
if( log )
conn = log->signal_stream_event().connect( sigc::mem_fun(this, &LogSession::logOnEvent) );
else
mylog.crit() << "LOG NULL!!" << endl;
auto ag = dynamic_pointer_cast<LogAgregator>(log); auto ag = dynamic_pointer_cast<LogAgregator>(log);
if( ag ) if( ag )
...@@ -91,15 +86,50 @@ LogSession::LogSession( int sfd, std::shared_ptr<DebugStream>& _log, timeout_t _ ...@@ -91,15 +86,50 @@ LogSession::LogSession( int sfd, std::shared_ptr<DebugStream>& _log, timeout_t _
io.set<LogSession, &LogSession::callback>(this); io.set<LogSession, &LogSession::callback>(this);
cmdTimer.set<LogSession, &LogSession::onCmdTimeout>(this); cmdTimer.set<LogSession, &LogSession::onCmdTimeout>(this);
asyncEvent.set<LogSession, &LogSession::event>(this); asyncEvent.set<LogSession, &LogSession::event>(this);
if( log )
conn = log->signal_stream_event().connect( sigc::mem_fun(this, &LogSession::logOnEvent) );
else
mylog.crit() << "LOG NULL!!" << endl;
} }
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
void LogSession::logOnEvent( const std::string& s ) void LogSession::logOnEvent( const std::string& s )
{ {
if( cancelled ) if( cancelled || s.empty() )
return; return;
std::unique_lock<std::mutex> lk(logbuf_mutex); { // чтобы поменьше удерживать mutex
logbuf.emplace(new UTCPCore::Buffer(s)); std::unique_lock<std::mutex> lk(logbuf_mutex);
// собираем статистику..
// --------------------------
if( s.size() < minSizeMsg || minSizeMsg==0 )
minSizeMsg = s.size();
if( s.size() > maxSizeMsg )
maxSizeMsg = s.size();
if( logbuf.size() > maxCount )
maxCount = logbuf.size();
// --------------------------
// проверяем на переполнение..
if( logbuf.size() >= maxRecordsNum )
{
numLostMsg++;
if( !lostMsg )
{
ostringstream err;
err << "The buffer is full. Message is lost...(size of buffer " << maxRecordsNum << ")" << endl;
logbuf.emplace(new UTCPCore::Buffer(std::move(err.str())));
lostMsg = true;
}
return;
}
lostMsg = false;
logbuf.emplace(new UTCPCore::Buffer(s));
}
if( asyncEvent.is_active() ) if( asyncEvent.is_active() )
asyncEvent.send(); asyncEvent.send();
...@@ -136,7 +166,6 @@ void LogSession::terminate() ...@@ -136,7 +166,6 @@ void LogSession::terminate()
{ {
std::unique_lock<std::mutex> lk(logbuf_mutex); std::unique_lock<std::mutex> lk(logbuf_mutex);
while( !logbuf.empty() ) while( !logbuf.empty() )
logbuf.pop(); logbuf.pop();
} }
...@@ -155,7 +184,7 @@ void LogSession::event( ev::async& watcher, int revents ) ...@@ -155,7 +184,7 @@ void LogSession::event( ev::async& watcher, int revents )
return; return;
} }
io.set(ev::READ | ev::WRITE); io.set(ev::WRITE);
} }
// --------------------------------------------------------------------- // ---------------------------------------------------------------------
void LogSession::callback( ev::io& watcher, int revents ) void LogSession::callback( ev::io& watcher, int revents )
...@@ -195,12 +224,19 @@ void LogSession::writeEvent( ev::io& watcher ) ...@@ -195,12 +224,19 @@ void LogSession::writeEvent( ev::io& watcher )
if( cancelled ) if( cancelled )
return; return;
std::unique_lock<std::mutex> lk(logbuf_mutex); UTCPCore::Buffer* buffer = 0;
if( logbuf.empty() ) {
return; std::unique_lock<std::mutex> lk(logbuf_mutex);
auto buffer = logbuf.front(); if( logbuf.empty() )
{
io.set(EV_NONE);
return;
}
buffer = logbuf.front();
}
if( !buffer ) if( !buffer )
return; return;
...@@ -227,14 +263,26 @@ void LogSession::writeEvent( ev::io& watcher ) ...@@ -227,14 +263,26 @@ void LogSession::writeEvent( ev::io& watcher )
if( buffer->nbytes() == 0 ) if( buffer->nbytes() == 0 )
{ {
logbuf.pop(); {
std::unique_lock<std::mutex> lk(logbuf_mutex);
logbuf.pop();
}
delete buffer; delete buffer;
} }
if( logbuf.empty() )
io.set(ev::READ); {
else std::unique_lock<std::mutex> lk(logbuf_mutex);
io.set(ev::READ | ev::WRITE); if( logbuf.empty() )
{
io.set(EV_NONE);
return;
}
}
io.set(ev::WRITE);
//io.set(ev::READ | ev::WRITE);
} }
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
size_t LogSession::readData( unsigned char* buf, int len ) size_t LogSession::readData( unsigned char* buf, int len )
...@@ -340,7 +388,7 @@ void LogSession::cmdProcessing( const string& cmdLogName, const LogServerTypes:: ...@@ -340,7 +388,7 @@ void LogSession::cmdProcessing( const string& cmdLogName, const LogServerTypes::
if( cmdLogName.empty() || cmdLogName == "ALL" || log->getLogFile() == cmdLogName ) if( cmdLogName.empty() || cmdLogName == "ALL" || log->getLogFile() == cmdLogName )
{ {
LogAgregator::iLog llog(log, log->getLogName()); LogAgregator::iLog llog(log, log->getLogName());
loglist.push_back(llog); loglist.emplace_back(llog);
} }
} }
...@@ -407,7 +455,7 @@ void LogSession::cmdProcessing( const string& cmdLogName, const LogServerTypes:: ...@@ -407,7 +455,7 @@ void LogSession::cmdProcessing( const string& cmdLogName, const LogServerTypes::
{ {
std::unique_lock<std::mutex> lk(logbuf_mutex); std::unique_lock<std::mutex> lk(logbuf_mutex);
logbuf.push(new UTCPCore::Buffer(s.str())); logbuf.emplace(new UTCPCore::Buffer(s.str()));
} }
io.set(ev::WRITE); io.set(ev::WRITE);
...@@ -430,8 +478,35 @@ void LogSession::connectFinalSession( FinalSlot sl ) ...@@ -430,8 +478,35 @@ void LogSession::connectFinalSession( FinalSlot sl )
slFin = sl; slFin = sl;
} }
// --------------------------------------------------------------------- // ---------------------------------------------------------------------
void LogSession::setMaxBufSize( size_t num )
{
std::unique_lock<std::mutex> lk(logbuf_mutex);
maxRecordsNum = num;
}
// ---------------------------------------------------------------------
bool LogSession::isAcive() bool LogSession::isAcive()
{ {
return io.is_active(); return io.is_active();
} }
// --------------------------------------------------------------------- // ---------------------------------------------------------------------
string LogSession::getShortInfo()
{
size_t sz = 0;
{
std::unique_lock<std::mutex> lk(logbuf_mutex);
sz = logbuf.size();
}
ostringstream inf;
inf << "client: " << caddr << endl
<< " buffer[" << maxRecordsNum << "]: size=" << sz
<< " maxCount=" << maxCount
<< " minSizeMsg=" << minSizeMsg
<< " maxSizeMsg=" << maxSizeMsg
<< " numLostMsg=" << numLostMsg
<< endl;
return std::move(inf.str());
}
// ---------------------------------------------------------------------
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment