Commit d0a5d45e authored by Pavel Vainerman's avatar Pavel Vainerman

- сделал тест для LogServer (test_logserver)

- пепеписал LogSession на использованием condition_variable - немного переделал интерфейс LogReader (чтобы было удобнее в тесте применять)
parent 93881190
......@@ -119,7 +119,8 @@ int main( int argc, char **argv )
dlog2->addLevel(Debug::ANY);
ls.run( addr, port, true );
ls.setSessionLog(Debug::ANY);
if( verb )
ls.setSessionLog(Debug::ANY);
unsigned int i=0;
while( true )
......
......@@ -158,6 +158,9 @@ public:
inline void showDateTime(bool s)
{ show_datetime = s; }
inline void showLogType(bool s)
{ show_logtype = s; }
// короткие функции (для удобства)
// log.level1() - вывод с датой и временем "date time [LEVEL] ...",
// если вывод даты и времени не выключен при помощи showDateTime(false)
......@@ -216,6 +219,7 @@ protected:
///
debugstream_internal * internal;
bool show_datetime;
bool show_logtype;
std::string fname;
StreamEvent_Signal s_stream;
......
......@@ -25,17 +25,24 @@ class LogReader
bool isConnection();
inline void setReadCount( unsigned int n ){ readcount = n; }
inline void setCommandOnlyMode( bool s ){ cmdonly = s; }
inline void setinTimeout( timeout_t msec ){ inTimeout = msec; }
inline void setoutTimeout( timeout_t msec ){ outTimeout = msec; }
inline void setReconnectDelay( timeout_t msec ){ reconDelay = msec; }
DebugStream::StreamEvent_Signal signal_stream_event();
void setLogLevel( Debug::type );
protected:
void connect( const std::string& addr, ost::tpport_t port, timeout_t tout=TIMEOUT_INF );
void connect( ost::InetAddress addr, ost::tpport_t port, timeout_t tout=TIMEOUT_INF );
void disconnect();
void logOnEvent( const std::string& s );
timeout_t inTimeout;
timeout_t outTimeout;
......@@ -46,8 +53,12 @@ class LogReader
std::string iaddr;
ost::tpport_t port;
bool cmdonly;
unsigned int readcount; // количество циклов чтения
DebugStream rlog;
DebugStream log; // рабочий лог в который выводиться полученная информация..
DebugStream::StreamEvent_Signal m_logsig;
};
// -------------------------------------------------------------------------
#endif // LogReader_H_
......
......@@ -45,6 +45,9 @@ LogReader. Читающих клиентов может быть скольуг
logsrv.run(host,port,create_thread);
...
\endcode
\warning Логи отдаются "клиентам" только целоиком строкой. Т.е. по сети информация передаваться не будет пока не будет записан "endl".
Это сделано для "оптимизации передачи" (чтобы не передавать каждый байт)
*/
// -------------------------------------------------------------------------
class LogServer
......@@ -63,6 +66,8 @@ class LogServer
void run( const std::string& addr, ost::tpport_t port, bool thread=true );
inline bool isRunning(){ return (thr && thr->isRunning()); }
protected:
LogServer();
......
......@@ -6,6 +6,8 @@
#include <memory>
#include <deque>
#include <cc++/socket.h>
#include <condition_variable>
#include <mutex>
#include "Mutex.h"
#include "DebugStream.h"
#include "PassiveTimer.h"
......@@ -50,13 +52,15 @@ class LogSession:
std::string caddr;
std::shared_ptr<DebugStream> log;
PassiveTimer ptSessionTimeout;
// PassiveTimer ptSessionTimeout;
FinalSlot slFin;
std::atomic_bool cancelled;
UniSetTypes::uniset_rwmutex mLBuf;
DebugStream slog;
std::mutex log_mutex;
std::condition_variable log_event;
std::atomic_bool log_notify = ATOMIC_VAR_INIT(0);
};
// -------------------------------------------------------------------------
/*! Сессия просто заверщающаяся с указанным сообщением */
......
......@@ -45,7 +45,7 @@ using std::ios;
DebugStream::DebugStream(Debug::type t)
: ostream(new debugbuf(cerr.rdbuf())),
dt(t), nullstream(new nullbuf), internal(new debugstream_internal),
show_datetime(true),
show_datetime(true),show_logtype(true),
fname(""),
logname("")
{
......@@ -59,7 +59,7 @@ DebugStream::DebugStream(char const * f, Debug::type t, bool truncate )
: ostream(new debugbuf(cerr.rdbuf())),
dt(t), nullstream(new nullbuf),
internal(new debugstream_internal),
show_datetime(true),
show_datetime(true),show_logtype(true),
fname(""),
logname("")
{
......@@ -130,7 +130,10 @@ std::ostream & DebugStream::debug(Debug::type t)
{
if( show_datetime )
printDateTime(t);
*this << "(" << std::setfill(' ') << std::setw(6) << t << "): "; // "):\t";
if( show_logtype )
*this << "(" << std::setfill(' ') << std::setw(6) << t << "): "; // "):\t";
return *this;
}
......
......@@ -15,8 +15,11 @@ outTimeout(6000),
reconDelay(5000),
tcp(0),
iaddr(""),
cmdonly(false)
cmdonly(false),
readcount(0)
{
log.level(Debug::ANY);
log.signal_stream_event().connect( sigc::mem_fun(this, &LogReader::logOnEvent) );
}
// -------------------------------------------------------------------------
......@@ -26,6 +29,17 @@ LogReader::~LogReader()
disconnect();
}
// -------------------------------------------------------------------------
void LogReader::setLogLevel( Debug::type t )
{
log.level(t);
}
// -------------------------------------------------------------------------
DebugStream::StreamEvent_Signal LogReader::signal_stream_event()
{
return m_logsig;
}
// -------------------------------------------------------------------------
void LogReader::connect( const std::string& addr, ost::tpport_t _port, timeout_t msec )
{
ost::InetAddress ia(addr.c_str());
......@@ -127,14 +141,25 @@ void LogReader::readlogs( const std::string& _addr, ost::tpport_t _port, LogServ
if( outTimeout == 0 )
outTimeout = TIMEOUT_INF;
while( true )
unsigned int n = 1;
if( readcount > 0 )
n = readcount;
while( n > 0 )
{
if( !isConnection() )
connect(_addr,_port,reconDelay);
if( !isConnection() )
{
rlog.warn() << "**** connection timeout.." << endl;
rlog.warn() << "(LogReader): **** connection timeout.." << endl;
if( readcount > 0 )
n--;
if( n<0 )
break;
msleep(reconDelay);
continue;
}
......@@ -143,7 +168,7 @@ void LogReader::readlogs( const std::string& _addr, ost::tpport_t _port, LogServ
{
if( tcp->isPending(ost::Socket::pendingOutput,outTimeout) )
{
rlog.info() << "** send command: logname='" << msg.logname << "' cmd='" << msg.cmd << "' data='" << msg.data << "'" << endl;
rlog.info() << "(LogReader): ** send command: logname='" << msg.logname << "' cmd='" << msg.cmd << "' data='" << msg.data << "'" << endl;
// LogServerTypes::lsMessage msg;
// msg.cmd = cmd;
......@@ -155,7 +180,7 @@ void LogReader::readlogs( const std::string& _addr, ost::tpport_t _port, LogServ
send_ok = true;
}
else
rlog.warn() << "**** SEND COMMAND ('" << msg.cmd << "' FAILED!" << endl;
rlog.warn() << "(LogReader): **** SEND COMMAND ('" << msg.cmd << "' FAILED!" << endl;
if( cmdonly )
{
......@@ -171,18 +196,26 @@ void LogReader::readlogs( const std::string& _addr, ost::tpport_t _port, LogServ
{
tcp->read(buf,n);
buf[n] = '\0';
cout << buf;
log << buf;
}
else
break;
}
rlog.warn() << "...connection timeout..." << endl;
rlog.warn() << "(LogReader): ...connection timeout..." << endl;
send_ok = false; // ??!! делать ли?
disconnect();
if( readcount > 0 )
n--;
}
if( isConnection() )
disconnect();
}
// -------------------------------------------------------------------------
void LogReader::logOnEvent( const std::string& s )
{
m_logsig.emit(s);
}
// -------------------------------------------------------------------------
......@@ -42,6 +42,8 @@ outTimeout(_outTimeout),
delayTime(_delay),
cancelled(false)
{
log_notify = ATOMIC_VAR_INIT(0);
//slog.addLevel(Debug::ANY);
if( log )
log->signal_stream_event().connect( sigc::mem_fun(this, &LogSession::logOnEvent) );
......@@ -49,10 +51,15 @@ cancelled(false)
slog.crit() << "LOG NULL!!" << endl;
}
// -------------------------------------------------------------------------
void LogSession::logOnEvent( const std::string& s )
void LogSession::logOnEvent( const std::string& s )
{
uniset_rwmutex_wrlock l(mLBuf);
lbuf.push_back(s);
{
std::unique_lock<std::mutex> lk(log_mutex);
lbuf.push_back(s);
log_notify = true;
}
log_event.notify_one();
}
// -------------------------------------------------------------------------
void LogSession::run()
......@@ -79,7 +86,7 @@ void LogSession::run()
slog.crit() << peername << "(run): LOG NULL!!" << endl;
ptSessionTimeout.setTiming(sessTimeout);
// ptSessionTimeout.setTiming(sessTimeout);
setKeepAlive(true);
// setTimeout(sessTimeout);
......@@ -201,7 +208,8 @@ void LogSession::run()
ostringstream sbuf;
bool send = false;
{
uniset_rwmutex_wrlock l(mLBuf);
std::unique_lock<std::mutex> lk(log_mutex);
// uniset_rwmutex_wrlock l(mLBuf);
if( !lbuf.empty() )
{
slog.info() << peername << "(run): send messages.." << endl;
......@@ -220,8 +228,13 @@ void LogSession::run()
tcp()->sync();
}
// чтобы постоянно не проверять... (надо переделать на condition)
sleep(delayTime);
{
std::unique_lock<std::mutex> lk(log_mutex);
log_event.wait_for(lk, std::chrono::milliseconds(outTimeout), [&](){ return (log_notify == true || cancelled); } );
}
if( cancelled )
break;
}
}
......
......@@ -24,7 +24,8 @@ test_triggerOUT.cc \
test_pulse.cc \
test_modbustypes.cc \
test_utypes.cc \
test_mutex.cc
test_mutex.cc \
test_logserver.cc
tests_with_conf_LDADD = $(top_builddir)/lib/libUniSet2.la $(COV_LIBS)
tests_with_conf_CPPFLAGS = -I$(top_builddir)/include $(COV_CFLAGS)
......
#include <catch.hpp>
#include <memory>
#include <sstream>
#include <thread>
#include "Mutex.h"
#include "UniSetTypes.h"
#include "LogServer.h"
#include "LogAgregator.h"
#include "LogReader.h"
// --------------------------------------------------------------------------
using namespace std;
using namespace UniSetTypes;
// --------------------------------------------------------------------------
static const std::string ip="localhost";
static const int port = 33333;
static const std::string test_msg1="Test message N1";
static const std::string test_msg2="Test message N2";
static ostringstream msg;
static ostringstream msg2;
static ostringstream la_msg;
static uniset_mutex r1_mutex;
static uniset_mutex r2_mutex;
static std::atomic_bool g_read_cancel = ATOMIC_VAR_INIT(0);
static int readTimeout = 4000;
// --------------------------------------------------------------------------
void rlog1OnEvent( const std::string& s )
{
uniset_mutex_lock l(r1_mutex);
msg << s;
}
// --------------------------------------------------------------------------
void rlog2OnEvent( const std::string& s )
{
uniset_mutex_lock l(r2_mutex);
msg2 << s;
}
// --------------------------------------------------------------------------
void la_logOnEvent( const std::string& s )
{
la_msg << s;
}
// --------------------------------------------------------------------------
void readlog_thread1()
{
LogReader lr;
lr.setinTimeout(readTimeout);
lr.signal_stream_event().connect( sigc::ptr_fun(rlog1OnEvent) );
lr.setReadCount(1);
lr.setLogLevel(Debug::ANY);
while( !g_read_cancel )
lr.readlogs(ip, port); // ,LogServerTypes::cmdNOP,0,"",true);
}
// --------------------------------------------------------------------------
void readlog_thread2()
{
LogReader lr;
lr.setinTimeout(readTimeout);
lr.signal_stream_event().connect( sigc::ptr_fun(rlog2OnEvent) );
lr.setReadCount(1);
lr.setLogLevel(Debug::ANY);
while( !g_read_cancel )
lr.readlogs(ip, port); // ,LogServerTypes::cmdNOP,0,"",true);
}
// --------------------------------------------------------------------------
TEST_CASE("LogAgregator", "[LogServer][LogAgregator]" )
{
auto la = make_shared<LogAgregator>();
auto log1 = la->create("log1");
auto log2 = la->create("log2");
log1->showDateTime(false);
log2->showDateTime(false);
log1->showLogType(false);
log2->showLogType(false);
la->signal_stream_event().connect( sigc::ptr_fun(la_logOnEvent) );
log1->level(Debug::ANY);
log2->level(Debug::ANY);
log1->any() << test_msg1;
REQUIRE( la_msg.str() == test_msg1 );
la_msg.str("");
log2->any() << test_msg2;
REQUIRE( la_msg.str() == test_msg2 );
}
// --------------------------------------------------------------------------
TEST_CASE("LogServer", "[LogServer]" )
{
g_read_cancel = false;
auto la = make_shared<LogAgregator>();
auto log1 = la->create("log1");
auto log2 = la->create("log2");
log1->level(Debug::ANY);
log2->level(Debug::ANY);
log1->showDateTime(false);
log2->showDateTime(false);
log1->showLogType(false);
log2->showLogType(false);
la_msg.str("");
la->signal_stream_event().connect( sigc::ptr_fun(la_logOnEvent) );
LogServer ls(la);
//ls.setSessionLog(Debug::ANY);
ls.run( ip, port, true );
for( int i=0; i<3 && !ls.isRunning(); i++ )
msleep(500);
CHECK( ls.isRunning() );
msg.str("");
auto r_thr = make_shared<std::thread>(readlog_thread1);
msleep(100); // небольшая пауза на создание потока и т.п.
ostringstream m;
m << test_msg1 << endl;
// по сети строка информации передаваться не будет пока не будет записан endl!
// это сделано для "оптимизации передачи" (чтобы не передавать каждый байт)
log1->any() << m.str();
REQUIRE( la_msg.str() == m.str() );
msleep(readTimeout); // пауза на переподключение reader-а к серверу..
{
uniset_mutex_lock l(r1_mutex);
REQUIRE( msg.str() == m.str() );
}
msg.str("");
la_msg.str("");
ostringstream m2;
m2 << test_msg2 << endl;
log2->any() << m2.str();
REQUIRE( la_msg.str() == m2.str() );
msleep(readTimeout); // пауза на переподключение reader-а к серверу..
{
uniset_mutex_lock l(r1_mutex);
REQUIRE( msg.str() == m2.str() );
}
g_read_cancel = true;
if( r_thr->joinable() )
r_thr->join();
}
// --------------------------------------------------------------------------
TEST_CASE("MaxSessions", "[LogServer]" )
{
g_read_cancel = false;
auto la = make_shared<LogAgregator>();
auto log1 = la->create("log1");
auto log2 = la->create("log2");
log1->level(Debug::ANY);
log2->level(Debug::ANY);
log1->showDateTime(false);
log2->showDateTime(false);
log1->showLogType(false);
log2->showLogType(false);
la_msg.str("");
la->signal_stream_event().connect( sigc::ptr_fun(la_logOnEvent) );
LogServer ls(la);
//ls.setSessionLog(Debug::ANY);
ls.setMaxSessionCount(1);
ls.run( ip, port, true );
for( int i=0; i<3 && !ls.isRunning(); i++ )
msleep(500);
CHECK( ls.isRunning() );
msg.str("");
msg2.str("");
auto r1_thr = make_shared<std::thread>(readlog_thread1);
msleep(500); // пауза чтобы первый заведомо успел подключиться раньше второго..
auto r2_thr = make_shared<std::thread>(readlog_thread2);
msleep(100); // небольшая пауза на создание потока и т.п.
ostringstream m;
m << test_msg1 << endl;
// по сети строка информации передаваться не будет пока не будет записан endl!
// это сделано для "оптимизации передачи" (чтобы не передавать каждый байт)
log1->any() << m.str();
REQUIRE( la_msg.str() == m.str() );
msleep(readTimeout); // пауза на переподключение reader-а к серверу..
{
uniset_mutex_lock l(r1_mutex);
REQUIRE( msg.str() == m.str() );
}
{
uniset_mutex_lock l(r2_mutex);
// Ищем часть сообщения об ошибке: '(LOG SERVER): Exceeded the limit on the number of sessions = 1'
size_t pos = msg2.str().find("Exceeded the limit");
REQUIRE( pos != std::string::npos );
}
g_read_cancel = true;
if( r1_thr->joinable() )
r1_thr->join();
if( r2_thr->joinable() )
r2_thr->join();
}
// --------------------------------------------------------------------------
......@@ -794,6 +794,7 @@ tests/test_triggerOUT.cc
tests/test_ui.cc
tests/test_unixml.cc
tests/test_utypes.cc
tests/test_logserver.cc
tests/tests-junit.xml
tests/tests.cc
tests/tests_bad_config.xml
......@@ -866,4 +867,4 @@ Utilities/ULog/Makefile.am
Utilities/Makefile.am
libUniSet2.pc.in
Makefile.am
uniset-config.h
\ No newline at end of file
uniset-config.h
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment