Commit a191994e authored by Pavel Vainerman's avatar Pavel Vainerman Committed by Pavel Vainerman

(websocketgate): added LogServer, minor optimization

parent f8b67ef8
...@@ -63,9 +63,16 @@ UWebSocketGate::UWebSocketGate( uniset::ObjectId id ...@@ -63,9 +63,16 @@ UWebSocketGate::UWebSocketGate( uniset::ObjectId id
if( mylog->verbose() == 0 ) if( mylog->verbose() == 0 )
mylog->verbose(1); mylog->verbose(1);
loga = make_shared<LogAgregator>(myname + "-loga");
loga->add(mylog);
loga->add(ulog());
logserv = make_shared<LogServer>(loga);
logserv->init( prefix + "logserver", cnode );
UniXML::iterator it(cnode); UniXML::iterator it(cnode);
int maxCacheSize = conf->getArgPInt("--" + prefix + "max-ui-cache-size", it.getProp("msgUIChacheSize"), 5000); int maxCacheSize = conf->getArgPInt("--" + prefix + "max-ui-cache-size", it.getProp("msgUICacheSize"), 5000);
ui->setCacheMaxSize(maxCacheSize); ui->setCacheMaxSize(maxCacheSize);
shm = make_shared<SMInterface>(shmID, ui, getId(), ic); shm = make_shared<SMInterface>(shmID, ui, getId(), ic);
...@@ -75,7 +82,6 @@ UWebSocketGate::UWebSocketGate( uniset::ObjectId id ...@@ -75,7 +82,6 @@ UWebSocketGate::UWebSocketGate( uniset::ObjectId id
wscmd = make_shared<ev::async>(); wscmd = make_shared<ev::async>();
wsactivate.set<UWebSocketGate, &UWebSocketGate::onActivate>(this); wsactivate.set<UWebSocketGate, &UWebSocketGate::onActivate>(this);
wscmd->set<UWebSocketGate, &UWebSocketGate::onCommand>(this); wscmd->set<UWebSocketGate, &UWebSocketGate::onCommand>(this);
sigTERM.set<UWebSocketGate, &UWebSocketGate::onTerminate>(this);
iocheck.set<UWebSocketGate, &UWebSocketGate::checkMessages>(this); iocheck.set<UWebSocketGate, &UWebSocketGate::checkMessages>(this);
maxMessagesProcessing = conf->getArgPInt("--" + prefix + "max-messages-processing", conf->getField("maxMessagesProcessing"), maxMessagesProcessing); maxMessagesProcessing = conf->getArgPInt("--" + prefix + "max-messages-processing", conf->getField("maxMessagesProcessing"), maxMessagesProcessing);
...@@ -89,6 +95,12 @@ UWebSocketGate::UWebSocketGate( uniset::ObjectId id ...@@ -89,6 +95,12 @@ UWebSocketGate::UWebSocketGate( uniset::ObjectId id
if( sz > 0 ) if( sz > 0 )
setMaxSizeOfMessageQueue(sz); setMaxSizeOfMessageQueue(sz);
if( findArgParam("--" + prefix + "run-logserver", conf->getArgc(), conf->getArgv()) != -1 )
{
logserv_host = conf->getArg2Param("--" + prefix + "logserver-host", it.getProp("logserverHost"), "localhost");
logserv_port = conf->getArgPInt("--" + prefix + "logserver-port", it.getProp("logserverPort"), getId());
}
#ifndef DISABLE_REST_API #ifndef DISABLE_REST_API
wsHeartbeatTime_sec = (float)conf->getArgPInt("--" + prefix + "heartbeat-time", it.getProp("wsHeartbeatTimeTime"), int(wsHeartbeatTime_sec * 1000)) / 1000.0; wsHeartbeatTime_sec = (float)conf->getArgPInt("--" + prefix + "heartbeat-time", it.getProp("wsHeartbeatTimeTime"), int(wsHeartbeatTime_sec * 1000)) / 1000.0;
wsSendTime_sec = (float)conf->getArgPInt("--" + prefix + "send-time", it.getProp("wsSendTime"), int(wsSendTime_sec * 1000.0)) / 1000.0; wsSendTime_sec = (float)conf->getArgPInt("--" + prefix + "send-time", it.getProp("wsSendTime"), int(wsSendTime_sec * 1000.0)) / 1000.0;
...@@ -178,36 +190,28 @@ UWebSocketGate::~UWebSocketGate() ...@@ -178,36 +190,28 @@ UWebSocketGate::~UWebSocketGate()
runlock->unlock(); runlock->unlock();
} }
//-------------------------------------------------------------------------------------------- //--------------------------------------------------------------------------------------------
void UWebSocketGate::onTerminate( ev::async& watcher, int revents ) void UWebSocketGate::terminate()
{ {
if( EV_ERROR & revents ) myinfo << myname << "(terminate): terminate..." << endl;
{
mycrit << myname << "(onTerminate): invalid event" << endl;
return;
}
myinfo << myname << "(onTerminate): terminate..." << endl;
watcher.stop();
//evsig.loop.break_loop();
try try
{ {
httpserv->stop(); httpserv->stop();
httpserv = nullptr;
} }
catch( std::exception& ex ) catch( std::exception& ex )
{ {
myinfo << myname << "(onTerminate): " << ex.what() << endl; myinfo << myname << "(terminate): http server: " << ex.what() << endl;
} }
try try
{ {
evstop(); if( evIsActive() )
evstop();
} }
catch( std::exception& ex ) catch( std::exception& ex )
{ {
myinfo << myname << "(onTerminate): " << ex.what() << endl; myinfo << myname << "(terminate): evstop: " << ex.what() << endl;
} }
} }
//-------------------------------------------------------------------------------------------- //--------------------------------------------------------------------------------------------
...@@ -246,6 +250,7 @@ uniset::SimpleInfo* UWebSocketGate::getInfo( const char* userparam ) ...@@ -246,6 +250,7 @@ uniset::SimpleInfo* UWebSocketGate::getInfo( const char* userparam )
inf << i->info << endl; inf << i->info << endl;
// inf << vmon.pretty_str() << endl; // inf << vmon.pretty_str() << endl;
inf << endl; inf << endl;
inf << "LogServer: " << logserv_host << ":" << logserv_port << endl;
{ {
uniset_rwmutex_wrlock lock(wsocksMutex); uniset_rwmutex_wrlock lock(wsocksMutex);
...@@ -352,6 +357,10 @@ void UWebSocketGate::help_print() ...@@ -352,6 +357,10 @@ void UWebSocketGate::help_print()
cout << "--prefix-log-verbosity N - Уровень подробностей [1...5]" << endl; cout << "--prefix-log-verbosity N - Уровень подробностей [1...5]" << endl;
cout << " Пример параметров для запуска с подробными логами: " << endl; cout << " Пример параметров для запуска с подробными логами: " << endl;
cout << " --ws-log-add-levels any --ws-log-verbosity 5" << endl; cout << " --ws-log-add-levels any --ws-log-verbosity 5" << endl;
cout << " LogServer: " << endl;
cout << "--prefix-run-logserver - run logserver. Default: localhost:id" << endl;
cout << "--prefix-logserver-host ip - listen ip. Default: localhost" << endl;
cout << "--prefix-logserver-port num - listen port. Default: ID" << endl;
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
void UWebSocketGate::run( bool async ) void UWebSocketGate::run( bool async )
...@@ -380,9 +389,6 @@ void UWebSocketGate::evprepare() ...@@ -380,9 +389,6 @@ void UWebSocketGate::evprepare()
wscmd->set(loop); wscmd->set(loop);
wscmd->start(); wscmd->start();
sigTERM.set(loop);
sigTERM.start();
iocheck.set(loop); iocheck.set(loop);
iocheck.start(0, check_sec); iocheck.start(0, check_sec);
} }
...@@ -614,7 +620,11 @@ void UWebSocketGate::onWebSocketSession( Poco::Net::HTTPServerRequest& req, Poco ...@@ -614,7 +620,11 @@ void UWebSocketGate::onWebSocketSession( Poco::Net::HTTPServerRequest& req, Poco
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
bool UWebSocketGate::deactivateObject() bool UWebSocketGate::deactivateObject()
{ {
sigTERM.send(); terminate();
if( logserv && logserv->isRunning() )
logserv->terminate();
return UniSetObject::deactivateObject(); return UniSetObject::deactivateObject();
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
...@@ -628,6 +638,65 @@ bool UWebSocketGate::activateObject() ...@@ -628,6 +638,65 @@ bool UWebSocketGate::activateObject()
return ret; return ret;
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
void UWebSocketGate::sysCommand( const uniset::SystemMessage* _sm )
{
UniSetObject::sysCommand(_sm);
switch( _sm->command )
{
case SystemMessage::WatchDog:
case SystemMessage::StartUp:
{
try
{
if( !logserv_host.empty() && logserv_port != 0 && !logserv->isRunning() )
{
myinfo << myname << "(sysCommand): run log server " << logserv_host << ":" << logserv_port << endl;
logserv->async_run(logserv_host, logserv_port);
}
}
catch( std::exception& ex )
{
mywarn << myname << "(sysCommand): CAN`t run log server err: " << ex.what() << endl;
}
catch( ... )
{
mywarn << myname << "(sysCommand): CAN`t run log server err: catch ..." << endl;
}
break;
}
case SystemMessage::FoldUp:
case SystemMessage::Finish:
break;
case SystemMessage::LogRotate:
{
if( logserv && !logserv_host.empty() && logserv_port != 0 )
{
try
{
mylogany << myname << "(sysCommand): try restart logserver.." << endl;
logserv->check(true);
}
catch( std::exception& ex )
{
mywarn << myname << "(sysCommand): CAN`t restart log server err: " << ex.what() << endl;
}
catch( ... )
{
mywarn << myname << "(sysCommand): CAN`t restart log server err: catch ..." << endl;
}
}
}
break;
default:
break;
}
}
// -----------------------------------------------------------------------------
std::shared_ptr<UWebSocketGate::UWebSocket> UWebSocketGate::newWebSocket( Poco::Net::HTTPServerRequest* req, std::shared_ptr<UWebSocketGate::UWebSocket> UWebSocketGate::newWebSocket( Poco::Net::HTTPServerRequest* req,
Poco::Net::HTTPServerResponse* resp, const Poco::URI::QueryParameters& qp ) Poco::Net::HTTPServerResponse* resp, const Poco::URI::QueryParameters& qp )
{ {
......
...@@ -212,6 +212,10 @@ namespace uniset ...@@ -212,6 +212,10 @@ namespace uniset
{ {
return mylog; return mylog;
} }
inline std::shared_ptr<uniset::LogAgregator> logAgregator() noexcept
{
return loga;
}
#ifndef DISABLE_REST_API #ifndef DISABLE_REST_API
virtual void handleRequest( Poco::Net::HTTPServerRequest& req, Poco::Net::HTTPServerResponse& resp ) override; virtual void handleRequest( Poco::Net::HTTPServerRequest& req, Poco::Net::HTTPServerResponse& resp ) override;
...@@ -224,6 +228,7 @@ namespace uniset ...@@ -224,6 +228,7 @@ namespace uniset
virtual bool activateObject() override; virtual bool activateObject() override;
virtual bool deactivateObject() override; virtual bool deactivateObject() override;
virtual void sysCommand( const uniset::SystemMessage* sm ) override;
void run( bool async ); void run( bool async );
virtual void evfinish() override; virtual void evfinish() override;
virtual void evprepare() override; virtual void evprepare() override;
...@@ -242,8 +247,7 @@ namespace uniset ...@@ -242,8 +247,7 @@ namespace uniset
Poco::JSON::Object::Ptr respError( Poco::Net::HTTPServerResponse& resp, Poco::Net::HTTPResponse::HTTPStatus s, const std::string& message ); Poco::JSON::Object::Ptr respError( Poco::Net::HTTPServerResponse& resp, Poco::Net::HTTPResponse::HTTPStatus s, const std::string& message );
void makeResponseAccessHeader( Poco::Net::HTTPServerResponse& resp ); void makeResponseAccessHeader( Poco::Net::HTTPServerResponse& resp );
#endif #endif
ev::async sigTERM; void terminate();
void onTerminate( ev::async& watcher, int revents );
ev::async wsactivate; // активация WebSocket-ов ev::async wsactivate; // активация WebSocket-ов
std::shared_ptr<ev::async> wscmd; std::shared_ptr<ev::async> wscmd;
...@@ -256,9 +260,14 @@ namespace uniset ...@@ -256,9 +260,14 @@ namespace uniset
int maxMessagesProcessing = { 100 }; int maxMessagesProcessing = { 100 };
std::shared_ptr<DebugStream> mylog; std::shared_ptr<DebugStream> mylog;
std::shared_ptr<uniset::LogAgregator> loga;
std::shared_ptr<SMInterface> shm; std::shared_ptr<SMInterface> shm;
std::unique_ptr<uniset::RunLock> runlock; std::unique_ptr<uniset::RunLock> runlock;
std::shared_ptr<uniset::LogServer> logserv;
std::string logserv_host = {""};
int logserv_port = {0};
#ifndef DISABLE_REST_API #ifndef DISABLE_REST_API
std::shared_ptr<Poco::Net::HTTPServer> httpserv; std::shared_ptr<Poco::Net::HTTPServer> httpserv;
std::string httpHost = { "" }; std::string httpHost = { "" };
......
...@@ -2,4 +2,4 @@ ...@@ -2,4 +2,4 @@
ulimit -Sc 1000000 ulimit -Sc 1000000
uniset2-start.sh -f ./uniset2-wsgate --confile test.xml --ws-name UWebSocketGate1 --ws-log-add-levels any --ws-log-verbosity 5 $* uniset2-start.sh -f ./uniset2-wsgate --confile test.xml --ws-name UWebSocketGate1 --ws-log-add-levels any --ws-log-verbosity 5 --ws-run-logserver $*
...@@ -30,8 +30,8 @@ int main(int argc, const char* argv[] ) ...@@ -30,8 +30,8 @@ int main(int argc, const char* argv[] )
int returnCode = session.applyCommandLine( argc, argv ); int returnCode = session.applyCommandLine( argc, argv );
// if( returnCode != 0 ) // Indicates a command line error // if( returnCode != 0 ) // Indicates a command line error
// return returnCode; // return returnCode;
auto conf = uniset_init(argc, argv); auto conf = uniset_init(argc, argv);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment