Commit 9d40f0a5 authored by Pavel Vainerman's avatar Pavel Vainerman

(MQAtomic): работа над защитой от переполнения

parent 17aad13a
...@@ -37,7 +37,7 @@ typedef std::shared_ptr<UniSetTypes::VoidMessage> VoidMessagePtr; ...@@ -37,7 +37,7 @@ typedef std::shared_ptr<UniSetTypes::VoidMessage> VoidMessagePtr;
* Т.к. подразумевает схему "МНОГО ПИСАТЕЛЕЙ" и "ОДИН ЧИТАТЕЛЬ". * Т.к. подразумевает схему "МНОГО ПИСАТЕЛЕЙ" и "ОДИН ЧИТАТЕЛЬ".
* *
* При этом место под очередь(буффер) резервируется сразу. * При этом место под очередь(буффер) резервируется сразу.
* Счётчики сделаны (uint) монотонно растущими. * Счётчики сделаны (ulong) монотонно растущими.
* Основные идеи: * Основные идеи:
* - счётчики постоянно увеличиваются * - счётчики постоянно увеличиваются
* - каждый пишущий поток пишет в новое место (индекс больше последнего) * - каждый пишущий поток пишет в новое место (индекс больше последнего)
...@@ -51,26 +51,11 @@ typedef std::shared_ptr<UniSetTypes::VoidMessage> VoidMessagePtr; ...@@ -51,26 +51,11 @@ typedef std::shared_ptr<UniSetTypes::VoidMessage> VoidMessagePtr;
* При помощи функции setLostStrategy() можно установить стратегию что терять * При помощи функции setLostStrategy() можно установить стратегию что терять
* lostNewData - в случае переполнения теряются новые данные (т.е. не будут помещаться в очередь) * lostNewData - в случае переполнения теряются новые данные (т.е. не будут помещаться в очередь)
* lostOldData - в случае переполнения очереди, старые данные затираются новыми. * lostOldData - в случае переполнения очереди, старые данные затираются новыми.
* * Под переполнением подразумевается, что чтение отстаёт от писателей больше чем на размер буфера.
* Защита от переполнения индекса (wpos и rpos).
* =============================================
* Т.к. для обеспечения lockfree записи индексы (wpos) постоянно растут
* (т.е. каждый пишущий поток пишет в новое место), в качестве atomic индекса выбран unsigned long.
* Для x86_32 системы работающей без перезагруки длительное время, переполнение индекса может стать проблеммой.
* Фактически же размер циклического буфера ограничен и запись ведётся в позицию [wpos%size], т.е.
* в общем случае достаточно чтобы индекс был не больше размера буфера (size).
* \error ПОКА ПРОБЛЕММА ПЕРЕПОЛНЕНИЯ БЕЗ ПОТЕРИ ПАКЕТОВ НЕ РЕШЕНА..
* чтобы "сбрасывать" индекс без потери сообщений, нужно одной транзакцией менять wpos, qpos и rpos.
* Как это "красиво" сделать в рамках lockfree я пока не придумал.
* Поэтому сейчас просто теряются сообщения, в зависимости от стратегии (lostStrategy) либо
* не добавляются новые сообщения пока rpos не перейдёт через максимум (lostStrategy=lostNewData),
* либо потеряются "старые" (rpos приравняется к wpos) - это lostStrategy=lostOldData.
* *
* -------------------------------- * --------------------------------
* ЭТА ОЧЕРЕДЬ ПОКАЗЫВАЕТ В ДВА-ТРИ РАЗА ЛУЧШУЮ СКОРОСТЬ ПО СРАВНЕНИЮ С MQMutex * ЭТА ОЧЕРЕДЬ ПОКАЗЫВАЕТ В ТРИ РАЗА ЛУЧШУЮ СКОРОСТЬ ПО СРАВНЕНИЮ С MQMutex
* При скорости поступления сообщений 1 сообщение в 10 мсек, без переполнения на x86_32 очередь * --------------------------------
* проработает ~1.2 года (речь о работе без перезапуска программы)
*
*/ */
class MQAtomic class MQAtomic
{ {
......
...@@ -26,7 +26,7 @@ ...@@ -26,7 +26,7 @@
typedef std::shared_ptr<UniSetTypes::VoidMessage> VoidMessagePtr; typedef std::shared_ptr<UniSetTypes::VoidMessage> VoidMessagePtr;
//-------------------------------------------------------------------------- //--------------------------------------------------------------------------
/*! \class MQMutex /*! \class MQMutex
* Очередь сообщений на std::mutex-е. * Простая "многопоточная" очередь сообщений с использованием std::mutex.
* Максимальное ограничение на размер очереди сообщений задаётся функцией setMaxSizeOfMessageQueue(). * Максимальное ограничение на размер очереди сообщений задаётся функцией setMaxSizeOfMessageQueue().
* *
* Контроль переполения очереди осуществляется в push * Контроль переполения очереди осуществляется в push
......
...@@ -39,13 +39,19 @@ void MQAtomic::push( const VoidMessagePtr& vm ) ...@@ -39,13 +39,19 @@ void MQAtomic::push( const VoidMessagePtr& vm )
} }
// ----------------------------------------------- // -----------------------------------------------
// Если у нас wpos уже перешёл через максимум и стратегия "потеря новых сообщений" // Если у нас wpos уже перешёл через максимум
// то просто ждм пока "подтянется" rpos // то смотрим где rpos
if( wpos < rpos ) if( wpos < rpos )
{ {
stCountOfLostMessages++; // только надо привести к одному масштабу
if( lostStrategy == lostNewData ) unsigned long w = wpos%SizeOfMessageQueue;
unsigned long r = rpos%SizeOfMessageQueue;
if( lostStrategy == lostNewData && (r-w) >= SizeOfMessageQueue )
{
stCountOfLostMessages++;
return; return;
}
} }
// ----------------------------------------------- // -----------------------------------------------
...@@ -72,45 +78,38 @@ VoidMessagePtr MQAtomic::top() ...@@ -72,45 +78,38 @@ VoidMessagePtr MQAtomic::top()
rpos.store( wpos - SizeOfMessageQueue ); rpos.store( wpos - SizeOfMessageQueue );
} }
if( rpos > qpos ) if( rpos == qpos )
{ return nullptr;
if( lostStrategy == lostNewData )
{
// дочитываем до конца.. (пока rpos не перейдёт через максимум)
unsigned long r = rpos.fetch_add(1);
return mqueue[r%SizeOfMessageQueue];
}
// if( lostStrategy == lostOldData ) // смотрим именно qpos, а не wpos.
rpos = 0; // Т.к. qpos увеличивается только после помещения элемента в очередь
if( qpos == 0 ) // (помещение в вектор тоже занимает время)
return nullptr; // иначе может случиться что wpos уже увеличился, но элемент ещё не поместили в очередь
} // а мы уже пытаемся читать.
// смотрим qpos - который увеличивается только после помещения элемента в очередь
// т.к. помещение в вектор тоже занимает время,
// то может случиться что wpos уже увеличился, а элемент ещё не поместили в очередь
// при этом вызвался этот top()
if( rpos < qpos ) if( rpos < qpos )
{ {
// сперва надо сдвинуть счётчик (чтобы следующий поток уже читал новое) // сперва надо сдвинуть счётчик (чтобы следующий поток уже работал с следующим значением)
unsigned long r = rpos.fetch_add(1); unsigned long r = rpos.fetch_add(1);
return mqueue[r%SizeOfMessageQueue];
}
// если в этот момент был "переполнен" wpos // Если rpos > qpos, значит qpos уже перешёл через максимум
if( r > wpos && lostStrategy == lostOldData ) // И это особый случай обработки (пока rpos тоже не "перескочит" через максимум)
{ if( rpos > qpos ) // делаем if каждый раз, т.к. qpos может уже поменяться в параллельном потоке
r = 0; {
if( rpos > wpos ) // приводим к одному масштабу
rpos = 0; unsigned long w = qpos%SizeOfMessageQueue;
unsigned long r = rpos%SizeOfMessageQueue;
if( qpos == 0 ) if( lostStrategy == lostOldData && (r - w) >= SizeOfMessageQueue )
return nullptr; {
stCountOfLostMessages++;
rpos.store(qpos - SizeOfMessageQueue); // "подтягиваем" rpos к qpos
} }
// т.к. между if и этим местом, может придти другой читающий поток, то // продолжаем читать как обычно
// проверяем здесь ещё раз r = rpos.fetch_add(1);
if( r < qpos ) return mqueue[r%SizeOfMessageQueue];
return mqueue[r%SizeOfMessageQueue];
} }
return nullptr; return nullptr;
......
...@@ -8,9 +8,14 @@ ...@@ -8,9 +8,14 @@
// -------------------------------------------------------------------------- // --------------------------------------------------------------------------
// ВНИМАНИЕ! ЗДЕСЬ ОПРЕДЕЛЯЕТСЯ ТИП ТЕСТИРУЕМОЙ ОЧЕРЕДИ // ВНИМАНИЕ! ЗДЕСЬ ОПРЕДЕЛЯЕТСЯ ТИП ТЕСТИРУЕМОЙ ОЧЕРЕДИ
// (пока не придумал как параметризовать тест) // (пока не придумал как параметризовать тест)
typedef MQAtomic UMessageQueue;
#define TEST_MQ_ATOMIC 1 #define TEST_MQ_ATOMIC 1
#ifdef TEST_MQ_ATOMIC
typedef MQAtomic UMessageQueue;
#else
typedef MQMutex UMessageQueue;
#endif
// -------------------------------------------------------------------------- // --------------------------------------------------------------------------
#ifdef TEST_MQ_ATOMIC #ifdef TEST_MQ_ATOMIC
// специальный "декоратор" чтобы можно было тестировать переполнение индексов // специальный "декоратор" чтобы можно было тестировать переполнение индексов
...@@ -209,26 +214,67 @@ TEST_CASE( "UMessageQueue: overflow index (strategy=lostOldData)", "[mqueue]" ) ...@@ -209,26 +214,67 @@ TEST_CASE( "UMessageQueue: overflow index (strategy=lostOldData)", "[mqueue]" )
mq.set_wpos(max); mq.set_wpos(max);
mq.set_rpos(max); mq.set_rpos(max);
// это сообщение будет потеряно, // При переходе через максимум ничего не должны потерять
// т.к. добавляется при ещё не переполненном wpos
pushMessage(mq,100); pushMessage(mq,100);
pushMessage(mq,110);
pushMessage(mq,120);
// первое чтение после переполнения
// обновляет rpos, поэтому элемент последний мы теряем
auto m = mq.top(); auto m = mq.top();
REQUIRE( m == nullptr ); REQUIRE( m != nullptr );
REQUIRE( m->consumer == 100 );
// это сообщение уже должно к нам вернутся
pushMessage(mq,110);
m = mq.top(); m = mq.top();
REQUIRE( m != nullptr ); REQUIRE( m != nullptr );
REQUIRE( m->consumer == 110 ); REQUIRE( m->consumer == 110 );
m = mq.top();
REQUIRE( m != nullptr );
REQUIRE( m->consumer == 120 );
}
// --------------------------------------------------------------------------
TEST_CASE( "UMessageQueue: lost data (strategy=lostOldData)", "[mqueue]" )
{
REQUIRE( uniset_conf() != nullptr );
unsigned long max = std::numeric_limits<unsigned long>::max();
MQAtomicTest mq;
mq.setLostStrategy(MQAtomic::lostOldData);
mq.setMaxSizeOfMessageQueue(2);
pushMessage(mq,100);
pushMessage(mq,110);
pushMessage(mq,120); pushMessage(mq,120);
auto m = mq.top();
REQUIRE( m != nullptr );
REQUIRE( m->consumer == 110 );
m = mq.top(); m = mq.top();
REQUIRE( m != nullptr ); REQUIRE( m != nullptr );
REQUIRE( m->consumer == 120 ); REQUIRE( m->consumer == 120 );
m = mq.top();
REQUIRE( m == nullptr );
// Теперь проверяем + переполнение счётчика
mq.set_wpos(max);
mq.set_rpos(max);
// При переходе через максимум ничего не должны потерять
pushMessage(mq,140);
pushMessage(mq,150);
pushMessage(mq,160);
m = mq.top();
REQUIRE( m != nullptr );
REQUIRE( m->consumer == 150 );
m = mq.top();
REQUIRE( m != nullptr );
REQUIRE( m->consumer == 160 );
m = mq.top();
REQUIRE( m == nullptr );
} }
// -------------------------------------------------------------------------- // --------------------------------------------------------------------------
TEST_CASE( "UMessageQueue: overflow index (strategy=lostNewData)", "[mqueue]" ) TEST_CASE( "UMessageQueue: overflow index (strategy=lostNewData)", "[mqueue]" )
...@@ -242,31 +288,69 @@ TEST_CASE( "UMessageQueue: overflow index (strategy=lostNewData)", "[mqueue]" ) ...@@ -242,31 +288,69 @@ TEST_CASE( "UMessageQueue: overflow index (strategy=lostNewData)", "[mqueue]" )
mq.set_wpos(max); mq.set_wpos(max);
mq.set_rpos(max); mq.set_rpos(max);
// При переходе через максимум ничего не должны потерять
pushMessage(mq,100); pushMessage(mq,100);
pushMessage(mq,110); pushMessage(mq,110);
pushMessage(mq,120); pushMessage(mq,120);
// мы должны прочитать последнее сообщение из очереди
auto m = mq.top(); auto m = mq.top();
REQUIRE( m != nullptr ); REQUIRE( m != nullptr );
REQUIRE( m->consumer == 100 ); REQUIRE( m->consumer == 100 );
// дальше сообщений нет пока-что (а те что были были потеряны)
m = mq.top(); m = mq.top();
REQUIRE( m == nullptr ); REQUIRE( m != nullptr );
REQUIRE( m->consumer == 110 );
pushMessage(mq,130); m = mq.top();
REQUIRE( m != nullptr );
REQUIRE( m->consumer == 120 );
}
// --------------------------------------------------------------------------
TEST_CASE( "UMessageQueue: lost data (strategy=lostNewData)", "[mqueue]" )
{
REQUIRE( uniset_conf() != nullptr );
unsigned long max = std::numeric_limits<unsigned long>::max();
MQAtomicTest mq;
mq.setLostStrategy(MQAtomic::lostNewData);
mq.setMaxSizeOfMessageQueue(2);
pushMessage(mq,100);
pushMessage(mq,110);
pushMessage(mq,120);
auto m = mq.top();
REQUIRE( m != nullptr );
REQUIRE( m->consumer == 100 );
m = mq.top(); m = mq.top();
REQUIRE( m != nullptr ); REQUIRE( m != nullptr );
REQUIRE( m->consumer == 130 ); REQUIRE( m->consumer == 110 );
m = mq.top();
REQUIRE( m == nullptr );
// Теперь проверяем + переполнение счётчика
mq.set_wpos(max);
mq.set_rpos(max);
// При переходе через максимум ничего не должны потерять
pushMessage(mq,140); pushMessage(mq,140);
pushMessage(mq,150);
pushMessage(mq,160);
m = mq.top(); m = mq.top();
REQUIRE( m != nullptr ); REQUIRE( m != nullptr );
REQUIRE( m->consumer == 140 ); REQUIRE( m->consumer == 140 );
m = mq.top();
REQUIRE( m != nullptr );
REQUIRE( m->consumer == 150 );
m = mq.top();
REQUIRE( m == nullptr );
} }
// --------------------------------------------------------------------------
#endif #endif
// -------------------------------------------------------------------------- // --------------------------------------------------------------------------
#undef TEST_MQ_ATOMIC #undef TEST_MQ_ATOMIC
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment