Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
U
uniset2
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
1
Issues
1
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
UniSet project repositories
uniset2
Commits
94e22ed7
Commit
94e22ed7
authored
Dec 13, 2020
by
Pavel Vainerman
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
[unet]: zero copy optimization
parent
9d736403
Expand all
Hide whitespace changes
Inline
Side-by-side
Showing
10 changed files
with
207 additions
and
309 deletions
+207
-309
UDPPacket.cc
extensions/UNetUDP/UDPPacket.cc
+51
-127
UDPPacket.h
extensions/UNetUDP/UDPPacket.h
+32
-44
UNetReceiver.cc
extensions/UNetUDP/UNetReceiver.cc
+52
-61
UNetReceiver.h
extensions/UNetUDP/UNetReceiver.h
+3
-5
UNetSender.cc
extensions/UNetUDP/UNetSender.cc
+20
-22
UNetSender.h
extensions/UNetUDP/UNetSender.h
+3
-4
test_unetudp.cc
extensions/UNetUDP/tests/test_unetudp.cc
+0
-0
urecv_perf_test.cc
extensions/UNetUDP/tests/urecv_perf_test.cc
+11
-14
unet-udp-tester.cc
extensions/UNetUDP/unet-udp-tester.cc
+35
-32
uniset2.files
uniset2.files
+0
-0
No files found.
extensions/UNetUDP/UDPPacket.cc
View file @
94e22ed7
...
...
@@ -35,7 +35,7 @@ static bool HostIsBigEndian = false;
#if __BYTE_ORDER == __BIG_ENDIAN
static
bool
HostIsBigEndian
=
true
;
#define BE_TO_H(x) {}
header
.
#
define
BE_TO_H
(
x
)
{}
#elif INTPTR_MAX == INT64_MAX
#define BE_TO_H(x) x = be64toh(x)
#elif INTPTR_MAX == INT32_MAX
...
...
@@ -141,7 +141,6 @@ namespace uniset
return
os
<<
(
*
p
);
}
// -----------------------------------------------------------------------------
std
::
ostream
&
UniSetUDP
::
operator
<<
(
std
::
ostream
&
os
,
UniSetUDP
::
UDPAData
&
p
)
{
return
os
<<
"id="
<<
p
.
id
<<
" val="
<<
p
.
val
;
...
...
@@ -153,29 +152,25 @@ namespace uniset
os
<<
"DIGITAL:"
<<
endl
;
for
(
size_t
i
=
0
;
i
<
p
.
dcount
;
i
++
)
for
(
size_t
i
=
0
;
i
<
p
.
header
.
dcount
;
i
++
)
os
<<
"["
<<
i
<<
"]={"
<<
p
.
dID
(
i
)
<<
","
<<
p
.
dValue
(
i
)
<<
"}"
<<
endl
;
os
<<
"ANALOG:"
<<
endl
;
for
(
size_t
i
=
0
;
i
<
p
.
acount
;
i
++
)
for
(
size_t
i
=
0
;
i
<
p
.
header
.
acount
;
i
++
)
os
<<
"["
<<
i
<<
"]={"
<<
p
.
a_dat
[
i
].
id
<<
","
<<
p
.
a_dat
[
i
].
val
<<
"}"
<<
endl
;
return
os
;
}
// -----------------------------------------------------------------------------
UDPMessage
::
UDPMessage
()
noexcept
{
}
// -----------------------------------------------------------------------------
size_t
UDPMessage
::
addAData
(
const
UniSetUDP
::
UDPAData
&
dat
)
noexcept
{
if
(
acount
>=
MaxACount
)
if
(
header
.
acount
>=
MaxACount
)
return
MaxACount
;
a_dat
[
acount
]
=
dat
;
acount
++
;
return
acount
-
1
;
a_dat
[
header
.
acount
]
=
dat
;
header
.
acount
++
;
return
header
.
acount
-
1
;
}
// -----------------------------------------------------------------------------
size_t
UDPMessage
::
addAData
(
long
id
,
long
val
)
noexcept
...
...
@@ -197,18 +192,18 @@ namespace uniset
// -----------------------------------------------------------------------------
size_t
UDPMessage
::
addDData
(
long
id
,
bool
val
)
noexcept
{
if
(
dcount
>=
MaxDCount
)
if
(
header
.
dcount
>=
MaxDCount
)
return
MaxDCount
;
// сохраняем ID
d_id
[
dcount
]
=
id
;
d_id
[
header
.
dcount
]
=
id
;
bool
res
=
setDData
(
dcount
,
val
);
bool
res
=
setDData
(
header
.
dcount
,
val
);
if
(
res
)
{
dcount
++
;
return
dcount
-
1
;
header
.
dcount
++
;
return
header
.
dcount
-
1
;
}
return
MaxDCount
;
...
...
@@ -219,8 +214,8 @@ namespace uniset
if
(
index
>=
MaxDCount
)
return
false
;
size_t
nbyte
=
index
/
8
*
sizeof
(
u
nsigned
char
);
size_t
nbit
=
index
%
8
*
sizeof
(
u
nsigned
char
);
size_t
nbyte
=
index
/
8
*
sizeof
(
u
int8_t
);
size_t
nbit
=
index
%
8
*
sizeof
(
u
int8_t
);
// выставляем бит
unsigned
char
d
=
d_dat
[
nbyte
];
...
...
@@ -247,177 +242,106 @@ namespace uniset
if
(
index
>=
MaxDCount
)
return
uniset
::
DefaultObjectId
;
size_t
nbyte
=
index
/
8
*
sizeof
(
u
nsigned
char
);
size_t
nbit
=
index
%
8
*
sizeof
(
u
nsigned
char
);
size_t
nbyte
=
index
/
8
*
sizeof
(
u
int8_t
);
size_t
nbit
=
index
%
8
*
sizeof
(
u
int8_t
);
return
(
d_dat
[
nbyte
]
&
(
1
<<
nbit
)
);
}
// -----------------------------------------------------------------------------
size_t
UDPMessage
::
transport_msg
(
UDPPacket
&
p
)
const
noexcept
{
p
=
UDPPacket
{};
// reset data
size_t
i
=
0
;
memcpy
(
&
(
p
.
data
[
i
]),
this
,
sizeof
(
UDPHeader
));
i
+=
sizeof
(
UDPHeader
);
// копируем аналоговые данные
size_t
sz
=
acount
*
sizeof
(
UDPAData
);
memcpy
(
&
(
p
.
data
[
i
]),
a_dat
,
sz
);
i
+=
sz
;
// копируем булевые индексы
sz
=
dcount
*
sizeof
(
long
);
memcpy
(
&
(
p
.
data
[
i
]),
d_id
,
sz
);
i
+=
sz
;
// копируем булевые данные
size_t
nbyte
=
dcount
/
8
*
sizeof
(
unsigned
char
);
size_t
nbit
=
dcount
%
8
*
sizeof
(
unsigned
char
);
sz
=
nbit
>
0
?
nbyte
+
1
:
nbyte
;
memcpy
(
&
(
p
.
data
[
i
]),
d_dat
,
sz
);
i
+=
sz
;
p
.
len
=
i
;
return
i
;
}
// -----------------------------------------------------------------------------
long
UDPMessage
::
getDataID
()
const
noexcept
{
// в качестве идентификатора берётся ID первого датчика в данных
// приоритет имеет аналоговые датчики
if
(
acount
>
0
)
if
(
header
.
acount
>
0
)
return
a_dat
[
0
].
id
;
if
(
dcount
>
0
)
if
(
header
.
dcount
>
0
)
return
d_id
[
0
];
// если нет данных(?) просто возвращаем номер пакета
return
num
;
return
header
.
num
;
}
// -----------------------------------------------------------------------------
size_t
UniSetUDP
::
UDPMessage
::
sizeOf
()
const
noexcept
size_t
UniSetUDP
::
UDPMessage
::
len
()
const
noexcept
{
return
UniSetUDP
::
MaxDataLen
;
// биты которые не уместились в очередной байт, добавляют ещё один байт
size_t
nbit
=
dcount
%
8
*
sizeof
(
unsigned
char
);
size_t
add
=
nbit
>
0
?
1
:
0
;
// size_t nbit = header.dcount % 8 * sizeof(uint8_t
);
//
size_t add = nbit > 0 ? 1 : 0;
return
sizeof
(
UDPHeader
)
+
acount
*
sizeof
(
UDPAData
)
+
dcount
*
sizeof
(
long
)
+
(
dcount
/
8
*
sizeof
(
unsigned
char
)
+
add
);
// return sizeof(header) + header.acount * sizeof(UDPAData) + header.dcount * sizeof(long) + (header.dcount / 8 * sizeof(uint8_t
) + add);
}
// -----------------------------------------------------------------------------
UDPMessage
::
UDPMessage
(
UDPPacket
&
p
)
noexcept
bool
UDPMessage
::
isOk
(
)
noexcept
{
getMessage
(
*
this
,
p
);
return
(
header
.
magic
!=
UniSetUDP
::
UNETUDP_MAGICNUM
);
}
// -----------------------------------------------------------------------------
size_t
UDPMessage
::
getMessage
(
UDPMessage
&
m
,
UDPPacket
&
p
)
noexcept
void
UDPMessage
::
ntoh
(
)
noexcept
{
// reset data
m
=
UDPMessage
{};
size_t
i
=
0
;
memcpy
(
&
m
,
&
(
p
.
data
[
i
]),
sizeof
(
UDPHeader
));
i
+=
sizeof
(
UDPHeader
);
// byte order from packet
u
_char
be_order
=
m
.
_be_order
;
u
int8_t
be_order
=
header
.
_be_order
;
if
(
be_order
&&
!
HostIsBigEndian
)
{
BE_TO_H
(
m
.
magic
);
BE_TO_H
(
m
.
num
);
BE_TO_H
(
m
.
procID
);
BE_TO_H
(
m
.
nodeID
);
BE_TO_H
(
m
.
dcount
);
BE_TO_H
(
m
.
acount
);
BE_TO_H
(
header
.
magic
);
BE_TO_H
(
header
.
num
);
BE_TO_H
(
header
.
procID
);
BE_TO_H
(
header
.
nodeID
);
BE_TO_H
(
header
.
dcount
);
BE_TO_H
(
header
.
acount
);
}
else
if
(
!
be_order
&&
HostIsBigEndian
)
{
LE_TO_H
(
m
.
magic
);
LE_TO_H
(
m
.
num
);
LE_TO_H
(
m
.
procID
);
LE_TO_H
(
m
.
nodeID
);
LE_TO_H
(
m
.
dcount
);
LE_TO_H
(
m
.
acount
);
LE_TO_H
(
header
.
magic
);
LE_TO_H
(
header
.
num
);
LE_TO_H
(
header
.
procID
);
LE_TO_H
(
header
.
nodeID
);
LE_TO_H
(
header
.
dcount
);
LE_TO_H
(
header
.
acount
);
}
// set host byte order
#if __BYTE_ORDER == __LITTLE_ENDIAN
m
.
_be_order
=
0
;
header
.
_be_order
=
0
;
#elif __BYTE_ORDER == __BIG_ENDIAN
m
.
be_order
=
1
;
header
.
_
be_order
=
1
;
#else
#error UNET(getMessage): Unknown byte order!
#endif
// проверяем наш ли пакет..
if
(
m
.
magic
!=
UniSetUDP
::
UNETUDP_MAGICNUM
)
{
m
.
magic
=
0
;
return
0
;
}
// копируем аналоговые данные
size_t
sz
=
m
.
acount
*
sizeof
(
UDPAData
);
if
(
sz
>
sizeof
(
m
.
a_dat
)
)
sz
=
sizeof
(
m
.
a_dat
);
memcpy
(
m
.
a_dat
,
&
(
p
.
data
[
i
]),
sz
);
i
+=
sz
;
// копируем булевые индексы
sz
=
m
.
dcount
*
sizeof
(
long
);
if
(
sz
>
sizeof
(
m
.
d_id
)
)
sz
=
sizeof
(
m
.
d_id
);
memcpy
(
m
.
d_id
,
&
(
p
.
data
[
i
]),
sz
);
i
+=
sz
;
// копируем булевые данные
size_t
nbyte
=
m
.
dcount
/
8
*
sizeof
(
unsigned
char
);
size_t
nbit
=
m
.
dcount
%
8
*
sizeof
(
unsigned
char
);
sz
=
nbit
>
0
?
nbyte
+
1
:
nbyte
;
if
(
sz
>
sizeof
(
m
.
d_dat
)
)
sz
=
sizeof
(
m
.
d_dat
);
memcpy
(
m
.
d_dat
,
&
(
p
.
data
[
i
]),
sz
);
// CONVERT DATA TO HOST BYTE ORDER
// -------------------------------
if
(
(
be_order
&&
!
HostIsBigEndian
)
||
(
!
be_order
&&
HostIsBigEndian
)
)
{
for
(
size_t
n
=
0
;
n
<
m
.
acount
;
n
++
)
for
(
size_t
n
=
0
;
n
<
header
.
acount
;
n
++
)
{
if
(
be_order
)
{
BE_TO_H
(
m
.
a_dat
[
n
].
id
);
BE_TO_H
(
m
.
a_dat
[
n
].
val
);
BE_TO_H
(
a_dat
[
n
].
id
);
BE_TO_H
(
a_dat
[
n
].
val
);
}
else
{
LE_TO_H
(
m
.
a_dat
[
n
].
id
);
LE_TO_H
(
m
.
a_dat
[
n
].
val
);
LE_TO_H
(
a_dat
[
n
].
id
);
LE_TO_H
(
a_dat
[
n
].
val
);
}
}
for
(
size_t
n
=
0
;
n
<
m
.
dcount
;
n
++
)
for
(
size_t
n
=
0
;
n
<
header
.
dcount
;
n
++
)
{
if
(
be_order
)
{
BE_TO_H
(
m
.
d_id
[
n
]);
BE_TO_H
(
d_id
[
n
]);
}
else
{
LE_TO_H
(
m
.
d_id
[
n
]);
LE_TO_H
(
d_id
[
n
]);
}
}
}
return
i
+
sz
;
}
// -----------------------------------------------------------------------------
uint16_t
UDPMessage
::
getDataCRC
()
const
noexcept
...
...
extensions/UNetUDP/UDPPacket.h
View file @
94e22ed7
...
...
@@ -28,13 +28,13 @@ namespace uniset
namespace
UniSetUDP
{
/*! Для оптимизации размера передаваемых данных, но с учётом того, что ID могут идти не подряд.
Сделан следующий формат:
Для аналоговых величин передаётся массив пар "id-value"(UDPAData).
Для булевых величин - отдельно массив ID и отдельно битовый массив со значениями,
(по количеству битов такого же размера).
Сделан следующий формат:
Для аналоговых величин передаётся массив пар "id-value"(UDPAData).
Для булевых величин - отдельно массив ID и отдельно битовый массив со значениями,
(по количеству битов такого же размера).
\todo Подумать на тему сделать два отдельных вида пакетов для булевых значений и для аналоговых,
чтобы уйти от преобразования UDPMessage --> UDPPacket --> UDPMessage.
\todo Подумать на тему сделать два отдельных вида пакетов для булевых значений и для аналоговых,
чтобы уйти от преобразования UDPMessage --> UDPPacket --> UDPMessage.
\warning ТЕКУЩАЯ ВЕРСИЯ ПРОТОКОЛА НЕ БУДЕТ РАБОТАТЬ МЕЖДУ 32-битными и 64-битными системами (из-за отличия в типе long).
т.к. это не сильно актуально, пока не переделываю.
...
...
@@ -49,20 +49,18 @@ namespace uniset
Т.е. если все узлы будут иметь одинаковый порядок байт, фактического перекодирования не будет.
*/
const
uint32_t
UNETUDP_MAGICNUM
=
0x13
3EF54
;
// идентификатор протокола
const
uint32_t
UNETUDP_MAGICNUM
=
0x13
43EFD
;
// идентификатор протокола
struct
UDPHeader
{
UDPHeader
()
noexcept
;
uint32_t
magic
;
u
_char
_be_order
;
// 1 - BE byte order, 0 - LE byte order
u
int8_t
_be_order
;
// 1 - BE byte order, 0 - LE byte order
size_t
num
;
long
nodeID
;
long
procID
;
size_t
dcount
;
/*!< количество булевых величин */
size_t
acount
;
/*!< количество аналоговых величин */
}
__attribute__
((
packed
));
std
::
ostream
&
operator
<<
(
std
::
ostream
&
os
,
UDPHeader
&
p
);
...
...
@@ -89,35 +87,15 @@ namespace uniset
// в сеть посылается фактическое количество данных, а не sizeof(UDPPacket).
// При текущих настройках sizeof(UDPPacket) = 72679 (!)
static
const
size_t
MaxACount
=
2
000
;
static
const
size_t
MaxDCount
=
5
000
;
static
const
size_t
MaxACount
=
1
000
;
static
const
size_t
MaxDCount
=
3
000
;
static
const
size_t
MaxDDataCount
=
1
+
MaxDCount
/
8
*
sizeof
(
unsigned
char
);
struct
UDP
Packet
struct
UDP
Message
{
UDPPacket
()
noexcept
:
len
(
0
)
{}
// -V730
size_t
len
;
uint8_t
data
[
sizeof
(
UDPHeader
)
+
MaxDCount
*
sizeof
(
long
)
+
MaxDDataCount
+
MaxACount
*
sizeof
(
UDPAData
)
];
}
__attribute__
((
packed
));
static
const
size_t
MaxDataLen
=
sizeof
(
UDPPacket
);
struct
UDPMessage
:
public
UDPHeader
{
UDPMessage
()
noexcept
;
UDPMessage
(
UDPMessage
&&
m
)
noexcept
=
default
;
UDPMessage
&
operator
=
(
UDPMessage
&&
)
noexcept
=
default
;
UDPMessage
(
const
UDPMessage
&
m
)
noexcept
=
default
;
UDPMessage
&
operator
=
(
const
UDPMessage
&
)
noexcept
=
default
;
explicit
UDPMessage
(
UDPPacket
&
p
)
noexcept
;
size_t
transport_msg
(
UDPPacket
&
p
)
const
noexcept
;
static
size_t
getMessage
(
UDPMessage
&
m
,
UDPPacket
&
p
)
noexcept
;
// net to host
void
ntoh
()
noexcept
;
bool
isOk
()
noexcept
;
// \warning в случае переполнения возвращается MaxDCount
size_t
addDData
(
long
id
,
bool
val
)
noexcept
;
...
...
@@ -143,47 +121,57 @@ namespace uniset
inline
bool
isAFull
()
const
noexcept
{
return
(
acount
>=
MaxACount
);
return
(
header
.
acount
>=
MaxACount
);
}
inline
bool
isDFull
()
const
noexcept
{
return
(
dcount
>=
MaxDCount
);
return
(
header
.
dcount
>=
MaxDCount
);
}
inline
bool
isFull
()
const
noexcept
{
return
!
((
dcount
<
MaxDCount
)
&&
(
acount
<
MaxACount
));
return
!
((
header
.
dcount
<
MaxDCount
)
&&
(
header
.
acount
<
MaxACount
));
}
inline
size_t
dsize
()
const
noexcept
{
return
dcount
;
return
header
.
dcount
;
}
inline
size_t
asize
()
const
noexcept
{
return
acount
;
return
header
.
acount
;
}
// размер итогового пакета в байтах
size_t
sizeOf
()
const
noexcept
;
size_t
len
()
const
noexcept
;
uint16_t
getDataCRC
()
const
noexcept
;
// количество байт в пакете с булевыми переменными...
size_t
d_byte
()
const
noexcept
{
return
dcount
*
sizeof
(
long
)
+
dcount
;
return
header
.
dcount
*
sizeof
(
long
)
+
header
.
dcount
;
}
UDPHeader
header
;
UDPAData
a_dat
[
MaxACount
];
/*!< аналоговые величины */
long
d_id
[
MaxDCount
];
/*!< список дискретных ID */
uint8_t
d_dat
[
MaxDDataCount
];
/*!< битовые значения */
};
}
__attribute__
((
packed
))
;
std
::
ostream
&
operator
<<
(
std
::
ostream
&
os
,
UDPMessage
&
p
);
uint16_t
makeCRC
(
unsigned
char
*
buf
,
size_t
len
)
noexcept
;
static
const
size_t
MaxDataLen
=
sizeof
(
UDPHeader
)
+
MaxDCount
*
sizeof
(
long
)
+
MaxDDataCount
+
MaxACount
*
sizeof
(
UDPAData
);
union
UDPPacket
{
UDPPacket
()
:
msg
(){};
uint8_t
raw
[
MaxDataLen
];
UDPMessage
msg
;
};
}
// --------------------------------------------------------------------------
}
// end of namespace uniset
...
...
extensions/UNetUDP/UNetReceiver.cc
View file @
94e22ed7
...
...
@@ -29,26 +29,16 @@ using namespace uniset;
using
namespace
uniset
::
extensions
;
// -----------------------------------------------------------------------------
CommonEventLoop
UNetReceiver
::
loop
;
static
UniSetUDP
::
UDPPacket
defpack
;
// -----------------------------------------------------------------------------
UNetReceiver
::
UNetReceiver
(
const
std
::
string
&
s_host
,
int
_port
,
const
std
::
shared_ptr
<
SMInterface
>&
smi
,
bool
nocheckConnection
,
const
std
::
string
&
prefix
)
:
shm
(
smi
),
updatepause
(
100
),
port
(
_port
),
saddr
(
s_host
,
_port
),
recvTimeout
(
5000
),
prepareTime
(
2000
),
lostTimeout
(
200
),
/* 2*updatepause */
lostPackets
(
0
),
sidRespond
(
uniset
::
DefaultObjectId
),
respondInvert
(
false
),
sidLostPackets
(
uniset
::
DefaultObjectId
),
activated
(
false
),
cbuf
(
cbufSize
),
maxDifferens
(
20
),
lockUpdate
(
false
)
cbuf
(
cbufSize
)
{
{
ostringstream
s
;
...
...
@@ -347,9 +337,9 @@ size_t UNetReceiver::rnext( size_t num )
while
(
i
<
wnum
)
{
p
=
&
cbuf
[
i
%
cbufSize
];
p
=
&
cbuf
[
i
%
cbufSize
]
.
msg
;
if
(
p
->
num
>
num
)
if
(
p
->
header
.
num
>
num
)
return
i
;
i
++
;
...
...
@@ -373,21 +363,23 @@ void UNetReceiver::update() noexcept
// либо обнаружится "дырка" в последовательности,
while
(
rnum
<
wnum
)
{
p
=
&
cbuf
[
rnum
%
cbufSize
];
p
=
&
(
cbuf
[
rnum
%
cbufSize
].
msg
);
// cout << "update: num=" << p->header.num << " rnum=" << rnum << " wnum=" << wnum << endl;
// если номер пакета не равен ожидаемому, ждём считая что это "дырка"
// т.к. разрывы и другие случаи обрабатываются при приёме пакетов
if
(
p
->
num
!=
rnum
)
if
(
p
->
header
.
num
!=
rnum
)
{
if
(
!
ptLostTimeout
.
checkTime
()
)
return
;
size_t
sub
=
1
;
if
(
p
->
num
>
rnum
)
sub
=
(
p
->
num
-
rnum
);
if
(
p
->
header
.
num
>
rnum
)
sub
=
(
p
->
header
.
num
-
rnum
);
unetwarn
<<
myname
<<
"(update): lostTimeout("
<<
ptLostTimeout
.
getInterval
()
<<
")! pnum="
<<
p
->
num
<<
" lost "
<<
sub
<<
" packets "
<<
endl
;
unetwarn
<<
myname
<<
"(update): lostTimeout("
<<
ptLostTimeout
.
getInterval
()
<<
")! pnum="
<<
p
->
header
.
num
<<
" lost "
<<
sub
<<
" packets "
<<
endl
;
lostPackets
+=
sub
;
// ищем следующий пакет для обработки
...
...
@@ -402,7 +394,7 @@ void UNetReceiver::update() noexcept
// Обработка дискретных
auto
d_iv
=
getDCache
(
p
);
for
(
size_t
i
=
0
;
i
<
p
->
dcount
;
i
++
)
for
(
size_t
i
=
0
;
i
<
p
->
header
.
dcount
;
i
++
)
{
try
{
...
...
@@ -424,7 +416,7 @@ void UNetReceiver::update() noexcept
}
catch
(
const
uniset
::
Exception
&
ex
)
{
unetcrit
<<
myname
<<
"(update): "
unetcrit
<<
myname
<<
"(update):
D:
"
<<
" id="
<<
s_id
<<
" val="
<<
p
->
dValue
(
i
)
<<
" error: "
<<
ex
...
...
@@ -432,7 +424,7 @@ void UNetReceiver::update() noexcept
}
catch
(...)
{
unetcrit
<<
myname
<<
"(update): "
unetcrit
<<
myname
<<
"(update):
D:
"
<<
" id="
<<
s_id
<<
" val="
<<
p
->
dValue
(
i
)
<<
" error: catch..."
...
...
@@ -443,7 +435,7 @@ void UNetReceiver::update() noexcept
// Обработка аналоговых
auto
a_iv
=
getACache
(
p
);
for
(
size_t
i
=
0
;
i
<
p
->
acount
;
i
++
)
for
(
size_t
i
=
0
;
i
<
p
->
header
.
acount
;
i
++
)
{
try
{
...
...
@@ -465,7 +457,7 @@ void UNetReceiver::update() noexcept
}
catch
(
const
uniset
::
Exception
&
ex
)
{
unetcrit
<<
myname
<<
"(update): "
unetcrit
<<
myname
<<
"(update):
A:
"
<<
" id="
<<
dat
->
id
<<
" val="
<<
dat
->
val
<<
" error: "
<<
ex
...
...
@@ -473,7 +465,7 @@ void UNetReceiver::update() noexcept
}
catch
(...)
{
unetcrit
<<
myname
<<
"(update): "
unetcrit
<<
myname
<<
"(update):
A:
"
<<
" id="
<<
dat
->
id
<<
" val="
<<
dat
->
val
<<
" error: catch..."
...
...
@@ -630,8 +622,9 @@ bool UNetReceiver::receive() noexcept
{
try
{
ssize_t
ret
=
udp
->
receiveBytes
(
r_buf
.
data
,
sizeof
(
r_buf
.
data
));
recvCount
++
;
// сперва пробуем сохранить пакет в том месте, где должен быть очередной пакет
pack
=
&
(
cbuf
[
wnum
%
cbufSize
]);
ssize_t
ret
=
udp
->
receiveBytes
(
pack
->
raw
,
sizeof
(
pack
->
raw
)
/* UniSetUDP::MaxDataLen */
);
if
(
ret
<
0
)
{
...
...
@@ -641,27 +634,24 @@ bool UNetReceiver::receive() noexcept
if
(
ret
==
0
)
{
unetwarn
<<
myname
<<
"(receive): disconnected?!... recv 0 byte.."
<<
endl
;
unetwarn
<<
myname
<<
"(receive): disconnected?!... recv 0 byte
s
.."
<<
endl
;
return
false
;
}
// сперва пробуем сохранить пакет в том месте, где должен быть очередной пакет
pack
=
&
cbuf
[
wnum
%
cbufSize
];
size_t
sz
=
UniSetUDP
::
UDPMessage
::
getMessage
(
*
pack
,
r_buf
);
recvCount
++
;
if
(
sz
==
0
)
{
unetcrit
<<
myname
<<
"(receive): FAILED RECEIVE DATA ret="
<<
ret
<<
endl
;
return
false
;
}
// конвертируем byte order
pack
->
msg
.
ntoh
();
if
(
pack
->
magic
!=
UniSetUDP
::
UNETUDP_MAGICNUM
)
if
(
pack
->
m
sg
.
header
.
m
agic
!=
UniSetUDP
::
UNETUDP_MAGICNUM
)
return
false
;
if
(
size_t
(
abs
(
long
(
pack
->
num
-
wnum
)))
>
maxDifferens
||
size_t
(
abs
(
long
(
wnum
-
rnum
)
))
>=
(
cbufSize
-
2
)
)
// cout << "RECV[" << ret << "]: msg: " << pack->msg << endl;
if
(
size_t
(
abs
(
long
(
pack
->
msg
.
header
.
num
-
wnum
)))
>
maxDifferens
||
size_t
(
abs
(
long
(
wnum
-
rnum
)
))
>=
(
cbufSize
-
2
)
)
{
unetcrit
<<
myname
<<
"(receive): DISAGREE "
<<
" packnum="
<<
pack
->
num
<<
" packnum="
<<
pack
->
msg
.
header
.
num
<<
" wnum="
<<
wnum
<<
" rnum="
<<
rnum
<<
" (maxDiff="
<<
maxDifferens
...
...
@@ -669,40 +659,41 @@ bool UNetReceiver::receive() noexcept
<<
")"
<<
endl
;
lostPackets
=
pack
->
num
>
wnum
?
(
pack
->
num
-
wnum
-
1
)
:
lostPackets
+
1
;
lostPackets
=
pack
->
msg
.
header
.
num
>
wnum
?
(
pack
->
msg
.
header
.
num
-
wnum
-
1
)
:
lostPackets
+
1
;
// реинициализируем позицию для чтения
rnum
=
pack
->
num
;
wnum
=
pack
->
num
+
1
;
rnum
=
pack
->
msg
.
header
.
num
;
wnum
=
pack
->
msg
.
header
.
num
+
1
;
// перемещаем пакет в нужное место (если требуется)
if
(
wnum
!=
pack
->
num
)
if
(
wnum
!=
pack
->
msg
.
header
.
num
)
{
cbuf
[
pack
->
num
%
cbufSize
]
=
(
*
pack
)
;
pack
->
num
=
0
;
cbuf
[
pack
->
msg
.
header
.
num
%
cbufSize
].
msg
=
pack
->
msg
;
pack
->
msg
.
header
.
num
=
0
;
}
return
true
;
}
if
(
pack
->
num
!=
wnum
)
if
(
pack
->
msg
.
header
.
num
!=
wnum
)
{
// перемещаем пакет в правильное место
// в соответствии с его номером
cbuf
[
pack
->
num
%
cbufSize
]
=
(
*
pack
)
;
cbuf
[
pack
->
msg
.
header
.
num
%
cbufSize
].
msg
=
pack
->
msg
;
if
(
pack
->
num
>=
wnum
)
wnum
=
pack
->
num
+
1
;
if
(
pack
->
msg
.
header
.
num
>=
wnum
)
wnum
=
pack
->
msg
.
header
.
num
+
1
;
// обнуляем номер в том месте где записали, чтобы его не обрабатывал update
pack
->
num
=
0
;
pack
->
msg
.
header
.
num
=
0
;
}
else
if
(
pack
->
num
>=
wnum
)
wnum
=
pack
->
num
+
1
;
else
if
(
pack
->
msg
.
header
.
num
>=
wnum
)
wnum
=
pack
->
msg
.
header
.
num
+
1
;
// начальная инициализация для чтения
if
(
rnum
==
0
)
rnum
=
pack
->
num
;
rnum
=
pack
->
msg
.
header
.
num
;
// cout << "FINAL: msg: " << cbuf[(wnum-1) % cbufSize].msg << endl;
return
true
;
}
catch
(
Poco
::
Net
::
NetException
&
ex
)
...
...
@@ -748,14 +739,14 @@ UNetReceiver::CacheVec* UNetReceiver::getDCache( UniSetUDP::UDPMessage* pack ) n
CacheVec
*
d_info
=
&
dit
->
second
;
if
(
pack
->
dcount
==
d_info
->
size
()
)
if
(
pack
->
header
.
dcount
==
d_info
->
size
()
)
return
d_info
;
unetinfo
<<
myname
<<
": init dcache["
<<
pack
->
dcount
<<
"] for "
<<
pack
->
getDataID
()
<<
endl
;
unetinfo
<<
myname
<<
": init dcache["
<<
pack
->
header
.
dcount
<<
"] for "
<<
pack
->
getDataID
()
<<
endl
;
d_info
->
resize
(
pack
->
dcount
);
d_info
->
resize
(
pack
->
header
.
dcount
);
for
(
size_t
i
=
0
;
i
<
pack
->
dcount
;
i
++
)
for
(
size_t
i
=
0
;
i
<
pack
->
header
.
dcount
;
i
++
)
{
CacheItem
&
d
=
(
*
d_info
)[
i
];
...
...
@@ -781,14 +772,14 @@ UNetReceiver::CacheVec* UNetReceiver::getACache( UniSetUDP::UDPMessage* pack ) n
CacheVec
*
a_info
=
&
ait
->
second
;
if
(
pack
->
acount
==
a_info
->
size
()
)
if
(
pack
->
header
.
acount
==
a_info
->
size
()
)
return
a_info
;
unetinfo
<<
myname
<<
": init acache["
<<
pack
->
acount
<<
"] for "
<<
pack
->
getDataID
()
<<
endl
;
unetinfo
<<
myname
<<
": init acache["
<<
pack
->
header
.
acount
<<
"] for "
<<
pack
->
getDataID
()
<<
endl
;
a_info
->
resize
(
pack
->
acount
);
a_info
->
resize
(
pack
->
header
.
acount
);
for
(
size_t
i
=
0
;
i
<
pack
->
acount
;
i
++
)
for
(
size_t
i
=
0
;
i
<
pack
->
header
.
acount
;
i
++
)
{
CacheItem
&
d
=
(
*
a_info
)[
i
];
...
...
extensions/UNetUDP/UNetReceiver.h
View file @
94e22ed7
...
...
@@ -234,18 +234,16 @@ namespace uniset
uniset
::
ObjectId
sidRespond
=
{
uniset
::
DefaultObjectId
};
IOController
::
IOStateList
::
iterator
itRespond
;
bool
respondInvert
=
{
false
};
uniset
::
ObjectId
sidLostPackets
;
uniset
::
ObjectId
sidLostPackets
=
{
uniset
::
DefaultObjectId
}
;
IOController
::
IOStateList
::
iterator
itLostPackets
;
std
::
atomic_bool
activated
=
{
false
};
size_t
cbufSize
=
{
100
};
/*!< размер буфера для сообщений по умолчанию */
std
::
vector
<
UniSetUDP
::
UDP
Message
>
cbuf
;
// circular buffer
std
::
vector
<
UniSetUDP
::
UDP
Packet
>
cbuf
;
// circular buffer
size_t
wnum
=
{
1
};
/*!< номер следующего ожидаемого пакета */
size_t
rnum
=
{
0
};
/*!< номер последнего обработанного пакета */
UniSetUDP
::
UDPMessage
*
pack
;
UniSetUDP
::
UDPPacket
r_buf
;
/*!< просто буфер для получения очередного сообщения */
UniSetUDP
::
UDPPacket
*
pack
;
// текущий обрабатываемый пакет
/*! максимальная разница между номерами пакетов, при которой считается, что счётчик пакетов
* прошёл через максимум или сбился...
...
...
extensions/UNetUDP/UNetSender.cc
View file @
94e22ed7
...
...
@@ -87,8 +87,8 @@ namespace uniset
// выставляем поля, которые не меняются
{
uniset_rwmutex_wrlock
l
(
mypack
.
mut
);
mypack
.
msg
.
nodeID
=
uniset_conf
()
->
getLocalNode
();
mypack
.
msg
.
procID
=
shm
->
ID
();
mypack
.
p
.
msg
.
header
.
nodeID
=
uniset_conf
()
->
getLocalNode
();
mypack
.
p
.
msg
.
header
.
procID
=
shm
->
ID
();
}
// -------------------------------
...
...
@@ -200,9 +200,9 @@ namespace uniset
uniset
::
uniset_rwmutex_wrlock
l
(
mypack
.
mut
);
if
(
it
.
iotype
==
UniversalIO
::
DI
||
it
.
iotype
==
UniversalIO
::
DO
)
mypack
.
msg
.
setDData
(
it
.
pack_ind
,
value
);
mypack
.
p
.
msg
.
setDData
(
it
.
pack_ind
,
value
);
else
if
(
it
.
iotype
==
UniversalIO
::
AI
||
it
.
iotype
==
UniversalIO
::
AO
)
mypack
.
msg
.
setAData
(
it
.
pack_ind
,
value
);
mypack
.
p
.
msg
.
setAData
(
it
.
pack_ind
,
value
);
}
// -----------------------------------------------------------------------------
void
UNetSender
::
setCheckConnectionPause
(
int
msec
)
...
...
@@ -313,11 +313,11 @@ namespace uniset
#ifdef UNETUDP_DISABLE_OPTIMIZATION_N1
mypack
.
msg
.
num
=
packetnum
++
;
#else
uint16_t
crc
=
mypack
.
msg
.
getDataCRC
();
uint16_t
crc
=
mypack
.
p
.
msg
.
getDataCRC
();
if
(
crc
!=
lastcrc
)
{
mypack
.
msg
.
num
=
packetnum
++
;
mypack
.
p
.
msg
.
header
.
num
=
packetnum
++
;
lastcrc
=
crc
;
}
...
...
@@ -331,12 +331,10 @@ namespace uniset
if
(
!
udp
||
!
udp
->
poll
(
UniSetTimer
::
millisecToPoco
(
writeTimeout
),
Poco
::
Net
::
Socket
::
SELECT_WRITE
)
)
return
;
mypack
.
msg
.
transport_msg
(
s_msg
);
size_t
ret
=
udp
->
sendTo
(
mypack
.
p
.
raw
,
mypack
.
p
.
msg
.
len
(),
saddr
);
size_t
ret
=
udp
->
sendTo
(
&
s_msg
.
data
,
s_msg
.
len
,
saddr
);
if
(
ret
<
s_msg
.
len
)
unetcrit
<<
myname
<<
"(real_send): FAILED ret="
<<
ret
<<
" < sizeof="
<<
s_msg
.
len
<<
endl
;
if
(
ret
<
mypack
.
p
.
msg
.
len
()
)
unetcrit
<<
myname
<<
"(real_send): FAILED ret="
<<
ret
<<
" < sizeof="
<<
mypack
.
p
.
msg
.
len
()
<<
endl
;
}
catch
(
Poco
::
Net
::
NetException
&
ex
)
{
...
...
@@ -457,7 +455,7 @@ namespace uniset
{
uniset_rwmutex_wrlock
l
(
mypack
.
mut
);
p
.
pack_ind
=
mypack
.
msg
.
addDData
(
sid
,
defval
);
p
.
pack_ind
=
mypack
.
p
.
msg
.
addDData
(
sid
,
defval
);
}
// unlock mutex....
if
(
p
.
pack_ind
>=
maxDData
)
...
...
@@ -469,9 +467,9 @@ namespace uniset
auto
&
mypack2
=
pk
[
dnum
];
uniset_rwmutex_wrlock
l2
(
mypack2
.
mut
);
p
.
pack_ind
=
mypack2
.
msg
.
addDData
(
sid
,
defval
);
mypack2
.
msg
.
nodeID
=
uniset_conf
()
->
getLocalNode
();
mypack2
.
msg
.
procID
=
shm
->
ID
();
p
.
pack_ind
=
mypack2
.
p
.
msg
.
addDData
(
sid
,
defval
);
mypack2
.
p
.
msg
.
header
.
nodeID
=
uniset_conf
()
->
getLocalNode
();
mypack2
.
p
.
msg
.
header
.
procID
=
shm
->
ID
();
}
p
.
pack_num
=
dnum
;
...
...
@@ -498,7 +496,7 @@ namespace uniset
{
uniset_rwmutex_wrlock
l
(
mypack
.
mut
);
p
.
pack_ind
=
mypack
.
msg
.
addAData
(
sid
,
defval
);
p
.
pack_ind
=
mypack
.
p
.
msg
.
addAData
(
sid
,
defval
);
}
if
(
p
.
pack_ind
>=
maxAData
)
...
...
@@ -510,9 +508,9 @@ namespace uniset
auto
&
mypack2
=
pk
[
anum
];
uniset_rwmutex_wrlock
l2
(
mypack2
.
mut
);
p
.
pack_ind
=
mypack2
.
msg
.
addAData
(
sid
,
defval
);
mypack2
.
msg
.
nodeID
=
uniset_conf
()
->
getLocalNode
();
mypack2
.
msg
.
procID
=
shm
->
ID
();
p
.
pack_ind
=
mypack2
.
p
.
msg
.
addAData
(
sid
,
defval
);
mypack2
.
p
.
msg
.
header
.
nodeID
=
uniset_conf
()
->
getLocalNode
();
mypack2
.
p
.
msg
.
header
.
procID
=
shm
->
ID
();
}
p
.
pack_num
=
anum
;
...
...
@@ -589,11 +587,11 @@ namespace uniset
s
<<
"
\t
["
<<
i
->
first
<<
"]="
<<
i
->
second
.
size
()
<<
endl
;
size_t
n
=
0
;
for
(
const
auto
&
p
:
i
->
second
)
for
(
const
auto
&
p
ack
:
i
->
second
)
{
//uniset_rwmutex_rlock l(p->mut);
s
<<
"
\t\t
["
<<
(
n
++
)
<<
"]="
<<
p
.
msg
.
sizeOf
()
<<
" bytes"
<<
" ( numA="
<<
setw
(
5
)
<<
p
.
msg
.
asize
()
<<
" numD="
<<
setw
(
5
)
<<
p
.
msg
.
dsize
()
<<
")"
s
<<
"
\t\t
["
<<
(
n
++
)
<<
"]="
<<
p
ack
.
p
.
msg
.
len
()
<<
" bytes"
<<
" ( numA="
<<
setw
(
5
)
<<
p
ack
.
p
.
msg
.
asize
()
<<
" numD="
<<
setw
(
5
)
<<
pack
.
p
.
msg
.
dsize
()
<<
")"
<<
endl
;
}
}
...
...
extensions/UNetUDP/UNetSender.h
View file @
94e22ed7
...
...
@@ -114,12 +114,12 @@ namespace uniset
struct
PackMessage
{
PackMessage
(
UniSetUDP
::
UDPMessage
&&
m
)
noexcept
:
msg
(
std
::
move
(
m
))
{}
PackMessage
(
const
UniSetUDP
::
UDPMessage
&
m
)
=
delete
;
PackMessage
(
uniset
::
UniSetUDP
::
UDPPacket
&&
p
)
noexcept
:
p
(
std
::
move
(
p
))
{}
PackMessage
(
const
uniset
::
UniSetUDP
::
UDPPacket
&
p
)
=
delete
;
PackMessage
()
noexcept
{}
UniSetUDP
::
UDPMessage
msg
;
uniset
::
UniSetUDP
::
UDPPacket
p
;
uniset
::
uniset_rwmutex
mut
;
};
...
...
@@ -223,7 +223,6 @@ namespace uniset
UItemMap
items
;
size_t
packetnum
=
{
1
};
/*!< номер очередного посылаемого пакета */
uint16_t
lastcrc
=
{
0
};
UniSetUDP
::
UDPPacket
s_msg
;
size_t
maxAData
=
{
UniSetUDP
::
MaxACount
};
size_t
maxDData
=
{
UniSetUDP
::
MaxDCount
};
...
...
extensions/UNetUDP/tests/test_unetudp.cc
View file @
94e22ed7
This diff is collapsed.
Click to expand it.
extensions/UNetUDP/tests/urecv_perf_test.cc
View file @
94e22ed7
...
...
@@ -59,18 +59,18 @@ static void run_senders( size_t max, const std::string& s_host, size_t count = 5
}
}
UniSetUDP
::
UDP
Message
mypack
;
mypack
.
nodeID
=
100
;
mypack
.
procID
=
100
;
UniSetUDP
::
UDP
Packet
mypack
;
mypack
.
msg
.
header
.
nodeID
=
100
;
mypack
.
msg
.
header
.
procID
=
100
;
for
(
size_t
i
=
0
;
i
<
count
;
i
++
)
{
UniSetUDP
::
UDPAData
d
(
i
,
i
);
mypack
.
addAData
(
d
);
mypack
.
msg
.
addAData
(
d
);
}
for
(
size_t
i
=
0
;
i
<
count
;
i
++
)
mypack
.
addDData
(
i
,
i
);
mypack
.
msg
.
addDData
(
i
,
i
);
for
(
size_t
i
=
0
;
i
<
max
;
i
++
)
{
...
...
@@ -92,13 +92,11 @@ static void run_senders( size_t max, const std::string& s_host, size_t count = 5
}
size_t
packetnum
=
0
;
UniSetUDP
::
UDPPacket
s_buf
;
size_t
nc
=
1
;
while
(
nc
)
// -V654
{
mypack
.
num
=
packetnum
++
;
mypack
.
msg
.
header
.
num
=
packetnum
++
;
// при переходе черех максимум (UniSetUDP::MaxPacketNum)
// пакет опять должен иметь номер "1"
...
...
@@ -111,11 +109,10 @@ static void run_senders( size_t max, const std::string& s_host, size_t count = 5
{
if
(
udp
->
poll
(
100000
,
Poco
::
Net
::
Socket
::
SELECT_WRITE
)
)
{
mypack
.
transport_msg
(
s_buf
);
size_t
ret
=
udp
->
sendBytes
((
char
*
)
&
s_buf
.
data
,
s_buf
.
len
);
size_t
ret
=
udp
->
sendBytes
(
mypack
.
raw
,
mypack
.
msg
.
len
());
if
(
ret
<
s_buf
.
len
)
cerr
<<
"(send): FAILED ret="
<<
ret
<<
" < sizeof="
<<
s_buf
.
len
<<
endl
;
if
(
ret
<
mypack
.
msg
.
len
()
)
cerr
<<
"(send): FAILED ret="
<<
ret
<<
" < sizeof="
<<
mypack
.
msg
.
len
()
<<
endl
;
}
}
catch
(
Poco
::
Net
::
NetException
&
e
)
...
...
@@ -180,9 +177,9 @@ int main(int argc, char* argv[] )
auto
conf
=
uniset_init
(
argc
,
argv
);
if
(
argc
>
1
&&
!
strcmp
(
argv
[
1
],
"s"
)
)
run_senders
(
1
,
host
);
run_senders
(
1
0
,
host
);
else
run_test
(
1
,
host
);
run_test
(
1
0
,
host
);
return
0
;
}
...
...
extensions/UNetUDP/unet-udp-tester.cc
View file @
94e22ed7
...
...
@@ -223,9 +223,7 @@ int main(int argc, char* argv[])
{
UDPReceiveU
udp
(
s_host
,
port
);
// char buf[UniSetUDP::MaxDataLen];
UniSetUDP
::
UDPMessage
pack
;
UniSetUDP
::
UDPPacket
buf
;
UniSetUDP
::
UDPPacket
pack
;
unsigned
long
prev_num
=
1
;
int
nc
=
1
;
...
...
@@ -257,27 +255,35 @@ int main(int argc, char* argv[])
continue
;
}
size_t
ret
=
udp
.
receiveBytes
(
&
(
buf
.
data
),
sizeof
(
buf
.
data
)
);
size_t
sz
=
UniSetUDP
::
UDPMessage
::
getMessage
(
pack
,
buf
);
size_t
ret
=
udp
.
receiveBytes
(
pack
.
raw
,
sizeof
(
pack
.
raw
)
);
if
(
sz
==
0
)
if
(
ret
<
0
)
{
if
(
pack
.
magic
!=
UniSetUDP
::
UNETUDP_MAGICNUM
)
cerr
<<
"(recv): BAD PROTOCOL VERSION! [ need version '"
<<
UniSetUDP
::
UNETUDP_MAGICNUM
<<
"']"
<<
endl
;
else
cerr
<<
"(recv): FAILED header ret="
<<
ret
<<
" sizeof="
<<
sz
<<
endl
;
cerr
<<
"(recv): no data?!"
<<
endl
;
continue
;
}
if
(
ret
==
0
)
{
cerr
<<
"(recv): connection closed?!"
<<
endl
;
continue
;
}
pack
.
msg
.
ntoh
();
if
(
pack
.
msg
.
header
.
magic
!=
UniSetUDP
::
UNETUDP_MAGICNUM
)
{
cerr
<<
"(recv): BAD PROTOCOL VERSION! [ need version '"
<<
UniSetUDP
::
UNETUDP_MAGICNUM
<<
"']"
<<
endl
;
continue
;
}
if
(
lost
)
{
if
(
prev_num
!=
(
pack
.
num
-
1
)
)
cerr
<<
"WARNING! Incorrect sequence of packets! current="
<<
pack
.
num
if
(
prev_num
!=
(
pack
.
msg
.
header
.
num
-
1
)
)
cerr
<<
"WARNING! Incorrect sequence of packets! current="
<<
pack
.
msg
.
header
.
num
<<
" prev="
<<
prev_num
<<
endl
;
prev_num
=
pack
.
num
;
prev_num
=
pack
.
msg
.
header
.
num
;
}
npack
++
;
...
...
@@ -287,7 +293,7 @@ int main(int argc, char* argv[])
<<
" bytes: "
<<
ret
<<
endl
;
if
(
show
)
cout
<<
"receive data: "
<<
pack
<<
endl
;
cout
<<
"receive data: "
<<
pack
.
msg
<<
endl
;
}
catch
(
Poco
::
Net
::
NetException
&
e
)
{
...
...
@@ -314,9 +320,10 @@ int main(int argc, char* argv[])
std
::
shared_ptr
<
UDPSocketU
>
udp
=
make_shared
<
UDPSocketU
>
(
s_host
,
port
);
udp
->
setBroadcast
(
broadcast
);
UniSetUDP
::
UDPMessage
mypack
;
mypack
.
nodeID
=
nodeID
;
mypack
.
procID
=
procID
;
UniSetUDP
::
UDPPacket
mypack
;
UDPMessage
*
msg
=
&
mypack
.
msg
;
msg
->
header
.
nodeID
=
nodeID
;
msg
->
header
.
procID
=
procID
;
if
(
!
a_data
.
empty
()
)
{
...
...
@@ -325,7 +332,7 @@ int main(int argc, char* argv[])
for
(
const
auto
&
v
:
vlist
)
{
UDPAData
d
(
v
.
si
.
id
,
v
.
val
);
m
ypack
.
addAData
(
d
);
m
sg
->
addAData
(
d
);
}
}
else
...
...
@@ -333,7 +340,7 @@ int main(int argc, char* argv[])
for
(
size_t
i
=
0
;
i
<
count
;
i
++
)
{
UDPAData
d
(
i
,
i
);
m
ypack
.
addAData
(
d
);
m
sg
->
addAData
(
d
);
}
}
...
...
@@ -342,19 +349,17 @@ int main(int argc, char* argv[])
auto
vlist
=
uniset
::
getSInfoList
(
d_data
,
nullptr
);
for
(
const
auto
&
v
:
vlist
)
m
ypack
.
addDData
(
v
.
si
.
id
,
v
.
val
);
m
sg
->
addDData
(
v
.
si
.
id
,
v
.
val
);
}
else
{
for
(
size_t
i
=
0
;
i
<
count
;
i
++
)
m
ypack
.
addDData
(
i
,
i
);
m
sg
->
addDData
(
i
,
i
);
}
Poco
::
Net
::
SocketAddress
sa
(
s_host
,
port
);
udp
->
connect
(
sa
);
UniSetUDP
::
UDPPacket
s_buf
;
size_t
nc
=
1
;
if
(
ncycles
>
0
)
...
...
@@ -362,7 +367,7 @@ int main(int argc, char* argv[])
while
(
nc
)
{
m
ypack
.
num
=
packetnum
++
;
m
sg
->
header
.
num
=
packetnum
++
;
// при переходе через максимум (UniSetUDP::MaxPacketNum)
// пакет опять должен иметь номер "1"
...
...
@@ -373,16 +378,14 @@ int main(int argc, char* argv[])
{
if
(
udp
->
poll
(
UniSetTimer
::
millisecToPoco
(
tout
),
Poco
::
Net
::
Socket
::
SELECT_WRITE
)
)
{
mypack
.
transport_msg
(
s_buf
);
if
(
verb
)
cout
<<
"(send): to addr="
<<
addr
<<
" d_count="
<<
m
ypack
.
dcount
<<
" a_count="
<<
m
ypack
.
acount
<<
" bytes="
<<
s_buf
.
len
<<
endl
;
cout
<<
"(send): to addr="
<<
addr
<<
" d_count="
<<
m
sg
->
header
.
dcount
<<
" a_count="
<<
m
sg
->
header
.
acount
<<
" bytes="
<<
msg
->
len
()
<<
endl
;
size_t
ret
=
udp
->
sendBytes
(
(
char
*
)
&
s_buf
.
data
,
s_buf
.
len
);
size_t
ret
=
udp
->
sendBytes
(
mypack
.
raw
,
sizeof
(
mypack
.
raw
)
/* mypack.msg.len() */
);
if
(
ret
<
s_buf
.
len
)
cerr
<<
"(send): FAILED ret="
<<
ret
<<
" < sizeof="
<<
s_buf
.
len
<<
endl
;
if
(
ret
<
msg
->
len
()
)
cerr
<<
"(send): FAILED ret="
<<
ret
<<
" < sizeof="
<<
msg
->
len
()
<<
endl
;
}
}
catch
(
Poco
::
Net
::
NetException
&
e
)
...
...
uniset2.files
View file @
94e22ed7
This diff is collapsed.
Click to expand it.
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment