Commit 3a9c97ce authored by Pavel Vainerman's avatar Pavel Vainerman

(UniSetObject):

Исправил ошибку в функции чистки очереди сообщений в случае переполнения. Не учитывалось, что может быть разный consumer (актуально для ProxyManager). Помимо этого вынес функцию инициализации параметров в отдельную функицю (теперь размер очереди сообщений, timeout-ы для mutex-ов задавать параметрами командной строки).
parent d1b0b060
...@@ -215,6 +215,8 @@ class UniSetObject: ...@@ -215,6 +215,8 @@ class UniSetObject:
/* удаление ссылки из репозитория объектов */ /* удаление ссылки из репозитория объектов */
void unregister(); void unregister();
void init_object();
pid_t msgpid; // pid потока обработки сообщений pid_t msgpid; // pid потока обработки сообщений
bool reg; bool reg;
bool active; bool active;
......
...@@ -63,17 +63,7 @@ stCountOfQueueFull(0) ...@@ -63,17 +63,7 @@ stCountOfQueueFull(0)
tmr = CREATE_TIMER; tmr = CREATE_TIMER;
myname = "noname"; myname = "noname";
section = "nonameSection"; section = "nonameSection";
init_object();
SizeOfMessageQueue = conf->getPIntField("SizeOfMessageQueue", 1000);
MaxCountRemoveOfMessage = conf->getIntField("MaxCountRemoveOfMessage");
if( MaxCountRemoveOfMessage <= 0 )
MaxCountRemoveOfMessage = SizeOfMessageQueue / 4;
if( MaxCountRemoveOfMessage <= 0 )
MaxCountRemoveOfMessage = 10;
recvMutexTimeout = conf->getPIntField("RecvMutexTimeout", 10000);
pushMutexTimeout = conf->getPIntField("PushMutexTimeout", 9000);
} }
// ------------------------------------------------------------------------------------------ // ------------------------------------------------------------------------------------------
UniSetObject::UniSetObject( ObjectId id ): UniSetObject::UniSetObject( ObjectId id ):
...@@ -92,14 +82,6 @@ MaxCountRemoveOfMessage(10), ...@@ -92,14 +82,6 @@ MaxCountRemoveOfMessage(10),
stMaxQueueMessages(0), stMaxQueueMessages(0),
stCountOfQueueFull(0) stCountOfQueueFull(0)
{ {
SizeOfMessageQueue = conf->getPIntField("SizeOfMessageQueue", 1000);
MaxCountRemoveOfMessage = conf->getIntField("MaxCountRemoveOfMessage");
if( MaxCountRemoveOfMessage <= 0 )
MaxCountRemoveOfMessage = SizeOfMessageQueue / 4;
if( MaxCountRemoveOfMessage <= 0 )
MaxCountRemoveOfMessage = 10;
tmr = CREATE_TIMER; tmr = CREATE_TIMER;
if (myid >=0) if (myid >=0)
{ {
...@@ -110,13 +92,12 @@ stCountOfQueueFull(0) ...@@ -110,13 +92,12 @@ stCountOfQueueFull(0)
else else
{ {
threadcreate = false; threadcreate = false;
myid=UniSetTypes::DefaultObjectId; myid = UniSetTypes::DefaultObjectId;
myname = "noname"; myname = "noname";
section = "nonameSection"; section = "nonameSection";
} }
recvMutexTimeout = conf->getPIntField("RecvMutexTimeout", 10000);
pushMutexTimeout = conf->getPIntField("PushMutexTimeout", 9000);
init_object();
} }
...@@ -145,17 +126,8 @@ stCountOfQueueFull(0) ...@@ -145,17 +126,8 @@ stCountOfQueueFull(0)
unideb[Debug::WARN] << "name: my ID not found!" << endl; unideb[Debug::WARN] << "name: my ID not found!" << endl;
throw Exception(name+": my ID not found!"); throw Exception(name+": my ID not found!");
} }
SizeOfMessageQueue = conf->getPIntField("SizeOfMessageQueue", 1000);
MaxCountRemoveOfMessage = conf->getIntField("MaxCountRemoveOfMessage");
if( MaxCountRemoveOfMessage <= 0 )
MaxCountRemoveOfMessage = SizeOfMessageQueue / 4;
if( MaxCountRemoveOfMessage <= 0 )
MaxCountRemoveOfMessage = 10;
recvMutexTimeout = conf->getPIntField("RecvMutexTimeout", 10000);
pushMutexTimeout = conf->getPIntField("PushMutexTimeout", 9000);
init_object();
ui.initBackId(myid); ui.initBackId(myid);
} }
...@@ -168,6 +140,30 @@ UniSetObject::~UniSetObject() ...@@ -168,6 +140,30 @@ UniSetObject::~UniSetObject()
delete thr; delete thr;
} }
// ------------------------------------------------------------------------------------------ // ------------------------------------------------------------------------------------------
void UniSetObject::init_object()
{
SizeOfMessageQueue = conf->getArgPInt("--uniset-object-size-message-queue",conf->getField("SizeOfMessageQueue"), 1000);
MaxCountRemoveOfMessage = conf->getArgInt("--uniset-object-maxcount-remove-message",conf->getField("MaxCountRemoveOfMessage"));
if( MaxCountRemoveOfMessage <= 0 )
MaxCountRemoveOfMessage = SizeOfMessageQueue / 4;
if( MaxCountRemoveOfMessage <= 0 )
MaxCountRemoveOfMessage = 10;
recvMutexTimeout = conf->getArgPInt("--uniset-object-receive-mutex-timeout",conf->getField("RecvMutexTimeout"), 10000);
pushMutexTimeout = conf->getArgPInt("--uniset-object-push-mutex-timeout",conf->getField("PushMutexTimeout"), 9000);
if( unideb.debugging(Debug::INFO) )
{
unideb[Debug::INFO] << myname << "(init): SizeOfMessageQueue=" << SizeOfMessageQueue
<< " MaxCountRemoveOfMessage=" << MaxCountRemoveOfMessage
<< " recvMutexTimeout=" << recvMutexTimeout
<< " pushMutexTimeout=" << pushMutexTimeout
<< endl;
}
}
// ------------------------------------------------------------------------------------------
/*! /*!
* \param om - указазтель на менджер управляющий объектом * \param om - указазтель на менджер управляющий объектом
* \return Возращает \a true если инициализация прошда успешно, и \a false если нет * \return Возращает \a true если инициализация прошда успешно, и \a false если нет
...@@ -486,183 +482,197 @@ void UniSetObject::push(const TransportMessage& tm) ...@@ -486,183 +482,197 @@ void UniSetObject::push(const TransportMessage& tm)
termWaiting(); termWaiting();
} }
// ------------------------------------------------------------------------------------------ // ------------------------------------------------------------------------------------------
void UniSetObject::cleanMsgQueue( MessagesQueue& q ) struct tmpConsumerInfo
{ {
unideb[Debug::CRIT] << myname << "(cleanMsgQueue): msg queue cleaning..." << endl << flush; tmpConsumerInfo(){}
// проходим по всем известным нам типам(базовым)
// ищем все совпадающие сообщения и оставляем только последние...
unideb[Debug::CRIT] << myname << "(cleanMsgQueue): current size of queue: " << q.size() << endl << flush;
VoidMessage m;
map<UniSetTypes::KeyType,VoidMessage> smap; map<UniSetTypes::KeyType,VoidMessage> smap;
map<int,VoidMessage> tmap; map<int,VoidMessage> tmap;
map<int,VoidMessage> sysmap; map<int,VoidMessage> sysmap;
list<VoidMessage> lstOther;
map<MsgInfo,VoidMessage> amap; map<MsgInfo,VoidMessage> amap;
map<MsgInfo,VoidMessage> imap; map<MsgInfo,VoidMessage> imap;
map<MsgInfo,VoidMessage> cmap; map<MsgInfo,VoidMessage> cmap;
list<VoidMessage> lstOther;
};
void UniSetObject::cleanMsgQueue( MessagesQueue& q )
{
if( unideb.debugging(Debug::CRIT) )
{
unideb[Debug::CRIT] << myname << "(cleanMsgQueue): msg queue cleaning..." << endl << flush;
unideb[Debug::CRIT] << myname << "(cleanMsgQueue): current size of queue: " << q.size() << endl << flush;
}
// проходим по всем известным нам типам(базовым)
// ищем все совпадающие сообщения и оставляем только последние...
VoidMessage m;
map<UniSetTypes::ObjectId,tmpConsumerInfo> consumermap;
// while( receiveMessage(vm) ); // while( receiveMessage(vm) );
// while нельзя использовать потому-что, из параллельного потока // while нельзя использовать потому-что, из параллельного потока
// могут запихивать в очередь ещё сообщения.. И это цикл никогда не прервётся... // могут запихивать в очередь ещё сообщения.. И это цикл никогда не прервётся...
while( !q.empty() ) while( !q.empty() )
{ {
m = q.top(); m = q.top();
q.pop(); q.pop();
switch( m.type ) switch( m.type )
{
case Message::SensorInfo:
{ {
case Message::SensorInfo: SensorMessage sm(&m);
{ UniSetTypes::KeyType k(key(sm.id,sm.node));
SensorMessage sm(&m); // т.к. из очереди сообщений сперва вынимаются самые старые, потом свежее и т.п.
UniSetTypes::KeyType k(key(sm.id,sm.node)); // то достаточно просто сохранять последнее сообщение для одинаковых Key
// т.к. из очереди сообщений сперва вынимаются самые старые, потом свежее и т.п. consumermap[sm.consumer].smap[k] = m;
// то достаточно просто сохранять последнее сообщение для одинаковых Key }
smap[k] = m; break;
}
break;
case Message::Timer: case Message::Timer:
{ {
TimerMessage tm(&m); TimerMessage tm(&m);
// т.к. из очереди сообщений сперва вынимаются самые старые, потом свежее и т.п. // т.к. из очереди сообщений сперва вынимаются самые старые, потом свежее и т.п.
// то достаточно просто сохранять последнее сообщение для одинаковых TimerId // то достаточно просто сохранять последнее сообщение для одинаковых TimerId
tmap[tm.id] = m; consumermap[tm.consumer].tmap[tm.id] = m;
} }
break; break;
case Message::SysCommand: case Message::SysCommand:
{ {
SystemMessage sm(&m); SystemMessage sm(&m);
sysmap[sm.command] = m; consumermap[sm.consumer].sysmap[sm.command] = m;
} }
break; break;
case Message::Alarm: case Message::Alarm:
{ {
AlarmMessage am(&m); AlarmMessage am(&m);
MsgInfo mi(am); MsgInfo mi(am);
// т.к. из очереди сообщений сперва вынимаются самые старые, потом свежее и т.п. // т.к. из очереди сообщений сперва вынимаются самые старые, потом свежее и т.п.
// то достаточно просто сохранять последнее сообщение для одинаковых MsgInfo // то достаточно просто сохранять последнее сообщение для одинаковых MsgInfo
amap[mi] = m; consumermap[am.consumer].amap[mi] = m;
} }
break; break;
case Message::Info: case Message::Info:
{ {
InfoMessage im(&m); InfoMessage im(&m);
MsgInfo mi(im); MsgInfo mi(im);
// т.к. из очереди сообщений сперва вынимаются самые старые, потом свежее и т.п. // т.к. из очереди сообщений сперва вынимаются самые старые, потом свежее и т.п.
// то достаточно просто сохранять последнее сообщение для одинаковых MsgInfo // то достаточно просто сохранять последнее сообщение для одинаковых MsgInfo
imap[mi] = m; consumermap[im.consumer].imap[mi] = m;
} }
break; break;
case Message::Confirm: case Message::Confirm:
{ {
ConfirmMessage cm(&m); ConfirmMessage cm(&m);
MsgInfo mi(cm); MsgInfo mi(cm);
// т.к. из очереди сообщений сперва вынимаются самые старые, потом свежее и т.п. // т.к. из очереди сообщений сперва вынимаются самые старые, потом свежее и т.п.
// то достаточно просто сохранять последнее сообщение для одинаковых MsgInfo // то достаточно просто сохранять последнее сообщение для одинаковых MsgInfo
cmap[mi] = m; consumermap[cm.consumer].cmap[mi] = m;
} }
break; break;
case Message::Unused: case Message::Unused:
// просто выкидываем (игнорируем) // просто выкидываем (игнорируем)
break; break;
default: default:
// сразу пизаем // сразу пизаем
lstOther.push_front(m); consumermap[m.consumer].lstOther.push_front(m);
break; break;
} }
} }
if( unideb.debugging(Debug::CRIT) )
unideb[Debug::CRIT] << myname << "(cleanMsgQueue): ******** cleanup RESULT ********" << endl;
for( map<UniSetTypes::ObjectId,tmpConsumerInfo>::iterator it0 = consumermap.begin();
it0!=consumermap.end(); ++it0 )
{
if( unideb.debugging(Debug::CRIT) ) if( unideb.debugging(Debug::CRIT) )
{ {
unideb[Debug::CRIT] << myname << "(cleanMsgQueue): after clean SensorMessage: " << smap.size() << endl; unideb[Debug::CRIT] << myname << "(cleanMsgQueue): CONSUMER=" << it0->first << endl;
unideb[Debug::CRIT] << myname << "(cleanMsgQueue): after clean TimerMessage: " << tmap.size() << endl; unideb[Debug::CRIT] << myname << "(cleanMsgQueue): after clean SensorMessage: " << it0->second.smap.size() << endl;
unideb[Debug::CRIT] << myname << "(cleanMsgQueue): after clean SystemMessage: " << sysmap.size() << endl; unideb[Debug::CRIT] << myname << "(cleanMsgQueue): after clean TimerMessage: " << it0->second.tmap.size() << endl;
unideb[Debug::CRIT] << myname << "(cleanMsgQueue): after clean AlarmMessage: " << amap.size() << endl; unideb[Debug::CRIT] << myname << "(cleanMsgQueue): after clean SystemMessage: " << it0->second.sysmap.size() << endl;
unideb[Debug::CRIT] << myname << "(cleanMsgQueue): after clean InfoMessage: " << imap.size() << endl; unideb[Debug::CRIT] << myname << "(cleanMsgQueue): after clean AlarmMessage: " << it0->second.amap.size() << endl;
unideb[Debug::CRIT] << myname << "(cleanMsgQueue): after clean ConfirmMessage: " << cmap.size() << endl; unideb[Debug::CRIT] << myname << "(cleanMsgQueue): after clean InfoMessage: " << it0->second.imap.size() << endl;
unideb[Debug::CRIT] << myname << "(cleanMsgQueue): after clean other: " << lstOther.size() << endl; unideb[Debug::CRIT] << myname << "(cleanMsgQueue): after clean ConfirmMessage: " << it0->second.cmap.size() << endl;
unideb[Debug::CRIT] << myname << "(cleanMsgQueue): after clean other: " << it0->second.lstOther.size() << endl;
} }
// теперь ОСТАВШИЕСЯ запихиваем обратно в очередь... // теперь ОСТАВШИЕСЯ запихиваем обратно в очередь...
map<UniSetTypes::KeyType,VoidMessage>::iterator it=it0->second.smap.begin();
map<UniSetTypes::KeyType,VoidMessage>::iterator it=smap.begin(); for( ; it!=it0->second.smap.end(); ++it )
for( ; it!=smap.end(); ++it )
{ {
q.push(it->second); q.push(it->second);
} }
map<int,VoidMessage>::iterator it1=tmap.begin(); map<int,VoidMessage>::iterator it1=it0->second.tmap.begin();
for( ; it1!=tmap.end(); ++it1 ) for( ; it1!=it0->second.tmap.end(); ++it1 )
{ {
q.push(it1->second); q.push(it1->second);
} }
map<int,VoidMessage>::iterator it2=sysmap.begin(); map<int,VoidMessage>::iterator it2=it0->second.sysmap.begin();
for( ; it2!=sysmap.end(); ++it2 ) for( ; it2!=it0->second.sysmap.end(); ++it2 )
{ {
q.push(it2->second); q.push(it2->second);
} }
map<MsgInfo,VoidMessage>::iterator it3=amap.begin(); map<MsgInfo,VoidMessage>::iterator it3=it0->second.amap.begin();
for( ; it3!=amap.end(); ++it3 ) for( ; it3!=it0->second.amap.end(); ++it3 )
{ {
q.push(it3->second); q.push(it3->second);
} }
map<MsgInfo,VoidMessage>::iterator it4=imap.begin(); map<MsgInfo,VoidMessage>::iterator it4=it0->second.imap.begin();
for( ; it4!=imap.end(); ++it4 ) for( ; it4!=it0->second.imap.end(); ++it4 )
{ {
q.push(it4->second); q.push(it4->second);
} }
map<MsgInfo,VoidMessage>::iterator it5=cmap.begin(); map<MsgInfo,VoidMessage>::iterator it5=it0->second.cmap.begin();
for( ; it5!=cmap.end(); ++it5 ) for( ; it5!=it0->second.cmap.end(); ++it5 )
{ {
q.push(it5->second); q.push(it5->second);
} }
list<VoidMessage>::iterator it6=lstOther.begin(); list<VoidMessage>::iterator it6=it0->second.lstOther.begin();
for( ; it6!=lstOther.end(); ++it6 ) for( ; it6!=it0->second.lstOther.end(); ++it6 )
{
q.push(*it6); q.push(*it6);
} }
if( unideb.debugging(Debug::CRIT) )
{
unideb[Debug::CRIT] << myname
<< "(cleanMsgQueue): ******* result size of queue: "
<< q.size()
<< " < " << getMaxSizeOfMessageQueue() << endl;
}
if( q.size() >= getMaxSizeOfMessageQueue() )
{
if( unideb.debugging(Debug::CRIT) ) if( unideb.debugging(Debug::CRIT) )
{ {
unideb[Debug::CRIT] << myname unideb[Debug::CRIT] << myname << "(cleanMsgQueue): clean failed. size > " << q.size() << endl;
<< "(cleanMsgQueue): result size of queue: " unideb[Debug::CRIT] << myname << "(cleanMsgQueue): remove " << getMaxCountRemoveOfMessage() << " old messages " << endl;
<< q.size()
<< " < " << getMaxSizeOfMessageQueue() << endl;
} }
for( unsigned int i=0; i<getMaxCountRemoveOfMessage(); i++ )
if( q.size() >= getMaxSizeOfMessageQueue() )
{ {
if( unideb.debugging(Debug::CRIT) ) q.top();
{ q.pop();
unideb[Debug::CRIT] << myname << "(cleanMsgQueue): clean failed. size > " << q.size() << endl; if( q.empty() )
unideb[Debug::CRIT] << myname << "(cleanMsgQueue): remove " << getMaxCountRemoveOfMessage() << " old messages " << endl; break;
}
for( unsigned int i=0; i<getMaxCountRemoveOfMessage(); i++ )
{
q.top();
q.pop();
if( q.empty() )
break;
}
if( unideb.debugging(Debug::CRIT) )
unideb[Debug::CRIT] << myname << "(cleanMsgQueue): result size=" << q.size() << endl;
} }
if( unideb.debugging(Debug::CRIT) )
unideb[Debug::CRIT] << myname << "(cleanMsgQueue): result size=" << q.size() << endl;
}
} }
// ------------------------------------------------------------------------------------------ // ------------------------------------------------------------------------------------------
unsigned int UniSetObject::countMessages() unsigned int UniSetObject::countMessages()
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment