Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
U
uniset2
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
1
Issues
1
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
UniSet project repositories
uniset2
Commits
7c8d5a65
Commit
7c8d5a65
authored
Dec 13, 2020
by
Pavel Vainerman
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
[unet]: const size
parent
b4672e62
Expand all
Hide whitespace changes
Inline
Side-by-side
Showing
9 changed files
with
102 additions
and
150 deletions
+102
-150
UDPPacket.cc
extensions/UNetUDP/UDPPacket.cc
+1
-12
UDPPacket.h
extensions/UNetUDP/UDPPacket.h
+19
-45
UNetReceiver.cc
extensions/UNetUDP/UNetReceiver.cc
+25
-35
UNetReceiver.h
extensions/UNetUDP/UNetReceiver.h
+2
-2
UNetSender.cc
extensions/UNetUDP/UNetSender.cc
+19
-19
UNetSender.h
extensions/UNetUDP/UNetSender.h
+3
-3
test_unetudp.cc
extensions/UNetUDP/tests/test_unetudp.cc
+0
-0
urecv_perf_test.cc
extensions/UNetUDP/tests/urecv_perf_test.cc
+12
-12
unet-udp-tester.cc
extensions/UNetUDP/unet-udp-tester.cc
+21
-22
No files found.
extensions/UNetUDP/UDPPacket.cc
View file @
7c8d5a65
...
@@ -263,20 +263,9 @@ namespace uniset
...
@@ -263,20 +263,9 @@ namespace uniset
return
header
.
num
;
return
header
.
num
;
}
}
// -----------------------------------------------------------------------------
// -----------------------------------------------------------------------------
size_t
UniSetUDP
::
UDPMessage
::
len
()
const
noexcept
{
return
UniSetUDP
::
MaxDataLen
;
// биты которые не уместились в очередной байт, добавляют ещё один байт
// size_t nbit = header.dcount % 8 * sizeof(uint8_t);
// size_t add = nbit > 0 ? 1 : 0;
// return sizeof(header) + header.acount * sizeof(UDPAData) + header.dcount * sizeof(long) + (header.dcount / 8 * sizeof(uint8_t) + add);
}
// -----------------------------------------------------------------------------
bool
UDPMessage
::
isOk
()
noexcept
bool
UDPMessage
::
isOk
()
noexcept
{
{
return
(
header
.
magic
!
=
UniSetUDP
::
UNETUDP_MAGICNUM
);
return
(
header
.
magic
=
=
UniSetUDP
::
UNETUDP_MAGICNUM
);
}
}
// -----------------------------------------------------------------------------
// -----------------------------------------------------------------------------
void
UDPMessage
::
ntoh
()
noexcept
void
UDPMessage
::
ntoh
()
noexcept
...
...
extensions/UNetUDP/UDPPacket.h
View file @
7c8d5a65
...
@@ -27,27 +27,23 @@ namespace uniset
...
@@ -27,27 +27,23 @@ namespace uniset
// -----------------------------------------------------------------------------
// -----------------------------------------------------------------------------
namespace
UniSetUDP
namespace
UniSetUDP
{
{
/*! Для оптимизации размера передаваемых данных, но с учётом того, что ID могут идти не подряд.
/*! С учётом того, что ID могут идти не подряд. Сделан следующий формат:
Сделан следующий формат:
Для аналоговых величин передаётся массив пар "id-value"(UDPAData).
Для аналоговых величин передаётся массив пар "id-value"(UDPAData).
Для булевых величин - отдельно массив ID и отдельно битовый массив со значениями,
Для булевых величин - отдельно массив ID и отдельно битовый массив со значениями,
(по количеству битов такого же размера).
(по количеству битов такого же размера).
\
todo Подумать на тему сделать два отдельных вида пакетов для булевых значений и для аналоговых,
\
warning ТЕКУЩАЯ ВЕРСИЯ ПРОТОКОЛА НЕ БУДЕТ РАБОТАТЬ МЕЖДУ 32-битными и 64-битными системами (из-за отличия в типе long).
чтобы уйти от преобразования UDPMessage --> UDPPacket --> UDPMessage
.
т.к. это не сильно актуально, пока не переделываю
.
\warning ТЕКУЩАЯ ВЕРСИЯ ПРОТОКОЛА НЕ БУДЕТ РАБОТАТЬ МЕЖДУ 32-битными и 64-битными системами (из-за отличия в типе long).
"ByteOrder"
т.к. это не сильно актуально, пока не переделываю.
============
В текущей версии протокола. В UDPHeader содержится информации о порядке байт.
"ByteOrder"
Поэтому логика следующая:
============
- Узел который посылает, ничего не перекодирует и просто посылает данные так как хранит
В текущей версии протокола. В UDPHeader содержится информации о порядке байт.
(информация о порядке байт, если специально не выставить, будет выставлена при компиляции, см. конструктор)
Поэтому логика следующая:
- Узел который принимает данные, декодирует их, если на его узле порядок байт не совпадает.
- Узел который посылает, ничего не перекодирует и просто посылает данные так как хранит
Т.е. если все узлы будут иметь одинаковый порядок байт, фактического перекодирования не будет.
(информация о порядке байт, если специально не выставить, будет выставлена при компиляции, см. конструктор)
*/
- Узел который принимает данные, декодирует их, если на его узле порядок байт не совпадает.
Т.е. если все узлы будут иметь одинаковый порядок байт, фактического перекодирования не будет.
*/
const
uint32_t
UNETUDP_MAGICNUM
=
0x1343EFD
;
// идентификатор протокола
const
uint32_t
UNETUDP_MAGICNUM
=
0x1343EFD
;
// идентификатор протокола
...
@@ -78,17 +74,13 @@ namespace uniset
...
@@ -78,17 +74,13 @@ namespace uniset
}
__attribute__
((
packed
));
}
__attribute__
((
packed
));
std
::
ostream
&
operator
<<
(
std
::
ostream
&
os
,
UDPAData
&
p
);
// Теоретический размер данных в UDP пакете (исключая заголовки) 65507
// Фактически желательно не вылезать за размер MTU (обычно 1500) - заголовки = 1432 байта
// Теоретический размер данных в UDP пакете (исключая заголовки) 65507
// т.е. надо чтобы sizeof(UDPPacket) < 1432
// Фактически желательно не вылезать за размер MTU (обычно 1500) - заголовки = 1432 байта
// При текущих настройках sizeof(UDPPacket) = 56421 (!)
// т.е. надо чтобы sizeof(UDPPacket) < 1432
static
const
size_t
MaxACount
=
2000
;
// с другой стороны в текущей реализации
// в сеть посылается фактическое количество данных, а не sizeof(UDPPacket).
// При текущих настройках sizeof(UDPPacket) = 72679 (!)
static
const
size_t
MaxACount
=
1000
;
static
const
size_t
MaxDCount
=
3000
;
static
const
size_t
MaxDCount
=
3000
;
static
const
size_t
MaxDDataCount
=
1
+
MaxDCount
/
8
*
sizeof
(
u
nsigned
char
);
static
const
size_t
MaxDDataCount
=
1
+
MaxDCount
/
8
*
sizeof
(
u
int8_t
);
struct
UDPMessage
struct
UDPMessage
{
{
...
@@ -142,16 +134,7 @@ namespace uniset
...
@@ -142,16 +134,7 @@ namespace uniset
return
header
.
acount
;
return
header
.
acount
;
}
}
// размер итогового пакета в байтах
uint16_t
getDataCRC
()
const
noexcept
;
size_t
len
()
const
noexcept
;
uint16_t
getDataCRC
()
const
noexcept
;
// количество байт в пакете с булевыми переменными...
size_t
d_byte
()
const
noexcept
{
return
header
.
dcount
*
sizeof
(
long
)
+
header
.
dcount
;
}
UDPHeader
header
;
UDPHeader
header
;
UDPAData
a_dat
[
MaxACount
];
/*!< аналоговые величины */
UDPAData
a_dat
[
MaxACount
];
/*!< аналоговые величины */
...
@@ -162,15 +145,6 @@ namespace uniset
...
@@ -162,15 +145,6 @@ namespace uniset
std
::
ostream
&
operator
<<
(
std
::
ostream
&
os
,
UDPMessage
&
p
);
std
::
ostream
&
operator
<<
(
std
::
ostream
&
os
,
UDPMessage
&
p
);
uint16_t
makeCRC
(
unsigned
char
*
buf
,
size_t
len
)
noexcept
;
uint16_t
makeCRC
(
unsigned
char
*
buf
,
size_t
len
)
noexcept
;
static
const
size_t
MaxDataLen
=
sizeof
(
UDPHeader
)
+
MaxDCount
*
sizeof
(
long
)
+
MaxDDataCount
+
MaxACount
*
sizeof
(
UDPAData
);
union
UDPPacket
{
UDPPacket
()
:
msg
(){};
uint8_t
raw
[
MaxDataLen
];
UDPMessage
msg
;
};
}
}
// --------------------------------------------------------------------------
// --------------------------------------------------------------------------
}
// end of namespace uniset
}
// end of namespace uniset
...
...
extensions/UNetUDP/UNetReceiver.cc
View file @
7c8d5a65
...
@@ -29,7 +29,6 @@ using namespace uniset;
...
@@ -29,7 +29,6 @@ using namespace uniset;
using
namespace
uniset
::
extensions
;
using
namespace
uniset
::
extensions
;
// -----------------------------------------------------------------------------
// -----------------------------------------------------------------------------
CommonEventLoop
UNetReceiver
::
loop
;
CommonEventLoop
UNetReceiver
::
loop
;
static
UniSetUDP
::
UDPPacket
defpack
;
// -----------------------------------------------------------------------------
// -----------------------------------------------------------------------------
UNetReceiver
::
UNetReceiver
(
const
std
::
string
&
s_host
,
int
_port
UNetReceiver
::
UNetReceiver
(
const
std
::
string
&
s_host
,
int
_port
,
const
std
::
shared_ptr
<
SMInterface
>&
smi
,
const
std
::
shared_ptr
<
SMInterface
>&
smi
...
@@ -335,7 +334,7 @@ size_t UNetReceiver::rnext( size_t num )
...
@@ -335,7 +334,7 @@ size_t UNetReceiver::rnext( size_t num )
while
(
i
<
wnum
)
while
(
i
<
wnum
)
{
{
p
=
&
cbuf
[
i
%
cbufSize
]
.
msg
;
p
=
&
cbuf
[
i
%
cbufSize
];
if
(
p
->
header
.
num
>
num
)
if
(
p
->
header
.
num
>
num
)
return
i
;
return
i
;
...
@@ -361,9 +360,7 @@ void UNetReceiver::update() noexcept
...
@@ -361,9 +360,7 @@ void UNetReceiver::update() noexcept
// либо обнаружится "дырка" в последовательности,
// либо обнаружится "дырка" в последовательности,
while
(
rnum
<
wnum
)
while
(
rnum
<
wnum
)
{
{
p
=
&
(
cbuf
[
rnum
%
cbufSize
].
msg
);
p
=
&
(
cbuf
[
rnum
%
cbufSize
]);
// cout << "update: num=" << p->header.num << " rnum=" << rnum << " wnum=" << wnum << endl;
// если номер пакета не равен ожидаемому, ждём считая что это "дырка"
// если номер пакета не равен ожидаемому, ждём считая что это "дырка"
// т.к. разрывы и другие случаи обрабатываются при приёме пакетов
// т.к. разрывы и другие случаи обрабатываются при приёме пакетов
...
@@ -389,6 +386,10 @@ void UNetReceiver::update() noexcept
...
@@ -389,6 +386,10 @@ void UNetReceiver::update() noexcept
rnum
++
;
rnum
++
;
upCount
++
;
upCount
++
;
// обновление данных в SM (блокировано)
if
(
lockUpdate
)
continue
;
// Обработка дискретных
// Обработка дискретных
auto
d_iv
=
getDCache
(
p
);
auto
d_iv
=
getDCache
(
p
);
...
@@ -406,10 +407,6 @@ void UNetReceiver::update() noexcept
...
@@ -406,10 +407,6 @@ void UNetReceiver::update() noexcept
shm
->
initIterator
(
c_it
->
ioit
);
shm
->
initIterator
(
c_it
->
ioit
);
}
}
// обновление данных в SM (блокировано)
if
(
lockUpdate
)
continue
;
shm
->
localSetValue
(
c_it
->
ioit
,
s_id
,
p
->
dValue
(
i
),
shm
->
ID
());
shm
->
localSetValue
(
c_it
->
ioit
,
s_id
,
p
->
dValue
(
i
),
shm
->
ID
());
}
}
catch
(
const
uniset
::
Exception
&
ex
)
catch
(
const
uniset
::
Exception
&
ex
)
...
@@ -447,10 +444,6 @@ void UNetReceiver::update() noexcept
...
@@ -447,10 +444,6 @@ void UNetReceiver::update() noexcept
shm
->
initIterator
(
c_it
->
ioit
);
shm
->
initIterator
(
c_it
->
ioit
);
}
}
// обновление данных в SM (блокировано)
if
(
lockUpdate
)
continue
;
shm
->
localSetValue
(
c_it
->
ioit
,
dat
->
id
,
dat
->
val
,
shm
->
ID
());
shm
->
localSetValue
(
c_it
->
ioit
,
dat
->
id
,
dat
->
val
,
shm
->
ID
());
}
}
catch
(
const
uniset
::
Exception
&
ex
)
catch
(
const
uniset
::
Exception
&
ex
)
...
@@ -622,7 +615,7 @@ bool UNetReceiver::receive() noexcept
...
@@ -622,7 +615,7 @@ bool UNetReceiver::receive() noexcept
{
{
// сперва пробуем сохранить пакет в том месте, где должен быть очередной пакет
// сперва пробуем сохранить пакет в том месте, где должен быть очередной пакет
pack
=
&
(
cbuf
[
wnum
%
cbufSize
]);
pack
=
&
(
cbuf
[
wnum
%
cbufSize
]);
ssize_t
ret
=
udp
->
receiveBytes
(
pack
->
raw
,
sizeof
(
pack
->
raw
)
/* UniSetUDP::MaxDataLen */
);
ssize_t
ret
=
udp
->
receiveBytes
(
pack
,
sizeof
(
UniSetUDP
::
UDPMessage
)
);
if
(
ret
<
0
)
if
(
ret
<
0
)
{
{
...
@@ -639,17 +632,15 @@ bool UNetReceiver::receive() noexcept
...
@@ -639,17 +632,15 @@ bool UNetReceiver::receive() noexcept
recvCount
++
;
recvCount
++
;
// конвертируем byte order
// конвертируем byte order
pack
->
msg
.
ntoh
();
pack
->
ntoh
();
if
(
pack
->
msg
.
header
.
magic
!=
UniSetUDP
::
UNETUDP_MAGICNUM
)
if
(
!
pack
->
isOk
()
)
return
false
;
return
false
;
// cout << "RECV[" << ret << "]: msg: " << pack->msg << endl;
if
(
size_t
(
abs
(
long
(
pack
->
header
.
num
-
wnum
)))
>
maxDifferens
||
size_t
(
abs
(
long
(
wnum
-
rnum
)
))
>=
(
cbufSize
-
2
)
)
if
(
size_t
(
abs
(
long
(
pack
->
msg
.
header
.
num
-
wnum
)))
>
maxDifferens
||
size_t
(
abs
(
long
(
wnum
-
rnum
)
))
>=
(
cbufSize
-
2
)
)
{
{
unetcrit
<<
myname
<<
"(receive): DISAGREE "
unetcrit
<<
myname
<<
"(receive): DISAGREE "
<<
" packnum="
<<
pack
->
msg
.
header
.
num
<<
" packnum="
<<
pack
->
header
.
num
<<
" wnum="
<<
wnum
<<
" wnum="
<<
wnum
<<
" rnum="
<<
rnum
<<
" rnum="
<<
rnum
<<
" (maxDiff="
<<
maxDifferens
<<
" (maxDiff="
<<
maxDifferens
...
@@ -657,41 +648,40 @@ bool UNetReceiver::receive() noexcept
...
@@ -657,41 +648,40 @@ bool UNetReceiver::receive() noexcept
<<
")"
<<
")"
<<
endl
;
<<
endl
;
lostPackets
=
pack
->
msg
.
header
.
num
>
wnum
?
(
pack
->
msg
.
header
.
num
-
wnum
-
1
)
:
lostPackets
+
1
;
lostPackets
=
pack
->
header
.
num
>
wnum
?
(
pack
->
header
.
num
-
wnum
-
1
)
:
lostPackets
+
1
;
// реинициализируем позицию для чтения
// реинициализируем позицию для чтения
rnum
=
pack
->
msg
.
header
.
num
;
rnum
=
pack
->
header
.
num
;
wnum
=
pack
->
msg
.
header
.
num
+
1
;
wnum
=
pack
->
header
.
num
+
1
;
// перемещаем пакет в нужное место (если требуется)
// перемещаем пакет в нужное место (если требуется)
if
(
wnum
!=
pack
->
msg
.
header
.
num
)
if
(
wnum
!=
pack
->
header
.
num
)
{
{
cbuf
[
pack
->
msg
.
header
.
num
%
cbufSize
].
msg
=
pack
->
msg
;
cbuf
[
pack
->
header
.
num
%
cbufSize
]
=
(
*
pack
)
;
pack
->
msg
.
header
.
num
=
0
;
pack
->
header
.
num
=
0
;
}
}
return
true
;
return
true
;
}
}
if
(
pack
->
msg
.
header
.
num
!=
wnum
)
if
(
pack
->
header
.
num
!=
wnum
)
{
{
// перемещаем пакет в правильное место
// перемещаем пакет в правильное место
// в соответствии с его номером
// в соответствии с его номером
cbuf
[
pack
->
msg
.
header
.
num
%
cbufSize
].
msg
=
pack
->
msg
;
cbuf
[
pack
->
header
.
num
%
cbufSize
]
=
(
*
pack
)
;
if
(
pack
->
msg
.
header
.
num
>=
wnum
)
if
(
pack
->
header
.
num
>=
wnum
)
wnum
=
pack
->
msg
.
header
.
num
+
1
;
wnum
=
pack
->
header
.
num
+
1
;
// обнуляем номер в том месте где записали, чтобы его не обрабатывал update
// обнуляем номер в том месте где записали, чтобы его не обрабатывал update
pack
->
msg
.
header
.
num
=
0
;
pack
->
header
.
num
=
0
;
}
}
else
if
(
pack
->
msg
.
header
.
num
>=
wnum
)
else
if
(
pack
->
header
.
num
>=
wnum
)
wnum
=
pack
->
msg
.
header
.
num
+
1
;
wnum
=
pack
->
header
.
num
+
1
;
// начальная инициализация для чтения
// начальная инициализация для чтения
if
(
rnum
==
0
)
if
(
rnum
==
0
)
rnum
=
pack
->
msg
.
header
.
num
;
rnum
=
pack
->
header
.
num
;
// cout << "FINAL: msg: " << cbuf[(wnum-1) % cbufSize].msg << endl;
return
true
;
return
true
;
}
}
catch
(
Poco
::
Net
::
NetException
&
ex
)
catch
(
Poco
::
Net
::
NetException
&
ex
)
...
...
extensions/UNetUDP/UNetReceiver.h
View file @
7c8d5a65
...
@@ -240,10 +240,10 @@ namespace uniset
...
@@ -240,10 +240,10 @@ namespace uniset
std
::
atomic_bool
activated
=
{
false
};
std
::
atomic_bool
activated
=
{
false
};
size_t
cbufSize
=
{
100
};
/*!< размер буфера для сообщений по умолчанию */
size_t
cbufSize
=
{
100
};
/*!< размер буфера для сообщений по умолчанию */
std
::
vector
<
UniSetUDP
::
UDP
Packet
>
cbuf
;
// circular buffer
std
::
vector
<
UniSetUDP
::
UDP
Message
>
cbuf
;
// circular buffer
size_t
wnum
=
{
1
};
/*!< номер следующего ожидаемого пакета */
size_t
wnum
=
{
1
};
/*!< номер следующего ожидаемого пакета */
size_t
rnum
=
{
0
};
/*!< номер последнего обработанного пакета */
size_t
rnum
=
{
0
};
/*!< номер последнего обработанного пакета */
UniSetUDP
::
UDP
Packet
*
pack
;
// текущий обрабатываемый пакет
UniSetUDP
::
UDP
Message
*
pack
;
// текущий обрабатываемый пакет
/*! максимальная разница между номерами пакетов, при которой считается, что счётчик пакетов
/*! максимальная разница между номерами пакетов, при которой считается, что счётчик пакетов
* прошёл через максимум или сбился...
* прошёл через максимум или сбился...
...
...
extensions/UNetUDP/UNetSender.cc
View file @
7c8d5a65
...
@@ -87,8 +87,8 @@ namespace uniset
...
@@ -87,8 +87,8 @@ namespace uniset
// выставляем поля, которые не меняются
// выставляем поля, которые не меняются
{
{
uniset_rwmutex_wrlock
l
(
mypack
.
mut
);
uniset_rwmutex_wrlock
l
(
mypack
.
mut
);
mypack
.
p
.
msg
.
header
.
nodeID
=
uniset_conf
()
->
getLocalNode
();
mypack
.
msg
.
header
.
nodeID
=
uniset_conf
()
->
getLocalNode
();
mypack
.
p
.
msg
.
header
.
procID
=
shm
->
ID
();
mypack
.
msg
.
header
.
procID
=
shm
->
ID
();
}
}
// -------------------------------
// -------------------------------
...
@@ -200,9 +200,9 @@ namespace uniset
...
@@ -200,9 +200,9 @@ namespace uniset
uniset
::
uniset_rwmutex_wrlock
l
(
mypack
.
mut
);
uniset
::
uniset_rwmutex_wrlock
l
(
mypack
.
mut
);
if
(
it
.
iotype
==
UniversalIO
::
DI
||
it
.
iotype
==
UniversalIO
::
DO
)
if
(
it
.
iotype
==
UniversalIO
::
DI
||
it
.
iotype
==
UniversalIO
::
DO
)
mypack
.
p
.
msg
.
setDData
(
it
.
pack_ind
,
value
);
mypack
.
msg
.
setDData
(
it
.
pack_ind
,
value
);
else
if
(
it
.
iotype
==
UniversalIO
::
AI
||
it
.
iotype
==
UniversalIO
::
AO
)
else
if
(
it
.
iotype
==
UniversalIO
::
AI
||
it
.
iotype
==
UniversalIO
::
AO
)
mypack
.
p
.
msg
.
setAData
(
it
.
pack_ind
,
value
);
mypack
.
msg
.
setAData
(
it
.
pack_ind
,
value
);
}
}
// -----------------------------------------------------------------------------
// -----------------------------------------------------------------------------
void
UNetSender
::
setCheckConnectionPause
(
int
msec
)
void
UNetSender
::
setCheckConnectionPause
(
int
msec
)
...
@@ -313,11 +313,11 @@ namespace uniset
...
@@ -313,11 +313,11 @@ namespace uniset
#ifdef UNETUDP_DISABLE_OPTIMIZATION_N1
#ifdef UNETUDP_DISABLE_OPTIMIZATION_N1
mypack
.
msg
.
num
=
packetnum
++
;
mypack
.
msg
.
num
=
packetnum
++
;
#else
#else
uint16_t
crc
=
mypack
.
p
.
msg
.
getDataCRC
();
uint16_t
crc
=
mypack
.
msg
.
getDataCRC
();
if
(
crc
!=
lastcrc
)
if
(
crc
!=
lastcrc
)
{
{
mypack
.
p
.
msg
.
header
.
num
=
packetnum
++
;
mypack
.
msg
.
header
.
num
=
packetnum
++
;
lastcrc
=
crc
;
lastcrc
=
crc
;
}
}
...
@@ -330,10 +330,10 @@ namespace uniset
...
@@ -330,10 +330,10 @@ namespace uniset
if
(
!
udp
||
!
udp
->
poll
(
UniSetTimer
::
millisecToPoco
(
writeTimeout
),
Poco
::
Net
::
Socket
::
SELECT_WRITE
)
)
if
(
!
udp
||
!
udp
->
poll
(
UniSetTimer
::
millisecToPoco
(
writeTimeout
),
Poco
::
Net
::
Socket
::
SELECT_WRITE
)
)
return
;
return
;
size_t
ret
=
udp
->
sendTo
(
mypack
.
p
.
raw
,
mypack
.
p
.
msg
.
len
(
),
saddr
);
size_t
ret
=
udp
->
sendTo
(
&
mypack
.
msg
,
sizeof
(
mypack
.
msg
),
saddr
);
if
(
ret
<
mypack
.
p
.
msg
.
len
(
)
)
if
(
ret
<
sizeof
(
mypack
.
msg
)
)
unetcrit
<<
myname
<<
"(real_send): FAILED ret="
<<
ret
<<
" < sizeof="
<<
mypack
.
p
.
msg
.
len
(
)
<<
endl
;
unetcrit
<<
myname
<<
"(real_send): FAILED ret="
<<
ret
<<
" < sizeof="
<<
sizeof
(
mypack
.
msg
)
<<
endl
;
}
}
catch
(
Poco
::
Net
::
NetException
&
ex
)
catch
(
Poco
::
Net
::
NetException
&
ex
)
{
{
...
@@ -454,7 +454,7 @@ namespace uniset
...
@@ -454,7 +454,7 @@ namespace uniset
{
{
uniset_rwmutex_wrlock
l
(
mypack
.
mut
);
uniset_rwmutex_wrlock
l
(
mypack
.
mut
);
p
.
pack_ind
=
mypack
.
p
.
msg
.
addDData
(
sid
,
defval
);
p
.
pack_ind
=
mypack
.
msg
.
addDData
(
sid
,
defval
);
}
// unlock mutex....
}
// unlock mutex....
if
(
p
.
pack_ind
>=
maxDData
)
if
(
p
.
pack_ind
>=
maxDData
)
...
@@ -466,9 +466,9 @@ namespace uniset
...
@@ -466,9 +466,9 @@ namespace uniset
auto
&
mypack2
=
pk
[
dnum
];
auto
&
mypack2
=
pk
[
dnum
];
uniset_rwmutex_wrlock
l2
(
mypack2
.
mut
);
uniset_rwmutex_wrlock
l2
(
mypack2
.
mut
);
p
.
pack_ind
=
mypack2
.
p
.
msg
.
addDData
(
sid
,
defval
);
p
.
pack_ind
=
mypack2
.
msg
.
addDData
(
sid
,
defval
);
mypack2
.
p
.
msg
.
header
.
nodeID
=
uniset_conf
()
->
getLocalNode
();
mypack2
.
msg
.
header
.
nodeID
=
uniset_conf
()
->
getLocalNode
();
mypack2
.
p
.
msg
.
header
.
procID
=
shm
->
ID
();
mypack2
.
msg
.
header
.
procID
=
shm
->
ID
();
}
}
p
.
pack_num
=
dnum
;
p
.
pack_num
=
dnum
;
...
@@ -495,7 +495,7 @@ namespace uniset
...
@@ -495,7 +495,7 @@ namespace uniset
{
{
uniset_rwmutex_wrlock
l
(
mypack
.
mut
);
uniset_rwmutex_wrlock
l
(
mypack
.
mut
);
p
.
pack_ind
=
mypack
.
p
.
msg
.
addAData
(
sid
,
defval
);
p
.
pack_ind
=
mypack
.
msg
.
addAData
(
sid
,
defval
);
}
}
if
(
p
.
pack_ind
>=
maxAData
)
if
(
p
.
pack_ind
>=
maxAData
)
...
@@ -507,9 +507,9 @@ namespace uniset
...
@@ -507,9 +507,9 @@ namespace uniset
auto
&
mypack2
=
pk
[
anum
];
auto
&
mypack2
=
pk
[
anum
];
uniset_rwmutex_wrlock
l2
(
mypack2
.
mut
);
uniset_rwmutex_wrlock
l2
(
mypack2
.
mut
);
p
.
pack_ind
=
mypack2
.
p
.
msg
.
addAData
(
sid
,
defval
);
p
.
pack_ind
=
mypack2
.
msg
.
addAData
(
sid
,
defval
);
mypack2
.
p
.
msg
.
header
.
nodeID
=
uniset_conf
()
->
getLocalNode
();
mypack2
.
msg
.
header
.
nodeID
=
uniset_conf
()
->
getLocalNode
();
mypack2
.
p
.
msg
.
header
.
procID
=
shm
->
ID
();
mypack2
.
msg
.
header
.
procID
=
shm
->
ID
();
}
}
p
.
pack_num
=
anum
;
p
.
pack_num
=
anum
;
...
@@ -589,8 +589,8 @@ namespace uniset
...
@@ -589,8 +589,8 @@ namespace uniset
for
(
const
auto
&
pack
:
i
->
second
)
for
(
const
auto
&
pack
:
i
->
second
)
{
{
//uniset_rwmutex_rlock l(p->mut);
//uniset_rwmutex_rlock l(p->mut);
s
<<
"
\t\t
["
<<
(
n
++
)
<<
"]="
<<
pack
.
p
.
msg
.
len
(
)
<<
" bytes"
s
<<
"
\t\t
["
<<
(
n
++
)
<<
"]="
<<
sizeof
(
pack
.
msg
)
<<
" bytes"
<<
" ( numA="
<<
setw
(
5
)
<<
pack
.
p
.
msg
.
asize
()
<<
" numD="
<<
setw
(
5
)
<<
pack
.
p
.
msg
.
dsize
()
<<
")"
<<
" ( numA="
<<
setw
(
5
)
<<
pack
.
msg
.
asize
()
<<
" numD="
<<
setw
(
5
)
<<
pack
.
msg
.
dsize
()
<<
")"
<<
endl
;
<<
endl
;
}
}
}
}
...
...
extensions/UNetUDP/UNetSender.h
View file @
7c8d5a65
...
@@ -114,12 +114,12 @@ namespace uniset
...
@@ -114,12 +114,12 @@ namespace uniset
struct
PackMessage
struct
PackMessage
{
{
PackMessage
(
uniset
::
UniSetUDP
::
UDP
Packet
&&
p
)
noexcept
:
p
(
std
::
move
(
p
))
{}
PackMessage
(
uniset
::
UniSetUDP
::
UDP
Message
&&
m
)
noexcept
:
msg
(
std
::
move
(
m
))
{}
PackMessage
(
const
uniset
::
UniSetUDP
::
UDP
Packet
&
p
)
=
delete
;
PackMessage
(
const
uniset
::
UniSetUDP
::
UDP
Message
&
m
)
=
delete
;
PackMessage
()
noexcept
{}
PackMessage
()
noexcept
{}
uniset
::
UniSetUDP
::
UDP
Packet
p
;
uniset
::
UniSetUDP
::
UDP
Message
msg
;
uniset
::
uniset_rwmutex
mut
;
uniset
::
uniset_rwmutex
mut
;
};
};
...
...
extensions/UNetUDP/tests/test_unetudp.cc
View file @
7c8d5a65
This diff is collapsed.
Click to expand it.
extensions/UNetUDP/tests/urecv_perf_test.cc
View file @
7c8d5a65
...
@@ -59,18 +59,18 @@ static void run_senders( size_t max, const std::string& s_host, size_t count = 5
...
@@ -59,18 +59,18 @@ static void run_senders( size_t max, const std::string& s_host, size_t count = 5
}
}
}
}
UniSetUDP
::
UDP
Packet
mypack
;
UniSetUDP
::
UDP
Message
mypack
;
mypack
.
msg
.
header
.
nodeID
=
100
;
mypack
.
header
.
nodeID
=
100
;
mypack
.
msg
.
header
.
procID
=
100
;
mypack
.
header
.
procID
=
100
;
for
(
size_t
i
=
0
;
i
<
count
;
i
++
)
for
(
size_t
i
=
0
;
i
<
count
;
i
++
)
{
{
UniSetUDP
::
UDPAData
d
(
i
,
i
);
UniSetUDP
::
UDPAData
d
(
i
,
i
);
mypack
.
msg
.
addAData
(
d
);
mypack
.
addAData
(
d
);
}
}
for
(
size_t
i
=
0
;
i
<
count
;
i
++
)
for
(
size_t
i
=
0
;
i
<
count
;
i
++
)
mypack
.
msg
.
addDData
(
i
,
i
);
mypack
.
addDData
(
i
,
i
);
for
(
size_t
i
=
0
;
i
<
max
;
i
++
)
for
(
size_t
i
=
0
;
i
<
max
;
i
++
)
{
{
...
@@ -96,7 +96,7 @@ static void run_senders( size_t max, const std::string& s_host, size_t count = 5
...
@@ -96,7 +96,7 @@ static void run_senders( size_t max, const std::string& s_host, size_t count = 5
while
(
nc
)
// -V654
while
(
nc
)
// -V654
{
{
mypack
.
msg
.
header
.
num
=
packetnum
++
;
mypack
.
header
.
num
=
packetnum
++
;
// при переходе черех максимум (UniSetUDP::MaxPacketNum)
// при переходе черех максимум (UniSetUDP::MaxPacketNum)
// пакет опять должен иметь номер "1"
// пакет опять должен иметь номер "1"
...
@@ -109,10 +109,10 @@ static void run_senders( size_t max, const std::string& s_host, size_t count = 5
...
@@ -109,10 +109,10 @@ static void run_senders( size_t max, const std::string& s_host, size_t count = 5
{
{
if
(
udp
->
poll
(
100000
,
Poco
::
Net
::
Socket
::
SELECT_WRITE
)
)
if
(
udp
->
poll
(
100000
,
Poco
::
Net
::
Socket
::
SELECT_WRITE
)
)
{
{
size_t
ret
=
udp
->
sendBytes
(
mypack
.
raw
,
mypack
.
msg
.
len
(
));
size_t
ret
=
udp
->
sendBytes
(
&
mypack
,
sizeof
(
mypack
));
if
(
ret
<
mypack
.
msg
.
len
(
)
)
if
(
ret
<
sizeof
(
mypack
)
)
cerr
<<
"(send): FAILED ret="
<<
ret
<<
" < sizeof="
<<
mypack
.
msg
.
len
(
)
<<
endl
;
cerr
<<
"(send): FAILED ret="
<<
ret
<<
" < sizeof="
<<
sizeof
(
mypack
)
<<
endl
;
}
}
}
}
catch
(
Poco
::
Net
::
NetException
&
e
)
catch
(
Poco
::
Net
::
NetException
&
e
)
...
@@ -140,7 +140,7 @@ static void run_test( size_t max, const std::string& host )
...
@@ -140,7 +140,7 @@ static void run_test( size_t max, const std::string& host )
{
{
cout
<<
"create receiver: "
<<
host
<<
":"
<<
begPort
+
i
<<
endl
;
cout
<<
"create receiver: "
<<
host
<<
":"
<<
begPort
+
i
<<
endl
;
auto
r
=
make_shared
<
UNetReceiver
>
(
host
,
begPort
+
i
,
smiInstance
());
auto
r
=
make_shared
<
UNetReceiver
>
(
host
,
begPort
+
i
,
smiInstance
());
//
r->setLockUpdate(true);
r
->
setLockUpdate
(
true
);
vrecv
.
emplace_back
(
r
);
vrecv
.
emplace_back
(
r
);
}
}
...
@@ -177,9 +177,9 @@ int main(int argc, char* argv[] )
...
@@ -177,9 +177,9 @@ int main(int argc, char* argv[] )
auto
conf
=
uniset_init
(
argc
,
argv
);
auto
conf
=
uniset_init
(
argc
,
argv
);
if
(
argc
>
1
&&
!
strcmp
(
argv
[
1
],
"s"
)
)
if
(
argc
>
1
&&
!
strcmp
(
argv
[
1
],
"s"
)
)
run_senders
(
1
0
,
host
);
run_senders
(
1
,
host
);
else
else
run_test
(
1
0
,
host
);
run_test
(
1
,
host
);
return
0
;
return
0
;
}
}
...
...
extensions/UNetUDP/unet-udp-tester.cc
View file @
7c8d5a65
...
@@ -223,7 +223,7 @@ int main(int argc, char* argv[])
...
@@ -223,7 +223,7 @@ int main(int argc, char* argv[])
{
{
UDPReceiveU
udp
(
s_host
,
port
);
UDPReceiveU
udp
(
s_host
,
port
);
UniSetUDP
::
UDP
Packet
pack
;
UniSetUDP
::
UDP
Message
pack
;
unsigned
long
prev_num
=
1
;
unsigned
long
prev_num
=
1
;
int
nc
=
1
;
int
nc
=
1
;
...
@@ -255,7 +255,7 @@ int main(int argc, char* argv[])
...
@@ -255,7 +255,7 @@ int main(int argc, char* argv[])
continue
;
continue
;
}
}
size_t
ret
=
udp
.
receiveBytes
(
pack
.
raw
,
sizeof
(
pack
.
raw
)
);
size_t
ret
=
udp
.
receiveBytes
(
&
pack
,
sizeof
(
pack
)
);
if
(
ret
<
0
)
if
(
ret
<
0
)
{
{
...
@@ -269,9 +269,9 @@ int main(int argc, char* argv[])
...
@@ -269,9 +269,9 @@ int main(int argc, char* argv[])
continue
;
continue
;
}
}
pack
.
msg
.
ntoh
();
pack
.
ntoh
();
if
(
pack
.
msg
.
header
.
magic
!=
UniSetUDP
::
UNETUDP_MAGICNUM
)
if
(
pack
.
header
.
magic
!=
UniSetUDP
::
UNETUDP_MAGICNUM
)
{
{
cerr
<<
"(recv): BAD PROTOCOL VERSION! [ need version '"
<<
UniSetUDP
::
UNETUDP_MAGICNUM
<<
"']"
<<
endl
;
cerr
<<
"(recv): BAD PROTOCOL VERSION! [ need version '"
<<
UniSetUDP
::
UNETUDP_MAGICNUM
<<
"']"
<<
endl
;
continue
;
continue
;
...
@@ -279,11 +279,11 @@ int main(int argc, char* argv[])
...
@@ -279,11 +279,11 @@ int main(int argc, char* argv[])
if
(
lost
)
if
(
lost
)
{
{
if
(
prev_num
!=
(
pack
.
msg
.
header
.
num
-
1
)
)
if
(
prev_num
!=
(
pack
.
header
.
num
-
1
)
)
cerr
<<
"WARNING! Incorrect sequence of packets! current="
<<
pack
.
msg
.
header
.
num
cerr
<<
"WARNING! Incorrect sequence of packets! current="
<<
pack
.
header
.
num
<<
" prev="
<<
prev_num
<<
endl
;
<<
" prev="
<<
prev_num
<<
endl
;
prev_num
=
pack
.
msg
.
header
.
num
;
prev_num
=
pack
.
header
.
num
;
}
}
npack
++
;
npack
++
;
...
@@ -293,7 +293,7 @@ int main(int argc, char* argv[])
...
@@ -293,7 +293,7 @@ int main(int argc, char* argv[])
<<
" bytes: "
<<
ret
<<
endl
;
<<
" bytes: "
<<
ret
<<
endl
;
if
(
show
)
if
(
show
)
cout
<<
"receive data: "
<<
pack
.
msg
<<
endl
;
cout
<<
"receive data: "
<<
pack
<<
endl
;
}
}
catch
(
Poco
::
Net
::
NetException
&
e
)
catch
(
Poco
::
Net
::
NetException
&
e
)
{
{
...
@@ -320,10 +320,9 @@ int main(int argc, char* argv[])
...
@@ -320,10 +320,9 @@ int main(int argc, char* argv[])
std
::
shared_ptr
<
UDPSocketU
>
udp
=
make_shared
<
UDPSocketU
>
(
s_host
,
port
);
std
::
shared_ptr
<
UDPSocketU
>
udp
=
make_shared
<
UDPSocketU
>
(
s_host
,
port
);
udp
->
setBroadcast
(
broadcast
);
udp
->
setBroadcast
(
broadcast
);
UniSetUDP
::
UDPPacket
mypack
;
UniSetUDP
::
UDPMessage
mypack
;
UDPMessage
*
msg
=
&
mypack
.
msg
;
mypack
.
header
.
nodeID
=
nodeID
;
msg
->
header
.
nodeID
=
nodeID
;
mypack
.
header
.
procID
=
procID
;
msg
->
header
.
procID
=
procID
;
if
(
!
a_data
.
empty
()
)
if
(
!
a_data
.
empty
()
)
{
{
...
@@ -332,7 +331,7 @@ int main(int argc, char* argv[])
...
@@ -332,7 +331,7 @@ int main(int argc, char* argv[])
for
(
const
auto
&
v
:
vlist
)
for
(
const
auto
&
v
:
vlist
)
{
{
UDPAData
d
(
v
.
si
.
id
,
v
.
val
);
UDPAData
d
(
v
.
si
.
id
,
v
.
val
);
m
sg
->
addAData
(
d
);
m
ypack
.
addAData
(
d
);
}
}
}
}
else
else
...
@@ -340,7 +339,7 @@ int main(int argc, char* argv[])
...
@@ -340,7 +339,7 @@ int main(int argc, char* argv[])
for
(
size_t
i
=
0
;
i
<
count
;
i
++
)
for
(
size_t
i
=
0
;
i
<
count
;
i
++
)
{
{
UDPAData
d
(
i
,
i
);
UDPAData
d
(
i
,
i
);
m
sg
->
addAData
(
d
);
m
ypack
.
addAData
(
d
);
}
}
}
}
...
@@ -349,12 +348,12 @@ int main(int argc, char* argv[])
...
@@ -349,12 +348,12 @@ int main(int argc, char* argv[])
auto
vlist
=
uniset
::
getSInfoList
(
d_data
,
nullptr
);
auto
vlist
=
uniset
::
getSInfoList
(
d_data
,
nullptr
);
for
(
const
auto
&
v
:
vlist
)
for
(
const
auto
&
v
:
vlist
)
m
sg
->
addDData
(
v
.
si
.
id
,
v
.
val
);
m
ypack
.
addDData
(
v
.
si
.
id
,
v
.
val
);
}
}
else
else
{
{
for
(
size_t
i
=
0
;
i
<
count
;
i
++
)
for
(
size_t
i
=
0
;
i
<
count
;
i
++
)
m
sg
->
addDData
(
i
,
i
);
m
ypack
.
addDData
(
i
,
i
);
}
}
Poco
::
Net
::
SocketAddress
sa
(
s_host
,
port
);
Poco
::
Net
::
SocketAddress
sa
(
s_host
,
port
);
...
@@ -367,7 +366,7 @@ int main(int argc, char* argv[])
...
@@ -367,7 +366,7 @@ int main(int argc, char* argv[])
while
(
nc
)
while
(
nc
)
{
{
m
sg
->
header
.
num
=
packetnum
++
;
m
ypack
.
header
.
num
=
packetnum
++
;
// при переходе через максимум (UniSetUDP::MaxPacketNum)
// при переходе через максимум (UniSetUDP::MaxPacketNum)
// пакет опять должен иметь номер "1"
// пакет опять должен иметь номер "1"
...
@@ -379,13 +378,13 @@ int main(int argc, char* argv[])
...
@@ -379,13 +378,13 @@ int main(int argc, char* argv[])
if
(
udp
->
poll
(
UniSetTimer
::
millisecToPoco
(
tout
),
Poco
::
Net
::
Socket
::
SELECT_WRITE
)
)
if
(
udp
->
poll
(
UniSetTimer
::
millisecToPoco
(
tout
),
Poco
::
Net
::
Socket
::
SELECT_WRITE
)
)
{
{
if
(
verb
)
if
(
verb
)
cout
<<
"(send): to addr="
<<
addr
<<
" d_count="
<<
m
sg
->
header
.
dcount
cout
<<
"(send): to addr="
<<
addr
<<
" d_count="
<<
m
ypack
.
header
.
dcount
<<
" a_count="
<<
m
sg
->
header
.
acount
<<
" bytes="
<<
msg
->
len
()
<<
endl
;
<<
" a_count="
<<
m
ypack
.
header
.
acount
<<
endl
;
size_t
ret
=
udp
->
sendBytes
(
mypack
.
raw
,
sizeof
(
mypack
.
raw
)
/* mypack.msg.len() */
);
size_t
ret
=
udp
->
sendBytes
(
&
mypack
,
sizeof
(
mypack
)
);
if
(
ret
<
msg
->
len
(
)
)
if
(
ret
<
sizeof
(
mypack
)
)
cerr
<<
"(send): FAILED ret="
<<
ret
<<
" < sizeof="
<<
msg
->
len
(
)
<<
endl
;
cerr
<<
"(send): FAILED ret="
<<
ret
<<
" < sizeof="
<<
sizeof
(
mypack
)
<<
endl
;
}
}
}
}
catch
(
Poco
::
Net
::
NetException
&
e
)
catch
(
Poco
::
Net
::
NetException
&
e
)
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment