Commit 77cb1caa authored by Pavel Vainerman's avatar Pavel Vainerman

(MQ): вынес разные реализации очереди сообщений в разные файлы,

чтобы можно было легко менять местами и пробовать, небольшой рефакторинг названий функций работы с очередью.
parent 3ecdf68e
...@@ -16,7 +16,7 @@ TestProc::~TestProc() ...@@ -16,7 +16,7 @@ TestProc::~TestProc()
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
bool TestProc::isFullQueue() bool TestProc::isFullQueue()
{ {
return ( getCountOfQueueFull() > 0 ); return ( getCountOfLostMessages() > 0 );
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
TestProc::TestProc() TestProc::TestProc()
......
...@@ -14,8 +14,8 @@ ...@@ -14,8 +14,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
// -------------------------------------------------------------------------- // --------------------------------------------------------------------------
#ifndef UMessageQueue_H_ #ifndef MQAtomic_H_
#define UMessageQueue_H_ #define MQAtomic_H_
//-------------------------------------------------------------------------- //--------------------------------------------------------------------------
#include <atomic> #include <atomic>
#include <vector> #include <vector>
...@@ -25,8 +25,8 @@ ...@@ -25,8 +25,8 @@
//-------------------------------------------------------------------------- //--------------------------------------------------------------------------
typedef std::shared_ptr<UniSetTypes::VoidMessage> VoidMessagePtr; typedef std::shared_ptr<UniSetTypes::VoidMessage> VoidMessagePtr;
//-------------------------------------------------------------------------- //--------------------------------------------------------------------------
/*! \class UMessageQueue /*! \class MQAtomic
* Очередь сообщений. * Очередь сообщений на основе atomic переменных.
* *
* Чтобы избежать работы с mutex, очередь построена по принципу циклического буфера, * Чтобы избежать работы с mutex, очередь построена по принципу циклического буфера,
* c использованием atomic-переменных и попыткой реализовать LockFree работу. * c использованием atomic-переменных и попыткой реализовать LockFree работу.
...@@ -56,18 +56,26 @@ typedef std::shared_ptr<UniSetTypes::VoidMessage> VoidMessagePtr; ...@@ -56,18 +56,26 @@ typedef std::shared_ptr<UniSetTypes::VoidMessage> VoidMessagePtr;
* ============================================= * =============================================
* Т.к. для обеспечения lockfree записи индексы (wpos) постоянно растут * Т.к. для обеспечения lockfree записи индексы (wpos) постоянно растут
* (т.е. каждый пишущий поток пишет в новое место), в качестве atomic индекса выбран unsigned long. * (т.е. каждый пишущий поток пишет в новое место), в качестве atomic индекса выбран unsigned long.
* Что для x86_32 системы работающей без перезагруки длительное время переполнение индекса может стать проблеммой. * Для x86_32 системы работающей без перезагруки длительное время, переполнение индекса может стать проблеммой.
* Фактически же размер циклического буфера ограничен и запись ведётся в позицию [wpos%size], т.е. * Фактически же размер циклического буфера ограничен и запись ведётся в позицию [wpos%size], т.е.
* в общем случае достаточно чтобы индекс был не больше размера буфера (size). * в общем случае достаточно чтобы индекс был не больше размера буфера (size).
* \error ПОКА ПРОБЛЕММА ПЕРЕПОЛНЕНИЯ НЕ РЕШЕНА.. * \error ПОКА ПРОБЛЕММА ПЕРЕПОЛНЕНИЯ БЕЗ ПОТЕРИ ПАКЕТОВ НЕ РЕШЕНА..
* чтобы "сбрасывать" индекс, нужно приостанавливать все пишущие и читающие потоки и одновременно * чтобы "сбрасывать" индекс без потери сообщений, нужно одной транзакцией менять wpos, qpos и rpos.
* (одной транзакцией) менять wpos и rpos. * Как это "красиво" сделать в рамках lockfree я пока не придумал.
* Поэтому сейчас просто теряются сообщения, в зависимости от стратегии (lostStrategy) либо
* не добавляются новые сообщения пока rpos не перейдёт через максимум (lostStrategy=lostNewData),
* либо потеряются "старые" (rpos приравняется к wpos) - это lostStrategy=lostOldData.
*
* --------------------------------
* ЭТА ОЧЕРЕДЬ ПОКАЗЫВАЕТ В ДВА-ТРИ РАЗА ЛУЧШУЮ СКОРОСТЬ ПО СРАВНЕНИЮ С MQMutex
* При скорости поступления сообщений 1 сообщение в 10 мсек, без переполнения на x86_32 очередь
* проработает ~1.2 года (речь о работе без перезапуска программы)
* *
*/ */
class UMessageQueue class MQAtomic
{ {
public: public:
UMessageQueue( size_t qsize = 2000 ); MQAtomic( size_t qsize = 2000 );
/*! поместить сообщение в очередь */ /*! поместить сообщение в очередь */
void push( const VoidMessagePtr& msg ); void push( const VoidMessagePtr& msg );
...@@ -103,9 +111,9 @@ class UMessageQueue ...@@ -103,9 +111,9 @@ class UMessageQueue
} }
/*! сколько раз очередь переполнялась */ /*! сколько раз очередь переполнялась */
inline size_t getCountOfQueueFull() const inline size_t getCountOfLostMessages() const
{ {
return stCountOfQueueFull; return stCountOfLostMessages;
} }
protected: protected:
...@@ -113,17 +121,19 @@ class UMessageQueue ...@@ -113,17 +121,19 @@ class UMessageQueue
// заполнить всю очередь указанным сообщением // заполнить всю очередь указанным сообщением
void mqFill( const VoidMessagePtr& v ); void mqFill( const VoidMessagePtr& v );
private: // для возможности тестирования переполнения
// специально делается такая функция
void set_wpos( unsigned long pos );
void set_rpos( unsigned long pos );
void resetIndexes(); private:
std::mutex resetMutex;
typedef std::vector<VoidMessagePtr> MQueue; typedef std::vector<VoidMessagePtr> MQueue;
MQueue mqueue; MQueue mqueue;
std::atomic_ulong wpos = { 0 }; // позиция на запись std::atomic_ulong wpos = { 0 }; // позиция на запись
std::atomic_ulong rpos = { 0 }; // позиция на чтение std::atomic_ulong rpos = { 0 }; // позиция на чтение
std::atomic_ulong mpos = { 0 }; // текущая позиция последнего элемента (max position) (реально добавленного в очередь) std::atomic_ulong qpos = { 0 }; // текущая позиция последнего элемента (max position) (реально добавленного в очередь)
LostStrategy lostStrategy = { lostOldData }; LostStrategy lostStrategy = { lostOldData };
...@@ -132,7 +142,7 @@ class UMessageQueue ...@@ -132,7 +142,7 @@ class UMessageQueue
// статистическая информация // статистическая информация
size_t stMaxQueueMessages = { 0 }; /*!< Максимальное число сообщений хранившихся в очереди */ size_t stMaxQueueMessages = { 0 }; /*!< Максимальное число сообщений хранившихся в очереди */
size_t stCountOfQueueFull = { 0 }; /*!< количество переполнений очереди сообщений */ size_t stCountOfLostMessages = { 0 }; /*!< количество переполнений очереди сообщений */
}; };
//--------------------------------------------------------------------------- //---------------------------------------------------------------------------
#endif #endif
......
/*
* Copyright (c) 2015 Pavel Vainerman.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation, version 2.1.
*
* This program is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Lesser Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
// --------------------------------------------------------------------------
#ifndef MQMutex_H_
#define MQMutex_H_
//--------------------------------------------------------------------------
#include <deque>
#include <list>
#include <memory>
#include "Mutex.h"
#include "MessageType.h"
//--------------------------------------------------------------------------
typedef std::shared_ptr<UniSetTypes::VoidMessage> VoidMessagePtr;
//--------------------------------------------------------------------------
/*! \class MQMutex
* Очередь сообщений на std::mutex-е.
* Максимальное ограничение на размер очереди сообщений задаётся функцией setMaxSizeOfMessageQueue().
*
* Контроль переполения очереди осуществляется в push
* Если очередь переполняется, то сообщения ТЕРЯЮТСЯ!
* При помощи функции setLostStrategy() можно установить стратегию что терять
* lostNewData - в случае переполнения теряются новые данные (т.е. не будут помещаться в очередь)
* lostOldData - в случае переполнения очереди, старые данные затираются новыми.
*
*/
class MQMutex
{
public:
MQMutex( size_t qsize = 2000 );
/*! поместить сообщение в очередь */
void push( const VoidMessagePtr& msg );
/*! Извлечь сообщение из очереди
* \return не валидный shatred_ptr если сообщений нет
*/
VoidMessagePtr top();
size_t size();
bool empty();
// ----- Настройки -----
// неявно подразумевается, что всё настраивается до первого использования
// ----------------------
void setMaxSizeOfMessageQueue( size_t s );
size_t getMaxSizeOfMessageQueue();
/*! Стратегия при переполнении */
enum LostStrategy
{
lostOldData, // default
lostNewData
};
void setLostStrategy( LostStrategy s );
// ---- Статистика ----
/*! максимальное количество которое было в очереди сообщений */
inline size_t getMaxQueueMessages() const
{
return stMaxQueueMessages;
}
/*! количество потерянных сообщений */
inline size_t getCountOfLostMessages() const
{
return stCountOfLostMessages;
}
protected:
private:
//typedef std::queue<VoidMessagePtr> MQueue;
typedef std::deque<VoidMessagePtr> MQueue;
MQueue mqueue;
std::mutex qmutex;
LostStrategy lostStrategy = { lostOldData };
/*! размер очереди сообщений (при превышении происходит очистка) */
size_t SizeOfMessageQueue = { 2000 };
// статистическая информация
size_t stMaxQueueMessages = { 0 }; /*!< Максимальное число сообщений хранившихся в очереди */
size_t stCountOfLostMessages = { 0 }; /*!< количество переполнений очереди сообщений */
};
//---------------------------------------------------------------------------
#endif
//---------------------------------------------------------------------------
...@@ -42,7 +42,7 @@ ...@@ -42,7 +42,7 @@
#include "UniSetObject_i.hh" #include "UniSetObject_i.hh"
#include "ThreadCreator.h" #include "ThreadCreator.h"
#include "LT_Object.h" #include "LT_Object.h"
#include "UMessageQueue.h" #include "MQMutex.h"
//--------------------------------------------------------------------------- //---------------------------------------------------------------------------
//#include <omnithread.h> //#include <omnithread.h>
...@@ -152,8 +152,8 @@ class UniSetObject: ...@@ -152,8 +152,8 @@ class UniSetObject:
/*! текущее количесво сообщений в очереди */ /*! текущее количесво сообщений в очереди */
size_t countMessages(); size_t countMessages();
/*! количество раз перполнения очереди сообщений */ /*! количество потерянных сообщений */
size_t getCountOfQueueFull(); size_t getCountOfLostMessages();
//! Активизация объекта (переопределяется для необходимых действий после активизации) //! Активизация объекта (переопределяется для необходимых действий после активизации)
virtual bool activateObject() virtual bool activateObject()
...@@ -263,9 +263,9 @@ class UniSetObject: ...@@ -263,9 +263,9 @@ class UniSetObject:
std::shared_ptr< ThreadCreator<UniSetObject> > thr; std::shared_ptr< ThreadCreator<UniSetObject> > thr;
/*! очереди сообщений в зависимости от приоритета */ /*! очереди сообщений в зависимости от приоритета */
UMessageQueue mqueueLow; MQMutex mqueueLow;
UMessageQueue mqueueMedium; MQMutex mqueueMedium;
UMessageQueue mqueueHi; MQMutex mqueueHi;
std::atomic_bool a_working; std::atomic_bool a_working;
std::mutex m_working; std::mutex m_working;
......
noinst_LTLIBRARIES = libObjectsRepository.la noinst_LTLIBRARIES = libObjectsRepository.la
libObjectsRepository_la_SOURCES = UniSetTypes_iSK.cc UniSetObject_iSK.cc UniSetTypes.cc UMessageQueue.cc \ libObjectsRepository_la_SOURCES = UniSetTypes_iSK.cc UniSetObject_iSK.cc UniSetTypes.cc \
UniSetManager_iSK.cc ObjectIndex.cc ObjectIndex_Array.cc ObjectIndex_XML.cc ObjectIndex_idXML.cc \ UniSetManager_iSK.cc ObjectIndex.cc ObjectIndex_Array.cc ObjectIndex_XML.cc ObjectIndex_idXML.cc \
ORepHelpers.cc UniSetObject.cc UniSetManager.cc \ ORepHelpers.cc UniSetObject.cc UniSetManager.cc \
UniSetActivator.cc ObjectRepository.cc ObjectRepositoryFactory.cc \ UniSetActivator.cc ObjectRepository.cc ObjectRepositoryFactory.cc \
......
...@@ -340,11 +340,11 @@ size_t UniSetObject::countMessages() ...@@ -340,11 +340,11 @@ size_t UniSetObject::countMessages()
return (mqueueMedium.size() + mqueueLow.size() + mqueueHi.size()); return (mqueueMedium.size() + mqueueLow.size() + mqueueHi.size());
} }
// ------------------------------------------------------------------------------------------ // ------------------------------------------------------------------------------------------
size_t UniSetObject::getCountOfQueueFull() size_t UniSetObject::getCountOfLostMessages()
{ {
return (mqueueMedium.getCountOfQueueFull() + return (mqueueMedium.getCountOfLostMessages() +
mqueueLow.getCountOfQueueFull() + mqueueLow.getCountOfLostMessages() +
mqueueHi.getCountOfQueueFull() ); mqueueHi.getCountOfLostMessages() );
} }
// ------------------------------------------------------------------------------------------ // ------------------------------------------------------------------------------------------
void UniSetObject::sigterm( int signo ) void UniSetObject::sigterm( int signo )
...@@ -659,13 +659,13 @@ UniSetTypes::SimpleInfo* UniSetObject::getInfo( ::CORBA::Long userparam ) ...@@ -659,13 +659,13 @@ UniSetTypes::SimpleInfo* UniSetObject::getInfo( ::CORBA::Long userparam )
info << "\tcount=" << countMessages() info << "\tcount=" << countMessages()
<< "\t medum: " << "\t medum: "
<< " maxMsg=" << mqueueMedium.getMaxQueueMessages() << " maxMsg=" << mqueueMedium.getMaxQueueMessages()
<< " qFull(" << mqueueMedium.getMaxSizeOfMessageQueue() << ")=" << mqueueMedium.getCountOfQueueFull() << " qFull(" << mqueueMedium.getMaxSizeOfMessageQueue() << ")=" << mqueueMedium.getCountOfLostMessages()
<< "\t hi: " << "\t hi: "
<< " maxMsg=" << mqueueHi.getMaxQueueMessages() << " maxMsg=" << mqueueHi.getMaxQueueMessages()
<< " qFull(" << mqueueHi.getMaxSizeOfMessageQueue() << ")=" << mqueueHi.getCountOfQueueFull() << " qFull(" << mqueueHi.getMaxSizeOfMessageQueue() << ")=" << mqueueHi.getCountOfLostMessages()
<< "\t low: " << "\t low: "
<< " maxMsg=" << mqueueLow.getMaxQueueMessages() << " maxMsg=" << mqueueLow.getMaxQueueMessages()
<< " qFull(" << mqueueLow.getMaxSizeOfMessageQueue() << ")=" << mqueueLow.getCountOfQueueFull(); << " qFull(" << mqueueLow.getMaxSizeOfMessageQueue() << ")=" << mqueueLow.getCountOfLostMessages();
SimpleInfo* res = new SimpleInfo(); SimpleInfo* res = new SimpleInfo();
res->info = info.str().c_str(); // CORBA::string_dup(info.str().c_str()); res->info = info.str().c_str(); // CORBA::string_dup(info.str().c_str());
......
...@@ -17,68 +17,106 @@ ...@@ -17,68 +17,106 @@
#include <unordered_map> #include <unordered_map>
#include <map> #include <map>
#include "MessageType.h" #include "MessageType.h"
#include "UMessageQueue.h" #include "MQAtomic.h"
//-------------------------------------------------------------------------- //--------------------------------------------------------------------------
using namespace UniSetTypes; using namespace UniSetTypes;
using namespace std; using namespace std;
//-------------------------------------------------------------------------- //--------------------------------------------------------------------------
UMessageQueue::UMessageQueue( size_t qsize ): MQAtomic::MQAtomic( size_t qsize ):
SizeOfMessageQueue(qsize) SizeOfMessageQueue(qsize)
{ {
mqFill(nullptr); mqFill(nullptr);
} }
//--------------------------------------------------------------------------- //---------------------------------------------------------------------------
void UMessageQueue::push( const VoidMessagePtr& vm ) void MQAtomic::push( const VoidMessagePtr& vm )
{ {
// проверяем переполнение, только если стратегия "терять новые данные" // проверяем переполнение, только если стратегия "терять новые данные"
// иначе нет смысла проверять, а можно просто писать новые данные затирая старые // иначе нет смысла проверять, а можно просто писать новые данные затирая старые
if( lostStrategy == lostNewData && (wpos - rpos) >= SizeOfMessageQueue ) if( lostStrategy == lostNewData && (wpos - rpos) >= SizeOfMessageQueue )
{ {
stCountOfQueueFull++; stCountOfLostMessages++;
return; return;
} }
// -----------------------------------------------
// Если у нас wpos уже перешёл через максимум и стратегия "потеря новых сообщений"
// то просто ждм пока "подтянется" rpos
if( wpos < rpos )
{
stCountOfLostMessages++;
if( lostStrategy == lostNewData )
return;
}
// -----------------------------------------------
// сперва надо сдвинуть счётчик (чтобы следующий поток уже писал в новое место) // сперва надо сдвинуть счётчик (чтобы следующий поток уже писал в новое место)
size_t w = wpos.fetch_add(1); unsigned long w = wpos.fetch_add(1);
// а потом уже добавлять новое сообщение в "зарезервированное" место // а потом уже добавлять новое сообщение в "зарезервированное" место
mqueue[w%SizeOfMessageQueue] = vm; mqueue[w%SizeOfMessageQueue] = vm;
mpos++; // теперь увеличиваем реальное количество элементов в очереди qpos.fetch_add(1); // теперь увеличиваем реальное количество элементов в очереди
// ведём статистику // ведём статистику
size_t sz = mpos - rpos; size_t sz = qpos - rpos;
if( sz > stMaxQueueMessages ) if( sz > stMaxQueueMessages )
stMaxQueueMessages = sz; stMaxQueueMessages = sz;
} }
//--------------------------------------------------------------------------- //---------------------------------------------------------------------------
VoidMessagePtr UMessageQueue::top() VoidMessagePtr MQAtomic::top()
{ {
// если стратегия "потеря старых данных" // если стратегия "потеря старых данных"
// то надо постоянно "подтягивать" rpos к wpos // то надо постоянно "подтягивать" rpos к wpos
if( lostStrategy == lostOldData && (wpos - rpos) >= SizeOfMessageQueue ) if( lostStrategy == lostOldData && (wpos - rpos) >= SizeOfMessageQueue )
{ {
stCountOfQueueFull++; stCountOfLostMessages++;
rpos.store( wpos - SizeOfMessageQueue ); rpos.store( wpos - SizeOfMessageQueue );
} }
// смотрим "фактическое" количество (mpos) if( rpos > qpos )
// т.к. помещение в вектор тоже занимает время {
// а при этом wpos у нас уже будет +1 if( lostStrategy == lostNewData )
if( rpos < mpos ) {
// дочитываем до конца.. (пока rpos не перейдёт через максимум)
unsigned long r = rpos.fetch_add(1);
return mqueue[r%SizeOfMessageQueue];
}
// if( lostStrategy == lostOldData )
rpos = 0;
if( qpos == 0 )
return nullptr;
}
// смотрим qpos - который увеличивается только после помещения элемента в очередь
// т.к. помещение в вектор тоже занимает время,
// то может случиться что wpos уже увеличился, а элемент ещё не поместили в очередь
// при этом вызвался этот top()
if( rpos < qpos )
{ {
// сперва надо сдвинуть счётчик (чтобы следующий поток уже читал новое) // сперва надо сдвинуть счётчик (чтобы следующий поток уже читал новое)
size_t r = rpos.fetch_add(1); unsigned long r = rpos.fetch_add(1);
// если в этот момент был "переполнен" wpos
if( r > wpos && lostStrategy == lostOldData )
{
r = 0;
if( rpos > wpos )
rpos = 0;
if( qpos == 0 )
return nullptr;
}
// т.к. между if и этим местом, может придти другой читающий поток, то // т.к. между if и этим местом, может придти другой читающий поток, то
// проверяем здесь ещё раз // проверяем здесь ещё раз
if( r < mpos ) if( r < qpos )
return mqueue[r%SizeOfMessageQueue]; return mqueue[r%SizeOfMessageQueue];
} }
return nullptr; return nullptr;
} }
//--------------------------------------------------------------------------- //---------------------------------------------------------------------------
size_t UMessageQueue::size() size_t MQAtomic::size()
{ {
// т.к. rpos корректируется только при фактическом вызое top() // т.к. rpos корректируется только при фактическом вызое top()
// то тут приходиться смотреть если у нас переполнение // то тут приходиться смотреть если у нас переполнение
...@@ -87,36 +125,34 @@ size_t UMessageQueue::size() ...@@ -87,36 +125,34 @@ size_t UMessageQueue::size()
if( (wpos - rpos) >= SizeOfMessageQueue ) if( (wpos - rpos) >= SizeOfMessageQueue )
return SizeOfMessageQueue; return SizeOfMessageQueue;
return (mpos - rpos); return (qpos - rpos);
} }
//--------------------------------------------------------------------------- //---------------------------------------------------------------------------
bool UMessageQueue::empty() bool MQAtomic::empty()
{ {
return (mpos == rpos); return (qpos == rpos);
} }
//--------------------------------------------------------------------------- //---------------------------------------------------------------------------
void UMessageQueue::setMaxSizeOfMessageQueue( size_t s ) void MQAtomic::setMaxSizeOfMessageQueue( size_t s )
{ {
if( s != SizeOfMessageQueue ) if( s != SizeOfMessageQueue )
{ {
SizeOfMessageQueue = s; SizeOfMessageQueue = s;
mqFill(nullptr); mqFill(nullptr);
} }
else
mqFill(nullptr);
} }
//--------------------------------------------------------------------------- //---------------------------------------------------------------------------
size_t UMessageQueue::getMaxSizeOfMessageQueue() size_t MQAtomic::getMaxSizeOfMessageQueue()
{ {
return SizeOfMessageQueue; return SizeOfMessageQueue;
} }
//--------------------------------------------------------------------------- //---------------------------------------------------------------------------
void UMessageQueue::setLostStrategy( UMessageQueue::LostStrategy s ) void MQAtomic::setLostStrategy( MQAtomic::LostStrategy s )
{ {
lostStrategy = s; lostStrategy = s;
} }
//--------------------------------------------------------------------------- //---------------------------------------------------------------------------
void UMessageQueue::mqFill( const VoidMessagePtr& v ) void MQAtomic::mqFill( const VoidMessagePtr& v )
{ {
mqueue.reserve(SizeOfMessageQueue); mqueue.reserve(SizeOfMessageQueue);
mqueue.clear(); mqueue.clear();
...@@ -124,3 +160,14 @@ void UMessageQueue::mqFill( const VoidMessagePtr& v ) ...@@ -124,3 +160,14 @@ void UMessageQueue::mqFill( const VoidMessagePtr& v )
mqueue.push_back(v); mqueue.push_back(v);
} }
//--------------------------------------------------------------------------- //---------------------------------------------------------------------------
void MQAtomic::set_wpos( unsigned long pos )
{
wpos = pos;
qpos = pos;
}
//---------------------------------------------------------------------------
void MQAtomic::set_rpos( unsigned long pos )
{
rpos = pos;
}
//---------------------------------------------------------------------------
/*
* Copyright (c) 2015 Pavel Vainerman.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation, version 2.1.
*
* This program is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Lesser Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
// -------------------------------------------------------------------------
#include <unordered_map>
#include <map>
#include "MessageType.h"
#include "MQMutex.h"
//--------------------------------------------------------------------------
using namespace UniSetTypes;
using namespace std;
//--------------------------------------------------------------------------
MQMutex::MQMutex( size_t qsize ):
SizeOfMessageQueue(qsize)
{
}
//---------------------------------------------------------------------------
void MQMutex::push( const VoidMessagePtr& vm )
{
std::lock_guard<std::mutex> lk(qmutex);
size_t sz = mqueue.size();
// проверяем переполнение, только если стратегия "терять новые данные"
// иначе нет смысла проверять, а можно просто писать новые данные затирая старые
// (sz+1) - т.к мы смотрим есть ли место для новых данных
if( (sz+1) > SizeOfMessageQueue )
{
stCountOfLostMessages++;
if( lostStrategy == lostNewData )
return;
// if( lostStrategy == lostOldData )
mqueue.pop_front(); // удаляем одно старое, добавляем одно новое
sz--;
}
mqueue.push_back(vm);
sz++;
if( sz > stMaxQueueMessages )
stMaxQueueMessages = sz;
}
//---------------------------------------------------------------------------
VoidMessagePtr MQMutex::top()
{
std::lock_guard<std::mutex> lk(qmutex);
if( mqueue.empty() )
return nullptr;
auto m = mqueue.front();
mqueue.pop_front();
return m;
}
//---------------------------------------------------------------------------
size_t MQMutex::size()
{
std::lock_guard<std::mutex> lk(qmutex);
return mqueue.size();
}
//---------------------------------------------------------------------------
bool MQMutex::empty()
{
std::lock_guard<std::mutex> lk(qmutex);
return mqueue.empty();
}
//---------------------------------------------------------------------------
void MQMutex::setMaxSizeOfMessageQueue( size_t s )
{
SizeOfMessageQueue = s;
}
//---------------------------------------------------------------------------
size_t MQMutex::getMaxSizeOfMessageQueue()
{
return SizeOfMessageQueue;
}
//---------------------------------------------------------------------------
void MQMutex::setLostStrategy( MQMutex::LostStrategy s )
{
lostStrategy = s;
}
//---------------------------------------------------------------------------
...@@ -5,7 +5,7 @@ ...@@ -5,7 +5,7 @@
noinst_LTLIBRARIES = libVarious.la noinst_LTLIBRARIES = libVarious.la
libVarious_la_CPPFLAGS = $(SIGC_CFLAGS) $(COMCPP_CFLAGS) libVarious_la_CPPFLAGS = $(SIGC_CFLAGS) $(COMCPP_CFLAGS)
libVarious_la_LIBADD = $(SIGC_LIBS) $(COMCPP_LIBS) libVarious_la_LIBADD = $(SIGC_LIBS) $(COMCPP_LIBS)
libVarious_la_SOURCES = UniXML.cc MessageType.cc Configuration.cc \ libVarious_la_SOURCES = UniXML.cc MessageType.cc Configuration.cc MQMutex.cc MQAtomic.cc \
Restorer_XML.cc RunLock.cc Mutex.cc SViewer.cc SMonitor.cc LT_Object.cc WDTInterface.cc VMonitor.cc Restorer_XML.cc RunLock.cc Mutex.cc SViewer.cc SMonitor.cc LT_Object.cc WDTInterface.cc VMonitor.cc
local-clean: local-clean:
......
...@@ -3,14 +3,15 @@ ...@@ -3,14 +3,15 @@
#include <assert.h> #include <assert.h>
#include <thread> #include <thread>
#include <atomic> #include <atomic>
#include "UMessageQueue.h" #include "MQAtomic.h"
#include "MQMutex.h"
// -------------------------------------------------------------------------- // --------------------------------------------------------------------------
using namespace std; using namespace std;
using namespace UniSetTypes; using namespace UniSetTypes;
// -------------------------------------------------------------------------- // --------------------------------------------------------------------------
UMessageQueue mq; // тестируемая очередь MQAtomic mq; // тестируемая очередь
const size_t COUNT = 1000000; // сколько сообщения послать const size_t COUNT = 1000000; // сколько сообщений поместить в очередь
// -------------------------------------------------------------------------- // --------------------------------------------------------------------------
// поток записи // поток записи
void mq_write_thread() void mq_write_thread()
......
#include <catch.hpp> #include <catch.hpp>
// -------------------------------------------------------------------------- // --------------------------------------------------------------------------
#include "UMessageQueue.h" #include <limits>
#include "MQAtomic.h"
#include "MQMutex.h"
#include "MessageType.h" #include "MessageType.h"
#include "Configuration.h" #include "Configuration.h"
// -------------------------------------------------------------------------- // --------------------------------------------------------------------------
// ВНИМАНИЕ! ЗДЕСЬ ОПРЕДЕЛЯЕТСЯ ТИП ТЕСТИРУЕМОЙ ОЧЕРЕДИ
// (пока не придумал как параметризовать тест)
typedef MQAtomic UMessageQueue;
#define TEST_MQ_ATOMIC 1
// --------------------------------------------------------------------------
#ifdef TEST_MQ_ATOMIC
// специальный "декоратор" чтобы можно было тестировать переполнение индексов
class MQAtomicTest:
public MQAtomic
{
public:
inline void set_wpos( unsigned long pos )
{
MQAtomic::set_wpos(pos);
}
inline void set_rpos( unsigned long pos )
{
MQAtomic::set_rpos(pos);
}
};
#endif
// --------------------------------------------------------------------------
// ВНИМАНИЕ! ЗДЕСЬ ОПРЕДЕЛЯЕТСЯ ТИП ТЕСТИРУЕМОЙ ОЧЕРЕДИ
// (пока не придумал как параметризовать тест)
typedef MQAtomic UMessageQueue;
// --------------------------------------------------------------------------
using namespace std; using namespace std;
using namespace UniSetTypes; using namespace UniSetTypes;
// -------------------------------------------------------------------------- // --------------------------------------------------------------------------
...@@ -64,7 +94,7 @@ TEST_CASE( "UMessageQueue: overflow (lost old data)", "[mqueue]" ) ...@@ -64,7 +94,7 @@ TEST_CASE( "UMessageQueue: overflow (lost old data)", "[mqueue]" )
REQUIRE( msg!=nullptr ); REQUIRE( msg!=nullptr );
REQUIRE( msg->consumer == 120 ); REQUIRE( msg->consumer == 120 );
REQUIRE( mq.getCountOfQueueFull() == 1 ); REQUIRE( mq.getCountOfLostMessages() == 1 );
} }
// -------------------------------------------------------------------------- // --------------------------------------------------------------------------
TEST_CASE( "UMessageQueue: overflow (lost new data)", "[mqueue]" ) TEST_CASE( "UMessageQueue: overflow (lost new data)", "[mqueue]" )
...@@ -84,12 +114,12 @@ TEST_CASE( "UMessageQueue: overflow (lost new data)", "[mqueue]" ) ...@@ -84,12 +114,12 @@ TEST_CASE( "UMessageQueue: overflow (lost new data)", "[mqueue]" )
pushMessage(mq,120); pushMessage(mq,120);
REQUIRE( mq.size() == 2 ); REQUIRE( mq.size() == 2 );
REQUIRE( mq.getCountOfQueueFull() == 1 ); REQUIRE( mq.getCountOfLostMessages() == 1 );
pushMessage(mq,130); pushMessage(mq,130);
REQUIRE( mq.size() == 2 ); REQUIRE( mq.size() == 2 );
REQUIRE( mq.getCountOfQueueFull() == 2 ); REQUIRE( mq.getCountOfLostMessages() == 2 );
auto msg = mq.top(); auto msg = mq.top();
REQUIRE( msg!=nullptr ); REQUIRE( msg!=nullptr );
...@@ -166,3 +196,79 @@ TEST_CASE( "UMessageQueue: correct operation", "[mqueue]" ) ...@@ -166,3 +196,79 @@ TEST_CASE( "UMessageQueue: correct operation", "[mqueue]" )
REQUIRE( rnum == num ); REQUIRE( rnum == num );
} }
// -------------------------------------------------------------------------- // --------------------------------------------------------------------------
#ifdef TEST_MQ_ATOMIC
TEST_CASE( "UMessageQueue: overflow index (strategy=lostOldData)", "[mqueue]" )
{
REQUIRE( uniset_conf() != nullptr );
unsigned long max = std::numeric_limits<unsigned long>::max();
MQAtomicTest mq;
mq.setLostStrategy(MQAtomic::lostOldData);
mq.set_wpos(max);
mq.set_rpos(max);
// это сообщение будет потеряно,
// т.к. добавляется при ещё не переполненном wpos
pushMessage(mq,100);
// первое чтение после переполнения
// обновляет rpos, поэтому элемент последний мы теряем
auto m = mq.top();
REQUIRE( m == nullptr );
// это сообщение уже должно к нам вернутся
pushMessage(mq,110);
m = mq.top();
REQUIRE( m != nullptr );
REQUIRE( m->consumer == 110 );
pushMessage(mq,120);
m = mq.top();
REQUIRE( m != nullptr );
REQUIRE( m->consumer == 120 );
}
// --------------------------------------------------------------------------
TEST_CASE( "UMessageQueue: overflow index (strategy=lostNewData)", "[mqueue]" )
{
REQUIRE( uniset_conf() != nullptr );
unsigned long max = std::numeric_limits<unsigned long>::max();
MQAtomicTest mq;
mq.setLostStrategy(MQAtomic::lostNewData);
mq.set_wpos(max);
mq.set_rpos(max);
pushMessage(mq,100);
pushMessage(mq,110);
pushMessage(mq,120);
// мы должны прочитать последнее сообщение из очереди
auto m = mq.top();
REQUIRE( m != nullptr );
REQUIRE( m->consumer == 100 );
// дальше сообщений нет пока-что (а те что были были потеряны)
m = mq.top();
REQUIRE( m == nullptr );
pushMessage(mq,130);
m = mq.top();
REQUIRE( m != nullptr );
REQUIRE( m->consumer == 130 );
pushMessage(mq,140);
m = mq.top();
REQUIRE( m != nullptr );
REQUIRE( m->consumer == 140 );
}
#endif
// --------------------------------------------------------------------------
#undef TEST_MQ_ATOMIC
// --------------------------------------------------------------------------
...@@ -322,7 +322,8 @@ include/UInterface.h ...@@ -322,7 +322,8 @@ include/UInterface.h
include/UniSetActivator.h include/UniSetActivator.h
include/UniSetManager.h include/UniSetManager.h
include/UniSetObject.h include/UniSetObject.h
include/UMessageQueue.h include/MQMutex.h
include/MQAtomic.h
include/UniSetTypes.h include/UniSetTypes.h
include/UniXML.h include/UniXML.h
include/UTCPCore.h include/UTCPCore.h
...@@ -429,6 +430,8 @@ src/Various/VMonitor.cc ...@@ -429,6 +430,8 @@ src/Various/VMonitor.cc
src/Various/SViewer.cc src/Various/SViewer.cc
src/Various/UniXML.cc src/Various/UniXML.cc
src/Various/WDTInterface.cc src/Various/WDTInterface.cc
src/Various/MQMutex.cc
src/Various/MQAtomic.cc
src/Makefile.am src/Makefile.am
tests/UniXmlTest/Makefile.am tests/UniXmlTest/Makefile.am
tests/UniXmlTest/XmlTest.cc tests/UniXmlTest/XmlTest.cc
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment