You need to sign in or sign up before continuing.
Commit 54b685e2 authored by Pavel Vainerman's avatar Pavel Vainerman

Очередная версия механизма завершения процессов. Добавлен контрольный поток

(делающий KILL если процессы не завершились), а также проведены некоторые стилистические правки.
parent b1ef89fe
......@@ -96,10 +96,10 @@ void <xsl:value-of select="$CLASSNAME"/>_SK::callback()
ucrit &lt;&lt; myname &lt;&lt; "(execute): СORBA::SystemException: "
&lt;&lt; ex.NP_minorString() &lt;&lt; endl;
}
catch(...)
{
ucrit &lt;&lt; myname &lt;&lt; "(execute): catch ..." &lt;&lt; endl;
}
catch( std::exception&amp;ex )
{
ucrit &lt;&lt; myname &lt;&lt; "(execute): catch " &lt;&lt; ex.what() &lt;&lt; endl;
}
if( !active )
return;
......@@ -168,11 +168,11 @@ void <xsl:value-of select="$CLASSNAME"/>_SK::preAskSensors( UniversalIO::UIOComm
{
ucrit &lt;&lt; myname &lt;&lt; "(preAskSensors): " &lt;&lt; ex &lt;&lt; endl;
}
catch(...)
{
ucrit &lt;&lt; myname &lt;&lt; "(preAskSensors): catch(...)" &lt;&lt; endl;
}
msleep(askPause);
catch( std::exception&amp;ex )
{
ucrit &lt;&lt; myname &lt;&lt; "(execute): catch " &lt;&lt; ex.what() &lt;&lt; endl;
}
msleep(askPause);
}
}
// -----------------------------------------------------------------------------
......
......@@ -101,9 +101,9 @@ void <xsl:value-of select="$CLASSNAME"/>_SK::callback()
ucrit &lt;&lt; myname &lt;&lt; "(execute): СORBA::SystemException: "
&lt;&lt; ex.NP_minorString() &lt;&lt; endl;
}
catch(...)
catch( std::exception&amp;ex )
{
ucrit &lt;&lt; myname &lt;&lt; "(execute): catch ..." &lt;&lt; endl;
ucrit &lt;&lt; myname &lt;&lt; "(execute): catch " &lt;&lt; ex.what() &lt;&lt; endl;
}
if( !active )
......
......@@ -95,10 +95,10 @@ void <xsl:value-of select="$CLASSNAME"/>_SK::callback()
ucrit &lt;&lt; myname &lt;&lt; "(execute): СORBA::SystemException: "
&lt;&lt; ex.NP_minorString() &lt;&lt; endl;
}
catch(...)
{
ucrit &lt;&lt; myname &lt;&lt; "(execute): catch ..." &lt;&lt; endl;
}
catch( std::exception&amp;ex )
{
ucrit &lt;&lt; myname &lt;&lt; "(execute): catch " &lt;&lt; ex.what() &lt;&lt; endl;
}
if( !active )
return;
......@@ -233,10 +233,11 @@ void <xsl:value-of select="$CLASSNAME"/>_SK::preAskSensors( UniversalIO::UIOComm
{
ucrit &lt;&lt; myname &lt;&lt; "(preAskSensors): " &lt;&lt; ex &lt;&lt; endl;
}
catch(...)
{
ucrit &lt;&lt; myname &lt;&lt; "(preAskSensors): catch(...)" &lt;&lt; endl;
}
catch( std::exception&amp;ex )
{
ucrit &lt;&lt; myname &lt;&lt; "(execute): catch " &lt;&lt; ex.what() &lt;&lt; endl;
}
msleep(askPause);
}
}
......
......@@ -99,10 +99,10 @@ void <xsl:value-of select="$CLASSNAME"/>_SK::callback()
ucrit &lt;&lt; myname &lt;&lt; "(execute): СORBA::SystemException: "
&lt;&lt; ex.NP_minorString() &lt;&lt; endl;
}
catch(...)
{
ucrit &lt;&lt; myname &lt;&lt; "(execute): catch ..." &lt;&lt; endl;
}
catch( std::exception&amp;ex )
{
ucrit &lt;&lt; myname &lt;&lt; "(execute): catch " &lt;&lt; ex.what() &lt;&lt; endl;
}
if( !active )
return;
......
......@@ -115,7 +115,10 @@
ui->setValue( si,(m_<xsl:value-of select="../../@name"/> ? 1:0), getId() );
return true;
}
catch(...){}
catch( std::exception&amp;ex )
{
ucrit &lt;&lt; myname &lt;&lt; "(execute): catch " &lt;&lt; ex.what() &lt;&lt; endl;
}
return false;
}
</xsl:when>
......@@ -395,10 +398,11 @@ void <xsl:value-of select="$CLASSNAME"/>_SK::waitSM( int wait_msec, ObjectId _te
&lt;&lt; wait_msec &lt;&lt; " мсек";
ucrit &lt;&lt; err.str() &lt;&lt; endl;
// terminate();
// abort();
raise(SIGTERM);
terminate();
abort();
// kill(SIGTERM,getpid()); // прерываем (перезапускаем) процесс...
throw SystemError(err.str());
// throw SystemError(err.str());
}
<xsl:if test="normalize-space($TESTMODE)!=''">
......@@ -412,10 +416,10 @@ void <xsl:value-of select="$CLASSNAME"/>_SK::waitSM( int wait_msec, ObjectId _te
&lt;&lt; wait_msec &lt;&lt; " мсек";
ucrit &lt;&lt; err.str() &lt;&lt; endl;
terminate();
abort();
// kill(SIGTERM,getpid()); // прерываем (перезапускаем) процесс...
throw SystemError(err.str());
// terminate();
// abort();
raise(SIGTERM);
// throw SystemError(err.str());
}
}
</xsl:if>
......@@ -809,7 +813,11 @@ bool <xsl:value-of select="$CLASSNAME"/>_SK::alarm( UniSetTypes::ObjectId _code,
ui->setValue( si,m_<xsl:value-of select="@name"/>,getId() );
return true;
}
catch(...){}
catch( std::exception&amp;ex )
{
ucrit &lt;&lt; myname &lt;&lt; "(execute): catch " &lt;&lt; ex.what() &lt;&lt; endl;
}
return false;
}
</xsl:for-each>
......
......@@ -84,6 +84,10 @@ int main( int argc,char* argv[] )
{
cerr &lt;&lt; "(main): " &lt;&lt; ex &lt;&lt; endl;
}
catch( std::exception&amp;ex )
{
ucrit &lt;&lt; "(main): catch " &lt;&lt; ex.what() &lt;&lt; endl;
}
catch(...)
{
cerr &lt;&lt; "(main): catch ..." &lt;&lt; endl;
......
......@@ -92,6 +92,10 @@ int main( int argc, const char** argv )
{
cerr &lt;&lt; "(main): " &lt;&lt; ex &lt;&lt; endl;
}
catch( std::exception&amp;ex )
{
cerr &lt;&lt; "(main): catch " &lt;&lt; ex.what() &lt;&lt; endl;
}
catch(...)
{
cerr &lt;&lt; "(main): catch ..." &lt;&lt; endl;
......
......@@ -13,10 +13,10 @@ GENERATED=TestGen_SK.h TestGen_SK.cc TestGen-main.cc
GENERATED2=TestGenAlone_SK.h TestGenAlone_SK.cc TestGenAlone-main.cc
GENUOBJ=UObject_SK.cc UObject_SK.h
TestGen_SK.cc: ../@PACKAGE@-codegen testgen.src.xml ../*.xsl
TestGen_SK.cc TestGen_SK.h: ../@PACKAGE@-codegen testgen.src.xml ../*.xsl
../@PACKAGE@-codegen -l $(top_builddir)/Utilities/codegen --local-include -n TestGen testgen.src.xml
TestGenAlone_SK.cc: ../@PACKAGE@-codegen testgen-alone.src.xml ../*.xsl
TestGenAlone_SK.cc TestGenAlone_SK.h: ../@PACKAGE@-codegen testgen-alone.src.xml ../*.xsl
../@PACKAGE@-codegen -l $(top_builddir)/Utilities/codegen --local-include --alone -n TestGenAlone testgen-alone.src.xml
$(GENUOBJ): ../@PACKAGE@-codegen uobject.src.xml ../*.xsl
......
......@@ -14,6 +14,7 @@
--mbtcp-force-disconnect 1 \
--mbtcp-polltime 3000 \
--mbtcp-force-out 1 \
--ulog-add-levels system \
$*
#--mbtcp-exchange-mode-id MB1_Mode_AS \
......
......@@ -95,7 +95,7 @@ void MBTCPTestServer::runThread()
void MBTCPTestServer::execute()
{
isrunning = true;
cerr << "******************** MBTCPTestServer(" << myname << ") running... " << endl;
// cerr << "******************** MBTCPTestServer(" << myname << ") running... " << endl;
// Работа...
while(1)
{
......
../../Utilities/scripts/uniset2-start.sh
\ No newline at end of file
../../Utilities/scripts/uniset2-start.sh
\ No newline at end of file
......@@ -389,7 +389,10 @@ void MBSlave::waitSMReady()
ostringstream err;
err << myname << "(waitSMReady): Не дождались готовности SharedMemory к работе в течение " << ready_timeout << " мсек";
dcrit << err.str() << endl;
throw SystemError(err.str());
// throw SystemError(err.str());
raise(SIGTERM);
terminate();
// abort();
}
}
// -----------------------------------------------------------------------------
......
......@@ -4,7 +4,7 @@ uniset2-start.sh -f ./uniset2-mbslave --confile test.xml --dlog-add-levels any \
--smemory-id SharedMemory \
--mbs-name MBSlave1 \
--mbs-type TCP --mbs-inet-addr 127.0.0.1 --mbs-inet-port 2048 --mbs-reg-from-id 1 --mbs-my-addr 0x01 \
--mbs-askcount-id SVU_AskCount_AS --mbs-respond-id RespondRTU_S --mbs-respond-invert 1 $*
--mbs-askcount-id SVU_AskCount_AS --mbs-respond-id RespondRTU_S --mbs-respond-invert 1 --ulog-add-levels system $*
# --mbs-force 1
#--mbs-reg-from-id 1 \
#--mbs-filter-field CAN2sender --mbs-filter-value SYSTSNode \
\ No newline at end of file
......@@ -5,6 +5,6 @@ export LD_LIBRARY_PATH="../../lib/.libs;../lib/.libs"
ulimit -Sc 10000000000
./uniset2-start.sh -f ./uniset2-smemory --smemory-id SharedMemory --pulsar-id DO_C --pulsar-iotype DO --pulsar-msec 100 \
--confile test.xml --datfile test.xml --db-logging 1 $* \
--confile test.xml --datfile test.xml --db-logging 1 --ulog-add-levels system $* \
#--ulog-add-levels info,crit,warn,level9,system \
#--dlog-add-levels info,crit,warn \
......@@ -51,7 +51,7 @@
*/
// -----------------------------------------------------------------------------
class UNetReceiver:
public std::enable_shared_from_this<UNetReceiver>
public std::enable_shared_from_this<UNetReceiver>
{
public:
UNetReceiver( const std::string& host, const ost::tpport_t port, const std::shared_ptr<SMInterface>& smi );
......
......@@ -8,7 +8,7 @@
ВСЕ ВАШИ ИЗМЕНЕНИЯ БУДУТ ПОТЕРЯНЫ.
*/
// --------------------------------------------------------------------------
// generate timestamp: 2015-01-17+03:00
// generate timestamp: 2015-01-19+03:00
// -----------------------------------------------------------------------------
#ifndef UObject_SK_H_
#define UObject_SK_H_
......
......@@ -11,7 +11,7 @@
ВСЕ ВАШИ ИЗМЕНЕНИЯ БУДУТ ПОТЕРЯНЫ.
*/
// --------------------------------------------------------------------------
// generate timestamp: 2015-01-17+03:00
// generate timestamp: 2015-01-19+03:00
// -----------------------------------------------------------------------------
#include "Configuration.h"
#include "Exceptions.h"
......@@ -342,10 +342,11 @@ void UObject_SK::waitSM( int wait_msec, ObjectId _testID )
<< wait_msec << " мсек";
ucrit << err.str() << endl;
// terminate();
// abort();
raise(SIGTERM);
terminate();
abort();
// kill(SIGTERM,getpid()); // прерываем (перезапускаем) процесс...
throw SystemError(err.str());
// throw SystemError(err.str());
}
......@@ -402,10 +403,10 @@ void UObject_SK::callback()
ucrit << myname << "(execute): СORBA::SystemException: "
<< ex.NP_minorString() << endl;
}
catch(...)
{
ucrit << myname << "(execute): catch ..." << endl;
}
catch( std::exception&ex )
{
ucrit << myname << "(execute): catch " << ex.what() << endl;
}
if( !active )
return;
......@@ -483,10 +484,11 @@ void UObject_SK::preAskSensors( UniversalIO::UIOCommand _cmd )
{
ucrit << myname << "(preAskSensors): " << ex << endl;
}
catch(...)
{
ucrit << myname << "(preAskSensors): catch(...)" << endl;
}
catch( std::exception&ex )
{
ucrit << myname << "(execute): catch " << ex.what() << endl;
}
msleep(askPause);
}
}
......
......@@ -10,4 +10,4 @@ cd ../../Utilities/Admin/
cd -
./uniset2-start.sh -f ./tests_with_sm $* -- --confile tests_with_sm.xml --e-startup-pause 10 --ulog-levels warn,crit --dlog-levels warn,crit
./uniset2-start.sh -f ./tests_with_sm $* -- --confile tests_with_sm.xml --e-startup-pause 10 --ulog-add-levels system,warn,crit --dlog-levels warn,crit
......@@ -104,6 +104,8 @@ class OmniThreadCreator:
inline void stop(){ exit(0); }
inline pid_t getTID(){ return id(); }
inline void join(){ omni_thread::join(NULL); }
protected:
void* run_undetached(void *x)
{
......
......@@ -27,6 +27,10 @@
#include <signal.h>
#include <sys/time.h>
#include <cc++/socket.h>
#include <condition_variable>
#include <thread>
#include <mutex>
#include <atomic>
#include "Mutex.h"
//----------------------------------------------------------------------------------------
/*! \class UniSetTimer
......@@ -111,53 +115,48 @@ private:
};
//----------------------------------------------------------------------------------------
class omni_mutex;
class omni_condition;
/*! \class ThrPassiveTimer
/*! \class PassiveCondTimer
* \brief Пассивный таймер с режимом засыпания (ожидания)
* \author Pavel Vainerman
* \par
* Позволяет заснуть на заданное время wait(timeout_t timeMS).
* Механизм работает на основе взаимных блокировок потоков (mutex и condition).
* Механизм работает на основе std::condition_variable
* \note Если таймер запущен в режиме ожидания (WaitUpTime), то он может быть выведен из него
* при помощи terminate().
* ТОЛЬКО при помощи terminate().
*/
class ThrPassiveTimer:
class PassiveCondTimer:
public PassiveTimer
{
public:
ThrPassiveTimer();
~ThrPassiveTimer();
PassiveCondTimer();
~PassiveCondTimer();
virtual bool wait(timeout_t timeMS); /*!< блокировать вызывающий поток на заданное время */
virtual void terminate(); /*!< прервать работу таймера */
protected:
bool isTerminated();
void setTerminated( bool set );
private:
bool terminated;
omni_mutex* tmutex;
omni_condition* tcondx;
UniSetTypes::uniset_mutex term_mutex;
std::atomic_bool terminated;
std::mutex m_working;
std::condition_variable cv_working;
};
//----------------------------------------------------------------------------------------
/*! \class PassiveSysTimer
/*! \class PassiveSigTimer
* \brief Пассивный таймер с режимом засыпания (ожидания)
* \author Pavel Vainerman
* \par
* Создан на основе сигнала (SIGALRM).
*/
class PassiveSysTimer:
class PassiveSigTimer:
public PassiveTimer
{
public:
PassiveSysTimer();
~PassiveSysTimer();
PassiveSigTimer();
~PassiveSigTimer();
virtual bool wait(timeout_t timeMS); //throw(UniSetTypes::NotSetSignal);
virtual void terminate();
......
......@@ -137,6 +137,8 @@ class ThreadCreator:
{
if( finm )
(finm->*finact)();
//delete this;
}
virtual void initial()
......
......@@ -110,12 +110,11 @@ class UniSetActivator:
static std::shared_ptr<UniSetActivator> inst;
private:
friend void terminate_thread();
friend void finished_thread();
friend std::shared_ptr<UniSetTypes::Configuration> UniSetTypes::uniset_init( int argc, const char* const* argv, const std::string& xmlfile );
static void terminated(int signo);
// static void finishterm(int signo);
static void normalexit();
static void normalterminate();
static void set_signals(bool ask);
......
......@@ -26,6 +26,10 @@
#ifndef UniSetObject_H_
#define UniSetObject_H_
//--------------------------------------------------------------------------
#include <condition_variable>
#include <thread>
#include <mutex>
#include <atomic>
#include <unistd.h>
#include <sys/time.h>
#include <queue>
......@@ -135,7 +139,7 @@ class UniSetObject:
* Например переход в безопасное состояние.
* \warning В обработчике сигналов \b ЗАПРЕЩЕНО вызывать функции подобные exit(..), abort()!!!!
*/
virtual void sigterm( int signo ){};
virtual void sigterm( int signo );
inline void terminate(){ deactivate(); }
......@@ -249,7 +253,12 @@ class UniSetObject:
// статистическая информация
unsigned long stMaxQueueMessages; /*<! Максимальное число сообщений хранившихся в очереди */
unsigned long stCountOfQueueFull; /*! количество переполнений очереди сообщений */
unsigned long stCountOfQueueFull; /*!< количество переполнений очереди сообщений */
std::atomic_bool a_working;
std::mutex m_working;
std::condition_variable cv_working;
// timeout_t workingTerminateTimeout; /*!< время ожидания завершения потока */
};
//---------------------------------------------------------------------------
#endif
......
......@@ -1929,7 +1929,16 @@ bool UInterface::waitReady( const ObjectId id, int msec, int pmsec, const Object
if( ready )
break;
}
catch(...){}
catch(CORBA::OBJECT_NOT_EXIST)
{
}
catch(CORBA::COMM_FAILURE& ex)
{
}
catch(...)
{
break;
}
msleep(pmsec);
}
......
......@@ -63,9 +63,12 @@ using namespace std;
- завершение CORBA-потока (если был создан) (функция work).
- orb destroy
Для защиты от "зависания" во время завершения, создаётся ещё один временный поток g_fini_thread,
который в течение TERMINATE_TIMEOUT секунд ждёт события 'g_finievent'. После чего происходит
"принудительное" отключение, обработчики сигналов восстанавливаются на умолчательные и процесс
Для защиты от "зависания" во время завершения, создаётся ещё один поток g_kill_thread, который ожидает завершения
в течение KILL_TIMEOUT и формирует сигнал SIGKILL если в течение этого времени не выставился флаг g_done = true;
Помимио этого в процессе завершения создаётся временный поток g_fini_thread,
который в течение TERMINATE_TIMEOUT секунд ждёт события 'g_finievent' (от процесса остановки котока для ORB).
После чего происходит "принудительное" отключение, обработчики сигналов восстанавливаются на умолчательные и процесс
завершается..
В случае, если для UniSetActivator был создан отдельный поток ( run(true) ) при завершении работы программы
......@@ -89,7 +92,9 @@ static std::mutex g_donemutex;
static std::condition_variable g_doneevent;
static std::shared_ptr<std::thread> g_term_thread;
static std::shared_ptr<std::thread> g_fini_thread;
static std::shared_ptr<std::thread> g_kill_thread;
static const int TERMINATE_TIMEOUT = 3; // время отведенное на завершение процесса [сек]
static const int KILL_TIMEOUT = 8;
// ------------------------------------------------------------------------------------------
static void activator_terminate( int signo )
{
......@@ -111,7 +116,7 @@ static void activator_terminate( int signo )
// ------------------------------------------------------------------------------------------
void finished_thread()
{
ulogsys << "****** FINISHED THREAD START **** "<< endl << flush;
ulogsys << "****** FINISHED START **** "<< endl << flush;
std::unique_lock<std::mutex> lk(g_finimutex);
if( g_finished )
......@@ -121,7 +126,25 @@ void finished_thread()
std::unique_lock<std::mutex> lkw(g_workmutex);
g_finievent.wait_for(lkw, std::chrono::milliseconds(TERMINATE_TIMEOUT*1000), [](){ return (g_work_stopped == true); } );
ulogsys << "****** FINISHED THREAD END.." << endl << flush;
ulogsys << "****** FINISHED END ****" << endl << flush;
}
// ------------------------------------------------------------------------------------------
void kill_thread()
{
std::unique_lock<std::mutex> lk(g_donemutex);
if( g_done )
return;
g_doneevent.wait_for(lk, std::chrono::milliseconds(KILL_TIMEOUT*1000), [](){ return (g_done == true); } );
if( !g_done )
{
ulogsys << "****** KILL TIMEOUT.. *******" << endl << flush;
raise(SIGKILL);
}
ulogsys << "KILL THREAD: ..bye.." << endl;
}
// ------------------------------------------------------------------------------------------
void terminate_thread()
......@@ -138,49 +161,75 @@ void terminate_thread()
// g_termmutex надо отпустить, т.к. он будет проверяться в ~UniSetActvator
}
ulogsys << "****** TERMINATE THREAD: event signo=" << g_signo << endl << flush;
{
std::unique_lock<std::mutex> locker(g_finimutex);
if( g_finished )
return;
}
{
std::unique_lock<std::mutex> lk(g_donemutex);
g_done = false;
g_kill_thread = make_shared<std::thread>(kill_thread);
}
if( g_act )
{
{
std::unique_lock<std::mutex> lk(g_finimutex);
if( g_finished )
{
ulogsys << "...FINISHED TREAD ALREADY STARTED..." << endl << flush;
ulogsys << "...FINISHED ALREADY STARTED..." << endl << flush;
return;
}
g_fini_thread = make_shared<std::thread>(finished_thread);
}
ulogsys << "TERMINATE THREAD: call terminated.." << endl << flush;
try
{
ulogsys << "TERMINATE THREAD: destroy.." << endl;
g_act->orb->shutdown(true);
ulogsys << "TERMINATE THREAD: destroy ok.." << endl;
}
catch(omniORB::fatalException& fe)
{
ulogsys << "(TERMINATE THREAD): : поймали omniORB::fatalException:" << endl;
ulogsys << "(TERMINATE THREAD): file: " << fe.file() << endl;
ulogsys << "(TERMINATE THREAD): line: " << fe.line() << endl;
ulogsys << "(TERMINATE THREAD): mesg: " << fe.errmsg() << endl;
}
catch(std::exception& ex)
{
ulogsys << "(TERMINATE THREAD): " << ex.what() << endl;
}
ulogsys << "(TERMINATE THREAD): call terminated.." << endl << flush;
g_act->terminated(g_signo);
if( g_fini_thread && g_fini_thread->joinable() )
g_fini_thread->join();
ulogsys << "TERMINATE THREAD: FINISHED OK.." << endl << flush;
ulogsys << "(TERMINATE THREAD): FINISHED OK.." << endl << flush;
if( g_act && g_act->orb )
{
try
{
ulogsys << "TERMINATE THREAD: destroy.." << endl;
ulogsys << "(TERMINATE THREAD): destroy.." << endl;
g_act->orb->destroy();
ulogsys << "TERMINATE THREAD: destroy ok.." << endl;
ulogsys << "(TERMINATE THREAD): destroy ok.." << endl;
}
catch(omniORB::fatalException& fe)
{
ulogsys << "(uaDestroy): : поймали omniORB::fatalException:" << endl;
ulogsys << "(uaDestroy): file: " << fe.file() << endl;
ulogsys << "(uaDestroy): line: " << fe.line() << endl;
ulogsys << "(uaDestroy): mesg: " << fe.errmsg() << endl;
ulogsys << "(TERMINATE THREAD): : поймали omniORB::fatalException:" << endl;
ulogsys << "(TERMINATE THREAD): file: " << fe.file() << endl;
ulogsys << "(TERMINATE THREAD): line: " << fe.line() << endl;
ulogsys << "(TERMINATE THREAD): mesg: " << fe.errmsg() << endl;
}
catch(std::exception& ex)
{
ulogsys << "(destructor): " << ex.what() << endl;
ulogsys << "(TERMINATE THREAD): " << ex.what() << endl;
}
}
......@@ -188,14 +237,14 @@ void terminate_thread()
UniSetActivator::set_signals(false);
}
ulogsys << "TERMINATE THREAD: ..bye.." << endl;
{
std::unique_lock<std::mutex> lk(g_donemutex);
g_done = true;
}
g_doneevent.notify_all();
g_doneevent.notify_all();
g_kill_thread->join();
ulogsys << "(TERMINATE THREAD): ..bye.." << endl;
}
// ---------------------------------------------------------------------------
UniSetActivatorPtr UniSetActivator::inst;
......@@ -204,7 +253,7 @@ UniSetActivatorPtr UniSetActivator::Instance( const UniSetTypes::ObjectId id )
{
if( inst == nullptr )
{
inst = std::shared_ptr<UniSetActivator>( new UniSetActivator(id) );
inst = shared_ptr<UniSetActivator>( new UniSetActivator(id) );
g_act = inst;
}
......@@ -251,8 +300,10 @@ void UniSetActivator::init()
if( CORBA::is_nil(poa) )
ucrit << myname << "(init): init poa failed!!!" << endl;
atexit( UniSetActivator::normalexit );
set_terminate( UniSetActivator::normalterminate ); // ловушка для неизвестных исключений
// Чтобы подключиться к функциям завершения как можно раньше (раньше создания объектов)
// этот код перенесён в Configuration::uniset_init (в надежде, что uniset_init всегда вызывается одной из первых).
// atexit( UniSetActivator::normalexit );
// set_terminate( UniSetActivator::normalterminate ); // ловушка для неизвестных исключений
}
// ------------------------------------------------------------------------------------------
......@@ -269,8 +320,8 @@ UniSetActivator::~UniSetActivator()
g_termevent.notify_one();
ulogsys << myname << "(run): wait done.." << endl;
#if 1
if( g_term_thread->joinable() )
g_term_thread->join();
// if( g_term_thread->joinable() )
// g_term_thread->join();
#else
std::unique_lock<std::mutex> locker(g_donemutex);
while( !g_done )
......@@ -360,6 +411,7 @@ void UniSetActivator::run( bool thread )
msleep(50);
set_signals(true);
if( thread )
{
uinfo << myname << "(run): запускаемся с созданием отдельного потока... "<< endl;
......@@ -446,7 +498,6 @@ void UniSetActivator::work()
}
ulogsys << myname << "(work): orb thread stopped!" << endl << flush;
// orbthr = nullptr;
{
std::unique_lock<std::mutex> lkw(g_workmutex);
......@@ -456,9 +507,8 @@ void UniSetActivator::work()
if( orbthr )
{
// HACK: почему-то мы должны тут застрять, пока не завершится основной процесс
// (terminate_thread) иначе возникает double free corruption.. :(
// возможно что-то некорректно с уничтожением orbthr..
// HACK: почему-то мы должны тут застрять,
// где-то что-то некорректно с уничтожением потока..
pause();
}
}
......@@ -550,7 +600,6 @@ void UniSetActivator::terminated( int signo )
ulogsys << "terminated ok.." << endl;
}
// ------------------------------------------------------------------------------------------
void UniSetActivator::normalexit()
{
if( g_act )
......@@ -570,12 +619,15 @@ void UniSetActivator::normalexit()
ulogsys << "(default exit): wait done.." << endl << flush;
#if 1
if( g_term_thread->joinable() )
if( g_term_thread && g_term_thread->joinable() )
g_term_thread->join();
#else
std::unique_lock<std::mutex> locker(g_donemutex);
while( !g_done )
g_doneevent.wait(locker);
if( g_doneevent )
{
std::unique_lock<std::mutex> locker(g_donemutex);
while( !g_done )
g_doneevent.wait(locker);
}
#endif
ulogsys << "(default exit): wait done OK (good bye)" << endl << flush;
......@@ -601,12 +653,15 @@ void UniSetActivator::normalterminate()
ulogsys << "(default terminate): wait done.." << endl << flush;
#if 1
if( g_term_thread->joinable() )
if( g_term_thread && g_term_thread->joinable() )
g_term_thread->join();
#else
std::unique_lock<std::mutex> locker(g_donemutex);
while( !g_done )
g_doneevent.wait(locker);
if( g_doneevent )
{
std::unique_lock<std::mutex> locker(g_donemutex);
while( !g_done )
g_doneevent.wait(locker);
}
#endif
ulogsys << "(default terminate): wait done OK (good bye)" << endl << flush;
}
......
......@@ -27,6 +27,7 @@
#include <pthread.h>
#include <sys/types.h>
#include <sstream>
#include <chrono>
#include "Exceptions.h"
#include "ORepHelpers.h"
......@@ -40,7 +41,7 @@
using namespace std;
using namespace UniSetTypes;
#define CREATE_TIMER make_shared<ThrPassiveTimer>();
#define CREATE_TIMER make_shared<PassiveCondTimer>();
// new PassiveSysTimer();
// ------------------------------------------------------------------------------------------
......@@ -56,7 +57,7 @@ MaxCountRemoveOfMessage(10),
stMaxQueueMessages(0),
stCountOfQueueFull(0)
{
ui = make_shared<UInterface>(UniSetTypes::DefaultObjectId);
ui = make_shared<UInterface>(UniSetTypes::DefaultObjectId);
tmr = CREATE_TIMER;
myname = "noname";
......@@ -76,7 +77,7 @@ MaxCountRemoveOfMessage(10),
stMaxQueueMessages(0),
stCountOfQueueFull(0)
{
ui = make_shared<UInterface>(id);
ui = make_shared<UInterface>(id);
tmr = CREATE_TIMER;
if (myid >=0)
{
......@@ -108,7 +109,7 @@ MaxCountRemoveOfMessage(10),
stMaxQueueMessages(0),
stCountOfQueueFull(0)
{
ui = make_shared<UInterface>(UniSetTypes::DefaultObjectId);
ui = make_shared<UInterface>(UniSetTypes::DefaultObjectId);
/*! \warning UniverslalInterface не инициализируется идентификатором объекта */
tmr = CREATE_TIMER;
......@@ -153,6 +154,9 @@ UniSetObject::~UniSetObject()
// ------------------------------------------------------------------------------------------
void UniSetObject::init_object()
{
a_working = ATOMIC_VAR_INIT(0);
active = ATOMIC_VAR_INIT(0);
qmutex.setName(myname + "_qmutex");
refmutex.setName(myname + "_refmutex");
// mutex_act.setName(myname + "_mutex_act");
......@@ -160,11 +164,8 @@ void UniSetObject::init_object()
auto conf = uniset_conf();
SizeOfMessageQueue = conf->getArgPInt("--uniset-object-size-message-queue",conf->getField("SizeOfMessageQueue"), 1000);
MaxCountRemoveOfMessage = conf->getArgInt("--uniset-object-maxcount-remove-message",conf->getField("MaxCountRemoveOfMessage"));
if( MaxCountRemoveOfMessage <= 0 )
MaxCountRemoveOfMessage = SizeOfMessageQueue / 4;
if( MaxCountRemoveOfMessage <= 0 )
MaxCountRemoveOfMessage = 10;
MaxCountRemoveOfMessage = conf->getArgPInt("--uniset-object-maxcount-remove-message",conf->getField("MaxCountRemoveOfMessage"),SizeOfMessageQueue / 4);
// workingTerminateTimeout = conf->getArgPInt("--uniset-object-working-terminate-timeout",conf->getField("WorkingTerminateTimeout"),2000);
uinfo << myname << "(init): SizeOfMessageQueue=" << SizeOfMessageQueue
<< " MaxCountRemoveOfMessage=" << MaxCountRemoveOfMessage
......@@ -610,6 +611,10 @@ unsigned int UniSetObject::countMessages()
}
}
// ------------------------------------------------------------------------------------------
void UniSetObject::sigterm( int signo )
{
}
// ------------------------------------------------------------------------------------------
bool UniSetObject::deactivate()
{
if( !isActive() )
......@@ -624,13 +629,23 @@ bool UniSetObject::deactivate()
setActive(false); // завершаем поток обработки сообщений
if( tmr )
tmr->stop();
tmr->terminate();
if( thr )
{
std::unique_lock<std::mutex> lk(m_working);
// cv_working.wait_for(lk, std::chrono::milliseconds(workingTerminateTimeout), [&](){ return (a_working == false); } );
if( a_working )
cv_working.wait(lk);
if( a_working )
thr->stop();
}
// Очищаем очередь
{ // lock
uniset_rwmutex_wrlock mlk(qmutex);
while( !queueMsg.empty() )
queueMsg.pop();
queueMsg.pop();
}
try
......@@ -745,6 +760,10 @@ bool UniSetObject::activate()
if( myid!=UniSetTypes::DefaultObjectId && threadcreate )
{
thr = make_shared< ThreadCreator<UniSetObject> >(this, &UniSetObject::work);
thr->setCancel(ost::Thread::cancelDeferred);
std::unique_lock<std::mutex> locker(m_working);
a_working = true;
thr->start();
}
else
......@@ -767,10 +786,22 @@ void UniSetObject::work()
if( thr )
msgpid = thr->getTID();
while( isActive() )
callback();
{
std::unique_lock<std::mutex> locker(m_working);
a_working = true;
}
while( isActive() )
callback();
uinfo << myname << ": thread processing messages stopped..." << endl;
{
std::unique_lock<std::mutex> locker(m_working);
a_working = false;
}
cv_working.notify_all();
}
// ------------------------------------------------------------------------------------------
void UniSetObject::callback()
......
......@@ -2,4 +2,4 @@
# This file is part of the UniSet library #
############################################################################
noinst_LTLIBRARIES = libTimers.la
libTimers_la_SOURCES=PassiveTimer.cc PassiveSysTimer.cc ThrPassiveTimer.cc
libTimers_la_SOURCES=PassiveTimer.cc PassiveSigTimer.cc PassiveCondTimer.cc
......@@ -25,76 +25,45 @@
#include <unistd.h>
#include <sstream>
#include <time.h>
#include <omnithread.h>
#include "PassiveTimer.h"
// ------------------------------------------------------------------------------------------
using namespace std;
// ------------------------------------------------------------------------------------------
ThrPassiveTimer::ThrPassiveTimer():
terminated(1)
PassiveCondTimer::PassiveCondTimer():
terminated(ATOMIC_VAR_INIT(1))
{
// были сделаны указателями
// чтобы уйти от include в head-файле
tmutex = new omni_mutex();
tcondx = new omni_condition(tmutex);
}
// ------------------------------------------------------------------------------------------
ThrPassiveTimer::~ThrPassiveTimer()
PassiveCondTimer::~PassiveCondTimer()
{
terminate();
// while( !terminated ){};
delete tcondx;
delete tmutex;
}
// ------------------------------------------------------------------------------------------
bool ThrPassiveTimer::isTerminated()
{
UniSetTypes::uniset_mutex_lock l(term_mutex,100);
return terminated;
}
// ------------------------------------------------------------------------------------------
void ThrPassiveTimer::setTerminated( bool set )
void PassiveCondTimer::terminate()
{
UniSetTypes::uniset_mutex_lock l(term_mutex,200);
terminated = set;
}
// ------------------------------------------------------------------------------------------
void ThrPassiveTimer::terminate()
{
if( !isTerminated() )
{
setTerminated(true);
tcondx->signal();
std::unique_lock<std::mutex> lk(m_working);
terminated = true;
}
cv_working.notify_all();
}
// ------------------------------------------------------------------------------------------
bool ThrPassiveTimer::wait( timeout_t time_msec )
bool PassiveCondTimer::wait( timeout_t time_msec )
{
setTerminated(false);
{
tmutex->lock();
timeout_t t_msec = PassiveTimer::setTiming(time_msec); // вызываем для совместимости с обычным PassiveTimer-ом
if( time_msec == WaitUpTime )
{
while( !isTerminated() ) // на всякий, вдруг проснется по ошибке...
tcondx->wait();
}
else
{
unsigned long sec, msec;
omni_thread::get_time(&sec,&msec, t_msec/1000, (t_msec%1000)*1000000 );
// cout <<"timer: спим "<< timeMS/1000 << "[сек] и " << (timeMS%1000)*1000000 <<"[мсек]" << endl;
tcondx->timedwait(sec, msec);
}
std::unique_lock<std::mutex> lk(m_working);
terminated = false;
tmutex->unlock();
timeout_t t_msec = PassiveTimer::setTiming(time_msec); // вызываем для совместимости с обычным PassiveTimer-ом
if( time_msec == WaitUpTime )
{
while( !terminated )
cv_working.wait(lk);
}
else
cv_working.wait_for(lk, std::chrono::milliseconds(t_msec), [&](){ return (terminated == true); } );
setTerminated(true);
terminated = true;
return true;
}
// ------------------------------------------------------------------------------------------
......@@ -34,30 +34,30 @@
using namespace std;
//using namespace UniSetTypes;
// ------------------------------------------------------------------------------------------
void PassiveSysTimer::call(int signo, siginfo_t *evp, void *ucontext)
void PassiveSigTimer::call(int signo, siginfo_t *evp, void *ucontext)
{
cout << "PassiveSysTimer: callme time="<< evp->si_value.sival_int<< " ms"<<endl;
cout << "PassiveSigTimer: callme time="<< evp->si_value.sival_int<< " ms"<<endl;
}
void PassiveSysTimer::callalrm(int signo)
void PassiveSigTimer::callalrm(int signo)
{
// cout << "PassiveSysTimer: callme signo "<< signo <<endl;
// cout << "PassiveSigTimer: callme signo "<< signo <<endl;
}
// ------------------------------------------------------------------------------------------
PassiveSysTimer::PassiveSysTimer():
PassiveSigTimer::PassiveSigTimer():
terminated(1)
{
init();
}
// ------------------------------------------------------------------------------------------
PassiveSysTimer::~PassiveSysTimer()
PassiveSigTimer::~PassiveSigTimer()
{
terminate();
}
// ------------------------------------------------------------------------------------------
void PassiveSysTimer::init()
void PassiveSigTimer::init()
{
/*
struct itimerval val;
......@@ -69,13 +69,13 @@ void PassiveSysTimer::init()
if( sigaction(SIGALRM, &action, 0) == -1)
{
cerr << "PassiveSysTimer: error sigaction" << endl;
cerr << "PassiveSigTimer: error sigaction" << endl;
throw NotSetSignal("PassiveTimer: errir sigaction");
}
*/
}
// ------------------------------------------------------------------------------------------
void PassiveSysTimer::terminate()
void PassiveSigTimer::terminate()
{
if (!terminated)
{
......@@ -86,7 +86,7 @@ void PassiveSysTimer::terminate()
}
}
// ------------------------------------------------------------------------------------------
bool PassiveSysTimer::wait(timeout_t timeMS)
bool PassiveSigTimer::wait(timeout_t timeMS)
{
pid = getpid();
......@@ -99,7 +99,7 @@ bool PassiveSysTimer::wait(timeout_t timeMS)
if( sigaction(SIGALRM, &action, 0) == -1)
{
cerr << "PassiveSysTimer: error sigaction" << endl;
cerr << "PassiveSigTimer: error sigaction" << endl;
return false;
}
......@@ -147,7 +147,7 @@ bool PassiveSysTimer::wait(timeout_t timeMS)
terminated = 1;
sigprocmask( SIG_UNBLOCK, &mask, NULL );
// cout << "PassiveSysTimer: time ok"<< endl;
// cout << "PassiveSigTimer: time ok"<< endl;
return true;
}
......
......@@ -37,6 +37,7 @@
#include "ObjectIndex_Array.h"
#include "ObjectIndex_XML.h"
#include "ObjectIndex_idXML.h"
#include "UniSetActivator.h"
// -------------------------------------------------------------------------
using namespace std;
// -------------------------------------------------------------------------
......@@ -1027,6 +1028,9 @@ std::shared_ptr<Configuration> uniset_init( int argc, const char* const* argv, c
return UniSetTypes::uconf;
}
atexit( UniSetActivator::normalexit );
set_terminate( UniSetActivator::normalterminate ); // ловушка для неизвестных исключений
string confile = UniSetTypes::getArgParam( "--confile", argc, argv, xmlfile );
ulog.setLogName("ulog");
UniSetTypes::uconf = make_shared<Configuration>(argc, argv, confile);
......
......@@ -41,7 +41,7 @@ SViewer::SViewer(const string& csec, bool sn):
cache(500, 15),
isShort(sn)
{
ui = make_shared<UInterface>(UniSetTypes::uniset_conf());
ui = make_shared<UInterface>(UniSetTypes::uniset_conf());
}
SViewer::~SViewer()
......
......@@ -11,6 +11,7 @@ tests_LDADD = $(top_builddir)/lib/libUniSet2.la $(SIGC_LIBS) $(COMCPP_LIBS) $(C
tests_CPPFLAGS = -I$(top_builddir)/include $(SIGC_CFLAGS) $(COMCPP_CFLAGS) $(COV_CFLAGS)
tests_SOURCES = tests.cc \
test_passivetimer.cc \
test_passivecondtimer.cc \
test_hourglass.cc \
test_delaytimer.cc \
test_unixml.cc \
......
......@@ -81,7 +81,7 @@ TEST_CASE("HourGlass", "[HourGlass]" )
REQUIRE( hg.amount() >= 60 );
hg.rotate(false); // опять назад..
msleep(80); // по сути сигнал сбросился..(т.к. оставалось 70.. а прошло 80)
msleep(90); // по сути сигнал сбросился..(т.к. оставалось 70.. а прошло 90)
CHECK_FALSE( hg.check() );
REQUIRE( hg.amount() == 0 );
REQUIRE( hg.remain() == 100 );
......
#include <catch.hpp>
#include "PassiveTimer.h"
#include "UniSetTypes.h"
using namespace std;
// --------------------------------------------------------------------------
static std::atomic_int g_num = ATOMIC_VAR_INIT(0);
static std::mutex g_mutex;
static std::shared_ptr<PassiveCondTimer> tmr;
// --------------------------------------------------------------------------
void thread_function( int msec )
{
try
{
msleep(msec);
/*
std::chrono::milliseconds dura( msec );
mstd::sleep_for( dura );
*/
if( tmr )
tmr->terminate();
}
catch( std::exception& ex )
{
FAIL( ex.what() );
}
}
TEST_CASE("PassiveCondTimer: wait", "[PassiveTimer][PassiveCondTimer]" )
{
tmr = make_shared<PassiveCondTimer>();
PassiveTimer ptTime;
tmr->wait(300);
REQUIRE( ptTime.getCurrent() >= 300 );
REQUIRE( ptTime.getCurrent() <= 340 );
tmr.reset();
}
// --------------------------------------------------------------------------
TEST_CASE("PassiveCondTimer: waitup", "[PassiveTimer][PassiveCondTimer]" )
{
tmr = make_shared<PassiveCondTimer>();
std::thread thr(thread_function,500);
PassiveTimer ptTime;
tmr->wait(UniSetTimer::WaitUpTime);
REQUIRE( ptTime.getCurrent() >= 500 );
REQUIRE( ptTime.getCurrent() <= 540 );
thr.join();
tmr.reset();
}
// --------------------------------------------------------------------------
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment