Commit e69aac7c authored by Pavel Vainerman's avatar Pavel Vainerman

(UniSetObject): рефакторинг: вынес очередь сообщений в отдельный класс,

чтобы можно было легко менять реализацию, добавил отдельный perf-test для очереди сообщений.
parent 29a2af64
......@@ -377,6 +377,7 @@ AC_CONFIG_FILES([Makefile
include/modbus/Makefile
tests/Makefile
tests/UniXmlTest/Makefile
tests/MQPerfTest/Makefile
docs/Makefile
docs/UniSetDox.cfg
docs/UniSetDoxDevel.cfg
......
/*
* Copyright (c) 2015 Pavel Vainerman.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation, version 2.1.
*
* This program is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Lesser Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
// --------------------------------------------------------------------------
#ifndef UMessageQueue_H_
#define UMessageQueue_H_
//--------------------------------------------------------------------------
#include <queue>
#include <vector>
#include <memory>
#include "Mutex.h"
#include "MessageType.h"
//--------------------------------------------------------------------------
typedef std::shared_ptr<UniSetTypes::VoidMessage> VoidMessagePtr;
//--------------------------------------------------------------------------
/*! \class UMessageQueue
* Очередь сообщений.
* \warning Очередь рассчитана на МНОГО ПИСАТЕЛЕЙ и ОДИН(!) читатель. Т.е. чтение должно быть из одного потока!
* Сообщения извлекаются из очереди в порядке приоритета сообщения. При одинаковом приоритете - в порядке поступления в очередь.
*
* Максимальное ограничение на размер очереди сообщений задаётся функцией setMaxSizeOfMessageQueue().
*
* Контроль переполения очереди осуществляется в двух местах push и receiveMessage.
* При переполнении очереди, происходит автоматическая очистка в два этапа.
* Первый: производиться попытка "свёртки" сообщений.
* Из очереди все повторяющиеся
* - SensorMessage
* - TimerMessage
* - SystemMessage
* Если это не помогло, то производиться второй этап "чистки":
* Из очереди удаляется MaxCountRemoveOfMessage сообщений.
* Этот парамер задаётся при помощи setMaxCountRemoveOfMessage(). По умолчанию 1/4 очереди сообщений.
*
* Очистка реализована в функции cleanMsgQueue();
*
* \warning Т.к. при фильтровании SensorMessage не смотрится значение,
* то при удалении сообщений об изменении аналоговых датчиков очистка может привести
* к некорректной работе фильрующих алгоритмов работающих с "выборкой" последних N значений.
* (потому-что останется одно последнее)
*
* ОПТИМИЗАЦИЯ N1:
* Для того, чтобы функции push() и top() реже "сталкавались" на mutex-е очереди сообщений.
* Сделано две очереди сообщений. Одна очередь сообщений наполняется в push() (с блокировкой mutex-а),
* а вторая (без блокировки) обрабатывается в top(). Как только сообщения заканчиваются в
* top() очереди меняются местами (при захваченном mutex).
*/
class UMessageQueue
{
public:
UMessageQueue( size_t qsize = 2000 );
void push( const UniSetTypes::TransportMessage& msg );
VoidMessagePtr top();
size_t size();
// ----- Настройки -----
void setMaxSizeOfMessageQueue( size_t s );
size_t getMaxSizeOfMessageQueue();
void setMaxCountRemoveOfMessage( size_t m );
size_t getMaxCountRemoveOfMessage();
// ---- Статистика ----
/*! максимальное количество которое было в очереди сообщений */
inline size_t getMaxQueueMessages() const
{
return stMaxQueueMessages;
}
/*! сколько раз очередь переполнялась */
inline size_t getCountOfQueueFull() const
{
return stCountOfQueueFull;
}
// функция определения приоритетного сообщения для обработки
struct VoidMessageCompare:
public std::binary_function<VoidMessagePtr, VoidMessagePtr, bool>
{
bool operator()(const VoidMessagePtr& lhs,
const VoidMessagePtr& rhs) const;
};
typedef std::priority_queue<VoidMessagePtr, std::vector<VoidMessagePtr>, VoidMessageCompare> MQueue;
protected:
/*! Чистка очереди сообщений */
void cleanMsgQueue( MQueue& q );
private:
MQueue* wQ = { nullptr }; // указатель на текущую очередь на запись
MQueue* rQ = { nullptr }; // указатель на текущую очередь на чтение
MQueue mq1,mq2;
/*! размер очереди сообщений (при превышении происходит очистка) */
size_t SizeOfMessageQueue = { 2000 };
/*! сколько сообщений удалять при очисте */
size_t MaxCountRemoveOfMessage = { 500 };
/*! замок для блокирования совместного доступа к очереди */
UniSetTypes::uniset_rwmutex qmutex;
// статистическая информация
size_t stMaxQueueMessages = { 0 }; /*!< Максимальное число сообщений хранившихся в очереди */
size_t stCountOfQueueFull = { 0 }; /*!< количество переполнений очереди сообщений */
};
//---------------------------------------------------------------------------
#endif
//---------------------------------------------------------------------------
......@@ -29,7 +29,6 @@
#include <atomic>
#include <unistd.h>
#include <sys/time.h>
#include <queue>
#include <ostream>
#include <memory>
#include <string>
......@@ -43,6 +42,7 @@
#include "UniSetObject_i.hh"
#include "ThreadCreator.h"
#include "LT_Object.h"
#include "UMessageQueue.h"
//---------------------------------------------------------------------------
//#include <omnithread.h>
......@@ -53,8 +53,6 @@ class UniSetManager;
//---------------------------------------------------------------------------
class UniSetObject;
typedef std::list< std::shared_ptr<UniSetObject> > ObjectsList; /*!< Список подчиненных объектов */
typedef std::shared_ptr<UniSetTypes::VoidMessage> VoidMessagePtr;
//---------------------------------------------------------------------------
/*! \class UniSetObject
* Класс реализует работу uniset-объекта: работа с очередью сообщений, регистрация объекта, инициализация и т.п.
......@@ -66,39 +64,6 @@ typedef std::shared_ptr<UniSetTypes::VoidMessage> VoidMessagePtr;
* (например в конструкторе). При этом ответственность за вызов receiveMessage() и processingMessage() возлагается
* на разработчика.
*
* Сообщения извлекаются из очереди в порядке приоритета сообщения. При одинаковом приоритете - в порядке поступления в очередь.
*
* Максимальное ограничение на размер очереди сообщений задаётся параметром
* --uniset-object-size-message-queue val или параметром SizeOfMessageQueue в конфигурационном файле.
* А также при помощи фунции setMaxSizeOfMessageQueue().
*
* Контроль переполения очереди осуществляется в двух местах push и receiveMessage.
* При переполнении очереди, происходит автоматическая очистка в два этапа.
* Первый: производиться попытка "свёртки" сообщений.
* Из очереди все повторяющиеся
* - SensorMessage
* - TimerMessage
* - SystemMessage
* Если это не помогло, то производиться второй этап "чистки":
* Из очереди удаляется MaxCountRemoveOfMessage сообщений.
* Этот парамер задаётся при помощи --uniset-object-maxcount-remove-message или MaxCountRemoveOfMessage в конфигурационном файле.
* А также при помощи фунции setMaxCountRemoveOfMessage().
*
* Очистка реализована в функции cleanMsgQueue();
*
* \note Для специфичной обработки может быть переопределена
* \warning Т.к. при фильтровании SensorMessage не смотрится значение,
* то при удалении сообщений об изменении аналоговых датчиков очистка может привести
* к некорректной работе фильрующих алгоритмов работающих с "выборкой" последних N значений.
* (потому-что останется одно последнее)
*
*
* ОПТИМИЗАЦИЯ N1:
* Для того, чтобы функции push() и receiveMessage() реже "сталкавались" на mutex-е очереди сообщений.
* Сделано две очереди сообщений. Одна очередь сообщений наполняется в push() (с блокировкой mutex-а),
* а вторая (без блокировки) обрабатывается в receiveMessage(). Как только сообщения заканчиваются в
* receiveMessage() очереди меняются местами (при этом захватывается mutex).
*
*/
class UniSetObject:
public std::enable_shared_from_this<UniSetObject>,
......@@ -163,7 +128,8 @@ class UniSetObject:
VoidMessagePtr receiveMessage();
/*! текущее количесво сообщений в очереди */
unsigned int countMessages();
size_t countMessages();
size_t getCountOfQueueFull();
/*! прервать ожидание сообщений */
void termWaiting();
......@@ -221,37 +187,24 @@ class UniSetObject:
void setMaxSizeOfMessageQueue( size_t s )
{
SizeOfMessageQueue = s;
mqueue.setMaxSizeOfMessageQueue(s);
}
inline unsigned int getMaxSizeOfMessageQueue()
inline size_t getMaxSizeOfMessageQueue()
{
return SizeOfMessageQueue;
return mqueue.getMaxSizeOfMessageQueue();
}
void setMaxCountRemoveOfMessage( size_t m )
{
MaxCountRemoveOfMessage = m;
mqueue.setMaxCountRemoveOfMessage(m);
}
inline unsigned int getMaxCountRemoveOfMessage()
inline size_t getMaxCountRemoveOfMessage()
{
return MaxCountRemoveOfMessage;
return mqueue.getMaxCountRemoveOfMessage();
}
// функция определения приоритетного сообщения для обработки
struct PriorVMsgCompare:
public std::binary_function<VoidMessagePtr, VoidMessagePtr, bool>
{
bool operator()(const VoidMessagePtr& lhs,
const VoidMessagePtr& rhs) const;
};
typedef std::priority_queue<VoidMessagePtr, std::vector<VoidMessagePtr>, PriorVMsgCompare> MessagesQueue;
/*! Чистка очереди сообщений */
virtual void cleanMsgQueue( MessagesQueue& q );
inline bool isActive()
{
return active;
......@@ -265,19 +218,6 @@ class UniSetObject:
void setThreadPriority( int p );
// ------- Статистика -------
/*! максимальное количество которое было в очереди сообщений */
inline size_t getMaxQueueMessages()
{
return stMaxQueueMessages;
}
/*! сколько раз очередь переполнялась */
inline size_t getCountOfQueueFull()
{
return stCountOfQueueFull;
}
private:
friend class UniSetManager;
......@@ -315,25 +255,11 @@ class UniSetObject:
std::shared_ptr< ThreadCreator<UniSetObject> > thr;
/*! очередь сообщений для объекта */
MessagesQueue queueMsg1,queueMsg2; // две очереди..
MessagesQueue* wQueue = { nullptr }; // указатель на текущую очередь на запись
MessagesQueue* rQueue = { nullptr }; // указатель на текущую очередь на чтение
/*! замок для блокирования совместного доступа к очереди */
UniSetTypes::uniset_rwmutex qmutex;
UMessageQueue mqueue;
/*! замок для блокирования совместного доступа к oRef */
mutable UniSetTypes::uniset_rwmutex refmutex;
/*! размер очереди сообщений (при превышении происходит очистка) */
size_t SizeOfMessageQueue;
/*! сколько сообщений удалять при очисте */
size_t MaxCountRemoveOfMessage;
// статистическая информация
size_t stMaxQueueMessages = { 0 }; /*!< Максимальное число сообщений хранившихся в очереди */
size_t stCountOfQueueFull = { 0 }; /*!< количество переполнений очереди сообщений */
std::atomic_bool a_working;
std::mutex m_working;
std::condition_variable cv_working;
......
noinst_LTLIBRARIES = libObjectsRepository.la
libObjectsRepository_la_SOURCES = UniSetTypes_iSK.cc UniSetObject_iSK.cc UniSetTypes.cc \
libObjectsRepository_la_SOURCES = UniSetTypes_iSK.cc UniSetObject_iSK.cc UniSetTypes.cc UMessageQueue.cc \
UniSetManager_iSK.cc ObjectIndex.cc ObjectIndex_Array.cc ObjectIndex_XML.cc ObjectIndex_idXML.cc \
ORepHelpers.cc UniSetObject.cc UniSetManager.cc \
UniSetActivator.cc ObjectRepository.cc ObjectRepositoryFactory.cc \
......
/*
* Copyright (c) 2015 Pavel Vainerman.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation, version 2.1.
*
* This program is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Lesser Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
// -------------------------------------------------------------------------
#include <unordered_map>
#include <map>
#include "MessageType.h"
#include "UMessageQueue.h"
//--------------------------------------------------------------------------
using namespace UniSetTypes;
using namespace std;
//--------------------------------------------------------------------------
UMessageQueue::UMessageQueue( size_t qsize ):
SizeOfMessageQueue(qsize)
{
wQ = &mq1;
rQ = &mq2;
}
//---------------------------------------------------------------------------
void UMessageQueue::push( const UniSetTypes::TransportMessage& tm )
{
{
// lock
uniset_rwmutex_wrlock mlk(qmutex);
// контроль переполнения
if( !wQ->empty() && wQ->size() > SizeOfMessageQueue )
{
// ucrit << myname << "(push): message queue overflow!" << endl << flush;
cleanMsgQueue(*wQ);
// обновляем статистику
stCountOfQueueFull++;
stMaxQueueMessages = 0;
}
auto v = make_shared<VoidMessage>(tm);
wQ->push(v);
// максимальное число ( для статистики )
if( wQ->size() > stMaxQueueMessages )
stMaxQueueMessages = wQ->size();
} // unlock
}
//---------------------------------------------------------------------------
VoidMessagePtr UMessageQueue::top()
{
// здесь работаем со своей очередью без блокировки
if( !rQ->empty() )
{
auto m = rQ->top(); // получили сообщение
rQ->pop(); // удалили сообщение из очереди
return m;
}
// Если своя очередь пуста
// то смотрим вторую
{
// lock
uniset_rwmutex_wrlock mlk(qmutex);
if( !wQ->empty() )
{
// контроль переполнения
if( wQ->size() > SizeOfMessageQueue )
{
// ucrit << myname << "(receiveMessages): messages queue overflow!" << endl << flush;
cleanMsgQueue(*wQ);
// обновляем статистику по переполнениям
stCountOfQueueFull++;
stMaxQueueMessages = 0;
}
if( !wQ->empty() )
{
auto m = wQ->top(); // получили сообщение
wQ->pop(); // удалили сообщение из очереди
// меняем очереди местами
std::swap(rQ,wQ);
return m;
}
}
} // unlock queue
return nullptr;
}
//---------------------------------------------------------------------------
size_t UMessageQueue::size()
{
uniset_rwmutex_wrlock mlk(qmutex);
return (wQ->size() + rQ->size());
}
//---------------------------------------------------------------------------
void UMessageQueue::setMaxSizeOfMessageQueue(size_t s)
{
SizeOfMessageQueue = s;
}
size_t UMessageQueue::getMaxSizeOfMessageQueue()
{
return SizeOfMessageQueue;
}
//---------------------------------------------------------------------------
void UMessageQueue::setMaxCountRemoveOfMessage( size_t m )
{
MaxCountRemoveOfMessage = m;
}
size_t UMessageQueue::getMaxCountRemoveOfMessage()
{
return MaxCountRemoveOfMessage;
}
//---------------------------------------------------------------------------
// структура определяющая минимальное количество полей
// по которым можно судить о схожести сообщений
// используется локально и только в функции очистки очереди сообщений
struct MsgInfo
{
MsgInfo():
type(Message::Unused),
id(DefaultObjectId),
node(DefaultObjectId)
{
// struct timezone tz;
tm.tv_sec = 0;
tm.tv_usec = 0;
// gettimeofday(&tm,&tz);
}
int type;
ObjectId id; // от кого
struct timeval tm; // время
ObjectId node; // откуда
inline bool operator < ( const MsgInfo& mi ) const
{
if( type != mi.type )
return type < mi.type;
if( id != mi.id )
return id < mi.id;
if( node != mi.node )
return node < mi.node;
if( tm.tv_sec != mi.tm.tv_sec )
return tm.tv_sec < mi.tm.tv_sec;
return tm.tv_usec < mi.tm.tv_usec;
}
};
//---------------------------------------------------------------------------
// структура определяющая минимальное количество полей
// по которым можно судить о схожести сообщений
// используется локально и только в функции очистки очереди сообщений
struct CInfo
{
CInfo():
sensor_id(DefaultObjectId),
value(0),
time(0),
time_usec(0),
confirm(0)
{
}
explicit CInfo( ConfirmMessage& cm ):
sensor_id(cm.sensor_id),
value(cm.value),
time(cm.time),
time_usec(cm.time_usec),
confirm(cm.confirm)
{}
long sensor_id; /* ID датчика */
double value; /* значение датчика */
time_t time; /* время, когда датчик получил сигнал */
time_t time_usec; /* время в микросекундах */
time_t confirm; /* время, когда произошло квитирование */
inline bool operator < ( const CInfo& mi ) const
{
if( sensor_id != mi.sensor_id )
return sensor_id < mi.sensor_id;
if( value != mi.value )
return value < mi.value;
if( time != mi.time )
return time < mi.time;
return time_usec < mi.time_usec;
}
};
//---------------------------------------------------------------------------
struct tmpConsumerInfo
{
tmpConsumerInfo() {}
unordered_map<UniSetTypes::KeyType, VoidMessagePtr> smap;
unordered_map<int, VoidMessagePtr> tmap;
unordered_map<int, VoidMessagePtr> sysmap;
std::map<CInfo, VoidMessagePtr> cmap;
list<VoidMessagePtr> lstOther;
};
void UMessageQueue::cleanMsgQueue( UMessageQueue::MQueue& q )
{
#if 0
ucrit << myname << "(cleanMsgQueue): msg queue cleaning..." << endl << flush;
ucrit << myname << "(cleanMsgQueue): current size of queue: " << q.size() << endl << flush;
#endif
// проходим по всем известным нам типам(базовым)
// ищем все совпадающие сообщения и оставляем только последние...
VoidMessage m;
unordered_map<UniSetTypes::ObjectId, tmpConsumerInfo> consumermap;
while( !q.empty() )
{
auto m = q.top();
q.pop();
switch( m->type )
{
case Message::SensorInfo:
{
SensorMessage sm(m.get());
UniSetTypes::KeyType k(key(sm.id, sm.node));
// т.к. из очереди сообщений сперва вынимаются самые старые, потом свежее и т.п.
// то достаточно просто сохранять последнее сообщение для одинаковых Key
consumermap[sm.consumer].smap[k] = m;
}
break;
case Message::Timer:
{
TimerMessage tm(m.get());
// т.к. из очереди сообщений сперва вынимаются самые старые, потом свежее и т.п.
// то достаточно просто сохранять последнее сообщение для одинаковых TimerId
consumermap[tm.consumer].tmap[tm.id] = m;
}
break;
case Message::SysCommand:
{
SystemMessage sm(m.get());
consumermap[sm.consumer].sysmap[sm.command] = m;
}
break;
case Message::Confirm:
{
ConfirmMessage cm(m.get());
CInfo ci(cm);
// т.к. из очереди сообщений сперва вынимаются самые старые, потом свежее и т.п.
// то достаточно просто сохранять последнее сообщение для одинаковых MsgInfo
consumermap[cm.consumer].cmap[ci] = m;
}
break;
case Message::Unused:
// просто выкидываем (игнорируем)
break;
default:
// сразу помещаем в очередь
consumermap[m->consumer].lstOther.push_front(m);
break;
}
}
// ucrit << myname << "(cleanMsgQueue): ******** cleanup RESULT ********" << endl;
for( auto& c : consumermap )
{
#if 0
ucrit << myname << "(cleanMsgQueue): CONSUMER=" << c.first << endl;
ucrit << myname << "(cleanMsgQueue): after clean SensorMessage: " << c.second.smap.size() << endl;
ucrit << myname << "(cleanMsgQueue): after clean TimerMessage: " << c.second.tmap.size() << endl;
ucrit << myname << "(cleanMsgQueue): after clean SystemMessage: " << c.second.sysmap.size() << endl;
ucrit << myname << "(cleanMsgQueue): after clean ConfirmMessage: " << c.second.cmap.size() << endl;
ucrit << myname << "(cleanMsgQueue): after clean other: " << c.second.lstOther.size() << endl;
#endif
// теперь ОСТАВШИЕСЯ запихиваем обратно в очередь...
for( auto& v : c.second.smap )
q.push(v.second);
for( auto& v : c.second.tmap )
q.push(v.second);
for( auto& v : c.second.sysmap )
q.push(v.second);
for( auto& v : c.second.cmap )
q.push(v.second);
for( auto& v : c.second.lstOther )
q.push(v);
}
// ucrit << myname
// << "(cleanMsgQueue): ******* result size of queue: "
// << q.size()
// << " < " << getMaxSizeOfMessageQueue() << endl;
if( q.size() >= SizeOfMessageQueue )
{
// ucrit << myname << "(cleanMsgQueue): clean failed. size > " << q.size() << endl;
// ucrit << myname << "(cleanMsgQueue): remove " << getMaxCountRemoveOfMessage() << " old messages " << endl;
for( unsigned int i = 0; i < MaxCountRemoveOfMessage; i++ )
{
q.top();
q.pop();
if( q.empty() )
break;
}
// ucrit << myname << "(cleanMsgQueue): result size=" << q.size() << endl;
}
}
//---------------------------------------------------------------------------
bool UMessageQueue::VoidMessageCompare::operator()(const VoidMessagePtr& lhs, const VoidMessagePtr& rhs) const
{
if( lhs->priority == rhs->priority )
{
if( lhs->tm.tv_sec == rhs->tm.tv_sec )
return lhs->tm.tv_usec >= rhs->tm.tv_usec;
return lhs->tm.tv_sec >= rhs->tm.tv_sec;
}
return lhs->priority < rhs->priority;
}
//---------------------------------------------------------------------------
......@@ -50,11 +50,7 @@ UniSetObject::UniSetObject():
active(0),
threadcreate(false),
myid(UniSetTypes::DefaultObjectId),
oref(0),
SizeOfMessageQueue(1000),
MaxCountRemoveOfMessage(10),
stMaxQueueMessages(0),
stCountOfQueueFull(0)
oref(0)
{
ui = make_shared<UInterface>(UniSetTypes::DefaultObjectId);
......@@ -70,11 +66,7 @@ UniSetObject::UniSetObject( ObjectId id ):
active(0),
threadcreate(true),
myid(id),
oref(0),
SizeOfMessageQueue(1000),
MaxCountRemoveOfMessage(10),
stMaxQueueMessages(0),
stCountOfQueueFull(0)
oref(0)
{
ui = make_shared<UInterface>(id);
tmr = CREATE_TIMER;
......@@ -103,11 +95,7 @@ UniSetObject::UniSetObject( const string& name, const string& section ):
active(0),
threadcreate(true),
myid(UniSetTypes::DefaultObjectId),
oref(0),
SizeOfMessageQueue(1000),
MaxCountRemoveOfMessage(10),
stMaxQueueMessages(0),
stCountOfQueueFull(0)
oref(0)
{
ui = make_shared<UInterface>(UniSetTypes::DefaultObjectId);
......@@ -163,22 +151,23 @@ void UniSetObject::initObject()
a_working = ATOMIC_VAR_INIT(0);
active = ATOMIC_VAR_INIT(0);
qmutex.setName(myname + "_qmutex");
refmutex.setName(myname + "_refmutex");
// mutex_act.setName(myname + "_mutex_act");
// устанавливаем указатели очередей
wQueue = &queueMsg1;
rQueue = &queueMsg2;
auto conf = uniset_conf();
SizeOfMessageQueue = conf->getArgPInt("--uniset-object-size-message-queue", conf->getField("SizeOfMessageQueue"), 1000);
MaxCountRemoveOfMessage = conf->getArgPInt("--uniset-object-maxcount-remove-message", conf->getField("MaxCountRemoveOfMessage"), SizeOfMessageQueue / 4);
int sz = conf->getArgPInt("--uniset-object-size-message-queue", conf->getField("SizeOfMessageQueue"), 1000);
if( sz > 0 )
mqueue.setMaxSizeOfMessageQueue(sz);
int maxremove = conf->getArgPInt("--uniset-object-maxcount-remove-message", conf->getField("MaxCountRemoveOfMessage"), sz / 4);
if( maxremove > 0 )
mqueue.setMaxCountRemoveOfMessage(maxremove);
// workingTerminateTimeout = conf->getArgPInt("--uniset-object-working-terminate-timeout",conf->getField("WorkingTerminateTimeout"),2000);
uinfo << myname << "(init): SizeOfMessageQueue=" << SizeOfMessageQueue
<< " MaxCountRemoveOfMessage=" << MaxCountRemoveOfMessage
uinfo << myname << "(init): SizeOfMessageQueue=" << mqueue.getMaxSizeOfMessageQueue()
<< " MaxCountRemoveOfMessage=" << mqueue.getMaxCountRemoveOfMessage()
<< endl;
}
// ------------------------------------------------------------------------------------------
......@@ -214,131 +203,8 @@ void UniSetObject::setID( UniSetTypes::ObjectId id )
*/
VoidMessagePtr UniSetObject::receiveMessage()
{
// здесь работаем со своей очередью без блокировки
if( !rQueue->empty() )
{
auto m = rQueue->top(); // получили сообщение
rQueue->pop(); // удалили сообщение из очереди
return m;
}
// Если своя очередь пуста
// то смотрим вторую
{
// lock
uniset_rwmutex_wrlock mlk(qmutex);
if( !wQueue->empty() )
{
// контроль переполнения
if( wQueue->size() > SizeOfMessageQueue )
{
ucrit << myname << "(receiveMessages): messages queue overflow!" << endl << flush;
cleanMsgQueue(*wQueue);
// обновляем статистику по переполнениям
stCountOfQueueFull++;
stMaxQueueMessages = 0;
}
if( !wQueue->empty() )
{
auto m = wQueue->top(); // получили сообщение
wQueue->pop(); // удалили сообщение из очереди
// меняем очереди местами
std::swap(rQueue,wQueue);
return m;
}
}
} // unlock queue
return false;
return mqueue.top();
}
// ------------------------------------------------------------------------------------------
// структура определяющая минимальное количество полей
// по которым можно судить о схожести сообщений
// используется локально и только в функции очистки очереди сообщений
struct MsgInfo
{
MsgInfo():
type(Message::Unused),
id(DefaultObjectId),
node(DefaultObjectId)
{
// struct timezone tz;
tm.tv_sec = 0;
tm.tv_usec = 0;
// gettimeofday(&tm,&tz);
}
int type;
ObjectId id; // от кого
struct timeval tm; // время
ObjectId node; // откуда
inline bool operator < ( const MsgInfo& mi ) const
{
if( type != mi.type )
return type < mi.type;
if( id != mi.id )
return id < mi.id;
if( node != mi.node )
return node < mi.node;
if( tm.tv_sec != mi.tm.tv_sec )
return tm.tv_sec < mi.tm.tv_sec;
return tm.tv_usec < mi.tm.tv_usec;
}
};
// структура определяющая минимальное количество полей
// по которым можно судить о схожести сообщений
// используется локально и только в функции очистки очереди сообщений
struct CInfo
{
CInfo():
sensor_id(DefaultObjectId),
value(0),
time(0),
time_usec(0),
confirm(0)
{
}
explicit CInfo( ConfirmMessage& cm ):
sensor_id(cm.sensor_id),
value(cm.value),
time(cm.time),
time_usec(cm.time_usec),
confirm(cm.confirm)
{}
long sensor_id; /* ID датчика */
double value; /* значение датчика */
time_t time; /* время, когда датчик получил сигнал */
time_t time_usec; /* время в микросекундах */
time_t confirm; /* время, когда произошло квитирование */
inline bool operator < ( const CInfo& mi ) const
{
if( sensor_id != mi.sensor_id )
return sensor_id < mi.sensor_id;
if( value != mi.value )
return value < mi.value;
if( time != mi.time )
return time < mi.time;
return time_usec < mi.time_usec;
}
};
// ------------------------------------------------------------------------------------------
VoidMessagePtr UniSetObject::waitMessage( timeout_t timeMS )
{
......@@ -478,171 +344,18 @@ void UniSetObject::setThreadPriority( int p )
// ------------------------------------------------------------------------------------------
void UniSetObject::push( const TransportMessage& tm )
{
{
// lock
uniset_rwmutex_wrlock mlk(qmutex);
// контроль переполнения
if( !wQueue->empty() && wQueue->size() > SizeOfMessageQueue )
{
ucrit << myname << "(push): message queue overflow!" << endl << flush;
cleanMsgQueue(*wQueue);
// обновляем статистику
stCountOfQueueFull++;
stMaxQueueMessages = 0;
}
auto v = make_shared<VoidMessage>(tm);
wQueue->push(v);
// максимальное число ( для статистики )
if( wQueue->size() > stMaxQueueMessages )
stMaxQueueMessages = wQueue->size();
} // unlock
mqueue.push(tm);
termWaiting();
}
// ------------------------------------------------------------------------------------------
struct tmpConsumerInfo
size_t UniSetObject::countMessages()
{
tmpConsumerInfo() {}
unordered_map<UniSetTypes::KeyType, VoidMessagePtr> smap;
unordered_map<int, VoidMessagePtr> tmap;
unordered_map<int, VoidMessagePtr> sysmap;
std::map<CInfo, VoidMessagePtr> cmap;
list<VoidMessagePtr> lstOther;
};
void UniSetObject::cleanMsgQueue( MessagesQueue& q )
{
ucrit << myname << "(cleanMsgQueue): msg queue cleaning..." << endl << flush;
ucrit << myname << "(cleanMsgQueue): current size of queue: " << q.size() << endl << flush;
// проходим по всем известным нам типам(базовым)
// ищем все совпадающие сообщения и оставляем только последние...
VoidMessage m;
unordered_map<UniSetTypes::ObjectId, tmpConsumerInfo> consumermap;
// while( receiveMessage(vm) );
// while нельзя использовать потому-что, из параллельного потока
// могут запихивать в очередь ещё сообщения.. И это цикл никогда не прервётся...
while( !q.empty() )
{
auto m = q.top();
q.pop();
switch( m->type )
{
case Message::SensorInfo:
{
SensorMessage sm(m.get());
UniSetTypes::KeyType k(key(sm.id, sm.node));
// т.к. из очереди сообщений сперва вынимаются самые старые, потом свежее и т.п.
// то достаточно просто сохранять последнее сообщение для одинаковых Key
consumermap[sm.consumer].smap[k] = m;
}
break;
case Message::Timer:
{
TimerMessage tm(m.get());
// т.к. из очереди сообщений сперва вынимаются самые старые, потом свежее и т.п.
// то достаточно просто сохранять последнее сообщение для одинаковых TimerId
consumermap[tm.consumer].tmap[tm.id] = m;
}
break;
case Message::SysCommand:
{
SystemMessage sm(m.get());
consumermap[sm.consumer].sysmap[sm.command] = m;
}
break;
case Message::Confirm:
{
ConfirmMessage cm(m.get());
CInfo ci(cm);
// т.к. из очереди сообщений сперва вынимаются самые старые, потом свежее и т.п.
// то достаточно просто сохранять последнее сообщение для одинаковых MsgInfo
consumermap[cm.consumer].cmap[ci] = m;
}
break;
case Message::Unused:
// просто выкидываем (игнорируем)
break;
default:
// сразу помещаем в очередь
consumermap[m->consumer].lstOther.push_front(m);
break;
}
}
ucrit << myname << "(cleanMsgQueue): ******** cleanup RESULT ********" << endl;
for( auto& c : consumermap )
{
ucrit << myname << "(cleanMsgQueue): CONSUMER=" << c.first << endl;
ucrit << myname << "(cleanMsgQueue): after clean SensorMessage: " << c.second.smap.size() << endl;
ucrit << myname << "(cleanMsgQueue): after clean TimerMessage: " << c.second.tmap.size() << endl;
ucrit << myname << "(cleanMsgQueue): after clean SystemMessage: " << c.second.sysmap.size() << endl;
ucrit << myname << "(cleanMsgQueue): after clean ConfirmMessage: " << c.second.cmap.size() << endl;
ucrit << myname << "(cleanMsgQueue): after clean other: " << c.second.lstOther.size() << endl;
// теперь ОСТАВШИЕСЯ запихиваем обратно в очередь...
for( auto& v : c.second.smap )
q.push(v.second);
for( auto& v : c.second.tmap )
q.push(v.second);
for( auto& v : c.second.sysmap )
q.push(v.second);
for( auto& v : c.second.cmap )
q.push(v.second);
for( auto& v : c.second.lstOther )
q.push(v);
}
ucrit << myname
<< "(cleanMsgQueue): ******* result size of queue: "
<< q.size()
<< " < " << getMaxSizeOfMessageQueue() << endl;
if( q.size() >= getMaxSizeOfMessageQueue() )
{
ucrit << myname << "(cleanMsgQueue): clean failed. size > " << q.size() << endl;
ucrit << myname << "(cleanMsgQueue): remove " << getMaxCountRemoveOfMessage() << " old messages " << endl;
for( unsigned int i = 0; i < getMaxCountRemoveOfMessage(); i++ )
{
q.top();
q.pop();
if( q.empty() )
break;
}
ucrit << myname << "(cleanMsgQueue): result size=" << q.size() << endl;
}
return mqueue.size();
}
// ------------------------------------------------------------------------------------------
unsigned int UniSetObject::countMessages()
size_t UniSetObject::getCountOfQueueFull()
{
{
// lock
uniset_rwmutex_rlock mlk(qmutex);
return wQueue->size() + rQueue->size();
}
return mqueue.getCountOfQueueFull();
}
// ------------------------------------------------------------------------------------------
void UniSetObject::sigterm( int signo )
......@@ -682,18 +395,6 @@ bool UniSetObject::deactivate()
thr->stop();
}
// Очищаем очередь
{
// lock
uniset_rwmutex_wrlock mlk(qmutex);
while( !wQueue->empty() )
wQueue->pop();
while( !rQueue->empty() )
rQueue->pop();
}
try
{
uinfo << myname << "(deactivate): ..." << endl;
......@@ -968,8 +669,8 @@ UniSetTypes::SimpleInfo* UniSetObject::getInfo( ::CORBA::Long userparam )
info << "откл.";
info << "\tcount=" << countMessages();
info << "\tmaxMsg=" << stMaxQueueMessages;
info << "\tqFull(" << SizeOfMessageQueue << ")=" << stCountOfQueueFull;
info << "\tmaxMsg=" << mqueue.getMaxQueueMessages();
info << "\tqFull(" << mqueue.getMaxSizeOfMessageQueue() << ")=" << mqueue.getCountOfQueueFull();
// info << "\n";
SimpleInfo* res = new SimpleInfo();
......@@ -985,19 +686,4 @@ ostream& operator<<(ostream& os, UniSetObject& obj )
return os << si->info;
}
// ------------------------------------------------------------------------------------------
bool UniSetObject::PriorVMsgCompare::operator()(const VoidMessagePtr& lhs,
const VoidMessagePtr& rhs) const
{
if( lhs->priority == rhs->priority )
{
if( lhs->tm.tv_sec == rhs->tm.tv_sec )
return lhs->tm.tv_usec >= rhs->tm.tv_usec;
return lhs->tm.tv_sec >= rhs->tm.tv_sec;
}
return lhs->priority < rhs->priority;
}
// ------------------------------------------------------------------------------------------
#undef CREATE_TIMER
noinst_PROGRAMS = mq-test
mq_test_LDADD = $(top_builddir)/lib/libUniSet2.la $(SIGC_LIBS) $(COMCPP_LIBS)
mq_test_CPPFLAGS = -I$(top_builddir)/include -I$(top_builddir)/extensions/include $(SIGC_CFLAGS) $(COMCPP_CFLAGS)
mq_test_SOURCES = mq-test.cc
#include <string>
#include <iostream>
#include <thread>
#include <atomic>
#include "UMessageQueue.h"
// --------------------------------------------------------------------------
using namespace std;
using namespace UniSetTypes;
// --------------------------------------------------------------------------
UMessageQueue mq; // тестируемая очередь
const size_t COUNT = 1000000; // сколько сообщения послать
// --------------------------------------------------------------------------
// поток записи
void mq_write_thread()
{
SensorMessage smsg(100,2);
TransportMessage tm( std::move(smsg.transport_msg()) );
msleep(100);
for( size_t i=0; i<COUNT; i++ )
{
mq.push(tm);
}
}
// --------------------------------------------------------------------------
int one_test()
{
auto wthread = std::thread(mq_write_thread);
std::chrono::time_point<std::chrono::system_clock> start, end;
start = std::chrono::system_clock::now();
size_t rnum = 0;
while( rnum < COUNT )
{
auto m = mq.top();
if( m )
rnum++;
}
wthread.join();
end = std::chrono::system_clock::now();
return std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count();
}
// --------------------------------------------------------------------------
int main(int argc, const char** argv)
{
try
{
uniset_init(argc, argv);
int tnum = 10;
// чтобы не происходило переполнение
mq.setMaxSizeOfMessageQueue(COUNT+1);
vector<int> res;
res.reserve(tnum);
for( int i=0; i<tnum; i++ )
{
res.push_back(one_test());
}
// вычисляем среднее
int sum = 0;
for( auto&& r: res )
sum += r;
float avg = (float)sum / tnum;
std::cerr << "average elapsed time [" << tnum << "]: " << avg << " msec for " << COUNT << endl;
return 0;
}
catch( const SystemError& err )
{
cerr << "(mq-test): " << err << endl;
}
catch( const Exception& ex )
{
cerr << "(mq-test): " << ex << endl;
}
catch( const std::exception& e )
{
cerr << "(mq-test): " << e.what() << endl;
}
catch(...)
{
cerr << "(mq-test): catch(...)" << endl;
}
return 1;
}
#!/bin/sh
START=uniset2-start.sh
${START} -f ./mq-test --confile ./test.xml $*
../../Utilities/scripts/uniset2-stop.sh
\ No newline at end of file
../../conf/test.xml
\ No newline at end of file
../../Utilities/scripts/uniset2-functions.sh
\ No newline at end of file
../../Utilities/scripts/uniset2-start.sh
\ No newline at end of file
SUBDIRS=MQPerfTest
if HAVE_TESTS
############################################################################
# This file is part of the UniSet library #
......
......@@ -469,6 +469,7 @@ tests/tests_with_conf.xml
tests/threadtst.cc
tests/umutex.cc
tests/perf_test.cc
tests/MQPerfTest/mq-test.cc
testsuite/Makefile.am
Utilities/Admin/admin.cc
Utilities/Admin/Makefile.am
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment