Commit 8f7d3481 authored by Pavel Vainerman's avatar Pavel Vainerman

(UniSetObject): небольшой рефакторинг и попытка повысить производительность

обрбаботки сообщений (сделано две очереди)
parent 89ba4e3b
...@@ -83,6 +83,8 @@ SM: Подумать насчёт применения https://github.com/effici ...@@ -83,6 +83,8 @@ SM: Подумать насчёт применения https://github.com/effici
DB: Сделать регулируемый буфер на INSERT-ы БД, чтобы поберечь винт (DBServer_PGSQL, DBServer_MySQL...) // по времени или по количеству DB: Сделать регулируемый буфер на INSERT-ы БД, чтобы поберечь винт (DBServer_PGSQL, DBServer_MySQL...) // по времени или по количеству
UniSetObject: две очереди, чтобы уменьшить количество блокирований mutex..
version 3 version 3
========= =========
- подумать нужен ли нам где-то ZeroMQ (zerorpc) (вместо omniORB?) - подумать нужен ли нам где-то ZeroMQ (zerorpc) (вместо omniORB?)
......
...@@ -145,7 +145,7 @@ class ThreadCreator: ...@@ -145,7 +145,7 @@ class ThreadCreator:
private: private:
ThreadCreator(); ThreadCreator();
pid_t pid; pid_t pid = { 0 };
ThreadMaster* m; ThreadMaster* m;
Action act; Action act;
...@@ -160,7 +160,7 @@ class ThreadCreator: ...@@ -160,7 +160,7 @@ class ThreadCreator:
//---------------------------------------------------------------------------------------- //----------------------------------------------------------------------------------------
template <class ThreadMaster> template <class ThreadMaster>
ThreadCreator<ThreadMaster>::ThreadCreator( ThreadMaster* m, Action a ): ThreadCreator<ThreadMaster>::ThreadCreator( ThreadMaster* m, Action a ):
pid(-1), pid(0),
m(m), m(m),
act(a), act(a),
finm(0), finm(0),
...@@ -187,7 +187,7 @@ void ThreadCreator<ThreadMaster>::stop() ...@@ -187,7 +187,7 @@ void ThreadCreator<ThreadMaster>::stop()
//---------------------------------------------------------------------------------------- //----------------------------------------------------------------------------------------
template <class ThreadMaster> template <class ThreadMaster>
ThreadCreator<ThreadMaster>::ThreadCreator(): ThreadCreator<ThreadMaster>::ThreadCreator():
pid(-1), pid(0),
m(0), m(0),
act(0), act(0),
finm(0), finm(0),
......
...@@ -55,14 +55,48 @@ class UniSetObject; ...@@ -55,14 +55,48 @@ class UniSetObject;
typedef std::list< std::shared_ptr<UniSetObject> > ObjectsList; /*!< Список подчиненных объектов */ typedef std::list< std::shared_ptr<UniSetObject> > ObjectsList; /*!< Список подчиненных объектов */
//--------------------------------------------------------------------------- //---------------------------------------------------------------------------
/*! \class UniSetObject /*! \class UniSetObject
* Класс задает такие свойства объекта как: получение сообщений, помещение сообщения в очередь и т.п. * Класс реализует работу uniset-объекта: работа с очередью сообщений, регистрация объекта, инициализация и т.п.
* Для ожидания сообщений используется функция waitMessage(), основанная на таймере. * Для ожидания сообщений используется функция waitMessage(msec), основанная на таймере.
* Ожидание прерывается либо по истечении указанного времени, либо по приходу сообщения, при помощи функциии * Ожидание прерывается либо по истечении указанного времени, либо по приходу сообщения, при помощи функциии
* termWaiting() вызываемой из push(). * termWaiting() вызываемой из push().
* \note Если не будет задан ObjectId(-1), то поток обработки запущен не будет. * \note Если не будет задан ObjectId(-1), то поток обработки запущен не будет.
* Также создание потока можно принудительно отключить при помощи функции void thread(). Ее необходимо вызвать до активации объекта * Также создание потока можно принудительно отключить при помощи функции void thread(false). Ее необходимо вызвать до активации объекта
* (например в конструкторе). При этом ответственность за вызов receiveMessage() и processingMessage() возлагается * (например в конструкторе). При этом ответственность за вызов receiveMessage() и processingMessage() возлагается
* на разработчика. * на разработчика.
*
* Сообщения извлекаются из очереди в порядке приоритета сообщения. При одинаковом приоритете - в порядке поступления в очередь.
*
* Максимальное ограничение на размер очереди сообщений задаётся параметром
* --uniset-object-size-message-queue val или параметром SizeOfMessageQueue в конфигурационном файле.
* А также при помощи фунции setMaxSizeOfMessageQueue().
*
* Контроль переполения очереди осуществляется в двух местах push и receiveMessage.
* При переполнении очереди, происходит автоматическая очистка в два этапа.
* Первый: производиться попытка "свёртки" сообщений.
* Из очереди все повторяющиеся
* - SensorMessage
* - TimerMessage
* - SystemMessage
* Если это не помогло, то производиться второй этап "чистки":
* Из очереди удаляется MaxCountRemoveOfMessage сообщений.
* Этот парамер задаётся при помощи --uniset-object-maxcount-remove-message или MaxCountRemoveOfMessage в конфигурационном файле.
* А также при помощи фунции setMaxCountRemoveOfMessage().
*
* Очистка реализована в функции cleanMsgQueue();
*
* \note Для специфичной обработки может быть переопределена
* \warning Т.к. при фильтровании SensorMessage не смотрится значение,
* то при удалении сообщений об изменении аналоговых датчиков очистка может привести
* к некорректной работе фильрующих алгоритмов работающих с "выборкой" последних N значений.
* (потому-что останется одно последнее)
*
*
* ОПТИМИЗАЦИЯ N1:
* Для того, чтобы функции push() и receiveMessage() реже "сталкавались" на mutex-е очереди сообщений.
* Сделано две очереди сообщений. Одна очередь сообщений наполняется в push() (с блокировкой mutex-а),
* а вторая (без блокировки) обрабатывается в receiveMessage(). Как только сообщения заканчиваются в
* receiveMessage() очереди меняются местами (при этом захватывается mutex).
*
*/ */
class UniSetObject: class UniSetObject:
public std::enable_shared_from_this<UniSetObject>, public std::enable_shared_from_this<UniSetObject>,
...@@ -212,20 +246,7 @@ class UniSetObject: ...@@ -212,20 +246,7 @@ class UniSetObject:
}; };
typedef std::priority_queue<UniSetTypes::VoidMessage, std::vector<UniSetTypes::VoidMessage>, PriorVMsgCompare> MessagesQueue; typedef std::priority_queue<UniSetTypes::VoidMessage, std::vector<UniSetTypes::VoidMessage>, PriorVMsgCompare> MessagesQueue;
/*! Чистка очереди сообщений */
/*! Вызывается при переполнеии очереди сообщений (в двух местах push и receive)
для очитски очереди.
\warning По умолчанию удаляет из очереди все повторяющиеся
- SensorMessage
- TimerMessage
- SystemMessage
Если не помогло удаляет из очереди UniSetObject::MaxCountRemoveOfMessage
\note Для специфичной обработки может быть переопределена
\warning Т.к. при фильтровании SensorMessage не смотрится значение, то
при удалении сообщений об изменении аналоговых датчиков очистка может привести
к некорректной работе фильрующих алгоритмов работающих с "выборкой" последних N значений.
(потому-что останется одно последнее)
*/
virtual void cleanMsgQueue( MessagesQueue& q ); virtual void cleanMsgQueue( MessagesQueue& q );
inline bool isActive() inline bool isActive()
...@@ -278,9 +299,9 @@ class UniSetObject: ...@@ -278,9 +299,9 @@ class UniSetObject:
/* удаление ссылки из репозитория объектов */ /* удаление ссылки из репозитория объектов */
void unregister(); void unregister();
void init_object(); void initObject();
pid_t msgpid; // pid потока обработки сообщений pid_t msgpid = { 0 }; // pid потока обработки сообщений
bool regOK = { false }; bool regOK = { false };
std::atomic_bool active; std::atomic_bool active;
...@@ -292,12 +313,14 @@ class UniSetObject: ...@@ -292,12 +313,14 @@ class UniSetObject:
std::shared_ptr< ThreadCreator<UniSetObject> > thr; std::shared_ptr< ThreadCreator<UniSetObject> > thr;
/*! очередь сообщений для объекта */ /*! очередь сообщений для объекта */
MessagesQueue queueMsg; MessagesQueue queueMsg1,queueMsg2; // две очереди..
MessagesQueue* wQueue = { nullptr }; // указатель на текущую очередь на запись
MessagesQueue* rQueue = { nullptr }; // указатель на текущую очередь на чтение
/*! замок для блокирования совместного доступа к очереди */ /*! замок для блокирования совместного доступа к очереди */
UniSetTypes::uniset_rwmutex qmutex; UniSetTypes::uniset_rwmutex qmutex;
/*! замок для блокирования совместного доступа к очереди */ /*! замок для блокирования совместного доступа к oRef */
mutable UniSetTypes::uniset_rwmutex refmutex; mutable UniSetTypes::uniset_rwmutex refmutex;
/*! размер очереди сообщений (при превышении происходит очистка) */ /*! размер очереди сообщений (при превышении происходит очистка) */
......
...@@ -296,7 +296,6 @@ namespace UniSetTypes ...@@ -296,7 +296,6 @@ namespace UniSetTypes
return destBegin; return destBegin;
} }
// -----------------------------------------------------------------------------
} }
// Варварский запрет на использование atoi вместо uni_atoi.. // Варварский запрет на использование atoi вместо uni_atoi..
......
...@@ -61,7 +61,7 @@ UniSetObject::UniSetObject(): ...@@ -61,7 +61,7 @@ UniSetObject::UniSetObject():
tmr = CREATE_TIMER; tmr = CREATE_TIMER;
myname = "noname"; myname = "noname";
section = "nonameSection"; section = "nonameSection";
init_object(); initObject();
} }
// ------------------------------------------------------------------------------------------ // ------------------------------------------------------------------------------------------
UniSetObject::UniSetObject( ObjectId id ): UniSetObject::UniSetObject( ObjectId id ):
...@@ -93,7 +93,7 @@ UniSetObject::UniSetObject( ObjectId id ): ...@@ -93,7 +93,7 @@ UniSetObject::UniSetObject( ObjectId id ):
section = "UnknownSection"; section = "UnknownSection";
} }
init_object(); initObject();
} }
...@@ -122,7 +122,7 @@ UniSetObject::UniSetObject( const string& name, const string& section ): ...@@ -122,7 +122,7 @@ UniSetObject::UniSetObject( const string& name, const string& section ):
throw Exception(name + ": my ID not found!"); throw Exception(name + ": my ID not found!");
} }
init_object(); initObject();
ui->initBackId(myid); ui->initBackId(myid);
} }
...@@ -158,7 +158,7 @@ UniSetObject::~UniSetObject() ...@@ -158,7 +158,7 @@ UniSetObject::~UniSetObject()
#endif #endif
} }
// ------------------------------------------------------------------------------------------ // ------------------------------------------------------------------------------------------
void UniSetObject::init_object() void UniSetObject::initObject()
{ {
a_working = ATOMIC_VAR_INIT(0); a_working = ATOMIC_VAR_INIT(0);
active = ATOMIC_VAR_INIT(0); active = ATOMIC_VAR_INIT(0);
...@@ -167,6 +167,10 @@ void UniSetObject::init_object() ...@@ -167,6 +167,10 @@ void UniSetObject::init_object()
refmutex.setName(myname + "_refmutex"); refmutex.setName(myname + "_refmutex");
// mutex_act.setName(myname + "_mutex_act"); // mutex_act.setName(myname + "_mutex_act");
// устанавливаем указатели очередей
wQueue = &queueMsg1;
rQueue = &queueMsg2;
auto conf = uniset_conf(); auto conf = uniset_conf();
SizeOfMessageQueue = conf->getArgPInt("--uniset-object-size-message-queue", conf->getField("SizeOfMessageQueue"), 1000); SizeOfMessageQueue = conf->getArgPInt("--uniset-object-size-message-queue", conf->getField("SizeOfMessageQueue"), 1000);
...@@ -210,26 +214,39 @@ void UniSetObject::setID( UniSetTypes::ObjectId id ) ...@@ -210,26 +214,39 @@ void UniSetObject::setID( UniSetTypes::ObjectId id )
*/ */
bool UniSetObject::receiveMessage( VoidMessage& vm ) bool UniSetObject::receiveMessage( VoidMessage& vm )
{ {
// здесь работаем со своей очередью без блокировки
if( !rQueue->empty() )
{
vm = rQueue->top(); // получили сообщение
rQueue->pop(); // удалили сообщение из очереди
return true;
}
// Если своя очередь пуста
// то смотрим вторую
{ {
// lock // lock
uniset_rwmutex_wrlock mlk(qmutex); uniset_rwmutex_wrlock mlk(qmutex);
if( !queueMsg.empty() ) if( !wQueue->empty() )
{ {
// контроль переполнения // контроль переполнения
if( queueMsg.size() > SizeOfMessageQueue ) if( wQueue->size() > SizeOfMessageQueue )
{ {
ucrit << myname << "(receiveMessages): messages queue overflow!" << endl << flush; ucrit << myname << "(receiveMessages): messages queue overflow!" << endl << flush;
cleanMsgQueue(queueMsg); cleanMsgQueue(*wQueue);
// обновляем статистику по переполнениям // обновляем статистику по переполнениям
stCountOfQueueFull++; stCountOfQueueFull++;
stMaxQueueMessages = 0; stMaxQueueMessages = 0;
} }
if( !queueMsg.empty() ) if( !wQueue->empty() )
{ {
vm = queueMsg.top(); // получили сообщение vm = wQueue->top(); // получили сообщение
queueMsg.pop(); // удалили сообщение из очереди wQueue->pop(); // удалили сообщение из очереди
// меняем очереди местами
std::swap(rQueue,wQueue);
return true; return true;
} }
} }
...@@ -465,10 +482,10 @@ void UniSetObject::push( const TransportMessage& tm ) ...@@ -465,10 +482,10 @@ void UniSetObject::push( const TransportMessage& tm )
uniset_rwmutex_wrlock mlk(qmutex); uniset_rwmutex_wrlock mlk(qmutex);
// контроль переполнения // контроль переполнения
if( !queueMsg.empty() && queueMsg.size() > SizeOfMessageQueue ) if( !wQueue->empty() && wQueue->size() > SizeOfMessageQueue )
{ {
ucrit << myname << "(push): message queue overflow!" << endl << flush; ucrit << myname << "(push): message queue overflow!" << endl << flush;
cleanMsgQueue(queueMsg); cleanMsgQueue(*wQueue);
// обновляем статистику // обновляем статистику
stCountOfQueueFull++; stCountOfQueueFull++;
...@@ -476,11 +493,11 @@ void UniSetObject::push( const TransportMessage& tm ) ...@@ -476,11 +493,11 @@ void UniSetObject::push( const TransportMessage& tm )
} }
VoidMessage v(tm); VoidMessage v(tm);
queueMsg.push(v); wQueue->push(v);
// максимальное число ( для статистики ) // максимальное число ( для статистики )
if( queueMsg.size() > stMaxQueueMessages ) if( wQueue->size() > stMaxQueueMessages )
stMaxQueueMessages = queueMsg.size(); stMaxQueueMessages = wQueue->size();
} // unlock } // unlock
...@@ -623,7 +640,7 @@ unsigned int UniSetObject::countMessages() ...@@ -623,7 +640,7 @@ unsigned int UniSetObject::countMessages()
{ {
// lock // lock
uniset_rwmutex_rlock mlk(qmutex); uniset_rwmutex_rlock mlk(qmutex);
return queueMsg.size(); return wQueue->size() + rQueue->size();
} }
} }
// ------------------------------------------------------------------------------------------ // ------------------------------------------------------------------------------------------
...@@ -669,8 +686,11 @@ bool UniSetObject::deactivate() ...@@ -669,8 +686,11 @@ bool UniSetObject::deactivate()
// lock // lock
uniset_rwmutex_wrlock mlk(qmutex); uniset_rwmutex_wrlock mlk(qmutex);
while( !queueMsg.empty() ) while( !wQueue->empty() )
queueMsg.pop(); wQueue->pop();
while( !rQueue->empty() )
rQueue->pop();
} }
try try
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment