Commit f7613d10 authored by Pavel Vainerman's avatar Pavel Vainerman

Предварительная реализация LogServer и LogReader

parent 33242d3a
......@@ -3,7 +3,7 @@
############################################################################
SUBDIRS = scripts Admin NullController SViewer-text \
SMonit MBTester codegen SImitator
SMonit MBTester codegen SImitator ULog
bin_PROGRAMS = @PACKAGE@-test-logserver @PACKAGE@-log
@PACKAGE@_test_logserver_SOURCES = logserver.cc
@PACKAGE@_test_logserver_LDADD = $(top_builddir)/lib/libUniSet2.la $(COMCPP_LIBS)
@PACKAGE@_test_logserver_CPPFLAGS = $(COMCPP_CFLAGS)
@PACKAGE@_log_SOURCES = log.cc
@PACKAGE@_log_LDADD = $(top_builddir)/lib/libUniSet2.la $(COMCPP_LIBS)
@PACKAGE@_log_CPPFLAGS = $(COMCPP_CFLAGS)
// --------------------------------------------------------------------------
#include <string>
#include <getopt.h>
#include "Debug.h"
#include "UniSetTypes.h"
#include "Exceptions.h"
#include "LogReader.h"
// --------------------------------------------------------------------------
using namespace UniSetTypes;
using namespace std;
// --------------------------------------------------------------------------
static struct option longopts[] = {
{ "help", no_argument, 0, 'h' },
{ "iaddr", required_argument, 0, 'a' },
{ "port", required_argument, 0, 'p' },
{ "verbose", no_argument, 0, 'v' },
{ NULL, 0, 0, 0 }
};
// --------------------------------------------------------------------------
static void print_help()
{
printf("-h|--help - this message\n");
// printf("[-t|--timeout] msec - Timeout. Default: 2000.\n");
printf("[-v|--verbose] - Print all messages to stdout\n");
printf("[-a|--iaddr] addr - Inet address for listen connections.\n");
printf("[-p|--port] port - Bind port.\n");
}
// --------------------------------------------------------------------------
int main( int argc, char **argv )
{
int optindex = 0;
int opt = 0;
int verb = 0;
string addr("localhost");
int port = 3333;
int tout = 2000;
DebugStream dlog;
try
{
while( (opt = getopt_long(argc, argv, "hva:p:",longopts,&optindex)) != -1 )
{
switch (opt)
{
case 'h':
print_help();
return 0;
case 'a':
addr = string(optarg);
break;
case 'p':
port = uni_atoi(optarg);
break;
case 'v':
verb = 1;
break;
case '?':
default:
printf("? argumnet\n");
return 0;
}
}
if( verb )
{
cout << "(init): read from " << addr << ":" << port
// << " timeout=" << tout << " msec "
<< endl;
dlog.addLevel( Debug::type(Debug::CRIT | Debug::WARN | Debug::INFO) );
}
LogReader lr;
lr.readlogs( addr, port, TIMEOUT_INF );
}
catch( SystemError& err )
{
cerr << "(log): " << err << endl;
}
catch( Exception& ex )
{
cerr << "(log): " << ex << endl;
}
catch(...)
{
cerr << "(log): catch(...)" << endl;
}
return 0;
}
// --------------------------------------------------------------------------
// --------------------------------------------------------------------------
#include <string>
#include <getopt.h>
#include "Debug.h"
#include "UniSetTypes.h"
#include "Exceptions.h"
#include "LogServer.h"
// --------------------------------------------------------------------------
using namespace UniSetTypes;
using namespace std;
// --------------------------------------------------------------------------
static struct option longopts[] = {
{ "help", no_argument, 0, 'h' },
{ "iaddr", required_argument, 0, 'a' },
{ "port", required_argument, 0, 'p' },
{ "verbose", no_argument, 0, 'v' },
{ NULL, 0, 0, 0 }
};
// --------------------------------------------------------------------------
static void print_help()
{
printf("-h|--help - this message\n");
// printf("[-t|--timeout] msec - Timeout. Default: 2000.\n");
printf("[-v|--verbose] - Print all messages to stdout\n");
printf("[-a|--iaddr] addr - Inet address for listen connections.\n");
printf("[-p|--port] port - Bind port.\n");
}
// --------------------------------------------------------------------------
int main( int argc, char **argv )
{
int optindex = 0;
int opt = 0;
int verb = 0;
string addr("localhost");
int port = 3333;
int tout = 2000;
DebugStream dlog;
try
{
while( (opt = getopt_long(argc, argv, "hva:p:",longopts,&optindex)) != -1 )
{
switch (opt)
{
case 'h':
print_help();
return 0;
case 'a':
addr = string(optarg);
break;
case 'p':
port = uni_atoi(optarg);
break;
case 'v':
verb = 1;
break;
case '?':
default:
printf("? argumnet\n");
return 0;
}
}
if( verb )
{
cout << "(init): listen " << addr << ":" << port
// << " timeout=" << tout << " msec "
<< endl;
dlog.addLevel( Debug::type(Debug::CRIT | Debug::WARN | Debug::INFO) );
}
LogServer ls;
ls.run( addr, port, TIMEOUT_INF, false );
}
catch( SystemError& err )
{
cerr << "(logserver): " << err << endl;
}
catch( Exception& ex )
{
cerr << "(logserver): " << ex << endl;
}
catch(...)
{
cerr << "(logserver): catch(...)" << endl;
}
return 0;
}
// --------------------------------------------------------------------------
......@@ -221,12 +221,14 @@ AC_CONFIG_FILES([Makefile
IDL/Processes/Makefile
src/Communications/Makefile
src/Communications/Modbus/Makefile
src/Communications/TCP/Makefile
src/Interfaces/Makefile
src/ObjectRepository/Makefile
src/Processes/Makefile
src/Services/Makefile
src/Timers/Makefile
src/Various/Makefile
src/LogServer/Makefile
src/Makefile
include/Makefile
include/modbus/Makefile
......@@ -246,6 +248,7 @@ AC_CONFIG_FILES([Makefile
Utilities/codegen/Makefile
Utilities/codegen/uniset2-codegen
Utilities/codegen/tests/Makefile
Utilities/ULog/Makefile
extensions/Makefile
extensions/libUniSet2Extensions.pc
extensions/lib/Makefile
......
......@@ -22,6 +22,8 @@
#include <iostream>
#include <string>
#include <sigc++/sigc++.h>
#include "Debug.h"
#ifdef TEST_DEBUGSTREAM
#include <string>
......
#ifndef LogReader_H_
#define LogReader_H_
// -------------------------------------------------------------------------
#include <string>
#include <queue>
#include <cc++/socket.h>
#include "UTCPStream.h"
#include "DebugStream.h"
// -------------------------------------------------------------------------
class LogReader
{
public:
LogReader();
~LogReader();
void readlogs( const std::string& addr, ost::tpport_t port, timeout_t tout=TIMEOUT_INF );
bool isConnection();
protected:
void connect( const std::string& addr, ost::tpport_t port, timeout_t tout=TIMEOUT_INF );
void connect( ost::InetAddress addr, ost::tpport_t port, timeout_t tout=TIMEOUT_INF );
void disconnect();
private:
UTCPStream* tcp;
std::string iaddr;
ost::tpport_t port;
DebugStream rlog;
};
// -------------------------------------------------------------------------
#endif // LogReader_H_
// -------------------------------------------------------------------------
// -------------------------------------------------------------------------
#ifndef LogServer_H_
#define LogServer_H_
// -------------------------------------------------------------------------
#include <list>
#include <string>
#include <cc++/socket.h>
#include "Mutex.h"
#include "DebugStream.h"
#include "ThreadCreator.h"
class LogSession;
// -------------------------------------------------------------------------
class LogServer
{
public:
LogServer();
~LogServer();
void run( const std::string& addr, ost::tpport_t port, timeout_t msec=60000, bool thread=true );
protected:
void work();
void sessionFinished( LogSession* s );
private:
typedef std::list<LogSession*> SessionList;
SessionList slist;
UniSetTypes::uniset_rwmutex mutSList;
timeout_t timeout;
timeout_t session_timeout;
std::atomic_bool cancelled;
DebugStream mylog;
ThreadCreator<LogServer>* thr;
ost::TCPSocket* tcp;
};
// -------------------------------------------------------------------------
#endif // LogServer_H_
// -------------------------------------------------------------------------
// -------------------------------------------------------------------------
#ifndef LogSession_H_
#define LogSession_H_
// -------------------------------------------------------------------------
#include <string>
#include <deque>
#include <cc++/socket.h>
#include "Mutex.h"
#include "DebugStream.h"
// -------------------------------------------------------------------------
class LogSession:
public ost::TCPSession
{
public:
LogSession( ost::TCPSocket &server, timeout_t timeout );
virtual ~LogSession();
typedef sigc::slot<void, LogSession*> FinalSlot;
void connectFinalSession( FinalSlot sl );
inline std::string getClientAddress(){ return caddr; }
protected:
virtual void run();
virtual void final();
private:
typedef std::deque<std::string> LogBuffer;
LogBuffer lbuf;
std::string peername;
std::string caddr;
timeout_t timeout;
FinalSlot slFin;
std::atomic_bool cancelled;
UniSetTypes::uniset_rwmutex mLBuf;
DebugStream slog;
};
// -------------------------------------------------------------------------
#endif // LogSession_H_
// -------------------------------------------------------------------------
......@@ -15,10 +15,11 @@ libUniSet2_la_LDFLAGS = -version-info @LIBVER@
libUniSet2_la_LIBADD = -lm \
$(top_builddir)/src/Communications/libCommunications.la \
$(top_builddir)/src/Communications/Modbus/libModbus.la \
$(top_builddir)/src/Communications/TCP/libTCP.la \
$(top_builddir)/src/Interfaces/libInterfaces.la \
$(top_builddir)/src/ObjectRepository/libObjectsRepository.la \
$(top_builddir)/src/Processes/libProcesses.la \
$(top_builddir)/src/Services/libServices.la \
$(top_builddir)/src/Timers/libTimers.la \
$(top_builddir)/src/Various/libVarious.la
$(top_builddir)/src/Various/libVarious.la \
$(top_builddir)/src/LogServer/libLogServer.la
############################################################################
# This file is part of the UniSet library #
############################################################################
SUBDIRS=Modbus
SUBDIRS=TCP Modbus
noinst_LTLIBRARIES = libCommunications.la
libCommunications_la_SOURCES = ComPort.cc ComPort485F.cc
......
......@@ -2,10 +2,11 @@
# This file is part of the UniSet library #
############################################################################
noinst_LTLIBRARIES = libModbus.la
libModbus_la_SOURCES = ModbusTypes.cc ModbusHelpers.cc ModbusTCPSession.cc UTCPStream.cc \
libModbus_la_SOURCES = ModbusTypes.cc ModbusHelpers.cc ModbusTCPSession.cc \
ModbusClient.cc ModbusServer.cc ModbusServerSlot.cc \
ModbusRTUSlave.cc ModbusRTUSlaveSlot.cc ModbusRTUMaster.cc \
ModbusTCPCore.cc ModbusTCPServer.cc ModbusTCPServerSlot.cc ModbusTCPMaster.cc TCPCheck.cc
ModbusTCPCore.cc ModbusTCPServer.cc ModbusTCPServerSlot.cc ModbusTCPMaster.cc
libModbus_la_CXXFLAGS = -I$(top_builddir)/include/Communications/modbus $(SIGC_CFLAGS) $(COMCPP_CFLAGS)
#libModbus_la_LIBADD = $(top_builddir)/src/Communications/TCP/libTCP.la $(SIGC_LIBS) $(COMCPP_LIBS)
libModbus_la_LIBADD = $(SIGC_LIBS) $(COMCPP_LIBS)
############################################################################
# This file is part of the UniSet library #
############################################################################
noinst_LTLIBRARIES = libTCP.la
libTCP_la_SOURCES = UTCPStream.cc TCPCheck.cc
libTCP_la_CXXFLAGS = $(SIGC_CFLAGS) $(COMCPP_CFLAGS)
libTCP_la_LIBADD = $(SIGC_LIBS) $(COMCPP_LIBS)
......@@ -2,7 +2,7 @@
#include <sstream>
#include "UniSetTypes.h"
#include "PassiveTimer.h"
#include "modbus/TCPCheck.h"
#include "TCPCheck.h"
// -----------------------------------------------------------------------------
using namespace std;
// -----------------------------------------------------------------------------
......
......@@ -4,7 +4,7 @@
#include <errno.h>
#include <cstring>
#include <cc++/socket.h>
#include "modbus/UTCPStream.h"
#include "UTCPStream.h"
#include "PassiveTimer.h"
#include "UniSetTypes.h"
// -------------------------------------------------------------------------
......
#include <string.h>
#include <errno.h>
#include <iostream>
#include <sstream>
#include "Exceptions.h"
#include "LogReader.h"
// -------------------------------------------------------------------------
using namespace std;
using namespace UniSetTypes;
// -------------------------------------------------------------------------
LogReader::LogReader():
tcp(0),
iaddr("")
{
}
// -------------------------------------------------------------------------
LogReader::~LogReader()
{
if( isConnection() )
disconnect();
}
// -------------------------------------------------------------------------
void LogReader::connect( const std::string& addr, ost::tpport_t _port, timeout_t msec )
{
ost::InetAddress ia(addr.c_str());
connect(ia,_port,msec);
}
// -------------------------------------------------------------------------
void LogReader::connect( ost::InetAddress addr, ost::tpport_t _port, timeout_t msec )
{
if( tcp )
{
disconnect();
delete tcp;
tcp = 0;
}
// if( !tcp )
// {
ostringstream s;
s << addr;
iaddr = s.str();
port = _port;
if( rlog.is_info() )
rlog.info() << "(LogReader): connect to " << iaddr << ":" << port << endl;
ost::Thread::setException(ost::Thread::throwException);
try
{
tcp = new UTCPStream();
tcp->create(iaddr,port,true,500);
tcp->setTimeout(msec);
}
catch( std::exception& e )
{
if( rlog.debugging(Debug::CRIT) )
{
ostringstream s;
s << "(LogReader): connection " << s.str() << " error: " << e.what();
rlog.crit() << s.str() << std::endl;
}
}
catch( ... )
{
if( rlog.debugging(Debug::CRIT) )
{
ostringstream s;
s << "(LogReader): connection " << s.str() << " error: catch ...";
rlog.crit() << s.str() << std::endl;
}
}
// }
}
// -------------------------------------------------------------------------
void LogReader::disconnect()
{
if( rlog.is_info() )
rlog.info() << iaddr << "(LogReader): disconnect." << endl;
if( !tcp )
return;
tcp->disconnect();
delete tcp;
tcp = 0;
}
// -------------------------------------------------------------------------
bool LogReader::isConnection()
{
return tcp && tcp->isConnected();
}
// -------------------------------------------------------------------------
void LogReader::readlogs( const std::string& _addr, ost::tpport_t _port, timeout_t msec )
{
if( !isConnection() )
connect(_addr,_port,msec);
if( !isConnection() )
throw TimeOut();
char buf[1000];
while( tcp->isPending(ost::Socket::pendingInput,msec) )
{
int n = tcp->peek( buf,sizeof(buf) );
if( n > 0 )
{
tcp->read(buf,n);
cout << buf;
}
#if 0
if( tcp->gcount() > 0 )
break;
int n = tcp->peek( buf,sizeof(buf) );
tcp->read(buf,sizeof(buf));
for( int i=0; i<n; i++ )
cout << buf[i];
#endif
}
}
// -------------------------------------------------------------------------
#include <sstream>
#include "LogServer.h"
#include "UniSetTypes.h"
#include "Exceptions.h"
#include "LogSession.h"
// -------------------------------------------------------------------------
using namespace std;
using namespace UniSetTypes;
// -------------------------------------------------------------------------
LogServer::~LogServer()
{
cancelled = true;
{
uniset_rwmutex_wrlock l(mutSList);
for( SessionList::iterator i=slist.begin(); i!=slist.end(); ++i )
delete (*i);
}
if( thr )
{
thr->stop();
delete thr;
}
}
// -------------------------------------------------------------------------
LogServer::LogServer():
timeout(600000),
session_timeout(3600000),
cancelled(false),
thr(0),
tcp(0)
{
}
// -------------------------------------------------------------------------
void LogServer::run( const std::string& addr, ost::tpport_t port, timeout_t msec, bool thread )
{
try
{
ost::InetAddress iaddr(addr.c_str());
tcp = new ost::TCPSocket(iaddr,port);
}
catch( ost::Socket *socket )
{
ost::tpport_t port;
int errnum = socket->getErrorNumber();
ost::InetAddress saddr = (ost::InetAddress)socket->getPeer(&port);
ostringstream err;
err << "socket error " << saddr.getHostname() << ":" << port << " = " << errnum;
if( errnum == ost::Socket::errBindingFailed )
err << "bind failed; port busy" << endl;
else
err << "client socket failed" << endl;
throw SystemError( err.str() );
}
if( !thread )
work();
else
{
thr = new ThreadCreator<LogServer>(this, &LogServer::work);
thr->start();
}
}
// -------------------------------------------------------------------------
void LogServer::work()
{
cancelled = false;
while( !cancelled )
{
try
{
while( tcp->isPendingConnection(timeout) )
{
LogSession* s = new LogSession(*tcp, session_timeout);
{
uniset_rwmutex_wrlock l(mutSList);
slist.push_back(s);
}
s->connectFinalSession( sigc::mem_fun(this, &LogServer::sessionFinished) );
s->detach();
}
}
catch( ost::Socket *socket )
{
ost::tpport_t port;
int errnum = socket->getErrorNumber();
ost::InetAddress saddr = (ost::InetAddress)socket->getPeer(&port);
cerr << "socket error " << saddr.getHostname() << ":" << port << " = " << errnum << endl;
if( errnum == ost::Socket::errBindingFailed )
{
cerr << "bind failed; port busy" << endl;
// ::exit(-1);
}
else
cerr << "client socket failed" << endl;
}
cout << "timeout after 30 seconds inactivity, exiting" << endl;
}
{
uniset_rwmutex_wrlock l(mutSList);
for( SessionList::iterator i=slist.begin(); i!=slist.end(); ++i )
(*i)->disconnect();
}
}
// -------------------------------------------------------------------------
void LogServer::sessionFinished( LogSession* s )
{
uniset_rwmutex_wrlock l(mutSList);
for( SessionList::iterator i=slist.begin(); i!=slist.end(); ++i )
{
if( (*i) == s )
{
slist.erase(i);
return;
}
}
}
// -------------------------------------------------------------------------
#include <iostream>
#include <string>
#include <fcntl.h>
#include <errno.h>
#include <cstring>
#include <cc++/socket.h>
#include "LogSession.h"
#include "UniSetTypes.h"
// -------------------------------------------------------------------------
using namespace std;
using namespace UniSetTypes;
// -------------------------------------------------------------------------
LogSession::~LogSession()
{
cancelled = true;
if( isRunning() )
{
disconnect();
ost::Thread::join();
}
}
// -------------------------------------------------------------------------
LogSession::LogSession( ost::TCPSocket &server, timeout_t msec ):
TCPSession(server),
peername(""),
caddr(""),
timeout(msec),
cancelled(false)
{
}
// -------------------------------------------------------------------------
void LogSession::run()
{
if( cancelled )
return;
{
ost::tpport_t p;
ost::InetAddress iaddr = getIPV4Peer(&p);
// resolve..
caddr = string( iaddr.getHostname() );
ostringstream s;
s << iaddr << ":" << p;
peername = s.str();
}
if( slog.debugging(Debug::INFO) )
slog[Debug::INFO] << peername << "(run): run thread of sessions.." << endl;
cancelled = false;
while( !cancelled && isPending(Socket::pendingOutput, timeout) )
{
// char rbuf[100];
// int ret = readData(&rbuf,sizeof(rbuf));
*tcp() << "test log... test log" << endl;
sleep(8000);
}
if( slog.debugging(Debug::INFO) )
slog[Debug::INFO] << peername << "(run): stop thread of sessions..disconnect.." << endl;
disconnect();
if( slog.debugging(Debug::INFO) )
slog[Debug::INFO] << peername << "(run): thread stopping..." << endl;
}
// -------------------------------------------------------------------------
void LogSession::final()
{
slFin(this);
delete this;
}
// -------------------------------------------------------------------------
void LogSession::connectFinalSession( FinalSlot sl )
{
slFin = sl;
}
// ---------------------------------------------------------------------
\ No newline at end of file
############################################################################
# This file is part of the UniSet library #
############################################################################
noinst_LTLIBRARIES = libLogServer.la
libLogServer_la_CPPFLAGS = $(SIGC_CFLAGS) $(COMCPP_CFLAGS)
#libLogServer_la_LIBADD = $(top_builddir)/src/Communications/TCP/libTCP.la $(SIGC_LIBS) $(COMCPP_LIBS)
libLogServer_la_LIBADD = $(SIGC_LIBS) $(COMCPP_LIBS)
libLogServer_la_SOURCES = LogServer.cc LogSession.cc LogReader.cc
......@@ -2,5 +2,5 @@
# This file is part of the UniSet library #
############################################################################
SUBDIRS=ObjectRepository Processes Interfaces Timers Services Various Communications
SUBDIRS=ObjectRepository Processes Interfaces Timers Services Various Communications LogServer
......@@ -6,7 +6,7 @@ noinst_LTLIBRARIES = libVarious.la
libVarious_la_CPPFLAGS = $(SIGC_CFLAGS) $(COMCPP_CFLAGS)
libVarious_la_LIBADD = $(SIGC_LIBS) $(COMCPP_LIBS)
libVarious_la_SOURCES = DebugStream.cc Debug.cc UniXML.cc MessageType.cc Configuration.cc \
Restorer_XML.cc RunLock.cc Mutex.cc SViewer.cc SMonitor.cc LT_Object.cc WDTInterface.cc
Restorer_XML.cc RunLock.cc Mutex.cc SViewer.cc SMonitor.cc LT_Object.cc WDTInterface.cc
local-clean:
rm -rf *iSK.cc
......@@ -5,10 +5,11 @@
using namespace std;
using namespace UniSetTypes;
std::ostringstream ss;
void check_log_signal( const string& s )
{
// cout << "log signal: ||| " << s << "|||";
cout << "*ENDL*[" << s << flush;
ss << s;
}
int main( int argc, const char **argv )
......@@ -29,5 +30,8 @@ int main( int argc, const char **argv )
if( tlog.is_level1() )
tlog.level1() << ": is level1..." << endl;
cout << "********" << endl;
cout << ss.str();
return 0;
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment