Commit 717febcb authored by Pavel Vainerman's avatar Pavel Vainerman

(UniSetActivator): Переделал механизм завершения процессов (чтобы не было "SIGSEGV").

Попытка сделать завершение на основе стандартных потоков c++11 (ModbusMaster): сделал возможность задать timeout для теста, правки тестов для более устойчивой работы.
parent e12b4c58
......@@ -54,7 +54,7 @@ int main(int argc, char* argv[] )
act->broadcast( sm.transport_msg() );
act->run(true);
int tout = 6000;
int tout = conf->getArgPInt("--timeout",8000);
PassiveTimer pt(tout);
while( !pt.checkTime() && !act->exist() )
msleep(100);
......
/home/pv/Projects.com/uniset-2.0/extensions/SharedMemory/uniset2-smemory
\ No newline at end of file
......@@ -66,10 +66,7 @@ class UniSetActivator:
virtual void run(bool thread);
virtual void stop();
virtual void oaDestroy(int signo=0);
void waitDestroy();
// inline void oakill(int signo){ raise(signo); }
virtual void uaDestroy(int signo=0);
virtual UniSetTypes::ObjectType getType() override { return UniSetTypes::ObjectType("UniSetActivator"); }
......@@ -114,8 +111,11 @@ class UniSetActivator:
private:
friend void terminate_thread();
friend void finished_thread();
static void terminated(int signo);
static void finishterm(int signo);
// static void finishterm(int signo);
static void normalexit();
static void normalterminate();
static void set_signals(bool ask);
......
......@@ -27,6 +27,12 @@
#include <signal.h>
#include <sstream>
#include <condition_variable>
#include <thread>
#include <mutex>
#include <atomic>
#include <chrono>
#include "Exceptions.h"
#include "ORepHelpers.h"
#include "UInterface.h"
......@@ -40,27 +46,157 @@ using namespace UniSetTypes;
using namespace std;
// ------------------------------------------------------------------------------------------
/*
Завершение работы организовано следующим образом.
Имеется глобальный указатель gActivator (т.к. активатор в системе должен быть только один).
Он заказывает на себя все сигналы связанные с завершением работы.
В качестве обработчика сигналов регистрируется UniSetActivator::terminated( int signo ).
В этом обработчике происходит вызов UniSetActivator::oaDestroy(int signo) для фактического
завершения работы и заказывается сигнал SIG_ALRM на время TERMINATE_TIMEOUT,
c обработчиком UniSetActivator::finishterm в котором происходит
"надежное" прибивание текущего процесса (kill(getpid(),SIGKILL)). Это сделано на тот случай, если
в oaDestroy произойдет зависание.
Завершение работы организовано следующим образом:
-------------------------------------------------
Т.к. UniSetActivator в системе только один (singleton), он заказывает на себя
все сигналы связанные с завершением работы, а также устанавливает обработчики на штатный выход из прораммы (atexit)
и обработчик на выход по исключению (set_terminate).
В качестве обработчика сигналов регистрируется функция activator_terminate( int signo ).
При вызове run() (в функции work) создаётся "глобальный" поток g_term_thread, который
засыпает до события на condition переменной g_termevent.
Когда срабатывает обработчик сигнала, он выставляет флаг завершения (g_term=true) и вызывает
g_termevent.notify_one(); чтобы процесс завершения начал свою работу. (см. activator_terminate).
После этого начинается процесс завершения.
- orb shutdown
- uaDestroy()
- завершение CORBA-потока (если был создан) (функция work).
- orb destroy
Для защиты от "зависания" во время завершения, создаётся ещё один временный поток g_fini_thread,
который в течение TERMINATE_TIMEOUT секунд ждёт события 'g_finievent'. После чего происходит
"принудительное" отключение, обработчики сигналов восстанавливаются на умолчательные и процесс
завершается..
В случае, если для UniSetActivator был создан отдельный поток ( run(true) ) при завершении работы программы
срабатывает обработчик UniSetActivator::normalexit или UniSetActivator::normalterminate, которые
делают тоже самое. Будят процесс завершения.. и дожидаются завершения его работы.
*/
// ------------------------------------------------------------------------------------------
/*! замок для блокирования совместного доступа к функции обрабтки сигналов */
static UniSetTypes::uniset_rwmutex signalMutex("Activator::signalMutex");
static std::shared_ptr<UniSetActivator> g_act;
static std::atomic_bool g_finished = ATOMIC_VAR_INIT(0);
static std::atomic_bool g_term = ATOMIC_VAR_INIT(0);
static std::atomic_bool g_done = ATOMIC_VAR_INIT(0);
static std::atomic_bool g_work_stopped = ATOMIC_VAR_INIT(0);
static std::atomic_int g_signo = ATOMIC_VAR_INIT(0);
static std::mutex g_workmutex;
static std::mutex g_termmutex;
static std::condition_variable g_termevent;
static std::mutex g_finimutex;
static std::condition_variable g_finievent;
static std::mutex g_donemutex;
static std::condition_variable g_doneevent;
static std::shared_ptr<std::thread> g_term_thread;
static std::shared_ptr<std::thread> g_fini_thread;
static const int TERMINATE_TIMEOUT = 3; // время отведенное на завершение процесса [сек]
// ------------------------------------------------------------------------------------------
static void activator_terminate( int signo )
{
ulogsys << "****** TERMINATE SIGNAL=" << signo << endl << flush;
// прежде чем вызывать notify_one следует освободить mutex...(вроде как)
{
std::unique_lock<std::mutex> locker(g_termmutex);
if( g_term )
return;
g_term = true;
}
ulogsys << "****** TERMINATE NOTIFY...(signo=" << signo << ")" << endl << flush;
g_term = true;
g_signo = signo;
g_termevent.notify_one();
}
// ------------------------------------------------------------------------------------------
void finished_thread()
{
ulogsys << "****** FINISHED THREAD START **** "<< endl << flush;
std::unique_lock<std::mutex> lk(g_finimutex);
if( g_finished )
return;
g_finished = true;
std::unique_lock<std::mutex> lkw(g_workmutex);
g_finievent.wait_for(lkw, std::chrono::milliseconds(TERMINATE_TIMEOUT*1000), [](){ return (g_work_stopped == true); } );
ulogsys << "****** FINISHED THREAD END.." << endl << flush;
}
// ------------------------------------------------------------------------------------------
static std::shared_ptr<UniSetActivator> gActivator;
static int SIGNO = 0;
static int MYPID = 0;
static const int TERMINATE_TIMEOUT = 10; // время отведенное на завершение процесса [сек]
ost::AtomicCounter procterm = 0;
ost::AtomicCounter doneterm = 0;
void terminate_thread()
{
ulogsys << "****** TERMINATE THREAD: STARTED *******" << endl << flush;
{
std::unique_lock<std::mutex> locker(g_termmutex);
g_term = false;
while( !g_term )
g_termevent.wait(locker);
// g_termmutex надо отпустить, т.к. он будет проверяться в ~UniSetActvator
}
{
std::unique_lock<std::mutex> locker(g_finimutex);
if( g_finished )
return;
}
if( g_act )
{
{
std::unique_lock<std::mutex> lk(g_finimutex);
if( g_finished )
{
ulogsys << "...FINISHED TREAD ALREADY STARTED..." << endl << flush;
return;
}
g_fini_thread = make_shared<std::thread>(finished_thread);
}
ulogsys << "TERMINATE THREAD: call terminated.." << endl << flush;
g_act->terminated(g_signo);
if( g_fini_thread && g_fini_thread->joinable() )
g_fini_thread->join();
ulogsys << "TERMINATE THREAD: FINISHED OK.." << endl << flush;
if( g_act && g_act->orb )
{
try
{
ulogsys << "TERMINATE THREAD: destroy.." << endl;
g_act->orb->destroy();
ulogsys << "TERMINATE THREAD: destroy ok.." << endl;
}
catch(omniORB::fatalException& fe)
{
ulogsys << "(uaDestroy): : поймали omniORB::fatalException:" << endl;
ulogsys << "(uaDestroy): file: " << fe.file() << endl;
ulogsys << "(uaDestroy): line: " << fe.line() << endl;
ulogsys << "(uaDestroy): mesg: " << fe.errmsg() << endl;
}
catch(std::exception& ex)
{
ulogsys << "(destructor): " << ex.what() << endl;
}
}
g_act.reset();
UniSetActivator::set_signals(false);
}
ulogsys << "TERMINATE THREAD: ..bye.." << endl;
{
std::unique_lock<std::mutex> lk(g_donemutex);
g_done = true;
}
g_doneevent.notify_all();
}
// ---------------------------------------------------------------------------
UniSetActivatorPtr UniSetActivator::inst;
// ---------------------------------------------------------------------------
......@@ -69,7 +205,7 @@ UniSetActivatorPtr UniSetActivator::Instance( const UniSetTypes::ObjectId id )
if( inst == nullptr )
{
inst = std::shared_ptr<UniSetActivator>( new UniSetActivator(id) );
gActivator = inst;
g_act = inst;
}
return inst;
......@@ -123,151 +259,78 @@ void UniSetActivator::init()
UniSetActivator::~UniSetActivator()
{
if( !procterm )
{
ulogsys << myname << "(destructor): ..." << endl << flush;
if( !omDestroy )
oaDestroy();
procterm = 1;
doneterm = 1;
set_signals(false);
gActivator.reset();
}
#if 1
if( orbthr )
if( !g_term && orbthr )
{
try
{
if( orbthr->isRunning() )
{
orbthr->stop();
orbthr->join(0);
}
}
catch(omniORB::fatalException& fe)
{
ucrit << myname << "(oaDestroy): : поймали omniORB::fatalException:" << endl;
ucrit << myname << "(oaDestroy): file: " << fe.file() << endl;
ucrit << myname << "(oaDestroy): line: " << fe.line() << endl;
ucrit << myname << "(oaDestroy): mesg: " << fe.errmsg() << endl;
}
catch(std::exception& ex)
{
ucrit << myname << "(destructor): " << ex.what() << endl;
}
try
{
ulogsys << myname << "(destructor): orb destroy... " << endl;
if( !CORBA::is_nil(orb) )
orb->destroy();
ulogsys << myname << "(destructor): orb destroy ok."<< endl;
std::unique_lock<std::mutex> locker(g_termmutex);
g_term = true;
}
catch(omniORB::fatalException& fe)
{
ucrit << myname << "(oaDestroy): : поймали omniORB::fatalException:" << endl;
ucrit << myname << "(oaDestroy): file: " << fe.file() << endl;
ucrit << myname << "(oaDestroy): line: " << fe.line() << endl;
ucrit << myname << "(oaDestroy): mesg: " << fe.errmsg() << endl;
}
catch(std::exception& ex)
{
ucrit << myname << "(destructor): " << ex.what() << endl;
}
}
g_signo = 0;
g_termevent.notify_one();
ulogsys << myname << "(run): wait done.." << endl;
#if 1
if( g_term_thread->joinable() )
g_term_thread->join();
#else
std::unique_lock<std::mutex> locker(g_donemutex);
while( !g_done )
g_doneevent.wait(locker);
#endif
ulogsys << myname << "(run): wait done OK." << endl;
}
}
// ------------------------------------------------------------------------------------------
void UniSetActivator::oaDestroy(int signo)
void UniSetActivator::uaDestroy(int signo)
{
// waittermMutex.lock();
if( !omDestroy )
{
omDestroy = true;
ulogsys << myname << "(oaDestroy): begin..."<< endl;
ulogsys << myname << "(oaDestroy): terminate... " << endl;
term(signo);
ulogsys << myname << "(oaDestroy): terminate ok. " << endl;
try
{
stop();
}
catch(...){}
ulogsys << myname << "(oaDestroy): pman deactivate... " << endl;
pman->deactivate(false,true);
ulogsys << myname << "(oaDestroy): pman deactivate ok. " << endl;
if( orbthrIsFinished==0 && orbthr )
{
try
{
ulogsys << myname << "(stop):: shutdown orb... "<<endl;
if( orbthr->isRunning() )
orb->shutdown(false);
ulogsys << myname << "(stop): shutdown ok."<< endl;
orbthr->join(0);
}
catch(omniORB::fatalException& fe)
{
ucrit << myname << "(oaDestroy): : поймали omniORB::fatalException:" << endl;
ucrit << myname << "(oaDestroy): file: " << fe.file() << endl;
ucrit << myname << "(oaDestroy): line: " << fe.line() << endl;
ucrit << myname << "(oaDestroy): mesg: " << fe.errmsg() << endl;
}
catch(...)
{
ulogsys << myname << "(oaDestroy): orb shutdown: catch... " << endl;
}
ulogsys << myname << "(stop): shutdown ok."<< endl;
#if 1
try
{
ulogsys << myname << "(oaDestroy): orb thread stop... " << endl;
if( omDestroy || !g_act )
return;
orbthr->stop();
if( orbthr->isRunning() )
orbthr->join(0);
omDestroy = true;
ulogsys << myname << "(uaDestroy): begin..."<< endl;
ulogsys << myname << "(oaDestroy): orb thread stop ok. " << endl;
orbthr.reset();
}
catch(omniORB::fatalException& fe)
{
ucrit << myname << "(oaDestroy): : поймали omniORB::fatalException:" << endl;
ucrit << myname << "(oaDestroy): file: " << fe.file() << endl;
ucrit << myname << "(oaDestroy): line: " << fe.line() << endl;
ucrit << myname << "(oaDestroy): mesg: " << fe.errmsg() << endl;
}
#endif
}
else
{
try
{
ulogsys << myname << "(oaDestroy):: shutdown orb... "<<endl;
orb->shutdown(true);
ulogsys << myname << "(stop): shutdown ok."<< endl;
ulogsys << myname << "(uaDestroy): terminate... " << endl;
term(signo);
ulogsys << myname << "(uaDestroy): terminate ok. " << endl;
try
{
stop();
}
catch(omniORB::fatalException& fe)
{
ucrit << myname << "(uaDestroy): : поймали omniORB::fatalException:" << endl;
ucrit << myname << "(uaDestroy): file: " << fe.file() << endl;
ucrit << myname << "(uaDestroy): line: " << fe.line() << endl;
ucrit << myname << "(uaDestroy): mesg: " << fe.errmsg() << endl;
}
catch(...)
{
ulogsys << myname << "(uaDestroy): orb shutdown: catch... " << endl;
}
}
catch(omniORB::fatalException& fe)
{
ucrit << myname << "(oaDestroy): : поймали omniORB::fatalException:" << endl;
ucrit << myname << "(oaDestroy): file: " << fe.file() << endl;
ucrit << myname << "(oaDestroy): line: " << fe.line() << endl;
ucrit << myname << "(oaDestroy): mesg: " << fe.errmsg() << endl;
}
}
ulogsys << myname << "(uaDestroy): pman deactivate... " << endl;
pman->deactivate(false,true);
ulogsys << myname << "(uaDestroy): pman deactivate ok. " << endl;
try
{
ulogsys << myname << "(uaDestroy): shutdown orb... "<<endl;
orb->shutdown(true);
ulogsys << myname << "(uaDestroy): shutdown ok."<< endl;
}
catch(omniORB::fatalException& fe)
{
ucrit << myname << "(uaDestroy): : поймали omniORB::fatalException:" << endl;
ucrit << myname << "(uaDestroy): file: " << fe.file() << endl;
ucrit << myname << "(uaDestroy): line: " << fe.line() << endl;
ucrit << myname << "(uaDestroy): mesg: " << fe.errmsg() << endl;
}
catch(...)
{
ulogsys << myname << "(uaDestroy): orb shutdown: catch... " << endl;
}
ulogsys << myname << "(stop): shutdown ok."<< endl;
}
// ------------------------------------------------------------------------------------------
......@@ -301,15 +364,12 @@ void UniSetActivator::run( bool thread )
{
uinfo << myname << "(run): запускаемся с созданием отдельного потока... "<< endl;
orbthr = make_shared< OmniThreadCreator<UniSetActivator> >( get_aptr(), &UniSetActivator::work);
orbthrIsFinished = 0;
orbthr->start();
orbthr->yield();
}
else
{
uinfo << myname << "(run): запускаемся без создания отдельного потока... "<< endl;
work();
orbthrIsFinished = 1;
}
}
// ------------------------------------------------------------------------------------------
......@@ -365,6 +425,7 @@ void UniSetActivator::work()
else
thpid = getpid();
g_term_thread = make_shared<std::thread>(terminate_thread );
omniORB::setMainThread();
orb->run();
}
......@@ -384,33 +445,22 @@ void UniSetActivator::work()
ucrit << myname << "(work): mesg: " << fe.errmsg() << endl;
}
ulogsys << myname << "(work): orb thread stopped!" << endl;
ulogsys << myname << "(work): orb thread stopped!" << endl << flush;
// orbthr = nullptr;
if( orbthr )
{
try
{
ulogsys << myname << "(work): orb shutdown... " << endl;
orb->shutdown(false);
}
catch(omniORB::fatalException& fe)
{
ucrit << myname << "(work): : поймали omniORB::fatalException:" << endl;
ucrit << myname << "(work): file: " << fe.file() << endl;
ucrit << myname << "(work): line: " << fe.line() << endl;
ucrit << myname << "(work): mesg: " << fe.errmsg() << endl;
}
catch( std::exception& ex )
{
ucrit << myname << "(work): mesg: " << ex.what() << endl;
}
ulogsys << myname << "(work): orb shutdown ok."<< endl;
std::unique_lock<std::mutex> lkw(g_workmutex);
g_work_stopped = true;
}
g_finievent.notify_one();
orbthrIsFinished = 1;
orbthr = nullptr;
if( orbthr )
{
// HACK: почему-то мы должны тут застрять, пока не завершится основной процесс
// (terminate_thread) иначе возникает double free corruption.. :(
// возможно что-то некорректно с уничтожением orbthr..
pause();
}
}
// ------------------------------------------------------------------------------------------
void UniSetActivator::sysCommand( const UniSetTypes::SystemMessage *sm )
......@@ -456,7 +506,7 @@ void UniSetActivator::set_signals(bool ask)
// act.sa_flags |= SA_RESTART;
// act.sa_flags |= SA_RESETHAND;
if(ask)
act.sa_handler = terminated;
act.sa_handler = activator_terminate; // terminated;
else
act.sa_handler = SIG_DFL;
......@@ -469,22 +519,6 @@ void UniSetActivator::set_signals(bool ask)
}
// ------------------------------------------------------------------------------------------
void UniSetActivator::finishterm( int signo )
{
if( !doneterm )
{
ulogsys << ( gActivator ? gActivator->getName() : "" )
<< "(finishterm): прерываем процесс завершения...!" << endl<< flush;
if( gActivator )
gActivator->set_signals(false);
sigset(SIGALRM, SIG_DFL);
doneterm = 1;
kill(getpid(),SIGKILL);
}
}
// ------------------------------------------------------------------------------------------
UniSetActivator::TerminateEvent_Signal UniSetActivator::signal_terminate_event()
{
return s_term;
......@@ -492,64 +526,89 @@ UniSetActivator::TerminateEvent_Signal UniSetActivator::signal_terminate_event()
// ------------------------------------------------------------------------------------------
void UniSetActivator::terminated( int signo )
{
if( !signo || doneterm || !gActivator || procterm )
if( !g_act )
return;
{ // lock
// на случай прихода нескольких сигналов
uniset_rwmutex_wrlock l(signalMutex); //, TERMINATE_TIMEOUT*1000);
if( !procterm )
{
procterm = 1;
SIGNO = signo;
MYPID = getpid();
ulogsys << ( gActivator ? gActivator->getName() : "" ) << "(terminated): catch SIGNO="<< signo << "("
<< strsignal(signo) <<")"<< endl << flush
<< ( gActivator ? gActivator->getName() : "" ) << "(terminated): устанавливаем timer завершения на "
<< TERMINATE_TIMEOUT << " сек " << endl << flush;
sighold(SIGALRM);
sigset(SIGALRM, UniSetActivator::finishterm);
alarm(TERMINATE_TIMEOUT);
sigrelse(SIGALRM);
if( gActivator )
{
ulogsys << ( gActivator ? gActivator->getName() : "" ) << "(terminated): call oaDestroy.." << endl;
if( gActivator ) gActivator->oaDestroy(SIGNO); // gActivator->term(SIGNO);
}
doneterm = 1;
ulogsys << ( gActivator ? gActivator->getName() : "" ) << "(terminated): завершаемся..."<< endl<< flush;
if( gActivator )
{
UniSetActivator::set_signals(false);
gActivator.reset();
}
sigset(SIGALRM, SIG_DFL);
kill(getpid(), SIGNO );
}
ulogsys << "(terminated): start.." << endl;
try
{
g_act->uaDestroy(signo);
ulogsys << "(terminated): uaDestroy ok.." << endl;
}
catch(omniORB::fatalException& fe)
{
ulogsys << "(terminated): : поймали omniORB::fatalException:" << endl;
ulogsys << "(terminated): file: " << fe.file() << endl;
ulogsys << "(terminated): line: " << fe.line() << endl;
ulogsys << "(terminated): mesg: " << fe.errmsg() << endl;
}
catch(std::exception& ex)
{
ulogsys << "(terminated): " << ex.what() << endl;
}
ulogsys << "terminated ok.." << endl;
}
// ------------------------------------------------------------------------------------------
void UniSetActivator::normalexit()
{
if( gActivator )
if( g_act )
{
ulogsys << gActivator->getName() << "(default exit): good bye."<< endl << flush;
gActivator->stop();
ulogsys << g_act->getName() << "(default exit): ..begin..."<< endl << flush;
if( g_term == false )
{
// прежде чем вызывать notify_one(), мы должны освободить mutex!
{
std::unique_lock<std::mutex> locker(g_termmutex);
g_term = true;
g_signo = 0;
}
ulogsys << "(default exit): notify terminate.." << endl << flush;
g_termevent.notify_one();
}
ulogsys << "(default exit): wait done.." << endl << flush;
#if 1
if( g_term_thread->joinable() )
g_term_thread->join();
#else
std::unique_lock<std::mutex> locker(g_donemutex);
while( !g_done )
g_doneevent.wait(locker);
#endif
ulogsys << "(default exit): wait done OK (good bye)" << endl << flush;
}
}
void UniSetActivator::normalterminate()
{
if( gActivator )
if( g_act )
{
ucrit << gActivator->getName() << "(default exception terminate): Unkown exception.. Good bye."<< endl<< flush;
ulogsys << g_act->getName() << "(default terminate): ..begin..."<< endl << flush;
if( g_term == false )
{
// прежде чем вызывать notify_one(), мы должны освободить mutex!
{
std::unique_lock<std::mutex> locker(g_termmutex);
g_term = true;
g_signo = 0;
}
ulogsys << "(default terminate): notify terminate.." << endl << flush;
g_termevent.notify_one();
}
ulogsys << "(default terminate): wait done.." << endl << flush;
#if 1
if( g_term_thread->joinable() )
g_term_thread->join();
#else
std::unique_lock<std::mutex> locker(g_donemutex);
while( !g_done )
g_doneevent.wait(locker);
#endif
ulogsys << "(default terminate): wait done OK (good bye)" << endl << flush;
}
}
// ------------------------------------------------------------------------------------------
......@@ -557,8 +616,11 @@ void UniSetActivator::term( int signo )
{
ulogsys << myname << "(term): TERM" << endl;
if( doneterm )
return;
{
std::unique_lock<std::mutex> locker(g_termmutex);
if( g_term )
return;
}
if( signo )
sig = true;
......@@ -579,9 +641,3 @@ void UniSetActivator::term( int signo )
ulogsys << myname << "(term): END TERM" << endl;
}
// ------------------------------------------------------------------------------------------
void UniSetActivator::waitDestroy()
{
while( !doneterm && gActivator )
msleep(50);
}
// ------------------------------------------------------------------------------------------
......@@ -53,13 +53,13 @@ TEST_CASE("CallbackTimer", "[CallbackTimer]" )
REQUIRE( tc.getNum3() == 0 );
msleep(210);
REQUIRE( tc.getNum1() >= 7 );
REQUIRE( tc.getNum1() >= 5 );
REQUIRE( tc.getNum2() >= 2 );
REQUIRE( tc.getNum3() == 1 );
tmr.remove(1);
msleep(60);
REQUIRE( tc.getNum1() >= 7 );
REQUIRE( tc.getNum1() >= 6 );
tmr.terminate();
REQUIRE( tc.getNum2() >= 2 );
......
......@@ -15,7 +15,7 @@
{
bash
Memcheck:Leak
match-leak-kinds: definite
# match-leak-kinds: definite
fun:malloc
fun:xmalloc
fun:make_if_command
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment