Commit 29a2af64 authored by Pavel Vainerman's avatar Pavel Vainerman

Начало рефакторинга механизма обработки сообщений:

(перешёл на хранение shared_ptr<VoidMessage> вместо VoidMessage, чтобы избежать лишних копирований).
parent 8f7d3481
......@@ -64,9 +64,10 @@ void <xsl:value-of select="$CLASSNAME"/>_SK::callback()
// обработка сообщений (таймеров и т.п.)
for( unsigned int i=0; i&lt;<xsl:call-template name="settings-alone"><xsl:with-param name="varname" select="'msg-count'"/></xsl:call-template>; i++ )
{
if( !receiveMessage(msg) )
auto m = receiveMessage();
if( !m )
break;
processingMessage(&amp;msg);
processingMessage(m.get());
// обновление выходов
updateOutputs(forceOut);
......
......@@ -68,9 +68,10 @@ void <xsl:value-of select="$CLASSNAME"/>_SK::callback()
// обработка сообщений (таймеров и т.п.)
for( unsigned int i=0; i&lt;<xsl:call-template name="settings-alone"><xsl:with-param name="varname" select="'msg-count'"/></xsl:call-template>; i++ )
{
if( !receiveMessage(msg) )
auto m = receiveMessage();
if( !m )
break;
processingMessage(&amp;msg);
processingMessage(m.get());
}
// Проверка изменения состояния датчиков
......
......@@ -65,9 +65,11 @@ void <xsl:value-of select="$CLASSNAME"/>_SK::callback()
// обработка сообщений (таймеров и т.п.)
for( unsigned int i=0; i&lt;<xsl:call-template name="settings"><xsl:with-param name="varname" select="'msg-count'"/></xsl:call-template>; i++ )
{
if( !receiveMessage(msg) )
auto m = receiveMessage();
if( !m )
break;
processingMessage(&amp;msg);
processingMessage(m.get());
updateOutputs(forceOut);
// updatePreviousValues();
}
......
......@@ -71,9 +71,10 @@ void <xsl:value-of select="$CLASSNAME"/>_SK::callback()
// обработка сообщений (таймеров и т.п.)
for( unsigned int i=0; i&lt;<xsl:call-template name="settings"><xsl:with-param name="varname" select="'msg-count'"/></xsl:call-template>; i++ )
{
if( !receiveMessage(msg) )
auto m = receiveMessage();
if( !m )
break;
processingMessage(&amp;msg);
processingMessage(m.get());
}
// Выполнение шага программы
......
......@@ -547,9 +547,11 @@ void UObject_SK::callback()
// обработка сообщений (таймеров и т.п.)
for( unsigned int i=0; i<20; i++ )
{
if( !receiveMessage(msg) )
auto m = receiveMessage();
if( !m )
break;
processingMessage(&msg);
processingMessage(m.get());
updateOutputs(forceOut);
// updatePreviousValues();
}
......
......@@ -53,6 +53,8 @@ class UniSetManager;
//---------------------------------------------------------------------------
class UniSetObject;
typedef std::list< std::shared_ptr<UniSetObject> > ObjectsList; /*!< Список подчиненных объектов */
typedef std::shared_ptr<UniSetTypes::VoidMessage> VoidMessagePtr;
//---------------------------------------------------------------------------
/*! \class UniSetObject
* Класс реализует работу uniset-объекта: работа с очередью сообщений, регистрация объекта, инициализация и т.п.
......@@ -158,7 +160,7 @@ class UniSetObject:
virtual void timerInfo( const UniSetTypes::TimerMessage* tm ) {}
/*! Получить сообщение */
bool receiveMessage( UniSetTypes::VoidMessage& vm );
VoidMessagePtr receiveMessage();
/*! текущее количесво сообщений в очереди */
unsigned int countMessages();
......@@ -213,7 +215,7 @@ class UniSetObject:
}
/*! Ожидать сообщения timeMS */
virtual bool waitMessage(UniSetTypes::VoidMessage& msg, timeout_t timeMS = UniSetTimer::WaitUpTime);
virtual VoidMessagePtr waitMessage( timeout_t timeMS = UniSetTimer::WaitUpTime );
void setID(UniSetTypes::ObjectId id);
......@@ -239,12 +241,13 @@ class UniSetObject:
// функция определения приоритетного сообщения для обработки
struct PriorVMsgCompare:
public std::binary_function<UniSetTypes::VoidMessage, UniSetTypes::VoidMessage, bool>
public std::binary_function<VoidMessagePtr, VoidMessagePtr, bool>
{
bool operator()(const UniSetTypes::VoidMessage& lhs,
const UniSetTypes::VoidMessage& rhs) const;
bool operator()(const VoidMessagePtr& lhs,
const VoidMessagePtr& rhs) const;
};
typedef std::priority_queue<UniSetTypes::VoidMessage, std::vector<UniSetTypes::VoidMessage>, PriorVMsgCompare> MessagesQueue;
typedef std::priority_queue<VoidMessagePtr, std::vector<VoidMessagePtr>, PriorVMsgCompare> MessagesQueue;
/*! Чистка очереди сообщений */
virtual void cleanMsgQueue( MessagesQueue& q );
......@@ -258,7 +261,6 @@ class UniSetObject:
active = set;
}
UniSetTypes::VoidMessage msg;
std::weak_ptr<UniSetManager> mymngr;
void setThreadPriority( int p );
......
......@@ -212,14 +212,14 @@ void UniSetObject::setID( UniSetTypes::ObjectId id )
* \param vm - указатель на структуру, которая заполняется если есть сообщение
* \return Возвращает \a true если сообщение есть, и \a false если нет
*/
bool UniSetObject::receiveMessage( VoidMessage& vm )
VoidMessagePtr UniSetObject::receiveMessage()
{
// здесь работаем со своей очередью без блокировки
if( !rQueue->empty() )
{
vm = rQueue->top(); // получили сообщение
auto m = rQueue->top(); // получили сообщение
rQueue->pop(); // удалили сообщение из очереди
return true;
return m;
}
// Если своя очередь пуста
......@@ -242,12 +242,12 @@ bool UniSetObject::receiveMessage( VoidMessage& vm )
if( !wQueue->empty() )
{
vm = wQueue->top(); // получили сообщение
auto m = wQueue->top(); // получили сообщение
wQueue->pop(); // удалили сообщение из очереди
// меняем очереди местами
std::swap(rQueue,wQueue);
return true;
return m;
}
}
} // unlock queue
......@@ -340,13 +340,14 @@ struct CInfo
};
// ------------------------------------------------------------------------------------------
bool UniSetObject::waitMessage(VoidMessage& vm, timeout_t timeMS)
VoidMessagePtr UniSetObject::waitMessage( timeout_t timeMS )
{
if( receiveMessage(vm) )
return true;
auto m = receiveMessage();
if( m )
return m;
tmr->wait(timeMS);
return receiveMessage(vm);
return receiveMessage();
}
// ------------------------------------------------------------------------------------------
void UniSetObject::registered()
......@@ -492,7 +493,7 @@ void UniSetObject::push( const TransportMessage& tm )
stMaxQueueMessages = 0;
}
VoidMessage v(tm);
auto v = make_shared<VoidMessage>(tm);
wQueue->push(v);
// максимальное число ( для статистики )
......@@ -508,11 +509,11 @@ struct tmpConsumerInfo
{
tmpConsumerInfo() {}
unordered_map<UniSetTypes::KeyType, VoidMessage> smap;
unordered_map<int, VoidMessage> tmap;
unordered_map<int, VoidMessage> sysmap;
std::map<CInfo, VoidMessage> cmap;
list<VoidMessage> lstOther;
unordered_map<UniSetTypes::KeyType, VoidMessagePtr> smap;
unordered_map<int, VoidMessagePtr> tmap;
unordered_map<int, VoidMessagePtr> sysmap;
std::map<CInfo, VoidMessagePtr> cmap;
list<VoidMessagePtr> lstOther;
};
void UniSetObject::cleanMsgQueue( MessagesQueue& q )
......@@ -531,14 +532,14 @@ void UniSetObject::cleanMsgQueue( MessagesQueue& q )
while( !q.empty() )
{
m = q.top();
auto m = q.top();
q.pop();
switch( m.type )
switch( m->type )
{
case Message::SensorInfo:
{
SensorMessage sm(&m);
SensorMessage sm(m.get());
UniSetTypes::KeyType k(key(sm.id, sm.node));
// т.к. из очереди сообщений сперва вынимаются самые старые, потом свежее и т.п.
// то достаточно просто сохранять последнее сообщение для одинаковых Key
......@@ -548,7 +549,7 @@ void UniSetObject::cleanMsgQueue( MessagesQueue& q )
case Message::Timer:
{
TimerMessage tm(&m);
TimerMessage tm(m.get());
// т.к. из очереди сообщений сперва вынимаются самые старые, потом свежее и т.п.
// то достаточно просто сохранять последнее сообщение для одинаковых TimerId
consumermap[tm.consumer].tmap[tm.id] = m;
......@@ -557,14 +558,14 @@ void UniSetObject::cleanMsgQueue( MessagesQueue& q )
case Message::SysCommand:
{
SystemMessage sm(&m);
SystemMessage sm(m.get());
consumermap[sm.consumer].sysmap[sm.command] = m;
}
break;
case Message::Confirm:
{
ConfirmMessage cm(&m);
ConfirmMessage cm(m.get());
CInfo ci(cm);
// т.к. из очереди сообщений сперва вынимаются самые старые, потом свежее и т.п.
// то достаточно просто сохранять последнее сообщение для одинаковых MsgInfo
......@@ -578,7 +579,7 @@ void UniSetObject::cleanMsgQueue( MessagesQueue& q )
default:
// сразу помещаем в очередь
consumermap[m.consumer].lstOther.push_front(m);
consumermap[m->consumer].lstOther.push_front(m);
break;
}
......@@ -865,8 +866,9 @@ void UniSetObject::callback()
// заказа продолжит спать(т.е. обработчик вызван не будет)...
try
{
if( waitMessage(msg, sleepTime) )
processingMessage(&msg);
auto m = waitMessage(sleepTime);
if( m )
processingMessage(m.get());
sleepTime = checkTimers(this);
}
......@@ -984,18 +986,18 @@ ostream& operator<<(ostream& os, UniSetObject& obj )
}
// ------------------------------------------------------------------------------------------
bool UniSetObject::PriorVMsgCompare::operator()(const UniSetTypes::VoidMessage& lhs,
const UniSetTypes::VoidMessage& rhs) const
bool UniSetObject::PriorVMsgCompare::operator()(const VoidMessagePtr& lhs,
const VoidMessagePtr& rhs) const
{
if( lhs.priority == rhs.priority )
if( lhs->priority == rhs->priority )
{
if( lhs.tm.tv_sec == rhs.tm.tv_sec )
return lhs.tm.tv_usec >= rhs.tm.tv_usec;
if( lhs->tm.tv_sec == rhs->tm.tv_sec )
return lhs->tm.tv_usec >= rhs->tm.tv_usec;
return lhs.tm.tv_sec >= rhs.tm.tv_sec;
return lhs->tm.tv_sec >= rhs->tm.tv_sec;
}
return lhs.priority < rhs.priority;
return lhs->priority < rhs->priority;
}
// ------------------------------------------------------------------------------------------
#undef CREATE_TIMER
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment