Commit 778fce6f authored by Pavel Vainerman's avatar Pavel Vainerman

(UMessageQueue): вариант реализации на atmic-ах, заложил тест для очереди сообщений.

parent 76921f15
......@@ -17,44 +17,36 @@
#ifndef UMessageQueue_H_
#define UMessageQueue_H_
//--------------------------------------------------------------------------
#include <queue>
#include <atomic>
#include <vector>
#include <memory>
#include "Mutex.h"
#include "MessageType.h"
//--------------------------------------------------------------------------
typedef std::shared_ptr<UniSetTypes::VoidMessage> VoidMessagePtr;
//--------------------------------------------------------------------------
/*! \class UMessageQueue
* Очередь сообщений.
* \warning Очередь рассчитана на МНОГО ПИСАТЕЛЕЙ и ОДИН(!) читатель. Т.е. чтение должно быть из одного потока!
* Сообщения извлекаются из очереди в порядке приоритета сообщения. При одинаковом приоритете - в порядке поступления в очередь.
*
* Максимальное ограничение на размер очереди сообщений задаётся функцией setMaxSizeOfMessageQueue().
*
* Контроль переполения очереди осуществляется в двух местах push и receiveMessage.
* При переполнении очереди, происходит автоматическая очистка в два этапа.
* Первый: производиться попытка "свёртки" сообщений.
* Из очереди все повторяющиеся
* - SensorMessage
* - TimerMessage
* - SystemMessage
* Если это не помогло, то производиться второй этап "чистки":
* Из очереди удаляется MaxCountRemoveOfMessage сообщений.
* Этот парамер задаётся при помощи setMaxCountRemoveOfMessage(). По умолчанию 1/4 очереди сообщений.
* Чтобы избежать работы с mutex, очередь построена по принципу циклического буфера,
* c использованием atomic-переменных и попыткой реализовать LockFree работу.
* Есть указатель на текущую позицию записи (wp) и есть "догоняющий его" указатель на позицию чтения (rp).
* Если rp догоняет wp - значит новых сообщений нет.
*
* Очистка реализована в функции cleanMsgQueue();
* При этом место под очередь(буффер) резервируется сразу.
* Счётчики сделаны (uint) монотонно растущими.
* Основные идеи:
* - счётчики постоянно увеличиваются
* - каждый пишущий поток пишет в новое место
* - читающий счётчик тоже монотонно растёт
* - реальная позиция для записи или чтения рассчитывается как (pos%size) этим и обеспечивается цикличность.
*
* \warning Т.к. при фильтровании SensorMessage не смотрится значение,
* то при удалении сообщений об изменении аналоговых датчиков очистка может привести
* к некорректной работе фильрующих алгоритмов работающих с "выборкой" последних N значений.
* (потому-что останется одно последнее)
* Максимальное ограничение на размер очереди сообщений задаётся функцией setMaxSizeOfMessageQueue().
*
* ОПТИМИЗАЦИЯ N1:
* Для того, чтобы функции push() и top() реже "сталкавались" на mutex-е очереди сообщений.
* Сделано две очереди сообщений. Одна очередь сообщений наполняется в push() (с блокировкой mutex-а),
* а вторая (без блокировки) обрабатывается в top(). Как только сообщения заканчиваются в
* top() очереди меняются местами (при захваченном mutex).
* Контроль переполения очереди осуществляется в push и в top;
* Если очередь переполняется, то сообщения ТЕРЯЮТСЯ!
* При помощи функции setLostStrategy() можно установить стратегию что терять
* lostNewData - в случае переполнения теряются новые данные (т.е. не будут помещаться в очередь)
* lostOldData - в случае переполнения очереди, старые данные затираются новыми.
*/
class UMessageQueue
{
......@@ -71,8 +63,13 @@ class UMessageQueue
void setMaxSizeOfMessageQueue( size_t s );
size_t getMaxSizeOfMessageQueue();
void setMaxCountRemoveOfMessage( size_t m );
size_t getMaxCountRemoveOfMessage();
enum LostStrategy
{
lostOldData, // default
lostNewData
};
void setLostStrategy( LostStrategy s );
// ---- Статистика ----
/*! максимальное количество которое было в очереди сообщений */
......@@ -87,37 +84,24 @@ class UMessageQueue
return stCountOfQueueFull;
}
// функция определения приоритетного сообщения для обработки
struct VoidMessageCompare:
public std::binary_function<VoidMessagePtr, VoidMessagePtr, bool>
{
bool operator()(const VoidMessagePtr& lhs,
const VoidMessagePtr& rhs) const;
};
typedef std::priority_queue<VoidMessagePtr, std::vector<VoidMessagePtr>, VoidMessageCompare> MQueue;
typedef std::vector<VoidMessagePtr> MQueue;
protected:
/*! Чистка очереди сообщений */
void cleanMsgQueue( MQueue& q );
// заполнить всю очередь указанным сообщением
void mqFill( const VoidMessagePtr& v );
private:
MQueue* wQ = { nullptr }; // указатель на текущую очередь на запись
MQueue* rQ = { nullptr }; // указатель на текущую очередь на чтение
MQueue mq1,mq2;
MQueue mqueue;
std::atomic_uint wpos = { 0 }; // позиция на запись
std::atomic_uint rpos = { 0 }; // позиция на чтение
std::atomic_uint mpos = { 0 }; // текущая позиция последнего элемента (max position) (реально добавленного в очередь)
LostStrategy lostStrategy = { lostOldData };
/*! размер очереди сообщений (при превышении происходит очистка) */
size_t SizeOfMessageQueue = { 2000 };
/*! сколько сообщений удалять при очисте */
size_t MaxCountRemoveOfMessage = { 500 };
/*! замок для блокирования совместного доступа к очереди */
UniSetTypes::uniset_rwmutex qmutex;
// статистическая информация
size_t stMaxQueueMessages = { 0 }; /*!< Максимальное число сообщений хранившихся в очереди */
size_t stCountOfQueueFull = { 0 }; /*!< количество переполнений очереди сообщений */
......
......@@ -195,16 +195,6 @@ class UniSetObject:
return mqueue.getMaxSizeOfMessageQueue();
}
void setMaxCountRemoveOfMessage( size_t m )
{
mqueue.setMaxCountRemoveOfMessage(m);
}
inline size_t getMaxCountRemoveOfMessage()
{
return mqueue.getMaxCountRemoveOfMessage();
}
inline bool isActive()
{
return active;
......
......@@ -160,14 +160,7 @@ void UniSetObject::initObject()
if( sz > 0 )
mqueue.setMaxSizeOfMessageQueue(sz);
int maxremove = conf->getArgPInt("--uniset-object-maxcount-remove-message", conf->getField("MaxCountRemoveOfMessage"), sz / 4);
if( maxremove > 0 )
mqueue.setMaxCountRemoveOfMessage(maxremove);
// workingTerminateTimeout = conf->getArgPInt("--uniset-object-working-terminate-timeout",conf->getField("WorkingTerminateTimeout"),2000);
uinfo << myname << "(init): SizeOfMessageQueue=" << mqueue.getMaxSizeOfMessageQueue()
<< " MaxCountRemoveOfMessage=" << mqueue.getMaxCountRemoveOfMessage()
<< endl;
}
// ------------------------------------------------------------------------------------------
......
#include <string>
#include <iostream>
#include <assert.h>
#include <thread>
#include <atomic>
#include "UMessageQueue.h"
......@@ -58,6 +59,17 @@ int main(int argc, const char** argv)
// чтобы не происходило переполнение
mq.setMaxSizeOfMessageQueue(COUNT+1);
// сперва просто проверка что очередь работает.
{
SensorMessage sm(100,2);
TransportMessage tm( std::move(sm.transport_msg()) );
mq.push(tm);
auto msg = mq.top();
assert( msg!=nullptr );
SensorMessage sm2( msg.get() );
assert( sm.id == sm2.id );
}
vector<int> res;
res.reserve(tnum);
......
......@@ -35,7 +35,8 @@ test_conftest.cc \
test_ui.cc \
test_iorfile.cc \
test_messagetype.cc \
test_utypes.cc
test_utypes.cc \
test_mqueue.cc
# threadtst_SOURCES = threadtst.cc
# threadtst_LDADD = $(top_builddir)/lib/libUniSet2.la ${SIGC_LIBS} $(COMCPP_LIBS)
......
#include <catch.hpp>
// --------------------------------------------------------------------------
#include "UMessageQueue.h"
#include "MessageType.h"
#include "Configuration.h"
// --------------------------------------------------------------------------
using namespace std;
using namespace UniSetTypes;
// --------------------------------------------------------------------------
TEST_CASE( "UMessageQueue: setup", "[mqueue]" )
{
UMessageQueue mq;
// проверка установки размера очереди
mq.setMaxSizeOfMessageQueue(10);
REQUIRE(mq.getMaxSizeOfMessageQueue() == 10 );
}
// --------------------------------------------------------------------------
TEST_CASE( "UMessageQueue: simple push/top", "[mqueue]" )
{
REQUIRE( uniset_conf() != nullptr );
UMessageQueue mq;
SensorMessage sm(100,2);
TransportMessage tm( std::move(sm.transport_msg()) );
mq.push(tm);
auto msg = mq.top();
REQUIRE( msg!=nullptr );
SensorMessage sm2( msg.get() );
REQUIRE( sm.id == sm2.id );
}
// --------------------------------------------------------------------------
TEST_CASE( "UMessageQueue: overflow (lost old data)", "[mqueue]" )
{
REQUIRE( uniset_conf() != nullptr );
UMessageQueue mq;
mq.setMaxSizeOfMessageQueue(1);
mq.setLostStrategy( UMessageQueue::lostOldData );
SensorMessage sm1(100,2);
TransportMessage tm1( std::move(sm1.transport_msg()) );
mq.push(tm1);
REQUIRE( mq.size() == 1 );
SensorMessage sm2(110,50);
TransportMessage tm2( std::move(sm2.transport_msg()) );
mq.push(tm2);
REQUIRE( mq.size() == 1 );
auto msg = mq.top();
REQUIRE( msg!=nullptr );
SensorMessage sm( msg.get() );
REQUIRE( sm.id == sm2.id );
REQUIRE( mq.getCountOfQueueFull() == 1 );
}
// --------------------------------------------------------------------------
TEST_CASE( "UMessageQueue: overflow (lost new data)", "[mqueue]" )
{
REQUIRE( uniset_conf() != nullptr );
UMessageQueue mq;
mq.setMaxSizeOfMessageQueue(1);
mq.setLostStrategy( UMessageQueue::lostNewData );
SensorMessage sm1(100,2);
TransportMessage tm1( std::move(sm1.transport_msg()) );
mq.push(tm1);
REQUIRE( mq.size() == 1 );
SensorMessage sm2(110,50);
TransportMessage tm2( std::move(sm2.transport_msg()) );
mq.push(tm2);
REQUIRE( mq.getCountOfQueueFull() == 1 );
SensorMessage sm3(120,150);
TransportMessage tm3( std::move(sm3.transport_msg()) );
mq.push(tm3);
REQUIRE( mq.size() == 1 );
REQUIRE( mq.getCountOfQueueFull() == 2 );
auto msg = mq.top();
REQUIRE( msg!=nullptr );
SensorMessage sm( msg.get() );
REQUIRE( sm.id == sm1.id );
}
// --------------------------------------------------------------------------
TEST_CASE( "UMessageQueue: many read", "[mqueue]" )
{
REQUIRE( uniset_conf() != nullptr );
UMessageQueue mq;
mq.setMaxSizeOfMessageQueue(1);
mq.setLostStrategy( UMessageQueue::lostNewData );
SensorMessage sm1(100,2);
TransportMessage tm1( std::move(sm1.transport_msg()) );
mq.push(tm1);
REQUIRE( mq.size() == 1 );
auto msg = mq.top();
REQUIRE( msg!=nullptr );
SensorMessage sm( msg.get() );
REQUIRE( sm.id == sm1.id );
for( int i=0; i<5; i++ )
{
auto msg = mq.top();
REQUIRE( msg==nullptr );
}
}
// --------------------------------------------------------------------------
......@@ -457,6 +457,7 @@ tests/test_unixml.cc
tests/test_utypes.cc
tests/test_logserver.cc
tests/test_tcpcheck.cc
tests/test_mqueue.cc
tests/tests-junit.xml
tests/tests.cc
tests/tests_bad_config.xml
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment