Commit 8136d52b authored by Pavel Vainerman's avatar Pavel Vainerman

backported to p9 as 2.8.1-alt0.M90P.1 (with rpmbph script)

parents a46c3b60 69525fbf
name: C/C++ CI
on:
push:
branches: [ master, github-actions ]
pull_request:
branches: [ master ]
jobs:
build:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- name: install packages
run: |
sudo apt-get install libcomedi-dev libpoco-dev libmysqlclient-dev libomniorb4-dev libev-dev omniidl xsltproc libpqxx3-dev librrd-dev libsigc++-2.0-dev libsqlite3-dev python-dev libmosquittopp-dev
wget https://github.com/catchorg/Catch2/releases/download/v1.11.0/catch.hpp -O include/catch.hpp
- name: configure
run: |
export CXXFLAGS='-pipe -O2 -pedantic -Wall'
# due broken comedi
export CXXFLAGS="$CXXFLAGS -Wl,--unresolved-symbols=ignore-in-shared-libs"
autoreconf -fiv
./configure --enable-mysql --enable-sqlite --enable-rrd --enable-io --disable-python --disable-mqtt --disable-pgsql --disable-api --disable-netdata --disable-logdb
- name: make
run: make
- name: simple tests
run: |
cd tests;
make check
# For most projects, this workflow file will not need changing; you simply need
# to commit it to your repository.
#
# You may wish to alter this file to override the set of languages analyzed,
# or to provide custom queries or build logic.
#
# ******** NOTE ********
# We have attempted to detect the languages in your repository. Please check
# the `language` matrix defined below to confirm you have the correct set of
# supported CodeQL languages.
# ******** NOTE ********
name: "CodeQL"
on:
push:
branches: [ master, github-actions ]
pull_request:
# The branches below must be a subset of the branches above
branches: [ master ]
schedule:
- cron: '44 6 * * 4'
jobs:
analyze:
name: Analyze
runs-on: ubuntu-latest
strategy:
fail-fast: false
matrix:
language: [ 'cpp' ]
# CodeQL supports [ 'cpp', 'csharp', 'go', 'java', 'javascript', 'python' ]
# Learn more:
# https://docs.github.com/github/finding-security-vulnerabilities-and-errors-in-your-code/configuring-code-scanning#changing-the-languages-that-are-analyzed
steps:
- name: Checkout repository
uses: actions/checkout@v2
# Initializes the CodeQL tools for scanning.
- name: Initialize CodeQL
uses: github/codeql-action/init@v1
with:
languages: ${{ matrix.language }}
- name: install packages
run: |
sudo apt-get install libcomedi-dev libpoco-dev libmysqlclient-dev libomniorb4-dev libev-dev omniidl xsltproc libpqxx3-dev librrd-dev libsigc++-2.0-dev libsqlite3-dev python-dev libmosquittopp-dev
wget https://github.com/catchorg/Catch2/releases/download/v1.11.0/catch.hpp -O include/catch.hpp
- name: build
run: |
export CXXFLAGS='-pipe -O2 -pedantic -Wall'
# due broken comedi
export CXXFLAGS="$CXXFLAGS -Wl,--unresolved-symbols=ignore-in-shared-libs"
autoreconf -fiv
./configure --enable-mysql --enable-sqlite --enable-rrd --enable-io --disable-python --disable-mqtt --disable-pgsql --disable-api --disable-netdata --disable-logdb
make
- name: Perform CodeQL Analysis
uses: github/codeql-action/analyze@v1
...@@ -25,8 +25,8 @@ ...@@ -25,8 +25,8 @@
%define oname uniset2 %define oname uniset2
Name: libuniset2 Name: libuniset2
Version: 2.8 Version: 2.8.1
Release: alt14.M90P.15 Release: alt0.M90P.1
Summary: UniSet - library for building distributed industrial control systems Summary: UniSet - library for building distributed industrial control systems
License: LGPL License: LGPL
...@@ -546,9 +546,12 @@ rm -f %buildroot%_docdir/%oname/html/*.md5 ...@@ -546,9 +546,12 @@ rm -f %buildroot%_docdir/%oname/html/*.md5
# history of current unpublished changes # history of current unpublished changes
%changelog %changelog
* Sun Oct 25 2020 Pavel Vainerman <pv@altlinux.ru> 2.8-alt14.M90P.15 * Sat Dec 05 2020 Pavel Vainerman <pv@altlinux.ru> 2.8.1-alt0.M90P.1
- backport to ALTLinux p9 (by rpmbph script) - backport to ALTLinux p9 (by rpmbph script)
* Sat Dec 05 2020 Pavel Vainerman <pv@altlinux.ru> 2.8.1-alt1
- (unet): unet recevier refactoring (optimization)
* Sun Oct 25 2020 Pavel Vainerman <pv@altlinux.ru> 2.8-alt15 * Sun Oct 25 2020 Pavel Vainerman <pv@altlinux.ru> 2.8-alt15
- minor fixes - minor fixes
......
...@@ -3,7 +3,7 @@ ...@@ -3,7 +3,7 @@
# See doc: http://www.gnu.org/software/hello/manual/autoconf/Generic-Programs.html # See doc: http://www.gnu.org/software/hello/manual/autoconf/Generic-Programs.html
# AC_PREREQ(2.59) # AC_PREREQ(2.59)
AC_INIT([uniset2], [2.8.0], pv@etersoft.ru) AC_INIT([uniset2], [2.8.1], pv@etersoft.ru)
AM_INIT_AUTOMAKE(AC_PACKAGE_NAME,AC_PACKAGE_VERSION) AM_INIT_AUTOMAKE(AC_PACKAGE_NAME,AC_PACKAGE_VERSION)
LIBVER=2:8:0 LIBVER=2:8:0
......
...@@ -23,6 +23,7 @@ ...@@ -23,6 +23,7 @@
// myvar = LE_TO_H(myvar) // myvar = LE_TO_H(myvar)
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
#if __BYTE_ORDER == __LITTLE_ENDIAN #if __BYTE_ORDER == __LITTLE_ENDIAN
static bool HostIsBigEndian = false;
#define LE_TO_H(x) {} #define LE_TO_H(x) {}
#elif INTPTR_MAX == INT64_MAX #elif INTPTR_MAX == INT64_MAX
#define LE_TO_H(x) x = le64toh(x) #define LE_TO_H(x) x = le64toh(x)
...@@ -33,6 +34,7 @@ ...@@ -33,6 +34,7 @@
#endif #endif
#if __BYTE_ORDER == __BIG_ENDIAN #if __BYTE_ORDER == __BIG_ENDIAN
static bool HostIsBigEndian = true;
#define BE_TO_H(x) {} #define BE_TO_H(x) {}
#elif INTPTR_MAX == INT64_MAX #elif INTPTR_MAX == INT64_MAX
#define BE_TO_H(x) x = be64toh(x) #define BE_TO_H(x) x = be64toh(x)
...@@ -253,7 +255,7 @@ namespace uniset ...@@ -253,7 +255,7 @@ namespace uniset
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
size_t UDPMessage::transport_msg( UDPPacket& p ) const noexcept size_t UDPMessage::transport_msg( UDPPacket& p ) const noexcept
{ {
memset(&p, 0, sizeof(UDPPacket)); p = UDPPacket{}; // reset data
size_t i = 0; size_t i = 0;
memcpy(&(p.data[i]), this, sizeof(UDPHeader)); memcpy(&(p.data[i]), this, sizeof(UDPHeader));
...@@ -311,7 +313,8 @@ namespace uniset ...@@ -311,7 +313,8 @@ namespace uniset
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
size_t UDPMessage::getMessage( UDPMessage& m, UDPPacket& p ) noexcept size_t UDPMessage::getMessage( UDPMessage& m, UDPPacket& p ) noexcept
{ {
memset(&m, 0, sizeof(m)); // reset data
m = UDPMessage{};
size_t i = 0; size_t i = 0;
memcpy(&m, &(p.data[i]), sizeof(UDPHeader)); memcpy(&m, &(p.data[i]), sizeof(UDPHeader));
...@@ -320,7 +323,7 @@ namespace uniset ...@@ -320,7 +323,7 @@ namespace uniset
// byte order from packet // byte order from packet
u_char be_order = m._be_order; u_char be_order = m._be_order;
if( be_order ) if( be_order && !HostIsBigEndian )
{ {
BE_TO_H(m.magic); BE_TO_H(m.magic);
BE_TO_H(m.num); BE_TO_H(m.num);
...@@ -329,7 +332,7 @@ namespace uniset ...@@ -329,7 +332,7 @@ namespace uniset
BE_TO_H(m.dcount); BE_TO_H(m.dcount);
BE_TO_H(m.acount); BE_TO_H(m.acount);
} }
else else if( !be_order && HostIsBigEndian )
{ {
LE_TO_H(m.magic); LE_TO_H(m.magic);
LE_TO_H(m.num); LE_TO_H(m.num);
...@@ -385,29 +388,32 @@ namespace uniset ...@@ -385,29 +388,32 @@ namespace uniset
// CONVERT DATA TO HOST BYTE ORDER // CONVERT DATA TO HOST BYTE ORDER
// ------------------------------- // -------------------------------
for( size_t n = 0; n < m.acount; n++ ) if( (be_order && !HostIsBigEndian) || (!be_order && HostIsBigEndian) )
{ {
if( be_order ) for( size_t n = 0; n < m.acount; n++ )
{
BE_TO_H(m.a_dat[n].id);
BE_TO_H(m.a_dat[n].val);
}
else
{ {
LE_TO_H(m.a_dat[n].id); if( be_order )
LE_TO_H(m.a_dat[n].val); {
BE_TO_H(m.a_dat[n].id);
BE_TO_H(m.a_dat[n].val);
}
else
{
LE_TO_H(m.a_dat[n].id);
LE_TO_H(m.a_dat[n].val);
}
} }
}
for( size_t n = 0; n < m.dcount; n++ ) for( size_t n = 0; n < m.dcount; n++ )
{
if( be_order )
{
BE_TO_H(m.d_id[n]);
}
else
{ {
LE_TO_H(m.d_id[n]); if( be_order )
{
BE_TO_H(m.d_id[n]);
}
else
{
LE_TO_H(m.d_id[n]);
}
} }
} }
......
...@@ -77,7 +77,6 @@ UNetExchange::UNetExchange(uniset::ObjectId objId, uniset::ObjectId shmId, const ...@@ -77,7 +77,6 @@ UNetExchange::UNetExchange(uniset::ObjectId objId, uniset::ObjectId shmId, const
int recvTimeout = conf->getArgPInt("--" + prefix + "-recv-timeout", it.getProp("recvTimeout"), 5000); int recvTimeout = conf->getArgPInt("--" + prefix + "-recv-timeout", it.getProp("recvTimeout"), 5000);
int prepareTime = conf->getArgPInt("--" + prefix + "-prepare-time", it.getProp("prepareTime"), 2000); int prepareTime = conf->getArgPInt("--" + prefix + "-prepare-time", it.getProp("prepareTime"), 2000);
int evrunTimeout = conf->getArgPInt("--" + prefix + "-evrun-timeout", it.getProp("evrunTimeout"), 60000); int evrunTimeout = conf->getArgPInt("--" + prefix + "-evrun-timeout", it.getProp("evrunTimeout"), 60000);
int recvpause = conf->getArgPInt("--" + prefix + "-recvpause", it.getProp("recvpause"), 10);
int sendpause = conf->getArgPInt("--" + prefix + "-sendpause", it.getProp("sendpause"), 100); int sendpause = conf->getArgPInt("--" + prefix + "-sendpause", it.getProp("sendpause"), 100);
int packsendpause = conf->getArgPInt("--" + prefix + "-packsendpause", it.getProp("packsendpause"), 5); int packsendpause = conf->getArgPInt("--" + prefix + "-packsendpause", it.getProp("packsendpause"), 5);
int packsendpauseFactor = conf->getArgPInt("--" + prefix + "-packsendpause-factor", it.getProp("packsendpauseFactor"), 0); int packsendpauseFactor = conf->getArgPInt("--" + prefix + "-packsendpause-factor", it.getProp("packsendpauseFactor"), 0);
...@@ -85,9 +84,9 @@ UNetExchange::UNetExchange(uniset::ObjectId objId, uniset::ObjectId shmId, const ...@@ -85,9 +84,9 @@ UNetExchange::UNetExchange(uniset::ObjectId objId, uniset::ObjectId shmId, const
int lostTimeout = conf->getArgPInt("--" + prefix + "-lost-timeout", it.getProp("lostTimeout"), 2 * updatepause); int lostTimeout = conf->getArgPInt("--" + prefix + "-lost-timeout", it.getProp("lostTimeout"), 2 * updatepause);
steptime = conf->getArgPInt("--" + prefix + "-steptime", it.getProp("steptime"), 1000); steptime = conf->getArgPInt("--" + prefix + "-steptime", it.getProp("steptime"), 1000);
int maxDiff = conf->getArgPInt("--" + prefix + "-maxdifferense", it.getProp("maxDifferense"), 100); int maxDiff = conf->getArgPInt("--" + prefix + "-maxdifferense", it.getProp("maxDifferense"), 100);
int maxProcessingCount = conf->getArgPInt("--" + prefix + "-maxprocessingcount", it.getProp("maxProcessingCount"), 100);
int checkConnectionPause = conf->getArgPInt("--" + prefix + "-checkconnection-pause", it.getProp("checkConnectionPause"), 10000); int checkConnectionPause = conf->getArgPInt("--" + prefix + "-checkconnection-pause", it.getProp("checkConnectionPause"), 10000);
int initpause = conf->getArgPInt("--" + prefix + "-initpause", it.getProp("initpause"), 5000); int initpause = conf->getArgPInt("--" + prefix + "-initpause", it.getProp("initpause"), 5000);
int recvBufferSize = conf->getArgPInt("--" + prefix + "-recv-buffer-size", it.getProp("recvBufferSize"), 3000);
std::string updateStrategy = conf->getArg2Param("--" + prefix + "-update-strategy", it.getProp("updateStrategy"), "evloop"); std::string updateStrategy = conf->getArg2Param("--" + prefix + "-update-strategy", it.getProp("updateStrategy"), "evloop");
...@@ -334,16 +333,6 @@ UNetExchange::UNetExchange(uniset::ObjectId objId, uniset::ObjectId shmId, const ...@@ -334,16 +333,6 @@ UNetExchange::UNetExchange(uniset::ObjectId objId, uniset::ObjectId shmId, const
} }
} }
UNetReceiver::UpdateStrategy r_upStrategy = UNetReceiver::strToUpdateStrategy( n_it.getProp2("unet_update_strategy", updateStrategy) );
if( r_upStrategy == UNetReceiver::useUpdateUnknown )
{
ostringstream err;
err << myname << ": Unknown update strategy!!! '" << n_it.getProp2("unet_update_strategy", updateStrategy) << "'" << endl;
unetcrit << myname << "(init): " << err.str() << endl;
throw SystemError(err.str());
}
unetinfo << myname << "(init): (node='" << n << "') add basic receiver " unetinfo << myname << "(init): (node='" << n << "') add basic receiver "
<< h << ":" << p << endl; << h << ":" << p << endl;
auto r = make_shared<UNetReceiver>(h, p, shm, false, prefix); auto r = make_shared<UNetReceiver>(h, p, shm, false, prefix);
...@@ -358,16 +347,14 @@ UNetExchange::UNetExchange(uniset::ObjectId objId, uniset::ObjectId shmId, const ...@@ -358,16 +347,14 @@ UNetExchange::UNetExchange(uniset::ObjectId objId, uniset::ObjectId shmId, const
r->setPrepareTime(prepareTime); r->setPrepareTime(prepareTime);
r->setEvrunTimeout(evrunTimeout); r->setEvrunTimeout(evrunTimeout);
r->setLostTimeout(lostTimeout); r->setLostTimeout(lostTimeout);
r->setReceivePause(recvpause);
r->setUpdatePause(updatepause); r->setUpdatePause(updatepause);
r->setCheckConnectionPause(checkConnectionPause); r->setCheckConnectionPause(checkConnectionPause);
r->setInitPause(initpause); r->setInitPause(initpause);
r->setMaxDifferens(maxDiff); r->setMaxDifferens(maxDiff);
r->setMaxProcessingCount(maxProcessingCount);
r->setRespondID(resp_id, resp_invert); r->setRespondID(resp_id, resp_invert);
r->setLostPacketsID(lp_id); r->setLostPacketsID(lp_id);
r->connectEvent( sigc::mem_fun(this, &UNetExchange::receiverEvent) ); r->connectEvent( sigc::mem_fun(this, &UNetExchange::receiverEvent) );
r->setUpdateStrategy(r_upStrategy); r->setBufferSize(recvBufferSize);
shared_ptr<UNetReceiver> r2(nullptr); shared_ptr<UNetReceiver> r2(nullptr);
...@@ -389,16 +376,14 @@ UNetExchange::UNetExchange(uniset::ObjectId objId, uniset::ObjectId shmId, const ...@@ -389,16 +376,14 @@ UNetExchange::UNetExchange(uniset::ObjectId objId, uniset::ObjectId shmId, const
r2->setPrepareTime(prepareTime); r2->setPrepareTime(prepareTime);
r2->setEvrunTimeout(evrunTimeout); r2->setEvrunTimeout(evrunTimeout);
r2->setLostTimeout(lostTimeout); r2->setLostTimeout(lostTimeout);
r2->setReceivePause(recvpause);
r2->setUpdatePause(updatepause); r2->setUpdatePause(updatepause);
r2->setCheckConnectionPause(checkConnectionPause); r2->setCheckConnectionPause(checkConnectionPause);
r2->setInitPause(initpause); r2->setInitPause(initpause);
r2->setMaxDifferens(maxDiff); r2->setMaxDifferens(maxDiff);
r2->setMaxProcessingCount(maxProcessingCount);
r2->setRespondID(resp2_id, resp_invert); r2->setRespondID(resp2_id, resp_invert);
r2->setLostPacketsID(lp2_id); r2->setLostPacketsID(lp2_id);
r2->connectEvent( sigc::mem_fun(this, &UNetExchange::receiverEvent) ); r2->connectEvent( sigc::mem_fun(this, &UNetExchange::receiverEvent) );
r2->setUpdateStrategy(r_upStrategy); r2->setBufferSize(recvBufferSize);
} }
} }
catch(...) catch(...)
...@@ -550,7 +535,7 @@ void UNetExchange::step() noexcept ...@@ -550,7 +535,7 @@ void UNetExchange::step() noexcept
} }
} }
for( auto && it : recvlist ) for( auto&& it : recvlist )
it.step(shm, myname, unetlog); it.step(shm, myname, unetlog);
} }
...@@ -758,10 +743,10 @@ void UNetExchange::askSensors( UniversalIO::UIOCommand cmd ) ...@@ -758,10 +743,10 @@ void UNetExchange::askSensors( UniversalIO::UIOCommand cmd )
void UNetExchange::sensorInfo( const uniset::SensorMessage* sm ) void UNetExchange::sensorInfo( const uniset::SensorMessage* sm )
{ {
if( sender ) if( sender )
sender->updateSensor( sm->id , sm->value ); sender->updateSensor( sm->id, sm->value );
if( sender2 ) if( sender2 )
sender2->updateSensor( sm->id , sm->value ); sender2->updateSensor( sm->id, sm->value );
} }
// ------------------------------------------------------------------------------------------ // ------------------------------------------------------------------------------------------
bool UNetExchange::activateObject() bool UNetExchange::activateObject()
...@@ -842,7 +827,7 @@ void UNetExchange::initIterators() noexcept ...@@ -842,7 +827,7 @@ void UNetExchange::initIterators() noexcept
if( sender2 ) if( sender2 )
sender2->initIterators(); sender2->initIterators();
for( auto && it : recvlist ) for( auto&& it : recvlist )
it.initIterators(shm); it.initIterators(shm);
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
...@@ -853,19 +838,13 @@ void UNetExchange::help_print( int argc, const char* argv[] ) noexcept ...@@ -853,19 +838,13 @@ void UNetExchange::help_print( int argc, const char* argv[] ) noexcept
cout << "--prefix-recv-timeout msec - Время для фиксации события 'отсутсвие связи'" << endl; cout << "--prefix-recv-timeout msec - Время для фиксации события 'отсутсвие связи'" << endl;
cout << "--prefix-prepare-time msec - Время необходимое на подготовку (восстановление связи) при переключении на другой канал" << endl; cout << "--prefix-prepare-time msec - Время необходимое на подготовку (восстановление связи) при переключении на другой канал" << endl;
cout << "--prefix-lost-timeout msec - Время ожидания заполнения 'дырки' между пакетами. По умолчанию 5000 мсек." << endl; cout << "--prefix-lost-timeout msec - Время ожидания заполнения 'дырки' между пакетами. По умолчанию 5000 мсек." << endl;
cout << "--prefix-recvpause msec - Пауза между приёмами. По умолчанию 10" << endl;
cout << "--prefix-sendpause msec - Пауза между посылками. По умолчанию 100" << endl; cout << "--prefix-sendpause msec - Пауза между посылками. По умолчанию 100" << endl;
cout << "--prefix-updatepause msec - Пауза между обновлением информации в SM (Корелирует с recvpause и sendpause). По умолчанию 100" << endl; cout << "--prefix-updatepause msec - Пауза между обновлением информации в SM (Корелирует с sendpause). По умолчанию 100" << endl;
cout << "--prefix-steptime msec - Пауза между обновлением информации о связи с узлами." << endl; cout << "--prefix-steptime msec - Пауза между обновлением информации о связи с узлами." << endl;
cout << "--prefix-checkconnection-pause msec - Пауза между попытками открыть соединение (если это не удалось до этого). По умолчанию: 10000 (10 сек)" << endl; cout << "--prefix-checkconnection-pause msec - Пауза между попытками открыть соединение (если это не удалось до этого). По умолчанию: 10000 (10 сек)" << endl;
cout << "--prefix-maxdifferense num - Маскимальная разница в номерах пакетов для фиксации события 'потеря пакетов' " << endl; cout << "--prefix-maxdifferense num - Маскимальная разница в номерах пакетов для фиксации события 'потеря пакетов' " << endl;
cout << "--prefix-maxprocessingcount num - Максимальное количество пакетов обрабатываемых за один раз (если их слишком много)" << endl;
cout << "--prefix-nosender [0,1] - Отключить посылку." << endl; cout << "--prefix-nosender [0,1] - Отключить посылку." << endl;
cout << "--prefix-update-strategy [thread,evloop] - Стратегия обновления данных в SM. " << endl; cout << "--prefix-recv-buffer-size sz - Размер циклического буфера для приёма сообщений. По умолчанию: 3000" << endl;
cout << " 'thread' - у каждого UNetReceiver отдельный поток" << endl;
cout << " 'evloop' - используется общий (с приёмом сообщений) event loop" << endl;
cout << " По умолчанию: evloop" << endl;
cout << "--prefix-sm-ready-timeout msec - Время ожидание я готовности SM к работе. По умолчанию 120000" << endl; cout << "--prefix-sm-ready-timeout msec - Время ожидание я готовности SM к работе. По умолчанию 120000" << endl;
cout << "--prefix-filter-field name - Название фильтрующего поля при формировании списка датчиков посылаемых данным узлом" << endl; cout << "--prefix-filter-field name - Название фильтрующего поля при формировании списка датчиков посылаемых данным узлом" << endl;
cout << "--prefix-filter-value name - Значение фильтрующего поля при формировании списка датчиков посылаемых данным узлом" << endl; cout << "--prefix-filter-value name - Значение фильтрующего поля при формировании списка датчиков посылаемых данным узлом" << endl;
...@@ -875,7 +854,7 @@ void UNetExchange::help_print( int argc, const char* argv[] ) noexcept ...@@ -875,7 +854,7 @@ void UNetExchange::help_print( int argc, const char* argv[] ) noexcept
cout << "--prefix-nodes-filter-value name - Значение фильтрующего поля для списка узлов" << endl; cout << "--prefix-nodes-filter-value name - Значение фильтрующего поля для списка узлов" << endl;
cout << endl; cout << endl;
cout << " Logs: " << endl; cout << " Logs: " << endl;
cout << "--prefix-log-... - log control" << endl; cout << "--prefix-log-... - log control" << endl;
cout << " add-levels ..." << endl; cout << " add-levels ..." << endl;
cout << " del-levels ..." << endl; cout << " del-levels ..." << endl;
cout << " set-levels ..." << endl; cout << " set-levels ..." << endl;
...@@ -917,7 +896,7 @@ std::shared_ptr<UNetExchange> UNetExchange::init_unetexchange(int argc, const ch ...@@ -917,7 +896,7 @@ std::shared_ptr<UNetExchange> UNetExchange::init_unetexchange(int argc, const ch
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
void UNetExchange::receiverEvent( const shared_ptr<UNetReceiver>& r, UNetReceiver::Event ev ) noexcept void UNetExchange::receiverEvent( const shared_ptr<UNetReceiver>& r, UNetReceiver::Event ev ) noexcept
{ {
for( auto && it : recvlist ) for( auto&& it : recvlist )
{ {
if( it.r1 == r ) if( it.r1 == r )
{ {
......
...@@ -58,7 +58,6 @@ namespace uniset ...@@ -58,7 +58,6 @@ namespace uniset
пар [id,value]. Другие узлы принимают их. Помимо этого данный процесс запускает пар [id,value]. Другие узлы принимают их. Помимо этого данный процесс запускает
"получателей" по одному на каждый (другой) узел и ловит пакеты от них, сохраняя данные в SM. "получателей" по одному на каждый (другой) узел и ловит пакеты от них, сохраняя данные в SM.
При этом "получатели" работают на одном(!) потоке с использованием событий libev (см. UNetReceiver). При этом "получатели" работают на одном(!) потоке с использованием событий libev (см. UNetReceiver).
или каждый на своём потоке. Это определяется параметром \b unet_update_strategy.
\par \par
При своём старте процесс считывает из секции \<nodes> список узлов которые необходимо "слушать", При своём старте процесс считывает из секции \<nodes> список узлов которые необходимо "слушать",
...@@ -78,15 +77,13 @@ namespace uniset ...@@ -78,15 +77,13 @@ namespace uniset
... ...
</iocards> </iocards>
</item> </item>
<item ip="192.168.56.10" name="Node1" textname="Node1" unet_port="3001" unet_update_strategy="evloop"/> <item ip="192.168.56.10" name="Node1" textname="Node1" unet_port="3001"/>
<item ip="192.168.56.11" name="Node2" textname="Node2" unet_port="3002"/> <item ip="192.168.56.11" name="Node2" textname="Node2" unet_port="3002"/>
</nodes> </nodes>
\endcode \endcode
* \b unet_update_strategy - задаёт стратегию обновления данных в SM. Буфер для приёма сообщений можно настроить параметром \b recvBufferSize="1000" в конфигурационной секции
Поддерживается два варианта: или аргументом командной строки \b --prefix-recv-buffer-size sz
- 'thread' - отдельный поток обновления
- 'evloop' - использование общего с приёмом event loop (libev)
\note Имеется возможность задавать отдельную настроечную секцию для "списка узлов" при помощи параметра \note Имеется возможность задавать отдельную настроечную секцию для "списка узлов" при помощи параметра
--prefix-nodes-confnode name. По умолчанию настройка ведётся по секции <nodes> --prefix-nodes-confnode name. По умолчанию настройка ведётся по секции <nodes>
......
...@@ -15,6 +15,7 @@ ...@@ -15,6 +15,7 @@
*/ */
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
#include <sstream> #include <sstream>
#include <cmath>
#include <iomanip> #include <iomanip>
#include <Poco/Net/NetException.h> #include <Poco/Net/NetException.h>
#include "unisetstd.h" #include "unisetstd.h"
...@@ -28,24 +29,13 @@ using namespace uniset; ...@@ -28,24 +29,13 @@ using namespace uniset;
using namespace uniset::extensions; using namespace uniset::extensions;
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
CommonEventLoop UNetReceiver::loop; CommonEventLoop UNetReceiver::loop;
static UniSetUDP::UDPMessage emptyMessage;
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
/*
bool UNetReceiver::PacketCompare::operator()(const UniSetUDP::UDPMessage& lhs,
const UniSetUDP::UDPMessage& rhs) const
{
// if( lhs.num == rhs.num )
// return (lhs < rhs);
return lhs.num > rhs.num;
}
*/
// ------------------------------------------------------------------------------------------
UNetReceiver::UNetReceiver(const std::string& s_host, int _port UNetReceiver::UNetReceiver(const std::string& s_host, int _port
, const std::shared_ptr<SMInterface>& smi , const std::shared_ptr<SMInterface>& smi
, bool nocheckConnection , bool nocheckConnection
, const std::string& prefix ): , const std::string& prefix ):
shm(smi), shm(smi),
recvpause(10),
updatepause(100), updatepause(100),
port(_port), port(_port),
saddr(s_host, _port), saddr(s_host, _port),
...@@ -57,11 +47,8 @@ UNetReceiver::UNetReceiver(const std::string& s_host, int _port ...@@ -57,11 +47,8 @@ UNetReceiver::UNetReceiver(const std::string& s_host, int _port
respondInvert(false), respondInvert(false),
sidLostPackets(uniset::DefaultObjectId), sidLostPackets(uniset::DefaultObjectId),
activated(false), activated(false),
pnum(0), cbuf(cbufSize),
maxDifferens(20), maxDifferens(20),
waitClean(false),
rnum(0),
maxProcessingCount(100),
lockUpdate(false), lockUpdate(false),
d_cache_init_ok(false), d_cache_init_ok(false),
a_cache_init_ok(false) a_cache_init_ok(false)
...@@ -83,20 +70,27 @@ UNetReceiver::UNetReceiver(const std::string& s_host, int _port ...@@ -83,20 +70,27 @@ UNetReceiver::UNetReceiver(const std::string& s_host, int _port
auto conf = uniset_conf(); auto conf = uniset_conf();
conf->initLogStream(unetlog, prefix + "-log"); conf->initLogStream(unetlog, prefix + "-log");
upThread = unisetstd::make_unique< ThreadCreator<UNetReceiver> >(this, &UNetReceiver::updateThread);
if( !createConnection(nocheckConnection /* <-- это флаг throwEx */) ) if( !createConnection(nocheckConnection /* <-- это флаг throwEx */) )
evCheckConnection.set<UNetReceiver, &UNetReceiver::checkConnectionEvent>(this); evCheckConnection.set<UNetReceiver, &UNetReceiver::checkConnectionEvent>(this);
evStatistic.set<UNetReceiver, &UNetReceiver::statisticsEvent>(this); evStatistic.set<UNetReceiver, &UNetReceiver::statisticsEvent>(this);
evUpdate.set<UNetReceiver, &UNetReceiver::updateEvent>(this); evUpdate.set<UNetReceiver, &UNetReceiver::updateEvent>(this);
evInitPause.set<UNetReceiver, &UNetReceiver::initEvent>(this); evInitPause.set<UNetReceiver, &UNetReceiver::initEvent>(this);
ptLostTimeout.setTiming(lostTimeout);
ptRecvTimeout.setTiming(recvTimeout);
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
UNetReceiver::~UNetReceiver() UNetReceiver::~UNetReceiver()
{ {
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
void UNetReceiver::setBufferSize( size_t sz ) noexcept
{
cbufSize = sz;
cbuf.resize(sz);
}
// -----------------------------------------------------------------------------
void UNetReceiver::setReceiveTimeout( timeout_t msec ) noexcept void UNetReceiver::setReceiveTimeout( timeout_t msec ) noexcept
{ {
std::lock_guard<std::mutex> l(tmMutex); std::lock_guard<std::mutex> l(tmMutex);
...@@ -124,24 +118,14 @@ void UNetReceiver::setLostTimeout( timeout_t msec ) noexcept ...@@ -124,24 +118,14 @@ void UNetReceiver::setLostTimeout( timeout_t msec ) noexcept
ptLostTimeout.setTiming(msec); ptLostTimeout.setTiming(msec);
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
void UNetReceiver::setReceivePause( timeout_t msec ) noexcept
{
recvpause = msec;
}
// -----------------------------------------------------------------------------
void UNetReceiver::setUpdatePause( timeout_t msec ) noexcept void UNetReceiver::setUpdatePause( timeout_t msec ) noexcept
{ {
updatepause = msec; updatepause = msec;
if( upStrategy == useUpdateEventLoop && evUpdate.is_active() ) if( evUpdate.is_active() )
evUpdate.start(0, (float)updatepause / 1000.); evUpdate.start(0, (float)updatepause / 1000.);
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
void UNetReceiver::setMaxProcessingCount( int set ) noexcept
{
maxProcessingCount = set;
}
// -----------------------------------------------------------------------------
void UNetReceiver::setMaxDifferens( unsigned long set ) noexcept void UNetReceiver::setMaxDifferens( unsigned long set ) noexcept
{ {
maxDifferens = set; maxDifferens = set;
...@@ -216,9 +200,7 @@ bool UNetReceiver::createConnection( bool throwEx ) ...@@ -216,9 +200,7 @@ bool UNetReceiver::createConnection( bool throwEx )
udp = unisetstd::make_unique<UDPReceiveU>(addr, port); udp = unisetstd::make_unique<UDPReceiveU>(addr, port);
udp->setBlocking(false); // делаем неблокирующее чтение (нужно для libev) udp->setBlocking(false); // делаем неблокирующее чтение (нужно для libev)
evReceive.set<UNetReceiver, &UNetReceiver::callback>(this); evReceive.set<UNetReceiver, &UNetReceiver::callback>(this);
evUpdate.set<UNetReceiver, &UNetReceiver::updateEvent>(this);
if( upStrategy == useUpdateEventLoop )
evUpdate.set<UNetReceiver, &UNetReceiver::updateEvent>(this);
if( evCheckConnection.is_active() ) if( evCheckConnection.is_active() )
evCheckConnection.stop(); evCheckConnection.stop();
...@@ -269,9 +251,6 @@ void UNetReceiver::start() ...@@ -269,9 +251,6 @@ void UNetReceiver::start()
std::terminate(); std::terminate();
return; return;
} }
if( upStrategy == useUpdateThread && !upThread->isRunning() )
upThread->start();
} }
else else
forceUpdate(); forceUpdate();
...@@ -283,12 +262,8 @@ void UNetReceiver::evprepare( const ev::loop_ref& eloop ) noexcept ...@@ -283,12 +262,8 @@ void UNetReceiver::evprepare( const ev::loop_ref& eloop ) noexcept
evStatistic.start(0, 1.0); // раз в сек evStatistic.start(0, 1.0); // раз в сек
evInitPause.set(eloop); evInitPause.set(eloop);
evUpdate.set(eloop);
if( upStrategy == useUpdateEventLoop ) evUpdate.start( 0, ((float)updatepause / 1000.) );
{
evUpdate.set(eloop);
evUpdate.start( 0, ((float)updatepause / 1000.) );
}
if( !udp ) if( !udp )
{ {
...@@ -330,8 +305,7 @@ void UNetReceiver::evfinish( const ev::loop_ref& eloop ) noexcept ...@@ -330,8 +305,7 @@ void UNetReceiver::evfinish( const ev::loop_ref& eloop ) noexcept
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
void UNetReceiver::forceUpdate() noexcept void UNetReceiver::forceUpdate() noexcept
{ {
pack_guard l(packMutex, upStrategy); rnum = 0; // сбрасываем запомненый номер последнего обработанного пакета
pnum = 0; // сбрасываем запомненый номер последнего обработанного пакета
// и тем самым заставляем обновить данные в SM (см. update) // и тем самым заставляем обновить данные в SM (см. update)
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
...@@ -368,211 +342,145 @@ void UNetReceiver::initEvent( ev::timer& tmr, int revents ) noexcept ...@@ -368,211 +342,145 @@ void UNetReceiver::initEvent( ev::timer& tmr, int revents ) noexcept
tmr.stop(); tmr.stop();
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
void UNetReceiver::update() noexcept size_t UNetReceiver::rnext( size_t num )
{ {
UniSetUDP::UDPMessage p; UniSetUDP::UDPMessage* p;
// обрабатываем, пока очередь либо не опустеет, size_t i = num + 1;
// либо обнаружится "дырка" в последовательности,
// но при этом обрабатываем не больше maxProcessingCount
// за один раз..
int k = maxProcessingCount;
while( k > 0 ) while( i < wnum )
{ {
{ p = &cbuf[i % cbufSize];
// lock qpack
pack_guard l(packMutex, upStrategy);
if( qpack.empty() ) if( p->num > num )
return; return i;
p = qpack.top(); i++;
size_t sub = labs( (long int)(p.num - pnum) ); }
if( pnum > 0 ) return wnum;
{ }
// если sub > maxDifferens // -----------------------------------------------------------------------------
// значит это просто "разрыв" void UNetReceiver::update() noexcept
// и нам ждать lostTimeout не надо {
// сразу начинаем обрабатывать новые пакеты // ещё не было пакетов
// а если > 1 && < maxDifferens if( wnum == 1 && rnum == 0 )
// значит это временная "дырка" return;
// и надо подождать lostTimeout
// чтобы констатировать потерю пакета..
if( sub > 1 && sub < maxDifferens )
{
// если p.num < pnum, то это какой-то "дубль",
// т.к мы все пакеты <= pnum уже "отработали".
// а значит можно не ждать, а откидывать пакет и
// дальше работать..
if( p.num < pnum )
{
qpack.pop();
continue;
}
if( !ptLostTimeout.checkTime() )
return;
unetwarn << myname << "(update): lostTimeout(" << ptLostTimeout.getInterval() << ")! pnum=" << p.num << " lost " << sub << " packets " << endl;
lostPackets += sub;
}
else if( p.num == pnum )
{
/* а что делать если идут повторные пакеты ?!
* для надёжности лучше обрабатывать..
* для "оптимизации".. лучше игнорировать
*/
qpack.pop(); // пока выбрали вариант "оптимизации" (выкидываем из очереди и идём дальше)
continue;
}
if( sub >= maxDifferens ) UniSetUDP::UDPMessage* p;
{ CacheItem* c_it = nullptr;
// считаем сколько пакетов потеряли.. (pnum=0 - означает что мы только что запустились...) UniSetUDP::UDPAData* dat = nullptr;
if( pnum != 0 && p.num > pnum ) long s_id;
{
lostPackets += sub; // обрабатываем, пока очередь либо не опустеет,
unetwarn << myname << "(update): sub=" << sub << " > maxDifferenst(" << maxDifferens << ")! lost " << sub << " packets " << endl; // либо обнаружится "дырка" в последовательности,
} while( rnum < wnum )
} {
} p = &cbuf[rnum % cbufSize];
ptLostTimeout.reset(); // если номер пакета не равен ожидаемому, ждём считая что это "дырка"
// т.к. разрывы и другие случаи обрабатываются при приёме пакетов
if( p->num != rnum )
{
if( !ptLostTimeout.checkTime() )
return;
// удаляем из очереди, только если size_t sub = (p->num - rnum);
// всё в порядке с последовательностью.. unetwarn << myname << "(update): lostTimeout(" << ptLostTimeout.getInterval() << ")! pnum=" << p->num << " lost " << sub << " packets " << endl;
qpack.pop(); lostPackets += sub;
pnum = p.num;
} // unlock qpack
k--; // ищем следующий пакет для обработки
rnum = rnext(rnum);
continue;
}
ptLostTimeout.reset();
rnum++;
upCount++; upCount++;
// cerr << myname << "(update): " << p.msg.header << endl;
initDCache(p, !d_cache_init_ok);
initACache(p, !a_cache_init_ok);
// Обработка дискретных // Обработка дискретных
CacheInfo& d_iv = d_icache_map[p.getDataID()]; auto d_iv = getDCache(p, !d_cache_init_ok);
for( size_t i = 0; i < p.dcount; i++ ) for( size_t i = 0; i < p->dcount; i++ )
{ {
try try
{ {
long id = p.dID(i); s_id = p->dID(i);
bool val = p.dValue(i); c_it = &d_iv.cache[i];
CacheItem& ii(d_iv.cache[i]); if( c_it->id != s_id )
if( ii.id != id )
{ {
unetwarn << myname << "(update): reinit cache for sid=" << id << endl; unetwarn << myname << "(update): reinit dcache for sid=" << s_id << endl;
ii.id = id; c_it->id = s_id;
shm->initIterator(ii.ioit); shm->initIterator(c_it->ioit);
} }
// обновление данных в SM (блокировано) // обновление данных в SM (блокировано)
if( lockUpdate ) if( lockUpdate )
continue; continue;
shm->localSetValue(ii.ioit, id, val, shm->ID()); shm->localSetValue(c_it->ioit, s_id, p->dValue(i), shm->ID());
} }
catch( const uniset::Exception& ex) catch( const uniset::Exception& ex)
{ {
unetcrit << myname << "(update): " << ex << std::endl; unetcrit << myname << "(update): "
<< " id=" << s_id
<< " val=" << p->dValue(i)
<< " error: " << ex
<< std::endl;
} }
catch(...) catch(...)
{ {
unetcrit << myname << "(update): catch ..." << std::endl; unetcrit << myname << "(update): "
<< " id=" << s_id
<< " val=" << p->dValue(i)
<< " error: catch..."
<< std::endl;
} }
} }
// Обработка аналоговых // Обработка аналоговых
CacheInfo& a_iv = a_icache_map[p.getDataID()]; auto a_iv = getACache(p, !a_cache_init_ok);
for( size_t i = 0; i < p.acount; i++ ) for( size_t i = 0; i < p->acount; i++ )
{ {
try try
{ {
UniSetUDP::UDPAData& d = p.a_dat[i]; dat = &p->a_dat[i];
c_it = &a_iv.cache[i];
CacheItem& ii(a_iv.cache[i]);
if( ii.id != d.id ) if( c_it->id != dat->id )
{ {
unetwarn << myname << "(update): reinit cache for sid=" << d.id << endl; unetwarn << myname << "(update): reinit acache for sid=" << dat->id << endl;
ii.id = d.id; c_it->id = dat->id;
shm->initIterator(ii.ioit); shm->initIterator(c_it->ioit);
} }
// обновление данных в SM (блокировано) // обновление данных в SM (блокировано)
if( lockUpdate ) if( lockUpdate )
continue; continue;
shm->localSetValue(ii.ioit, d.id, d.val, shm->ID()); shm->localSetValue(c_it->ioit, dat->id, dat->val, shm->ID());
} }
catch( const uniset::Exception& ex) catch( const uniset::Exception& ex)
{ {
unetcrit << myname << "(update): " << ex << std::endl; unetcrit << myname << "(update): "
<< " id=" << dat->id
<< " val=" << dat->val
<< " error: " << ex
<< std::endl;
} }
catch(...) catch(...)
{ {
unetcrit << myname << "(update): catch ..." << std::endl; unetcrit << myname << "(update): "
<< " id=" << dat->id
<< " val=" << dat->val
<< " error: catch..."
<< std::endl;
} }
} }
} }
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
void UNetReceiver::updateThread() noexcept
{
while( activated )
{
try
{
update();
}
catch( std::exception& ex )
{
unetcrit << myname << "(update_thread): " << ex.what() << endl;
}
// смотрим есть ли связь..
checkConnection();
if( sidRespond != DefaultObjectId )
{
try
{
if( isInitOK() )
{
bool r = respondInvert ? !isRecvOK() : isRecvOK();
shm->localSetValue(itRespond, sidRespond, ( r ? 1 : 0 ), shm->ID());
}
}
catch( const std::exception& ex )
{
unetcrit << myname << "(update_thread): (respond) " << ex.what() << std::endl;
}
}
if( sidLostPackets != DefaultObjectId )
{
try
{
shm->localSetValue(itLostPackets, sidLostPackets, getLostPacketsNum(), shm->ID());
}
catch( const std::exception& ex )
{
unetcrit << myname << "(update_thread): (lostPackets) " << ex.what() << std::endl;
}
}
msleep(updatepause);
}
}
// -----------------------------------------------------------------------------
void UNetReceiver::callback( ev::io& watcher, int revents ) noexcept void UNetReceiver::callback( ev::io& watcher, int revents ) noexcept
{ {
if( EV_ERROR & revents ) if( EV_ERROR & revents )
...@@ -713,7 +621,6 @@ void UNetReceiver::stop() ...@@ -713,7 +621,6 @@ void UNetReceiver::stop()
{ {
unetinfo << myname << ": stop.." << endl; unetinfo << myname << ": stop.." << endl;
activated = false; activated = false;
upThread->join();
loop.evstop(this); loop.evstop(this);
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
...@@ -723,7 +630,6 @@ bool UNetReceiver::receive() noexcept ...@@ -723,7 +630,6 @@ bool UNetReceiver::receive() noexcept
{ {
ssize_t ret = udp->receiveBytes(r_buf.data, sizeof(r_buf.data)); ssize_t ret = udp->receiveBytes(r_buf.data, sizeof(r_buf.data));
recvCount++; recvCount++;
//ssize_t ret = udp->receiveFrom(r_buf.data, sizeof(r_buf.data),saddr);
if( ret < 0 ) if( ret < 0 )
{ {
...@@ -737,98 +643,76 @@ bool UNetReceiver::receive() noexcept ...@@ -737,98 +643,76 @@ bool UNetReceiver::receive() noexcept
return false; return false;
} }
size_t sz = UniSetUDP::UDPMessage::getMessage(pack, r_buf); // сперва пробуем сохранить пакет в том месте, которе должно быть очередным на запись
pack = &cbuf[wnum % cbufSize];
size_t sz = UniSetUDP::UDPMessage::getMessage(*pack, r_buf);
if( sz == 0 ) if( sz == 0 )
{ {
unetcrit << myname << "(receive): FAILED RECEIVE DATA ret=" << ret << endl; unetcrit << myname << "(receive): FAILED RECEIVE DATA ret=" << ret << endl;
return false; return false;
} }
}
catch( Poco::Net::NetException& ex )
{
unetcrit << myname << "(receive): recv err: " << ex.displayText() << endl;
return false;
}
catch( exception& ex )
{
unetcrit << myname << "(receive): recv err: " << ex.what() << endl;
return false;
}
if( pack.magic != UniSetUDP::UNETUDP_MAGICNUM ) if( pack->magic != UniSetUDP::UNETUDP_MAGICNUM )
{ return false;
// пакет не нашей "системы"
return false;
}
if( rnum > 0 && labs( (long int)(pack.num - rnum) ) > maxDifferens ) if( abs(long(pack->num - wnum)) > maxDifferens || abs( long(wnum - rnum) ) >= (cbufSize - 2) )
{
/* А что делать если мы уже ждём и ещё не "разгребли предыдущее".. а тут уже повторный "разрыв"
* Можно откинуть всё.. что сложили во временную очередь и заново "копить" (но тогда теряем информацию)
* А можно породолжать складывать во временную, но тогда есть риск "никогда" не разгрести временную
* очередь, при "частых обрывах". Потому-что update будет на каждом разрыве ждать ещё lostTimeout..
*/
// Пока выбираю.. чистить qtmp. Это будет соотвествовать логике работы с картами у которых ограничен буфер приёма.
// Обычно "кольцевой". Т.е. если не успели обработать и "вынуть" из буфера информацию.. он будет переписан новыми данными
if( waitClean )
{ {
unetcrit << myname << "(receive): reset qtmp.." << endl; unetcrit << myname << "(receive): DISAGREE "
<< " packnum=" << pack->num
<< " wnum=" << wnum
<< " rnum=" << rnum
<< " (maxDiff=" << maxDifferens
<< " indexDiff=" << abs( long(wnum - rnum) )
<< ")"
<< endl;
lostPackets = pack->num > wnum ? (pack->num - wnum - 1) : lostPackets + 1;
// реинициализируем позицию для чтения
rnum = pack->num;
wnum = pack->num + 1;
// перемещаем пакет в нужное место (если требуется)
if( wnum != pack->num )
{
cbuf[pack->num % cbufSize] = (*pack);
pack->num = 0;
}
while( !qtmp.empty() ) return true;
qtmp.pop();
} }
waitClean = true; if( pack->num != wnum )
} {
// перемещаем пакет в правильное место
// в соответствии с его номером
cbuf[pack->num % cbufSize] = (*pack);
if( pack->num >= wnum )
wnum = pack->num + 1;
rnum = pack.num; // обнуляем номер в том месте где записали, чтобы его не обрабатывал update
pack->num = 0;
}
else if( pack->num >= wnum )
wnum = pack->num + 1;
#if 0 // начальная инициализация для чтения
cerr << myname << "(receive): recv DATA OK. ret=" << ret << " sizeof=" << sz if( rnum == 0 )
<< " header: " << pack.msg.header rnum = pack->num;
<< " waitClean=" << waitClean
<< endl;
for( size_t i = 0; i < pack.msg.header.dcount; i++ ) return true;
}
catch( Poco::Net::NetException& ex )
{ {
UniSetUDP::UDPData& d = pack.msg.dat[i]; unetcrit << myname << "(receive): recv err: " << ex.displayText() << endl;
cerr << "****** save id=" << d.id << " val=" << d.val << endl;
} }
catch( exception& ex )
#endif
{ {
// lock qpack unetcrit << myname << "(receive): recv err: " << ex.what() << endl;
pack_guard l(packMutex, upStrategy); }
if( !waitClean )
{
qpack.push(pack);
return true;
}
if( !qpack.empty() )
{
qtmp.push(pack);
}
else
{
// основная очередь освободилась..
// копируем в неё всё что набралось в qtmp...
while( !qtmp.empty() )
{
qpack.push(qtmp.top());
qtmp.pop();
}
// не забываем и текущий поместить в очередь..
qpack.push(pack);
waitClean = false;
}
} // unlock qpack
return true; return false;
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
void UNetReceiver::initIterators() noexcept void UNetReceiver::initIterators() noexcept
...@@ -837,7 +721,7 @@ void UNetReceiver::initIterators() noexcept ...@@ -837,7 +721,7 @@ void UNetReceiver::initIterators() noexcept
{ {
CacheVec& d_icache(mit->second.cache); CacheVec& d_icache(mit->second.cache);
for( auto && it : d_icache ) for( auto&& it : d_icache )
shm->initIterator(it.ioit); shm->initIterator(it.ioit);
} }
...@@ -845,39 +729,40 @@ void UNetReceiver::initIterators() noexcept ...@@ -845,39 +729,40 @@ void UNetReceiver::initIterators() noexcept
{ {
CacheVec& a_icache(mit->second.cache); CacheVec& a_icache(mit->second.cache);
for( auto && it : a_icache ) for( auto&& it : a_icache )
shm->initIterator(it.ioit); shm->initIterator(it.ioit);
} }
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
void UNetReceiver::initDCache( UniSetUDP::UDPMessage& pack, bool force ) noexcept UNetReceiver::CacheInfo& UNetReceiver::getDCache( UniSetUDP::UDPMessage* pack, bool force ) noexcept
{ {
CacheInfo& d_info(d_icache_map[pack.getDataID()]); // если элемента нет, он будет создан
CacheInfo& d_info = d_icache_map[pack->getDataID()];
if( !force && pack.dcount == d_info.cache.size() ) if( !force && pack->dcount == d_info.cache.size() )
return; return d_info;
if( d_info.cache_init_ok && pack.dcount == d_info.cache.size() ) if( d_info.cache_init_ok && pack->dcount == d_info.cache.size() )
{ {
d_cache_init_ok = true; d_cache_init_ok = true;
auto it = d_icache_map.begin(); auto it = d_icache_map.begin();
for( ; it != d_icache_map.end(); ++it ) for( ; it != d_icache_map.end(); ++it )
{ {
CacheInfo& d_info(it->second); d_info = it->second;
d_cache_init_ok = d_cache_init_ok && d_info.cache_init_ok; d_cache_init_ok = d_cache_init_ok && d_info.cache_init_ok;
if(d_cache_init_ok == false) if(d_cache_init_ok == false)
break; break;
} }
return; return d_info;
} }
unetinfo << myname << ": init dcache for " << pack.getDataID() << endl; unetinfo << myname << ": init dcache for " << pack->getDataID() << endl;
d_info.cache_init_ok = true; d_info.cache_init_ok = true;
d_info.cache.resize(pack.dcount); d_info.cache.resize(pack->dcount);
size_t sz = d_info.cache.size(); size_t sz = d_info.cache.size();
auto conf = uniset_conf(); auto conf = uniset_conf();
...@@ -886,43 +771,46 @@ void UNetReceiver::initDCache( UniSetUDP::UDPMessage& pack, bool force ) noexcep ...@@ -886,43 +771,46 @@ void UNetReceiver::initDCache( UniSetUDP::UDPMessage& pack, bool force ) noexcep
{ {
CacheItem& d(d_info.cache[i]); CacheItem& d(d_info.cache[i]);
if( d.id != pack.d_id[i] ) if( d.id != pack->d_id[i] )
{ {
d.id = pack.d_id[i]; d.id = pack->d_id[i];
shm->initIterator(d.ioit); shm->initIterator(d.ioit);
} }
} }
return d_info;
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
void UNetReceiver::initACache( UniSetUDP::UDPMessage& pack, bool force ) noexcept UNetReceiver::CacheInfo& UNetReceiver::getACache( UniSetUDP::UDPMessage* pack, bool force ) noexcept
{ {
CacheInfo& a_info(a_icache_map[pack.getDataID()]); // если элемента нет, он будет создан
CacheInfo& a_info = a_icache_map[pack->getDataID()];
if( !force && pack.acount == a_info.cache.size() ) if( !force && pack->acount == a_info.cache.size() )
return; return a_info;
if( a_info.cache_init_ok && pack.acount == a_info.cache.size() ) if( a_info.cache_init_ok && pack->acount == a_info.cache.size() )
{ {
a_cache_init_ok = true; a_cache_init_ok = true;
auto it = a_icache_map.begin(); auto it = a_icache_map.begin();
for( ; it != a_icache_map.end(); ++it ) for( ; it != a_icache_map.end(); ++it )
{ {
CacheInfo& a_info(it->second); a_info = it->second;
a_cache_init_ok = a_cache_init_ok && a_info.cache_init_ok; a_cache_init_ok = a_cache_init_ok && a_info.cache_init_ok;
if(a_cache_init_ok == false) if(a_cache_init_ok == false)
break; break;
} }
return; return a_info;
} }
unetinfo << myname << ": init icache for " << pack.getDataID() << endl; unetinfo << myname << ": init icache for " << pack->getDataID() << endl;
a_info.cache_init_ok = true; a_info.cache_init_ok = true;
auto conf = uniset_conf(); auto conf = uniset_conf();
a_info.cache.resize(pack.acount); a_info.cache.resize(pack->acount);
size_t sz = a_info.cache.size(); size_t sz = a_info.cache.size();
...@@ -930,12 +818,14 @@ void UNetReceiver::initACache( UniSetUDP::UDPMessage& pack, bool force ) noexcep ...@@ -930,12 +818,14 @@ void UNetReceiver::initACache( UniSetUDP::UDPMessage& pack, bool force ) noexcep
{ {
CacheItem& d(a_info.cache[i]); CacheItem& d(a_info.cache[i]);
if( d.id != pack.a_dat[i].id ) if( d.id != pack->a_dat[i].id )
{ {
d.id = pack.a_dat[i].id; d.id = pack->a_dat[i].id;
shm->initIterator(d.ioit); shm->initIterator(d.ioit);
} }
} }
return a_info;
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
void UNetReceiver::connectEvent( UNetReceiver::EventSlot sl ) noexcept void UNetReceiver::connectEvent( UNetReceiver::EventSlot sl ) noexcept
...@@ -943,49 +833,6 @@ void UNetReceiver::connectEvent( UNetReceiver::EventSlot sl ) noexcept ...@@ -943,49 +833,6 @@ void UNetReceiver::connectEvent( UNetReceiver::EventSlot sl ) noexcept
slEvent = sl; slEvent = sl;
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
UNetReceiver::UpdateStrategy UNetReceiver::strToUpdateStrategy( const string& s ) noexcept
{
if( s == "thread" || s == "THREAD" )
return useUpdateThread;
if( s == "evloop" || s == "EVLOOP" )
return useUpdateEventLoop;
return useUpdateUnknown;
}
// -----------------------------------------------------------------------------
string UNetReceiver::to_string( UNetReceiver::UpdateStrategy s ) noexcept
{
if( s == useUpdateThread )
return "thread";
if( s == useUpdateEventLoop )
return "evloop";
return "";
}
// -----------------------------------------------------------------------------
void UNetReceiver::setUpdateStrategy( UNetReceiver::UpdateStrategy set )
{
if( set == useUpdateEventLoop && upThread->isRunning() )
{
ostringstream err;
err << myname << "(setUpdateStrategy): set 'useUpdateEventLoop' strategy but updateThread is running!";
unetcrit << err.str() << endl;
throw SystemError(err.str());
}
if( set == useUpdateThread && evUpdate.is_active() )
{
ostringstream err;
err << myname << "(setUpdateStrategy): set 'useUpdateThread' strategy but update event loop is running!";
unetcrit << err.str() << endl;
throw SystemError(err.str());
}
upStrategy = set;
}
// -----------------------------------------------------------------------------
const std::string UNetReceiver::getShortInfo() const noexcept const std::string UNetReceiver::getShortInfo() const noexcept
{ {
// warning: будет вызываться из другого потока // warning: будет вызываться из другого потока
...@@ -998,36 +845,18 @@ const std::string UNetReceiver::getShortInfo() const noexcept ...@@ -998,36 +845,18 @@ const std::string UNetReceiver::getShortInfo() const noexcept
<< " recvOK=" << isRecvOK() << " recvOK=" << isRecvOK()
<< " receivepack=" << rnum << " receivepack=" << rnum
<< " lostPackets=" << setw(6) << getLostPacketsNum() << " lostPackets=" << setw(6) << getLostPacketsNum()
<< " updateStrategy=" << to_string(upStrategy)
<< endl << endl
<< "\t[" << "\t["
<< " recvTimeout=" << setw(6) << recvTimeout << " recvTimeout=" << setw(6) << recvTimeout
<< " prepareTime=" << setw(6) << prepareTime << " prepareTime=" << setw(6) << prepareTime
<< " evrunTimeout=" << setw(6) << evrunTimeout << " evrunTimeout=" << setw(6) << evrunTimeout
<< " lostTimeout=" << setw(6) << lostTimeout << " lostTimeout=" << setw(6) << lostTimeout
<< " recvpause=" << setw(6) << recvpause
<< " updatepause=" << setw(6) << updatepause << " updatepause=" << setw(6) << updatepause
<< " maxDifferens=" << setw(6) << maxDifferens << " maxDifferens=" << setw(6) << maxDifferens
<< " maxProcessingCount=" << setw(6) << maxProcessingCount
<< " waitClean=" << waitClean
<< " ]" << " ]"
<< endl << endl
<< "\t[ qsize=" << qpack.size() << " recv=" << statRecvPerSec << " update=" << statUpPerSec << " per sec ]"; << "\t[ qsize=" << (wnum - rnum - 1) << " recv=" << statRecvPerSec << " update=" << statUpPerSec << " per sec ]";
return s.str(); return s.str();
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
UNetReceiver::pack_guard::pack_guard( mutex& _m, UNetReceiver::UpdateStrategy _s ):
m(_m),
s(_s)
{
if( s == useUpdateThread )
m.lock();
}
// -----------------------------------------------------------------------------
UNetReceiver::pack_guard::~pack_guard()
{
if( s == useUpdateThread )
m.unlock();
}
// -----------------------------------------------------------------------------
...@@ -21,6 +21,7 @@ ...@@ -21,6 +21,7 @@
#include <memory> #include <memory>
#include <string> #include <string>
#include <queue> #include <queue>
#include <deque>
#include <unordered_map> #include <unordered_map>
#include <sigc++/sigc++.h> #include <sigc++/sigc++.h>
#include <ev++.h> #include <ev++.h>
...@@ -38,13 +39,20 @@ namespace uniset ...@@ -38,13 +39,20 @@ namespace uniset
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
/* Основная идея: сделать проверку очерёдности пакетов, но при этом использовать UDP. /* Основная идея: сделать проверку очерёдности пакетов, но при этом использовать UDP.
* =============== * ===============
* Собственно реализация сделана так:
* В данных передаётся номер пакета. На случай если несколько пакетов придут не в той последовательности * В данных передаётся номер пакета. На случай если несколько пакетов придут не в той последовательности
* что были посланы, сделана очередь с приоритетом. В качестве приоритета используется номер пакета * что были посланы, сделан циклический буфер (буфер сразу выделяет память при старте).
* (чем меньше тем старше). При этом обработка ведётся только тех пакетов, которые идут "подряд", * Т.к. номер получаемых пакетов монотонно растёт на основе него вычисляется индекс
* куда поместить пакет в буфере. Есть два индекса
* rnum - (read number) номер последнего обработанного пакета + 1
* wnum - (write number) номер следующего ожидаемого пакета (номер последнего принятого + 1)
* WARNING: Если придёт два пакета с одинаковым номером, то новый пакет перезатрёт прошлый в буфере
*
* При этом обработка ведётся по порядку (только пакеты идущие подряд)
* как только встречается "дырка" происходит ожидание её "заполения". Если в течение времени (lostTimeout) * как только встречается "дырка" происходит ожидание её "заполения". Если в течение времени (lostTimeout)
* "дырка" не исчезает, увеличивается счётчик потерянных пакетов и обработка продолжается дальше.. * "дырка" не исчезает, увеличивается счётчик потерянных пакетов и обработка продолжается с нового места.
* Всё это реализовано в функции UNetReceiver::real_update() * Т.к. используется libev и нет многопоточной работы, события обрабатываются последовательно.
* Раз в updatetime msec происходит обновление данных в SM, все накопившиеся пакеты обрабатываются
* либо пока не встретиться "дырка", либо пока rnum не догонит wnum.
* *
* КЭШ * КЭШ
* === * ===
...@@ -63,19 +71,15 @@ namespace uniset ...@@ -63,19 +71,15 @@ namespace uniset
* В текущей реализации размер map не контролируется (завязан на UDPMessage::getDataID()) и рассчитан на статичность пакетов, * В текущей реализации размер map не контролируется (завязан на UDPMessage::getDataID()) и рассчитан на статичность пакетов,
* т.е. на то что UNetSender не будет с течением времени менять структуру отправляемых пакетов. * т.е. на то что UNetSender не будет с течением времени менять структуру отправляемых пакетов.
* *
* Обработка сбоя или переполнения счётчика пакетов(перехода через максимум) * Обработка сбоев в номере пакетов
* ========================================================================= * =========================================================================
* Для защиты от сбоя счётчика сделана следующая логика: * Если в какой-то момент расстояние между rnum и wnum превышает maxDifferens пакетов
* Если номер очередного пришедшего пакета отличается от последнего обработанного на maxDifferens, то считается, * то считается, что произошёл сбой или узел который посылал пакеты перезагрузился
* что произошёл сбой счётчика и происходит ожидание пока функция update, не обработает основную очередь полностью. * Идёт попытка обработать все текущие пакеты (до первой дырки), а дальше происходит
* При этом принимаемые пакеты складываются во временную очередь qtmp. Как только основная очередь пустеет, * реинициализация и обработка продолжается с нового номера.
* в неё копируется всё накопленное во временной очереди..и опять идёт штатная обработка. *
* Если во время "ожидания" опять происходит "разрыв" в номерах пакетов, то временная очередь чиститься
* и данные которые в ней были теряются! Аналог ограниченного буфера (у любых карт), когда новые данные
* затирают старые, если их не успели вынуть и обработать.
* \todo Сделать защиту от бесконечного ожидания "очистки" основной очереди.
* ========================================================================= * =========================================================================
* ОПТИМИЗАЦИЯ N1: см. UNetSender.h. Если номер последнего принятого пакета не менялся.. не обрабатываем.. * ОПТИМИЗАЦИЯ N1: см. UNetSender.h. Если номер последнего принятого пакета не менялся.. не обрабатываем.
* *
* Создание соединения (открытие сокета) * Создание соединения (открытие сокета)
* ====================================== * ======================================
...@@ -87,13 +91,6 @@ namespace uniset ...@@ -87,13 +91,6 @@ namespace uniset
* Если такая логика не требуется, то можно задать в конструкторе * Если такая логика не требуется, то можно задать в конструкторе
* последним аргументом флаг nocheckconnection=true, тогда при создании объекта UNetReceiver, в конструкторе будет * последним аргументом флаг nocheckconnection=true, тогда при создании объекта UNetReceiver, в конструкторе будет
* выкинуто исключение при неудачной попытке создания соединения. * выкинуто исключение при неудачной попытке создания соединения.
*
* Стратегия обновления данных в SM
* ==================================
* При помощи функции setUpdateStrategy() можно выбрать стратегию обновления данных в SM.
* Поддерживается два варианта:
* 'thread' - отдельный поток обновления
* 'evloop' - использование общего с приёмом event loop (libev)
*/ */
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
class UNetReceiver: class UNetReceiver:
...@@ -125,7 +122,6 @@ namespace uniset ...@@ -125,7 +122,6 @@ namespace uniset
size_t getLostPacketsNum() const noexcept; size_t getLostPacketsNum() const noexcept;
void setReceiveTimeout( timeout_t msec ) noexcept; void setReceiveTimeout( timeout_t msec ) noexcept;
void setReceivePause( timeout_t msec ) noexcept;
void setUpdatePause( timeout_t msec ) noexcept; void setUpdatePause( timeout_t msec ) noexcept;
void setLostTimeout( timeout_t msec ) noexcept; void setLostTimeout( timeout_t msec ) noexcept;
void setPrepareTime( timeout_t msec ) noexcept; void setPrepareTime( timeout_t msec ) noexcept;
...@@ -133,12 +129,11 @@ namespace uniset ...@@ -133,12 +129,11 @@ namespace uniset
void setMaxDifferens( unsigned long set ) noexcept; void setMaxDifferens( unsigned long set ) noexcept;
void setEvrunTimeout(timeout_t msec ) noexcept; void setEvrunTimeout(timeout_t msec ) noexcept;
void setInitPause( timeout_t msec ) noexcept; void setInitPause( timeout_t msec ) noexcept;
void setBufferSize( size_t sz ) noexcept;
void setRespondID( uniset::ObjectId id, bool invert = false ) noexcept; void setRespondID( uniset::ObjectId id, bool invert = false ) noexcept;
void setLostPacketsID( uniset::ObjectId id ) noexcept; void setLostPacketsID( uniset::ObjectId id ) noexcept;
void setMaxProcessingCount( int set ) noexcept;
void forceUpdate() noexcept; // пересохранить очередной пакет в SM даже если данные не менялись void forceUpdate() noexcept; // пересохранить очередной пакет в SM даже если данные не менялись
inline std::string getAddress() const noexcept inline std::string getAddress() const noexcept
...@@ -161,35 +156,6 @@ namespace uniset ...@@ -161,35 +156,6 @@ namespace uniset
void connectEvent( EventSlot sl ) noexcept; void connectEvent( EventSlot sl ) noexcept;
// -------------------------------------------------------------------- // --------------------------------------------------------------------
/*! Стратегия обработки сообщений */
enum UpdateStrategy
{
useUpdateUnknown,
useUpdateThread, /*!< использовать отдельный поток */
useUpdateEventLoop /*!< использовать event loop (т.е. совместно с receive) */
};
static UpdateStrategy strToUpdateStrategy( const std::string& s ) noexcept;
static std::string to_string( UpdateStrategy s) noexcept;
//! функция должна вызываться до первого вызова start()
void setUpdateStrategy( UpdateStrategy set );
// специальная обёртка, захватывающая или нет mutex в зависимости от стратегии
// (т.к. при evloop mutex захватывать не нужно)
class pack_guard
{
public:
pack_guard( std::mutex& m, UpdateStrategy s );
~pack_guard();
protected:
std::mutex& m;
UpdateStrategy s;
};
// --------------------------------------------------------------------
inline std::shared_ptr<DebugStream> getLog() inline std::shared_ptr<DebugStream> getLog()
{ {
return unetlog; return unetlog;
...@@ -205,7 +171,6 @@ namespace uniset ...@@ -205,7 +171,6 @@ namespace uniset
bool receive() noexcept; bool receive() noexcept;
void step() noexcept; void step() noexcept;
void update() noexcept; void update() noexcept;
void updateThread() noexcept;
void callback( ev::io& watcher, int revents ) noexcept; void callback( ev::io& watcher, int revents ) noexcept;
void readEvent( ev::io& watcher ) noexcept; void readEvent( ev::io& watcher ) noexcept;
void updateEvent( ev::periodic& watcher, int revents ) noexcept; void updateEvent( ev::periodic& watcher, int revents ) noexcept;
...@@ -222,26 +187,11 @@ namespace uniset ...@@ -222,26 +187,11 @@ namespace uniset
void initIterators() noexcept; void initIterators() noexcept;
bool createConnection( bool throwEx = false ); bool createConnection( bool throwEx = false );
void checkConnection(); void checkConnection();
size_t rnext( size_t num );
public:
// функция определения приоритетного сообщения для обработки
struct PacketCompare:
public std::binary_function<UniSetUDP::UDPMessage, UniSetUDP::UDPMessage, bool>
{
inline bool operator()(const UniSetUDP::UDPMessage& lhs,
const UniSetUDP::UDPMessage& rhs) const
{
return lhs.num > rhs.num;
}
};
typedef std::priority_queue<UniSetUDP::UDPMessage, std::vector<UniSetUDP::UDPMessage>, PacketCompare> PacketQueue;
private: private:
UNetReceiver(); UNetReceiver();
timeout_t recvpause = { 10 }; /*!< пауза между приёмами пакетов, [мсек] */
timeout_t updatepause = { 100 }; /*!< периодичность обновления данных в SM, [мсек] */ timeout_t updatepause = { 100 }; /*!< периодичность обновления данных в SM, [мсек] */
std::unique_ptr<UDPReceiveU> udp; std::unique_ptr<UDPReceiveU> udp;
...@@ -255,18 +205,14 @@ namespace uniset ...@@ -255,18 +205,14 @@ namespace uniset
ev::periodic evUpdate; ev::periodic evUpdate;
ev::timer evInitPause; ev::timer evInitPause;
UpdateStrategy upStrategy = { useUpdateEventLoop };
// счётчики для подсчёта статистики // счётчики для подсчёта статистики
size_t recvCount = { 0 }; size_t recvCount = { 0 };
size_t upCount = { 0 }; size_t upCount = { 0 };
// текущая статистик // текущая статистика
size_t statRecvPerSec = { 0 }; /*!< количество принимаемых пакетов в секунду */ size_t statRecvPerSec = { 0 }; /*!< количество принимаемых пакетов в секунду */
size_t statUpPerSec = { 0 }; /*!< количество обработанных пакетов в секунду */ size_t statUpPerSec = { 0 }; /*!< количество обработанных пакетов в секунду */
std::unique_ptr< ThreadCreator<UNetReceiver> > upThread; // update thread
// делаем loop общим.. одним на всех! // делаем loop общим.. одним на всех!
static CommonEventLoop loop; static CommonEventLoop loop;
...@@ -294,23 +240,19 @@ namespace uniset ...@@ -294,23 +240,19 @@ namespace uniset
std::atomic_bool activated = { false }; std::atomic_bool activated = { false };
PacketQueue qpack; /*!< очередь принятых пакетов (отсортированных по возрастанию номера пакета) */ size_t cbufSize = { 2000 }; /*!< размер буфера для сообщений по умолчанию */
UniSetUDP::UDPMessage pack; /*!< просто буфер для получения очередного сообщения */ std::vector<UniSetUDP::UDPMessage> cbuf; // circular buffer
UniSetUDP::UDPPacket r_buf; size_t wnum = { 1 }; /*!< номер следующего ожидаемого пакета */
std::mutex packMutex; /*!< mutex для работы с очередью */ size_t rnum = { 0 }; /*!< номер последнего обработанного пакета */
size_t pnum = { 0 }; /*!< текущий номер обработанного сообщения, для проверки непрерывности последовательности пакетов */
UniSetUDP::UDPMessage* pack;
UniSetUDP::UDPPacket r_buf; /*!< просто буфер для получения очередного сообщения */
/*! максимальная разница между номерами пакетов, при которой считается, что счётчик пакетов /*! максимальная разница между номерами пакетов, при которой считается, что счётчик пакетов
* прошёл через максимум или сбился... * прошёл через максимум или сбился...
*/ */
size_t maxDifferens = { 20 }; size_t maxDifferens = { 20 };
PacketQueue qtmp; /*!< очередь на время обработки(очистки) основной очереди */
bool waitClean = { false }; /*!< флаг означающий, что ждём очистки очереди до конца */
size_t rnum = { 0 }; /*!< текущий номер принятого сообщения, для проверки "переполнения" или "сбоя" счётчика */
size_t maxProcessingCount = { 100 }; /*!< максимальное число обрабатываемых за один раз сообщений */
std::atomic_bool lockUpdate = { false }; /*!< флаг блокировки сохранения принятых данных в SM */ std::atomic_bool lockUpdate = { false }; /*!< флаг блокировки сохранения принятых данных в SM */
EventSlot slEvent; EventSlot slEvent;
...@@ -344,8 +286,8 @@ namespace uniset ...@@ -344,8 +286,8 @@ namespace uniset
bool d_cache_init_ok = { false }; bool d_cache_init_ok = { false };
bool a_cache_init_ok = { false }; bool a_cache_init_ok = { false };
void initDCache( UniSetUDP::UDPMessage& pack, bool force = false ) noexcept; CacheInfo& getDCache( UniSetUDP::UDPMessage* pack, bool force = false ) noexcept;
void initACache( UniSetUDP::UDPMessage& pack, bool force = false ) noexcept; CacheInfo& getACache( UniSetUDP::UDPMessage* pack, bool force = false ) noexcept;
}; };
// -------------------------------------------------------------------------- // --------------------------------------------------------------------------
} // end of namespace uniset } // end of namespace uniset
......
...@@ -156,7 +156,7 @@ namespace uniset ...@@ -156,7 +156,7 @@ namespace uniset
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
void UNetSender::updateFromSM() void UNetSender::updateFromSM()
{ {
for( auto && it : items ) for( auto&& it : items )
{ {
UItem& i = it.second; UItem& i = it.second;
...@@ -243,7 +243,7 @@ namespace uniset ...@@ -243,7 +243,7 @@ namespace uniset
if( !shm->isLocalwork() ) if( !shm->isLocalwork() )
updateFromSM(); updateFromSM();
for( auto && it : mypacks ) for( auto&& it : mypacks )
{ {
if( it.first > 1 && (ncycle % it.first) != 0 ) if( it.first > 1 && (ncycle % it.first) != 0 )
continue; continue;
...@@ -504,6 +504,7 @@ namespace uniset ...@@ -504,6 +504,7 @@ namespace uniset
if( p.pack_ind >= maxAData ) if( p.pack_ind >= maxAData )
{ {
anum++; anum++;
if( anum >= pk.size() ) if( anum >= pk.size() )
pk.resize(anum + 1); pk.resize(anum + 1);
...@@ -551,7 +552,7 @@ namespace uniset ...@@ -551,7 +552,7 @@ namespace uniset
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
void UNetSender::initIterators() void UNetSender::initIterators()
{ {
for( auto && it : items ) for( auto&& it : items )
shm->initIterator(it.second.ioit); shm->initIterator(it.second.ioit);
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
......
if HAVE_TESTS if HAVE_TESTS
noinst_PROGRAMS = tests-with-sm urecv-perf-test noinst_PROGRAMS = tests-with-sm urecv-perf-test
#noinst_PROGRAMS = urecv-perf-test
tests_with_sm_SOURCES = tests_with_sm.cc test_unetudp.cc tests_with_sm_SOURCES = tests_with_sm.cc test_unetudp.cc
tests_with_sm_LDADD = $(top_builddir)/lib/libUniSet2.la $(top_builddir)/extensions/lib/libUniSet2Extensions.la \ tests_with_sm_LDADD = $(top_builddir)/lib/libUniSet2.la $(top_builddir)/extensions/lib/libUniSet2Extensions.la \
......
...@@ -96,45 +96,6 @@ void send( UniSetUDP::UDPMessage& pack, int tout = 2000 ) ...@@ -96,45 +96,6 @@ void send( UniSetUDP::UDPMessage& pack, int tout = 2000 )
size_t ret = udp_s->sendTo(&s_buf.data, s_buf.len, s_addr); size_t ret = udp_s->sendTo(&s_buf.data, s_buf.len, s_addr);
REQUIRE( ret == s_buf.len ); REQUIRE( ret == s_buf.len );
} }
// -----------------------------------------------------------------------------
TEST_CASE("[UNetUDP]: queue sort", "[unetudp][packetqueue]")
{
UNetReceiver::PacketQueue q;
UniSetUDP::UDPMessage m1;
m1.num = 10;
UniSetUDP::UDPMessage m2;
m2.num = 11;
UniSetUDP::UDPMessage m3;
m3.num = 13;
UniSetUDP::UDPMessage m4;
m4.num = 100;
// специально складываем в обратном порядке
// чтобы проверить "сортировку"
q.push(m1);
q.push(m3);
q.push(m2);
q.push(m4);
UniSetUDP::UDPMessage t = q.top();
REQUIRE( t.num == 10 );
q.pop();
t = q.top();
REQUIRE( t.num == 11 );
q.pop();
t = q.top();
REQUIRE( t.num == 13 );
q.pop();
t = q.top();
REQUIRE( t.num == 100 );
q.pop();
}
// -----------------------------------------------------------------------------
TEST_CASE("[UNetUDP]: UDPMessage", "[unetudp][udpmessage]") TEST_CASE("[UNetUDP]: UDPMessage", "[unetudp][udpmessage]")
{ {
SECTION("UDPMessage::isFull()") SECTION("UDPMessage::isFull()")
......
#!/bin/sh
# '--' - нужен для отделения аргументов catch, от наших..
cd ../../../Utilities/Admin/
./uniset2-start.sh -f ./create_links.sh
./uniset2-start.sh -f ./create
./uniset2-start.sh -f ./exist | grep -q UNISET_PLC/Controllers || exit 1
cd -
./uniset2-start.sh -f ./tests-with-sm $* -- --confile unetudp-test-configure.xml --e-startup-pause 10 \
--unet-name UNetExchange --unet-filter-field unet --unet-filter-value 1 --unet-maxdifferense 5 \
--unet-recv-timeout 1000 --unet-sendpause 500 --unet-update-strategy thread
#--unet-log-add-levels any
AT_SETUP([UNetUDP tests (with SM)(thread)])
AT_CHECK([$abs_top_builddir/testsuite/at-test-launch.sh $abs_top_builddir/extensions/UNetUDP/tests tests_with_sm_thread.sh],[0],[ignore],[ignore])
AT_CLEANUP
AT_SETUP([UNetUDP tests (with SM)(evloop)]) AT_SETUP([UNetUDP tests (with SM)(evloop)])
AT_CHECK([$abs_top_builddir/testsuite/at-test-launch.sh $abs_top_builddir/extensions/UNetUDP/tests tests_with_sm_evloop.sh],[0],[ignore],[ignore]) AT_CHECK([$abs_top_builddir/testsuite/at-test-launch.sh $abs_top_builddir/extensions/UNetUDP/tests tests_with_sm_evloop.sh],[0],[ignore],[ignore])
AT_CLEANUP AT_CLEANUP
......
...@@ -42,7 +42,9 @@ static void run_senders( size_t max, const std::string& s_host, size_t count = 5 ...@@ -42,7 +42,9 @@ static void run_senders( size_t max, const std::string& s_host, size_t count = 5
{ {
try try
{ {
cout << "create sender: " << s_host << ":" << begPort + i << endl;
auto s = make_shared<UDPSocketU>(s_host, begPort + i); auto s = make_shared<UDPSocketU>(s_host, begPort + i);
s->setBroadcast(true);
vsend.emplace_back(s); vsend.emplace_back(s);
} }
catch( Poco::Net::NetException& e ) catch( Poco::Net::NetException& e )
...@@ -103,7 +105,7 @@ static void run_senders( size_t max, const std::string& s_host, size_t count = 5 ...@@ -103,7 +105,7 @@ static void run_senders( size_t max, const std::string& s_host, size_t count = 5
if( packetnum == 0 ) if( packetnum == 0 )
packetnum = 1; packetnum = 1;
for( auto && udp : vsend ) for( auto&& udp : vsend )
{ {
try try
{ {
...@@ -139,6 +141,7 @@ static void run_test( size_t max, const std::string& host ) ...@@ -139,6 +141,7 @@ static void run_test( size_t max, const std::string& host )
// make receivers.. // make receivers..
for( size_t i = 0; i < max; i++ ) for( size_t i = 0; i < max; i++ )
{ {
cout << "create receiver: " << host << ":" << begPort + i << endl;
auto r = make_shared<UNetReceiver>(host, begPort + i, smiInstance()); auto r = make_shared<UNetReceiver>(host, begPort + i, smiInstance());
r->setLockUpdate(true); r->setLockUpdate(true);
vrecv.emplace_back(r); vrecv.emplace_back(r);
...@@ -147,7 +150,7 @@ static void run_test( size_t max, const std::string& host ) ...@@ -147,7 +150,7 @@ static void run_test( size_t max, const std::string& host )
size_t count = 0; size_t count = 0;
// Run receivers.. // Run receivers..
for( auto && r : vrecv ) for( auto&& r : vrecv )
{ {
if( r ) if( r )
{ {
...@@ -156,12 +159,12 @@ static void run_test( size_t max, const std::string& host ) ...@@ -156,12 +159,12 @@ static void run_test( size_t max, const std::string& host )
} }
} }
cerr << "RUn " << count << " receivers..." << endl; cerr << "RUN " << count << " receivers..." << endl;
// wait.. // wait..
pause(); pause();
for( auto && r : vrecv ) for( auto&& r : vrecv )
{ {
if(r) if(r)
r->stop(); r->stop();
...@@ -177,9 +180,9 @@ int main(int argc, char* argv[] ) ...@@ -177,9 +180,9 @@ int main(int argc, char* argv[] )
auto conf = uniset_init(argc, argv); auto conf = uniset_init(argc, argv);
if( argc > 1 && !strcmp(argv[1], "s") ) if( argc > 1 && !strcmp(argv[1], "s") )
run_senders(10, host); run_senders(1, host);
else else
run_test(10, host); run_test(1, host);
return 0; return 0;
} }
......
...@@ -29,6 +29,7 @@ static struct option longopts[] = ...@@ -29,6 +29,7 @@ static struct option longopts[] =
{ "prof", required_argument, 0, 'y' }, { "prof", required_argument, 0, 'y' },
{ "a-data", required_argument, 0, 'a' }, { "a-data", required_argument, 0, 'a' },
{ "d-data", required_argument, 0, 'i' }, { "d-data", required_argument, 0, 'i' },
{ "pack-num", required_argument, 0, 'u' },
{ NULL, 0, 0, 0 } { NULL, 0, 0, 0 }
}; };
// -------------------------------------------------------------------------- // --------------------------------------------------------------------------
...@@ -79,10 +80,11 @@ int main(int argc, char* argv[]) ...@@ -79,10 +80,11 @@ int main(int argc, char* argv[])
unsigned int nprof = 0; unsigned int nprof = 0;
std::string d_data = ""; std::string d_data = "";
std::string a_data = ""; std::string a_data = "";
size_t packetnum = 1;
while(1) while(1)
{ {
opt = getopt_long(argc, argv, "hs:c:r:p:n:t:x:blvdz:y:a:i:", longopts, &optindex); opt = getopt_long(argc, argv, "hs:c:r:p:n:t:x:blvdz:y:a:i:u:", longopts, &optindex);
if( opt == -1 ) if( opt == -1 )
break; break;
...@@ -106,6 +108,7 @@ int main(int argc, char* argv[]) ...@@ -106,6 +108,7 @@ int main(int argc, char* argv[])
cout << "[-y|--prof] num - Print receive statistics every NUM packets (for -r only)" << endl; cout << "[-y|--prof] num - Print receive statistics every NUM packets (for -r only)" << endl;
cout << "[-a|--a-data] id1=val1,id2=val2,... - Analog data. Send: id1=id1,id2=id2,.. for analog sensors" << endl; cout << "[-a|--a-data] id1=val1,id2=val2,... - Analog data. Send: id1=id1,id2=id2,.. for analog sensors" << endl;
cout << "[-i|--d-data] id1=val1,id2=val2,... - Digital data. Send: id1=id1,id2=id2,.. for digital sensors" << endl; cout << "[-i|--d-data] id1=val1,id2=val2,... - Digital data. Send: id1=id1,id2=id2,.. for digital sensors" << endl;
cout << "[-u|--pack-num] num - first packet numbrt (default: 1)" << endl;
cout << endl; cout << endl;
return 0; return 0;
...@@ -171,6 +174,10 @@ int main(int argc, char* argv[]) ...@@ -171,6 +174,10 @@ int main(int argc, char* argv[])
ncycles = atoi(optarg); ncycles = atoi(optarg);
break; break;
case 'u':
packetnum = atoi(optarg);
break;
case '?': case '?':
default: default:
cerr << "? argumnet" << endl; cerr << "? argumnet" << endl;
...@@ -345,7 +352,6 @@ int main(int argc, char* argv[]) ...@@ -345,7 +352,6 @@ int main(int argc, char* argv[])
Poco::Net::SocketAddress sa(s_host, port); Poco::Net::SocketAddress sa(s_host, port);
udp->connect(sa); udp->connect(sa);
size_t packetnum = 0;
UniSetUDP::UDPPacket s_buf; UniSetUDP::UDPPacket s_buf;
...@@ -358,7 +364,7 @@ int main(int argc, char* argv[]) ...@@ -358,7 +364,7 @@ int main(int argc, char* argv[])
{ {
mypack.num = packetnum++; mypack.num = packetnum++;
// при переходе черех максимум (UniSetUDP::MaxPacketNum) // при переходе через максимум (UniSetUDP::MaxPacketNum)
// пакет опять должен иметь номер "1" // пакет опять должен иметь номер "1"
if( packetnum == 0 ) if( packetnum == 0 )
packetnum = 1; packetnum = 1;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment