Commit 646bf7b9 authored by Pavel Vainerman's avatar Pavel Vainerman

(UNet): небольшая оптимизация: сделал mutex более гранулярным

(mutex на сообщение, а не на mypacks)
parent c751a705
...@@ -47,7 +47,7 @@ ${START} -f ./uniset2-smemory-plus --smemory-id SharedMemory --confile test.xml ...@@ -47,7 +47,7 @@ ${START} -f ./uniset2-smemory-plus --smemory-id SharedMemory --confile test.xml
--ulog-add-levels system \ --ulog-add-levels system \
--add-unet \ --add-unet \
--unet-name UNetExchange --unet-run-logserver \ --unet-name UNetExchange --unet-run-logserver \
--unet-filter-field rs --unet-filter-value 2 --unet-maxdifferense 40 --unet-sendpause 1000 --unet-filter-field rs --unet-filter-value 2 --unet-maxdifferense 40 --unet-sendpause 1000 \
$* $*
# --add-rtu \ # --add-rtu \
# --rs-dev /dev/cbsideA1 \ # --rs-dev /dev/cbsideA1 \
......
...@@ -92,6 +92,12 @@ namespace UniSetUDP ...@@ -92,6 +92,12 @@ namespace UniSetUDP
{ {
UDPMessage(); UDPMessage();
UDPMessage(UDPMessage&& m) = default;
UDPMessage& operator=(UDPMessage&&) = default;
UDPMessage( const UDPMessage& m ) = default;
UDPMessage& operator=(const UDPMessage&) = default;
explicit UDPMessage( UDPPacket& p ); explicit UDPMessage( UDPPacket& p );
size_t transport_msg( UDPPacket& p ); size_t transport_msg( UDPPacket& p );
static size_t getMessage( UDPMessage& m, UDPPacket& p ); static size_t getMessage( UDPMessage& m, UDPPacket& p );
......
...@@ -40,7 +40,6 @@ UNetSender::UNetSender(const std::string& _host, const int _port, const std::sha ...@@ -40,7 +40,6 @@ UNetSender::UNetSender(const std::string& _host, const int _port, const std::sha
packsendpause(5), packsendpause(5),
activated(false), activated(false),
items(100), items(100),
maxItem(0),
packetnum(1), packetnum(1),
lastcrc(0), lastcrc(0),
maxAData(maxACount), maxAData(maxACount),
...@@ -74,10 +73,13 @@ UNetSender::UNetSender(const std::string& _host, const int _port, const std::sha ...@@ -74,10 +73,13 @@ UNetSender::UNetSender(const std::string& _host, const int _port, const std::sha
mypacks[0].resize(1); mypacks[0].resize(1);
packs_anum[0] = 0; packs_anum[0] = 0;
packs_dnum[0] = 0; packs_dnum[0] = 0;
UniSetUDP::UDPMessage& mypack(mypacks[0][0]); auto& mypack(mypacks[0][0]);
// выставляем поля, которые не меняются // выставляем поля, которые не меняются
mypack.nodeID = uniset_conf()->getLocalNode(); {
mypack.procID = shm->ID(); uniset_rwmutex_wrlock l(mypack.mut);
mypack.msg.nodeID = uniset_conf()->getLocalNode();
mypack.msg.procID = shm->ID();
}
// ------------------------------- // -------------------------------
if( shm->isLocalwork() ) if( shm->isLocalwork() )
...@@ -164,13 +166,14 @@ void UNetSender::updateSensor( UniSetTypes::ObjectId id, long value ) ...@@ -164,13 +166,14 @@ void UNetSender::updateSensor( UniSetTypes::ObjectId id, long value )
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
void UNetSender::updateItem( UItem& it, long value ) void UNetSender::updateItem( UItem& it, long value )
{ {
UniSetTypes::uniset_rwmutex_wrlock l(pack_mutex);
auto& pk = mypacks[it.pack_sendfactor]; auto& pk = mypacks[it.pack_sendfactor];
UniSetUDP::UDPMessage& mypack(pk[it.pack_num]);
auto& mypack(pk[it.pack_num]);
UniSetTypes::uniset_rwmutex_wrlock l(mypack.mut);
if( it.iotype == UniversalIO::DI || it.iotype == UniversalIO::DO ) if( it.iotype == UniversalIO::DI || it.iotype == UniversalIO::DO )
mypack.setDData(it.pack_ind, value); mypack.msg.setDData(it.pack_ind, value);
else if( it.iotype == UniversalIO::AI || it.iotype == UniversalIO::AO ) else if( it.iotype == UniversalIO::AI || it.iotype == UniversalIO::AO )
mypack.setAData(it.pack_ind, value); mypack.msg.setAData(it.pack_ind, value);
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
void UNetSender::setCheckConnectionPause( int msec ) void UNetSender::setCheckConnectionPause( int msec )
...@@ -262,17 +265,17 @@ void UNetSender::send() ...@@ -262,17 +265,17 @@ void UNetSender::send()
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
// #define UNETUDP_DISABLE_OPTIMIZATION_N1 // #define UNETUDP_DISABLE_OPTIMIZATION_N1
void UNetSender::real_send( UniSetUDP::UDPMessage& mypack ) void UNetSender::real_send( PackMessage& mypack )
{ {
UniSetTypes::uniset_rwmutex_rlock l(pack_mutex); UniSetTypes::uniset_rwmutex_rlock l(mypack.mut);
#ifdef UNETUDP_DISABLE_OPTIMIZATION_N1 #ifdef UNETUDP_DISABLE_OPTIMIZATION_N1
mypack.num = packetnum++; mypack.msg.num = packetnum++;
#else #else
uint16_t crc = mypack.getDataCRC(); uint16_t crc = mypack.msg.getDataCRC();
if( crc != lastcrc ) if( crc != lastcrc )
{ {
mypack.num = packetnum++; mypack.msg.num = packetnum++;
lastcrc = crc; lastcrc = crc;
} }
...@@ -286,7 +289,7 @@ void UNetSender::real_send( UniSetUDP::UDPMessage& mypack ) ...@@ -286,7 +289,7 @@ void UNetSender::real_send( UniSetUDP::UDPMessage& mypack )
if( !udp || !udp->poll(writeTimeout * 1000, Poco::Net::Socket::SELECT_WRITE) ) if( !udp || !udp->poll(writeTimeout * 1000, Poco::Net::Socket::SELECT_WRITE) )
return; return;
mypack.transport_msg(s_msg); mypack.msg.transport_msg(s_msg);
try try
{ {
...@@ -380,7 +383,7 @@ bool UNetSender::initItem( UniXML::iterator& it ) ...@@ -380,7 +383,7 @@ bool UNetSender::initItem( UniXML::iterator& it )
int priority = it.getPIntProp(prefix + "_sendfactor", 0); int priority = it.getPIntProp(prefix + "_sendfactor", 0);
auto pk = mypacks[priority]; auto& pk = mypacks[priority];
UItem p; UItem p;
p.iotype = UniSetTypes::getIOType(it.getProp("iotype")); p.iotype = UniSetTypes::getIOType(it.getProp("iotype"));
...@@ -401,9 +404,11 @@ bool UNetSender::initItem( UniXML::iterator& it ) ...@@ -401,9 +404,11 @@ bool UNetSender::initItem( UniXML::iterator& it )
if( pk.size() <= dnum ) if( pk.size() <= dnum )
pk.resize(dnum + 1); pk.resize(dnum + 1);
UniSetUDP::UDPMessage& mypack(pk[dnum]); auto& mypack(pk[dnum]);
p.pack_ind = mypack.addDData(sid, 0); uniset_rwmutex_wrlock l(mypack.mut);
p.pack_ind = mypack.msg.addDData(sid, 0);
if( p.pack_ind >= maxDData ) if( p.pack_ind >= maxDData )
{ {
...@@ -412,10 +417,11 @@ bool UNetSender::initItem( UniXML::iterator& it ) ...@@ -412,10 +417,11 @@ bool UNetSender::initItem( UniXML::iterator& it )
if( dnum >= pk.size() ) if( dnum >= pk.size() )
pk.resize(dnum + 1); pk.resize(dnum + 1);
UniSetUDP::UDPMessage& mypack( pk[dnum] ); auto& mypack2( pk[dnum] );
p.pack_ind = mypack.addDData(sid, 0); uniset_rwmutex_wrlock l2(mypack2.mut);
mypack.nodeID = uniset_conf()->getLocalNode(); p.pack_ind = mypack2.msg.addDData(sid, 0);
mypack.procID = shm->ID(); mypack2.msg.nodeID = uniset_conf()->getLocalNode();
mypack2.msg.procID = shm->ID();
} }
p.pack_num = dnum; p.pack_num = dnum;
...@@ -438,9 +444,9 @@ bool UNetSender::initItem( UniXML::iterator& it ) ...@@ -438,9 +444,9 @@ bool UNetSender::initItem( UniXML::iterator& it )
if( pk.size() <= anum ) if( pk.size() <= anum )
pk.resize(anum + 1); pk.resize(anum + 1);
UniSetUDP::UDPMessage& mypack(pk[anum]); auto& mypack(pk[anum]);
uniset_rwmutex_wrlock l(mypack.mut);
p.pack_ind = mypack.addAData(sid, 0); p.pack_ind = mypack.msg.addAData(sid, 0);
if( p.pack_ind >= maxAData ) if( p.pack_ind >= maxAData )
{ {
...@@ -449,10 +455,11 @@ bool UNetSender::initItem( UniXML::iterator& it ) ...@@ -449,10 +455,11 @@ bool UNetSender::initItem( UniXML::iterator& it )
if( anum >= pk.size() ) if( anum >= pk.size() )
pk.resize(anum + 1); pk.resize(anum + 1);
UniSetUDP::UDPMessage& mypack(pk[anum]); auto& mypack2(pk[anum]);
p.pack_ind = mypack.addAData(sid, 0); uniset_rwmutex_wrlock l2(mypack2.mut);
mypack.nodeID = uniset_conf()->getLocalNode(); p.pack_ind = mypack2.msg.addAData(sid, 0);
mypack.procID = shm->ID(); mypack2.msg.nodeID = uniset_conf()->getLocalNode();
mypack2.msg.procID = shm->ID();
} }
p.pack_num = anum; p.pack_num = anum;
...@@ -469,12 +476,17 @@ bool UNetSender::initItem( UniXML::iterator& it ) ...@@ -469,12 +476,17 @@ bool UNetSender::initItem( UniXML::iterator& it )
} }
} }
mypacks[priority] = pk;
items[p.id] = p;
maxItem++;
unetinfo << myname << "(initItem): add " << p << endl; unetinfo << myname << "(initItem): add " << p << endl;
auto i = items.find(p.id);
if( i != items.end() )
{
unetcrit << myname
<< "(readItem): Sensor (" << p.id << ")" << sname << " ALREADY ADDED!! ABORT!" << endl;
raise(SIGTERM);
return false;
}
items.emplace(p.id, std::move(p));
return true; return true;
} }
...@@ -511,7 +523,7 @@ const std::string UNetSender::getShortInfo() const ...@@ -511,7 +523,7 @@ const std::string UNetSender::getShortInfo() const
s << setw(15) << std::right << getAddress() << ":" << std::left << setw(6) << getPort() s << setw(15) << std::right << getAddress() << ":" << std::left << setw(6) << getPort()
<< " lastpacknum=" << packetnum << " lastpacknum=" << packetnum
<< " lastcrc=" << setw(6) << lastcrc << " lastcrc=" << setw(6) << lastcrc
<< " items=" << maxItem << " maxAData=" << getADataSize() << " maxDData=" << getDDataSize() << " items=" << items.size() << " maxAData=" << getADataSize() << " maxDData=" << getDDataSize()
<< endl << endl
<< "\t packs([sendfactor]=num): " << "\t packs([sendfactor]=num): "
<< endl; << endl;
...@@ -522,8 +534,9 @@ const std::string UNetSender::getShortInfo() const ...@@ -522,8 +534,9 @@ const std::string UNetSender::getShortInfo() const
size_t n=0; size_t n=0;
for( const auto& p: i->second ) for( const auto& p: i->second )
{ {
s << " \t\t[" << (n++) << "]=" << p.sizeOf() << " bytes" //uniset_rwmutex_rlock l(p->mut);
<< " ( numA=" << setw(5) << p.asize() << " numD=" << setw(5) << p.dsize() << ")" s << " \t\t[" << (n++) << "]=" << p.msg.sizeOf() << " bytes"
<< " ( numA=" << setw(5) << p.msg.asize() << " numD=" << setw(5) << p.msg.dsize() << ")"
<< endl; << endl;
} }
} }
......
...@@ -41,12 +41,13 @@ ...@@ -41,12 +41,13 @@
* Причем так как дискретные и аналоговые датчики обрабатываются отдельно (но пересылаются в одном пакете), * Причем так как дискретные и аналоговые датчики обрабатываются отдельно (но пересылаются в одном пакете),
* то датчики, которые первые переполнятся приводят к тому, что создаётся новый пакет и они добавляются в него, * то датчики, которые первые переполнятся приводят к тому, что создаётся новый пакет и они добавляются в него,
* в свою очередь остальные продолжают "добивать" предыдущий пакет. * в свою очередь остальные продолжают "добивать" предыдущий пакет.
* В свою очередь в initItem() каждому UItem в dlist кроме pack_ind присваивается еще и номер пакета pack_num, который гарантировано соответствует * В initItem() каждому UItem в dlist кроме pack_ind присваивается еще и номер пакета pack_num, который гарантировано соответствует
* существующему пакету, поэтому в дальнейшем при использовании pack_num в качестве ключа в mypacks мы не проверяем пакет на существование. * существующему пакету, поэтому в дальнейшем при использовании pack_num в качестве ключа в mypacks мы не проверяем пакет на существование.
* *
* ОПТИМИЗАЦИЯ N1: Для оптимизации обработки посылаемых пакетов (на стороне UNetReceiver) сделана следующая логика: * ОПТИМИЗАЦИЯ N1: Для оптимизации обработки посылаемых пакетов (на стороне UNetSender) сделана следующая логика:
* Номер очередного посылаемого пакета меняется (увеличивается) только, если изменились данные с момента * Номер очередного посылаемого пакета меняется (увеличивается) только, если изменились данные с момента
последней посылки. Для этого по данным каждый раз производится расчёт UNetUDP::makeCRC() и сравнивается с последним.. последней посылки. Для этого по данным каждый раз производится расчёт UNetUDP::makeCRC() и сравнивается с последним.
На стороне UNetReceiver пакаеты с повторными номерами (т.е. уже обработанные) - откидываются.
* *
* *
* Создание соединения * Создание соединения
...@@ -87,7 +88,6 @@ class UNetSender ...@@ -87,7 +88,6 @@ class UNetSender
size_t pack_num; size_t pack_num;
size_t pack_ind; size_t pack_ind;
sendfactor_t pack_sendfactor = { 0 }; sendfactor_t pack_sendfactor = { 0 };
friend std::ostream& operator<<( std::ostream& os, UItem& p ); friend std::ostream& operator<<( std::ostream& os, UItem& p );
}; };
...@@ -99,7 +99,19 @@ class UNetSender ...@@ -99,7 +99,19 @@ class UNetSender
void stop(); void stop();
void send(); void send();
void real_send(UniSetUDP::UDPMessage& mypack);
struct PackMessage
{
PackMessage( UniSetUDP::UDPMessage&& m ):msg(std::move(m)){}
PackMessage( const UniSetUDP::UDPMessage& m ) = delete;
PackMessage(){}
UniSetUDP::UDPMessage msg;
UniSetTypes::uniset_rwmutex mut;
};
void real_send( PackMessage& mypack );
/*! (принудительно) обновить все данные (из SM) */ /*! (принудительно) обновить все данные (из SM) */
void updateFromSM(); void updateFromSM();
...@@ -184,15 +196,14 @@ class UNetSender ...@@ -184,15 +196,14 @@ class UNetSender
std::atomic_bool activated = { false }; std::atomic_bool activated = { false };
PassiveTimer ptCheckConnection; PassiveTimer ptCheckConnection;
UniSetTypes::uniset_rwmutex pack_mutex; typedef std::unordered_map<sendfactor_t, std::vector<PackMessage>> Packs;
typedef std::unordered_map<sendfactor_t, std::vector<UniSetUDP::UDPMessage>> Packs;
// mypacks заполняется в начале и дальше с ним происходит только чтение
// поэтому mutex-ом его не защищаем
Packs mypacks; Packs mypacks;
std::unordered_map<sendfactor_t, size_t> packs_anum; std::unordered_map<sendfactor_t, size_t> packs_anum;
std::unordered_map<sendfactor_t, size_t> packs_dnum; std::unordered_map<sendfactor_t, size_t> packs_dnum;
UItemMap items; UItemMap items;
size_t maxItem = { 0 };
size_t packetnum = { 1 }; /*!< номер очередного посылаемого пакета */ size_t packetnum = { 1 }; /*!< номер очередного посылаемого пакета */
uint16_t lastcrc = { 0 }; uint16_t lastcrc = { 0 };
UniSetUDP::UDPPacket s_msg; UniSetUDP::UDPPacket s_msg;
......
...@@ -109,6 +109,8 @@ class ThreadCreator: ...@@ -109,6 +109,8 @@ class ThreadCreator:
void stop(); void stop();
void start(); void start();
void sleep( long milliseconds );
inline bool isRunning() inline bool isRunning()
{ {
return thr.isRunning(); return thr.isRunning();
...@@ -200,6 +202,12 @@ void ThreadCreator<ThreadMaster>::start() ...@@ -200,6 +202,12 @@ void ThreadCreator<ThreadMaster>::start()
} }
//---------------------------------------------------------------------------------------- //----------------------------------------------------------------------------------------
template <class ThreadMaster> template <class ThreadMaster>
void ThreadCreator<ThreadMaster>::sleep( long milliseconds )
{
thr.sleep(milliseconds);
}
//----------------------------------------------------------------------------------------
template <class ThreadMaster>
ThreadCreator<ThreadMaster>::ThreadCreator(): ThreadCreator<ThreadMaster>::ThreadCreator():
m(0), m(0),
act(0), act(0),
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment