Commit f35bfd43 authored by Pavel Vainerman's avatar Pavel Vainerman

Рефакторинг некоторых интерфейсов и названий функций.

Вместо run(..., bool thread) сделал две функции: run(..) async_run(...)
parent 809d62c7
......@@ -75,7 +75,7 @@ void MBTCPServer::setMaxSessions( size_t max )
// -------------------------------------------------------------------------
void MBTCPServer::execute()
{
sslot->run( vaddr, false );
sslot->run(vaddr);
}
// -------------------------------------------------------------------------
void MBTCPServer::sigterm( int signo )
......
......@@ -74,7 +74,7 @@ int main( int argc, char* argv[], char* envp[] )
auto log = make_shared<DebugStream>();
LogServer ls(log);
ls.run(addr, port, true);
ls.async_run(addr, port);
char buf[10000];
......
......@@ -80,7 +80,7 @@ int main( int argc, char* argv[], char* envp[] )
LogServer ls(zlog);
cout << "wrap: server " << addr << ":" << port << endl;
ls.run( addr, port, true );
ls.async_run( addr, port );
char buf[5000];
......
......@@ -182,7 +182,7 @@ int main( int argc, char** argv )
dlog3->addLevel(Debug::ANY);
dlog4->addLevel(Debug::ANY);
ls.run( addr, port, true );
ls.async_run( addr, port );
if( verb )
ls.setSessionLog(Debug::ANY);
......
......@@ -491,7 +491,7 @@ void <xsl:value-of select="$CLASSNAME"/>_SK::preSysCommand( const SystemMessage*
{
ostate = "StartUp: run log server...";
myinfo &lt;&lt; myname &lt;&lt; "(preSysCommand): run log server " &lt;&lt; logserv_host &lt;&lt; ":" &lt;&lt; logserv_port &lt;&lt; endl;
logserv-&gt;run(logserv_host, logserv_port, true);
logserv-&gt;async_run(logserv_host, logserv_port);
}
}
catch( std::exception&amp; ex )
......
......@@ -16,7 +16,7 @@
Name: libuniset2
Version: 2.6
Release: alt31
Release: alt32
Summary: UniSet - library for building distributed industrial control systems
License: LGPL
......@@ -508,6 +508,9 @@ rm -f %buildroot%_libdir/*.la
# history of current unpublished changes
%changelog
* Sat Jun 03 2017 Pavel Vainerman <pv@altlinux.ru> 2.6-alt32
- refactoring function names
* Sat Jun 03 2017 Pavel Vainerman <pv@altlinux.ru> 2.6-alt31
- (EventLoopServer): refactoring start process
......
......@@ -1261,7 +1261,7 @@ namespace uniset
if( !logserv_host.empty() && logserv_port != 0 && !logserv->isRunning() )
{
ioinfo << myname << "(init): run log server " << logserv_host << ":" << logserv_port << endl;
logserv->run(logserv_host, logserv_port, true);
logserv->async_run(logserv_host, logserv_port);
}
PassiveTimer ptAct(activateTimeout);
......
......@@ -2808,7 +2808,7 @@ namespace uniset
if( !logserv_host.empty() && logserv_port != 0 && !logserv->isRunning() )
{
mbinfo << myname << "(init): run log server " << logserv_host << ":" << logserv_port << endl;
logserv->run(logserv_host, logserv_port, true);
logserv->async_run(logserv_host, logserv_port);
}
if( devices.empty() )
......
......@@ -97,7 +97,7 @@ void MBTCPTestServer::setLog( std::shared_ptr<DebugStream> dlog )
void MBTCPTestServer::execute()
{
if( sslot )
sslot->run( vaddr, true );
sslot->async_run( vaddr );
}
// -------------------------------------------------------------------------
void MBTCPTestServer::sigterm( int signo )
......
......@@ -654,7 +654,7 @@ namespace uniset
{
try
{
if( tcpserver->run( vaddr, true ) )
if( tcpserver->async_run(vaddr) )
break;
if( tcpBreakIfFailRun )
......@@ -901,7 +901,7 @@ namespace uniset
if( !logserv_host.empty() && logserv_port != 0 && !logserv->isRunning() )
{
mbinfo << myname << "(init): run log server " << logserv_host << ":" << logserv_port << endl;
logserv->run(logserv_host, logserv_port, true);
logserv->async_run(logserv_host, logserv_port);
}
waitSMReady();
......
......@@ -236,7 +236,7 @@ namespace uniset
if( !logserv_host.empty() && logserv_port != 0 && !logserv->isRunning() )
{
sminfo << myname << "(init): run log server " << logserv_host << ":" << logserv_port << endl;
logserv->run(logserv_host, logserv_port, true);
logserv->async_run(logserv_host, logserv_port);
}
PassiveTimer ptAct(activateTimeout);
......
......@@ -596,7 +596,7 @@ void UNetExchange::sysCommand( const uniset::SystemMessage* sm )
try
{
unetinfo << myname << "(init): run log server " << logserv_host << ":" << logserv_port << endl;
logserv->run(logserv_host, logserv_port, true);
logserv->async_run(logserv_host, logserv_port);
}
catch( std::exception& ex )
{
......
......@@ -229,7 +229,7 @@ void UNetReceiver::start()
if( !activated )
{
activated = true;
if( !loop.evrun(this, true, evrunTimeout) )
if( !loop.async_evrun(this, evrunTimeout) )
{
unetcrit << myname << "(start): evrun FAILED! (timeout=" << evrunTimeout << " msec)" << endl;
std::terminate();
......@@ -251,7 +251,6 @@ void UNetReceiver::evprepare( const ev::loop_ref& eloop ) noexcept
if( upStrategy == useUpdateEventLoop )
{
evUpdate.set(eloop);
evUpdate.start();
evUpdate.start( 0, ((float)updatepause / 1000.) );
}
......
......@@ -8,7 +8,7 @@
ВСЕ ВАШИ ИЗМЕНЕНИЯ БУДУТ ПОТЕРЯНЫ.
*/
// --------------------------------------------------------------------------
// generate timestamp: 2017-05-29+03:00
// generate timestamp: 2017-06-03+03:00
// -----------------------------------------------------------------------------
#ifndef UObject_SK_H_
#define UObject_SK_H_
......
......@@ -11,7 +11,7 @@
ВСЕ ВАШИ ИЗМЕНЕНИЯ БУДУТ ПОТЕРЯНЫ.
*/
// --------------------------------------------------------------------------
// generate timestamp: 2017-05-29+03:00
// generate timestamp: 2017-06-03+03:00
// -----------------------------------------------------------------------------
#include <memory>
#include <iomanip>
......@@ -429,7 +429,7 @@ void UObject_SK::preSysCommand( const SystemMessage* _sm )
{
ostate = "StartUp: run log server...";
myinfo << myname << "(preSysCommand): run log server " << logserv_host << ":" << logserv_port << endl;
logserv->run(logserv_host, logserv_port, true);
logserv->async_run(logserv_host, logserv_port);
}
}
catch( std::exception& ex )
......
......@@ -225,7 +225,7 @@ void TestProc::mbThread()
mylog9 << myname << "(mbThread): run tcpserver.." << endl;
ModbusRTU::ModbusAddr mba(mbADDR);
auto vaddr = mbslave->addr2vaddr(mba);
mbslave->run( vaddr, false ); // true - создаёт поток
mbslave->run( vaddr ); // async_run()?
mylog9 << myname << "ModbusSlave stopped.." << endl;
}
......
......@@ -60,14 +60,17 @@ namespace uniset
bool evIsActive() const noexcept;
/*! \return TRUE - если всё удалось. return актуален только для случая когда thread = true
* \param thread - создать отдельный (асинхронный) поток для event loop.
* Если thread=false - функция не вернёт управление и будет ждать завершения работы ( см. evstop())
* \param waitPrepareTimeout_msec - сколько ждать активации, либо функция вернёт false.
* Даже если thread = false, но wather не сможет быть "активирован" функция вернёт управление
* с return false.
/*! Синхронный запуск. Функция возвращает управление (false), только если запуск не удался,
* либо был остановлен вызовом evstop();
* \param prepareTimeout_msec - сколько ждать активации, либо функция вернёт false.
*/
bool evrun( EvWatcher* w, bool thread = true, size_t waitPrepareTimeout_msec = 60000);
bool evrun( EvWatcher* w, size_t prepareTimeout_msec = 60000);
/*! Асинхронный запуск (запуск в отдельном потоке)
* \return TRUE - если всё удалось.
* \param prepareTimeout_msec - сколько ждать активации, либо функция вернёт false.
*/
bool async_evrun( EvWatcher* w, size_t prepareTimeout_msec = 60000 );
/*! \return TRUE - если это был последний EvWatcher и loop остановлен */
bool evstop( EvWatcher* w );
......@@ -117,7 +120,6 @@ namespace uniset
std::mutex looprunOK_mutex;
std::condition_variable looprunOK_event;
std::atomic_bool looprunOK_state;
ev::timer evruntimer;
};
// -------------------------------------------------------------------------
......
......@@ -33,8 +33,18 @@ namespace uniset
virtual void evprepare() {}
// Управление потоком событий
bool evrun( bool thread = true, size_t waitRunningTimeout_msec = 60000 );
void evstop();
/*! асинхронный запуск (создаётся отдельный поток)
* \return true - если всё хорошо
*/
bool async_evrun( size_t waitRunningTimeout_msec = 60000 );
/*! синхронный запуск
* функция вернёт управление, только в случае неудачного запуска
*/
bool evrun();
void evstop(); /*!< остановить раннее запущенный поток (event loop) */
ev::dynamic_loop loop;
......@@ -46,14 +56,14 @@ namespace uniset
void onLoopOK( ev::timer& t, int revents ) noexcept;
std::atomic_bool cancelled = { false };
std::atomic_bool isrunning = { false };
std::atomic_bool isactive = { false };
ev::async evterm;
std::shared_ptr<std::thread> thr;
std::mutex looprunOK_mutex;
std::condition_variable looprunOK_event;
std::atomic_bool looprunOK_state;
std::atomic_bool isrunning;
ev::timer evruntimer;
};
// -------------------------------------------------------------------------
......
......@@ -112,7 +112,9 @@ namespace uniset
sessMaxCount = num;
}
void run( const std::string& addr, Poco::UInt16 port, bool thread = true );
bool async_run( const std::string& addr, Poco::UInt16 port );
bool run( const std::string& addr, Poco::UInt16 port );
void terminate();
inline bool isRunning() const noexcept
......
......@@ -34,11 +34,19 @@ namespace uniset
ModbusTCPServer( const std::string& addr, int port = 502 );
virtual ~ModbusTCPServer();
/*! Запуск сервера
* \param thread - создавать ли отдельный поток
/*! Запуск сервера. Функция не возвращет управление.
* Но может быть прервана вызовом terminate()
* \return FALSE - если не удалось запустить
*/
bool run( const std::unordered_set<ModbusRTU::ModbusAddr>& vmbaddr );
/*! Асинхронный запуск сервера (создаётся отдельный поток)
* \return TRUE - если поток успешно удалось запустить
*/
bool run( const std::unordered_set<ModbusRTU::ModbusAddr>& vmbaddr, bool thread = false );
bool async_run( const std::unordered_set<ModbusRTU::ModbusAddr>& vmbaddr );
/*! остановить поток выполнения (см. run или async_run) */
virtual void terminate();
virtual bool isActive() const override;
......@@ -67,8 +75,6 @@ namespace uniset
return ignoreAddr;
}
virtual void terminate();
// Сбор статистики по соединениям...
struct SessionInfo
{
......
......@@ -81,10 +81,16 @@ namespace uniset
sessTimeout = msec;
}
// -------------------------------------------------------------------------
bool ModbusTCPServer::run( const std::unordered_set<ModbusAddr>& _vmbaddr, bool thread )
bool ModbusTCPServer::run( const std::unordered_set<ModbusAddr>& _vmbaddr )
{
vmbaddr = &_vmbaddr;
return evrun(thread);
return evrun();
}
// -------------------------------------------------------------------------
bool ModbusTCPServer::async_run( const std::unordered_set<ModbusAddr>& _vmbaddr )
{
vmbaddr = &_vmbaddr;
return async_evrun();
}
// -------------------------------------------------------------------------
bool ModbusTCPServer::isActive() const
......
......@@ -111,7 +111,7 @@ namespace uniset
mylog.info() << myname << "(LogServer): finished." << endl;
}
// -------------------------------------------------------------------------
void LogServer::run(const std::string& _addr, Poco::UInt16 _port, bool thread )
bool LogServer::run( const std::string& _addr, Poco::UInt16 _port )
{
addr = _addr;
port = _port;
......@@ -122,7 +122,20 @@ namespace uniset
myname = s.str();
}
loop.evrun(this, thread);
return loop.evrun(this);
}
// -------------------------------------------------------------------------
bool LogServer::async_run( const std::string& _addr, Poco::UInt16 _port )
{
addr = _addr;
port = _port;
{
ostringstream s;
s << _addr << ":" << _port;
myname = s.str();
}
return loop.async_evrun(this);
}
// -------------------------------------------------------------------------
void LogServer::terminate()
......
......@@ -46,10 +46,10 @@ namespace uniset
std::unique_lock<std::mutex> lock2(looprunOK_mutex);
looprunOK_event.wait_for(lock2, std::chrono::milliseconds(waitTimeout_msec), [&]()
{
return (looprunOK_state == true);
return (isrunning == true);
} );
return looprunOK_state;
return isrunning;
}
// ---------------------------------------------------------------------------
bool CommonEventLoop::activateWatcher( EvWatcher* w, size_t waitTimeout_msec )
......@@ -88,12 +88,12 @@ namespace uniset
if( EV_ERROR & revents )
return;
looprunOK_state = true;
isrunning = true;
looprunOK_event.notify_all();
t.stop();
}
// ---------------------------------------------------------------------------
bool CommonEventLoop::evrun( EvWatcher* w, bool thread, size_t waitTimeout_msec )
bool CommonEventLoop::evrun( EvWatcher* w, size_t waitTimeout_msec )
{
if( w == nullptr )
return false;
......@@ -113,9 +113,8 @@ namespace uniset
bool defaultLoopOK = runDefaultLoop(waitTimeout_msec);
bool ret = defaultLoopOK && activateWatcher(w, waitTimeout_msec);
// если ждать завершения не надо (thread=true)
// или activateWatcher не удалось.. выходим..
if( thread || !ret )
if( !ret )
return ret;
// ожидаем завершения основного потока..
......@@ -130,6 +129,27 @@ namespace uniset
return true;
}
// ---------------------------------------------------------------------------
bool CommonEventLoop::async_evrun( EvWatcher* w, size_t waitTimeout_msec )
{
if( w == nullptr )
return false;
{
std::lock_guard<std::mutex> lck(wlist_mutex);
if( std::find(wlist.begin(), wlist.end(), w) != wlist.end() )
{
cerr << "(CommonEventLoop::evrun): " << w->wname() << " ALREADY ADDED.." << endl;
return false;
}
wlist.push_back(w);
}
bool defaultLoopOK = runDefaultLoop(waitTimeout_msec);
return defaultLoopOK && activateWatcher(w, waitTimeout_msec);
}
// ---------------------------------------------------------------------------
bool CommonEventLoop::evIsActive() const noexcept
{
return isrunning;
......@@ -250,8 +270,6 @@ namespace uniset
// т.к. нам надо просто зафиксировать, что loop начал работать
evruntimer.start(0);
isrunning = true;
while( !cancelled )
{
try
......
......@@ -22,18 +22,18 @@ namespace uniset
evstop();
}
// ---------------------------------------------------------------------------
bool EventLoopServer::evrun( bool thread, size_t timeout_msec )
bool EventLoopServer::evrun()
{
if( isrunning )
defaultLoop(); // <-- здесь бесконечный цикл..
return false;
}
// ---------------------------------------------------------------------------
bool EventLoopServer::async_evrun( size_t timeout_msec )
{
if( isactive )
return true;
isrunning = true;
if( !thread )
{
defaultLoop(); // <-- здесь бесконечный цикл..
return false;
}
isactive = true;
if( !thr )
thr = make_shared<std::thread>( [&] { defaultLoop(); } );
......@@ -111,10 +111,10 @@ namespace uniset
cerr << "(EventLoopServer::defaultLoop): " << ex.what() << endl;
}
looprunOK_state = false;
isrunning = false;
looprunOK_event.notify_all();
isrunning = false;
isactive = false;
}
// -------------------------------------------------------------------------
bool EventLoopServer::waitDefaultLoopRunning( size_t waitTimeout_msec )
......@@ -122,10 +122,10 @@ namespace uniset
std::unique_lock<std::mutex> lock(looprunOK_mutex);
looprunOK_event.wait_for(lock, std::chrono::milliseconds(waitTimeout_msec), [&]()
{
return (looprunOK_state == true);
return (isrunning == true);
} );
return looprunOK_state;
return isrunning;
}
// -------------------------------------------------------------------------
void EventLoopServer::onLoopOK( ev::timer& t, int revents ) noexcept
......@@ -133,7 +133,7 @@ namespace uniset
if( EV_ERROR & revents )
return;
looprunOK_state = true;
isrunning = true;
looprunOK_event.notify_all();
t.stop();
}
......
......@@ -116,7 +116,7 @@ void DBServer::sysCommand( const uniset::SystemMessage* sm )
if( !logserv_host.empty() && logserv_port != 0 && !logserv->isRunning() )
{
dbinfo << myname << "(init): run log server " << logserv_host << ":" << logserv_port << endl;
logserv->run(logserv_host, logserv_port, true);
logserv->async_run(logserv_host, logserv_port);
}
}
}
......
......@@ -146,7 +146,7 @@ TEST_CASE("LogServer", "[LogServer]" )
LogServer ls(la);
//ls.setSessionLog(Debug::ANY);
ls.run( ip, port, true );
ls.async_run( ip, port);
for( int i = 0; i < 3 && !ls.isRunning(); i++ )
msleep(600);
......@@ -213,7 +213,7 @@ TEST_CASE("MaxSessions", "[LogServer]" )
ls.setCmdTimeout(100);
//ls.setSessionLog(Debug::ANY);
ls.setMaxSessionCount(1);
ls.run( ip, port, true );
ls.async_run( ip, port );
for( int i = 0; i < 4 && !ls.isRunning(); i++ )
msleep(600);
......@@ -249,9 +249,9 @@ TEST_CASE("MaxSessions", "[LogServer]" )
{
std::lock_guard<std::mutex> l(r2_mutex);
/*
// Ищем часть сообщения об ошибке: '(LOG SERVER): Exceeded the limit on the number of sessions = 1'
size_t pos = msg2.str().find("Exceeded the limit");
REQUIRE( pos != std::string::npos );
// Ищем часть сообщения об ошибке: '(LOG SERVER): Exceeded the limit on the number of sessions = 1'
size_t pos = msg2.str().find("Exceeded the limit");
REQUIRE( pos != std::string::npos );
*/
// ничего не получили..
REQUIRE( msg2.str() == "" );
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment