Commit 549a3dc5 authored by Pavel Vainerman's avatar Pavel Vainerman

(LogServer): сделал test-logserver "noinst"

+ ..небольшое переформатирование кода.. (LogServer): оптимизировал работу с очередью
parent 98b2caf9
bin_PROGRAMS = @PACKAGE@-test-logserver @PACKAGE@-log
bin_PROGRAMS = @PACKAGE@-log
noinst_PROGRAMS = @PACKAGE@-test-logserver
@PACKAGE@_test_logserver_SOURCES = logserver.cc
@PACKAGE@_test_logserver_LDADD = $(top_builddir)/lib/libUniSet2.la $(COMCPP_LIBS)
......
......@@ -13,9 +13,9 @@ using namespace std;
static struct option longopts[] = {
{ "help", no_argument, 0, 'h' },
{ "verbose", no_argument, 0, 'v' },
{ "iaddr", required_argument, 0, 'a' },
{ "iaddr", required_argument, 0, 'i' },
{ "port", required_argument, 0, 'p' },
{ "add", required_argument, 0, 'l' },
{ "add", required_argument, 0, 'a' },
{ "del", required_argument, 0, 'd' },
{ "set", required_argument, 0, 's' },
{ "off", required_argument, 0, 'o' },
......@@ -28,12 +28,12 @@ static void print_help()
{
printf("-h, --help - this message\n");
printf("-v, --verbose - Print all messages to stdout\n");
printf("[-a|--iaddr] addr - ULogServer ip or hostname.\n");
printf("[-i|--iaddr] addr - ULogServer ip or hostname.\n");
printf("[-p|--port] port - ULogServer port.\n");
printf("\n");
printf("Commands:\n");
printf("[--add | -l] info,warn,crit,... - Add log levels.\n");
printf("[--add | -a] info,warn,crit,... - Add log levels.\n");
printf("[--del | -d] info,warn,crit,... - Delete log levels.\n");
printf("[--set | -s] info,wanr,crit,... - Set log levels.\n");
printf("--off, -o - Off the write log file (if enabled).\n");
......@@ -55,7 +55,7 @@ int main( int argc, char **argv )
try
{
while( (opt = getopt_long(argc, argv, "hva:p:l:d:s:onr",longopts,&optindex)) != -1 )
while( (opt = getopt_long(argc, argv, "hva:p:i:d:s:onr",longopts,&optindex)) != -1 )
{
switch (opt)
{
......@@ -63,7 +63,7 @@ int main( int argc, char **argv )
print_help();
return 0;
case 'l':
case 'a':
{
cmd = LogServerTypes::cmdAddLevel;
sdata = string(optarg);
......@@ -91,7 +91,7 @@ int main( int argc, char **argv )
cmd = LogServerTypes::cmdRotate;
break;
case 'a':
case 'i':
addr = string(optarg);
break;
......
......@@ -82,6 +82,7 @@ int main( int argc, char **argv )
}
LogServer ls(dlog);
// LogServer ls(cout);
dlog.addLevel(Debug::ANY);
ls.run( addr, port, true );
......
......@@ -32,6 +32,9 @@
</Services>
</UniSet>
<dlog name="dlog"/>
<LogServer name="dlog" port="3333" host="localhost" />
<LogServer name="ulog" port="3335" host="localhost" />
<settings>
<TestProc name="TestProc1"
on_s="Input1_S"
......
......@@ -16,14 +16,16 @@
#ifdef UNISET_ENABLE_IO
#include "IOControl.h"
#endif
#include "LogServer.h"
// --------------------------------------------------------------------------
using namespace std;
using namespace UniSetTypes;
using namespace UniSetExtensions;
// --------------------------------------------------------------------------
const int MaxAddNum = 10;
const unsigned int MaxAddNum = 10;
// --------------------------------------------------------------------------
static void help_print( int argc, const char* argv[] );
static LogServer* run_logserver( const std::string& cnamem, DebugStream& log );
// --------------------------------------------------------------------------
int main( int argc, const char **argv )
{
......@@ -40,8 +42,8 @@ int main( int argc, const char **argv )
string logfilename = conf->getArgParam("--logfile", "smemory-plus.log");
string logname( conf->getLogDir() + logfilename );
UniSetExtensions::dlog.logFile( logname );
ulog.logFile( logname );
// UniSetExtensions::dlog.logFile( logname );
// ulog.logFile( logname );
conf->initDebug(UniSetExtensions::dlog,"dlog");
UniSetActivator* act = UniSetActivator::Instance();
......@@ -122,7 +124,7 @@ int main( int argc, const char **argv )
stringstream p;
p << "mbs";
if( i > 0 ) p << i;
if( dlog.is_info() )
dlog.info() << "(smemory-plus): add MBSlave(" << p.str() << ")" << endl;
......@@ -184,6 +186,18 @@ int main( int argc, const char **argv )
(*it)->start();
#endif
if( run_logserver("ulog",ulog) == 0 )
{
ulog.crit() << "(smemory-plus): run logserver for 'ulog' FAILED" << endl;
return 1;
}
if( run_logserver("dlog",dlog) == 0 )
{
dlog.crit() << "(smemory-plus): run logserver for 'dlog' FAILED" << endl;
return 1;
}
act->run(false);
on_sigchild(SIGTERM);
......@@ -240,3 +254,47 @@ void help_print( int argc, const char* argv[] )
cout << "--confile - Use confile. Default: configure.xml" << endl;
cout << "--logfile - Use logfile. Default: smemory-plus.log" << endl;
}
// -----------------------------------------------------------------------------
LogServer* run_logserver( const std::string& cname, DebugStream& log )
{
const UniXML* xml = UniSetTypes::conf->getConfXML();
xmlNode* cnode = UniSetTypes::conf->findNode(xml->getFirstNode(),"LogServer",cname);
if( cnode == 0 )
{
cerr << "(init_ulogserver): Not found xmlnode for '" << cname << "'" << endl;
return 0;
}
UniXML::iterator it(cnode);
LogServer* ls = new LogServer( log );
timeout_t sessTimeout = conf->getArgPInt("--" + cname + "-session-timeout",it.getProp("sessTimeout"),3600000);
timeout_t cmdTimeout = conf->getArgPInt("--" + cname + "-cmd-timeout",it.getProp("cmdTimeout"),2000);
timeout_t outTimeout = conf->getArgPInt("--" + cname + "-out-timeout",it.getProp("outTimeout"),2000);
ls->setSessionTimeout(sessTimeout);
ls->setCmdTimeout(cmdTimeout);
ls->setOutTimeout(outTimeout);
std::string host = conf->getArgParam("--" + cname + "-host",it.getProp("host"));
if( host.empty() )
{
cerr << "(init_ulogserver): " << cname << ": unknown host.." << endl;
delete ls;
return 0;
}
ost::tpport_t port = conf->getArgPInt("--" + cname + "-port",it.getProp("port"),0);
if( port == 0 )
{
cerr << "(init_ulogserver): " << cname << ": unknown port.." << endl;
delete ls;
return 0;
}
ls->run(host, port, true);
return ls;
}
// -----------------------------------------------------------------------------
......@@ -31,6 +31,7 @@ ${START} -f ./uniset2-smemory-plus --smemory-id SharedMemory --confile test.xml
--mbtcp2-gateway-port 2049 \
--mbtcp2-recv-timeout 200 \
--mbtcp2-force-out 1 \
$*
# --add-rtu \
# --rs-dev /dev/cbsideA1 \
# --rs-id RTUExchange \
......
......@@ -24,7 +24,6 @@
#include <sigc++/sigc++.h>
#include "Debug.h"
#ifdef TEST_DEBUGSTREAM
#include <string>
struct Debug {
......@@ -201,7 +200,7 @@ public:
const DebugStream &operator=(const DebugStream& r);
protected:
void sbuf_overflow( const std::string& s );
void sbuf_overflow( const std::string& s );
private:
/// The current debug level
......@@ -210,14 +209,14 @@ private:
std::ostream nullstream;
///
struct debugstream_internal;
struct debugstream_sbuf;
struct debugstream_sbuf;
///
debugstream_internal * internal;
debugstream_sbuf * internal_sbuf;
debugstream_sbuf * internal_sbuf;
bool show_datetime;
std::string fname;
StreamEvent_Signal s_stream;
StreamEvent_Signal s_stream;
};
#endif
......@@ -15,6 +15,7 @@ class LogServer
public:
LogServer( DebugStream& log );
LogServer( std::ostream& os );
~LogServer();
inline void setSessionTimeout( timeout_t msec ){ sessTimeout = msec; }
......@@ -45,6 +46,7 @@ class LogServer
ost::TCPSocket* tcp;
DebugStream* elog;
std::ostream* oslog;
};
// -------------------------------------------------------------------------
#endif // LogServer_H_
......
......@@ -6,34 +6,34 @@
// -------------------------------------------------------------------------
namespace LogServerTypes
{
const unsigned int MAGICNUM = 0x20140904;
enum Command
{
cmdNOP, /*!< отсутствие команды */
cmdSetLevel, /*!< установить уровень вывода */
cmdAddLevel, /*!< добавить уровень вывода */
cmdDelLevel, /*!< удалить уровень вывода */
cmdRotate, /*!< пересоздать файл с логами */
cmdOffLogFile, /*!< отключить запись файла логов (если включена) */
cmdOnLogFile /*!< включить запись файла логов (если была отключена) */
// cmdSetLogFile
};
const unsigned int MAGICNUM = 0x20140904;
enum Command
{
cmdNOP, /*!< отсутствие команды */
cmdSetLevel, /*!< установить уровень вывода */
cmdAddLevel, /*!< добавить уровень вывода */
cmdDelLevel, /*!< удалить уровень вывода */
cmdRotate, /*!< пересоздать файл с логами */
cmdOffLogFile, /*!< отключить запись файла логов (если включена) */
cmdOnLogFile /*!< включить запись файла логов (если была отключена) */
// cmdSetLogFile
};
std::ostream& operator<<(std::ostream& os, Command c );
std::ostream& operator<<(std::ostream& os, Command c );
struct lsMessage
{
lsMessage():magic(MAGICNUM),cmd(cmdNOP),data(0){}
unsigned int magic;
Command cmd;
unsigned int data;
struct lsMessage
{
lsMessage():magic(MAGICNUM),cmd(cmdNOP),data(0){}
unsigned int magic;
Command cmd;
unsigned int data;
// для команды 'cmdSetLogFile'
// static const short MAXLOGFILENAME = 100;
// char logname[MAXLOGFILENAME];
}__attribute__((packed));
std::ostream& operator<<(std::ostream& os, lsMessage& m );
// для команды 'cmdSetLogFile'
// static const short MAXLOGFILENAME = 100;
// char logname[MAXLOGFILENAME];
}__attribute__((packed));
std::ostream& operator<<(std::ostream& os, lsMessage& m );
}
// -------------------------------------------------------------------------
#endif // LogServerTypes_H_
......
......@@ -14,7 +14,7 @@ class LogSession:
{
public:
LogSession( ost::TCPSocket &server, DebugStream* log, timeout_t sessTimeout=10000, timeout_t cmdTimeout=2000, timeout_t outTimeout=2000 );
LogSession( ost::TCPSocket &server, DebugStream* log, timeout_t sessTimeout=10000, timeout_t cmdTimeout=2000, timeout_t outTimeout=2000, timeout_t delay=2000 );
virtual ~LogSession();
typedef sigc::slot<void, LogSession*> FinalSlot;
......@@ -25,7 +25,8 @@ class LogSession:
protected:
virtual void run();
virtual void final();
void logOnEvent( const std::string& s );
void logOnEvent( const std::string& s );
void readStream();
private:
typedef std::deque<std::string> LogBuffer;
......@@ -37,6 +38,7 @@ class LogSession:
timeout_t sessTimeout;
timeout_t cmdTimeout;
timeout_t outTimeout;
timeout_t delayTime;
PassiveTimer ptSessionTimeout;
FinalSlot slFin;
......@@ -44,7 +46,6 @@ class LogSession:
UniSetTypes::uniset_rwmutex mLBuf;
DebugStream slog;
};
// -------------------------------------------------------------------------
#endif // LogSession_H_
......
......@@ -33,8 +33,8 @@
#include "IONotifyController.h"
// ------------------------------------------------------------------------------------------
/*!
Интерфейс для записи в файл и восстановления из файла списка заказчиков по датчикам для
IONotifyController-а (NC).
Интерфейс для записи в файл и восстановления из файла списка заказчиков по датчикам для
IONotifyController-а (NC).
\note Это абстрактный интерфейс. В чистом виде не используется.
*/
......@@ -108,8 +108,8 @@ class NCRestorer
// ------------------------------------------------------------------------------------------
/*!
* \brief Реализация сохранения списка заказчиков в xml.
Данный класс работает с глобальным xml-файлом проекта (обычно configure.xml),
поэтому НЕ реализаует функции записи (dump)-а.
Данный класс работает с глобальным xml-файлом проекта (обычно configure.xml),
поэтому НЕ реализаует функции записи (dump)-а.
*/
class NCRestorer_XML:
public Restorer_XML,
......
......@@ -19,10 +19,10 @@ iaddr("")
LogReader::~LogReader()
{
if( isConnection() )
{
{
(*tcp) << endl;
disconnect();
}
}
}
// -------------------------------------------------------------------------
void LogReader::connect( const std::string& addr, ost::tpport_t _port, timeout_t msec )
......@@ -104,69 +104,69 @@ bool LogReader::isConnection()
// -------------------------------------------------------------------------
void LogReader::readlogs( const std::string& _addr, ost::tpport_t _port, LogServerTypes::Command cmd, int data, bool verbose )
{
timeout_t inTimeout = 10000;
timeout_t outTimeout = 6000;
timeout_t reconDelay = 5000;
char buf[100001];
if( verbose )
rlog.addLevel(Debug::ANY);
bool send_ok = false;
while( true )
{
if( !isConnection() )
connect(_addr,_port,reconDelay);
if( !isConnection() )
{
rlog.warn() << "**** connection timeout.." << endl;
msleep(reconDelay);
continue;
}
if( !send_ok && cmd != LogServerTypes::cmdNOP )
{
if( tcp->isPending(ost::Socket::pendingOutput,outTimeout) )
{
rlog.info() << "** send command: '" << cmd << "' data='" << data << "'" << endl;
LogServerTypes::lsMessage msg;
msg.cmd = cmd;
msg.data = data;
for( size_t i=0; i<sizeof(msg); i++ )
(*tcp) << ((unsigned char*)(&msg))[i];
tcp->sync();
send_ok = true;
}
else
rlog.warn() << "**** SEND COMMAND ('" << cmd << "' FAILED!" << endl;
}
while( tcp->isPending(ost::Socket::pendingInput,inTimeout) )
{
int n = tcp->peek( buf,sizeof(buf)-1 );
if( n > 0 )
{
tcp->read(buf,n);
buf[n] = '\0';
cout << buf;
}
else
break;
}
rlog.warn() << "...connection timeout..." << endl;
send_ok = false; // ??!! делать ли?
disconnect();
}
timeout_t inTimeout = 10000;
timeout_t outTimeout = 6000;
timeout_t reconDelay = 5000;
char buf[100001];
if( verbose )
rlog.addLevel(Debug::ANY);
bool send_ok = false;
while( true )
{
if( !isConnection() )
connect(_addr,_port,reconDelay);
if( !isConnection() )
{
rlog.warn() << "**** connection timeout.." << endl;
msleep(reconDelay);
continue;
}
if( !send_ok && cmd != LogServerTypes::cmdNOP )
{
if( tcp->isPending(ost::Socket::pendingOutput,outTimeout) )
{
rlog.info() << "** send command: '" << cmd << "' data='" << data << "'" << endl;
LogServerTypes::lsMessage msg;
msg.cmd = cmd;
msg.data = data;
for( size_t i=0; i<sizeof(msg); i++ )
(*tcp) << ((unsigned char*)(&msg))[i];
tcp->sync();
send_ok = true;
}
else
rlog.warn() << "**** SEND COMMAND ('" << cmd << "' FAILED!" << endl;
}
while( tcp->isPending(ost::Socket::pendingInput,inTimeout) )
{
int n = tcp->peek( buf,sizeof(buf)-1 );
if( n > 0 )
{
tcp->read(buf,n);
buf[n] = '\0';
cout << buf;
}
else
break;
}
rlog.warn() << "...connection timeout..." << endl;
send_ok = false; // ??!! делать ли?
disconnect();
}
if( isConnection() )
{
(*tcp) << endl;
disconnect();
}
{
(*tcp) << endl;
disconnect();
}
}
// -------------------------------------------------------------------------
......@@ -9,19 +9,32 @@ using namespace UniSetTypes;
// -------------------------------------------------------------------------
LogServer::~LogServer()
{
cancelled = true;
cancelled = true;
{
uniset_rwmutex_wrlock l(mutSList);
for( auto& i: slist )
delete i;
}
{
uniset_rwmutex_wrlock l(mutSList);
for( auto& i: slist )
delete i;
}
if( thr )
{
thr->stop();
delete thr;
}
if( thr )
{
thr->stop();
delete thr;
}
}
// -------------------------------------------------------------------------
LogServer::LogServer( std::ostream& os ):
timeout(TIMEOUT_INF),
sessTimeout(3600000),
cmdTimeout(2000),
outTimeout(2000),
cancelled(false),
thr(0),
tcp(0),
elog(0),
oslog(&os)
{
}
// -------------------------------------------------------------------------
LogServer::LogServer( DebugStream& log ):
......@@ -32,7 +45,8 @@ outTimeout(2000),
cancelled(false),
thr(0),
tcp(0),
elog(&log)
elog(&log),
oslog(0)
{
}
// -------------------------------------------------------------------------
......@@ -50,89 +64,89 @@ elog(0)
// -------------------------------------------------------------------------
void LogServer::run( const std::string& addr, ost::tpport_t port, bool thread )
{
try
{
ost::InetAddress iaddr(addr.c_str());
tcp = new ost::TCPSocket(iaddr,port);
}
catch( ost::Socket *socket )
{
ost::tpport_t port;
int errnum = socket->getErrorNumber();
ost::InetAddress saddr = (ost::InetAddress)socket->getPeer(&port);
ostringstream err;
try
{
ost::InetAddress iaddr(addr.c_str());
tcp = new ost::TCPSocket(iaddr,port);
}
catch( ost::Socket *socket )
{
ost::tpport_t port;
int errnum = socket->getErrorNumber();
ost::InetAddress saddr = (ost::InetAddress)socket->getPeer(&port);
ostringstream err;
err << "socket error " << saddr.getHostname() << ":" << port << " = " << errnum;
err << "socket error " << saddr.getHostname() << ":" << port << " = " << errnum;
if( errnum == ost::Socket::errBindingFailed )
err << "bind failed; port busy" << endl;
else
err << "client socket failed" << endl;
if( errnum == ost::Socket::errBindingFailed )
err << "bind failed; port busy" << endl;
else
err << "client socket failed" << endl;
throw SystemError( err.str() );
}
throw SystemError( err.str() );
}
if( !thread )
work();
else
{
if( !thread )
work();
else
{
thr = new ThreadCreator<LogServer>(this, &LogServer::work);
thr->start();
}
}
}
// -------------------------------------------------------------------------
void LogServer::work()
{
cancelled = false;
while( !cancelled )
{
try
{
while( tcp->isPendingConnection(timeout) )
{
LogSession* s = new LogSession(*tcp, elog, sessTimeout, cmdTimeout, outTimeout);
{
uniset_rwmutex_wrlock l(mutSList);
slist.push_back(s);
}
s->connectFinalSession( sigc::mem_fun(this, &LogServer::sessionFinished) );
s->detach();
}
}
catch( ost::Socket *socket )
{
ost::tpport_t port;
int errnum = socket->getErrorNumber();
ost::InetAddress saddr = (ost::InetAddress)socket->getPeer(&port);
cerr << "socket error " << saddr.getHostname() << ":" << port << " = " << errnum << endl;
if( errnum == ost::Socket::errBindingFailed )
{
cerr << "bind failed; port busy" << endl;
// ::exit(-1);
}
else
cerr << "client socket failed" << endl;
}
}
cancelled = false;
while( !cancelled )
{
try
{
while( tcp->isPendingConnection(timeout) )
{
LogSession* s = new LogSession(*tcp, elog, sessTimeout, cmdTimeout, outTimeout);
{
uniset_rwmutex_wrlock l(mutSList);
slist.push_back(s);
}
s->connectFinalSession( sigc::mem_fun(this, &LogServer::sessionFinished) );
s->detach();
}
}
catch( ost::Socket *socket )
{
ost::tpport_t port;
int errnum = socket->getErrorNumber();
ost::InetAddress saddr = (ost::InetAddress)socket->getPeer(&port);
cerr << "socket error " << saddr.getHostname() << ":" << port << " = " << errnum << endl;
if( errnum == ost::Socket::errBindingFailed )
{
cerr << "bind failed; port busy" << endl;
// ::exit(-1);
}
else
cerr << "client socket failed" << endl;
}
}
{
uniset_rwmutex_wrlock l(mutSList);
for( SessionList::iterator i=slist.begin(); i!=slist.end(); ++i )
(*i)->disconnect();
}
{
uniset_rwmutex_wrlock l(mutSList);
for( SessionList::iterator i=slist.begin(); i!=slist.end(); ++i )
(*i)->disconnect();
}
}
// -------------------------------------------------------------------------
void LogServer::sessionFinished( LogSession* s )
{
uniset_rwmutex_wrlock l(mutSList);
for( SessionList::iterator i=slist.begin(); i!=slist.end(); ++i )
{
if( (*i) == s )
{
// cerr << "session '" << s->getClientAddress() << "' closed.." << endl;
slist.erase(i);
return;
}
}
uniset_rwmutex_wrlock l(mutSList);
for( SessionList::iterator i=slist.begin(); i!=slist.end(); ++i )
{
if( (*i) == s )
{
// cerr << "session '" << s->getClientAddress() << "' closed.." << endl;
slist.erase(i);
return;
}
}
}
// -------------------------------------------------------------------------
......@@ -5,35 +5,35 @@ using namespace std;
// -------------------------------------------------------------------------
std::ostream& LogServerTypes::operator<<(std::ostream& os, LogServerTypes::Command cmd )
{
switch( cmd )
{
case LogServerTypes::cmdSetLevel:
return os << "cmdSetLevel";
switch( cmd )
{
case LogServerTypes::cmdSetLevel:
return os << "cmdSetLevel";
case LogServerTypes::cmdAddLevel:
return os << "cmdAddLevel";
case LogServerTypes::cmdAddLevel:
return os << "cmdAddLevel";
case LogServerTypes::cmdDelLevel:
return os << "cmdDelLevel";
case LogServerTypes::cmdDelLevel:
return os << "cmdDelLevel";
case LogServerTypes::cmdRotate:
return os << "cmdRotate";
case LogServerTypes::cmdRotate:
return os << "cmdRotate";
case LogServerTypes::cmdOffLogFile:
return os << "cmdOffLogFile";
case LogServerTypes::cmdOffLogFile:
return os << "cmdOffLogFile";
case LogServerTypes::cmdOnLogFile:
return os << "cmdOnLogFile";
case LogServerTypes::cmdOnLogFile:
return os << "cmdOnLogFile";
default:
return os << "Unknown";
}
default:
return os << "Unknown";
}
return os;
return os;
}
// -------------------------------------------------------------------------
std::ostream& LogServerTypes::operator<<(std::ostream& os, LogServerTypes::lsMessage& m )
{
return os << " magic=" << m.magic << " cmd=" << m.cmd << " data=" << m.data;
return os << " magic=" << m.magic << " cmd=" << m.cmd << " data=" << m.data;
}
// -------------------------------------------------------------------------
#include <iostream>
#include <string>
#include <sstream>
#include <fcntl.h>
#include <errno.h>
#include <cstring>
......@@ -15,13 +16,13 @@ LogSession::~LogSession()
{
cancelled = true;
if( isRunning() )
{
{
disconnect();
ost::Thread::join();
}
}
}
// -------------------------------------------------------------------------
LogSession::LogSession( ost::TCPSocket &server, DebugStream* _log, timeout_t _sessTimeout, timeout_t _cmdTimeout, timeout_t _outTimeout ):
LogSession::LogSession( ost::TCPSocket &server, DebugStream* _log, timeout_t _sessTimeout, timeout_t _cmdTimeout, timeout_t _outTimeout, timeout_t _delay ):
TCPSession(server),
peername(""),
caddr(""),
......@@ -29,16 +30,17 @@ log(_log),
sessTimeout(_sessTimeout),
cmdTimeout(_cmdTimeout),
outTimeout(_outTimeout),
delayTime(_delay),
cancelled(false)
{
// slog.addLevel(Debug::ANY);
log->signal_stream_event().connect( sigc::mem_fun(this, &LogSession::logOnEvent) );
// slog.addLevel(Debug::ANY);
log->signal_stream_event().connect( sigc::mem_fun(this, &LogSession::logOnEvent) );
}
// -------------------------------------------------------------------------
void LogSession::logOnEvent( const std::string& s )
{
uniset_rwmutex_wrlock l(mLBuf);
lbuf.push_back(s);
uniset_rwmutex_wrlock l(mLBuf);
lbuf.push_back(s);
}
// -------------------------------------------------------------------------
void LogSession::run()
......@@ -68,92 +70,109 @@ void LogSession::run()
// Команды могут посылаться только в начале сессии..
if( isPending(Socket::pendingInput, cmdTimeout) )
{
LogServerTypes::lsMessage msg;
// проверяем канал..(если данных нет, значит "клиент отвалился"...
if( peek( (void*)(&msg),sizeof(msg)) > 0 )
{
ssize_t ret = readData( &msg,sizeof(msg) );
if( ret!=sizeof(msg) || msg.magic!=LogServerTypes::MAGICNUM )
slog.warn() << peername << "(run): BAD MESSAGE..." << endl;
else
{
slog.info() << peername << "(run): receive command: '" << msg.cmd << "'" << endl;
// Обработка команд..
// \warning Работа с логом ведётся без mutex-а, хотя он разделяется отдельными потоками
switch( msg.cmd )
{
case LogServerTypes::cmdSetLevel:
log->level( (Debug::type)msg.data );
break;
case LogServerTypes::cmdAddLevel:
log->addLevel((Debug::type)msg.data );
break;
case LogServerTypes::cmdDelLevel:
log->delLevel( (Debug::type)msg.data );
break;
case LogServerTypes::cmdRotate:
{
string lfile( log->getLogFile() );
if( !lfile.empty() )
log->logFile(lfile);
}
break;
case LogServerTypes::cmdOffLogFile:
{
string lfile( log->getLogFile() );
if( !lfile.empty() )
log->logFile("");
}
break;
case LogServerTypes::cmdOnLogFile:
{
if( !oldLogFile.empty() && oldLogFile != log->getLogFile() )
log->logFile(oldLogFile);
}
break;
default:
slog.warn() << peername << "(run): Unknown command '" << msg.cmd << "'" << endl;
break;
}
}
}
}
LogServerTypes::lsMessage msg;
// проверяем канал..(если данных нет, значит "клиент отвалился"...
if( peek( (void*)(&msg),sizeof(msg)) > 0 )
{
ssize_t ret = readData( &msg,sizeof(msg) );
if( ret!=sizeof(msg) || msg.magic!=LogServerTypes::MAGICNUM )
slog.warn() << peername << "(run): BAD MESSAGE..." << endl;
else
{
slog.info() << peername << "(run): receive command: '" << msg.cmd << "'" << endl;
// Обработка команд..
// \warning Работа с логом ведётся без mutex-а, хотя он разделяется отдельными потоками
switch( msg.cmd )
{
case LogServerTypes::cmdSetLevel:
log->level( (Debug::type)msg.data );
break;
case LogServerTypes::cmdAddLevel:
log->addLevel((Debug::type)msg.data );
break;
case LogServerTypes::cmdDelLevel:
log->delLevel( (Debug::type)msg.data );
break;
case LogServerTypes::cmdRotate:
{
string lfile( log->getLogFile() );
if( !lfile.empty() )
log->logFile(lfile);
}
break;
case LogServerTypes::cmdOffLogFile:
{
string lfile( log->getLogFile() );
if( !lfile.empty() )
log->logFile("");
}
break;
case LogServerTypes::cmdOnLogFile:
{
if( !oldLogFile.empty() && oldLogFile != log->getLogFile() )
log->logFile(oldLogFile);
}
break;
default:
slog.warn() << peername << "(run): Unknown command '" << msg.cmd << "'" << endl;
break;
}
}
}
}
cancelled = false;
while( !cancelled && !ptSessionTimeout.checkTime() )
{
// проверка только ради проверки "целостности" соединения
if( isPending(Socket::pendingInput, 10) )
{
char buf[10];
// проверяем канал..(если данных нет, значит "клиент отвалился"...
if( peek(buf,sizeof(buf)) <=0 )
break;
}
// проверка только ради проверки "целостности" соединения
if( isPending(Socket::pendingInput, 10) )
{
char buf[10];
// проверяем канал..(если данных нет, значит "клиент отвалился"...
if( peek(buf,sizeof(buf)) <=0 )
break;
}
if( isPending(Socket::pendingOutput, outTimeout) )
{
// slog.warn() << peername << "(run): send.." << endl;
ptSessionTimeout.reset();
uniset_rwmutex_wrlock l(mLBuf);
if( !lbuf.empty() )
{
slog.info() << peername << "(run): send messages.." << endl;
while( !lbuf.empty() )
// slog.warn() << peername << "(run): send.." << endl;
ptSessionTimeout.reset();
if( log )
{
// чтобы не застревать на посылке в сеть..
// делаем через промежуточный буффер (stringstream)
ostringstream sbuf;
bool send = false;
{
*tcp() << lbuf.front();
lbuf.pop_front();
uniset_rwmutex_wrlock l(mLBuf);
if( !lbuf.empty() )
{
slog.info() << peername << "(run): send messages.." << endl;
while( !lbuf.empty() )
{
sbuf << lbuf.front();
lbuf.pop_front();
}
send = true;
}
}
tcp()->sync();
if( send )
{
*tcp() << sbuf.str();
tcp()->sync();
}
// чтобы постоянно не проверять... (надо переделать на condition)
sleep(delayTime);
}
}
}
}
if( slog.debugging(Debug::INFO) )
......
......@@ -16,6 +16,7 @@
//#include "DebugStream.h"
#include "Debug.h"
#include "Mutex.h"
//�Since the current C++ lib in egcs does not have a standard implementation
// of basic_streambuf and basic_filebuf we don't have to include this
......@@ -229,11 +230,10 @@ private:
};
///
class stringsigbuf : public stringbuf {
class stringsigbuf : public streambuf {
public:
stringsigbuf():sb(0)
stringsigbuf():sb(new stringbuf())
{
sb = new stringbuf();
}
~stringsigbuf()
......@@ -243,7 +243,7 @@ public:
///
stringsigbuf( stringbuf* b )
: stringbuf(), sb(b) {}
: streambuf(), sb(b) {}
typedef sigc::signal<void,const std::string&> StrBufOverflow_Signal;
inline StrBufOverflow_Signal signal_overflow(){ return s_overflow; }
......@@ -252,11 +252,13 @@ protected:
#ifdef MODERN_STL_STREAMS
///
virtual int sync() {
UniSetTypes::uniset_mutex_lock l(mut);
return sb->pubsync();
}
///
virtual streamsize xsputn(char_type const * p, streamsize n) {
UniSetTypes::uniset_mutex_lock l(mut);
return sb->sputn(p, n);
}
///
......@@ -264,9 +266,12 @@ protected:
int_type r = sb->sputc(c);
if( r == '\n' )
{
UniSetTypes::uniset_mutex_lock l(mut);
s_overflow.emit( sb->str() );
delete sb;
sb = new stringbuf();
sb->str("");
// stringbuf* old = sb;
// sb = new stringbuf();
// delete old;
}
return r;
}
......@@ -287,7 +292,15 @@ protected:
if( r == '\n' )
{
s_overflow.emit( sb->str() );
sb->rdbug.clear();
if( r == '\n' )
{
s_overflow.emit( sb->str() );
uniset_mutex_lock l(mut);
// из-за многопоточности.. сперва переприсвоим указатель на новый поток..
// а потом уже удалим старый..
stringbuf* old = sb;
sb = new stringbuf();
delete old;
}
return r;
}
......@@ -295,7 +308,8 @@ protected:
private:
///
StrBufOverflow_Signal s_overflow;
stringbuf* sb;
stringbuf* sb;
UniSetTypes::uniset_mutex mut;
};
//--------------------------------------------------------------------------
......@@ -376,6 +390,10 @@ void DebugStream::logFile( const std::string& f )
internal = new debugstream_internal;
}
if( !internal_sbuf ) {
internal_sbuf = new debugstream_sbuf;
}
if( !f.empty() )
{
internal->fbuf.open(f.c_str(), ios::out|ios::app);
......
......@@ -30,8 +30,9 @@ int main( int argc, const char **argv )
if( tlog.is_level1() )
tlog.level1() << ": is level1..." << endl;
cout << "********" << endl;
cout << "==================" << endl;
cout << ss.str();
cout << "==================" << endl;
return 0;
}
......@@ -3,7 +3,7 @@
#include "Mutex.h"
#include "ThreadCreator.h"
#include "UniSetTypes.h"
#include "modbus/TCPCheck.h"
#include "TCPCheck.h"
using namespace std;
using namespace UniSetTypes;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment