Commit 3ecdf68e authored by Pavel Vainerman's avatar Pavel Vainerman

(UniSetObject): сделал на каждый приоритет свою очередь сообщений,

добавил тест, исправил ошибки в UMessageQueue Убрал из приоритетов 'Super' - нигде не использовали, считаю что Hi,Medium,Low - должно хватить.
parent 778fce6f
...@@ -49,8 +49,7 @@ namespace UniSetTypes ...@@ -49,8 +49,7 @@ namespace UniSetTypes
{ {
Low, Low,
Medium, Medium,
High, High
Super
}; };
Priority priority = { Medium }; Priority priority = { Medium };
......
...@@ -20,6 +20,7 @@ ...@@ -20,6 +20,7 @@
#include <atomic> #include <atomic>
#include <vector> #include <vector>
#include <memory> #include <memory>
#include <mutex>
#include "MessageType.h" #include "MessageType.h"
//-------------------------------------------------------------------------- //--------------------------------------------------------------------------
typedef std::shared_ptr<UniSetTypes::VoidMessage> VoidMessagePtr; typedef std::shared_ptr<UniSetTypes::VoidMessage> VoidMessagePtr;
...@@ -29,14 +30,17 @@ typedef std::shared_ptr<UniSetTypes::VoidMessage> VoidMessagePtr; ...@@ -29,14 +30,17 @@ typedef std::shared_ptr<UniSetTypes::VoidMessage> VoidMessagePtr;
* *
* Чтобы избежать работы с mutex, очередь построена по принципу циклического буфера, * Чтобы избежать работы с mutex, очередь построена по принципу циклического буфера,
* c использованием atomic-переменных и попыткой реализовать LockFree работу. * c использованием atomic-переменных и попыткой реализовать LockFree работу.
* Есть указатель на текущую позицию записи (wp) и есть "догоняющий его" указатель на позицию чтения (rp). * Есть монотонно растущий индекс текущей позиции записи (wpos) и есть "догоняющий его" индекс позиции чтения (rpos).
* Если rp догоняет wp - значит новых сообщений нет. * Если rpos догоняет wpos - значит новых сообщений нет.
*
* \warning Очередь не универсальная и предназначена исключительно для использования в UniSetObject.
* Т.к. подразумевает схему "МНОГО ПИСАТЕЛЕЙ" и "ОДИН ЧИТАТЕЛЬ".
* *
* При этом место под очередь(буффер) резервируется сразу. * При этом место под очередь(буффер) резервируется сразу.
* Счётчики сделаны (uint) монотонно растущими. * Счётчики сделаны (uint) монотонно растущими.
* Основные идеи: * Основные идеи:
* - счётчики постоянно увеличиваются * - счётчики постоянно увеличиваются
* - каждый пишущий поток пишет в новое место * - каждый пишущий поток пишет в новое место (индекс больше последнего)
* - читающий счётчик тоже монотонно растёт * - читающий счётчик тоже монотонно растёт
* - реальная позиция для записи или чтения рассчитывается как (pos%size) этим и обеспечивается цикличность. * - реальная позиция для записи или чтения рассчитывается как (pos%size) этим и обеспечивается цикличность.
* *
...@@ -47,22 +51,42 @@ typedef std::shared_ptr<UniSetTypes::VoidMessage> VoidMessagePtr; ...@@ -47,22 +51,42 @@ typedef std::shared_ptr<UniSetTypes::VoidMessage> VoidMessagePtr;
* При помощи функции setLostStrategy() можно установить стратегию что терять * При помощи функции setLostStrategy() можно установить стратегию что терять
* lostNewData - в случае переполнения теряются новые данные (т.е. не будут помещаться в очередь) * lostNewData - в случае переполнения теряются новые данные (т.е. не будут помещаться в очередь)
* lostOldData - в случае переполнения очереди, старые данные затираются новыми. * lostOldData - в случае переполнения очереди, старые данные затираются новыми.
*
* Защита от переполнения индекса (wpos и rpos).
* =============================================
* Т.к. для обеспечения lockfree записи индексы (wpos) постоянно растут
* (т.е. каждый пишущий поток пишет в новое место), в качестве atomic индекса выбран unsigned long.
* Что для x86_32 системы работающей без перезагруки длительное время переполнение индекса может стать проблеммой.
* Фактически же размер циклического буфера ограничен и запись ведётся в позицию [wpos%size], т.е.
* в общем случае достаточно чтобы индекс был не больше размера буфера (size).
* \error ПОКА ПРОБЛЕММА ПЕРЕПОЛНЕНИЯ НЕ РЕШЕНА..
* чтобы "сбрасывать" индекс, нужно приостанавливать все пишущие и читающие потоки и одновременно
* (одной транзакцией) менять wpos и rpos.
*
*/ */
class UMessageQueue class UMessageQueue
{ {
public: public:
UMessageQueue( size_t qsize = 2000 ); UMessageQueue( size_t qsize = 2000 );
void push( const UniSetTypes::TransportMessage& msg ); /*! поместить сообщение в очередь */
void push( const VoidMessagePtr& msg );
/*! Извлечь сообщение из очереди
* \return не валидный shatred_ptr если сообщений нет
*/
VoidMessagePtr top(); VoidMessagePtr top();
size_t size(); size_t size();
bool empty();
// ----- Настройки ----- // ----- Настройки -----
// неявно подразумевается, что всё настривается до первого использования
// ----------------------
void setMaxSizeOfMessageQueue( size_t s ); void setMaxSizeOfMessageQueue( size_t s );
size_t getMaxSizeOfMessageQueue(); size_t getMaxSizeOfMessageQueue();
/*! Стратегия при переполнении */
enum LostStrategy enum LostStrategy
{ {
lostOldData, // default lostOldData, // default
...@@ -84,8 +108,6 @@ class UMessageQueue ...@@ -84,8 +108,6 @@ class UMessageQueue
return stCountOfQueueFull; return stCountOfQueueFull;
} }
typedef std::vector<VoidMessagePtr> MQueue;
protected: protected:
// заполнить всю очередь указанным сообщением // заполнить всю очередь указанным сообщением
...@@ -93,10 +115,16 @@ class UMessageQueue ...@@ -93,10 +115,16 @@ class UMessageQueue
private: private:
void resetIndexes();
std::mutex resetMutex;
typedef std::vector<VoidMessagePtr> MQueue;
MQueue mqueue; MQueue mqueue;
std::atomic_uint wpos = { 0 }; // позиция на запись std::atomic_ulong wpos = { 0 }; // позиция на запись
std::atomic_uint rpos = { 0 }; // позиция на чтение std::atomic_ulong rpos = { 0 }; // позиция на чтение
std::atomic_uint mpos = { 0 }; // текущая позиция последнего элемента (max position) (реально добавленного в очередь) std::atomic_ulong mpos = { 0 }; // текущая позиция последнего элемента (max position) (реально добавленного в очередь)
LostStrategy lostStrategy = { lostOldData }; LostStrategy lostStrategy = { lostOldData };
/*! размер очереди сообщений (при превышении происходит очистка) */ /*! размер очереди сообщений (при превышении происходит очистка) */
......
...@@ -56,6 +56,7 @@ typedef std::list< std::shared_ptr<UniSetObject> > ObjectsList; /*!< Спи ...@@ -56,6 +56,7 @@ typedef std::list< std::shared_ptr<UniSetObject> > ObjectsList; /*!< Спи
//--------------------------------------------------------------------------- //---------------------------------------------------------------------------
/*! \class UniSetObject /*! \class UniSetObject
* Класс реализует работу uniset-объекта: работа с очередью сообщений, регистрация объекта, инициализация и т.п. * Класс реализует работу uniset-объекта: работа с очередью сообщений, регистрация объекта, инициализация и т.п.
* Обработка сообщений ведётся в специально создаваемом потоке.
* Для ожидания сообщений используется функция waitMessage(msec), основанная на таймере. * Для ожидания сообщений используется функция waitMessage(msec), основанная на таймере.
* Ожидание прерывается либо по истечении указанного времени, либо по приходу сообщения, при помощи функциии * Ожидание прерывается либо по истечении указанного времени, либо по приходу сообщения, при помощи функциии
* termWaiting() вызываемой из push(). * termWaiting() вызываемой из push().
...@@ -64,6 +65,10 @@ typedef std::list< std::shared_ptr<UniSetObject> > ObjectsList; /*!< Спи ...@@ -64,6 +65,10 @@ typedef std::list< std::shared_ptr<UniSetObject> > ObjectsList; /*!< Спи
* (например в конструкторе). При этом ответственность за вызов receiveMessage() и processingMessage() возлагается * (например в конструкторе). При этом ответственность за вызов receiveMessage() и processingMessage() возлагается
* на разработчика. * на разработчика.
* *
* Имеется три очереди сообщений, по приоритетам: Hi, Medium, Low.
* Соответственно сообщения вынимаются в порядке поступления, но сперва из Hi, потом из Medium, а потом из Low очереди.
* \warning Если сообщения будут поступать в Hi или Medium очередь быстрее чем они обрабатываются, то до Low сообщений дело может и не дойти.
*
*/ */
class UniSetObject: class UniSetObject:
public std::enable_shared_from_this<UniSetObject>, public std::enable_shared_from_this<UniSetObject>,
...@@ -71,8 +76,8 @@ class UniSetObject: ...@@ -71,8 +76,8 @@ class UniSetObject:
public LT_Object public LT_Object
{ {
public: public:
UniSetObject(const std::string& name, const std::string& section); UniSetObject( const std::string& name, const std::string& section );
UniSetObject(UniSetTypes::ObjectId id); UniSetObject( UniSetTypes::ObjectId id );
UniSetObject(); UniSetObject();
virtual ~UniSetObject(); virtual ~UniSetObject();
...@@ -107,6 +112,8 @@ class UniSetObject: ...@@ -107,6 +112,8 @@ class UniSetObject:
//! поместить сообщение в очередь //! поместить сообщение в очередь
virtual void push( const UniSetTypes::TransportMessage& msg ) override; virtual void push( const UniSetTypes::TransportMessage& msg ) override;
// -------------- вспомогательные --------------
/*! получить ссылку (на себя) */ /*! получить ссылку (на себя) */
inline UniSetTypes::ObjectPtr getRef() const inline UniSetTypes::ObjectPtr getRef() const
{ {
...@@ -114,12 +121,21 @@ class UniSetObject: ...@@ -114,12 +121,21 @@ class UniSetObject:
return (UniSetTypes::ObjectPtr)CORBA::Object::_duplicate(oref); return (UniSetTypes::ObjectPtr)CORBA::Object::_duplicate(oref);
} }
/*! заказ таймера (вынесена в public, хотя должна была бы быть в protected */
virtual timeout_t askTimer( UniSetTypes::TimerId timerid, timeout_t timeMS, clock_t ticks = -1, virtual timeout_t askTimer( UniSetTypes::TimerId timerid, timeout_t timeMS, clock_t ticks = -1,
UniSetTypes::Message::Priority p = UniSetTypes::Message::High ) override; UniSetTypes::Message::Priority p = UniSetTypes::Message::High ) override;
protected: protected:
std::shared_ptr<UInterface> ui; /*!< универсальный интерфейс для работы с другими процессами */
std::string myname;
std::string section;
std::weak_ptr<UniSetManager> mymngr;
/*! обработка приходящих сообщений */ /*! обработка приходящих сообщений */
virtual void processingMessage( const UniSetTypes::VoidMessage* msg ); virtual void processingMessage( const UniSetTypes::VoidMessage* msg );
// конкретные виды сообщений
virtual void sysCommand( const UniSetTypes::SystemMessage* sm ) {} virtual void sysCommand( const UniSetTypes::SystemMessage* sm ) {}
virtual void sensorInfo( const UniSetTypes::SensorMessage* sm ) {} virtual void sensorInfo( const UniSetTypes::SensorMessage* sm ) {}
virtual void timerInfo( const UniSetTypes::TimerMessage* tm ) {} virtual void timerInfo( const UniSetTypes::TimerMessage* tm ) {}
...@@ -127,30 +143,46 @@ class UniSetObject: ...@@ -127,30 +143,46 @@ class UniSetObject:
/*! Получить сообщение */ /*! Получить сообщение */
VoidMessagePtr receiveMessage(); VoidMessagePtr receiveMessage();
/*! текущее количесво сообщений в очереди */ /*! Ожидать сообщения заданное время */
size_t countMessages(); virtual VoidMessagePtr waitMessage( timeout_t msec = UniSetTimer::WaitUpTime );
size_t getCountOfQueueFull();
/*! прервать ожидание сообщений */ /*! прервать ожидание сообщений */
void termWaiting(); void termWaiting();
std::shared_ptr<UInterface> ui; /*!< универсальный интерфейс для работы с другими процессами */ /*! текущее количесво сообщений в очереди */
std::string myname; size_t countMessages();
std::string section;
//! Дизактивизация объекта (переопределяется для необходимых действий перед деактивацией) /*! количество раз перполнения очереди сообщений */
virtual bool deactivateObject() size_t getCountOfQueueFull();
//! Активизация объекта (переопределяется для необходимых действий после активизации)
virtual bool activateObject()
{ {
return true; return true;
} }
//! Активизация объекта (переопределяется для необходимых действий после активизации)
virtual bool activateObject() //! Деактивиция объекта (переопределяется для необходимых действий перед деактивацией)
virtual bool deactivateObject()
{ {
return true; return true;
} }
/*! Функция вызываемая при приходе сигнала завершения или прерывания процесса. Переопределив ее можно
* выполнять специфичные для процесса действия по обработке сигнала.
* Например переход в безопасное состояние.
* \warning В обработчике сигналов \b ЗАПРЕЩЕНО вызывать функции подобные exit(..), abort()!!!!
*/
virtual void sigterm( int signo );
inline void terminate()
{
deactivate();
}
// управление созданием потока обработки сообщений -------
/*! запрет(разрешение) создания потока для обработки сообщений */ /*! запрет(разрешение) создания потока для обработки сообщений */
inline void thread(bool create) inline void thread( bool create )
{ {
threadcreate = create; threadcreate = create;
} }
...@@ -168,56 +200,39 @@ class UniSetObject: ...@@ -168,56 +200,39 @@ class UniSetObject:
/*! функция вызываемая из потока */ /*! функция вызываемая из потока */
virtual void callback(); virtual void callback();
/*! Функция вызываемая при приходе сигнала завершения или прерывания процесса. Переопределив ее можно // ----- конфигурирование объекта -------
* выполнять специфичные для процесса действия по обработке сигнала. /*! установка ID объекта */
* Например переход в безопасное состояние.
* \warning В обработчике сигналов \b ЗАПРЕЩЕНО вызывать функции подобные exit(..), abort()!!!!
*/
virtual void sigterm( int signo );
inline void terminate()
{
deactivate();
}
/*! Ожидать сообщения timeMS */
virtual VoidMessagePtr waitMessage( timeout_t timeMS = UniSetTimer::WaitUpTime );
void setID(UniSetTypes::ObjectId id); void setID(UniSetTypes::ObjectId id);
void setMaxSizeOfMessageQueue( size_t s ) /*! установить приоритет для потока обработки сообщений (если позволяют права и система) */
{ void setThreadPriority( int p );
mqueue.setMaxSizeOfMessageQueue(s);
} /*! установка размера очереди сообщений */
void setMaxSizeOfMessageQueue( size_t s );
/*! получить размер очереди сообщений */
inline size_t getMaxSizeOfMessageQueue() inline size_t getMaxSizeOfMessageQueue()
{ {
return mqueue.getMaxSizeOfMessageQueue(); return mqueueMedium.getMaxSizeOfMessageQueue();
} }
/*! проверка "активности" объекта */
inline bool isActive() inline bool isActive()
{ {
return active; return active;
} }
/*! false - завершить работу потока обработки сообщений */
inline void setActive( bool set ) inline void setActive( bool set )
{ {
active = set; active = set;
} }
std::weak_ptr<UniSetManager> mymngr;
void setThreadPriority( int p );
private: private:
friend class UniSetManager; friend class UniSetManager;
friend class UniSetActivator; friend class UniSetActivator;
inline pid_t getMsgPID()
{
return msgpid;
}
/*! функция потока */ /*! функция потока */
void work(); void work();
//! Инициализация параметров объекта //! Инициализация параметров объекта
...@@ -242,18 +257,19 @@ class UniSetObject: ...@@ -242,18 +257,19 @@ class UniSetObject:
UniSetTypes::ObjectId myid; UniSetTypes::ObjectId myid;
CORBA::Object_var oref; CORBA::Object_var oref;
std::shared_ptr< ThreadCreator<UniSetObject> > thr;
/*! очередь сообщений для объекта */
UMessageQueue mqueue;
/*! замок для блокирования совместного доступа к oRef */ /*! замок для блокирования совместного доступа к oRef */
mutable UniSetTypes::uniset_rwmutex refmutex; mutable UniSetTypes::uniset_rwmutex refmutex;
std::shared_ptr< ThreadCreator<UniSetObject> > thr;
/*! очереди сообщений в зависимости от приоритета */
UMessageQueue mqueueLow;
UMessageQueue mqueueMedium;
UMessageQueue mqueueHi;
std::atomic_bool a_working; std::atomic_bool a_working;
std::mutex m_working; std::mutex m_working;
std::condition_variable cv_working; std::condition_variable cv_working;
// timeout_t workingTerminateTimeout; /*!< время ожидания завершения потока */
}; };
//--------------------------------------------------------------------------- //---------------------------------------------------------------------------
#endif #endif
......
...@@ -28,7 +28,7 @@ UMessageQueue::UMessageQueue( size_t qsize ): ...@@ -28,7 +28,7 @@ UMessageQueue::UMessageQueue( size_t qsize ):
mqFill(nullptr); mqFill(nullptr);
} }
//--------------------------------------------------------------------------- //---------------------------------------------------------------------------
void UMessageQueue::push( const UniSetTypes::TransportMessage& tm ) void UMessageQueue::push( const VoidMessagePtr& vm )
{ {
// проверяем переполнение, только если стратегия "терять новые данные" // проверяем переполнение, только если стратегия "терять новые данные"
// иначе нет смысла проверять, а можно просто писать новые данные затирая старые // иначе нет смысла проверять, а можно просто писать новые данные затирая старые
...@@ -42,7 +42,7 @@ void UMessageQueue::push( const UniSetTypes::TransportMessage& tm ) ...@@ -42,7 +42,7 @@ void UMessageQueue::push( const UniSetTypes::TransportMessage& tm )
size_t w = wpos.fetch_add(1); size_t w = wpos.fetch_add(1);
// а потом уже добавлять новое сообщение в "зарезервированное" место // а потом уже добавлять новое сообщение в "зарезервированное" место
mqueue[w%SizeOfMessageQueue] = make_shared<VoidMessage>(tm); mqueue[w%SizeOfMessageQueue] = vm;
mpos++; // теперь увеличиваем реальное количество элементов в очереди mpos++; // теперь увеличиваем реальное количество элементов в очереди
// ведём статистику // ведём статистику
...@@ -58,7 +58,7 @@ VoidMessagePtr UMessageQueue::top() ...@@ -58,7 +58,7 @@ VoidMessagePtr UMessageQueue::top()
if( lostStrategy == lostOldData && (wpos - rpos) >= SizeOfMessageQueue ) if( lostStrategy == lostOldData && (wpos - rpos) >= SizeOfMessageQueue )
{ {
stCountOfQueueFull++; stCountOfQueueFull++;
rpos.store( wpos - SizeOfMessageQueue - 1 ); rpos.store( wpos - SizeOfMessageQueue );
} }
// смотрим "фактическое" количество (mpos) // смотрим "фактическое" количество (mpos)
...@@ -90,6 +90,11 @@ size_t UMessageQueue::size() ...@@ -90,6 +90,11 @@ size_t UMessageQueue::size()
return (mpos - rpos); return (mpos - rpos);
} }
//--------------------------------------------------------------------------- //---------------------------------------------------------------------------
bool UMessageQueue::empty()
{
return (mpos == rpos);
}
//---------------------------------------------------------------------------
void UMessageQueue::setMaxSizeOfMessageQueue( size_t s ) void UMessageQueue::setMaxSizeOfMessageQueue( size_t s )
{ {
if( s != SizeOfMessageQueue ) if( s != SizeOfMessageQueue )
......
...@@ -117,33 +117,6 @@ UniSetObject::UniSetObject( const string& name, const string& section ): ...@@ -117,33 +117,6 @@ UniSetObject::UniSetObject( const string& name, const string& section ):
// ------------------------------------------------------------------------------------------ // ------------------------------------------------------------------------------------------
UniSetObject::~UniSetObject() UniSetObject::~UniSetObject()
{ {
#if 0
try
{
deactivate();
}
catch(...) {}
try
{
tmr->terminate();
}
catch(...) {}
if( thr )
{
try
{
thr->stop();
if( thr->isRunning() )
thr->join();
}
catch(...) {}
}
#endif
} }
// ------------------------------------------------------------------------------------------ // ------------------------------------------------------------------------------------------
void UniSetObject::initObject() void UniSetObject::initObject()
...@@ -158,9 +131,9 @@ void UniSetObject::initObject() ...@@ -158,9 +131,9 @@ void UniSetObject::initObject()
int sz = conf->getArgPInt("--uniset-object-size-message-queue", conf->getField("SizeOfMessageQueue"), 1000); int sz = conf->getArgPInt("--uniset-object-size-message-queue", conf->getField("SizeOfMessageQueue"), 1000);
if( sz > 0 ) if( sz > 0 )
mqueue.setMaxSizeOfMessageQueue(sz); setMaxSizeOfMessageQueue(sz);
uinfo << myname << "(init): SizeOfMessageQueue=" << mqueue.getMaxSizeOfMessageQueue() uinfo << myname << "(init): SizeOfMessageQueue=" << getMaxSizeOfMessageQueue()
<< endl; << endl;
} }
// ------------------------------------------------------------------------------------------ // ------------------------------------------------------------------------------------------
...@@ -188,7 +161,13 @@ void UniSetObject::setID( UniSetTypes::ObjectId id ) ...@@ -188,7 +161,13 @@ void UniSetObject::setID( UniSetTypes::ObjectId id )
myid = id; myid = id;
ui->initBackId(myid); ui->initBackId(myid);
} }
// ------------------------------------------------------------------------------------------
void UniSetObject::setMaxSizeOfMessageQueue(size_t s)
{
mqueueMedium.setMaxSizeOfMessageQueue(s);
mqueueLow.setMaxSizeOfMessageQueue(s);
mqueueHi.setMaxSizeOfMessageQueue(s);
}
// ------------------------------------------------------------------------------------------ // ------------------------------------------------------------------------------------------
/*! /*!
* \param vm - указатель на структуру, которая заполняется если есть сообщение * \param vm - указатель на структуру, которая заполняется если есть сообщение
...@@ -196,7 +175,13 @@ void UniSetObject::setID( UniSetTypes::ObjectId id ) ...@@ -196,7 +175,13 @@ void UniSetObject::setID( UniSetTypes::ObjectId id )
*/ */
VoidMessagePtr UniSetObject::receiveMessage() VoidMessagePtr UniSetObject::receiveMessage()
{ {
return mqueue.top(); if( !mqueueHi.empty() )
return mqueueHi.top();
if( !mqueueMedium.empty() )
return mqueueMedium.top();
return mqueueLow.top();
} }
// ------------------------------------------------------------------------------------------ // ------------------------------------------------------------------------------------------
VoidMessagePtr UniSetObject::waitMessage( timeout_t timeMS ) VoidMessagePtr UniSetObject::waitMessage( timeout_t timeMS )
...@@ -337,18 +322,29 @@ void UniSetObject::setThreadPriority( int p ) ...@@ -337,18 +322,29 @@ void UniSetObject::setThreadPriority( int p )
// ------------------------------------------------------------------------------------------ // ------------------------------------------------------------------------------------------
void UniSetObject::push( const TransportMessage& tm ) void UniSetObject::push( const TransportMessage& tm )
{ {
mqueue.push(tm); auto vm = make_shared<VoidMessage>(tm);
if( vm->priority == Message::Medium )
mqueueMedium.push(vm);
else if( vm->priority == Message::High )
mqueueHi.push(vm);
else if( vm->priority == Message::Low )
mqueueLow.push(vm);
else // на всякий по умолчанию medium
mqueueMedium.push(vm);
termWaiting(); termWaiting();
} }
// ------------------------------------------------------------------------------------------ // ------------------------------------------------------------------------------------------
size_t UniSetObject::countMessages() size_t UniSetObject::countMessages()
{ {
return mqueue.size(); return (mqueueMedium.size() + mqueueLow.size() + mqueueHi.size());
} }
// ------------------------------------------------------------------------------------------ // ------------------------------------------------------------------------------------------
size_t UniSetObject::getCountOfQueueFull() size_t UniSetObject::getCountOfQueueFull()
{ {
return mqueue.getCountOfQueueFull(); return (mqueueMedium.getCountOfQueueFull() +
mqueueLow.getCountOfQueueFull() +
mqueueHi.getCountOfQueueFull() );
} }
// ------------------------------------------------------------------------------------------ // ------------------------------------------------------------------------------------------
void UniSetObject::sigterm( int signo ) void UniSetObject::sigterm( int signo )
...@@ -531,8 +527,7 @@ void UniSetObject::work() ...@@ -531,8 +527,7 @@ void UniSetObject::work()
{ {
uinfo << myname << ": thread processing messages running..." << endl; uinfo << myname << ": thread processing messages running..." << endl;
if( thr ) msgpid = thr ? thr->getTID() : getpid();
msgpid = thr->getTID();
{ {
std::unique_lock<std::mutex> locker(m_working); std::unique_lock<std::mutex> locker(m_working);
...@@ -661,10 +656,16 @@ UniSetTypes::SimpleInfo* UniSetObject::getInfo( ::CORBA::Long userparam ) ...@@ -661,10 +656,16 @@ UniSetTypes::SimpleInfo* UniSetObject::getInfo( ::CORBA::Long userparam )
else else
info << "откл."; info << "откл.";
info << "\tcount=" << countMessages(); info << "\tcount=" << countMessages()
info << "\tmaxMsg=" << mqueue.getMaxQueueMessages(); << "\t medum: "
info << "\tqFull(" << mqueue.getMaxSizeOfMessageQueue() << ")=" << mqueue.getCountOfQueueFull(); << " maxMsg=" << mqueueMedium.getMaxQueueMessages()
// info << "\n"; << " qFull(" << mqueueMedium.getMaxSizeOfMessageQueue() << ")=" << mqueueMedium.getCountOfQueueFull()
<< "\t hi: "
<< " maxMsg=" << mqueueHi.getMaxQueueMessages()
<< " qFull(" << mqueueHi.getMaxSizeOfMessageQueue() << ")=" << mqueueHi.getCountOfQueueFull()
<< "\t low: "
<< " maxMsg=" << mqueueLow.getMaxQueueMessages()
<< " qFull(" << mqueueLow.getMaxSizeOfMessageQueue() << ")=" << mqueueLow.getCountOfQueueFull();
SimpleInfo* res = new SimpleInfo(); SimpleInfo* res = new SimpleInfo();
res->info = info.str().c_str(); // CORBA::string_dup(info.str().c_str()); res->info = info.str().c_str(); // CORBA::string_dup(info.str().c_str());
......
...@@ -284,8 +284,6 @@ bool NCRestorer_XML::getSensorInfo( const std::shared_ptr<UniXML>& xml, xmlNode* ...@@ -284,8 +284,6 @@ bool NCRestorer_XML::getSensorInfo( const std::shared_ptr<UniXML>& xml, xmlNode*
inf->priority = Message::Medium; inf->priority = Message::Medium;
else if( prior == "High" ) else if( prior == "High" )
inf->priority = Message::High; inf->priority = Message::High;
else if( prior == "Super" )
inf->priority = Message::Super;
else else
inf->priority = Message::Medium; inf->priority = Message::Medium;
......
...@@ -17,12 +17,13 @@ void mq_write_thread() ...@@ -17,12 +17,13 @@ void mq_write_thread()
{ {
SensorMessage smsg(100,2); SensorMessage smsg(100,2);
TransportMessage tm( std::move(smsg.transport_msg()) ); TransportMessage tm( std::move(smsg.transport_msg()) );
auto vm = make_shared<VoidMessage>(tm);
msleep(100); msleep(100);
for( size_t i=0; i<COUNT; i++ ) for( size_t i=0; i<COUNT; i++ )
{ {
mq.push(tm); mq.push(vm);
} }
} }
// -------------------------------------------------------------------------- // --------------------------------------------------------------------------
...@@ -63,7 +64,8 @@ int main(int argc, const char** argv) ...@@ -63,7 +64,8 @@ int main(int argc, const char** argv)
{ {
SensorMessage sm(100,2); SensorMessage sm(100,2);
TransportMessage tm( std::move(sm.transport_msg()) ); TransportMessage tm( std::move(sm.transport_msg()) );
mq.push(tm); auto vm = make_shared<VoidMessage>(tm);
mq.push(vm);
auto msg = mq.top(); auto msg = mq.top();
assert( msg!=nullptr ); assert( msg!=nullptr );
SensorMessage sm2( msg.get() ); SensorMessage sm2( msg.get() );
......
...@@ -36,7 +36,8 @@ test_ui.cc \ ...@@ -36,7 +36,8 @@ test_ui.cc \
test_iorfile.cc \ test_iorfile.cc \
test_messagetype.cc \ test_messagetype.cc \
test_utypes.cc \ test_utypes.cc \
test_mqueue.cc test_mqueue.cc \
test_uobject.cc
# threadtst_SOURCES = threadtst.cc # threadtst_SOURCES = threadtst.cc
# threadtst_LDADD = $(top_builddir)/lib/libUniSet2.la ${SIGC_LIBS} $(COMCPP_LIBS) # threadtst_LDADD = $(top_builddir)/lib/libUniSet2.la ${SIGC_LIBS} $(COMCPP_LIBS)
......
#ifndef TestUObject_H_
#define TestUObject_H_
// -------------------------------------------------------------------------
#include "UniSetObject.h"
// -------------------------------------------------------------------------
/*! Специальный тестовый объект для тестирования класса UniSetObject
* Для наглядности и простоты все функции объявлены здесь же в h-файле
*/
class TestUObject:
public UniSetObject
{
public:
TestUObject( UniSetTypes::ObjectId id, xmlNode* cnode ):
UniSetObject(id){}
virtual ~TestUObject(){};
// специальные функции для проведения тестирования
inline VoidMessagePtr getOneMessage()
{
return receiveMessage();
}
inline bool mqEmpty()
{
return (countMessages() == 0);
}
protected:
TestUObject(){};
};
// -------------------------------------------------------------------------
#endif // TestUObject_H_
// -------------------------------------------------------------------------
...@@ -7,6 +7,15 @@ ...@@ -7,6 +7,15 @@
using namespace std; using namespace std;
using namespace UniSetTypes; using namespace UniSetTypes;
// -------------------------------------------------------------------------- // --------------------------------------------------------------------------
static void pushMessage( UMessageQueue& mq, long id )
{
SensorMessage sm(id,id);
sm.consumer = id; // чтобы хоть как-то идентифицировать сообщений, используем поле consumer
TransportMessage tm( std::move(sm.transport_msg()) );
auto vm = make_shared<VoidMessage>(tm);
mq.push(vm);
}
// --------------------------------------------------------------------------
TEST_CASE( "UMessageQueue: setup", "[mqueue]" ) TEST_CASE( "UMessageQueue: setup", "[mqueue]" )
{ {
UMessageQueue mq; UMessageQueue mq;
...@@ -22,14 +31,11 @@ TEST_CASE( "UMessageQueue: simple push/top", "[mqueue]" ) ...@@ -22,14 +31,11 @@ TEST_CASE( "UMessageQueue: simple push/top", "[mqueue]" )
UMessageQueue mq; UMessageQueue mq;
SensorMessage sm(100,2); pushMessage(mq,100);
TransportMessage tm( std::move(sm.transport_msg()) );
mq.push(tm);
auto msg = mq.top(); auto msg = mq.top();
REQUIRE( msg!=nullptr ); REQUIRE( msg!=nullptr );
SensorMessage sm2( msg.get() ); REQUIRE( msg->consumer == 100 );
REQUIRE( sm.id == sm2.id );
} }
// -------------------------------------------------------------------------- // --------------------------------------------------------------------------
TEST_CASE( "UMessageQueue: overflow (lost old data)", "[mqueue]" ) TEST_CASE( "UMessageQueue: overflow (lost old data)", "[mqueue]" )
...@@ -37,26 +43,26 @@ TEST_CASE( "UMessageQueue: overflow (lost old data)", "[mqueue]" ) ...@@ -37,26 +43,26 @@ TEST_CASE( "UMessageQueue: overflow (lost old data)", "[mqueue]" )
REQUIRE( uniset_conf() != nullptr ); REQUIRE( uniset_conf() != nullptr );
UMessageQueue mq; UMessageQueue mq;
mq.setMaxSizeOfMessageQueue(1); mq.setMaxSizeOfMessageQueue(2);
mq.setLostStrategy( UMessageQueue::lostOldData ); mq.setLostStrategy( UMessageQueue::lostOldData );
SensorMessage sm1(100,2); pushMessage(mq,100);
TransportMessage tm1( std::move(sm1.transport_msg()) );
mq.push(tm1);
REQUIRE( mq.size() == 1 ); REQUIRE( mq.size() == 1 );
SensorMessage sm2(110,50); pushMessage(mq,110);
TransportMessage tm2( std::move(sm2.transport_msg()) ); REQUIRE( mq.size() == 2 );
mq.push(tm2);
REQUIRE( mq.size() == 1 ); pushMessage(mq,120);
REQUIRE( mq.size() == 2 );
auto msg = mq.top(); auto msg = mq.top();
REQUIRE( msg!=nullptr ); REQUIRE( msg!=nullptr );
SensorMessage sm( msg.get() ); REQUIRE( msg->consumer == 110 );
REQUIRE( sm.id == sm2.id );
msg = mq.top();
REQUIRE( msg!=nullptr );
REQUIRE( msg->consumer == 120 );
REQUIRE( mq.getCountOfQueueFull() == 1 ); REQUIRE( mq.getCountOfQueueFull() == 1 );
} }
...@@ -66,33 +72,35 @@ TEST_CASE( "UMessageQueue: overflow (lost new data)", "[mqueue]" ) ...@@ -66,33 +72,35 @@ TEST_CASE( "UMessageQueue: overflow (lost new data)", "[mqueue]" )
REQUIRE( uniset_conf() != nullptr ); REQUIRE( uniset_conf() != nullptr );
UMessageQueue mq; UMessageQueue mq;
mq.setMaxSizeOfMessageQueue(1); mq.setMaxSizeOfMessageQueue(2);
mq.setLostStrategy( UMessageQueue::lostNewData ); mq.setLostStrategy( UMessageQueue::lostNewData );
SensorMessage sm1(100,2); pushMessage(mq,100);
TransportMessage tm1( std::move(sm1.transport_msg()) );
mq.push(tm1);
REQUIRE( mq.size() == 1 ); REQUIRE( mq.size() == 1 );
SensorMessage sm2(110,50); pushMessage(mq,110);
TransportMessage tm2( std::move(sm2.transport_msg()) ); REQUIRE( mq.size() == 2 );
mq.push(tm2);
pushMessage(mq,120);
REQUIRE( mq.size() == 2 );
REQUIRE( mq.getCountOfQueueFull() == 1 ); REQUIRE( mq.getCountOfQueueFull() == 1 );
SensorMessage sm3(120,150); pushMessage(mq,130);
TransportMessage tm3( std::move(sm3.transport_msg()) ); REQUIRE( mq.size() == 2 );
mq.push(tm3);
REQUIRE( mq.size() == 1 );
REQUIRE( mq.getCountOfQueueFull() == 2 ); REQUIRE( mq.getCountOfQueueFull() == 2 );
auto msg = mq.top(); auto msg = mq.top();
REQUIRE( msg!=nullptr ); REQUIRE( msg!=nullptr );
SensorMessage sm( msg.get() ); REQUIRE( msg->consumer == 100 );
REQUIRE( sm.id == sm1.id );
msg = mq.top();
REQUIRE( msg!=nullptr );
REQUIRE( msg->consumer == 110 );
msg = mq.top();
REQUIRE( msg==nullptr );
} }
// -------------------------------------------------------------------------- // --------------------------------------------------------------------------
TEST_CASE( "UMessageQueue: many read", "[mqueue]" ) TEST_CASE( "UMessageQueue: many read", "[mqueue]" )
...@@ -103,16 +111,12 @@ TEST_CASE( "UMessageQueue: many read", "[mqueue]" ) ...@@ -103,16 +111,12 @@ TEST_CASE( "UMessageQueue: many read", "[mqueue]" )
mq.setMaxSizeOfMessageQueue(1); mq.setMaxSizeOfMessageQueue(1);
mq.setLostStrategy( UMessageQueue::lostNewData ); mq.setLostStrategy( UMessageQueue::lostNewData );
SensorMessage sm1(100,2); pushMessage(mq,100);
TransportMessage tm1( std::move(sm1.transport_msg()) );
mq.push(tm1);
REQUIRE( mq.size() == 1 ); REQUIRE( mq.size() == 1 );
auto msg = mq.top(); auto msg = mq.top();
REQUIRE( msg!=nullptr ); REQUIRE( msg!=nullptr );
SensorMessage sm( msg.get() ); REQUIRE( msg->consumer == 100 );
REQUIRE( sm.id == sm1.id );
for( int i=0; i<5; i++ ) for( int i=0; i<5; i++ )
{ {
...@@ -121,3 +125,44 @@ TEST_CASE( "UMessageQueue: many read", "[mqueue]" ) ...@@ -121,3 +125,44 @@ TEST_CASE( "UMessageQueue: many read", "[mqueue]" )
} }
} }
// -------------------------------------------------------------------------- // --------------------------------------------------------------------------
TEST_CASE( "UMessageQueue: correct operation", "[mqueue]" )
{
REQUIRE( uniset_conf() != nullptr );
// Проверка корректности работы, что сообщения не портяться
// и не теряются
// Тест: пишем num сообщений и читаем num сообщений
// проверяем что ни одно не потерялось
const size_t num = 1000;
UMessageQueue mq;
mq.setMaxSizeOfMessageQueue(num+1);
size_t rnum = 0;
for( size_t i=0; i<num; i++ )
{
pushMessage(mq,i);
// каждые 50 читатем, имитируя реальную работу (чтение между записью)
if( i%50 )
{
auto m = mq.top();
REQUIRE( m->consumer == rnum );
rnum++;
}
}
REQUIRE( mq.size() == (num - rnum) );
// дочитываем всё остальное
while( !mq.empty() )
{
auto m = mq.top();
REQUIRE( m->consumer == rnum );
rnum++;
}
// проверяем что ничего не потерялось
REQUIRE( rnum == num );
}
// --------------------------------------------------------------------------
#include <catch.hpp>
// --------------------------------------------------------------------------
#include "UniSetObject.h"
#include "MessageType.h"
#include "Configuration.h"
#include "UHelpers.h"
#include "TestUObject.h"
// --------------------------------------------------------------------------
using namespace std;
using namespace UniSetTypes;
// --------------------------------------------------------------------------
/* Простой тест для UniSetObject.
* Попытка протестировать UniSetObject без зависимостей и только интерфейс.
* Для доступа к некоторой внутренней информации, пришлось сделать TestObject.
* Фактически тестовый объект не будет "активирован" как обычно.
*/
// --------------------------------------------------------------------------
shared_ptr<TestUObject> uobj;
// --------------------------------------------------------------------------
void initTest()
{
REQUIRE( uniset_conf() != nullptr );
if( !uobj )
{
uobj = make_object<TestUObject>("TestUObject1","TestUObject");
REQUIRE( uobj != nullptr );
}
}
// --------------------------------------------------------------------------
static void pushMessage( long id, Message::Priority p )
{
SensorMessage sm(id,id);
sm.priority = p;
sm.consumer = id; // чтобы хоть как-то идентифицировать сообщений, используем поле consumer
TransportMessage tm( std::move(sm.transport_msg()) );
uobj->push(tm);
}
// --------------------------------------------------------------------------
TEST_CASE( "UObject: priority messages", "[uobject]" )
{
initTest();
/* NOTE: для того чтобы не делать преобразования из VoidMessage в SensorMessage (см. pushMesage)
* В качестве идентификатора используем поле consumer.
* Хотя в реальности, оно должно совпадать с id объекта получателя.
*/
pushMessage(100,Message::Low);
pushMessage(101,Message::Low);
pushMessage(200,Message::Medium);
pushMessage(300,Message::High);
pushMessage(301,Message::High);
// теперь проверяем что сперва вынули Hi
// но так же контролируем что порядок извлечения правильный
// в порядке поступления в очередь
auto m = uobj->getOneMessage();
REQUIRE( m->priority == Message::High );
REQUIRE( m->consumer == 300 );
m = uobj->getOneMessage();
REQUIRE( m->priority == Message::High );
REQUIRE( m->consumer == 301 );
m = uobj->getOneMessage();
REQUIRE( m->priority == Message::Medium );
REQUIRE( m->consumer == 200 );
m = uobj->getOneMessage();
REQUIRE( m->priority == Message::Low );
REQUIRE( m->consumer == 100 );
pushMessage(201,Message::Medium);
m = uobj->getOneMessage();
REQUIRE( m->priority == Message::Medium );
REQUIRE( m->consumer == 201 );
m = uobj->getOneMessage();
REQUIRE( m->priority == Message::Low );
REQUIRE( m->consumer == 101 );
REQUIRE( uobj->mqEmpty() == true );
}
// --------------------------------------------------------------------------
...@@ -43,6 +43,10 @@ ...@@ -43,6 +43,10 @@
</UniSet> </UniSet>
<settings>
<TestUObject name="TestUObject1" />
</settings>
<dlog name="dlog" levels="" file=""/> <dlog name="dlog" levels="" file=""/>
<ObjectsMap idfromfile="1"> <ObjectsMap idfromfile="1">
...@@ -87,6 +91,7 @@ ...@@ -87,6 +91,7 @@
<item id="101" name="TestProc2"/> <item id="101" name="TestProc2"/>
<item id="102" name="TestProc3"/> <item id="102" name="TestProc3"/>
<item id="103" name="TestProc4"/> <item id="103" name="TestProc4"/>
<item id="104" name="TestUObject1"/>
</objects> </objects>
</ObjectsMap> </ObjectsMap>
......
...@@ -458,6 +458,8 @@ tests/test_utypes.cc ...@@ -458,6 +458,8 @@ tests/test_utypes.cc
tests/test_logserver.cc tests/test_logserver.cc
tests/test_tcpcheck.cc tests/test_tcpcheck.cc
tests/test_mqueue.cc tests/test_mqueue.cc
tests/test_uobject.cc
tests/TestUObject.h
tests/tests-junit.xml tests/tests-junit.xml
tests/tests.cc tests/tests.cc
tests/tests_bad_config.xml tests/tests_bad_config.xml
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment