Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
U
uniset2
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
1
Issues
1
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
UniSet project repositories
uniset2
Commits
b4672e62
Commit
b4672e62
authored
Dec 13, 2020
by
Pavel Vainerman
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
[unet]: zero copy optimization
parent
bff519c6
Hide whitespace changes
Inline
Side-by-side
Showing
10 changed files
with
747 additions
and
884 deletions
+747
-884
UDPPacket.cc
extensions/UNetUDP/UDPPacket.cc
+51
-126
UDPPacket.h
extensions/UNetUDP/UDPPacket.h
+80
-93
UNetReceiver.cc
extensions/UNetUDP/UNetReceiver.cc
+52
-61
UNetReceiver.h
extensions/UNetUDP/UNetReceiver.h
+3
-5
UNetSender.cc
extensions/UNetUDP/UNetSender.cc
+24
-26
UNetSender.h
extensions/UNetUDP/UNetSender.h
+168
-169
test_unetudp.cc
extensions/UNetUDP/tests/test_unetudp.cc
+323
-321
urecv_perf_test.cc
extensions/UNetUDP/tests/urecv_perf_test.cc
+11
-14
unet-udp-tester.cc
extensions/UNetUDP/unet-udp-tester.cc
+35
-32
uniset2.files
uniset2.files
+0
-37
No files found.
extensions/UNetUDP/UDPPacket.cc
View file @
b4672e62
...
...
@@ -35,7 +35,7 @@ static bool HostIsBigEndian = false;
#if __BYTE_ORDER == __BIG_ENDIAN
static
bool
HostIsBigEndian
=
true
;
#define BE_TO_H(x) {}
header
.
#
define
BE_TO_H
(
x
)
{}
#elif INTPTR_MAX == INT64_MAX
#define BE_TO_H(x) x = be64toh(x)
#elif INTPTR_MAX == INT32_MAX
...
...
@@ -141,7 +141,6 @@ namespace uniset
return
os
<<
(
*
p
);
}
// -----------------------------------------------------------------------------
std
::
ostream
&
UniSetUDP
::
operator
<<
(
std
::
ostream
&
os
,
UniSetUDP
::
UDPAData
&
p
)
{
return
os
<<
"id="
<<
p
.
id
<<
" val="
<<
p
.
val
;
...
...
@@ -153,29 +152,25 @@ namespace uniset
os
<<
"DIGITAL:"
<<
endl
;
for
(
size_t
i
=
0
;
i
<
p
.
dcount
;
i
++
)
for
(
size_t
i
=
0
;
i
<
p
.
header
.
dcount
;
i
++
)
os
<<
"["
<<
i
<<
"]={"
<<
p
.
dID
(
i
)
<<
","
<<
p
.
dValue
(
i
)
<<
"}"
<<
endl
;
os
<<
"ANALOG:"
<<
endl
;
for
(
size_t
i
=
0
;
i
<
p
.
acount
;
i
++
)
for
(
size_t
i
=
0
;
i
<
p
.
header
.
acount
;
i
++
)
os
<<
"["
<<
i
<<
"]={"
<<
p
.
a_dat
[
i
].
id
<<
","
<<
p
.
a_dat
[
i
].
val
<<
"}"
<<
endl
;
return
os
;
}
// -----------------------------------------------------------------------------
UDPMessage
::
UDPMessage
()
noexcept
{
}
// -----------------------------------------------------------------------------
size_t
UDPMessage
::
addAData
(
const
UniSetUDP
::
UDPAData
&
dat
)
noexcept
{
if
(
acount
>=
MaxACount
)
if
(
header
.
acount
>=
MaxACount
)
return
MaxACount
;
a_dat
[
acount
]
=
dat
;
acount
++
;
return
acount
-
1
;
a_dat
[
header
.
acount
]
=
dat
;
header
.
acount
++
;
return
header
.
acount
-
1
;
}
// -----------------------------------------------------------------------------
size_t
UDPMessage
::
addAData
(
long
id
,
long
val
)
noexcept
...
...
@@ -197,18 +192,18 @@ namespace uniset
// -----------------------------------------------------------------------------
size_t
UDPMessage
::
addDData
(
long
id
,
bool
val
)
noexcept
{
if
(
dcount
>=
MaxDCount
)
if
(
header
.
dcount
>=
MaxDCount
)
return
MaxDCount
;
// сохраняем ID
d_id
[
dcount
]
=
id
;
d_id
[
header
.
dcount
]
=
id
;
bool
res
=
setDData
(
dcount
,
val
);
bool
res
=
setDData
(
header
.
dcount
,
val
);
if
(
res
)
{
dcount
++
;
return
dcount
-
1
;
header
.
dcount
++
;
return
header
.
dcount
-
1
;
}
return
MaxDCount
;
...
...
@@ -219,8 +214,8 @@ namespace uniset
if
(
index
>=
MaxDCount
)
return
false
;
size_t
nbyte
=
index
/
8
*
sizeof
(
u
nsigned
char
);
size_t
nbit
=
index
%
8
*
sizeof
(
u
nsigned
char
);
size_t
nbyte
=
index
/
8
*
sizeof
(
u
int8_t
);
size_t
nbit
=
index
%
8
*
sizeof
(
u
int8_t
);
// выставляем бит
unsigned
char
d
=
d_dat
[
nbyte
];
...
...
@@ -247,176 +242,106 @@ namespace uniset
if
(
index
>=
MaxDCount
)
return
uniset
::
DefaultObjectId
;
size_t
nbyte
=
index
/
8
*
sizeof
(
u
nsigned
char
);
size_t
nbit
=
index
%
8
*
sizeof
(
u
nsigned
char
);
size_t
nbyte
=
index
/
8
*
sizeof
(
u
int8_t
);
size_t
nbit
=
index
%
8
*
sizeof
(
u
int8_t
);
return
(
d_dat
[
nbyte
]
&
(
1
<<
nbit
)
);
}
// -----------------------------------------------------------------------------
size_t
UDPMessage
::
transport_msg
(
UDPPacket
&
p
)
const
noexcept
{
p
=
UDPPacket
{};
// reset data
size_t
i
=
0
;
memcpy
(
&
(
p
.
data
[
i
]),
this
,
sizeof
(
UDPHeader
));
i
+=
sizeof
(
UDPHeader
);
// копируем аналоговые данные
size_t
sz
=
acount
*
sizeof
(
UDPAData
);
memcpy
(
&
(
p
.
data
[
i
]),
a_dat
,
sz
);
i
+=
sz
;
// копируем булевые индексы
sz
=
dcount
*
sizeof
(
long
);
memcpy
(
&
(
p
.
data
[
i
]),
d_id
,
sz
);
i
+=
sz
;
// копируем булевые данные
size_t
nbyte
=
dcount
/
8
*
sizeof
(
unsigned
char
);
size_t
nbit
=
dcount
%
8
*
sizeof
(
unsigned
char
);
sz
=
nbit
>
0
?
nbyte
+
1
:
nbyte
;
memcpy
(
&
(
p
.
data
[
i
]),
d_dat
,
sz
);
i
+=
sz
;
p
.
len
=
i
;
return
i
;
}
// -----------------------------------------------------------------------------
long
UDPMessage
::
getDataID
()
const
noexcept
{
// в качестве идентификатора берётся ID первого датчика в данных
// приоритет имеет аналоговые датчики
if
(
acount
>
0
)
if
(
header
.
acount
>
0
)
return
a_dat
[
0
].
id
;
if
(
dcount
>
0
)
if
(
header
.
dcount
>
0
)
return
d_id
[
0
];
// если нет данных(?) просто возвращаем номер пакета
return
num
;
return
header
.
num
;
}
// -----------------------------------------------------------------------------
size_t
UniSetUDP
::
UDPMessage
::
sizeOf
()
const
noexcept
size_t
UniSetUDP
::
UDPMessage
::
len
()
const
noexcept
{
return
UniSetUDP
::
MaxDataLen
;
// биты которые не уместились в очередной байт, добавляют ещё один байт
size_t
nbit
=
dcount
%
8
*
sizeof
(
unsigned
char
);
size_t
add
=
nbit
>
0
?
1
:
0
;
// size_t nbit = header.dcount % 8 * sizeof(uint8_t
);
//
size_t add = nbit > 0 ? 1 : 0;
return
sizeof
(
UDPHeader
)
+
acount
*
sizeof
(
UDPAData
)
+
dcount
*
sizeof
(
long
)
+
(
dcount
/
8
*
sizeof
(
unsigned
char
)
+
add
);
// return sizeof(header) + header.acount * sizeof(UDPAData) + header.dcount * sizeof(long) + (header.dcount / 8 * sizeof(uint8_t
) + add);
}
// -----------------------------------------------------------------------------
UDPMessage
::
UDPMessage
(
UDPPacket
&
p
)
noexcept
bool
UDPMessage
::
isOk
(
)
noexcept
{
getMessage
(
*
this
,
p
);
return
(
header
.
magic
!=
UniSetUDP
::
UNETUDP_MAGICNUM
);
}
// -----------------------------------------------------------------------------
size_t
UDPMessage
::
getMessage
(
UDPMessage
&
m
,
UDPPacket
&
p
)
noexcept
void
UDPMessage
::
ntoh
(
)
noexcept
{
// reset data
m
=
UDPMessage
{};
size_t
i
=
0
;
memcpy
(
&
m
,
&
(
p
.
data
[
i
]),
sizeof
(
UDPHeader
));
i
+=
sizeof
(
UDPHeader
);
// byte order from packet
u
_char
be_order
=
m
.
_be_order
;
u
int8_t
be_order
=
header
.
_be_order
;
if
(
be_order
&&
!
HostIsBigEndian
)
{
BE_TO_H
(
m
.
magic
);
BE_TO_H
(
m
.
num
);
BE_TO_H
(
m
.
procID
);
BE_TO_H
(
m
.
nodeID
);
BE_TO_H
(
m
.
dcount
);
BE_TO_H
(
m
.
acount
);
BE_TO_H
(
header
.
magic
);
BE_TO_H
(
header
.
num
);
BE_TO_H
(
header
.
procID
);
BE_TO_H
(
header
.
nodeID
);
BE_TO_H
(
header
.
dcount
);
BE_TO_H
(
header
.
acount
);
}
else
if
(
!
be_order
&&
HostIsBigEndian
)
{
LE_TO_H
(
m
.
magic
);
LE_TO_H
(
m
.
num
);
LE_TO_H
(
m
.
procID
);
LE_TO_H
(
m
.
nodeID
);
LE_TO_H
(
m
.
dcount
);
LE_TO_H
(
m
.
acount
);
LE_TO_H
(
header
.
magic
);
LE_TO_H
(
header
.
num
);
LE_TO_H
(
header
.
procID
);
LE_TO_H
(
header
.
nodeID
);
LE_TO_H
(
header
.
dcount
);
LE_TO_H
(
header
.
acount
);
}
// set host byte order
#if __BYTE_ORDER == __LITTLE_ENDIAN
m
.
_be_order
=
0
;
header
.
_be_order
=
0
;
#elif __BYTE_ORDER == __BIG_ENDIAN
m
.
be_order
=
1
;
header
.
_
be_order
=
1
;
#else
#error UNET(getMessage): Unknown byte order!
#endif
// проверяем наш ли пакет..
if
(
m
.
magic
!=
UniSetUDP
::
UNETUDP_MAGICNUM
)
{
m
.
magic
=
0
;
return
0
;
}
// копируем аналоговые данные
size_t
sz
=
m
.
acount
*
sizeof
(
UDPAData
);
if
(
sz
>
sizeof
(
m
.
a_dat
)
)
sz
=
sizeof
(
m
.
a_dat
);
memcpy
(
m
.
a_dat
,
&
(
p
.
data
[
i
]),
sz
);
i
+=
sz
;
// копируем булевые индексы
sz
=
m
.
dcount
*
sizeof
(
long
);
if
(
sz
>
sizeof
(
m
.
d_id
)
)
sz
=
sizeof
(
m
.
d_id
);
memcpy
(
m
.
d_id
,
&
(
p
.
data
[
i
]),
sz
);
i
+=
sz
;
// копируем булевые данные
size_t
nbyte
=
m
.
dcount
/
8
*
sizeof
(
unsigned
char
);
size_t
nbit
=
m
.
dcount
%
8
*
sizeof
(
unsigned
char
);
sz
=
nbit
>
0
?
nbyte
+
1
:
nbyte
;
if
(
sz
>
sizeof
(
m
.
d_dat
)
)
sz
=
sizeof
(
m
.
d_dat
);
memcpy
(
m
.
d_dat
,
&
(
p
.
data
[
i
]),
sz
);
// CONVERT DATA TO HOST BYTE ORDER
// -------------------------------
if
(
(
be_order
&&
!
HostIsBigEndian
)
||
(
!
be_order
&&
HostIsBigEndian
)
)
{
for
(
size_t
n
=
0
;
n
<
m
.
acount
;
n
++
)
for
(
size_t
n
=
0
;
n
<
header
.
acount
;
n
++
)
{
if
(
be_order
)
{
BE_TO_H
(
m
.
a_dat
[
n
].
id
);
BE_TO_H
(
m
.
a_dat
[
n
].
val
);
BE_TO_H
(
a_dat
[
n
].
id
);
BE_TO_H
(
a_dat
[
n
].
val
);
}
else
{
LE_TO_H
(
m
.
a_dat
[
n
].
id
);
LE_TO_H
(
m
.
a_dat
[
n
].
val
);
LE_TO_H
(
a_dat
[
n
].
id
);
LE_TO_H
(
a_dat
[
n
].
val
);
}
}
for
(
size_t
n
=
0
;
n
<
m
.
dcount
;
n
++
)
for
(
size_t
n
=
0
;
n
<
header
.
dcount
;
n
++
)
{
if
(
be_order
)
{
BE_TO_H
(
m
.
d_id
[
n
]);
BE_TO_H
(
d_id
[
n
]);
}
else
{
LE_TO_H
(
m
.
d_id
[
n
]);
LE_TO_H
(
d_id
[
n
]);
}
}
}
return
i
+
sz
;
}
// -----------------------------------------------------------------------------
uint16_t
UDPMessage
::
getDataCRC
()
const
noexcept
...
...
extensions/UNetUDP/UDPPacket.h
View file @
b4672e62
...
...
@@ -24,17 +24,17 @@
// --------------------------------------------------------------------------
namespace
uniset
{
// -----------------------------------------------------------------------------
namespace
UniSetUDP
{
/*! Для оптимизации размера передаваемых данных, но с учётом того, что ID могут идти не подряд.
Сделан следующий формат:
Для аналоговых величин передаётся массив пар "id-value"(UDPAData).
Для булевых величин - отдельно массив ID и отдельно битовый массив со значениями,
(по количеству битов такого же размера).
\todo Подумать на тему сделать два отдельных вида пакетов для булевых значений и для аналоговых,
чтобы уйти от преобразования UDPMessage --> UDPPacket --> UDPMessage.
// -----------------------------------------------------------------------------
namespace
UniSetUDP
{
/*! Для оптимизации размера передаваемых данных, но с учётом того, что ID могут идти не подряд.
Сделан следующий формат:
Для аналоговых величин передаётся массив пар "id-value"(UDPAData).
Для булевых величин - отдельно массив ID и отдельно битовый массив со значениями,
(по количеству битов такого же размера).
\todo Подумать на тему сделать два отдельных вида пакетов для булевых значений и для аналоговых,
чтобы уйти от преобразования UDPMessage --> UDPPacket --> UDPMessage.
\warning ТЕКУЩАЯ ВЕРСИЯ ПРОТОКОЛА НЕ БУДЕТ РАБОТАТЬ МЕЖДУ 32-битными и 64-битными системами (из-за отличия в типе long).
т.к. это не сильно актуально, пока не переделываю.
...
...
@@ -49,21 +49,19 @@ namespace uniset
Т.е. если все узлы будут иметь одинаковый порядок байт, фактического перекодирования не будет.
*/
const
uint32_t
UNETUDP_MAGICNUM
=
0x133EF54
;
// идентификатор протокола
const
uint32_t
UNETUDP_MAGICNUM
=
0x1343EFD
;
// идентификатор протокола
struct
UDPHeader
{
UDPHeader
()
noexcept
;
uint32_t
magic
;
u_char
_be_order
;
// 1 - BE byte order, 0 - LE byte order
size_t
num
;
long
nodeID
;
long
procID
;
size_t
dcount
;
/*!< количество булевых величин */
size_t
acount
;
/*!< количество аналоговых величин */
}
__attribute__
((
packed
));
struct
UDPHeader
{
UDPHeader
()
noexcept
;
uint32_t
magic
;
uint8_t
_be_order
;
// 1 - BE byte order, 0 - LE byte order
size_t
num
;
long
nodeID
;
long
procID
;
size_t
dcount
;
/*!< количество булевых величин */
size_t
acount
;
/*!< количество аналоговых величин */
}
__attribute__
((
packed
));
std
::
ostream
&
operator
<<
(
std
::
ostream
&
os
,
UDPHeader
&
p
);
std
::
ostream
&
operator
<<
(
std
::
ostream
&
os
,
UDPHeader
*
p
);
...
...
@@ -87,37 +85,16 @@ namespace uniset
// т.е. надо чтобы sizeof(UDPPacket) < 1432
// с другой стороны в текущей реализации
// в сеть посылается фактическое количество данных, а не sizeof(UDPPacket).
// При текущих настройках sizeof(UDPPacket) = 72679 (!)
static
const
size_t
MaxACount
=
1000
;
static
const
size_t
MaxDCount
=
3000
;
static
const
size_t
MaxDDataCount
=
1
+
MaxDCount
/
8
*
sizeof
(
unsigned
char
);
// При текущих настройках sizeof(UDPPacket) = 72679 (!)
static
const
size_t
MaxACount
=
2000
;
static
const
size_t
MaxDCount
=
5000
;
static
const
size_t
MaxDDataCount
=
1
+
MaxDCount
/
8
*
sizeof
(
unsigned
char
);
struct
UDPPacket
{
UDPPacket
()
noexcept
:
len
(
0
)
{}
// -V730
size_t
len
;
uint8_t
data
[
sizeof
(
UDPHeader
)
+
MaxDCount
*
sizeof
(
long
)
+
MaxDDataCount
+
MaxACount
*
sizeof
(
UDPAData
)
];
}
__attribute__
((
packed
));
static
const
size_t
MaxDataLen
=
sizeof
(
UDPPacket
);
struct
UDPMessage
:
public
UDPHeader
{
UDPMessage
()
noexcept
;
UDPMessage
(
UDPMessage
&&
m
)
noexcept
=
default
;
UDPMessage
&
operator
=
(
UDPMessage
&&
)
noexcept
=
default
;
UDPMessage
(
const
UDPMessage
&
m
)
noexcept
=
default
;
UDPMessage
&
operator
=
(
const
UDPMessage
&
)
noexcept
=
default
;
explicit
UDPMessage
(
UDPPacket
&
p
)
noexcept
;
size_t
transport_msg
(
UDPPacket
&
p
)
const
noexcept
;
static
size_t
getMessage
(
UDPMessage
&
m
,
UDPPacket
&
p
)
noexcept
;
struct
UDPMessage
{
// net to host
void
ntoh
()
noexcept
;
bool
isOk
()
noexcept
;
// \warning в случае переполнения возвращается MaxDCount
size_t
addDData
(
long
id
,
bool
val
)
noexcept
;
...
...
@@ -141,51 +118,61 @@ namespace uniset
long
getDataID
(
)
const
noexcept
;
/*!< получение "уникального" идентификатора данных этого пакета */
inline
bool
isAFull
()
const
noexcept
{
return
(
acount
>=
MaxACount
);
}
inline
bool
isDFull
()
const
noexcept
{
return
(
dcount
>=
MaxDCount
);
}
inline
bool
isFull
()
const
noexcept
{
return
!
((
dcount
<
MaxDCount
)
&&
(
acount
<
MaxACount
));
}
inline
size_t
dsize
()
const
noexcept
{
return
dcount
;
}
inline
size_t
asize
()
const
noexcept
{
return
acount
;
}
// размер итогового пакета в байтах
size_t
sizeOf
()
const
noexcept
;
inline
bool
isAFull
()
const
noexcept
{
return
(
header
.
acount
>=
MaxACount
);
}
inline
bool
isDFull
()
const
noexcept
{
return
(
header
.
dcount
>=
MaxDCount
);
}
inline
bool
isFull
()
const
noexcept
{
return
!
((
header
.
dcount
<
MaxDCount
)
&&
(
header
.
acount
<
MaxACount
));
}
inline
size_t
dsize
()
const
noexcept
{
return
header
.
dcount
;
}
inline
size_t
asize
()
const
noexcept
{
return
header
.
acount
;
}
// размер итогового пакета в байтах
size_t
len
()
const
noexcept
;
uint16_t
getDataCRC
()
const
noexcept
;
// количество байт в пакете с булевыми переменными...
size_t
d_byte
()
const
noexcept
{
return
dcount
*
sizeof
(
long
)
+
dcount
;
}
// количество байт в пакете с булевыми переменными...
size_t
d_byte
()
const
noexcept
{
return
header
.
dcount
*
sizeof
(
long
)
+
header
.
dcount
;
}
UDPAData
a_dat
[
MaxACount
];
/*!< аналоговые величины */
long
d_id
[
MaxDCount
];
/*!< список дискретных ID */
uint8_t
d_dat
[
MaxDDataCount
];
/*!< битовые значения */
};
UDPHeader
header
;
UDPAData
a_dat
[
MaxACount
];
/*!< аналоговые величины */
long
d_id
[
MaxDCount
];
/*!< список дискретных ID */
uint8_t
d_dat
[
MaxDDataCount
];
/*!< битовые значения */
}
__attribute__
((
packed
));
std
::
ostream
&
operator
<<
(
std
::
ostream
&
os
,
UDPMessage
&
p
);
uint16_t
makeCRC
(
unsigned
char
*
buf
,
size_t
len
)
noexcept
;
}
// --------------------------------------------------------------------------
uint16_t
makeCRC
(
unsigned
char
*
buf
,
size_t
len
)
noexcept
;
static
const
size_t
MaxDataLen
=
sizeof
(
UDPHeader
)
+
MaxDCount
*
sizeof
(
long
)
+
MaxDDataCount
+
MaxACount
*
sizeof
(
UDPAData
);
union
UDPPacket
{
UDPPacket
()
:
msg
(){};
uint8_t
raw
[
MaxDataLen
];
UDPMessage
msg
;
};
}
// --------------------------------------------------------------------------
}
// end of namespace uniset
// -----------------------------------------------------------------------------
#endif // UDPPacket_H_
...
...
extensions/UNetUDP/UNetReceiver.cc
View file @
b4672e62
...
...
@@ -29,26 +29,16 @@ using namespace uniset;
using
namespace
uniset
::
extensions
;
// -----------------------------------------------------------------------------
CommonEventLoop
UNetReceiver
::
loop
;
static
UniSetUDP
::
UDPPacket
defpack
;
// -----------------------------------------------------------------------------
UNetReceiver
::
UNetReceiver
(
const
std
::
string
&
s_host
,
int
_port
,
const
std
::
shared_ptr
<
SMInterface
>&
smi
,
bool
nocheckConnection
,
const
std
::
string
&
prefix
)
:
shm
(
smi
),
updatepause
(
100
),
port
(
_port
),
saddr
(
s_host
,
_port
),
recvTimeout
(
5000
),
prepareTime
(
2000
),
lostTimeout
(
200
),
/* 2*updatepause */
lostPackets
(
0
),
sidRespond
(
uniset
::
DefaultObjectId
),
respondInvert
(
false
),
sidLostPackets
(
uniset
::
DefaultObjectId
),
activated
(
false
),
cbuf
(
cbufSize
),
maxDifferens
(
20
),
lockUpdate
(
false
)
cbuf
(
cbufSize
)
{
{
ostringstream
s
;
...
...
@@ -345,9 +335,9 @@ size_t UNetReceiver::rnext( size_t num )
while
(
i
<
wnum
)
{
p
=
&
cbuf
[
i
%
cbufSize
];
p
=
&
cbuf
[
i
%
cbufSize
]
.
msg
;
if
(
p
->
num
>
num
)
if
(
p
->
header
.
num
>
num
)
return
i
;
i
++
;
...
...
@@ -371,21 +361,23 @@ void UNetReceiver::update() noexcept
// либо обнаружится "дырка" в последовательности,
while
(
rnum
<
wnum
)
{
p
=
&
cbuf
[
rnum
%
cbufSize
];
p
=
&
(
cbuf
[
rnum
%
cbufSize
].
msg
);
// cout << "update: num=" << p->header.num << " rnum=" << rnum << " wnum=" << wnum << endl;
// если номер пакета не равен ожидаемому, ждём считая что это "дырка"
// т.к. разрывы и другие случаи обрабатываются при приёме пакетов
if
(
p
->
num
!=
rnum
)
if
(
p
->
header
.
num
!=
rnum
)
{
if
(
!
ptLostTimeout
.
checkTime
()
)
return
;
size_t
sub
=
1
;
if
(
p
->
num
>
rnum
)
sub
=
(
p
->
num
-
rnum
);
if
(
p
->
header
.
num
>
rnum
)
sub
=
(
p
->
header
.
num
-
rnum
);
unetwarn
<<
myname
<<
"(update): lostTimeout("
<<
ptLostTimeout
.
getInterval
()
<<
")! pnum="
<<
p
->
num
<<
" lost "
<<
sub
<<
" packets "
<<
endl
;
unetwarn
<<
myname
<<
"(update): lostTimeout("
<<
ptLostTimeout
.
getInterval
()
<<
")! pnum="
<<
p
->
header
.
num
<<
" lost "
<<
sub
<<
" packets "
<<
endl
;
lostPackets
+=
sub
;
// ищем следующий пакет для обработки
...
...
@@ -400,7 +392,7 @@ void UNetReceiver::update() noexcept
// Обработка дискретных
auto
d_iv
=
getDCache
(
p
);
for
(
size_t
i
=
0
;
i
<
p
->
dcount
;
i
++
)
for
(
size_t
i
=
0
;
i
<
p
->
header
.
dcount
;
i
++
)
{
try
{
...
...
@@ -422,7 +414,7 @@ void UNetReceiver::update() noexcept
}
catch
(
const
uniset
::
Exception
&
ex
)
{
unetcrit
<<
myname
<<
"(update): "
unetcrit
<<
myname
<<
"(update):
D:
"
<<
" id="
<<
s_id
<<
" val="
<<
p
->
dValue
(
i
)
<<
" error: "
<<
ex
...
...
@@ -430,7 +422,7 @@ void UNetReceiver::update() noexcept
}
catch
(...)
{
unetcrit
<<
myname
<<
"(update): "
unetcrit
<<
myname
<<
"(update):
D:
"
<<
" id="
<<
s_id
<<
" val="
<<
p
->
dValue
(
i
)
<<
" error: catch..."
...
...
@@ -441,7 +433,7 @@ void UNetReceiver::update() noexcept
// Обработка аналоговых
auto
a_iv
=
getACache
(
p
);
for
(
size_t
i
=
0
;
i
<
p
->
acount
;
i
++
)
for
(
size_t
i
=
0
;
i
<
p
->
header
.
acount
;
i
++
)
{
try
{
...
...
@@ -463,7 +455,7 @@ void UNetReceiver::update() noexcept
}
catch
(
const
uniset
::
Exception
&
ex
)
{
unetcrit
<<
myname
<<
"(update): "
unetcrit
<<
myname
<<
"(update):
A:
"
<<
" id="
<<
dat
->
id
<<
" val="
<<
dat
->
val
<<
" error: "
<<
ex
...
...
@@ -471,7 +463,7 @@ void UNetReceiver::update() noexcept
}
catch
(...)
{
unetcrit
<<
myname
<<
"(update): "
unetcrit
<<
myname
<<
"(update):
A:
"
<<
" id="
<<
dat
->
id
<<
" val="
<<
dat
->
val
<<
" error: catch..."
...
...
@@ -628,8 +620,9 @@ bool UNetReceiver::receive() noexcept
{
try
{
ssize_t
ret
=
udp
->
receiveBytes
(
r_buf
.
data
,
sizeof
(
r_buf
.
data
));
recvCount
++
;
// сперва пробуем сохранить пакет в том месте, где должен быть очередной пакет
pack
=
&
(
cbuf
[
wnum
%
cbufSize
]);
ssize_t
ret
=
udp
->
receiveBytes
(
pack
->
raw
,
sizeof
(
pack
->
raw
)
/* UniSetUDP::MaxDataLen */
);
if
(
ret
<
0
)
{
...
...
@@ -639,27 +632,24 @@ bool UNetReceiver::receive() noexcept
if
(
ret
==
0
)
{
unetwarn
<<
myname
<<
"(receive): disconnected?!... recv 0 byte.."
<<
endl
;
unetwarn
<<
myname
<<
"(receive): disconnected?!... recv 0 byte
s
.."
<<
endl
;
return
false
;
}
// сперва пробуем сохранить пакет в том месте, где должен быть очередной пакет
pack
=
&
cbuf
[
wnum
%
cbufSize
];
size_t
sz
=
UniSetUDP
::
UDPMessage
::
getMessage
(
*
pack
,
r_buf
);
recvCount
++
;
if
(
sz
==
0
)
{
unetcrit
<<
myname
<<
"(receive): FAILED RECEIVE DATA ret="
<<
ret
<<
endl
;
return
false
;
}
// конвертируем byte order
pack
->
msg
.
ntoh
();
if
(
pack
->
magic
!=
UniSetUDP
::
UNETUDP_MAGICNUM
)
if
(
pack
->
m
sg
.
header
.
m
agic
!=
UniSetUDP
::
UNETUDP_MAGICNUM
)
return
false
;
if
(
size_t
(
abs
(
long
(
pack
->
num
-
wnum
)))
>
maxDifferens
||
size_t
(
abs
(
long
(
wnum
-
rnum
)
))
>=
(
cbufSize
-
2
)
)
// cout << "RECV[" << ret << "]: msg: " << pack->msg << endl;
if
(
size_t
(
abs
(
long
(
pack
->
msg
.
header
.
num
-
wnum
)))
>
maxDifferens
||
size_t
(
abs
(
long
(
wnum
-
rnum
)
))
>=
(
cbufSize
-
2
)
)
{
unetcrit
<<
myname
<<
"(receive): DISAGREE "
<<
" packnum="
<<
pack
->
num
<<
" packnum="
<<
pack
->
msg
.
header
.
num
<<
" wnum="
<<
wnum
<<
" rnum="
<<
rnum
<<
" (maxDiff="
<<
maxDifferens
...
...
@@ -667,40 +657,41 @@ bool UNetReceiver::receive() noexcept
<<
")"
<<
endl
;
lostPackets
=
pack
->
num
>
wnum
?
(
pack
->
num
-
wnum
-
1
)
:
lostPackets
+
1
;
lostPackets
=
pack
->
msg
.
header
.
num
>
wnum
?
(
pack
->
msg
.
header
.
num
-
wnum
-
1
)
:
lostPackets
+
1
;
// реинициализируем позицию для чтения
rnum
=
pack
->
num
;
wnum
=
pack
->
num
+
1
;
rnum
=
pack
->
msg
.
header
.
num
;
wnum
=
pack
->
msg
.
header
.
num
+
1
;
// перемещаем пакет в нужное место (если требуется)
if
(
wnum
!=
pack
->
num
)
if
(
wnum
!=
pack
->
msg
.
header
.
num
)
{
cbuf
[
pack
->
num
%
cbufSize
]
=
(
*
pack
)
;
pack
->
num
=
0
;
cbuf
[
pack
->
msg
.
header
.
num
%
cbufSize
].
msg
=
pack
->
msg
;
pack
->
msg
.
header
.
num
=
0
;
}
return
true
;
}
if
(
pack
->
num
!=
wnum
)
if
(
pack
->
msg
.
header
.
num
!=
wnum
)
{
// перемещаем пакет в правильное место
// в соответствии с его номером
cbuf
[
pack
->
num
%
cbufSize
]
=
(
*
pack
)
;
cbuf
[
pack
->
msg
.
header
.
num
%
cbufSize
].
msg
=
pack
->
msg
;
if
(
pack
->
num
>=
wnum
)
wnum
=
pack
->
num
+
1
;
if
(
pack
->
msg
.
header
.
num
>=
wnum
)
wnum
=
pack
->
msg
.
header
.
num
+
1
;
// обнуляем номер в том месте где записали, чтобы его не обрабатывал update
pack
->
num
=
0
;
pack
->
msg
.
header
.
num
=
0
;
}
else
if
(
pack
->
num
>=
wnum
)
wnum
=
pack
->
num
+
1
;
else
if
(
pack
->
msg
.
header
.
num
>=
wnum
)
wnum
=
pack
->
msg
.
header
.
num
+
1
;
// начальная инициализация для чтения
if
(
rnum
==
0
)
rnum
=
pack
->
num
;
rnum
=
pack
->
msg
.
header
.
num
;
// cout << "FINAL: msg: " << cbuf[(wnum-1) % cbufSize].msg << endl;
return
true
;
}
catch
(
Poco
::
Net
::
NetException
&
ex
)
...
...
@@ -745,14 +736,14 @@ UNetReceiver::CacheVec* UNetReceiver::getDCache( UniSetUDP::UDPMessage* pack ) n
CacheVec
*
d_info
=
&
dit
->
second
;
if
(
pack
->
dcount
==
d_info
->
size
()
)
if
(
pack
->
header
.
dcount
==
d_info
->
size
()
)
return
d_info
;
unetinfo
<<
myname
<<
": init dcache["
<<
pack
->
dcount
<<
"] for "
<<
pack
->
getDataID
()
<<
endl
;
unetinfo
<<
myname
<<
": init dcache["
<<
pack
->
header
.
dcount
<<
"] for "
<<
pack
->
getDataID
()
<<
endl
;
d_info
->
resize
(
pack
->
dcount
);
d_info
->
resize
(
pack
->
header
.
dcount
);
for
(
size_t
i
=
0
;
i
<
pack
->
dcount
;
i
++
)
for
(
size_t
i
=
0
;
i
<
pack
->
header
.
dcount
;
i
++
)
{
CacheItem
&
d
=
(
*
d_info
)[
i
];
...
...
@@ -777,14 +768,14 @@ UNetReceiver::CacheVec* UNetReceiver::getACache( UniSetUDP::UDPMessage* pack ) n
CacheVec
*
a_info
=
&
ait
->
second
;
if
(
pack
->
acount
==
a_info
->
size
()
)
if
(
pack
->
header
.
acount
==
a_info
->
size
()
)
return
a_info
;
unetinfo
<<
myname
<<
": init acache["
<<
pack
->
acount
<<
"] for "
<<
pack
->
getDataID
()
<<
endl
;
unetinfo
<<
myname
<<
": init acache["
<<
pack
->
header
.
acount
<<
"] for "
<<
pack
->
getDataID
()
<<
endl
;
a_info
->
resize
(
pack
->
acount
);
a_info
->
resize
(
pack
->
header
.
acount
);
for
(
size_t
i
=
0
;
i
<
pack
->
acount
;
i
++
)
for
(
size_t
i
=
0
;
i
<
pack
->
header
.
acount
;
i
++
)
{
CacheItem
&
d
=
(
*
a_info
)[
i
];
...
...
extensions/UNetUDP/UNetReceiver.h
View file @
b4672e62
...
...
@@ -234,18 +234,16 @@ namespace uniset
uniset
::
ObjectId
sidRespond
=
{
uniset
::
DefaultObjectId
};
IOController
::
IOStateList
::
iterator
itRespond
;
bool
respondInvert
=
{
false
};
uniset
::
ObjectId
sidLostPackets
;
uniset
::
ObjectId
sidLostPackets
=
{
uniset
::
DefaultObjectId
}
;
IOController
::
IOStateList
::
iterator
itLostPackets
;
std
::
atomic_bool
activated
=
{
false
};
size_t
cbufSize
=
{
100
};
/*!< размер буфера для сообщений по умолчанию */
std
::
vector
<
UniSetUDP
::
UDP
Message
>
cbuf
;
// circular buffer
std
::
vector
<
UniSetUDP
::
UDP
Packet
>
cbuf
;
// circular buffer
size_t
wnum
=
{
1
};
/*!< номер следующего ожидаемого пакета */
size_t
rnum
=
{
0
};
/*!< номер последнего обработанного пакета */
UniSetUDP
::
UDPMessage
*
pack
;
UniSetUDP
::
UDPPacket
r_buf
;
/*!< просто буфер для получения очередного сообщения */
UniSetUDP
::
UDPPacket
*
pack
;
// текущий обрабатываемый пакет
/*! максимальная разница между номерами пакетов, при которой считается, что счётчик пакетов
* прошёл через максимум или сбился...
...
...
extensions/UNetUDP/UNetSender.cc
View file @
b4672e62
...
...
@@ -87,8 +87,8 @@ namespace uniset
// выставляем поля, которые не меняются
{
uniset_rwmutex_wrlock
l
(
mypack
.
mut
);
mypack
.
msg
.
nodeID
=
uniset_conf
()
->
getLocalNode
();
mypack
.
msg
.
procID
=
shm
->
ID
();
mypack
.
p
.
msg
.
header
.
nodeID
=
uniset_conf
()
->
getLocalNode
();
mypack
.
p
.
msg
.
header
.
procID
=
shm
->
ID
();
}
// -------------------------------
...
...
@@ -200,9 +200,9 @@ namespace uniset
uniset
::
uniset_rwmutex_wrlock
l
(
mypack
.
mut
);
if
(
it
.
iotype
==
UniversalIO
::
DI
||
it
.
iotype
==
UniversalIO
::
DO
)
mypack
.
msg
.
setDData
(
it
.
pack_ind
,
value
);
mypack
.
p
.
msg
.
setDData
(
it
.
pack_ind
,
value
);
else
if
(
it
.
iotype
==
UniversalIO
::
AI
||
it
.
iotype
==
UniversalIO
::
AO
)
mypack
.
msg
.
setAData
(
it
.
pack_ind
,
value
);
mypack
.
p
.
msg
.
setAData
(
it
.
pack_ind
,
value
);
}
// -----------------------------------------------------------------------------
void
UNetSender
::
setCheckConnectionPause
(
int
msec
)
...
...
@@ -313,13 +313,13 @@ namespace uniset
#ifdef UNETUDP_DISABLE_OPTIMIZATION_N1
mypack
.
msg
.
num
=
packetnum
++
;
#else
uint16_t
crc
=
mypack
.
msg
.
getDataCRC
();
uint16_t
crc
=
mypack
.
p
.
msg
.
getDataCRC
();
if
(
crc
!=
lastcrc
)
{
mypack
.
msg
.
num
=
packetnum
++
;
lastcrc
=
crc
;
}
if
(
crc
!=
lastcrc
)
{
mypack
.
p
.
msg
.
header
.
num
=
packetnum
++
;
lastcrc
=
crc
;
}
#endif
// при переходе через ноль (когда счётчик перевалит через UniSetUDP::MaxPacketNum..
...
...
@@ -330,12 +330,10 @@ namespace uniset
if
(
!
udp
||
!
udp
->
poll
(
UniSetTimer
::
millisecToPoco
(
writeTimeout
),
Poco
::
Net
::
Socket
::
SELECT_WRITE
)
)
return
;
mypack
.
msg
.
transport_msg
(
s_msg
);
size_t
ret
=
udp
->
sendTo
(
&
s_msg
.
data
,
s_msg
.
len
,
saddr
);
size_t
ret
=
udp
->
sendTo
(
mypack
.
p
.
raw
,
mypack
.
p
.
msg
.
len
(),
saddr
);
if
(
ret
<
s_msg
.
len
)
unetcrit
<<
myname
<<
"(real_send): FAILED ret="
<<
ret
<<
" < sizeof="
<<
s_msg
.
len
<<
endl
;
if
(
ret
<
mypack
.
p
.
msg
.
len
()
)
unetcrit
<<
myname
<<
"(real_send): FAILED ret="
<<
ret
<<
" < sizeof="
<<
mypack
.
p
.
msg
.
len
()
<<
endl
;
}
catch
(
Poco
::
Net
::
NetException
&
ex
)
{
...
...
@@ -456,7 +454,7 @@ namespace uniset
{
uniset_rwmutex_wrlock
l
(
mypack
.
mut
);
p
.
pack_ind
=
mypack
.
msg
.
addDData
(
sid
,
defval
);
p
.
pack_ind
=
mypack
.
p
.
msg
.
addDData
(
sid
,
defval
);
}
// unlock mutex....
if
(
p
.
pack_ind
>=
maxDData
)
...
...
@@ -468,9 +466,9 @@ namespace uniset
auto
&
mypack2
=
pk
[
dnum
];
uniset_rwmutex_wrlock
l2
(
mypack2
.
mut
);
p
.
pack_ind
=
mypack2
.
msg
.
addDData
(
sid
,
defval
);
mypack2
.
msg
.
nodeID
=
uniset_conf
()
->
getLocalNode
();
mypack2
.
msg
.
procID
=
shm
->
ID
();
p
.
pack_ind
=
mypack2
.
p
.
msg
.
addDData
(
sid
,
defval
);
mypack2
.
p
.
msg
.
header
.
nodeID
=
uniset_conf
()
->
getLocalNode
();
mypack2
.
p
.
msg
.
header
.
procID
=
shm
->
ID
();
}
p
.
pack_num
=
dnum
;
...
...
@@ -497,7 +495,7 @@ namespace uniset
{
uniset_rwmutex_wrlock
l
(
mypack
.
mut
);
p
.
pack_ind
=
mypack
.
msg
.
addAData
(
sid
,
defval
);
p
.
pack_ind
=
mypack
.
p
.
msg
.
addAData
(
sid
,
defval
);
}
if
(
p
.
pack_ind
>=
maxAData
)
...
...
@@ -509,9 +507,9 @@ namespace uniset
auto
&
mypack2
=
pk
[
anum
];
uniset_rwmutex_wrlock
l2
(
mypack2
.
mut
);
p
.
pack_ind
=
mypack2
.
msg
.
addAData
(
sid
,
defval
);
mypack2
.
msg
.
nodeID
=
uniset_conf
()
->
getLocalNode
();
mypack2
.
msg
.
procID
=
shm
->
ID
();
p
.
pack_ind
=
mypack2
.
p
.
msg
.
addAData
(
sid
,
defval
);
mypack2
.
p
.
msg
.
header
.
nodeID
=
uniset_conf
()
->
getLocalNode
();
mypack2
.
p
.
msg
.
header
.
procID
=
shm
->
ID
();
}
p
.
pack_num
=
anum
;
...
...
@@ -588,11 +586,11 @@ namespace uniset
s
<<
"
\t
["
<<
i
->
first
<<
"]="
<<
i
->
second
.
size
()
<<
endl
;
size_t
n
=
0
;
for
(
const
auto
&
p
:
i
->
second
)
for
(
const
auto
&
p
ack
:
i
->
second
)
{
//uniset_rwmutex_rlock l(p->mut);
s
<<
"
\t\t
["
<<
(
n
++
)
<<
"]="
<<
p
.
msg
.
sizeOf
()
<<
" bytes"
<<
" ( numA="
<<
setw
(
5
)
<<
p
.
msg
.
asize
()
<<
" numD="
<<
setw
(
5
)
<<
p
.
msg
.
dsize
()
<<
")"
s
<<
"
\t\t
["
<<
(
n
++
)
<<
"]="
<<
p
ack
.
p
.
msg
.
len
()
<<
" bytes"
<<
" ( numA="
<<
setw
(
5
)
<<
p
ack
.
p
.
msg
.
asize
()
<<
" numD="
<<
setw
(
5
)
<<
pack
.
p
.
msg
.
dsize
()
<<
")"
<<
endl
;
}
}
...
...
extensions/UNetUDP/UNetSender.h
View file @
b4672e62
...
...
@@ -33,152 +33,152 @@
// --------------------------------------------------------------------------
namespace
uniset
{
// -----------------------------------------------------------------------------
/*
* Распределение датчиков по пакетам
* =========================================================================
*
Все пересылаемые данные разбиваются на группы по частоте посылки("sendfactor").
* Частота посылки кратна sendpause, задаётся для каждого датчика, при помощи свойства prefix_sendfactor.
* Внутри каждой группы пакеты набираются по мере "заполнения".
*
* Добавление датчика в пакет и создание нового пакета при переполнении происходит в функции initItem().
* Причем так как дискретные и аналоговые датчики обрабатываются отдельно (но пересылаются в одном пакете),
* то датчики, которые первые переполнятся приводят к тому, что создаётся новый пакет и они добавляются в него,
* в свою очередь остальные продолжают "добивать" предыдущий пакет.
* В initItem() каждому UItem в dlist кроме pack_ind присваивается еще и номер пакета pack_num, который гарантировано соответствует
* существующему пакету, поэтому в дальнейшем при использовании pack_num в качестве ключа в mypacks мы не проверяем пакет на существование.
*
* ОПТИМИЗАЦИЯ N1: Для оптимизации обработки посылаемых пакетов (на стороне UNetSender) сделана следующая логика:
* Номер очередного посылаемого пакета меняется (увеличивается) только, если изменились данные с момента
последней посылки. Для этого по данным каждый раз производится расчёт UNetUDP::makeCRC() и сравнивается с последним.
На стороне UNetReceiver пакеты с повторными номерами (т.е. уже обработанные) - откидываются.
*
*
* Создание соединения
* ======================================
* Попытка создать соединение производиться сразу в конструкторе, если это не получается,
* то в потоке "посылки", с заданным периодом (checkConnectionTime) идёт попытка создать соединение..
* и так бесконечно, пока не получиться. Это важно для систем, где в момент загрузки программы
* (в момент создания объекта UNetSender) ещё может быть не поднята сеть или какой-то сбой с сетью и требуется
* ожидание (без вылета программы) пока "внешняя система мониторинга" не поднимет сеть).
* Если такая логика не требуется, то можно задать в конструкторе флаг nocheckconnection=true,
* тогда при создании объекта UNetSender, в конструкторе будет
* выкинуто исключение при неудачной попытке создания соединения.
* \warning setCheckConnectionPause(msec) должно быть кратно sendpause!
*/
class
UNetSender
{
public
:
UNetSender
(
const
std
::
string
&
host
,
const
int
port
,
const
std
::
shared_ptr
<
SMInterface
>&
smi
,
bool
nocheckConnection
=
false
,
const
std
::
string
&
s_field
=
""
,
const
std
::
string
&
s_fvalue
=
""
,
const
std
::
string
&
prop_prefix
=
"unet"
,
const
std
::
string
&
prefix
=
"unet"
,
size_t
maxDCount
=
UniSetUDP
::
MaxDCount
,
size_t
maxACount
=
UniSetUDP
::
MaxACount
);
virtual
~
UNetSender
();
typedef
size_t
sendfactor_t
;
static
const
long
not_specified_value
=
{
std
::
numeric_limits
<
long
>::
max
()
};
struct
UItem
{
UItem
()
:
iotype
(
UniversalIO
::
UnknownIOType
),
id
(
uniset
::
DefaultObjectId
),
pack_num
(
0
),
pack_ind
(
0
),
pack_sendfactor
(
0
)
{}
UniversalIO
::
IOType
iotype
;
uniset
::
ObjectId
id
;
IOController
::
IOStateList
::
iterator
ioit
;
size_t
pack_num
;
size_t
pack_ind
;
sendfactor_t
pack_sendfactor
=
{
0
};
long
undefined_value
=
{
not_specified_value
};
friend
std
::
ostream
&
operator
<<
(
std
::
ostream
&
os
,
UItem
&
p
);
};
typedef
std
::
unordered_map
<
uniset
::
ObjectId
,
UItem
>
UItemMap
;
size_t
getDataPackCount
()
const
;
void
start
();
void
stop
();
void
send
()
noexcept
;
struct
PackMessage
{
PackMessage
(
UniSetUDP
::
UDPMessage
&&
m
)
noexcept
:
msg
(
std
::
move
(
m
))
{}
PackMessage
(
const
UniSetUDP
::
UDPMessage
&
m
)
=
delete
;
PackMessage
()
noexcept
{}
UniSetUDP
::
UDPMessage
msg
;
uniset
::
uniset_rwmutex
mut
;
};
void
real_send
(
PackMessage
&
mypack
)
noexcept
;
/*! (принудительно) обновить все данные (из SM) */
void
updateFromSM
();
/*! Обновить значение по ID датчика */
void
updateSensor
(
uniset
::
ObjectId
id
,
long
value
);
/*! Обновить значение по итератору */
void
updateItem
(
UItem
&
it
,
long
value
);
inline
void
setSendPause
(
int
msec
)
{
sendpause
=
msec
;
}
inline
void
setPackSendPause
(
int
msec
)
{
packsendpause
=
msec
;
}
inline
void
setPackSendPauseFactor
(
int
factor
)
{
packsendpauseFactor
=
factor
;
}
void
setCheckConnectionPause
(
int
msec
);
/*! заказать датчики */
void
askSensors
(
UniversalIO
::
UIOCommand
cmd
);
/*! инициализация итераторов */
void
initIterators
();
inline
std
::
shared_ptr
<
DebugStream
>
getLog
()
{
return
unetlog
;
}
virtual
const
std
::
string
getShortInfo
()
const
;
inline
std
::
string
getAddress
()
const
{
return
addr
;
}
inline
int
getPort
()
const
{
return
port
;
}
inline
size_t
getADataSize
()
const
{
return
maxAData
;
}
inline
size_t
getDDataSize
()
const
{
return
maxDData
;
}
// -----------------------------------------------------------------------------
/*
* Распределение датчиков по пакетам
* =========================================================================
*
Все пересылаемые данные разбиваются на группы по частоте посылки("sendfactor").
* Частота посылки кратна sendpause, задаётся для каждого датчика, при помощи свойства prefix_sendfactor.
* Внутри каждой группы пакеты набираются по мере "заполнения".
*
* Добавление датчика в пакет и создание нового пакета при переполнении происходит в функции initItem().
* Причем так как дискретные и аналоговые датчики обрабатываются отдельно (но пересылаются в одном пакете),
* то датчики, которые первые переполнятся приводят к тому, что создаётся новый пакет и они добавляются в него,
* в свою очередь остальные продолжают "добивать" предыдущий пакет.
* В initItem() каждому UItem в dlist кроме pack_ind присваивается еще и номер пакета pack_num, который гарантировано соответствует
* существующему пакету, поэтому в дальнейшем при использовании pack_num в качестве ключа в mypacks мы не проверяем пакет на существование.
*
* ОПТИМИЗАЦИЯ N1: Для оптимизации обработки посылаемых пакетов (на стороне UNetSender) сделана следующая логика:
* Номер очередного посылаемого пакета меняется (увеличивается) только, если изменились данные с момента
последней посылки. Для этого по данным каждый раз производится расчёт UNetUDP::makeCRC() и сравнивается с последним.
На стороне UNetReceiver пакеты с повторными номерами (т.е. уже обработанные) - откидываются.
*
*
* Создание соединения
* ======================================
* Попытка создать соединение производиться сразу в конструкторе, если это не получается,
* то в потоке "посылки", с заданным периодом (checkConnectionTime) идёт попытка создать соединение..
* и так бесконечно, пока не получиться. Это важно для систем, где в момент загрузки программы
* (в момент создания объекта UNetSender) ещё может быть не поднята сеть или какой-то сбой с сетью и требуется
* ожидание (без вылета программы) пока "внешняя система мониторинга" не поднимет сеть).
* Если такая логика не требуется, то можно задать в конструкторе флаг nocheckconnection=true,
* тогда при создании объекта UNetSender, в конструкторе будет
* выкинуто исключение при неудачной попытке создания соединения.
* \warning setCheckConnectionPause(msec) должно быть кратно sendpause!
*/
class
UNetSender
{
public
:
UNetSender
(
const
std
::
string
&
host
,
const
int
port
,
const
std
::
shared_ptr
<
SMInterface
>&
smi
,
bool
nocheckConnection
=
false
,
const
std
::
string
&
s_field
=
""
,
const
std
::
string
&
s_fvalue
=
""
,
const
std
::
string
&
prop_prefix
=
"unet"
,
const
std
::
string
&
prefix
=
"unet"
,
size_t
maxDCount
=
UniSetUDP
::
MaxDCount
,
size_t
maxACount
=
UniSetUDP
::
MaxACount
);
virtual
~
UNetSender
();
typedef
size_t
sendfactor_t
;
static
const
long
not_specified_value
=
{
std
::
numeric_limits
<
long
>::
max
()
};
struct
UItem
{
UItem
()
:
iotype
(
UniversalIO
::
UnknownIOType
),
id
(
uniset
::
DefaultObjectId
),
pack_num
(
0
),
pack_ind
(
0
),
pack_sendfactor
(
0
)
{}
UniversalIO
::
IOType
iotype
;
uniset
::
ObjectId
id
;
IOController
::
IOStateList
::
iterator
ioit
;
size_t
pack_num
;
size_t
pack_ind
;
sendfactor_t
pack_sendfactor
=
{
0
};
long
undefined_value
=
{
not_specified_value
};
friend
std
::
ostream
&
operator
<<
(
std
::
ostream
&
os
,
UItem
&
p
);
};
typedef
std
::
unordered_map
<
uniset
::
ObjectId
,
UItem
>
UItemMap
;
size_t
getDataPackCount
()
const
;
void
start
();
void
stop
();
void
send
()
noexcept
;
struct
PackMessage
{
PackMessage
(
uniset
::
UniSetUDP
::
UDPPacket
&&
p
)
noexcept
:
p
(
std
::
move
(
p
))
{}
PackMessage
(
const
uniset
::
UniSetUDP
::
UDPPacket
&
p
)
=
delete
;
PackMessage
()
noexcept
{}
uniset
::
UniSetUDP
::
UDPPacket
p
;
uniset
::
uniset_rwmutex
mut
;
};
void
real_send
(
PackMessage
&
mypack
)
noexcept
;
/*! (принудительно) обновить все данные (из SM) */
void
updateFromSM
();
/*! Обновить значение по ID датчика */
void
updateSensor
(
uniset
::
ObjectId
id
,
long
value
);
/*! Обновить значение по итератору */
void
updateItem
(
UItem
&
it
,
long
value
);
inline
void
setSendPause
(
int
msec
)
{
sendpause
=
msec
;
}
inline
void
setPackSendPause
(
int
msec
)
{
packsendpause
=
msec
;
}
inline
void
setPackSendPauseFactor
(
int
factor
)
{
packsendpauseFactor
=
factor
;
}
void
setCheckConnectionPause
(
int
msec
);
/*! заказать датчики */
void
askSensors
(
UniversalIO
::
UIOCommand
cmd
);
/*! инициализация итераторов */
void
initIterators
();
inline
std
::
shared_ptr
<
DebugStream
>
getLog
()
{
return
unetlog
;
}
virtual
const
std
::
string
getShortInfo
()
const
;
inline
std
::
string
getAddress
()
const
{
return
addr
;
}
inline
int
getPort
()
const
{
return
port
;
}
inline
size_t
getADataSize
()
const
{
return
maxAData
;
}
inline
size_t
getDDataSize
()
const
{
return
maxDData
;
}
protected
:
...
...
@@ -205,35 +205,34 @@ namespace uniset
std
::
string
s_host
=
{
""
};
Poco
::
Net
::
SocketAddress
saddr
;
std
::
string
myname
=
{
""
};
timeout_t
sendpause
=
{
150
};
timeout_t
packsendpause
=
{
5
};
int
packsendpauseFactor
=
{
1
};
timeout_t
writeTimeout
=
{
1000
};
// msec
std
::
atomic_bool
activated
=
{
false
};
PassiveTimer
ptCheckConnection
;
std
::
string
myname
=
{
""
};
timeout_t
sendpause
=
{
150
};
timeout_t
packsendpause
=
{
5
};
int
packsendpauseFactor
=
{
1
};
timeout_t
writeTimeout
=
{
1000
};
// msec
std
::
atomic_bool
activated
=
{
false
};
PassiveTimer
ptCheckConnection
;
typedef
std
::
unordered_map
<
sendfactor_t
,
std
::
vector
<
PackMessage
>>
Packs
;
typedef
std
::
unordered_map
<
sendfactor_t
,
std
::
vector
<
PackMessage
>>
Packs
;
// mypacks заполняется в начале и дальше с ним происходит только чтение
// поэтому mutex-ом его не защищаем
Packs
mypacks
;
std
::
unordered_map
<
sendfactor_t
,
size_t
>
packs_anum
;
std
::
unordered_map
<
sendfactor_t
,
size_t
>
packs_dnum
;
UItemMap
items
;
size_t
packetnum
=
{
1
};
/*!< номер очередного посылаемого пакета */
uint16_t
lastcrc
=
{
0
};
UniSetUDP
::
UDPPacket
s_msg
;
// mypacks заполняется в начале и дальше с ним происходит только чтение
// поэтому mutex-ом его не защищаем
Packs
mypacks
;
std
::
unordered_map
<
sendfactor_t
,
size_t
>
packs_anum
;
std
::
unordered_map
<
sendfactor_t
,
size_t
>
packs_dnum
;
UItemMap
items
;
size_t
packetnum
=
{
1
};
/*!< номер очередного посылаемого пакета */
uint16_t
lastcrc
=
{
0
};
size_t
maxAData
=
{
UniSetUDP
::
MaxACount
};
size_t
maxDData
=
{
UniSetUDP
::
MaxDCount
};
size_t
maxAData
=
{
UniSetUDP
::
MaxACount
};
size_t
maxDData
=
{
UniSetUDP
::
MaxDCount
};
std
::
unique_ptr
<
ThreadCreator
<
UNetSender
>
>
s_thr
;
// send thread
std
::
unique_ptr
<
ThreadCreator
<
UNetSender
>
>
s_thr
;
// send thread
size_t
ncycle
=
{
0
};
/*!< номер цикла посылки */
size_t
ncycle
=
{
0
};
/*!< номер цикла посылки */
};
// --------------------------------------------------------------------------
};
// --------------------------------------------------------------------------
}
// end of namespace uniset
// -----------------------------------------------------------------------------
#endif // UNetSender_H_
...
...
extensions/UNetUDP/tests/test_unetudp.cc
View file @
b4672e62
...
...
@@ -56,26 +56,23 @@ void InitTest()
}
// -----------------------------------------------------------------------------
// pnum - минималный номер ожидаемого пакета ( 0 - любой пришедщий )
// ncycle - сколько пакетов разрешено "пропустить" прежде чем дождёмся нужного.
.
(чтобы не ждать бесконечно)
static
UniSetUDP
::
UDP
Message
receive
(
unsigned
int
pnum
=
0
,
timeout_t
tout
=
2000
,
int
ncycle
=
20
)
// ncycle - сколько пакетов разрешено "пропустить" прежде чем дождёмся нужного. (чтобы не ждать бесконечно)
static
UniSetUDP
::
UDP
Packet
receive
(
unsigned
int
pnum
=
0
,
timeout_t
tout
=
2000
,
int
ncycle
=
20
)
{
UniSetUDP
::
UDPMessage
pack
;
UniSetUDP
::
UDPPacket
buf
;
UniSetUDP
::
UDPPacket
pack
;
while
(
ncycle
>
0
)
{
if
(
!
udp_r
->
poll
(
UniSetTimer
::
millisecToPoco
(
tout
),
Poco
::
Net
::
Socket
::
SELECT_READ
)
)
break
;
size_t
ret
=
udp_r
->
receiveBytes
(
pack
.
raw
,
sizeof
(
pack
.
raw
)
);
size_t
ret
=
udp_r
->
receiveBytes
(
&
(
buf
.
data
),
sizeof
(
buf
.
data
)
);
size_t
sz
=
UniSetUDP
::
UDPMessage
::
getMessage
(
pack
,
buf
)
;
if
(
ret
<=
0
||
pnum
==
0
||
(
pnum
>
0
&&
pack
.
msg
.
header
.
num
>=
pnum
)
)
// -V560
break
;
if
(
sz
==
0
||
pnum
==
0
||
(
pnum
>
0
&&
pack
.
num
>=
pnum
)
)
// -V560
break
;
REQUIRE
(
pack
.
magic
==
UniSetUDP
::
UNETUDP_MAGICNUM
);
ncycle
--
;
}
REQUIRE
(
pack
.
msg
.
header
.
magic
==
UniSetUDP
::
UNETUDP_MAGICNUM
);
ncycle
--
;
}
// if( pnum > 0 && pack.num < pnum )
// return UniSetUDP::UDPMessage(); // empty message
...
...
@@ -83,81 +80,88 @@ static UniSetUDP::UDPMessage receive( unsigned int pnum = 0, timeout_t tout = 20
return
pack
;
}
// -----------------------------------------------------------------------------
void
send
(
UniSetUDP
::
UDP
Message
&
pack
,
int
tout
=
2000
)
void
send
(
UniSetUDP
::
UDP
Packet
&
pack
,
int
tout
=
2000
)
{
CHECK
(
udp_s
->
poll
(
UniSetTimer
::
millisecToPoco
(
tout
),
Poco
::
Net
::
Socket
::
SELECT_WRITE
)
);
pack
.
msg
.
header
.
nodeID
=
s_nodeID
;
pack
.
msg
.
header
.
procID
=
s_procID
;
pack
.
msg
.
header
.
num
=
s_numpack
++
;
pack
.
nodeID
=
s_nodeID
;
pack
.
procID
=
s_procID
;
pack
.
num
=
s_numpack
++
;
UniSetUDP
::
UDPPacket
s_buf
;
pack
.
transport_msg
(
s_buf
);
size_t
ret
=
udp_s
->
sendTo
(
&
s_buf
.
data
,
s_buf
.
len
,
s_addr
);
REQUIRE
(
ret
==
s_buf
.
len
);
size_t
ret
=
udp_s
->
sendTo
(
pack
.
raw
,
pack
.
msg
.
len
(),
s_addr
);
REQUIRE
(
ret
==
pack
.
msg
.
len
()
);
}
// -----------------------------------------------------------------------------
TEST_CASE
(
"[UNetUDP]: repack"
,
"[unetudp][repack]"
)
{
UniSetUDP
::
UDPPacket
pack
;
pack
.
msg
.
header
.
nodeID
=
100
;
pack
.
msg
.
header
.
procID
=
100
;
pack
.
msg
.
header
.
num
=
1
;
pack
.
msg
.
addDData
(
1
,
1
);
pack
.
msg
.
addDData
(
2
,
0
);
pack
.
msg
.
addAData
(
3
,
30
);
pack
.
msg
.
addAData
(
4
,
40
);
REQUIRE
(
pack
.
msg
.
header
.
magic
==
UniSetUDP
::
UNETUDP_MAGICNUM
);
UniSetUDP
::
UDPPacket
pack2
;
memcpy
(
pack2
.
raw
,
pack
.
raw
,
sizeof
(
pack2
.
raw
));
pack2
.
msg
.
ntoh
();
REQUIRE
(
pack2
.
msg
.
header
.
nodeID
==
100
);
REQUIRE
(
pack2
.
msg
.
header
.
procID
==
100
);
REQUIRE
(
pack2
.
msg
.
header
.
num
==
1
);
REQUIRE
(
pack2
.
msg
.
header
.
magic
==
UniSetUDP
::
UNETUDP_MAGICNUM
);
REQUIRE
(
pack2
.
msg
.
dID
(
0
)
==
1
);
REQUIRE
(
pack2
.
msg
.
dValue
(
0
)
==
true
);
REQUIRE
(
pack2
.
msg
.
dID
(
1
)
==
2
);
REQUIRE
(
pack2
.
msg
.
dValue
(
1
)
==
false
);
REQUIRE
(
pack2
.
msg
.
dID
(
1
)
==
2
);
REQUIRE
(
pack2
.
msg
.
a_dat
[
0
].
id
==
3
);
REQUIRE
(
pack2
.
msg
.
a_dat
[
0
].
val
==
30
);
REQUIRE
(
pack2
.
msg
.
a_dat
[
1
].
id
==
4
);
REQUIRE
(
pack2
.
msg
.
a_dat
[
1
].
val
==
40
);
}
// -----------------------------------------------------------------------------
TEST_CASE
(
"[UNetUDP]: UDPMessage"
,
"[unetudp][udpmessage]"
)
{
SECTION
(
"UDPMessage::isFull()"
)
{
UniSetUDP
::
UDPMessage
u
;
SECTION
(
"UDPMessage::isFull()"
)
{
UniSetUDP
::
UDPMessage
u
;
for
(
unsigned
int
i
=
0
;
i
<
UniSetUDP
::
MaxACount
-
1
;
i
++
)
u
.
addAData
(
i
,
i
);
for
(
unsigned
int
i
=
0
;
i
<
UniSetUDP
::
MaxACount
-
1
;
i
++
)
u
.
addAData
(
i
,
i
);
REQUIRE
(
u
.
asize
()
==
(
UniSetUDP
::
MaxACount
-
1
)
);
REQUIRE
(
u
.
asize
()
==
(
UniSetUDP
::
MaxACount
-
1
)
);
CHECK_FALSE
(
u
.
isAFull
()
);
u
.
addAData
(
1
,
1
);
CHECK
(
u
.
isAFull
()
);
CHECK_FALSE
(
u
.
isAFull
()
);
u
.
addAData
(
1
,
1
);
CHECK
(
u
.
isAFull
()
);
for
(
unsigned
int
i
=
0
;
i
<
UniSetUDP
::
MaxDCount
-
1
;
i
++
)
u
.
addDData
(
i
,
true
);
for
(
unsigned
int
i
=
0
;
i
<
UniSetUDP
::
MaxDCount
-
1
;
i
++
)
u
.
addDData
(
i
,
true
);
REQUIRE
(
u
.
dsize
()
==
(
UniSetUDP
::
MaxDCount
-
1
)
);
REQUIRE
(
u
.
dsize
()
==
(
UniSetUDP
::
MaxDCount
-
1
)
);
CHECK_FALSE
(
u
.
isDFull
()
);
u
.
addDData
(
1
,
true
);
CHECK
(
u
.
isDFull
()
);
CHECK_FALSE
(
u
.
isDFull
()
);
u
.
addDData
(
1
,
true
);
CHECK
(
u
.
isDFull
()
);
CHECK
(
u
.
isFull
()
);
}
SECTION
(
"UDPMessage transport.."
)
{
// создаём сообщение, преобразуем к Package.. потом обратно.. проверяём, что информация не исказилась
UniSetUDP
::
UDPMessage
u
;
size_t
a
=
u
.
addAData
(
100
,
100
);
size_t
d
=
u
.
addDData
(
110
,
true
);
UniSetUDP
::
UDPPacket
p
;
size_t
len
=
u
.
transport_msg
(
p
);
CHECK
(
len
!=
0
);
REQUIRE
(
a
<
UniSetUDP
::
MaxACount
);
REQUIRE
(
d
<
UniSetUDP
::
MaxDCount
);
UniSetUDP
::
UDPMessage
u2
(
p
);
REQUIRE
(
u2
.
a_dat
[
a
].
id
==
100
);
REQUIRE
(
u2
.
a_dat
[
a
].
val
==
100
);
REQUIRE
(
u2
.
dID
(
d
)
==
110
);
REQUIRE
(
u2
.
dValue
(
d
)
==
true
);
}
CHECK
(
u
.
isFull
()
);
}
}
// -----------------------------------------------------------------------------
TEST_CASE
(
"[UNetUDP]:
sizeOf"
,
"[unetudp][sizeof
]"
)
TEST_CASE
(
"[UNetUDP]:
len"
,
"[unetudp][len
]"
)
{
UniSetUDP
::
UDPMessage
m
;
REQUIRE
(
m
.
sizeOf
()
==
sizeof
(
UniSetUDP
::
UDPHeader
)
);
UniSetUDP
::
UDPPacket
p
;
m
.
addAData
(
8
,
70
);
REQUIRE
(
p
.
msg
.
len
()
==
sizeof
(
UniSetUDP
::
UDPHeader
)
);
REQUIRE
(
m
.
sizeOf
()
==
sizeof
(
UniSetUDP
::
UDPHeader
)
+
sizeof
(
UniSetUDP
::
UDPAData
)
);
p
.
msg
.
addAData
(
8
,
70
);
UniSetUDP
::
UDPPacket
p
;
size_t
len
=
m
.
transport_msg
(
p
);
REQUIRE
(
len
==
m
.
sizeOf
()
);
REQUIRE
(
p
.
msg
.
len
()
==
sizeof
(
UniSetUDP
::
UDPHeader
)
+
sizeof
(
UniSetUDP
::
UDPAData
)
);
}
// -----------------------------------------------------------------------------
#if 0
...
...
@@ -177,184 +181,183 @@ TEST_CASE("[UNetUDP]: respond sensor", "[unetudp]")
// -----------------------------------------------------------------------------
TEST_CASE
(
"[UNetUDP]: check sender"
,
"[unetudp][sender]"
)
{
InitTest
();
SECTION
(
"Test: read default pack.
.."
)
{
UniSetUDP
::
UDPMessage
pack
=
receive
();
REQUIRE
(
pack
.
num
!=
0
);
REQUIRE
(
pack
.
asize
()
==
4
);
REQUIRE
(
pack
.
dsize
()
==
2
);
for
(
size_t
i
=
0
;
i
<
pack
.
asize
();
i
++
)
{
REQUIRE
(
pack
.
a_dat
[
i
].
val
==
i
+
1
);
}
REQUIRE
(
pack
.
dValue
(
0
)
==
1
);
REQUIRE
(
pack
.
dValue
(
1
)
==
0
);
// т.к. данные в SM не менялись, то должен придти пакет с тем же номером что и был.
.
UniSetUDP
::
UDPMessage
pack
2
=
receive
();
REQUIRE
(
pack2
.
num
==
pack
.
num
);
}
SECTION
(
"Test: change AI data.
.."
)
{
UniSetUDP
::
UDPMessage
pack0
=
receive
();
ui
->
setValue
(
2
,
100
);
REQUIRE
(
ui
->
getValue
(
2
)
==
100
);
msleep
(
120
);
UniSetUDP
::
UDPMessage
pack
=
receive
(
pack0
.
num
+
1
);
REQUIRE
(
pack
.
num
!=
0
);
REQUIRE
(
pack
.
asize
()
==
4
);
REQUIRE
(
pack
.
dsize
()
==
2
);
REQUIRE
(
pack
.
a_dat
[
0
].
val
==
100
);
ui
->
setValue
(
2
,
250
);
REQUIRE
(
ui
->
getValue
(
2
)
==
250
);
msleep
(
120
);
UniSetUDP
::
UDPMessage
pack2
=
receive
(
pack
.
num
+
1
);
REQUIRE
(
pack2
.
num
!=
0
);
REQUIRE
(
pack2
.
num
>
pack
.
num
);
REQUIRE
(
pack2
.
asize
()
==
4
);
REQUIRE
(
pack2
.
dsize
()
==
2
);
REQUIRE
(
pack2
.
a_dat
[
0
].
val
==
250
);
}
SECTION
(
"Test: change DI data.
.."
)
{
UniSetUDP
::
UDPMessage
pack0
=
receive
();
ui
->
setValue
(
6
,
1
);
REQUIRE
(
ui
->
getValue
(
6
)
==
1
);
msleep
(
120
);
UniSetUDP
::
UDPMessage
pack
=
receive
(
pack0
.
num
+
1
);
REQUIRE
(
pack
.
num
!=
0
);
REQUIRE
(
pack
.
asize
()
==
4
);
REQUIRE
(
pack
.
dsize
()
==
2
);
REQUIRE
(
pack
.
dValue
(
0
)
==
1
);
ui
->
setValue
(
6
,
0
);
REQUIRE
(
ui
->
getValue
(
6
)
==
0
);
msleep
(
120
);
UniSetUDP
::
UDPMessage
pack2
=
receive
(
pack
.
num
+
1
);
REQUIRE
(
pack2
.
num
!=
0
);
REQUIRE
(
pack2
.
num
>
pack
.
num
);
REQUIRE
(
pack2
.
asize
()
==
4
);
REQUIRE
(
pack2
.
dsize
()
==
2
);
REQUIRE
(
pack2
.
dValue
(
0
)
==
0
);
}
InitTest
();
SECTION
(
"Test: read default pack
.."
)
{
UniSetUDP
::
UDPPacket
p
=
receive
();
REQUIRE
(
p
.
msg
.
header
.
num
!=
0
);
REQUIRE
(
p
.
msg
.
asize
()
==
4
);
REQUIRE
(
p
.
msg
.
dsize
()
==
2
);
for
(
size_t
i
=
0
;
i
<
p
.
msg
.
asize
();
i
++
)
{
REQUIRE
(
p
.
msg
.
a_dat
[
i
].
val
==
i
+
1
);
}
REQUIRE
(
p
.
msg
.
dValue
(
0
)
==
true
);
REQUIRE
(
p
.
msg
.
dValue
(
1
)
==
false
);
// т.к. данные в SM не менялись, то должен придти пакет с тем же номером что и был
.
UniSetUDP
::
UDPPacket
p
2
=
receive
();
REQUIRE
(
p2
.
msg
.
header
.
num
==
p
.
msg
.
header
.
num
);
}
SECTION
(
"Test: change AI data
.."
)
{
UniSetUDP
::
UDPPacket
pack0
=
receive
();
ui
->
setValue
(
2
,
100
);
REQUIRE
(
ui
->
getValue
(
2
)
==
100
);
msleep
(
120
);
UniSetUDP
::
UDPPacket
pack
=
receive
(
pack0
.
msg
.
header
.
num
+
1
);
REQUIRE
(
pack
.
msg
.
header
.
num
!=
0
);
REQUIRE
(
pack
.
msg
.
asize
()
==
4
);
REQUIRE
(
pack
.
msg
.
dsize
()
==
2
);
REQUIRE
(
pack
.
msg
.
a_dat
[
0
].
val
==
100
);
ui
->
setValue
(
2
,
250
);
REQUIRE
(
ui
->
getValue
(
2
)
==
250
);
msleep
(
120
);
UniSetUDP
::
UDPPacket
pack2
=
receive
(
pack
.
msg
.
header
.
num
+
1
);
REQUIRE
(
pack2
.
msg
.
header
.
num
!=
0
);
REQUIRE
(
pack2
.
msg
.
header
.
num
>
pack
.
msg
.
header
.
num
);
REQUIRE
(
pack2
.
msg
.
asize
()
==
4
);
REQUIRE
(
pack2
.
msg
.
dsize
()
==
2
);
REQUIRE
(
pack2
.
msg
.
a_dat
[
0
].
val
==
250
);
}
SECTION
(
"Test: change DI data
.."
)
{
UniSetUDP
::
UDPPacket
pack0
=
receive
();
ui
->
setValue
(
6
,
1
);
REQUIRE
(
ui
->
getValue
(
6
)
==
1
);
msleep
(
120
);
UniSetUDP
::
UDPPacket
pack
=
receive
(
pack0
.
msg
.
header
.
num
+
1
);
REQUIRE
(
pack
.
msg
.
header
.
num
!=
0
);
REQUIRE
(
pack
.
msg
.
asize
()
==
4
);
REQUIRE
(
pack
.
msg
.
dsize
()
==
2
);
REQUIRE
(
pack
.
msg
.
dValue
(
0
)
==
true
);
ui
->
setValue
(
6
,
0
);
REQUIRE
(
ui
->
getValue
(
6
)
==
0
);
msleep
(
120
);
UniSetUDP
::
UDPPacket
pack2
=
receive
(
pack
.
msg
.
header
.
num
+
1
);
REQUIRE
(
pack2
.
msg
.
header
.
num
!=
0
);
REQUIRE
(
pack2
.
msg
.
header
.
num
>
pack
.
msg
.
header
.
num
);
REQUIRE
(
pack2
.
msg
.
asize
()
==
4
);
REQUIRE
(
pack2
.
msg
.
dsize
()
==
2
);
REQUIRE
(
pack2
.
msg
.
dValue
(
0
)
==
false
);
}
}
// -----------------------------------------------------------------------------
TEST_CASE
(
"[UNetUDP]: check receiver"
,
"[unetudp][receiver]"
)
{
InitTest
();
SECTION
(
"Test: send data pack.
.."
)
{
REQUIRE
(
ui
->
getValue
(
node2_respond_s
)
==
0
);
UniSetUDP
::
UDPMessage
pack
;
pack
.
addAData
(
8
,
100
);
pack
.
addAData
(
9
,
-
100
);
pack
.
addDData
(
10
,
true
);
pack
.
addDData
(
11
,
false
);
REQUIRE
(
ui
->
getValue
(
8
)
==
0
);
REQUIRE
(
ui
->
getValue
(
9
)
==
0
);
REQUIRE
(
ui
->
getValue
(
10
)
==
0
);
REQUIRE
(
ui
->
getValue
(
11
)
==
0
);
send
(
pack
);
msleep
(
600
);
REQUIRE
(
ui
->
getValue
(
8
)
==
100
);
REQUIRE
(
ui
->
getValue
(
9
)
==
-
100
);
REQUIRE
(
ui
->
getValue
(
10
)
==
1
);
REQUIRE
(
ui
->
getValue
(
11
)
==
0
);
WARN
(
"check respond sensor DISABLED!"
);
// msleep(1500);
// REQUIRE( ui->getValue(node2_respond_s) == 1 );
}
SECTION
(
"Test: send data pack2.
."
)
{
UniSetUDP
::
UDPMessage
pack
;
pack
.
addAData
(
8
,
10
);
pack
.
addAData
(
9
,
-
10
);
pack
.
addDData
(
10
,
false
);
pack
.
addDData
(
11
,
true
);
send
(
pack
);
msleep
(
600
);
REQUIRE
(
ui
->
getValue
(
8
)
==
10
);
REQUIRE
(
ui
->
getValue
(
9
)
==
-
10
);
REQUIRE
(
ui
->
getValue
(
10
)
==
0
);
REQUIRE
(
ui
->
getValue
(
11
)
==
1
);
WARN
(
"check respond sensor DISABLED!"
);
//REQUIRE( ui->getValue(node2_respond_s) == 1 );
//msleep(2000); // в запускающем файле стоит --unet-recv-timeout 2000
//REQUIRE( ui->getValue(node2_respond_s) == 0 );
}
InitTest
();
SECTION
(
"Test: send data pack
.."
)
{
REQUIRE
(
ui
->
getValue
(
node2_respond_s
)
==
0
);
UniSetUDP
::
UDPPacket
pack
;
pack
.
msg
.
addAData
(
8
,
100
);
pack
.
msg
.
addAData
(
9
,
-
100
);
pack
.
msg
.
addDData
(
10
,
true
);
pack
.
msg
.
addDData
(
11
,
false
);
REQUIRE
(
ui
->
getValue
(
8
)
==
0
);
REQUIRE
(
ui
->
getValue
(
9
)
==
0
);
REQUIRE
(
ui
->
getValue
(
10
)
==
0
);
REQUIRE
(
ui
->
getValue
(
11
)
==
0
);
send
(
pack
);
msleep
(
600
);
REQUIRE
(
ui
->
getValue
(
8
)
==
100
);
REQUIRE
(
ui
->
getValue
(
9
)
==
-
100
);
REQUIRE
(
ui
->
getValue
(
10
)
==
1
);
REQUIRE
(
ui
->
getValue
(
11
)
==
0
);
WARN
(
"check respond sensor DISABLED!"
);
// msleep(1500);
// REQUIRE( ui->getValue(node2_respond_s) == 1 );
}
SECTION
(
"Test: send data pack2
."
)
{
UniSetUDP
::
UDPPacket
pack
;
pack
.
msg
.
addAData
(
8
,
10
);
pack
.
msg
.
addAData
(
9
,
-
10
);
pack
.
msg
.
addDData
(
10
,
false
);
pack
.
msg
.
addDData
(
11
,
true
);
send
(
pack
);
msleep
(
600
);
REQUIRE
(
ui
->
getValue
(
8
)
==
10
);
REQUIRE
(
ui
->
getValue
(
9
)
==
-
10
);
REQUIRE
(
ui
->
getValue
(
10
)
==
0
);
REQUIRE
(
ui
->
getValue
(
11
)
==
1
);
WARN
(
"check respond sensor DISABLED!"
);
//REQUIRE( ui->getValue(node2_respond_s) == 1 );
//msleep(2000); // в запускающем файле стоит --unet-recv-timeout 2000
//REQUIRE( ui->getValue(node2_respond_s) == 0 );
}
}
// -----------------------------------------------------------------------------
TEST_CASE
(
"[UNetUDP]: check packets 'hole'"
,
"[unetudp][udphole]"
)
{
InitTest
();
// проверяем обработку "дырок" в пакетах.
.
UniSetUDP
::
UDPMessage
pack
;
pack
.
addAData
(
8
,
15
);
send
(
pack
);
msleep
(
120
);
REQUIRE
(
ui
->
getValue
(
8
)
==
15
);
unsigned
long
nlost
=
ui
->
getValue
(
node2_lostpackets_as
);
int
lastnum
=
s_numpack
-
1
;
// искусственно делаем дырку в два пакета
s_numpack
=
lastnum
+
3
;
UniSetUDP
::
UDPMessage
pack_hole
;
pack_hole
.
addAData
(
8
,
30
);
send
(
pack_hole
);
// пакет с дыркой
msleep
(
80
);
REQUIRE
(
ui
->
getValue
(
8
)
==
15
);
REQUIRE
(
ui
->
getValue
(
node2_lostpackets_as
)
==
nlost
);
s_numpack
=
lastnum
+
1
;
UniSetUDP
::
UDPMessage
pack1
;
pack1
.
addAData
(
8
,
21
);
send
(
pack1
);
// заполняем первую дырку..// дырка закроется.
. пакет тут же обработается
msleep
(
100
);
REQUIRE
(
ui
->
getValue
(
8
)
==
21
);
REQUIRE
(
ui
->
getValue
(
node2_lostpackets_as
)
==
nlost
);
s_numpack
=
lastnum
+
2
;
UniSetUDP
::
UDPMessage
pack2
;
pack2
.
addAData
(
8
,
25
);
send
(
pack2
);
// заполняем следующую дырку
msleep
(
120
);
// тут обработка дойдёт уже до "первого" пакета.
.
REQUIRE
(
ui
->
getValue
(
8
)
==
30
);
REQUIRE
(
ui
->
getValue
(
node2_lostpackets_as
)
==
nlost
);
// возвращаем к нормальному..чтобы следующие тесты не поломались.
.
for
(
int
i
=
0
;
i
<
10
;
i
++
)
{
send
(
pack2
);
msleep
(
100
);
}
InitTest
();
// проверяем обработку "дырок" в пакетах
.
UniSetUDP
::
UDPPacket
pack
;
pack
.
msg
.
addAData
(
8
,
15
);
send
(
pack
);
msleep
(
120
);
REQUIRE
(
ui
->
getValue
(
8
)
==
15
);
unsigned
long
nlost
=
ui
->
getValue
(
node2_lostpackets_as
);
int
lastnum
=
s_numpack
-
1
;
// искусственно делаем дырку в два пакета
s_numpack
=
lastnum
+
3
;
UniSetUDP
::
UDPPacket
pack_hole
;
pack_hole
.
msg
.
addAData
(
8
,
30
);
send
(
pack_hole
);
// пакет с дыркой
msleep
(
80
);
REQUIRE
(
ui
->
getValue
(
8
)
==
15
);
REQUIRE
(
ui
->
getValue
(
node2_lostpackets_as
)
==
nlost
);
s_numpack
=
lastnum
+
1
;
UniSetUDP
::
UDPPacket
pack1
;
pack1
.
msg
.
addAData
(
8
,
21
);
send
(
pack1
);
// заполняем первую дырку.// дырка закроется
. пакет тут же обработается
msleep
(
100
);
REQUIRE
(
ui
->
getValue
(
8
)
==
21
);
REQUIRE
(
ui
->
getValue
(
node2_lostpackets_as
)
==
nlost
);
s_numpack
=
lastnum
+
2
;
UniSetUDP
::
UDPPacket
pack2
;
pack2
.
msg
.
addAData
(
8
,
25
);
send
(
pack2
);
// заполняем следующую дырку
msleep
(
120
);
// тут обработка дойдёт уже до "первого" пакета
.
REQUIRE
(
ui
->
getValue
(
8
)
==
30
);
REQUIRE
(
ui
->
getValue
(
node2_lostpackets_as
)
==
nlost
);
// возвращаем к нормальному.чтобы следующие тесты не поломались
.
for
(
int
i
=
0
;
i
<
10
;
i
++
)
{
send
(
pack2
);
msleep
(
100
);
}
}
// -----------------------------------------------------------------------------
TEST_CASE
(
"[UNetUDP]: check packets 'MaxDifferens'"
,
"[unetudp][maxdifferens]"
)
{
InitTest
();
// проверяем обработку "дырок" в пакетах..
UniSetUDP
::
UDPMessage
pack
;
pack
.
addAData
(
8
,
50
);
send
(
pack
);
// проверяем обработку "дырок" в пакетах.
UniSetUDP
::
UDPPacket
pack
;
pack
.
msg
.
addAData
(
8
,
50
);
send
(
pack
);
msleep
(
1000
);
REQUIRE
(
ui
->
getValue
(
8
)
==
50
);
...
...
@@ -365,9 +368,9 @@ TEST_CASE("[UNetUDP]: check packets 'MaxDifferens'", "[unetudp][maxdifferens]")
// искуственно делаем дырку в два пакета
s_numpack
+=
maxDifferense
+
1
;
UniSetUDP
::
UDPMessage
pack_hole
;
pack_hole
.
addAData
(
8
,
150
);
send
(
pack_hole
);
// пакет с дыркой > maxDifference (должен обработаться)
UniSetUDP
::
UDPPacket
pack_hole
;
pack_hole
.
msg
.
addAData
(
8
,
150
);
send
(
pack_hole
);
// пакет с дыркой > maxDifference (должен обработаться)
msleep
(
120
);
REQUIRE
(
ui
->
getValue
(
8
)
==
150
);
...
...
@@ -377,105 +380,104 @@ TEST_CASE("[UNetUDP]: check packets 'MaxDifferens'", "[unetudp][maxdifferens]")
// -----------------------------------------------------------------------------
TEST_CASE
(
"[UNetUDP]: bad packet number"
,
"[unetudp][badnumber]"
)
{
InitTest
();
// посылаем нормальный пакет
UniSetUDP
::
UDPMessage
pack
;
pack
.
addAData
(
8
,
60
);
send
(
pack
);
msleep
(
150
);
REQUIRE
(
ui
->
getValue
(
8
)
==
60
);
int
lastpack
=
s_numpack
-
1
;
// посылаем пакет с тем же номером
s_numpack
=
lastpack
;
UniSetUDP
::
UDPMessage
pack1
;
pack1
.
addAData
(
8
,
150
);
send
(
pack1
);
// должен быть "откинут"
msleep
(
120
);
REQUIRE
(
ui
->
getValue
(
8
)
==
60
);
// посылаем пакет с меньшим номером
s_numpack
=
lastpack
-
2
;
UniSetUDP
::
UDPMessage
pack2
;
pack2
.
addAData
(
8
,
155
);
send
(
pack2
);
// должен быть "откинут"
msleep
(
120
);
REQUIRE
(
ui
->
getValue
(
8
)
==
60
);
// посылаем нормальный
s_numpack
=
lastpack
+
1
;
UniSetUDP
::
UDPMessage
pack3
;
pack3
.
addAData
(
8
,
160
);
send
(
pack3
);
// должен быть "обработан"
msleep
(
120
);
REQUIRE
(
ui
->
getValue
(
8
)
==
160
);
InitTest
();
// посылаем нормальный пакет
UniSetUDP
::
UDPPacket
pack
;
pack
.
msg
.
addAData
(
8
,
60
);
send
(
pack
);
msleep
(
150
);
REQUIRE
(
ui
->
getValue
(
8
)
==
60
);
int
lastpack
=
s_numpack
-
1
;
// посылаем пакет с тем же номером
s_numpack
=
lastpack
;
UniSetUDP
::
UDPPacket
pack1
;
pack1
.
msg
.
addAData
(
8
,
150
);
send
(
pack1
);
// должен быть "откинут"
msleep
(
120
);
REQUIRE
(
ui
->
getValue
(
8
)
==
60
);
// посылаем пакет с меньшим номером
s_numpack
=
lastpack
-
2
;
UniSetUDP
::
UDPPacket
pack2
;
pack2
.
msg
.
addAData
(
8
,
155
);
send
(
pack2
);
// должен быть "откинут"
msleep
(
120
);
REQUIRE
(
ui
->
getValue
(
8
)
==
60
);
// посылаем нормальный
s_numpack
=
lastpack
+
1
;
UniSetUDP
::
UDPPacket
pack3
;
pack3
.
msg
.
addAData
(
8
,
160
);
send
(
pack3
);
// должен быть "обработан"
msleep
(
120
);
REQUIRE
(
ui
->
getValue
(
8
)
==
160
);
}
// -----------------------------------------------------------------------------
TEST_CASE
(
"[UNetUDP]: switching channels"
,
"[unetudp][chswitch]"
)
{
InitTest
();
UniSetUDP
::
UDPMessage
pack
;
pack
.
addAData
(
8
,
70
);
send
(
pack
);
msleep
(
120
);
REQUIRE
(
ui
->
getValue
(
8
)
==
70
);
// и счётчик переключений каналов в нуле
REQUIRE
(
ui
->
getValue
(
node1_channelSwitchCount_as
)
==
0
);
// К сожалению в текущей реализации тестов
// обмена по второму каналу нет
// поэтому проверить переключение нет возможности
// остаётся только проверить, что мы не "ушли" с первого канала
// т.к. на втором нет связи и мы не должны на него переключаться
msleep
(
recvTimeout
*
2
);
REQUIRE
(
ui
->
getValue
(
node1_numchannel_as
)
==
1
);
// и счётчик переключений каналов остался в нуле
REQUIRE
(
ui
->
getValue
(
node1_channelSwitchCount_as
)
==
0
);
InitTest
();
UniSetUDP
::
UDPPacket
pack
;
pack
.
msg
.
addAData
(
8
,
70
);
send
(
pack
);
msleep
(
120
);
REQUIRE
(
ui
->
getValue
(
8
)
==
70
);
// и счётчик переключений каналов в нуле
REQUIRE
(
ui
->
getValue
(
node1_channelSwitchCount_as
)
==
0
);
// К сожалению в текущей реализации тестов
// обмена по второму каналу нет
// поэтому проверить переключение нет возможности
// остаётся только проверить, что мы не "ушли" с первого канала
// т.к. на втором нет связи и мы не должны на него переключаться
msleep
(
recvTimeout
*
2
);
REQUIRE
(
ui
->
getValue
(
node1_numchannel_as
)
==
1
);
// и счётчик переключений каналов остался в нуле
REQUIRE
(
ui
->
getValue
(
node1_channelSwitchCount_as
)
==
0
);
}
// -----------------------------------------------------------------------------
TEST_CASE
(
"[UNetUDP]: check undefined value"
,
"[unetudp][sender]"
)
{
InitTest
();
UniSetUDP
::
UDPMessage
pack0
=
receive
();
UniSetUDP
::
UDPPacket
pack0
=
receive
();
ui
->
setValue
(
2
,
110
);
REQUIRE
(
ui
->
getValue
(
2
)
==
110
);
msleep
(
600
);
UniSetUDP
::
UDPMessage
pack
=
receive
(
pack0
.
num
+
1
,
2000
,
40
);
REQUIRE
(
pack
.
num
!=
0
);
REQUIRE
(
pack
.
asize
()
==
4
);
REQUIRE
(
pack
.
dsize
()
==
2
);
REQUIRE
(
pack
.
a_dat
[
0
].
val
==
110
);
IOController_i
::
SensorInfo
si
;
si
.
id
=
2
;
si
.
node
=
uniset_conf
()
->
getLocalNode
();
ui
->
setUndefinedState
(
si
,
true
,
6000
/* TestProc */
);
msleep
(
600
);
pack
=
receive
(
pack
.
num
+
1
);
REQUIRE
(
pack
.
num
!=
0
);
REQUIRE
(
pack
.
asize
()
==
4
);
REQUIRE
(
pack
.
dsize
()
==
2
);
REQUIRE
(
pack
.
a_dat
[
0
].
val
==
65635
);
ui
->
setUndefinedState
(
si
,
false
,
6000
/* TestProc */
);
msleep
(
600
);
pack
=
receive
(
pack
.
num
+
1
);
REQUIRE
(
pack
.
num
!=
0
);
REQUIRE
(
pack
.
asize
()
==
4
);
REQUIRE
(
pack
.
dsize
()
==
2
);
REQUIRE
(
pack
.
a_dat
[
0
].
val
==
110
);
UniSetUDP
::
UDPPacket
pack
=
receive
(
pack0
.
msg
.
header
.
num
+
1
,
2000
,
40
);
REQUIRE
(
pack
.
msg
.
header
.
num
!=
0
);
REQUIRE
(
pack
.
msg
.
asize
()
==
4
);
REQUIRE
(
pack
.
msg
.
dsize
()
==
2
);
REQUIRE
(
pack
.
msg
.
a_dat
[
0
].
val
==
110
);
IOController_i
::
SensorInfo
si
;
si
.
id
=
2
;
si
.
node
=
uniset_conf
()
->
getLocalNode
();
ui
->
setUndefinedState
(
si
,
true
,
6000
/* TestProc */
);
msleep
(
600
);
pack
=
receive
(
pack
.
msg
.
header
.
num
+
1
);
REQUIRE
(
pack
.
msg
.
header
.
num
!=
0
);
REQUIRE
(
pack
.
msg
.
asize
()
==
4
);
REQUIRE
(
pack
.
msg
.
dsize
()
==
2
);
REQUIRE
(
pack
.
msg
.
a_dat
[
0
].
val
==
65635
);
ui
->
setUndefinedState
(
si
,
false
,
6000
/* TestProc */
);
msleep
(
600
);
pack
=
receive
(
pack
.
msg
.
header
.
num
+
1
);
REQUIRE
(
pack
.
msg
.
header
.
num
!=
0
);
REQUIRE
(
pack
.
msg
.
asize
()
==
4
);
REQUIRE
(
pack
.
msg
.
dsize
()
==
2
);
REQUIRE
(
pack
.
msg
.
a_dat
[
0
].
val
==
110
);
}
// -----------------------------------------------------------------------------
extensions/UNetUDP/tests/urecv_perf_test.cc
View file @
b4672e62
...
...
@@ -59,18 +59,18 @@ static void run_senders( size_t max, const std::string& s_host, size_t count = 5
}
}
UniSetUDP
::
UDP
Message
mypack
;
mypack
.
nodeID
=
100
;
mypack
.
procID
=
100
;
UniSetUDP
::
UDP
Packet
mypack
;
mypack
.
msg
.
header
.
nodeID
=
100
;
mypack
.
msg
.
header
.
procID
=
100
;
for
(
size_t
i
=
0
;
i
<
count
;
i
++
)
{
UniSetUDP
::
UDPAData
d
(
i
,
i
);
mypack
.
addAData
(
d
);
mypack
.
msg
.
addAData
(
d
);
}
for
(
size_t
i
=
0
;
i
<
count
;
i
++
)
mypack
.
addDData
(
i
,
i
);
mypack
.
msg
.
addDData
(
i
,
i
);
for
(
size_t
i
=
0
;
i
<
max
;
i
++
)
{
...
...
@@ -92,13 +92,11 @@ static void run_senders( size_t max, const std::string& s_host, size_t count = 5
}
size_t
packetnum
=
0
;
UniSetUDP
::
UDPPacket
s_buf
;
size_t
nc
=
1
;
while
(
nc
)
// -V654
{
mypack
.
num
=
packetnum
++
;
mypack
.
msg
.
header
.
num
=
packetnum
++
;
// при переходе черех максимум (UniSetUDP::MaxPacketNum)
// пакет опять должен иметь номер "1"
...
...
@@ -111,11 +109,10 @@ static void run_senders( size_t max, const std::string& s_host, size_t count = 5
{
if
(
udp
->
poll
(
100000
,
Poco
::
Net
::
Socket
::
SELECT_WRITE
)
)
{
mypack
.
transport_msg
(
s_buf
);
size_t
ret
=
udp
->
sendBytes
((
char
*
)
&
s_buf
.
data
,
s_buf
.
len
);
size_t
ret
=
udp
->
sendBytes
(
mypack
.
raw
,
mypack
.
msg
.
len
());
if
(
ret
<
s_buf
.
len
)
cerr
<<
"(send): FAILED ret="
<<
ret
<<
" < sizeof="
<<
s_buf
.
len
<<
endl
;
if
(
ret
<
mypack
.
msg
.
len
()
)
cerr
<<
"(send): FAILED ret="
<<
ret
<<
" < sizeof="
<<
mypack
.
msg
.
len
()
<<
endl
;
}
}
catch
(
Poco
::
Net
::
NetException
&
e
)
...
...
@@ -180,9 +177,9 @@ int main(int argc, char* argv[] )
auto
conf
=
uniset_init
(
argc
,
argv
);
if
(
argc
>
1
&&
!
strcmp
(
argv
[
1
],
"s"
)
)
run_senders
(
1
,
host
);
run_senders
(
1
0
,
host
);
else
run_test
(
1
,
host
);
run_test
(
1
0
,
host
);
return
0
;
}
...
...
extensions/UNetUDP/unet-udp-tester.cc
View file @
b4672e62
...
...
@@ -223,9 +223,7 @@ int main(int argc, char* argv[])
{
UDPReceiveU
udp
(
s_host
,
port
);
// char buf[UniSetUDP::MaxDataLen];
UniSetUDP
::
UDPMessage
pack
;
UniSetUDP
::
UDPPacket
buf
;
UniSetUDP
::
UDPPacket
pack
;
unsigned
long
prev_num
=
1
;
int
nc
=
1
;
...
...
@@ -257,27 +255,35 @@ int main(int argc, char* argv[])
continue
;
}
size_t
ret
=
udp
.
receiveBytes
(
&
(
buf
.
data
),
sizeof
(
buf
.
data
)
);
size_t
sz
=
UniSetUDP
::
UDPMessage
::
getMessage
(
pack
,
buf
);
size_t
ret
=
udp
.
receiveBytes
(
pack
.
raw
,
sizeof
(
pack
.
raw
)
);
if
(
sz
==
0
)
if
(
ret
<
0
)
{
if
(
pack
.
magic
!=
UniSetUDP
::
UNETUDP_MAGICNUM
)
cerr
<<
"(recv): BAD PROTOCOL VERSION! [ need version '"
<<
UniSetUDP
::
UNETUDP_MAGICNUM
<<
"']"
<<
endl
;
else
cerr
<<
"(recv): FAILED header ret="
<<
ret
<<
" sizeof="
<<
sz
<<
endl
;
cerr
<<
"(recv): no data?!"
<<
endl
;
continue
;
}
if
(
ret
==
0
)
{
cerr
<<
"(recv): connection closed?!"
<<
endl
;
continue
;
}
pack
.
msg
.
ntoh
();
if
(
pack
.
msg
.
header
.
magic
!=
UniSetUDP
::
UNETUDP_MAGICNUM
)
{
cerr
<<
"(recv): BAD PROTOCOL VERSION! [ need version '"
<<
UniSetUDP
::
UNETUDP_MAGICNUM
<<
"']"
<<
endl
;
continue
;
}
if
(
lost
)
{
if
(
prev_num
!=
(
pack
.
num
-
1
)
)
cerr
<<
"WARNING! Incorrect sequence of packets! current="
<<
pack
.
num
if
(
prev_num
!=
(
pack
.
msg
.
header
.
num
-
1
)
)
cerr
<<
"WARNING! Incorrect sequence of packets! current="
<<
pack
.
msg
.
header
.
num
<<
" prev="
<<
prev_num
<<
endl
;
prev_num
=
pack
.
num
;
prev_num
=
pack
.
msg
.
header
.
num
;
}
npack
++
;
...
...
@@ -287,7 +293,7 @@ int main(int argc, char* argv[])
<<
" bytes: "
<<
ret
<<
endl
;
if
(
show
)
cout
<<
"receive data: "
<<
pack
<<
endl
;
cout
<<
"receive data: "
<<
pack
.
msg
<<
endl
;
}
catch
(
Poco
::
Net
::
NetException
&
e
)
{
...
...
@@ -314,9 +320,10 @@ int main(int argc, char* argv[])
std
::
shared_ptr
<
UDPSocketU
>
udp
=
make_shared
<
UDPSocketU
>
(
s_host
,
port
);
udp
->
setBroadcast
(
broadcast
);
UniSetUDP
::
UDPMessage
mypack
;
mypack
.
nodeID
=
nodeID
;
mypack
.
procID
=
procID
;
UniSetUDP
::
UDPPacket
mypack
;
UDPMessage
*
msg
=
&
mypack
.
msg
;
msg
->
header
.
nodeID
=
nodeID
;
msg
->
header
.
procID
=
procID
;
if
(
!
a_data
.
empty
()
)
{
...
...
@@ -325,7 +332,7 @@ int main(int argc, char* argv[])
for
(
const
auto
&
v
:
vlist
)
{
UDPAData
d
(
v
.
si
.
id
,
v
.
val
);
m
ypack
.
addAData
(
d
);
m
sg
->
addAData
(
d
);
}
}
else
...
...
@@ -333,7 +340,7 @@ int main(int argc, char* argv[])
for
(
size_t
i
=
0
;
i
<
count
;
i
++
)
{
UDPAData
d
(
i
,
i
);
m
ypack
.
addAData
(
d
);
m
sg
->
addAData
(
d
);
}
}
...
...
@@ -342,19 +349,17 @@ int main(int argc, char* argv[])
auto
vlist
=
uniset
::
getSInfoList
(
d_data
,
nullptr
);
for
(
const
auto
&
v
:
vlist
)
m
ypack
.
addDData
(
v
.
si
.
id
,
v
.
val
);
m
sg
->
addDData
(
v
.
si
.
id
,
v
.
val
);
}
else
{
for
(
size_t
i
=
0
;
i
<
count
;
i
++
)
m
ypack
.
addDData
(
i
,
i
);
m
sg
->
addDData
(
i
,
i
);
}
Poco
::
Net
::
SocketAddress
sa
(
s_host
,
port
);
udp
->
connect
(
sa
);
UniSetUDP
::
UDPPacket
s_buf
;
size_t
nc
=
1
;
if
(
ncycles
>
0
)
...
...
@@ -362,7 +367,7 @@ int main(int argc, char* argv[])
while
(
nc
)
{
m
ypack
.
num
=
packetnum
++
;
m
sg
->
header
.
num
=
packetnum
++
;
// при переходе через максимум (UniSetUDP::MaxPacketNum)
// пакет опять должен иметь номер "1"
...
...
@@ -373,16 +378,14 @@ int main(int argc, char* argv[])
{
if
(
udp
->
poll
(
UniSetTimer
::
millisecToPoco
(
tout
),
Poco
::
Net
::
Socket
::
SELECT_WRITE
)
)
{
mypack
.
transport_msg
(
s_buf
);
if
(
verb
)
cout
<<
"(send): to addr="
<<
addr
<<
" d_count="
<<
m
ypack
.
dcount
<<
" a_count="
<<
m
ypack
.
acount
<<
" bytes="
<<
s_buf
.
len
<<
endl
;
cout
<<
"(send): to addr="
<<
addr
<<
" d_count="
<<
m
sg
->
header
.
dcount
<<
" a_count="
<<
m
sg
->
header
.
acount
<<
" bytes="
<<
msg
->
len
()
<<
endl
;
size_t
ret
=
udp
->
sendBytes
(
(
char
*
)
&
s_buf
.
data
,
s_buf
.
len
);
size_t
ret
=
udp
->
sendBytes
(
mypack
.
raw
,
sizeof
(
mypack
.
raw
)
/* mypack.msg.len() */
);
if
(
ret
<
s_buf
.
len
)
cerr
<<
"(send): FAILED ret="
<<
ret
<<
" < sizeof="
<<
s_buf
.
len
<<
endl
;
if
(
ret
<
msg
->
len
()
)
cerr
<<
"(send): FAILED ret="
<<
ret
<<
" < sizeof="
<<
msg
->
len
()
<<
endl
;
}
}
catch
(
Poco
::
Net
::
NetException
&
e
)
...
...
uniset2.files
View file @
b4672e62
...
...
@@ -195,43 +195,6 @@
./extensions/SMViewer/Makefile.am
./extensions/SMViewer/SMViewer.cc
./extensions/SMViewer/SMViewer.h
./extensions/tests1/Makefile.am
./extensions/tests1/MBSlaveTest/Makefile.am
./extensions/tests1/MBSlaveTest/mbslave-test.cc
./extensions/tests1/MBSlaveTest/TestProc.cc
./extensions/tests1/MBSlaveTest/TestProc.h
./extensions/tests1/MBSlaveTest/testproc.src.xml
./extensions/tests1/MQPerfTest/Makefile.am
./extensions/tests1/MQPerfTest/mq-test.cc
./extensions/tests1/MQPerfTest/TestProc.cc
./extensions/tests1/MQPerfTest/TestProc.h
./extensions/tests1/MQPerfTest/testproc.src.xml
./extensions/tests1/r/t.cc
./extensions/tests1/SMemoryTest/LostPassiveTestProc.cc
./extensions/tests1/SMemoryTest/LostPassiveTestProc.h
./extensions/tests1/SMemoryTest/LostTestProc.cc
./extensions/tests1/SMemoryTest/LostTestProc.h
./extensions/tests1/SMemoryTest/losttestproc.src.xml
./extensions/tests1/SMemoryTest/Makefile.am
./extensions/tests1/SMemoryTest/smemory-test.cc
./extensions/tests1/SMemoryTest/sm-lostmessage-test.cc
./extensions/tests1/SMemoryTest/TestProc.cc
./extensions/tests1/SMemoryTest/TestProc.h
./extensions/tests1/SMemoryTest/testproc.src.xml
./extensions/tests1/sm_perf_test2.cc
./extensions/tests1/sm_perf_test.cc
./extensions/tests1/sz.cc
./extensions/tests1/test_calibration.cc
./extensions/tests1/test_digitalfilter.cc
./extensions/tests1/test_iobase.cc
./extensions/tests1/test_iobase_with_sm.cc
./extensions/tests1/test_restapi_uniset.cc
./extensions/tests1/tests.cc
./extensions/tests1/tests_with_conf.cc
./extensions/tests1/tests_with_sm.cc
./extensions/tests1/tests_with_sm.h
./extensions/tests1/test_ui.cc
./extensions/tests1/test_vtypes.cc
./extensions/tests/Makefile.am
./extensions/tests/MBSlaveTest/Makefile.am
./extensions/tests/MBSlaveTest/mbslave-test.cc
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment