Commit d03993a8 authored by Pavel Vainerman's avatar Pavel Vainerman

fixed after backport to p9

parent a2213d77
......@@ -3,17 +3,8 @@
# See doc: http://www.gnu.org/software/hello/manual/autoconf/Generic-Programs.html
# AC_PREREQ(2.59)
<<<<<<< HEAD
<<<<<<< HEAD
AC_INIT([uniset2], [2.8.1], pv@etersoft.ru)
AM_INIT_AUTOMAKE(AC_PACKAGE_NAME,AC_PACKAGE_VERSION)
=======
AC_INIT([uniset2], [2.9.1], pv@etersoft.ru)
=======
AC_INIT([uniset2], [2.9.2], pv@etersoft.ru)
>>>>>>> 2.9.2-alt1
AM_INIT_AUTOMAKE
>>>>>>> 2.9.1-alt1
LIBVER=11:2:9
AC_SUBST(LIBVER)
......
......@@ -23,7 +23,6 @@
// myvar = LE_TO_H(myvar)
// -------------------------------------------------------------------------
#if __BYTE_ORDER == __LITTLE_ENDIAN
static bool HostIsBigEndian = false;
#define LE_TO_H(x) {}
#elif INTPTR_MAX == INT64_MAX
#define LE_TO_H(x) x = le64toh(x)
......@@ -34,8 +33,7 @@ static bool HostIsBigEndian = false;
#endif
#if __BYTE_ORDER == __BIG_ENDIAN
static bool HostIsBigEndian = true;
header.#define BE_TO_H(x) {}
#define BE_TO_H(x) {}
#elif INTPTR_MAX == INT64_MAX
#define BE_TO_H(x) x = be64toh(x)
#elif INTPTR_MAX == INT32_MAX
......@@ -114,191 +112,6 @@ namespace uniset
}
#endif
<<<<<<< HEAD
// crc = crc & 0xffff;
}
return crc;
}
// -------------------------------------------------------------------------
uint16_t UniSetUDP::makeCRC( unsigned char* buf, size_t len ) noexcept
{
uint16_t crc = 0xffff;
crc = get_crc_16(crc, (unsigned char*)(buf), len);
return crc;
}
// -----------------------------------------------------------------------------
std::ostream& UniSetUDP::operator<<( std::ostream& os, UniSetUDP::UDPHeader& p )
{
return os << "nodeID=" << p.nodeID
<< " procID=" << p.procID
<< " dcount=" << p.dcount
<< " acount=" << p.acount
<< " pnum=" << p.num;
}
// -----------------------------------------------------------------------------
std::ostream& UniSetUDP::operator<<( std::ostream& os, UniSetUDP::UDPHeader* p )
{
return os << (*p);
}
// -----------------------------------------------------------------------------
std::ostream& UniSetUDP::operator<<( std::ostream& os, UniSetUDP::UDPAData& p )
{
return os << "id=" << p.id << " val=" << p.val;
}
// -----------------------------------------------------------------------------
std::ostream& UniSetUDP::operator<<( std::ostream& os, UniSetUDP::UDPMessage& p )
{
os << (UDPHeader*)(&p) << endl;
os << "DIGITAL:" << endl;
for( size_t i = 0; i < p.header.dcount; i++ )
os << "[" << i << "]={" << p.dID(i) << "," << p.dValue(i) << "}" << endl;
os << "ANALOG:" << endl;
for( size_t i = 0; i < p.header.acount; i++ )
os << "[" << i << "]={" << p.a_dat[i].id << "," << p.a_dat[i].val << "}" << endl;
return os;
}
// -----------------------------------------------------------------------------
size_t UDPMessage::addAData( const UniSetUDP::UDPAData& dat ) noexcept
{
if( header.acount >= MaxACount )
return MaxACount;
a_dat[header.acount] = dat;
header.acount++;
return header.acount - 1;
}
// -----------------------------------------------------------------------------
size_t UDPMessage::addAData( long id, long val) noexcept
{
UDPAData d(id, val);
return addAData(d);
}
// -----------------------------------------------------------------------------
bool UDPMessage::setAData( size_t index, long val ) noexcept
{
if( index < MaxACount )
{
a_dat[index].val = val;
return true;
}
return false;
}
// -----------------------------------------------------------------------------
size_t UDPMessage::addDData( long id, bool val ) noexcept
{
if( header.dcount >= MaxDCount )
return MaxDCount;
// сохраняем ID
d_id[header.dcount] = id;
bool res = setDData( header.dcount, val );
if( res )
{
header.dcount++;
return header.dcount - 1;
}
return MaxDCount;
}
// -----------------------------------------------------------------------------
bool UDPMessage::setDData( size_t index, bool val ) noexcept
{
if( index >= MaxDCount )
return false;
size_t nbyte = index / 8 * sizeof(uint8_t);
size_t nbit = index % 8 * sizeof(uint8_t);
// выставляем бит
unsigned char d = d_dat[nbyte];
if( val )
d |= (1 << nbit);
else
d &= ~(1 << nbit);
d_dat[nbyte] = d;
return true;
}
// -----------------------------------------------------------------------------
long UDPMessage::dID( size_t index ) const noexcept
{
if( index >= MaxDCount )
return uniset::DefaultObjectId;
return d_id[index];
}
// -----------------------------------------------------------------------------
bool UDPMessage::dValue( size_t index ) const noexcept
{
if( index >= MaxDCount )
return uniset::DefaultObjectId;
size_t nbyte = index / 8 * sizeof(uint8_t);
size_t nbit = index % 8 * sizeof(uint8_t);
return ( d_dat[nbyte] & (1 << nbit) );
}
// -----------------------------------------------------------------------------
long UDPMessage::getDataID() const noexcept
{
// в качестве идентификатора берётся ID первого датчика в данных
// приоритет имеет аналоговые датчики
if( header.acount > 0 )
return a_dat[0].id;
if( header.dcount > 0 )
return d_id[0];
// если нет данных(?) просто возвращаем номер пакета
return header.num;
}
// -----------------------------------------------------------------------------
bool UDPMessage::isOk() noexcept
{
return ( header.magic == UniSetUDP::UNETUDP_MAGICNUM );
}
// -----------------------------------------------------------------------------
void UDPMessage::ntoh() noexcept
{
// byte order from packet
uint8_t be_order = header._be_order;
if( be_order && !HostIsBigEndian )
{
BE_TO_H(header.magic);
BE_TO_H(header.num);
BE_TO_H(header.procID);
BE_TO_H(header.nodeID);
BE_TO_H(header.dcount);
BE_TO_H(header.acount);
}
else if( !be_order && HostIsBigEndian )
{
LE_TO_H(header.magic);
LE_TO_H(header.num);
LE_TO_H(header.procID);
LE_TO_H(header.nodeID);
LE_TO_H(header.dcount);
LE_TO_H(header.acount);
}
// set host byte order
#if __BYTE_ORDER == __LITTLE_ENDIAN
header._be_order = 0;
#elif __BYTE_ORDER == __BIG_ENDIAN
header._be_order = 1;
=======
// crc = crc & 0xffff;
}
......@@ -531,56 +344,10 @@ namespace uniset
m._be_order = 0;
#elif __BYTE_ORDER == __BIG_ENDIAN
m. be_order = 1;
>>>>>>> 2.9.1-alt1
#else
#error UNET(getMessage): Unknown byte order!
#endif
<<<<<<< HEAD
// CONVERT DATA TO HOST BYTE ORDER
// -------------------------------
if( (be_order && !HostIsBigEndian) || (!be_order && HostIsBigEndian) )
{
for( size_t n = 0; n < header.acount; n++ )
{
if( be_order )
{
BE_TO_H(a_dat[n].id);
BE_TO_H(a_dat[n].val);
}
else
{
LE_TO_H(a_dat[n].id);
LE_TO_H(a_dat[n].val);
}
}
for( size_t n = 0; n < header.dcount; n++ )
{
if( be_order )
{
BE_TO_H(d_id[n]);
}
else
{
LE_TO_H(d_id[n]);
}
}
}
}
// -----------------------------------------------------------------------------
uint16_t UDPMessage::getDataCRC() const noexcept
{
uint16_t crc[3];
crc[0] = makeCRC( (unsigned char*)(a_dat), sizeof(a_dat) );
crc[1] = makeCRC( (unsigned char*)(d_id), sizeof(d_id) );
crc[2] = makeCRC( (unsigned char*)(d_dat), sizeof(d_dat) );
return makeCRC( (unsigned char*)(&crc), sizeof(crc) );
}
UDPHeader::UDPHeader() noexcept
: magic(UNETUDP_MAGICNUM)
=======
// проверяем наш ли пакет..
if( m.magic != UniSetUDP::UNETUDP_MAGICNUM )
{
......@@ -658,7 +425,6 @@ namespace uniset
UDPHeader::UDPHeader() noexcept
: magic(UNETUDP_MAGICNUM)
>>>>>>> 2.9.1-alt1
#if __BYTE_ORDER == __LITTLE_ENDIAN
, _be_order(0)
#elif __BYTE_ORDER == __BIG_ENDIAN
......
......@@ -24,15 +24,6 @@
// --------------------------------------------------------------------------
namespace uniset
{
<<<<<<< HEAD
// -----------------------------------------------------------------------------
namespace UniSetUDP
{
/*! С учётом того, что ID могут идти не подряд. Сделан следующий формат:
Для аналоговых величин передаётся массив пар "id-value"(UDPAData).
Для булевых величин - отдельно массив ID и отдельно битовый массив со значениями,
(по количеству битов такого же размера).
=======
// -----------------------------------------------------------------------------
namespace UniSetUDP
{
......@@ -44,7 +35,6 @@ namespace uniset
\todo Подумать на тему сделать два отдельных вида пакетов для булевых значений и для аналоговых,
чтобы уйти от преобразования UDPMessage --> UDPPacket --> UDPMessage.
>>>>>>> 2.9.1-alt1
\warning ТЕКУЩАЯ ВЕРСИЯ ПРОТОКОЛА НЕ БУДЕТ РАБОТАТЬ МЕЖДУ 32-битными и 64-битными системами (из-за отличия в типе long).
т.к. это не сильно актуально, пока не переделываю.
......@@ -59,21 +49,6 @@ namespace uniset
Т.е. если все узлы будут иметь одинаковый порядок байт, фактического перекодирования не будет.
*/
<<<<<<< HEAD
const uint32_t UNETUDP_MAGICNUM = 0x1343EFD; // идентификатор протокола
struct UDPHeader
{
UDPHeader() noexcept;
uint32_t magic;
uint8_t _be_order; // 1 - BE byte order, 0 - LE byte order
size_t num;
long nodeID;
long procID;
size_t dcount; /*!< количество булевых величин */
size_t acount; /*!< количество аналоговых величин */
} __attribute__((packed));
=======
const uint32_t UNETUDP_MAGICNUM = 0x133EF54; // идентификатор протокола
struct UDPHeader
......@@ -89,7 +64,6 @@ namespace uniset
size_t acount; /*!< количество аналоговых величин */
} __attribute__((packed));
>>>>>>> 2.9.1-alt1
std::ostream& operator<<( std::ostream& os, UDPHeader& p );
std::ostream& operator<<( std::ostream& os, UDPHeader* p );
......@@ -108,21 +82,6 @@ namespace uniset
std::ostream& operator<<( std::ostream& os, UDPAData& p );
<<<<<<< HEAD
// Теоретический размер данных в UDP пакете (исключая заголовки) 65507
// Фактически желательно не вылезать за размер MTU (обычно 1500) - заголовки = 1432 байта
// т.е. надо чтобы sizeof(UDPPacket) < 1432
// При текущих настройках sizeof(UDPPacket) = 56421 (!)
static const size_t MaxACount = 2000;
static const size_t MaxDCount = 3000;
static const size_t MaxDDataCount = 1 + MaxDCount / 8 * sizeof(uint8_t);
struct UDPMessage
{
// net to host
void ntoh() noexcept;
bool isOk() noexcept;
=======
// Теоретический размер данных в UDP пакете (исключая заголовки) 65507
// Фактически желательно не вылезать за размер MTU (обычно 1500) - заголовки = 1432 байта
// т.е. надо чтобы sizeof(UDPPacket) < 1432
......@@ -159,7 +118,6 @@ namespace uniset
size_t transport_msg( UDPPacket& p ) const noexcept;
static size_t getMessage( UDPMessage& m, UDPPacket& p ) noexcept;
>>>>>>> 2.9.1-alt1
// \warning в случае переполнения возвращается MaxDCount
size_t addDData( long id, bool val ) noexcept;
......@@ -183,39 +141,6 @@ namespace uniset
long getDataID( ) const noexcept; /*!< получение "уникального" идентификатора данных этого пакета */
<<<<<<< HEAD
inline bool isAFull() const noexcept
{
return (header.acount >= MaxACount);
}
inline bool isDFull() const noexcept
{
return (header.dcount >= MaxDCount);
}
inline bool isFull() const noexcept
{
return !((header.dcount < MaxDCount) && (header.acount < MaxACount));
}
inline size_t dsize() const noexcept
{
return header.dcount;
}
inline size_t asize() const noexcept
{
return header.acount;
}
uint16_t getDataCRC() const noexcept;
UDPHeader header;
UDPAData a_dat[MaxACount]; /*!< аналоговые величины */
long d_id[MaxDCount]; /*!< список дискретных ID */
uint8_t d_dat[MaxDDataCount]; /*!< битовые значения */
} __attribute__((packed));
=======
inline bool isAFull() const noexcept
{
return (acount >= MaxACount);
......@@ -255,7 +180,6 @@ namespace uniset
long d_id[MaxDCount]; /*!< список дискретных ID */
uint8_t d_dat[MaxDDataCount]; /*!< битовые значения */
};
>>>>>>> 2.9.1-alt1
std::ostream& operator<<( std::ostream& os, UDPMessage& p );
......
......@@ -57,403 +57,6 @@ UNetExchange::UNetExchange(uniset::ObjectId objId, uniset::ObjectId shmId, const
logserv = make_shared<LogServer>(loga);
logserv->init( prefix + "-logserver", cnode );
<<<<<<< HEAD
if( findArgParam("--" + prefix + "-run-logserver", conf->getArgc(), conf->getArgv()) != -1 )
{
logserv_host = conf->getArg2Param("--" + prefix + "-logserver-host", it.getProp("logserverHost"), "localhost");
logserv_port = conf->getArgPInt("--" + prefix + "-logserver-port", it.getProp("logserverPort"), getId());
}
// определяем фильтр
s_field = conf->getArgParam("--" + prefix + "-filter-field");
s_fvalue = conf->getArgParam("--" + prefix + "-filter-value");
unetinfo << myname << "(init): read filter-field='" << s_field
<< "' filter-value='" << s_fvalue << "'" << endl;
const string n_field(conf->getArgParam("--" + prefix + "-nodes-filter-field"));
const string n_fvalue(conf->getArgParam("--" + prefix + "-nodes-filter-value"));
unetinfo << myname << "(init): read nodes-filter-field='" << n_field
<< "' nodes-filter-value='" << n_fvalue << "'" << endl;
int recvTimeout = conf->getArgPInt("--" + prefix + "-recv-timeout", it.getProp("recvTimeout"), 5000);
int prepareTime = conf->getArgPInt("--" + prefix + "-prepare-time", it.getProp("prepareTime"), 2000);
int evrunTimeout = conf->getArgPInt("--" + prefix + "-evrun-timeout", it.getProp("evrunTimeout"), 60000);
int sendpause = conf->getArgPInt("--" + prefix + "-sendpause", it.getProp("sendpause"), 100);
int packsendpause = conf->getArgPInt("--" + prefix + "-packsendpause", it.getProp("packsendpause"), 5);
int packsendpauseFactor = conf->getArgPInt("--" + prefix + "-packsendpause-factor", it.getProp("packsendpauseFactor"), 0);
int updatepause = conf->getArgPInt("--" + prefix + "-updatepause", it.getProp("updatepause"), 100);
int lostTimeout = conf->getArgPInt("--" + prefix + "-lost-timeout", it.getProp("lostTimeout"), 2 * updatepause);
steptime = conf->getArgPInt("--" + prefix + "-steptime", it.getProp("steptime"), 1000);
int maxDiff = conf->getArgPInt("--" + prefix + "-maxdifferense", it.getProp("maxDifferense"), 100);
int checkConnectionPause = conf->getArgPInt("--" + prefix + "-checkconnection-pause", it.getProp("checkConnectionPause"), 10000);
int initpause = conf->getArgPInt("--" + prefix + "-initpause", it.getProp("initpause"), 5000);
int recvBufferSize = conf->getArgPInt("--" + prefix + "-recv-buffer-size", it.getProp("recvBufferSize"), 100);
std::string updateStrategy = conf->getArg2Param("--" + prefix + "-update-strategy", it.getProp("updateStrategy"), "evloop");
no_sender = conf->getArgInt("--" + prefix + "-nosender", it.getProp("nosender"));
std::string nconfname = conf->getArg2Param("--" + prefix + "-nodes-confnode", it.getProp("nodesConfNode"), "nodes");
xmlNode* nodes = 0;
if( nconfname == "nodes" )
nodes = conf->getXMLNodesSection();
else
{
auto xml = conf->getConfXML();
nodes = conf->findNode(xml->getFirstNode(), nconfname);
}
unetinfo << myname << "(init): init from <" << nconfname << ">" << endl;
if( !nodes )
throw uniset::SystemError("(UNetExchange): Not found confnode <" + nconfname + ">");
UniXML::iterator n_it(nodes);
string default_ip(n_it.getProp("unet_broadcast_ip"));
string default_ip2(n_it.getProp("unet_broadcast_ip2"));
if( !n_it.goChildren() )
throw uniset::SystemError("(UNetExchange): Items not found for <nodes>");
for( ; n_it.getCurrent(); n_it.goNext() )
{
if( n_it.getIntProp("unet_ignore") )
{
unetinfo << myname << "(init): unet_ignore.. for " << n_it.getProp("name") << endl;
continue;
}
// проверяем фильтры для подсетей
if( !uniset::check_filter(n_it, n_field, n_fvalue) )
continue;
// Если указано поле unet_broadcast_ip непосредственно у узла - берём его
// если не указано берём общий broadcast_ip
string h = { n_it.getProp2("unet_broadcast_ip", default_ip) };
string h2 = { n_it.getProp2("unet_broadcast_ip2", default_ip2) };
if( h.empty() )
{
ostringstream err;
err << myname << "(init): Unknown broadcast IP for " << n_it.getProp("name");
unetcrit << err.str() << endl;
throw uniset::SystemError(err.str());
}
if( h2.empty() )
unetinfo << myname << "(init): ip2 not used..." << endl;
// Если указано поле unet_port - используем его
// Иначе port = идентификатору узла
int p = n_it.getPIntProp("unet_port", n_it.getIntProp("id"));
// по умолчанию порт на втором канале такой же как на первом (если не задан отдельно)
int p2 = n_it.getPIntProp("unet_port2", p);
string n(n_it.getProp("name"));
if( n == conf->getLocalNodeName() )
{
if( no_sender )
{
unetinfo << myname << "(init): sender OFF for this node...("
<< n_it.getProp("name") << ")" << endl;
continue;
}
unetinfo << myname << "(init): init sender.. my node " << n_it.getProp("name") << endl;
sender = make_shared<UNetSender>(h, p, shm, false, s_field, s_fvalue, "unet", prefix);
sender->setSendPause(sendpause);
sender->setPackSendPause(packsendpause);
sender->setPackSendPauseFactor(packsendpauseFactor);
sender->setCheckConnectionPause(checkConnectionPause);
loga->add(sender->getLog());
try
{
// создаём "писателя" для второго канала если задан
if( !h2.empty() )
{
unetinfo << myname << "(init): init sender2.. my node " << n_it.getProp("name") << endl;
sender2 = make_shared<UNetSender>(h2, p2, shm, false, s_field, s_fvalue, prefix);
sender2->setSendPause(sendpause);
sender2->setCheckConnectionPause(checkConnectionPause);
loga->add(sender2->getLog());
}
}
catch(...)
{
// т.е. это "резервный канал", то игнорируем ошибку его создания
// при запуске "интерфейс" может быть и не доступен...
sender2 = 0;
unetcrit << myname << "(ignore): DON`T CREATE 'UNetSender' for " << h2 << ":" << p2 << endl;
}
continue;
}
unetinfo << myname << "(init): add UNetReceiver for " << h << ":" << p << endl;
if( checkExistUNetHost(h, p) )
{
unetinfo << myname << "(init): " << h << ":" << p << " already added! Ignore.." << endl;
continue;
}
bool resp_invert = n_it.getIntProp("unet_respond_invert");
string s_resp_id(n_it.getProp("unet_respond1_id"));
uniset::ObjectId resp_id = uniset::DefaultObjectId;
if( !s_resp_id.empty() )
{
resp_id = conf->getSensorID(s_resp_id);
if( resp_id == uniset::DefaultObjectId )
{
ostringstream err;
err << myname << ": " << n_it.getProp("name") << " : Unknown RespondID.. Not found id for '" << s_resp_id << "'" << endl;
unetcrit << myname << "(init): " << err.str() << endl;
throw SystemError(err.str());
}
}
string s_resp2_id(n_it.getProp("unet_respond2_id"));
uniset::ObjectId resp2_id = uniset::DefaultObjectId;
if( !s_resp2_id.empty() )
{
resp2_id = conf->getSensorID(s_resp2_id);
if( resp2_id == uniset::DefaultObjectId )
{
ostringstream err;
err << myname << ": " << n_it.getProp("name") << " : Unknown RespondID(2).. Not found id for '" << s_resp2_id << "'" << endl;
unetcrit << myname << "(init): " << err.str() << endl;
throw SystemError(err.str());
}
}
string s_lp_id(n_it.getProp("unet_lostpackets1_id"));
uniset::ObjectId lp_id = uniset::DefaultObjectId;
if( !s_lp_id.empty() )
{
lp_id = conf->getSensorID(s_lp_id);
if( lp_id == uniset::DefaultObjectId )
{
ostringstream err;
err << myname << ": " << n_it.getProp("name") << " : Unknown LostPacketsID.. Not found id for '" << s_lp_id << "'" << endl;
unetcrit << myname << "(init): " << err.str() << endl;
throw SystemError(err.str());
}
}
string s_lp2_id(n_it.getProp("unet_lostpackets2_id"));
uniset::ObjectId lp2_id = uniset::DefaultObjectId;
if( !s_lp2_id.empty() )
{
lp2_id = conf->getSensorID(s_lp2_id);
if( lp2_id == uniset::DefaultObjectId )
{
ostringstream err;
err << myname << ": " << n_it.getProp("name") << " : Unknown LostPacketsID(2).. Not found id for '" << s_lp2_id << "'" << endl;
unetcrit << myname << "(init): " << err.str() << endl;
throw SystemError(err.str());
}
}
string s_lp_comm_id(n_it.getProp("unet_lostpackets_id"));
uniset::ObjectId lp_comm_id = uniset::DefaultObjectId;
if( !s_lp_comm_id.empty() )
{
lp_comm_id = conf->getSensorID(s_lp_comm_id);
if( lp_comm_id == uniset::DefaultObjectId )
{
ostringstream err;
err << myname << ": " << n_it.getProp("name") << " : Unknown LostPacketsID(comm).. Not found id for '" << s_lp_comm_id << "'" << endl;
unetcrit << myname << "(init): " << err.str() << endl;
throw SystemError(err.str());
}
}
string s_resp_comm_id(n_it.getProp("unet_respond_id"));
uniset::ObjectId resp_comm_id = uniset::DefaultObjectId;
if( !s_resp_comm_id.empty() )
{
resp_comm_id = conf->getSensorID(s_resp_comm_id);
if( resp_comm_id == uniset::DefaultObjectId )
{
ostringstream err;
err << myname << ": " << n_it.getProp("name") << " : Unknown RespondID(comm).. Not found id for '" << s_resp_comm_id << "'" << endl;
unetcrit << myname << "(init): " << err.str() << endl;
throw SystemError(err.str());
}
}
string s_numchannel_id(n_it.getProp("unet_numchannel_id"));
uniset::ObjectId numchannel_id = uniset::DefaultObjectId;
if( !s_numchannel_id.empty() )
{
numchannel_id = conf->getSensorID(s_numchannel_id);
if( numchannel_id == uniset::DefaultObjectId )
{
ostringstream err;
err << myname << ": " << n_it.getProp("name") << " : Unknown NumChannelID.. Not found id for '" << s_numchannel_id << "'" << endl;
unetcrit << myname << "(init): " << err.str() << endl;
throw SystemError(err.str());
}
}
string s_channelSwitchCount_id(n_it.getProp("unet_channelswitchcount_id"));
uniset::ObjectId channelswitchcount_id = uniset::DefaultObjectId;
if( !s_channelSwitchCount_id.empty() )
{
channelswitchcount_id = conf->getSensorID(s_channelSwitchCount_id);
if( channelswitchcount_id == uniset::DefaultObjectId )
{
ostringstream err;
err << myname << ": " << n_it.getProp("name") << " : Unknown ChannelSwitchCountID.. Not found id for '" << channelswitchcount_id << "'" << endl;
unetcrit << myname << "(init): " << err.str() << endl;
throw SystemError(err.str());
}
}
unetinfo << myname << "(init): (node='" << n << "') add basic receiver "
<< h << ":" << p << endl;
auto r = make_shared<UNetReceiver>(h, p, shm, false, prefix);
loga->add(r->getLog());
// на всякий принудительно разблокируем,
// чтобы не зависеть от значения по умолчанию
r->setLockUpdate(false);
r->setReceiveTimeout(recvTimeout);
r->setPrepareTime(prepareTime);
r->setEvrunTimeout(evrunTimeout);
r->setLostTimeout(lostTimeout);
r->setUpdatePause(updatepause);
r->setCheckConnectionPause(checkConnectionPause);
r->setInitPause(initpause);
r->setMaxDifferens(maxDiff);
r->setRespondID(resp_id, resp_invert);
r->setLostPacketsID(lp_id);
r->connectEvent( sigc::mem_fun(this, &UNetExchange::receiverEvent) );
r->setBufferSize(recvBufferSize);
shared_ptr<UNetReceiver> r2(nullptr);
try
{
if( !h2.empty() ) // создаём читателя впо второму каналу
{
unetinfo << myname << "(init): (node='" << n << "') add reserv receiver "
<< h2 << ":" << p2 << endl;
r2 = make_shared<UNetReceiver>(h2, p2, shm, false, prefix);
loga->add(r2->getLog());
// т.к. это резервный канал (по началу блокируем его)
r2->setLockUpdate(true);
r2->setReceiveTimeout(recvTimeout);
r2->setPrepareTime(prepareTime);
r2->setEvrunTimeout(evrunTimeout);
r2->setLostTimeout(lostTimeout);
r2->setUpdatePause(updatepause);
r2->setCheckConnectionPause(checkConnectionPause);
r2->setInitPause(initpause);
r2->setMaxDifferens(maxDiff);
r2->setRespondID(resp2_id, resp_invert);
r2->setLostPacketsID(lp2_id);
r2->connectEvent( sigc::mem_fun(this, &UNetExchange::receiverEvent) );
r2->setBufferSize(recvBufferSize);
}
}
catch(...)
{
// т.е. это "резервный канал", то игнорируем ошибку его создания
// при запуске "интерфейс" может быть и не доступен...
r2 = 0;
unetcrit << myname << "(ignore): DON`T CREATE 'UNetReceiver' for " << h2 << ":" << p2 << endl;
}
ReceiverInfo ri(r, r2);
ri.setRespondID(resp_comm_id, resp_invert);
ri.setLostPacketsID(lp_comm_id);
ri.setChannelNumID(numchannel_id);
ri.setChannelSwitchCountID(channelswitchcount_id);
recvlist.emplace_back( std::move(ri) );
}
// -------------------------------
// ********** HEARTBEAT *************
string heart = conf->getArgParam("--" + prefix + "-heartbeat-id", it.getProp("heartbeat_id"));
if( !heart.empty() )
{
sidHeartBeat = conf->getSensorID(heart);
if( sidHeartBeat == DefaultObjectId )
{
ostringstream err;
err << myname << ": не найден идентификатор для датчика 'HeartBeat' " << heart;
unetcrit << myname << "(init): " << err.str() << endl;
throw SystemError(err.str());
}
int heartbeatTime = conf->getArgPInt("--" + prefix + "-heartbeat-time", it.getProp("heartbeatTime"), conf->getHeartBeatTime());
if( heartbeatTime )
ptHeartBeat.setTiming(heartbeatTime);
else
ptHeartBeat.setTiming(UniSetTimer::WaitUpTime);
maxHeartBeat = conf->getArgPInt("--" + prefix + "-heartbeat-max", it.getProp("heartbeat_max"), 10);
test_id = sidHeartBeat;
}
else
{
test_id = conf->getSensorID("TestMode_S");
if( test_id == DefaultObjectId )
{
ostringstream err;
err << myname << "(init): test_id unknown. 'TestMode_S' not found...";
unetcrit << myname << "(init): " << err.str() << endl;
throw SystemError(err.str());
}
}
unetinfo << myname << "(init): test_id=" << test_id << endl;
activateTimeout = conf->getArgPInt("--" + prefix + "-activate-timeout", 20000);
if( ic )
ic->logAgregator()->add(loga);
vmonit(s_field);
vmonit(s_fvalue);
vmonit(maxHeartBeat);
=======
if( findArgParam("--" + prefix + "-run-logserver", conf->getArgc(), conf->getArgv()) != -1 )
{
logserv_host = conf->getArg2Param("--" + prefix + "-logserver-host", it.getProp("logserverHost"), "localhost");
......@@ -864,7 +467,6 @@ UNetExchange::UNetExchange(uniset::ObjectId objId, uniset::ObjectId shmId, const
vmonit(s_field);
vmonit(s_fvalue);
vmonit(maxHeartBeat);
>>>>>>> 2.9.1-alt1
}
// -----------------------------------------------------------------------------
UNetExchange::~UNetExchange()
......@@ -932,26 +534,6 @@ void UNetExchange::timerInfo( const TimerMessage* tm )
// -----------------------------------------------------------------------------
void UNetExchange::step() noexcept
{
<<<<<<< HEAD
if( !activated )
return;
if( sidHeartBeat != DefaultObjectId && ptHeartBeat.checkTime() )
{
try
{
shm->localSetValue(itHeartBeat, sidHeartBeat, maxHeartBeat, getId());
ptHeartBeat.reset();
}
catch( const std::exception& ex )
{
unetcrit << myname << "(step): (hb) " << ex.what() << std::endl;
}
}
for( auto&& it : recvlist )
it.step(shm, myname, unetlog);
=======
if( !activated )
return;
......@@ -970,7 +552,6 @@ void UNetExchange::step() noexcept
for( auto&& it : recvlist )
it.step(shm, myname, unetlog);
>>>>>>> 2.9.1-alt1
}
// -----------------------------------------------------------------------------
......@@ -1176,19 +757,11 @@ void UNetExchange::askSensors( UniversalIO::UIOCommand cmd )
// ------------------------------------------------------------------------------------------
void UNetExchange::sensorInfo( const uniset::SensorMessage* sm )
{
<<<<<<< HEAD
if( sender )
sender->updateSensor( sm->id, sm->value );
if( sender2 )
sender2->updateSensor( sm->id, sm->value );
=======
if( sender )
sender->updateSensor( sm->id, sm->value );
if( sender2 )
sender2->updateSensor( sm->id, sm->value );
>>>>>>> 2.9.1-alt1
}
// ------------------------------------------------------------------------------------------
bool UNetExchange::activateObject()
......@@ -1269,51 +842,12 @@ void UNetExchange::initIterators() noexcept
if( sender2 )
sender2->initIterators();
<<<<<<< HEAD
for( auto&& it : recvlist )
it.initIterators(shm);
=======
for( auto&& it : recvlist )
it.initIterators(shm);
>>>>>>> 2.9.1-alt1
}
// -----------------------------------------------------------------------------
void UNetExchange::help_print( int argc, const char* argv[] ) noexcept
{
<<<<<<< HEAD
cout << "Default prefix='unet'" << endl;
cout << "--prefix-name NameID - Идентификтора процесса." << endl;
cout << "--prefix-recv-timeout msec - Время для фиксации события 'отсутсвие связи'" << endl;
cout << "--prefix-prepare-time msec - Время необходимое на подготовку (восстановление связи) при переключении на другой канал" << endl;
cout << "--prefix-lost-timeout msec - Время ожидания заполнения 'дырки' между пакетами. По умолчанию 5000 мсек." << endl;
cout << "--prefix-sendpause msec - Пауза между посылками. По умолчанию 100" << endl;
cout << "--prefix-updatepause msec - Пауза между обновлением информации в SM (Корелирует с sendpause). По умолчанию 100" << endl;
cout << "--prefix-steptime msec - Пауза между обновлением информации о связи с узлами." << endl;
cout << "--prefix-checkconnection-pause msec - Пауза между попытками открыть соединение (если это не удалось до этого). По умолчанию: 10000 (10 сек)" << endl;
cout << "--prefix-maxdifferense num - Маскимальная разница в номерах пакетов для фиксации события 'потеря пакетов' " << endl;
cout << "--prefix-nosender [0,1] - Отключить посылку." << endl;
cout << "--prefix-recv-buffer-size sz - Размер циклического буфера для приёма сообщений. По умолчанию: 100" << endl;
cout << "--prefix-sm-ready-timeout msec - Время ожидание я готовности SM к работе. По умолчанию 120000" << endl;
cout << "--prefix-filter-field name - Название фильтрующего поля при формировании списка датчиков посылаемых данным узлом" << endl;
cout << "--prefix-filter-value name - Значение фильтрующего поля при формировании списка датчиков посылаемых данным узлом" << endl;
cout << endl;
cout << "--prefix-nodes-confnode name - <Узел> откуда считывается список узлов. Default: <nodes>" << endl;
cout << "--prefix-nodes-filter-field name - Фильтрующее поле для списка узлов" << endl;
cout << "--prefix-nodes-filter-value name - Значение фильтрующего поля для списка узлов" << endl;
cout << endl;
cout << " Logs: " << endl;
cout << "--prefix-log-... - log control" << endl;
cout << " add-levels ..." << endl;
cout << " del-levels ..." << endl;
cout << " set-levels ..." << endl;
cout << " logfile filaname" << endl;
cout << " no-debug " << endl;
cout << " LogServer: " << endl;
cout << "--prefix-run-logserver - run logserver. Default: localhost:id" << endl;
cout << "--prefix-logserver-host ip - listen ip. Default: localhost" << endl;
cout << "--prefix-logserver-port num - listen port. Default: ID" << endl;
cout << LogServer::help_print("prefix-logserver") << endl;
=======
cout << "Default prefix='unet'" << endl;
cout << "--prefix-name NameID - Идентификтора процесса." << endl;
cout << "--prefix-recv-timeout msec - Время для фиксации события 'отсутсвие связи'" << endl;
......@@ -1352,7 +886,6 @@ void UNetExchange::help_print( int argc, const char* argv[] ) noexcept
cout << "--prefix-logserver-host ip - listen ip. Default: localhost" << endl;
cout << "--prefix-logserver-port num - listen port. Default: ID" << endl;
cout << LogServer::help_print("prefix-logserver") << endl;
>>>>>>> 2.9.1-alt1
}
// -----------------------------------------------------------------------------
std::shared_ptr<UNetExchange> UNetExchange::init_unetexchange(int argc, const char* const argv[], uniset::ObjectId icID,
......@@ -1384,100 +917,6 @@ std::shared_ptr<UNetExchange> UNetExchange::init_unetexchange(int argc, const ch
// -----------------------------------------------------------------------------
void UNetExchange::receiverEvent( const shared_ptr<UNetReceiver>& r, UNetReceiver::Event ev ) noexcept
{
<<<<<<< HEAD
for( auto&& it : recvlist )
{
if( it.r1 == r )
{
if( ev == UNetReceiver::evTimeout )
{
// если нет второго канала или нет связи
// то и переключать не надо
if( !it.r2 || !it.r2->isRecvOK() )
return;
// пропала связь по первому каналу...
// переключаемся на второй
it.r1->setLockUpdate(true);
it.r2->setLockUpdate(false);
it.channelSwitchCount++;
dlog8 << myname << "(event): " << r->getName()
<< ": timeout for channel1.. select channel 2" << endl;
}
else if( ev == UNetReceiver::evOK )
{
// если связь восстановилась..
// проверяем, а что там со вторым каналом
// если у него связи нет, то забираем себе..
if( !it.r2 || !it.r2->isRecvOK() )
{
it.r1->setLockUpdate(false);
if( it.r2 )
it.r2->setLockUpdate(true);
// если какой-то канал уже работал
// то увеличиваем счётчик переключений
// а если ещё не работал, значит это просто первое включение канала
// а не переключение
if( it.channelSwitchCount > 0 )
it.channelSwitchCount++;
dlog8 << myname << "(event): " << r->getName()
<< ": link failed for channel2.. select again channel1.." << endl;
}
}
return;
}
if( it.r2 == r )
{
if( ev == UNetReceiver::evTimeout )
{
// если первого канала нет или нет связи
// то и переключать не надо
if( !it.r1 || !it.r1->isRecvOK() )
return;
// пропала связь по второму каналу...
// переключаемся на первый
it.r1->setLockUpdate(false);
it.r2->setLockUpdate(true);
it.channelSwitchCount++;
dlog8 << myname << "(event): " << r->getName()
<< ": timeout for channel2.. select channel 1" << endl;
}
else if( ev == UNetReceiver::evOK )
{
// если связь восстановилась..
// проверяем, а что там со первым каналом
// если у него связи нет, то забираем себе..
if( !it.r1 || !it.r1->isRecvOK() )
{
if( it.r1 )
it.r1->setLockUpdate(true);
it.r2->setLockUpdate(false);
// если какой-то канал уже работал
// то увеличиваем счётчик переключений
// а если ещё не работал, значит это просто первое включение канала
// а не переключение
if( it.channelSwitchCount > 0 )
it.channelSwitchCount++;
dlog8 << myname << "(event): " << r->getName()
<< ": link failed for channel1.. select again channel2.." << endl;
}
}
return;
}
}
=======
for( auto&& it : recvlist )
{
if( it.r1 == r )
......@@ -1570,7 +1009,6 @@ void UNetExchange::receiverEvent( const shared_ptr<UNetReceiver>& r, UNetReceive
return;
}
}
>>>>>>> 2.9.1-alt1
}
// -----------------------------------------------------------------------------
uniset::SimpleInfo* UNetExchange::getInfo( const char* userparam )
......
......@@ -41,266 +41,6 @@
// --------------------------------------------------------------------------
namespace uniset
{
<<<<<<< HEAD
// -----------------------------------------------------------------------------
/*!
\page pageUNetExchangeUDP Сетевой обмен на основе UDP (UNetUDP)
- \ref pgUNetUDP_Common
- \ref pgUNetUDP_Conf
- \ref pgUNetUDP_Reserv
- \ref pgUNetUDP_SendFactor
- \ref pgUNetUDP_Stat
\section pgUNetUDP_Common Общее описание
Обмен построен на основе протокола UDP.
Основная идея заключается в том, что каждый узел на порту равном своему ID
посылает в сеть UDP-пакеты содержащие данные считанные из локальной SM. Формат данных - это набор
пар [id,value]. Другие узлы принимают их. Помимо этого данный процесс запускает
"получателей" по одному на каждый (другой) узел и ловит пакеты от них, сохраняя данные в SM.
При этом "получатели" работают на одном(!) потоке с использованием событий libev (см. UNetReceiver).
\par
При своём старте процесс считывает из секции \<nodes> список узлов которые необходимо "слушать",
а также параметры своего узла. Открывает по потоку приёма на каждый узел и поток
передачи для своих данных. Помимо этого такие же потоки для резервных каналов, если они включены
(см. \ref pgUNetUDP_Reserv ).
\section pgUNetUDP_Conf Пример конфигурирования
По умолчанию при считывании используется \b unet_broadcast_ip (указанный в секции \<nodes>)
и \b id узла - в качестве порта.
Но можно переопределять эти параметры, при помощи указания \b unet_port и/или \b unet_broadcast_ip,
для конкретного узла (\<item>).
\code
<nodes port="2809" unet_broadcast_ip="192.168.56.255">
<item ip="127.0.0.1" name="LocalhostNode" textname="Локальный узел" unet_ignore="1" unet_port="3000" unet_broadcast_ip="192.168.57.255">
<iocards>
...
</iocards>
</item>
<item ip="192.168.56.10" name="Node1" textname="Node1" unet_port="3001"/>
<item ip="192.168.56.11" name="Node2" textname="Node2" unet_port="3002"/>
</nodes>
\endcode
Буфер для приёма сообщений можно настроить параметром \b recvBufferSize="1000" в конфигурационной секции
или аргументом командной строки \b --prefix-recv-buffer-size sz
\note Имеется возможность задавать отдельную настроечную секцию для "списка узлов" при помощи параметра
--prefix-nodes-confnode name. По умолчанию настройка ведётся по секции <nodes>
\section pgUNetUDP_Reserv Настройка резервного канала связи
В текущей реализации поддерживается возможность обмена по двум подсетям (Ethernet-каналам).
Она основана на том, что, для каждого узла помимо основного "читателя",
создаётся дополнительный "читатель"(поток) слушающий другой ip-адрес и порт.
А так же, для локального узла создаётся дополнительный "писатель"(поток),
который посылает данные в (указанную) вторую подсеть. Для того, чтобы задействовать
второй канал, достаточно объявить в настройках переменные
\b unet_broadcast_ip2. А также в случае необходимости для конкретного узла
можно указать \b unet_broadcast_ip2 и \b unet_port2.
Переключение между "каналами" происходит по следующей логике:
При старте включается только первый канал. Второй канал работает в режиме "пассивного" чтения.
Т.е. все пакеты принимаются, но данные в SharedMemory не сохраняются.
Если во время работы пропадает связь по первому каналу, идёт переключение на второй канал.
Первый канал переводиться в "пассивный" режим, а второй канал, переводится в "нормальный"(активный)
режим. Далее работа ведётся по второму каналу, независимо от того, что связь на первом
канале может восстановиться. Это сделано для защиты от постоянных перескакиваний
с канала на канал. Работа на втором канале будет вестись, пока не пропадёт связь
на нём. Тогда будет попытка переключиться обратно на первый канал и так "по кругу".
В свою очередь "писатели"(если они не отключены) всегда посылают данные в оба канала.
\section pgUNetUDP_SendFactor Регулирование частоты посылки
В текущей реализации поддерживается механизм, позволяющий регулировать частоту посылки данных
для каждого датчика. Суть механизма заключается в том, что для каждого датчика можно задать свойство
- \b prefix_sendfactor="N" Где N>1 - задаёт "делитель" относительно \b sendpause определяющий с какой частотой
информация о данном датчике будет посылаться. Например N=2 - каждый второй цикл, N=3 - каждый третий и т.п.
При загрузке все датчики (относящиеся к данному процессу) разбиваются на группы пакетов согласно своей частоте посылки.
При этом внутри одной группы датчики разбиваются по пакетам согласно заданному максимальному размеру пакета
(см. конструктор класса UNetSender()).
\section pgUNetUDP_PackSendPause Пауза между посылкой пакетов
Параметр \b --prefix-packsendpause или \b packsendpause в настаройках позволяет задать паузу (в миллисекундах)
между посылками пакетов. Если итоговых пакетов с данными больше чем 1.
При этом параметр \b --prefix-packsendpause-factor или \b packsendpauseFactor позволяет указать,
что необходимо делать паузы не между каждым пакетом, а через каждый N пакет.
По умолчанию \b packsendpause=5 миллисекунд.
\section pgUNetUDP_Stat Статистика работы канала
Для возможности мониторинга работы имеются счётчики, которые можно привязать к датчикам,
задав их для соответствующего узла в секции '<nodes>' конфигурационного файла.
- unet_lostpackets_id="" - общее количество потерянных пакетов с данным узлом (суммарно по обоим каналам)
- unet_lostpackets1_id="" - количество потерянных пакетов с данным узлом по первому каналу
- unet_lostpackets2_id="" - количество потерянных пакетов с данным узлом по второму каналу
- unet_respond_id="" - наличие связи хотя бы по одному каналу
- unet_respond1_id - наличие связи по первому каналу
- unet_respond2_id - наличие связи по второму каналу
- unet_numchannel_id="" - номер текущего активного канала
- unet_channelswitchcount_id="" - количество переключений с канала на канал
*/
// -----------------------------------------------------------------------------
class UNetExchange:
public UniSetObject
{
public:
UNetExchange( uniset::ObjectId objId, uniset::ObjectId shmID, const std::shared_ptr<SharedMemory>& ic = nullptr, const std::string& prefix = "unet" );
virtual ~UNetExchange();
/*! глобальная функция для инициализации объекта */
static std::shared_ptr<UNetExchange> init_unetexchange( int argc, const char* const argv[],
uniset::ObjectId shmID, const std::shared_ptr<SharedMemory>& ic = 0, const std::string& prefix = "unet" );
/*! глобальная функция для вывода help-а */
static void help_print( int argc, const char* argv[] ) noexcept;
bool checkExistUNetHost( const std::string& host, int port ) noexcept;
inline std::shared_ptr<LogAgregator> getLogAggregator() noexcept
{
return loga;
}
inline std::shared_ptr<DebugStream> log() noexcept
{
return unetlog;
}
virtual uniset::SimpleInfo* getInfo( const char* userparam = 0 ) override;
protected:
xmlNode* cnode;
std::string s_field;
std::string s_fvalue;
std::shared_ptr<SMInterface> shm;
void step() noexcept;
void sysCommand( const uniset::SystemMessage* msg ) override;
void sensorInfo( const uniset::SensorMessage* sm ) override;
void timerInfo( const uniset::TimerMessage* tm ) override;
void askSensors( UniversalIO::UIOCommand cmd );
bool waitSMReady();
void receiverEvent( const std::shared_ptr<UNetReceiver>& r, UNetReceiver::Event ev ) noexcept;
virtual bool activateObject() override;
virtual bool deactivateObject() override;
// действия при завершении работы
void termSenders();
void termReceivers();
void initIterators() noexcept;
void startReceivers();
enum Timer
{
tmStep
};
private:
UNetExchange();
timeout_t initPause = { 0 };
uniset::uniset_rwmutex mutex_start;
PassiveTimer ptHeartBeat;
uniset::ObjectId sidHeartBeat = { uniset::DefaultObjectId };
timeout_t maxHeartBeat = { 10 };
IOController::IOStateList::iterator itHeartBeat;
uniset::ObjectId test_id = { uniset::DefaultObjectId };
timeout_t steptime = { 1000 }; /*!< периодичность вызова step, [мсек] */
std::atomic_bool activated = { false };
std::atomic_bool cancelled = { false };
timeout_t activateTimeout = { 20000 }; // msec
struct ReceiverInfo
{
ReceiverInfo() noexcept: r1(nullptr), r2(nullptr),
sidRespond(uniset::DefaultObjectId),
respondInvert(false),
sidLostPackets(uniset::DefaultObjectId),
sidChannelNum(uniset::DefaultObjectId)
{}
ReceiverInfo( const std::shared_ptr<UNetReceiver>& _r1, const std::shared_ptr<UNetReceiver>& _r2 ) noexcept:
r1(_r1), r2(_r2),
sidRespond(uniset::DefaultObjectId),
respondInvert(false),
sidLostPackets(uniset::DefaultObjectId),
sidChannelNum(uniset::DefaultObjectId)
{}
std::shared_ptr<UNetReceiver> r1; /*!< приём по первому каналу */
std::shared_ptr<UNetReceiver> r2; /*!< приём по второму каналу */
void step(const std::shared_ptr<SMInterface>& shm, const std::string& myname, std::shared_ptr<DebugStream>& log ) noexcept;
inline void setRespondID( uniset::ObjectId id, bool invert = false ) noexcept
{
sidRespond = id;
respondInvert = invert;
}
inline void setLostPacketsID( uniset::ObjectId id ) noexcept
{
sidLostPackets = id;
}
inline void setChannelNumID( uniset::ObjectId id ) noexcept
{
sidChannelNum = id;
}
inline void setChannelSwitchCountID( uniset::ObjectId id ) noexcept
{
sidChannelSwitchCount = id;
}
inline void initIterators( const std::shared_ptr<SMInterface>& shm ) noexcept
{
shm->initIterator(itLostPackets);
shm->initIterator(itRespond);
shm->initIterator(itChannelNum);
shm->initIterator(itChannelSwitchCount);
}
// Сводная информация по двум каналам
// сумма потерянных пакетов и наличие связи
// хотя бы по одному каналу, номер рабочего канала
// количество переключений с канала на канал
// ( реализацию см. ReceiverInfo::step() )
uniset::ObjectId sidRespond;
IOController::IOStateList::iterator itRespond;
bool respondInvert = { false };
uniset::ObjectId sidLostPackets;
IOController::IOStateList::iterator itLostPackets;
uniset::ObjectId sidChannelNum;
IOController::IOStateList::iterator itChannelNum;
long channelSwitchCount = { 0 }; /*!< счётчик переключений с канала на канал */
uniset::ObjectId sidChannelSwitchCount = { uniset::DefaultObjectId };
IOController::IOStateList::iterator itChannelSwitchCount;
};
typedef std::deque<ReceiverInfo> ReceiverList;
ReceiverList recvlist;
bool no_sender = { false }; /*!< флаг отключения посылки сообщений (создания потока для посылки)*/
std::shared_ptr<UNetSender> sender;
std::shared_ptr<UNetSender> sender2;
std::shared_ptr<LogAgregator> loga;
std::shared_ptr<DebugStream> unetlog;
std::shared_ptr<LogServer> logserv;
std::string logserv_host = {""};
int logserv_port = {0};
VMonitor vmon;
};
// --------------------------------------------------------------------------
=======
// -----------------------------------------------------------------------------
/*!
\page pageUNetExchangeUDP Сетевой обмен на основе UDP (UNetUDP)
......@@ -563,7 +303,6 @@ namespace uniset
VMonitor vmon;
};
// --------------------------------------------------------------------------
>>>>>>> 2.9.1-alt1
} // end of namespace uniset
// -----------------------------------------------------------------------------
#endif // UNetExchange_H_
......
......@@ -15,7 +15,6 @@
*/
// -------------------------------------------------------------------------
#include <sstream>
#include <cmath>
#include <iomanip>
#include <Poco/Net/NetException.h>
#include "unisetstd.h"
......@@ -30,16 +29,18 @@ using namespace uniset::extensions;
// -----------------------------------------------------------------------------
CommonEventLoop UNetReceiver::loop;
// -----------------------------------------------------------------------------
/*
bool UNetReceiver::PacketCompare::operator()(const UniSetUDP::UDPMessage& lhs,
const UniSetUDP::UDPMessage& rhs) const
{
// if( lhs.num == rhs.num )
// return (lhs < rhs);
return lhs.num > rhs.num;
}
*/
// ------------------------------------------------------------------------------------------
UNetReceiver::UNetReceiver(const std::string& s_host, int _port
<<<<<<< HEAD
, const std::shared_ptr<SMInterface>& smi
, bool nocheckConnection
, const std::string& prefix ):
shm(smi),
port(_port),
saddr(s_host, _port),
cbuf(cbufSize)
=======
, const std::shared_ptr<SMInterface>& smi
, bool nocheckConnection
, const std::string& prefix ):
......@@ -64,7 +65,6 @@ UNetReceiver::UNetReceiver(const std::string& s_host, int _port
lockUpdate(false),
d_cache_init_ok(false),
a_cache_init_ok(false)
>>>>>>> 2.9.1-alt1
{
{
ostringstream s;
......@@ -83,17 +83,6 @@ UNetReceiver::UNetReceiver(const std::string& s_host, int _port
auto conf = uniset_conf();
conf->initLogStream(unetlog, prefix + "-log");
<<<<<<< HEAD
if( !createConnection(nocheckConnection /* <-- это флаг throwEx */) )
evCheckConnection.set<UNetReceiver, &UNetReceiver::checkConnectionEvent>(this);
evStatistic.set<UNetReceiver, &UNetReceiver::statisticsEvent>(this);
evUpdate.set<UNetReceiver, &UNetReceiver::updateEvent>(this);
evInitPause.set<UNetReceiver, &UNetReceiver::initEvent>(this);
ptLostTimeout.setTiming(lostTimeout);
ptRecvTimeout.setTiming(recvTimeout);
=======
upThread = unisetstd::make_unique< ThreadCreator<UNetReceiver> >(this, &UNetReceiver::updateThread);
if( !createConnection(nocheckConnection /* <-- это флаг throwEx */) )
......@@ -102,22 +91,12 @@ UNetReceiver::UNetReceiver(const std::string& s_host, int _port
evStatistic.set<UNetReceiver, &UNetReceiver::statisticsEvent>(this);
evUpdate.set<UNetReceiver, &UNetReceiver::updateEvent>(this);
evInitPause.set<UNetReceiver, &UNetReceiver::initEvent>(this);
>>>>>>> 2.9.1-alt1
}
// -----------------------------------------------------------------------------
UNetReceiver::~UNetReceiver()
{
}
// -----------------------------------------------------------------------------
void UNetReceiver::setBufferSize( size_t sz ) noexcept
{
if( sz > 0 )
{
cbufSize = sz;
cbuf.resize(sz);
}
}
// -----------------------------------------------------------------------------
void UNetReceiver::setReceiveTimeout( timeout_t msec ) noexcept
{
std::lock_guard<std::mutex> l(tmMutex);
......@@ -145,24 +124,15 @@ void UNetReceiver::setLostTimeout( timeout_t msec ) noexcept
ptLostTimeout.setTiming(msec);
}
// -----------------------------------------------------------------------------
<<<<<<< HEAD
=======
void UNetReceiver::setReceivePause( timeout_t msec ) noexcept
{
recvpause = msec;
}
// -----------------------------------------------------------------------------
>>>>>>> 2.9.1-alt1
void UNetReceiver::setUpdatePause( timeout_t msec ) noexcept
{
updatepause = msec;
<<<<<<< HEAD
if( evUpdate.is_active() )
evUpdate.start(0, (float)updatepause / 1000.);
}
// -----------------------------------------------------------------------------
=======
if( upStrategy == useUpdateEventLoop && evUpdate.is_active() )
evUpdate.start(0, (float)updatepause / 1000.);
}
......@@ -172,7 +142,6 @@ void UNetReceiver::setMaxProcessingCount( int set ) noexcept
maxProcessingCount = set;
}
// -----------------------------------------------------------------------------
>>>>>>> 2.9.1-alt1
void UNetReceiver::setMaxDifferens( unsigned long set ) noexcept
{
maxDifferens = set;
......@@ -203,12 +172,8 @@ void UNetReceiver::setLostPacketsID( uniset::ObjectId id ) noexcept
// -----------------------------------------------------------------------------
void UNetReceiver::setLockUpdate( bool st ) noexcept
{
<<<<<<< HEAD
lockUpdate = st;
=======
lockUpdate = st;
>>>>>>> 2.9.1-alt1
if( !st )
ptPrepare.reset();
......@@ -243,49 +208,6 @@ size_t UNetReceiver::getLostPacketsNum() const noexcept
// -----------------------------------------------------------------------------
bool UNetReceiver::createConnection( bool throwEx )
{
<<<<<<< HEAD
if( !activated )
return false;
try
{
udp = unisetstd::make_unique<UDPReceiveU>(addr, port);
udp->setBlocking(false); // делаем неблокирующее чтение (нужно для libev)
evReceive.set<UNetReceiver, &UNetReceiver::callback>(this);
evUpdate.set<UNetReceiver, &UNetReceiver::updateEvent>(this);
if( evCheckConnection.is_active() )
evCheckConnection.stop();
ptRecvTimeout.setTiming(recvTimeout);
ptPrepare.setTiming(prepareTime);
evprepare(loop.evloop());
}
catch( const std::exception& e )
{
ostringstream s;
s << myname << "(createConnection): " << e.what();
unetcrit << s.str() << std::endl;
if( throwEx )
throw SystemError(s.str());
udp = nullptr;
}
catch( ... )
{
ostringstream s;
s << myname << "(createConnection): catch...";
unetcrit << s.str() << std::endl;
if( throwEx )
throw SystemError(s.str());
udp = nullptr;
}
return ( udp != nullptr );
=======
if( !activated )
return false;
......@@ -331,28 +253,10 @@ bool UNetReceiver::createConnection( bool throwEx )
}
return ( udp != nullptr );
>>>>>>> 2.9.1-alt1
}
// -----------------------------------------------------------------------------
void UNetReceiver::start()
{
<<<<<<< HEAD
unetinfo << myname << ":... start... " << endl;
if( !activated )
{
activated = true;
if( !loop.async_evrun(this, evrunTimeout) )
{
unetcrit << myname << "(start): evrun FAILED! (timeout=" << evrunTimeout << " msec)" << endl;
std::terminate();
return;
}
}
else
forceUpdate();
=======
unetinfo << myname << ":... start... " << endl;
if( !activated )
......@@ -371,32 +275,10 @@ void UNetReceiver::start()
}
else
forceUpdate();
>>>>>>> 2.9.1-alt1
}
// -----------------------------------------------------------------------------
void UNetReceiver::evprepare( const ev::loop_ref& eloop ) noexcept
{
<<<<<<< HEAD
evStatistic.set(eloop);
evStatistic.start(0, 1.0); // раз в сек
evInitPause.set(eloop);
evUpdate.set(eloop);
evUpdate.start( 0, ((float)updatepause / 1000.) );
if( !udp )
{
evCheckConnection.set(eloop);
evCheckConnection.start(0, checkConnectionTime);
evInitPause.stop();
}
else
{
evReceive.set(eloop);
evReceive.start(udp->getSocket(), ev::READ);
evInitPause.start(0);
}
=======
evStatistic.set(eloop);
evStatistic.start(0, 1.0); // раз в сек
......@@ -420,7 +302,6 @@ void UNetReceiver::evprepare( const ev::loop_ref& eloop ) noexcept
evReceive.start(udp->getSocket(), ev::READ);
evInitPause.start(0);
}
>>>>>>> 2.9.1-alt1
}
// -----------------------------------------------------------------------------
void UNetReceiver::evfinish( const ev::loop_ref& eloop ) noexcept
......@@ -449,15 +330,9 @@ void UNetReceiver::evfinish( const ev::loop_ref& eloop ) noexcept
// -----------------------------------------------------------------------------
void UNetReceiver::forceUpdate() noexcept
{
<<<<<<< HEAD
// сбрасываем запомненый номер последнего обработанного пакета
// и тем самым заставляем обработать заново последний пакет и обновить данные в SM (см. update)
rnum = wnum - 1;
=======
pack_guard l(packMutex, upStrategy);
pnum = 0; // сбрасываем запомненый номер последнего обработанного пакета
// и тем самым заставляем обновить данные в SM (см. update)
>>>>>>> 2.9.1-alt1
}
// -----------------------------------------------------------------------------
void UNetReceiver::statisticsEvent(ev::periodic& tm, int revents) noexcept
......@@ -493,147 +368,8 @@ void UNetReceiver::initEvent( ev::timer& tmr, int revents ) noexcept
tmr.stop();
}
// -----------------------------------------------------------------------------
size_t UNetReceiver::rnext( size_t num )
{
UniSetUDP::UDPMessage* p;
size_t i = num + 1;
while( i < wnum )
{
p = &cbuf[i % cbufSize];
if( p->header.num > num )
return i;
i++;
}
return wnum;
}
// -----------------------------------------------------------------------------
void UNetReceiver::update() noexcept
{
<<<<<<< HEAD
// ещё не было пакетов
if( wnum == 1 && rnum == 0 )
return;
UniSetUDP::UDPMessage* p;
CacheItem* c_it = nullptr;
UniSetUDP::UDPAData* dat = nullptr;
long s_id;
// обрабатываем, пока очередь либо не опустеет,
// либо обнаружится "дырка" в последовательности,
while( rnum < wnum )
{
p = &(cbuf[rnum % cbufSize]);
// если номер пакета не равен ожидаемому, ждём считая что это "дырка"
// т.к. разрывы и другие случаи обрабатываются при приёме пакетов
if( p->header.num != rnum )
{
if( !ptLostTimeout.checkTime() )
return;
size_t sub = 1;
if( p->header.num > rnum )
sub = (p->header.num - rnum);
unetwarn << myname << "(update): lostTimeout(" << ptLostTimeout.getInterval() << ")! pnum=" << p->header.num << " lost " << sub << " packets " << endl;
lostPackets += sub;
// ищем следующий пакет для обработки
rnum = rnext(rnum);
continue;
}
ptLostTimeout.reset();
rnum++;
upCount++;
// обновление данных в SM (блокировано)
if( lockUpdate )
continue;
// Обработка дискретных
auto d_iv = getDCache(p);
for( size_t i = 0; i < p->header.dcount; i++ )
{
try
{
s_id = p->dID(i);
c_it = &(*d_iv)[i];
if( c_it->id != s_id )
{
unetwarn << myname << "(update): reinit dcache for sid=" << s_id << endl;
c_it->id = s_id;
shm->initIterator(c_it->ioit);
}
shm->localSetValue(c_it->ioit, s_id, p->dValue(i), shm->ID());
}
catch( const uniset::Exception& ex)
{
unetcrit << myname << "(update): D:"
<< " id=" << s_id
<< " val=" << p->dValue(i)
<< " error: " << ex
<< std::endl;
}
catch(...)
{
unetcrit << myname << "(update): D:"
<< " id=" << s_id
<< " val=" << p->dValue(i)
<< " error: catch..."
<< std::endl;
}
}
// Обработка аналоговых
auto a_iv = getACache(p);
for( size_t i = 0; i < p->header.acount; i++ )
{
try
{
dat = &p->a_dat[i];
c_it = &(*a_iv)[i];
if( c_it->id != dat->id )
{
unetwarn << myname << "(update): reinit acache for sid=" << dat->id << endl;
c_it->id = dat->id;
shm->initIterator(c_it->ioit);
}
shm->localSetValue(c_it->ioit, dat->id, dat->val, shm->ID());
}
catch( const uniset::Exception& ex)
{
unetcrit << myname << "(update): A:"
<< " id=" << dat->id
<< " val=" << dat->val
<< " error: " << ex
<< std::endl;
}
catch(...)
{
unetcrit << myname << "(update): A:"
<< " id=" << dat->id
<< " val=" << dat->val
<< " error: catch..."
<< std::endl;
}
}
}
}
// -----------------------------------------------------------------------------
=======
UniSetUDP::UDPMessage p;
// обрабатываем, пока очередь либо не опустеет,
// либо обнаружится "дырка" в последовательности,
......@@ -837,7 +573,6 @@ void UNetReceiver::updateThread() noexcept
}
}
// -----------------------------------------------------------------------------
>>>>>>> 2.9.1-alt1
void UNetReceiver::callback( ev::io& watcher, int revents ) noexcept
{
if( EV_ERROR & revents )
......@@ -976,105 +711,14 @@ void UNetReceiver::checkConnectionEvent( ev::periodic& tm, int revents ) noexcep
// -----------------------------------------------------------------------------
void UNetReceiver::stop()
{
<<<<<<< HEAD
unetinfo << myname << ": stop.." << endl;
activated = false;
loop.evstop(this);
=======
unetinfo << myname << ": stop.." << endl;
activated = false;
upThread->join();
loop.evstop(this);
>>>>>>> 2.9.1-alt1
}
// -----------------------------------------------------------------------------
bool UNetReceiver::receive() noexcept
{
<<<<<<< HEAD
try
{
// сперва пробуем сохранить пакет в том месте, где должен быть очередной пакет
pack = &(cbuf[wnum % cbufSize]);
ssize_t ret = udp->receiveBytes(pack, sizeof(UniSetUDP::UDPMessage));
if( ret < 0 )
{
unetcrit << myname << "(receive): recv err(" << errno << "): " << strerror(errno) << endl;
return false;
}
if( ret == 0 )
{
unetwarn << myname << "(receive): disconnected?!... recv 0 bytes.." << endl;
return false;
}
recvCount++;
// конвертируем byte order
pack->ntoh();
if( !pack->isOk() )
return false;
if( size_t(abs(long(pack->header.num - wnum))) > maxDifferens || size_t(abs( long(wnum - rnum) )) >= (cbufSize - 2) )
{
unetcrit << myname << "(receive): DISAGREE "
<< " packnum=" << pack->header.num
<< " wnum=" << wnum
<< " rnum=" << rnum
<< " (maxDiff=" << maxDifferens
<< " indexDiff=" << abs( long(wnum - rnum) )
<< ")"
<< endl;
lostPackets = pack->header.num > wnum ? (pack->header.num - wnum - 1) : lostPackets + 1;
// реинициализируем позицию для чтения
rnum = pack->header.num;
wnum = pack->header.num + 1;
// перемещаем пакет в нужное место (если требуется)
if( wnum != pack->header.num )
{
cbuf[pack->header.num % cbufSize] = (*pack);
pack->header.num = 0;
}
return true;
}
if( pack->header.num != wnum )
{
// перемещаем пакет в правильное место
// в соответствии с его номером
cbuf[pack->header.num % cbufSize] = (*pack);
if( pack->header.num >= wnum )
wnum = pack->header.num + 1;
// обнуляем номер в том месте где записали, чтобы его не обрабатывал update
pack->header.num = 0;
}
else if( pack->header.num >= wnum )
wnum = pack->header.num + 1;
// начальная инициализация для чтения
if( rnum == 0 )
rnum = pack->header.num;
return true;
}
catch( Poco::Net::NetException& ex )
{
unetcrit << myname << "(receive): recv err: " << ex.displayText() << endl;
}
catch( exception& ex )
{
unetcrit << myname << "(receive): recv err: " << ex.what() << endl;
}
return false;
=======
try
{
ssize_t ret = udp->receiveBytes(r_buf.data, sizeof(r_buf.data));
......@@ -1185,28 +829,10 @@ bool UNetReceiver::receive() noexcept
} // unlock qpack
return true;
>>>>>>> 2.9.1-alt1
}
// -----------------------------------------------------------------------------
void UNetReceiver::initIterators() noexcept
{
<<<<<<< HEAD
for( auto mit = d_icache_map.begin(); mit != d_icache_map.end(); ++mit )
{
CacheVec& d_icache = mit->second;
for( auto&& it : d_icache )
shm->initIterator(it.ioit);
}
for( auto mit = a_icache_map.begin(); mit != a_icache_map.end(); ++mit )
{
CacheVec& a_icache = mit->second;
for( auto&& it : a_icache )
shm->initIterator(it.ioit);
}
=======
for( auto mit = d_icache_map.begin(); mit != d_icache_map.end(); ++mit )
{
CacheVec& d_icache(mit->second.cache);
......@@ -1222,42 +848,10 @@ void UNetReceiver::initIterators() noexcept
for( auto&& it : a_icache )
shm->initIterator(it.ioit);
}
>>>>>>> 2.9.1-alt1
}
// -----------------------------------------------------------------------------
UNetReceiver::CacheVec* UNetReceiver::getDCache( UniSetUDP::UDPMessage* pack ) noexcept
void UNetReceiver::initDCache( UniSetUDP::UDPMessage& pack, bool force ) noexcept
{
<<<<<<< HEAD
auto dit = d_icache_map.find(pack->getDataID());
if( dit == d_icache_map.end() )
{
auto p = d_icache_map.emplace(pack->getDataID(), UNetReceiver::CacheVec());
dit = p.first;
}
CacheVec* d_info = &dit->second;
if( pack->header.dcount == d_info->size() )
return d_info;
unetinfo << myname << ": init dcache[" << pack->header.dcount << "] for " << pack->getDataID() << endl;
d_info->resize(pack->header.dcount);
for( size_t i = 0; i < pack->header.dcount; i++ )
{
CacheItem& d = (*d_info)[i];
if( d.id != pack->d_id[i] )
{
d.id = pack->d_id[i];
shm->initIterator(d.ioit);
}
}
return d_info;
=======
CacheInfo& d_info(d_icache_map[pack.getDataID()]);
if( !force && pack.dcount == d_info.cache.size() )
......@@ -1298,42 +892,10 @@ UNetReceiver::CacheVec* UNetReceiver::getDCache( UniSetUDP::UDPMessage* pack ) n
shm->initIterator(d.ioit);
}
}
>>>>>>> 2.9.1-alt1
}
// -----------------------------------------------------------------------------
UNetReceiver::CacheVec* UNetReceiver::getACache( UniSetUDP::UDPMessage* pack ) noexcept
void UNetReceiver::initACache( UniSetUDP::UDPMessage& pack, bool force ) noexcept
{
<<<<<<< HEAD
auto ait = a_icache_map.find(pack->getDataID());
if( ait == a_icache_map.end() )
{
auto p = a_icache_map.emplace(pack->getDataID(), UNetReceiver::CacheVec());
ait = p.first;
}
CacheVec* a_info = &ait->second;
if( pack->header.acount == a_info->size() )
return a_info;
unetinfo << myname << ": init acache[" << pack->header.acount << "] for " << pack->getDataID() << endl;
a_info->resize(pack->header.acount);
for( size_t i = 0; i < pack->header.acount; i++ )
{
CacheItem& d = (*a_info)[i];
if( d.id != pack->a_dat[i].id )
{
d.id = pack->a_dat[i].id;
shm->initIterator(d.ioit);
}
}
return a_info;
=======
CacheInfo& a_info(a_icache_map[pack.getDataID()]);
if( !force && pack.acount == a_info.cache.size() )
......@@ -1374,7 +936,6 @@ UNetReceiver::CacheVec* UNetReceiver::getACache( UniSetUDP::UDPMessage* pack ) n
shm->initIterator(d.ioit);
}
}
>>>>>>> 2.9.1-alt1
}
// -----------------------------------------------------------------------------
void UNetReceiver::connectEvent( UNetReceiver::EventSlot sl ) noexcept
......@@ -1382,35 +943,6 @@ void UNetReceiver::connectEvent( UNetReceiver::EventSlot sl ) noexcept
slEvent = sl;
}
// -----------------------------------------------------------------------------
<<<<<<< HEAD
const std::string UNetReceiver::getShortInfo() const noexcept
{
// warning: будет вызываться из другого потока
// (считаем что чтение безопасно)
ostringstream s;
s << setw(15) << std::right << getAddress() << ":" << std::left << setw(6) << getPort()
<< "[ " << setw(7) << ( isLockUpdate() ? "PASSIVE" : "ACTIVE" ) << " ]"
<< " recvOK=" << isRecvOK()
<< " receivepack=" << rnum
<< " lostPackets=" << setw(6) << getLostPacketsNum()
<< endl
<< "\t["
<< " recvTimeout=" << setw(6) << recvTimeout
<< " prepareTime=" << setw(6) << prepareTime
<< " evrunTimeout=" << setw(6) << evrunTimeout
<< " lostTimeout=" << setw(6) << lostTimeout
<< " updatepause=" << setw(6) << updatepause
<< " maxDifferens=" << setw(6) << maxDifferens
<< " ]"
<< endl
<< "\t[ qsize=" << (wnum - rnum - 1) << " recv=" << statRecvPerSec << " update=" << statUpPerSec << " per sec ]";
return s.str();
}
// -----------------------------------------------------------------------------
=======
UNetReceiver::UpdateStrategy UNetReceiver::strToUpdateStrategy( const string& s ) noexcept
{
if( s == "thread" || s == "THREAD" )
......@@ -1499,4 +1031,3 @@ UNetReceiver::pack_guard::~pack_guard()
m.unlock();
}
// -----------------------------------------------------------------------------
>>>>>>> 2.9.1-alt1
......@@ -20,7 +20,7 @@
#include <ostream>
#include <memory>
#include <string>
#include <vector>
#include <queue>
#include <unordered_map>
#include <sigc++/sigc++.h>
#include <ev++.h>
......@@ -35,249 +35,6 @@
// --------------------------------------------------------------------------
namespace uniset
{
<<<<<<< HEAD
// -----------------------------------------------------------------------------
/* Основная идея: сделать проверку очерёдности пакетов, но при этом использовать UDP.
* ===============
* В данных передаётся номер пакета. На случай если несколько пакетов придут не в той последовательности
* что были посланы, сделан циклический буфер (буфер сразу выделяет память при старте).
* Т.к. номер получаемых пакетов монотонно растёт на основе него вычисляется индекс
* куда поместить пакет в буфере. Есть два индекса
* rnum - (read number) номер последнего обработанного пакета + 1
* wnum - (write number) номер следующего ожидаемого пакета (номер последнего принятого + 1)
* WARNING: Если придёт два пакета с одинаковым номером, то новый пакет перезатрёт прошлый в буфере
*
* При этом обработка ведётся по порядку (только пакеты идущие подряд)
* как только встречается "дырка" происходит ожидание её "заполения". Если в течение времени (lostTimeout)
* "дырка" не исчезает, увеличивается счётчик потерянных пакетов и обработка продолжается с нового места.
* Т.к. используется libev и нет многопоточной работы, события обрабатываются последовательно.
* Раз в updatetime msec происходит обновление данных в SM, все накопившиеся пакеты обрабатываются
* либо пока не встретиться "дырка", либо пока rnum не догонит wnum.
*
* КЭШ
* ===
* Для оптимизации работы с SM, т.к. в пакетах приходят только пары [id,value] сделан кэш итераторов.
* Идея проста: сделан вектор размером с количество принимаемых данных. В векторе хранятся итераторы (и всё что необходимо).
* Порядковый номер данных в пакете является индексом в кэше.
* Для защиты от изменения последовательности внутри пакета, в кэше хранится ID сохраняемого датчика, и если он не совпадёт с тем,
* ID который пришёл в пакете - элемент кэша обновляется.
* Если количество пришедших данных не совпадают с размером кэша.. кэш обновляется.
*
* КЭШ (ДОПОЛНЕНИЕ)
* ===
* Т.к. в общем случае, данные могут быть разбиты не несколько (много) пакетов, то для каждого из них выделен свой кэш и создан отдельный
* map, ключом в котором является идентификатор данных (см. UDPMessage::getDataID()).
* Кэш в map добавляется когда приходит пакет с новым UDPMessage::getDataID() и в дальнейшим используется для этого пакета.
* В текущей реализации размер map не контролируется (завязан на UDPMessage::getDataID()) и рассчитан на статичность пакетов,
* т.е. на то что UNetSender не будет с течением времени менять структуру отправляемых пакетов.
*
* Обработка сбоев в номере пакетов
* =========================================================================
* Если в какой-то момент расстояние между rnum и wnum превышает maxDifferens пакетов
* то считается, что произошёл сбой или узел который посылал пакеты перезагрузился
* Идёт попытка обработать все текущие пакеты (до первой дырки), а дальше происходит
* реинициализация и обработка продолжается с нового номера.
*
* =========================================================================
* ОПТИМИЗАЦИЯ N1: см. UNetSender.h. Если номер последнего принятого пакета не менялся.. не обрабатываем.
*
* Создание соединения (открытие сокета)
* ======================================
* Попытка создать сокет производиться сразу в конструкторе, если это не получается,
* то создаётся таймер (evCheckConnection), который периодически (checkConnectionTime) пытается вновь
* открыть сокет.. и так бесконечно, пока не получиться. Это важно для систем, где в момент загрузки программы
* (в момент создания объекта UNetReceiver) ещё может быть не поднята сеть или какой-то сбой с сетью и требуется
* ожидание (без вылета программы) пока "внешняя система мониторинга" не поднимет сеть).
* Если такая логика не требуется, то можно задать в конструкторе
* последним аргументом флаг nocheckconnection=true, тогда при создании объекта UNetReceiver, в конструкторе будет
* выкинуто исключение при неудачной попытке создания соединения.
*/
// -----------------------------------------------------------------------------
class UNetReceiver:
protected EvWatcher,
public std::enable_shared_from_this<UNetReceiver>
{
public:
UNetReceiver( const std::string& host, int port, const std::shared_ptr<SMInterface>& smi
, bool nocheckConnection = false
, const std::string& prefix = "unet" );
virtual ~UNetReceiver();
void start();
void stop();
inline const std::string getName() const
{
return myname;
}
// блокировать сохранение данных в SM
void setLockUpdate( bool st ) noexcept;
bool isLockUpdate() const noexcept;
void resetTimeout() noexcept;
bool isInitOK() const noexcept;
bool isRecvOK() const noexcept;
size_t getLostPacketsNum() const noexcept;
void setReceiveTimeout( timeout_t msec ) noexcept;
void setUpdatePause( timeout_t msec ) noexcept;
void setLostTimeout( timeout_t msec ) noexcept;
void setPrepareTime( timeout_t msec ) noexcept;
void setCheckConnectionPause( timeout_t msec ) noexcept;
void setMaxDifferens( unsigned long set ) noexcept;
void setEvrunTimeout(timeout_t msec ) noexcept;
void setInitPause( timeout_t msec ) noexcept;
void setBufferSize( size_t sz ) noexcept;
void setRespondID( uniset::ObjectId id, bool invert = false ) noexcept;
void setLostPacketsID( uniset::ObjectId id ) noexcept;
void forceUpdate() noexcept; // пересохранить очередной пакет в SM даже если данные не менялись
inline std::string getAddress() const noexcept
{
return addr;
}
inline int getPort() const noexcept
{
return port;
}
/*! Коды событий */
enum Event
{
evOK, /*!< связь есть */
evTimeout /*!< потеря связи */
};
typedef sigc::slot<void, const std::shared_ptr<UNetReceiver>&, Event> EventSlot;
void connectEvent( EventSlot sl ) noexcept;
// --------------------------------------------------------------------
inline std::shared_ptr<DebugStream> getLog()
{
return unetlog;
}
virtual const std::string getShortInfo() const noexcept;
protected:
const std::shared_ptr<SMInterface> shm;
std::shared_ptr<DebugStream> unetlog;
bool receive() noexcept;
void step() noexcept;
void update() noexcept;
void callback( ev::io& watcher, int revents ) noexcept;
void readEvent( ev::io& watcher ) noexcept;
void updateEvent( ev::periodic& watcher, int revents ) noexcept;
void checkConnectionEvent( ev::periodic& watcher, int revents ) noexcept;
void statisticsEvent( ev::periodic& watcher, int revents ) noexcept;
void initEvent( ev::timer& watcher, int revents ) noexcept;
virtual void evprepare( const ev::loop_ref& eloop ) noexcept override;
virtual void evfinish(const ev::loop_ref& eloop ) noexcept override;
virtual std::string wname() const noexcept override
{
return myname;
}
void initIterators() noexcept;
bool createConnection( bool throwEx = false );
void checkConnection();
size_t rnext( size_t num );
private:
UNetReceiver();
timeout_t updatepause = { 100 }; /*!< периодичность обновления данных в SM, [мсек] */
std::unique_ptr<UDPReceiveU> udp;
std::string addr;
int port = { 0 };
Poco::Net::SocketAddress saddr;
std::string myname;
ev::io evReceive;
ev::periodic evCheckConnection;
ev::periodic evStatistic;
ev::periodic evUpdate;
ev::timer evInitPause;
// счётчики для подсчёта статистики
size_t recvCount = { 0 };
size_t upCount = { 0 };
// текущая статистика
size_t statRecvPerSec = { 0 }; /*!< количество принимаемых пакетов в секунду */
size_t statUpPerSec = { 0 }; /*!< количество обработанных пакетов в секунду */
// делаем loop общим.. одним на всех!
static CommonEventLoop loop;
double checkConnectionTime = { 10.0 }; // sec
std::mutex checkConnMutex;
PassiveTimer ptRecvTimeout;
PassiveTimer ptPrepare;
timeout_t recvTimeout = { 5000 }; // msec
timeout_t prepareTime = { 2000 };
timeout_t evrunTimeout = { 15000 };
timeout_t lostTimeout = { 200 };
double initPause = { 5.0 }; // пауза на начальную инициализацию (сек)
std::atomic_bool initOK = { false };
PassiveTimer ptLostTimeout;
size_t lostPackets = { 0 }; /*!< счётчик потерянных пакетов */
uniset::ObjectId sidRespond = { uniset::DefaultObjectId };
IOController::IOStateList::iterator itRespond;
bool respondInvert = { false };
uniset::ObjectId sidLostPackets = { uniset::DefaultObjectId };
IOController::IOStateList::iterator itLostPackets;
std::atomic_bool activated = { false };
size_t cbufSize = { 100 }; /*!< размер буфера для сообщений по умолчанию */
std::vector<UniSetUDP::UDPMessage> cbuf; // circular buffer
size_t wnum = { 1 }; /*!< номер следующего ожидаемого пакета */
size_t rnum = { 0 }; /*!< номер последнего обработанного пакета */
UniSetUDP::UDPMessage* pack; // текущий обрабатываемый пакет
/*! максимальная разница между номерами пакетов, при которой считается, что счётчик пакетов
* прошёл через максимум или сбился...
*/
size_t maxDifferens = { 20 };
std::atomic_bool lockUpdate = { false }; /*!< флаг блокировки сохранения принятых данных в SM */
EventSlot slEvent;
Trigger trTimeout;
std::mutex tmMutex;
struct CacheItem
{
long id = { uniset::DefaultObjectId };
IOController::IOStateList::iterator ioit;
CacheItem():
id(uniset::DefaultObjectId) {}
};
typedef std::vector<CacheItem> CacheVec;
// ключом является UDPMessage::getDataID()
typedef std::unordered_map<long, CacheVec> CacheMap;
CacheMap d_icache_map; /*!< кэш итераторов для булевых */
CacheMap a_icache_map; /*!< кэш итераторов для аналоговых */
CacheVec* getDCache( UniSetUDP::UDPMessage* pack ) noexcept;
CacheVec* getACache( UniSetUDP::UDPMessage* pack ) noexcept;
};
// --------------------------------------------------------------------------
=======
// -----------------------------------------------------------------------------
/* Основная идея: сделать проверку очерёдности пакетов, но при этом использовать UDP.
* ===============
......@@ -591,7 +348,6 @@ namespace uniset
void initACache( UniSetUDP::UDPMessage& pack, bool force = false ) noexcept;
};
// --------------------------------------------------------------------------
>>>>>>> 2.9.1-alt1
} // end of namespace uniset
// -----------------------------------------------------------------------------
#endif // UNetReceiver_H_
......
......@@ -25,293 +25,6 @@
// -------------------------------------------------------------------------
namespace uniset
{
<<<<<<< HEAD
// -----------------------------------------------------------------------------
using namespace std;
using namespace uniset::extensions;
// -----------------------------------------------------------------------------
UNetSender::UNetSender(const std::string& _host, const int _port, const std::shared_ptr<SMInterface>& smi,
bool nocheckConnection, const std::string& s_f, const std::string& s_val,
const std::string& s_prefix,
const std::string& prefix,
size_t maxDCount, size_t maxACount ):
s_field(s_f),
s_fvalue(s_val),
prop_prefix(s_prefix),
shm(smi),
port(_port),
s_host(_host),
saddr(_host, _port),
sendpause(150),
packsendpause(5),
packsendpauseFactor(1),
activated(false),
packetnum(1),
lastcrc(0),
maxAData(maxACount),
maxDData(maxDCount)
{
items.reserve(100);
{
ostringstream s;
s << "S(" << setw(15) << s_host << ":" << setw(4) << port << ")";
myname = s.str();
}
ostringstream logname;
logname << prefix << "-S-" << s_host << "-" << port;
unetlog = make_shared<DebugStream>();
unetlog->setLogName(logname.str());
auto conf = uniset_conf();
conf->initLogStream(unetlog, prefix + "-log");
unetinfo << myname << "(init): read filter-field='" << s_field
<< "' filter-value='" << s_fvalue << "'" << endl;
unetinfo << "(UNetSender): UDP set to " << s_host << ":" << port << endl;
addr = s_host.c_str();
ptCheckConnection.setTiming(10000); // default 10 сек
createConnection(nocheckConnection);
s_thr = unisetstd::make_unique< ThreadCreator<UNetSender> >(this, &UNetSender::send);
mypacks[0].resize(1);
packs_anum[0] = 0;
packs_dnum[0] = 0;
auto& mypack(mypacks[0][0]);
// выставляем поля, которые не меняются
{
uniset_rwmutex_wrlock l(mypack.mut);
mypack.msg.header.nodeID = uniset_conf()->getLocalNode();
mypack.msg.header.procID = shm->ID();
}
// -------------------------------
if( shm->isLocalwork() )
{
readConfiguration();
unetinfo << myname << "(init): dlist size = " << items.size() << endl;
}
else
{
auto ic = std::dynamic_pointer_cast<SharedMemory>(shm->SM());
if( ic )
ic->addReadItem( sigc::mem_fun(this, &UNetSender::readItem) );
else
{
unetwarn << myname << "(init): Failed to convert the pointer 'IONotifyController' -> 'SharedMemory'" << endl;
readConfiguration();
unetinfo << myname << "(init): dlist size = " << items.size() << endl;
}
}
}
// -----------------------------------------------------------------------------
UNetSender::~UNetSender()
{
}
// -----------------------------------------------------------------------------
bool UNetSender::createConnection( bool throwEx )
{
unetinfo << myname << "(createConnection): .." << endl;
try
{
//udp = make_shared<UDPSocketU>(addr, port);
udp = unisetstd::make_unique<UDPSocketU>();
udp->setBroadcast(true);
udp->setSendTimeout( UniSetTimer::millisecToPoco(writeTimeout) );
// udp->setNoDelay(true);
}
catch( const std::exception& e )
{
ostringstream s;
s << myname << "(createConnection): " << e.what();
unetcrit << s.str() << std::endl;
if( throwEx )
throw SystemError(s.str());
udp = nullptr;
}
catch( ... )
{
ostringstream s;
s << myname << "(createConnection): catch...";
unetcrit << s.str() << std::endl;
if( throwEx )
throw SystemError(s.str());
udp = nullptr;
}
return (udp != nullptr);
}
// -----------------------------------------------------------------------------
void UNetSender::updateFromSM()
{
for( auto&& it : items )
{
UItem& i = it.second;
try
{
long value = shm->localGetValue(i.ioit, i.id);
updateItem(i, value);
}
catch( IOController_i::Undefined& ex )
{
unetwarn << myname << "(updateFromSM): sid=" << i.id
<< " undefined state (value=" << ex.value << ")." << endl;
updateItem( i, ex.value );
}
catch( std::exception& ex )
{
unetwarn << myname << "(updateFromSM): " << ex.what() << endl;
if( i.undefined_value != not_specified_value )
updateItem( i, i.undefined_value );
}
}
}
// -----------------------------------------------------------------------------
void UNetSender::updateSensor( uniset::ObjectId id, long value )
{
if( !shm->isLocalwork() )
return;
auto it = items.find(id);
if( it != items.end() )
updateItem( it->second, value );
}
// -----------------------------------------------------------------------------
void UNetSender::updateItem( UItem& it, long value )
{
auto& pk = mypacks[it.pack_sendfactor];
auto& mypack(pk[it.pack_num]);
uniset::uniset_rwmutex_wrlock l(mypack.mut);
if( it.iotype == UniversalIO::DI || it.iotype == UniversalIO::DO )
mypack.msg.setDData(it.pack_ind, value);
else if( it.iotype == UniversalIO::AI || it.iotype == UniversalIO::AO )
mypack.msg.setAData(it.pack_ind, value);
}
// -----------------------------------------------------------------------------
void UNetSender::setCheckConnectionPause( int msec )
{
if( msec > 0 )
ptCheckConnection.setTiming(msec);
}
// -----------------------------------------------------------------------------
void UNetSender::send() noexcept
{
unetinfo << myname << "(send): dlist size = " << items.size() << endl;
ncycle = 0;
ptCheckConnection.reset();
while( activated )
{
if( !udp )
{
if( !ptCheckConnection.checkTime() )
{
msleep(sendpause);
continue;
}
unetinfo << myname << "(send): check connection event.." << endl;
if( !createConnection(false) )
{
ptCheckConnection.reset();
msleep(sendpause);
continue;
}
}
try
{
if( !shm->isLocalwork() )
updateFromSM();
for( auto&& it : mypacks )
{
if( it.first > 1 && (ncycle % it.first) != 0 )
continue;
if( !activated )
break;
auto& pk = it.second;
size_t size = pk.size();
for(size_t i = 0; i < size; ++i)
{
if( !activated )
break;
real_send(pk[i]);
if( packsendpause > 0 && size > 1 )
{
if( packsendpauseFactor <= 0 )
{
msleep(packsendpause);
}
else if( i > 0 && (i % packsendpauseFactor) == 0 )
{
msleep(packsendpause);
}
}
}
}
ncycle++;
}
catch( Poco::Net::NetException& e )
{
unetwarn << myname << "(send): " << e.displayText() << endl;
}
catch( uniset::Exception& ex)
{
unetwarn << myname << "(send): " << ex << std::endl;
}
catch( const std::exception& e )
{
unetwarn << myname << "(send): " << e.what() << std::endl;
}
catch(...)
{
unetwarn << myname << "(send): catch ..." << std::endl;
}
if( !activated )
break;
msleep(sendpause);
}
unetinfo << "************* execute FINISH **********" << endl;
}
// -----------------------------------------------------------------------------
// #define UNETUDP_DISABLE_OPTIMIZATION_N1
void UNetSender::real_send( PackMessage& mypack ) noexcept
{
try
{
uniset::uniset_rwmutex_rlock l(mypack.mut);
=======
// -----------------------------------------------------------------------------
using namespace std;
using namespace uniset::extensions;
......@@ -597,298 +310,11 @@ namespace uniset
try
{
uniset::uniset_rwmutex_rlock l(mypack.mut);
>>>>>>> 2.9.1-alt1
#ifdef UNETUDP_DISABLE_OPTIMIZATION_N1
mypack.msg.num = packetnum++;
#else
uint16_t crc = mypack.msg.getDataCRC();
<<<<<<< HEAD
if( crc != lastcrc )
{
mypack.msg.header.num = packetnum++;
lastcrc = crc;
}
#endif
// при переходе через ноль (когда счётчик перевалит через UniSetUDP::MaxPacketNum..
// делаем номер пакета "1"
if( packetnum == 0 )
packetnum = 1;
if( !udp || !udp->poll( UniSetTimer::millisecToPoco(writeTimeout), Poco::Net::Socket::SELECT_WRITE) )
return;
size_t ret = udp->sendTo(&mypack.msg, sizeof(mypack.msg), saddr);
if( ret < sizeof(mypack.msg) )
unetcrit << myname << "(real_send): FAILED ret=" << ret << " < sizeof=" << sizeof(mypack.msg) << endl;
}
catch( Poco::Net::NetException& ex )
{
unetcrit << myname << "(real_send): error: " << ex.displayText() << endl;
}
catch( std::exception& ex )
{
unetcrit << myname << "(real_send): error: " << ex.what() << endl;
}
}
// -----------------------------------------------------------------------------
void UNetSender::stop()
{
activated = false;
// s_thr->stop();
if( s_thr )
s_thr->join();
}
// -----------------------------------------------------------------------------
void UNetSender::start()
{
if( !activated )
{
activated = true;
s_thr->start();
}
}
// -----------------------------------------------------------------------------
void UNetSender::readConfiguration()
{
xmlNode* root = uniset_conf()->getXMLSensorsSection();
if(!root)
{
ostringstream err;
err << myname << "(readConfiguration): not found <sensors>";
throw SystemError(err.str());
}
UniXML::iterator it(root);
if( !it.goChildren() )
{
std::cerr << myname << "(readConfiguration): empty <sensors>?!!" << endl;
return;
}
for( ; it.getCurrent(); it.goNext() )
{
if( check_filter(it, s_field, s_fvalue) )
initItem(it);
}
}
// ------------------------------------------------------------------------------------------
bool UNetSender::readItem( const std::shared_ptr<UniXML>& xml, UniXML::iterator& it, xmlNode* sec )
{
if( uniset::check_filter(it, s_field, s_fvalue) )
initItem(it);
return true;
}
// ------------------------------------------------------------------------------------------
bool UNetSender::initItem( UniXML::iterator& it )
{
string sname( it.getProp("name") );
string tid(it.getProp("id"));
ObjectId sid;
if( !tid.empty() )
{
sid = uniset::uni_atoi(tid);
if( sid <= 0 )
sid = DefaultObjectId;
}
else
sid = uniset_conf()->getSensorID(sname);
if( sid == DefaultObjectId )
{
unetcrit << myname << "(readItem): ID not found for "
<< sname << endl;
return false;
}
int priority = it.getPIntProp(prop_prefix + "_sendfactor", 0);
auto& pk = mypacks[priority];
UItem p;
p.iotype = uniset::getIOType(it.getProp("iotype"));
p.pack_sendfactor = priority;
long defval = it.getIntProp("default");
if( !it.getProp("undefined_value").empty() )
p.undefined_value = it.getIntProp("undefined_value");
if( p.iotype == UniversalIO::UnknownIOType )
{
unetcrit << myname << "(readItem): Unknown iotype for sid=" << sid << endl;
return false;
}
p.id = sid;
if( p.iotype == UniversalIO::DI || p.iotype == UniversalIO::DO )
{
size_t dnum = packs_dnum[priority];
if( pk.size() <= dnum )
pk.resize(dnum + 1);
auto& mypack = pk[dnum];
{
uniset_rwmutex_wrlock l(mypack.mut);
p.pack_ind = mypack.msg.addDData(sid, defval);
} // unlock mutex....
if( p.pack_ind >= maxDData )
{
dnum++;
if( dnum >= pk.size() )
pk.resize(dnum + 1);
auto& mypack2 = pk[dnum];
uniset_rwmutex_wrlock l2(mypack2.mut);
p.pack_ind = mypack2.msg.addDData(sid, defval);
mypack2.msg.header.nodeID = uniset_conf()->getLocalNode();
mypack2.msg.header.procID = shm->ID();
}
p.pack_num = dnum;
packs_dnum[priority] = dnum;
if ( p.pack_ind >= UniSetUDP::MaxDCount )
{
unetcrit << myname
<< "(readItem): OVERFLOW! MAX UDP DIGITAL DATA LIMIT! max="
<< UniSetUDP::MaxDCount << endl << flush;
std::terminate();
return false;
}
}
else if( p.iotype == UniversalIO::AI || p.iotype == UniversalIO::AO ) // -V560
{
size_t anum = packs_anum[priority];
if( pk.size() <= anum )
pk.resize(anum + 1);
auto& mypack = pk[anum];
{
uniset_rwmutex_wrlock l(mypack.mut);
p.pack_ind = mypack.msg.addAData(sid, defval);
}
if( p.pack_ind >= maxAData )
{
anum++;
if( anum >= pk.size() )
pk.resize(anum + 1);
auto& mypack2 = pk[anum];
uniset_rwmutex_wrlock l2(mypack2.mut);
p.pack_ind = mypack2.msg.addAData(sid, defval);
mypack2.msg.header.nodeID = uniset_conf()->getLocalNode();
mypack2.msg.header.procID = shm->ID();
}
p.pack_num = anum;
packs_anum[priority] = anum;
if ( p.pack_ind >= UniSetUDP::MaxACount )
{
unetcrit << myname
<< "(readItem): OVERFLOW! MAX UDP ANALOG DATA LIMIT! max="
<< UniSetUDP::MaxACount << endl << flush;
std::terminate();
return false;
}
}
unetinfo << myname << "(initItem): add " << p << endl;
auto i = items.find(p.id);
if( i != items.end() )
{
unetcrit << myname
<< "(readItem): Sensor (" << p.id << ")" << sname << " ALREADY ADDED!! ABORT!" << endl;
std::terminate();
return false;
}
items.emplace(p.id, std::move(p));
return true;
}
// ------------------------------------------------------------------------------------------
std::ostream& operator<<( std::ostream& os, UNetSender::UItem& p )
{
return os << " sid=" << p.id;
}
// -----------------------------------------------------------------------------
void UNetSender::initIterators()
{
for( auto&& it : items )
shm->initIterator(it.second.ioit);
}
// -----------------------------------------------------------------------------
void UNetSender::askSensors( UniversalIO::UIOCommand cmd )
{
for( const auto& it : items )
shm->askSensor(it.second.id, cmd);
}
// -----------------------------------------------------------------------------
size_t UNetSender::getDataPackCount() const
{
return mypacks.size();
}
// -----------------------------------------------------------------------------
const std::string UNetSender::getShortInfo() const
{
// warning: будет вызываться из другого потока
// (считаем что чтение безопасно)
ostringstream s;
s << setw(15) << std::right << getAddress() << ":" << std::left << setw(6) << getPort()
<< " lastpacknum=" << packetnum
<< " lastcrc=" << setw(6) << lastcrc
<< " items=" << items.size() << " maxAData=" << getADataSize() << " maxDData=" << getDDataSize()
<< " packsendpause[factor=" << packsendpauseFactor << "]=" << packsendpause
<< " sendpause=" << sendpause
<< endl
<< "\t packs([sendfactor]=num): "
<< endl;
for( auto i = mypacks.begin(); i != mypacks.end(); ++i )
{
s << " \t[" << i->first << "]=" << i->second.size() << endl;
size_t n = 0;
for( const auto& pack : i->second )
{
//uniset_rwmutex_rlock l(p->mut);
s << " \t\t[" << (n++) << "]=" << sizeof(pack.msg) << " bytes"
<< " ( numA=" << setw(5) << pack.msg.asize() << " numD=" << setw(5) << pack.msg.dsize() << ")"
<< endl;
}
}
return s.str();
}
// -----------------------------------------------------------------------------
=======
if( crc != lastcrc )
{
mypack.msg.num = packetnum++;
......@@ -1175,5 +601,4 @@ namespace uniset
return s.str();
}
// -----------------------------------------------------------------------------
>>>>>>> 2.9.1-alt1
} // end of namespace uniset
......@@ -33,154 +33,6 @@
// --------------------------------------------------------------------------
namespace uniset
{
<<<<<<< HEAD
// -----------------------------------------------------------------------------
/*
* Распределение датчиков по пакетам
* =========================================================================
* Все пересылаемые данные разбиваются на группы по частоте посылки("sendfactor").
* Частота посылки кратна sendpause, задаётся для каждого датчика, при помощи свойства prefix_sendfactor.
* Внутри каждой группы пакеты набираются по мере "заполнения".
*
* Добавление датчика в пакет и создание нового пакета при переполнении происходит в функции initItem().
* Причем так как дискретные и аналоговые датчики обрабатываются отдельно (но пересылаются в одном пакете),
* то датчики, которые первые переполнятся приводят к тому, что создаётся новый пакет и они добавляются в него,
* в свою очередь остальные продолжают "добивать" предыдущий пакет.
* В initItem() каждому UItem в dlist кроме pack_ind присваивается еще и номер пакета pack_num, который гарантировано соответствует
* существующему пакету, поэтому в дальнейшем при использовании pack_num в качестве ключа в mypacks мы не проверяем пакет на существование.
*
* ОПТИМИЗАЦИЯ N1: Для оптимизации обработки посылаемых пакетов (на стороне UNetSender) сделана следующая логика:
* Номер очередного посылаемого пакета меняется (увеличивается) только, если изменились данные с момента
последней посылки. Для этого по данным каждый раз производится расчёт UNetUDP::makeCRC() и сравнивается с последним.
На стороне UNetReceiver пакеты с повторными номерами (т.е. уже обработанные) - откидываются.
*
*
* Создание соединения
* ======================================
* Попытка создать соединение производиться сразу в конструкторе, если это не получается,
* то в потоке "посылки", с заданным периодом (checkConnectionTime) идёт попытка создать соединение..
* и так бесконечно, пока не получиться. Это важно для систем, где в момент загрузки программы
* (в момент создания объекта UNetSender) ещё может быть не поднята сеть или какой-то сбой с сетью и требуется
* ожидание (без вылета программы) пока "внешняя система мониторинга" не поднимет сеть).
* Если такая логика не требуется, то можно задать в конструкторе флаг nocheckconnection=true,
* тогда при создании объекта UNetSender, в конструкторе будет
* выкинуто исключение при неудачной попытке создания соединения.
* \warning setCheckConnectionPause(msec) должно быть кратно sendpause!
*/
class UNetSender
{
public:
UNetSender( const std::string& host, const int port, const std::shared_ptr<SMInterface>& smi
, bool nocheckConnection = false
, const std::string& s_field = ""
, const std::string& s_fvalue = ""
, const std::string& prop_prefix = "unet"
, const std::string& prefix = "unet"
, size_t maxDCount = UniSetUDP::MaxDCount
, size_t maxACount = UniSetUDP::MaxACount );
virtual ~UNetSender();
typedef size_t sendfactor_t;
static const long not_specified_value = { std::numeric_limits<long>::max() };
struct UItem
{
UItem():
iotype(UniversalIO::UnknownIOType),
id(uniset::DefaultObjectId),
pack_num(0),
pack_ind(0),
pack_sendfactor(0) {}
UniversalIO::IOType iotype;
uniset::ObjectId id;
IOController::IOStateList::iterator ioit;
size_t pack_num;
size_t pack_ind;
sendfactor_t pack_sendfactor = { 0 };
long undefined_value = { not_specified_value };
friend std::ostream& operator<<( std::ostream& os, UItem& p );
};
typedef std::unordered_map<uniset::ObjectId, UItem> UItemMap;
size_t getDataPackCount() const;
void start();
void stop();
void send() noexcept;
struct PackMessage
{
PackMessage( uniset::UniSetUDP::UDPMessage&& m ) noexcept: msg(std::move(m)) {}
PackMessage( const uniset::UniSetUDP::UDPMessage& m ) = delete;
PackMessage() noexcept {}
uniset::UniSetUDP::UDPMessage msg;
uniset::uniset_rwmutex mut;
};
void real_send( PackMessage& mypack ) noexcept;
/*! (принудительно) обновить все данные (из SM) */
void updateFromSM();
/*! Обновить значение по ID датчика */
void updateSensor( uniset::ObjectId id, long value );
/*! Обновить значение по итератору */
void updateItem( UItem& it, long value );
inline void setSendPause( int msec )
{
sendpause = msec;
}
inline void setPackSendPause( int msec )
{
packsendpause = msec;
}
inline void setPackSendPauseFactor( int factor )
{
packsendpauseFactor = factor;
}
void setCheckConnectionPause( int msec );
/*! заказать датчики */
void askSensors( UniversalIO::UIOCommand cmd );
/*! инициализация итераторов */
void initIterators();
inline std::shared_ptr<DebugStream> getLog()
{
return unetlog;
}
virtual const std::string getShortInfo() const;
inline std::string getAddress() const
{
return addr;
}
inline int getPort() const
{
return port;
}
inline size_t getADataSize() const
{
return maxAData;
}
inline size_t getDDataSize() const
{
return maxDData;
}
=======
// -----------------------------------------------------------------------------
/*
* Распределение датчиков по пакетам
......@@ -327,7 +179,6 @@ namespace uniset
{
return maxDData;
}
>>>>>>> 2.9.1-alt1
protected:
......@@ -354,36 +205,6 @@ namespace uniset
std::string s_host = { "" };
Poco::Net::SocketAddress saddr;
<<<<<<< HEAD
std::string myname = { "" };
timeout_t sendpause = { 150 };
timeout_t packsendpause = { 5 };
int packsendpauseFactor = { 1 };
timeout_t writeTimeout = { 1000 }; // msec
std::atomic_bool activated = { false };
PassiveTimer ptCheckConnection;
typedef std::unordered_map<sendfactor_t, std::vector<PackMessage>> Packs;
// mypacks заполняется в начале и дальше с ним происходит только чтение
// поэтому mutex-ом его не защищаем
Packs mypacks;
std::unordered_map<sendfactor_t, size_t> packs_anum;
std::unordered_map<sendfactor_t, size_t> packs_dnum;
UItemMap items;
size_t packetnum = { 1 }; /*!< номер очередного посылаемого пакета */
uint16_t lastcrc = { 0 };
size_t maxAData = { UniSetUDP::MaxACount };
size_t maxDData = { UniSetUDP::MaxDCount };
std::unique_ptr< ThreadCreator<UNetSender> > s_thr; // send thread
size_t ncycle = { 0 }; /*!< номер цикла посылки */
};
// --------------------------------------------------------------------------
=======
std::string myname = { "" };
timeout_t sendpause = { 150 };
timeout_t packsendpause = { 5 };
......@@ -413,7 +234,6 @@ namespace uniset
};
// --------------------------------------------------------------------------
>>>>>>> 2.9.1-alt1
} // end of namespace uniset
// -----------------------------------------------------------------------------
#endif // UNetSender_H_
......
if HAVE_TESTS
noinst_PROGRAMS = tests-with-sm urecv-perf-test
#noinst_PROGRAMS = urecv-perf-test
tests_with_sm_SOURCES = tests_with_sm.cc test_unetudp.cc
tests_with_sm_LDADD = $(top_builddir)/lib/libUniSet2.la $(top_builddir)/extensions/lib/libUniSet2Extensions.la \
......
......@@ -56,31 +56,17 @@ void InitTest()
}
// -----------------------------------------------------------------------------
// pnum - минималный номер ожидаемого пакета ( 0 - любой пришедщий )
// ncycle - сколько пакетов разрешено "пропустить" прежде чем дождёмся нужного. (чтобы не ждать бесконечно)
// ncycle - сколько пакетов разрешено "пропустить" прежде чем дождёмся нужного.. (чтобы не ждать бесконечно)
static UniSetUDP::UDPMessage receive( unsigned int pnum = 0, timeout_t tout = 2000, int ncycle = 20 )
{
<<<<<<< HEAD
UniSetUDP::UDPMessage pack;
=======
UniSetUDP::UDPMessage pack;
UniSetUDP::UDPPacket buf;
>>>>>>> 2.9.1-alt1
while( ncycle > 0 )
{
if( !udp_r->poll(UniSetTimer::millisecToPoco(tout), Poco::Net::Socket::SELECT_READ) )
break;
<<<<<<< HEAD
size_t ret = udp_r->receiveBytes(&pack, sizeof(pack) );
if( ret <= 0 || pnum == 0 || ( pnum > 0 && pack.header.num >= pnum ) ) // -V560
break;
REQUIRE( pack.header.magic == UniSetUDP::UNETUDP_MAGICNUM );
ncycle--;
}
=======
size_t ret = udp_r->receiveBytes(&(buf.data), sizeof(buf.data) );
size_t sz = UniSetUDP::UDPMessage::getMessage(pack, buf);
......@@ -90,7 +76,6 @@ static UniSetUDP::UDPMessage receive( unsigned int pnum = 0, timeout_t tout = 20
REQUIRE( pack.magic == UniSetUDP::UNETUDP_MAGICNUM );
ncycle--;
}
>>>>>>> 2.9.1-alt1
// if( pnum > 0 && pack.num < pnum )
// return UniSetUDP::UDPMessage(); // empty message
......@@ -102,14 +87,6 @@ void send( UniSetUDP::UDPMessage& pack, int tout = 2000 )
{
CHECK( udp_s->poll(UniSetTimer::millisecToPoco(tout), Poco::Net::Socket::SELECT_WRITE) );
<<<<<<< HEAD
pack.header.nodeID = s_nodeID;
pack.header.procID = s_procID;
pack.header.num = s_numpack++;
size_t ret = udp_s->sendTo(&pack, sizeof(pack), s_addr);
REQUIRE( ret == sizeof(pack) );
=======
pack.nodeID = s_nodeID;
pack.procID = s_procID;
pack.num = s_numpack++;
......@@ -118,41 +95,10 @@ void send( UniSetUDP::UDPMessage& pack, int tout = 2000 )
pack.transport_msg(s_buf);
size_t ret = udp_s->sendTo(&s_buf.data, s_buf.len, s_addr);
REQUIRE( ret == s_buf.len );
>>>>>>> 2.9.1-alt1
}
// -----------------------------------------------------------------------------
TEST_CASE("[UNetUDP]: repack", "[unetudp][repack]")
TEST_CASE("[UNetUDP]: queue sort", "[unetudp][packetqueue]")
{
<<<<<<< HEAD
UniSetUDP::UDPMessage pack;
pack.header.nodeID = 100;
pack.header.procID = 100;
pack.header.num = 1;
pack.addDData(1, 1);
pack.addDData(2, 0);
pack.addAData(3, 30);
pack.addAData(4, 40);
REQUIRE(pack.header.magic == UniSetUDP::UNETUDP_MAGICNUM);
UniSetUDP::UDPMessage pack2(pack);
pack2.ntoh();
REQUIRE(pack2.header.nodeID == 100);
REQUIRE(pack2.header.procID == 100);
REQUIRE(pack2.header.num == 1);
REQUIRE(pack2.header.magic == UniSetUDP::UNETUDP_MAGICNUM);
REQUIRE(pack2.dID(0) == 1);
REQUIRE(pack2.dValue(0) == true);
REQUIRE(pack2.dID(1) == 2);
REQUIRE(pack2.dValue(1) == false);
REQUIRE(pack2.dID(1) == 2);
REQUIRE(pack2.a_dat[0].id == 3);
REQUIRE(pack2.a_dat[0].val == 30);
REQUIRE(pack2.a_dat[1].id == 4);
REQUIRE(pack2.a_dat[1].val == 40);
=======
UNetReceiver::PacketQueue q;
UniSetUDP::UDPMessage m1;
......@@ -187,37 +133,10 @@ TEST_CASE("[UNetUDP]: repack", "[unetudp][repack]")
t = q.top();
REQUIRE( t.num == 100 );
q.pop();
>>>>>>> 2.9.1-alt1
}
// -----------------------------------------------------------------------------
TEST_CASE("[UNetUDP]: UDPMessage", "[unetudp][udpmessage]")
{
<<<<<<< HEAD
SECTION("UDPMessage::isFull()")
{
UniSetUDP::UDPMessage u;
for( unsigned int i = 0; i < UniSetUDP::MaxACount - 1; i++ )
u.addAData( i, i );
REQUIRE( u.asize() == (UniSetUDP::MaxACount - 1) );
CHECK_FALSE( u.isAFull() );
u.addAData( 1, 1 );
CHECK( u.isAFull() );
for( unsigned int i = 0; i < UniSetUDP::MaxDCount - 1; i++ )
u.addDData( i, true );
REQUIRE( u.dsize() == (UniSetUDP::MaxDCount - 1) );
CHECK_FALSE( u.isDFull() );
u.addDData( 1, true );
CHECK( u.isDFull() );
CHECK( u.isFull() );
}
=======
SECTION("UDPMessage::isFull()")
{
UniSetUDP::UDPMessage u;
......@@ -277,7 +196,6 @@ TEST_CASE("[UNetUDP]: sizeOf", "[unetudp][sizeof]")
UniSetUDP::UDPPacket p;
size_t len = m.transport_msg(p);
REQUIRE( len == m.sizeOf() );
>>>>>>> 2.9.1-alt1
}
// -----------------------------------------------------------------------------
#if 0
......@@ -297,75 +215,6 @@ TEST_CASE("[UNetUDP]: respond sensor", "[unetudp]")
// -----------------------------------------------------------------------------
TEST_CASE("[UNetUDP]: check sender", "[unetudp][sender]")
{
<<<<<<< HEAD
InitTest();
SECTION("Test: read default pack..")
{
UniSetUDP::UDPMessage p = receive();
REQUIRE( p.header.num != 0 );
REQUIRE( p.asize() == 4 );
REQUIRE( p.dsize() == 2 );
for( size_t i = 0; i < p.asize(); i++ )
{
REQUIRE( p.a_dat[i].val == i + 1 );
}
REQUIRE( p.dValue(0) == true );
REQUIRE( p.dValue(1) == false );
// т.к. данные в SM не менялись, то должен придти пакет с тем же номером что и был.
UniSetUDP::UDPMessage p2 = receive();
REQUIRE( p2.header.num == p.header.num );
}
SECTION("Test: change AI data..")
{
UniSetUDP::UDPMessage pack0 = receive();
ui->setValue(2, 100);
REQUIRE( ui->getValue(2) == 100 );
msleep(120);
UniSetUDP::UDPMessage pack = receive( pack0.header.num + 1 );
REQUIRE( pack.header.num != 0 );
REQUIRE( pack.asize() == 4 );
REQUIRE( pack.dsize() == 2 );
REQUIRE( pack.a_dat[0].val == 100 );
ui->setValue(2, 250);
REQUIRE( ui->getValue(2) == 250 );
msleep(120);
UniSetUDP::UDPMessage pack2 = receive( pack.header.num + 1 );
REQUIRE( pack2.header.num != 0 );
REQUIRE( pack2.header.num > pack.header.num );
REQUIRE( pack2.asize() == 4 );
REQUIRE( pack2.dsize() == 2 );
REQUIRE( pack2.a_dat[0].val == 250 );
}
SECTION("Test: change DI data..")
{
UniSetUDP::UDPMessage pack0 = receive();
ui->setValue(6, 1);
REQUIRE( ui->getValue(6) == 1 );
msleep(120);
UniSetUDP::UDPMessage pack = receive( pack0.header.num + 1 );
REQUIRE( pack.header.num != 0 );
REQUIRE( pack.asize() == 4 );
REQUIRE( pack.dsize() == 2 );
REQUIRE( pack.dValue(0) == true );
ui->setValue(6, 0);
REQUIRE( ui->getValue(6) == 0 );
msleep(120);
UniSetUDP::UDPMessage pack2 = receive( pack.header.num + 1 );
REQUIRE( pack2.header.num != 0 );
REQUIRE( pack2.header.num > pack.header.num );
REQUIRE( pack2.asize() == 4 );
REQUIRE( pack2.dsize() == 2 );
REQUIRE( pack2.dValue(0) == false );
}
=======
InitTest();
SECTION("Test: read default pack...")
......@@ -433,59 +282,10 @@ TEST_CASE("[UNetUDP]: check sender", "[unetudp][sender]")
REQUIRE( pack2.dsize() == 2 );
REQUIRE( pack2.dValue(0) == 0 );
}
>>>>>>> 2.9.1-alt1
}
// -----------------------------------------------------------------------------
TEST_CASE("[UNetUDP]: check receiver", "[unetudp][receiver]")
{
<<<<<<< HEAD
InitTest();
SECTION("Test: send data pack..")
{
REQUIRE( ui->getValue(node2_respond_s) == 0 );
UniSetUDP::UDPMessage pack;
pack.addAData(8, 100);
pack.addAData(9, -100);
pack.addDData(10, true);
pack.addDData(11, false);
REQUIRE( ui->getValue(8) == 0 );
REQUIRE( ui->getValue(9) == 0 );
REQUIRE( ui->getValue(10) == 0 );
REQUIRE( ui->getValue(11) == 0 );
send(pack);
msleep(600);
REQUIRE( ui->getValue(8) == 100 );
REQUIRE( ui->getValue(9) == -100 );
REQUIRE( ui->getValue(10) == 1 );
REQUIRE( ui->getValue(11) == 0 );
WARN("check respond sensor DISABLED!");
// msleep(1500);
// REQUIRE( ui->getValue(node2_respond_s) == 1 );
}
SECTION("Test: send data pack2.")
{
UniSetUDP::UDPMessage pack;
pack.addAData(8, 10);
pack.addAData(9, -10);
pack.addDData(10, false);
pack.addDData(11, true);
send(pack);
msleep(600);
REQUIRE( ui->getValue(8) == 10 );
REQUIRE( ui->getValue(9) == -10 );
REQUIRE( ui->getValue(10) == 0 );
REQUIRE( ui->getValue(11) == 1 );
WARN("check respond sensor DISABLED!");
//REQUIRE( ui->getValue(node2_respond_s) == 1 );
//msleep(2000); // в запускающем файле стоит --unet-recv-timeout 2000
//REQUIRE( ui->getValue(node2_respond_s) == 0 );
}
=======
InitTest();
SECTION("Test: send data pack...")
......@@ -532,60 +332,10 @@ TEST_CASE("[UNetUDP]: check receiver", "[unetudp][receiver]")
//msleep(2000); // в запускающем файле стоит --unet-recv-timeout 2000
//REQUIRE( ui->getValue(node2_respond_s) == 0 );
}
>>>>>>> 2.9.1-alt1
}
// -----------------------------------------------------------------------------
TEST_CASE("[UNetUDP]: check packets 'hole'", "[unetudp][udphole]")
{
<<<<<<< HEAD
InitTest();
// проверяем обработку "дырок" в пакетах.
UniSetUDP::UDPMessage pack;
pack.addAData(8, 15);
send(pack);
msleep(120);
REQUIRE( ui->getValue(8) == 15 );
unsigned long nlost = ui->getValue(node2_lostpackets_as);
int lastnum = s_numpack - 1;
// искусственно делаем дырку в два пакета
s_numpack = lastnum + 3;
UniSetUDP::UDPMessage pack_hole;
pack_hole.addAData(8, 30);
send(pack_hole); // пакет с дыркой
msleep(80);
REQUIRE( ui->getValue(8) == 15 );
REQUIRE( ui->getValue(node2_lostpackets_as) == nlost );
s_numpack = lastnum + 1;
UniSetUDP::UDPMessage pack1;
pack1.addAData(8, 21);
send(pack1); // заполняем первую дырку.// дырка закроется. пакет тут же обработается
msleep(100);
REQUIRE( ui->getValue(8) == 21 );
REQUIRE( ui->getValue(node2_lostpackets_as) == nlost );
s_numpack = lastnum + 2;
UniSetUDP::UDPMessage pack2;
pack2.addAData(8, 25);
send(pack2); // заполняем следующую дырку
msleep(120);
// тут обработка дойдёт уже до "первого" пакета.
REQUIRE( ui->getValue(8) == 30 );
REQUIRE( ui->getValue(node2_lostpackets_as) == nlost );
// возвращаем к нормальному.чтобы следующие тесты не поломались.
for( int i = 0; i < 10; i++ )
{
send(pack2);
msleep(100);
}
=======
InitTest();
// проверяем обработку "дырок" в пакетах..
......@@ -633,24 +383,16 @@ TEST_CASE("[UNetUDP]: check packets 'hole'", "[unetudp][udphole]")
send(pack2);
msleep(100);
}
>>>>>>> 2.9.1-alt1
}
// -----------------------------------------------------------------------------
TEST_CASE("[UNetUDP]: check packets 'MaxDifferens'", "[unetudp][maxdifferens]")
{
InitTest();
<<<<<<< HEAD
// проверяем обработку "дырок" в пакетах.
UniSetUDP::UDPMessage pack;
pack.addAData(8, 50);
send(pack);
=======
// проверяем обработку "дырок" в пакетах..
UniSetUDP::UDPMessage pack;
pack.addAData(8, 50);
send(pack);
>>>>>>> 2.9.1-alt1
msleep(1000);
REQUIRE( ui->getValue(8) == 50 );
......@@ -746,35 +488,6 @@ TEST_CASE("[UNetUDP]: check undefined value", "[unetudp][sender]")
REQUIRE( ui->getValue(2) == 110 );
msleep(600);
<<<<<<< HEAD
UniSetUDP::UDPMessage pack = receive( pack0.header.num + 1, 2000, 40 );
REQUIRE( pack.header.num != 0 );
REQUIRE( pack.asize() == 4 );
REQUIRE( pack.dsize() == 2 );
REQUIRE( pack.a_dat[0].val == 110 );
IOController_i::SensorInfo si;
si.id = 2;
si.node = uniset_conf()->getLocalNode();
ui->setUndefinedState(si, true, 6000 /* TestProc */ );
msleep(600);
pack = receive(pack.header.num + 1);
REQUIRE( pack.header.num != 0 );
REQUIRE( pack.asize() == 4 );
REQUIRE( pack.dsize() == 2 );
REQUIRE( pack.a_dat[0].val == 65635 );
ui->setUndefinedState(si, false, 6000 /* TestProc */ );
msleep(600);
pack = receive(pack.header.num + 1);
REQUIRE( pack.header.num != 0 );
REQUIRE( pack.asize() == 4 );
REQUIRE( pack.dsize() == 2 );
REQUIRE( pack.a_dat[0].val == 110 );
=======
UniSetUDP::UDPMessage pack = receive( pack0.num + 1, 2000, 40 );
REQUIRE( pack.num != 0 );
......@@ -802,6 +515,5 @@ TEST_CASE("[UNetUDP]: check undefined value", "[unetudp][sender]")
REQUIRE( pack.asize() == 4 );
REQUIRE( pack.dsize() == 2 );
REQUIRE( pack.a_dat[0].val == 110 );
>>>>>>> 2.9.1-alt1
}
// -----------------------------------------------------------------------------
......@@ -32,104 +32,6 @@ shared_ptr<SMInterface> smiInstance()
// --------------------------------------------------------------------------
static void run_senders( size_t max, const std::string& s_host, size_t count = 50, timeout_t usecpause = 50 )
{
<<<<<<< HEAD
std::vector< std::shared_ptr<UDPSocketU> > vsend;
vsend.reserve(max);
cout << "Run " << max << " senders (" << s_host << ")" << endl;
// make sendesrs..
for( size_t i = 0; i < max; i++ )
{
try
{
cout << "create sender: " << s_host << ":" << begPort + i << endl;
auto s = make_shared<UDPSocketU>(s_host, begPort + i);
s->setBroadcast(true);
vsend.emplace_back(s);
}
catch( Poco::Net::NetException& e )
{
cerr << "(run_senders): " << e.displayText() << " (" << s_host << ")" << endl;
throw;
}
catch( std::exception& ex)
{
cerr << "(run_senders): " << ex.what() << endl;
throw;
}
}
UniSetUDP::UDPMessage mypack;
mypack.header.nodeID = 100;
mypack.header.procID = 100;
for( size_t i = 0; i < count; i++ )
{
UniSetUDP::UDPAData d(i, i);
mypack.addAData(d);
}
for( size_t i = 0; i < count; i++ )
mypack.addDData(i, i);
for( size_t i = 0; i < max; i++ )
{
try
{
if( vsend[i] )
vsend[i]->connect( Poco::Net::SocketAddress(s_host, begPort + i) );
}
catch( Poco::Net::NetException& e )
{
cerr << "(run_senders): " << e.message() << " (" << s_host << ")" << endl;
throw;
}
catch( std::exception& ex)
{
cerr << "(run_senders): " << ex.what() << endl;
throw;
}
}
size_t packetnum = 0;
size_t nc = 1;
while( nc ) // -V654
{
mypack.header.num = packetnum++;
// при переходе черех максимум (UniSetUDP::MaxPacketNum)
// пакет опять должен иметь номер "1"
if( packetnum == 0 )
packetnum = 1;
for( auto&& udp : vsend )
{
try
{
if( udp->poll(100000, Poco::Net::Socket::SELECT_WRITE) )
{
size_t ret = udp->sendBytes(&mypack, sizeof(mypack));
if( ret < sizeof(mypack) )
cerr << "(send): FAILED ret=" << ret << " < sizeof=" << sizeof(mypack) << endl;
}
}
catch( Poco::Net::NetException& e )
{
cerr << "(send): " << e.message() << " (" << s_host << ")" << endl;
}
catch( ... )
{
cerr << "(send): catch ..." << endl;
}
}
std::this_thread::sleep_for(std::chrono::microseconds(usecpause));
}
=======
std::vector< std::shared_ptr<UDPSocketU> > vsend;
vsend.reserve(max);
......@@ -227,47 +129,10 @@ static void run_senders( size_t max, const std::string& s_host, size_t count = 5
std::this_thread::sleep_for(std::chrono::microseconds(usecpause));
}
>>>>>>> 2.9.1-alt1
}
// --------------------------------------------------------------------------
static void run_test( size_t max, const std::string& host )
{
<<<<<<< HEAD
std::vector< std::shared_ptr<UNetReceiver> > vrecv;
vrecv.reserve(max);
// make receivers..
for( size_t i = 0; i < max; i++ )
{
cout << "create receiver: " << host << ":" << begPort + i << endl;
auto r = make_shared<UNetReceiver>(host, begPort + i, smiInstance());
r->setLockUpdate(true);
vrecv.emplace_back(r);
}
size_t count = 0;
// Run receivers..
for( auto&& r : vrecv )
{
if( r )
{
count++;
r->start();
}
}
cerr << "RUN " << count << " receivers..." << endl;
// wait..
pause();
for( auto&& r : vrecv )
{
if(r)
r->stop();
}
=======
std::vector< std::shared_ptr<UNetReceiver> > vrecv;
vrecv.reserve(max);
......@@ -301,44 +166,10 @@ static void run_test( size_t max, const std::string& host )
if(r)
r->stop();
}
>>>>>>> 2.9.1-alt1
}
// --------------------------------------------------------------------------
int main(int argc, char* argv[] )
{
<<<<<<< HEAD
std::string host = "127.255.255.255";
try
{
auto conf = uniset_init(argc, argv);
if( argc > 1 && !strcmp(argv[1], "s") )
run_senders(1, host);
else
run_test(1, host);
return 0;
}
catch( const SystemError& err )
{
cerr << "(urecv-perf-test): " << err << endl;
}
catch( const uniset::Exception& ex )
{
cerr << "(urecv-perf-test): " << ex << endl;
}
catch( const std::exception& e )
{
cerr << "(tests_with_sm): " << e.what() << endl;
}
catch(...)
{
cerr << "(urecv-perf-test): catch(...)" << endl;
}
return 1;
=======
std::string host = "127.255.255.255";
try
......@@ -370,5 +201,4 @@ int main(int argc, char* argv[] )
}
return 1;
>>>>>>> 2.9.1-alt1
}
......@@ -13,26 +13,6 @@
// --------------------------------------------------------------------------
static struct option longopts[] =
{
<<<<<<< HEAD
{ "help", no_argument, 0, 'h' },
{ "send", required_argument, 0, 's' },
{ "receive", required_argument, 0, 'r' },
{ "proc-id", required_argument, 0, 'p' },
{ "node-id", required_argument, 0, 'n' },
{ "send-pause", required_argument, 0, 'x' },
{ "timeout", required_argument, 0, 't' },
{ "data-count", required_argument, 0, 'c' },
{ "disable-broadcast", no_argument, 0, 'b' },
{ "show-data", no_argument, 0, 'd' },
{ "check-lost", no_argument, 0, 'l' },
{ "verbode", required_argument, 0, 'v' },
{ "num-cycles", required_argument, 0, 'z' },
{ "prof", required_argument, 0, 'y' },
{ "a-data", required_argument, 0, 'a' },
{ "d-data", required_argument, 0, 'i' },
{ "pack-num", required_argument, 0, 'u' },
{ NULL, 0, 0, 0 }
=======
{ "help", no_argument, 0, 'h' },
{ "send", required_argument, 0, 's' },
{ "receive", required_argument, 0, 'r' },
......@@ -50,7 +30,6 @@ static struct option longopts[] =
{ "a-data", required_argument, 0, 'a' },
{ "d-data", required_argument, 0, 'i' },
{ NULL, 0, 0, 0 }
>>>>>>> 2.9.1-alt1
};
// --------------------------------------------------------------------------
using namespace std;
......@@ -82,372 +61,6 @@ static bool split_addr( const string& addr, string& host, int& port )
// --------------------------------------------------------------------------
int main(int argc, char* argv[])
{
<<<<<<< HEAD
int optindex = 0;
int opt = 0;
Command cmd = cmdNOP;
int verb = 0;
std::string addr = "";
int port = 0;
int usecpause = 2000000;
timeout_t tout = UniSetTimer::WaitUpTime;
bool broadcast = true;
int procID = 1;
int nodeID = 1;
size_t count = 50;
bool lost = false;
bool show = false;
size_t ncycles = 0;
unsigned int nprof = 0;
std::string d_data = "";
std::string a_data = "";
size_t packetnum = 1;
while(1)
{
opt = getopt_long(argc, argv, "hs:c:r:p:n:t:x:blvdz:y:a:i:u:", longopts, &optindex);
if( opt == -1 )
break;
switch (opt)
{
case 'h':
cout << "-h|--help - this message" << endl;
cout << "[-s|--send] host:port - Send message." << endl;
cout << "[-c|--data-count] num - Send num count of value. Default: 50." << endl;
cout << "[-r|--receive] host:port - Receive message." << endl;
cout << "[-p|--proc-id] id - Set packet header. From 'procID'. Default: 1" << endl;
cout << "[-n|--node-id] id - Set packet header. From 'nodeID'. Default: 1" << endl;
cout << "[-t|--timeout] msec - timeout for receive. Default: 0 msec (waitup)." << endl;
cout << "[-x|--send-pause] msec - pause for send packets. Default: 200 msec." << endl;
cout << "[-b|--disable-broadcast] - Disable broadcast mode." << endl;
cout << "[-l|--check-lost] - Check the lost packets." << endl;
cout << "[-v|--verbose] - verbose mode." << endl;
cout << "[-d|--show-data] - show receive data." << endl;
cout << "[-z|--num-cycles] num - Number of cycles of exchange. Default: -1 - infinitely." << endl;
cout << "[-y|--prof] num - Print receive statistics every NUM packets (for -r only)" << endl;
cout << "[-a|--a-data] id1=val1,id2=val2,... - Analog data. Send: id1=id1,id2=id2,.. for analog sensors" << endl;
cout << "[-i|--d-data] id1=val1,id2=val2,... - Digital data. Send: id1=id1,id2=id2,.. for digital sensors" << endl;
cout << "[-u|--pack-num] num - first packet numbrt (default: 1)" << endl;
cout << endl;
return 0;
case 'r':
cmd = cmdReceive;
addr = string(optarg);
break;
case 's':
addr = string(optarg);
cmd = cmdSend;
break;
case 'a':
a_data = string(optarg);
break;
case 'i':
d_data = string(optarg);
break;
case 't':
tout = atoi(optarg);
break;
case 'x':
usecpause = atoi(optarg) * 1000;
break;
case 'y':
nprof = atoi(optarg);
break;
case 'c':
count = atoi(optarg);
break;
case 'p':
procID = atoi(optarg);
break;
case 'n':
nodeID = atoi(optarg);
break;
case 'b':
broadcast = false;
break;
case 'd':
show = true;
break;
case 'l':
lost = true;
break;
case 'v':
verb = 1;
break;
case 'z':
ncycles = atoi(optarg);
break;
case 'u':
packetnum = atoi(optarg);
break;
case '?':
default:
cerr << "? argumnet" << endl;
return 0;
}
}
if( cmd == cmdNOP )
{
cerr << "No command... Use -h for help" << endl;
return -1;
}
try
{
string s_host;
if( !split_addr(addr, s_host, port) )
{
cerr << "(main): Unknown 'host:port' for '" << addr << "'" << endl;
return 1;
}
if( verb )
{
cout << " host=" << s_host
<< " port=" << port
<< " timeout=";
if( tout == UniSetTimer::WaitUpTime )
cout << "Waitup";
else
cout << tout;
cout << " msecpause=" << usecpause / 1000
<< endl;
}
switch( cmd )
{
case cmdReceive:
{
UDPReceiveU udp(s_host, port);
UniSetUDP::UDPMessage pack;
unsigned long prev_num = 1;
int nc = 1;
if( ncycles > 0 )
nc = ncycles;
auto t_start = high_resolution_clock::now();
unsigned int npack = 0;
while( nc )
{
try
{
if( nprof > 0 && npack >= nprof )
{
auto t_end = high_resolution_clock::now();
float sec = duration_cast<duration<float>>(t_end - t_start).count();
cout << "Receive " << setw(5) << npack << " packets for " << setw(8) << sec << " sec "
<< " [ 1 packet per " << setw(10) << ( sec / (float)npack ) << " sec ]" << endl;
t_start = t_end;
npack = 0;
}
if( !udp.poll(UniSetTimer::millisecToPoco(tout), Poco::Net::Socket::SELECT_READ) )
{
cout << "(recv): Timeout.." << endl;
continue;
}
size_t ret = udp.receiveBytes(&pack, sizeof(pack) );
if( ret < 0 )
{
cerr << "(recv): no data?!" << endl;
continue;
}
if( ret == 0 )
{
cerr << "(recv): connection closed?!" << endl;
continue;
}
pack.ntoh();
if( pack.header.magic != UniSetUDP::UNETUDP_MAGICNUM )
{
cerr << "(recv): BAD PROTOCOL VERSION! [ need version '" << UniSetUDP::UNETUDP_MAGICNUM << "']" << endl;
continue;
}
if( lost )
{
if( prev_num != (pack.header.num - 1) )
cerr << "WARNING! Incorrect sequence of packets! current=" << pack.header.num
<< " prev=" << prev_num << endl;
prev_num = pack.header.num;
}
npack++;
if( verb )
cout << "receive OK: "
<< " bytes: " << ret << endl;
if( show )
cout << "receive data: " << pack << endl;
}
catch( Poco::Net::NetException& e )
{
cerr << "(recv): " << e.displayText() << " (" << addr << ")" << endl;
}
catch( ... )
{
cerr << "(recv): catch ..." << endl;
}
if( ncycles > 0 )
{
nc--;
if( nc <= 0 )
break;
}
}
}
break;
case cmdSend:
{
std::shared_ptr<UDPSocketU> udp = make_shared<UDPSocketU>(s_host, port);
udp->setBroadcast(broadcast);
UniSetUDP::UDPMessage mypack;
mypack.header.nodeID = nodeID;
mypack.header.procID = procID;
if( !a_data.empty() )
{
auto vlist = uniset::getSInfoList(a_data, nullptr);
for( const auto& v : vlist )
{
UDPAData d(v.si.id, v.val);
mypack.addAData(d);
}
}
else
{
for( size_t i = 0; i < count; i++ )
{
UDPAData d(i, i);
mypack.addAData(d);
}
}
if( !d_data.empty() )
{
auto vlist = uniset::getSInfoList(d_data, nullptr);
for( const auto& v : vlist )
mypack.addDData(v.si.id, v.val);
}
else
{
for( size_t i = 0; i < count; i++ )
mypack.addDData(i, i);
}
Poco::Net::SocketAddress sa(s_host, port);
udp->connect(sa);
size_t nc = 1;
if( ncycles > 0 )
nc = ncycles;
while( nc )
{
mypack.header.num = packetnum++;
// при переходе через максимум (UniSetUDP::MaxPacketNum)
// пакет опять должен иметь номер "1"
if( packetnum == 0 )
packetnum = 1;
try
{
if( udp->poll(UniSetTimer::millisecToPoco(tout), Poco::Net::Socket::SELECT_WRITE) )
{
if( verb )
cout << "(send): to addr=" << addr << " d_count=" << mypack.header.dcount
<< " a_count=" << mypack.header.acount << endl;
size_t ret = udp->sendBytes(&mypack, sizeof(mypack) );
if( ret < sizeof(mypack) )
cerr << "(send): FAILED ret=" << ret << " < sizeof=" << sizeof(mypack) << endl;
}
}
catch( Poco::Net::NetException& e )
{
cerr << "(send): " << e.message() << " (" << addr << ")" << endl;
}
catch( ... )
{
cerr << "(send): catch ..." << endl;
}
if( ncycles > 0 )
{
nc--;
if( nc <= 0 )
break;
}
std::this_thread::sleep_for(std::chrono::microseconds(usecpause));
}
}
break;
default:
cerr << endl << "Unknown command: '" << cmd << "'. Use -h for help" << endl;
return -1;
break;
}
}
catch( const std::exception& e )
{
cerr << "(main): " << e.what() << endl;
}
catch( ... )
{
cerr << "(main): catch ..." << endl;
return 1;
}
return 0;
=======
int optindex = 0;
int opt = 0;
Command cmd = cmdNOP;
......@@ -805,6 +418,5 @@ int main(int argc, char* argv[])
}
return 0;
>>>>>>> 2.9.1-alt1
}
// --------------------------------------------------------------------------
<<<<<<< HEAD
./cov-int/emit/pvbook.localdomain/config/130ac2dc34760443befdd3f2a25f8d40/g++cc-config-0/coverity-compiler-compat.h
./cov-int/emit/pvbook.localdomain/config/130ac2dc34760443befdd3f2a25f8d40/gcc-config-0/coverity-compiler-compat.h
./cov-int/emit/pvbook.localdomain/config/130ac2dc34760443befdd3f2a25f8d40/g++cc-config-0/coverity-macro-compat.h
./cov-int/emit/pvbook.localdomain/config/130ac2dc34760443befdd3f2a25f8d40/gcc-config-0/coverity-macro-compat.h
./cov-int/emit/pvbook.localdomain/config/130ac2dc34760443befdd3f2a25f8d40/g++cc-config-1/coverity-compiler-compat.h
./cov-int/emit/pvbook.localdomain/config/130ac2dc34760443befdd3f2a25f8d40/g++cc-config-1/coverity-macro-compat.h
./cov-int/emit/pvbook.localdomain/config/130ac2dc34760443befdd3f2a25f8d40/g++-config-0/coverity-compiler-compat.h
./cov-int/emit/pvbook.localdomain/config/130ac2dc34760443befdd3f2a25f8d40/g++-config-0/coverity-macro-compat.h
./cov-int/emit/pvbook.localdomain/config/130ac2dc34760443befdd3f2a25f8d40/g++-config-1/coverity-compiler-compat.h
./cov-int/emit/pvbook.localdomain/config/130ac2dc34760443befdd3f2a25f8d40/g++-config-1/coverity-macro-compat.h
./cov-int/emit/pvbook.localdomain/config/30c6834cc1e9802742e33de4efa3917c/g++cc-config-0/coverity-compiler-compat.h
./cov-int/emit/pvbook.localdomain/config/30c6834cc1e9802742e33de4efa3917c/g++cc-config-0/coverity-macro-compat.h
./cov-int/emit/pvbook.localdomain/config/30c6834cc1e9802742e33de4efa3917c/g++-config-0/coverity-compiler-compat.h
./cov-int/emit/pvbook.localdomain/config/30c6834cc1e9802742e33de4efa3917c/g++-config-0/coverity-macro-compat.h
./cov-int/emit/pvbook.localdomain/config/4d02eb73c555e8f07e4124fe33c80de2/g++cc-config-0/coverity-compiler-compat.h
./cov-int/emit/pvbook.localdomain/config/4d02eb73c555e8f07e4124fe33c80de2/g++cc-config-0/coverity-macro-compat.h
./cov-int/emit/pvbook.localdomain/config/4d02eb73c555e8f07e4124fe33c80de2/g++-config-0/coverity-compiler-compat.h
./cov-int/emit/pvbook.localdomain/config/4d02eb73c555e8f07e4124fe33c80de2/g++-config-0/coverity-macro-compat.h
./cov-int/emit/pvbook.localdomain/config/4db70a7a9c958929fa315f4c9243d800/g++cc-config-0/coverity-compiler-compat.h
./cov-int/emit/pvbook.localdomain/config/4db70a7a9c958929fa315f4c9243d800/g++cc-config-0/coverity-macro-compat.h
./cov-int/emit/pvbook.localdomain/config/4db70a7a9c958929fa315f4c9243d800/g++-config-0/coverity-compiler-compat.h
./cov-int/emit/pvbook.localdomain/config/4db70a7a9c958929fa315f4c9243d800/g++-config-0/coverity-macro-compat.h
=======
>>>>>>> 2.9.1-alt1
./docs/Makefile.am
./extensions/Backend-OpenTSDB/BackendOpenTSDB.cc
./extensions/Backend-OpenTSDB/BackendOpenTSDB.h
......@@ -49,8 +24,6 @@
./extensions/DBServer-SQLite/SQLiteInterface.cc
./extensions/DBServer-SQLite/SQLiteInterface.h
./extensions/DBServer-SQLite/test.cc
<<<<<<< HEAD
=======
./extensions/HttpResolver/HttpResolver.cc
./extensions/HttpResolver/HttpResolver.h
./extensions/HttpResolver/HttpResolverSugar.h
......@@ -61,7 +34,6 @@
./extensions/HttpResolver/tests/TestObject.cc
./extensions/HttpResolver/tests/TestObject.h
./extensions/HttpResolver/tests/test_uresolver.cc
>>>>>>> 2.9.1-alt1
./extensions/include/Calibration.h
./extensions/include/ComediInterface.h
./extensions/include/DigitalFilter.h
......@@ -148,11 +120,8 @@
./extensions/Makefile.am
./extensions/ModbusMaster/main.cc
./extensions/ModbusMaster/Makefile.am
<<<<<<< HEAD
=======
./extensions/ModbusMaster/MBConfig.cc
./extensions/ModbusMaster/MBConfig.h
>>>>>>> 2.9.1-alt1
./extensions/ModbusMaster/MBExchange.cc
./extensions/ModbusMaster/MBExchange.h
./extensions/ModbusMaster/mb-perf-test.cc
......@@ -226,8 +195,6 @@
./extensions/SMViewer/Makefile.am
./extensions/SMViewer/SMViewer.cc
./extensions/SMViewer/SMViewer.h
<<<<<<< HEAD
=======
./extensions/tests1/Makefile.am
./extensions/tests1/MBSlaveTest/Makefile.am
./extensions/tests1/MBSlaveTest/mbslave-test.cc
......@@ -265,7 +232,6 @@
./extensions/tests1/tests_with_sm.h
./extensions/tests1/test_ui.cc
./extensions/tests1/test_vtypes.cc
>>>>>>> 2.9.1-alt1
./extensions/tests/Makefile.am
./extensions/tests/MBSlaveTest/Makefile.am
./extensions/tests/MBSlaveTest/mbslave-test.cc
......@@ -313,10 +279,6 @@
./extensions/UNetUDP/tests/u.cc
./extensions/UNetUDP/tests/urecv_perf_test.cc
./extensions/UNetUDP/UDPPacket.cc
<<<<<<< HEAD
./extensions/UNetUDP/UDPPacketE.h
=======
>>>>>>> 2.9.1-alt1
./extensions/UNetUDP/UDPPacket.h
./extensions/UNetUDP/unetexchange.cc
./extensions/UNetUDP/UNetExchange.cc
......@@ -406,10 +368,7 @@
./include/UA.h
./include/UDPCore.h
./include/UHelpers.h
<<<<<<< HEAD
=======
./include/UHttpClient.h
>>>>>>> 2.9.1-alt1
./include/UHttpRequestHandler.h
./include/UHttpServer.h
./include/UInterface.h
......@@ -432,10 +391,7 @@
./src/Communications/ComPort485F.cc
./src/Communications/ComPort.cc
./src/Communications/Http/Makefile.am
<<<<<<< HEAD
=======
./src/Communications/Http/UHttpClient.cc
>>>>>>> 2.9.1-alt1
./src/Communications/Http/UHttpRequestHandler.cc
./src/Communications/Http/UHttpServer.cc
./src/Communications/Makefile.am
......@@ -594,10 +550,6 @@
./tests/umutex.cc
./tests/UniXmlTest/Makefile.am
./tests/UniXmlTest/XmlTest.cc
<<<<<<< HEAD
./tt.cc
=======
>>>>>>> 2.9.1-alt1
./uniset-config.h
./Utilities/Admin/admin.cc
./Utilities/Admin/c.cc
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment