Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
U
uniset2
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
1
Issues
1
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
UniSet project repositories
uniset2
Commits
ecf9499b
Commit
ecf9499b
authored
Dec 22, 2010
by
Pavel Vainerman
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
UNet2: переделал обработку (теперь не используется minBufSize).
и сделал защиту от сбоя (или переполнения) счётчика..
parent
a44c9b90
Hide whitespace changes
Inline
Side-by-side
Showing
5 changed files
with
121 additions
and
95 deletions
+121
-95
UNetExchange.cc
extensions/UDPExchange/UNetExchange.cc
+4
-2
UNetReceiver.cc
extensions/UDPExchange/UNetReceiver.cc
+74
-42
UNetReceiver.h
extensions/UDPExchange/UNetReceiver.h
+34
-17
UNetSender.cc
extensions/UDPExchange/UNetSender.cc
+6
-28
UNetSender.h
extensions/UDPExchange/UNetSender.h
+3
-6
No files found.
extensions/UDPExchange/UNetExchange.cc
View file @
ecf9499b
...
@@ -33,11 +33,12 @@ sender(0)
...
@@ -33,11 +33,12 @@ sender(0)
<<
"' filter-value='"
<<
s_fvalue
<<
"'"
<<
endl
;
<<
"' filter-value='"
<<
s_fvalue
<<
"'"
<<
endl
;
int
recvTimeout
=
conf
->
getArgPInt
(
"--unet-recv-timeout"
,
it
.
getProp
(
"recvTimeout"
),
5000
);
int
recvTimeout
=
conf
->
getArgPInt
(
"--unet-recv-timeout"
,
it
.
getProp
(
"recvTimeout"
),
5000
);
int
lostTimeout
=
conf
->
getArgPInt
(
"--unet-lost-timeout"
,
it
.
getProp
(
"lostTimeout"
),
recvTimeout
);
int
recvpause
=
conf
->
getArgPInt
(
"--unet-recvpause"
,
it
.
getProp
(
"recvpause"
),
10
);
int
recvpause
=
conf
->
getArgPInt
(
"--unet-recvpause"
,
it
.
getProp
(
"recvpause"
),
10
);
int
sendpause
=
conf
->
getArgPInt
(
"--unet-sendpause"
,
it
.
getProp
(
"sendpause"
),
150
);
int
sendpause
=
conf
->
getArgPInt
(
"--unet-sendpause"
,
it
.
getProp
(
"sendpause"
),
150
);
int
updatepause
=
conf
->
getArgPInt
(
"--unet-updatepause"
,
it
.
getProp
(
"updatepause"
),
100
);
int
updatepause
=
conf
->
getArgPInt
(
"--unet-updatepause"
,
it
.
getProp
(
"updatepause"
),
100
);
steptime
=
conf
->
getArgPInt
(
"--unet-steptime"
,
it
.
getProp
(
"steptime"
),
1000
);
steptime
=
conf
->
getArgPInt
(
"--unet-steptime"
,
it
.
getProp
(
"steptime"
),
1000
);
int
m
inBufSize
=
conf
->
getArgPInt
(
"--unet-minbufsize"
,
it
.
getProp
(
"minBufSize"
),
3
0
);
int
m
axDiff
=
conf
->
getArgPInt
(
"--unet-maxdifferense"
,
it
.
getProp
(
"maxDifferense"
),
100
0
);
int
maxProcessingCount
=
conf
->
getArgPInt
(
"--unet-maxprocessingcount"
,
it
.
getProp
(
"maxProcessingCount"
),
100
);
int
maxProcessingCount
=
conf
->
getArgPInt
(
"--unet-maxprocessingcount"
,
it
.
getProp
(
"maxProcessingCount"
),
100
);
no_sender
=
conf
->
getArgInt
(
"--unet-nosender"
,
it
.
getProp
(
"nosender"
));
no_sender
=
conf
->
getArgInt
(
"--unet-nosender"
,
it
.
getProp
(
"nosender"
));
...
@@ -86,9 +87,10 @@ sender(0)
...
@@ -86,9 +87,10 @@ sender(0)
UNetReceiver
*
r
=
new
UNetReceiver
(
h
,
p
,
shm
);
UNetReceiver
*
r
=
new
UNetReceiver
(
h
,
p
,
shm
);
r
->
setReceiveTimeout
(
recvTimeout
);
r
->
setReceiveTimeout
(
recvTimeout
);
r
->
setLostTimeout
(
lostTimeout
);
r
->
setReceivePause
(
recvpause
);
r
->
setReceivePause
(
recvpause
);
r
->
setUpdatePause
(
updatepause
);
r
->
setUpdatePause
(
updatepause
);
r
->
setM
inBudSize
(
minBufSize
);
r
->
setM
axDifferens
(
maxDiff
);
r
->
setMaxProcessingCount
(
maxProcessingCount
);
r
->
setMaxProcessingCount
(
maxProcessingCount
);
recvlist
.
push_back
(
r
);
recvlist
.
push_back
(
r
);
}
}
...
...
extensions/UDPExchange/UNetReceiver.cc
View file @
ecf9499b
...
@@ -20,12 +20,17 @@ UNetReceiver::UNetReceiver( const std::string s_host, const ost::tpport_t port,
...
@@ -20,12 +20,17 @@ UNetReceiver::UNetReceiver( const std::string s_host, const ost::tpport_t port,
shm
(
smi
),
shm
(
smi
),
recvpause
(
10
),
recvpause
(
10
),
updatepause
(
100
),
updatepause
(
100
),
recvTimeout
(
5000
),
udp
(
0
),
udp
(
0
),
recvTimeout
(
5000
),
lostTimeout
(
5000
),
lostPackets
(
0
),
activated
(
false
),
activated
(
false
),
r_thr
(
0
),
r_thr
(
0
),
u_thr
(
0
),
u_thr
(
0
),
minBufSize
(
30
),
pnum
(
0
),
maxDifferens
(
1000
),
waitClean
(
false
),
rnum
(
0
),
maxProcessingCount
(
100
),
maxProcessingCount
(
100
),
icache
(
200
),
icache
(
200
),
cache_init_ok
(
false
)
cache_init_ok
(
false
)
...
@@ -67,25 +72,26 @@ UNetReceiver::~UNetReceiver()
...
@@ -67,25 +72,26 @@ UNetReceiver::~UNetReceiver()
delete
udp
;
delete
udp
;
}
}
// -----------------------------------------------------------------------------
// -----------------------------------------------------------------------------
void
UNetReceiver
::
setReceiveTimeout
(
in
t
msec
)
void
UNetReceiver
::
setReceiveTimeout
(
timeout_
t
msec
)
{
{
recvTimeout
=
msec
;
recvTimeout
=
msec
;
ptRecvTimeout
.
setTiming
(
msec
);
ptRecvTimeout
.
setTiming
(
msec
);
}
}
// -----------------------------------------------------------------------------
// -----------------------------------------------------------------------------
void
UNetReceiver
::
set
ReceivePause
(
in
t
msec
)
void
UNetReceiver
::
set
LostTimeout
(
timeout_
t
msec
)
{
{
recvpause
=
msec
;
lostTimeout
=
msec
;
ptLostTimeout
.
setTiming
(
msec
);
}
}
// -----------------------------------------------------------------------------
// -----------------------------------------------------------------------------
void
UNetReceiver
::
set
UpdatePause
(
in
t
msec
)
void
UNetReceiver
::
set
ReceivePause
(
timeout_
t
msec
)
{
{
update
pause
=
msec
;
recv
pause
=
msec
;
}
}
// -----------------------------------------------------------------------------
// -----------------------------------------------------------------------------
void
UNetReceiver
::
set
MinBudSize
(
int
set
)
void
UNetReceiver
::
set
UpdatePause
(
timeout_t
msec
)
{
{
minBufSize
=
set
;
updatepause
=
msec
;
}
}
// -----------------------------------------------------------------------------
// -----------------------------------------------------------------------------
void
UNetReceiver
::
setMaxProcessingCount
(
int
set
)
void
UNetReceiver
::
setMaxProcessingCount
(
int
set
)
...
@@ -93,6 +99,11 @@ void UNetReceiver::setMaxProcessingCount( int set )
...
@@ -93,6 +99,11 @@ void UNetReceiver::setMaxProcessingCount( int set )
maxProcessingCount
=
set
;
maxProcessingCount
=
set
;
}
}
// -----------------------------------------------------------------------------
// -----------------------------------------------------------------------------
void
UNetReceiver
::
setMaxDifferens
(
unsigned
long
set
)
{
maxDifferens
=
set
;
}
// -----------------------------------------------------------------------------
void
UNetReceiver
::
start
()
void
UNetReceiver
::
start
()
{
{
if
(
!
activated
)
if
(
!
activated
)
...
@@ -128,41 +139,41 @@ void UNetReceiver::update()
...
@@ -128,41 +139,41 @@ void UNetReceiver::update()
void
UNetReceiver
::
real_update
()
void
UNetReceiver
::
real_update
()
{
{
UniSetUDP
::
UDPMessage
p
;
UniSetUDP
::
UDPMessage
p
;
bool
buf_ok
=
false
;
// обрабатываем пока, очередь либо не опустеет
{
// либо обнаружится "дырка" в последовательности
uniset_mutex_lock
l
(
packMutex
);
// но при этом обрабатываем не больше maxProcessingCount
if
(
qpack
.
size
()
<=
minBufSize
)
// за один раз..
return
;
buf_ok
=
true
;
}
int
k
=
maxProcessingCount
;
int
k
=
maxProcessingCount
;
while
(
buf_ok
&&
k
>
0
)
while
(
k
>
0
)
{
{
{
{
// lock qpack
uniset_mutex_lock
l
(
packMutex
);
uniset_mutex_lock
l
(
packMutex
);
if
(
qpack
.
empty
()
)
return
;
p
=
qpack
.
top
();
p
=
qpack
.
top
();
qpack
.
pop
();
}
if
(
labs
(
p
.
msg
.
header
.
num
-
pnum
)
>
1
)
if
(
pnum
>
0
&&
labs
(
p
.
msg
.
header
.
num
-
pnum
)
>
1
)
{
{
dlog
[
Debug
::
CRIT
]
<<
"************ FAILED! ORDER PACKETS! recv.num="
<<
pack
.
msg
.
header
.
num
if
(
!
ptLostTimeout
.
checkTime
()
)
<<
" num="
<<
pnum
<<
endl
;
return
;
}
pnum
=
p
.
msg
.
header
.
num
;
lostPackets
++
;
k
--
;
}
{
ptLostTimeout
.
reset
();
uniset_mutex_lock
l
(
packMutex
);
buf_ok
=
(
qpack
.
size
()
>
minBufSize
);
// удаляем из очереди, только если
}
// всё в порядке с последовательностью..
qpack
.
pop
();
pnum
=
p
.
msg
.
header
.
num
;
}
// unlock qpack
k
--
;
initCache
(
p
,
!
cache_init_ok
);
initCache
(
p
,
!
cache_init_ok
);
for
(
in
t
i
=
0
;
i
<
p
.
msg
.
header
.
dcount
;
i
++
)
for
(
size_
t
i
=
0
;
i
<
p
.
msg
.
header
.
dcount
;
i
++
)
{
{
try
try
{
{
...
@@ -238,25 +249,46 @@ bool UNetReceiver::recv()
...
@@ -238,25 +249,46 @@ bool UNetReceiver::recv()
ssize_t
ret
=
udp
->
UDPReceive
::
receive
(
&
(
pack
.
msg
),
sizeof
(
pack
.
msg
));
ssize_t
ret
=
udp
->
UDPReceive
::
receive
(
&
(
pack
.
msg
),
sizeof
(
pack
.
msg
));
if
(
ret
<
sizeof
(
UniSetUDP
::
UDPHeader
)
)
if
(
ret
<
sizeof
(
UniSetUDP
::
UDPHeader
)
)
{
{
cerr
<<
myname
<<
"(receive): FAILED header ret="
<<
ret
<<
" sizeof="
<<
sizeof
(
UniSetUDP
::
UDPHeader
)
<<
endl
;
dlog
[
Debug
::
CRIT
]
<<
myname
<<
"(receive): FAILED header ret="
<<
ret
<<
" sizeof="
<<
sizeof
(
UniSetUDP
::
UDPHeader
)
<<
endl
;
return
false
;
return
false
;
}
}
ssize_t
sz
=
pack
.
msg
.
header
.
dcount
*
sizeof
(
UniSetUDP
::
UDPData
)
+
sizeof
(
UniSetUDP
::
UDPHeader
);
ssize_t
sz
=
pack
.
msg
.
header
.
dcount
*
sizeof
(
UniSetUDP
::
UDPData
)
+
sizeof
(
UniSetUDP
::
UDPHeader
);
if
(
ret
<
sz
)
if
(
ret
<
sz
)
{
{
cerr
<<
myname
<<
"(receive): FAILED data ret="
<<
ret
<<
" sizeof="
<<
sz
dlog
[
Debug
::
CRIT
]
<<
myname
<<
"(receive): FAILED data ret="
<<
ret
<<
" sizeof="
<<
sz
<<
" packnum="
<<
pack
.
msg
.
header
.
num
<<
endl
;
<<
" packnum="
<<
pack
.
msg
.
header
.
num
<<
endl
;
return
false
;
return
false
;
}
}
// cerr << myname << "(receive): recv DATA OK. ret=" << ret << " sizeof=" << sz
// cerr << myname << "(receive): recv DATA OK. ret=" << ret << " sizeof=" << sz
// << " header: " << pack.msg.header << endl;
// << " header: " << pack.msg.header << endl;
{
uniset_mutex_lock
l
(
packMutex
);
if
(
rnum
>
0
&&
labs
(
pack
.
msg
.
header
.
num
-
rnum
)
>
maxDifferens
)
qpack
.
push
(
pack
);
waitClean
=
true
;
}
{
// lock qpack
uniset_mutex_lock
l
(
packMutex
,
500
);
if
(
!
waitClean
)
{
qpack
.
push
(
pack
);
return
true
;
}
if
(
!
qpack
.
empty
()
)
qtmp
.
push
(
pack
);
else
{
// очередь освободилась..
// то копируем в неё всё что набралось...
while
(
!
qtmp
.
empty
()
)
{
qpack
.
push
(
qtmp
.
top
());
qtmp
.
pop
();
}
waitClean
=
false
;
}
}
// unlock qpack
return
true
;
return
true
;
}
}
...
@@ -279,7 +311,7 @@ void UNetReceiver::initCache( UniSetUDP::UDPMessage& pack, bool force )
...
@@ -279,7 +311,7 @@ void UNetReceiver::initCache( UniSetUDP::UDPMessage& pack, bool force )
cache_init_ok
=
true
;
cache_init_ok
=
true
;
icache
.
resize
(
pack
.
msg
.
header
.
dcount
);
icache
.
resize
(
pack
.
msg
.
header
.
dcount
);
for
(
in
t
i
=
0
;
i
<
icache
.
size
();
i
++
)
for
(
size_
t
i
=
0
;
i
<
icache
.
size
();
i
++
)
{
{
ItemInfo
&
d
(
icache
[
i
]);
ItemInfo
&
d
(
icache
[
i
]);
if
(
d
.
id
!=
pack
.
msg
.
dat
[
i
].
id
)
if
(
d
.
id
!=
pack
.
msg
.
dat
[
i
].
id
)
...
...
extensions/UDPExchange/UNetReceiver.h
View file @
ecf9499b
...
@@ -18,10 +18,10 @@
...
@@ -18,10 +18,10 @@
* Собственно реализация сделана так:
* Собственно реализация сделана так:
* В данных передаётся номер пакета. На случай если несколько пакетов придут не в той последовательности
* В данных передаётся номер пакета. На случай если несколько пакетов придут не в той последовательности
* что были посланы, сделана очередь с приоритетом. В качестве приориета используется номер пакета
* что были посланы, сделана очередь с приоритетом. В качестве приориета используется номер пакета
* (чем меньше тем старше).
И при этом эта очередь постоянно поддерживается наполненной на minBufSize записей.
* (чем меньше тем старше).
При этом обработка ведётся только тех пакетов, которые идут "подряд",
*
Это гарантирует, что соседние пакеты пришедшие не в той последовательности, тем не менее обработаны будут в правильной.
*
как только встречается "дырка" происходит ожидание её "заполения". Если в течение времени (lostTimeout)
*
Т.к. в очереди они "отсортируются" по номеру пакета, ещё до обработки
.
*
"дырка" не исчезает, увеличивается счётчик потерянных пакетов и обработка продолжается дальше.
.
*
*
Всё это реализовано в функции UNetReceiver::real_update()
*
*
* КЭШ
* КЭШ
* ===
* ===
...
@@ -32,7 +32,16 @@
...
@@ -32,7 +32,16 @@
* Для защиты от изменения поседовательности внутри пакета, в кэше хранится ID сохраняемого датчика, и если он не совпадёт с тем,
* Для защиты от изменения поседовательности внутри пакета, в кэше хранится ID сохраняемого датчика, и если он не совпадёт с тем,
* ID который пришёл в пакете - элемент кэша обновляется.
* ID который пришёл в пакете - элемент кэша обновляется.
* Если количество пришедших данных не совпадают с размером кэша.. кэш обновляется.
* Если количество пришедших данных не совпадают с размером кэша.. кэш обновляется.
*/
*
* Обработка сбоя или переполнения счётчика пакетов(перехода через максимум)
* =========================================================================
* Для защиты от сбоя счётика сделана следующая логика:
* Если номер очередного пришедшего пакета отличается от последнего обработанного на maxDifferens, то считается,
* что произошёл сбой счётчика и происходит ожидание пока функция update, не обработает основную очередь полностью.
* При этом принимаемые пакеты складываются во временную очередь qtmp. Как только основная очередь пустеет,
* в неё копируется всё накопленное во временной очереди..и опять идёт штатная обработка.
* \todo Сделать защиту от бесконечного ожидания "очистки" основной очереди.
*/
// -----------------------------------------------------------------------------
// -----------------------------------------------------------------------------
class
UNetReceiver
class
UNetReceiver
{
{
...
@@ -46,12 +55,14 @@ class UNetReceiver
...
@@ -46,12 +55,14 @@ class UNetReceiver
void
update
();
void
update
();
inline
bool
isRecvOK
(){
return
ptRecvTimeout
.
checkTime
();
}
inline
bool
isRecvOK
(){
return
ptRecvTimeout
.
checkTime
();
}
inline
unsigned
long
getLostPacketsNum
(){
return
lostPackets
;
}
void
setReceiveTimeout
(
int
msec
);
void
setReceiveTimeout
(
timeout_t
msec
);
void
setReceivePause
(
int
msec
);
void
setReceivePause
(
timeout_t
msec
);
void
setUpdatePause
(
int
msec
);
void
setUpdatePause
(
timeout_t
msec
);
void
setLostTimeout
(
timeout_t
msec
);
void
setMaxDifferens
(
unsigned
long
set
);
void
setMinBudSize
(
int
set
);
void
setMaxProcessingCount
(
int
set
);
void
setMaxProcessingCount
(
int
set
);
inline
ost
::
IPV4Address
getAddress
(){
return
addr
;
}
inline
ost
::
IPV4Address
getAddress
(){
return
addr
;
}
...
@@ -80,8 +91,11 @@ class UNetReceiver
...
@@ -80,8 +91,11 @@ class UNetReceiver
UniSetTypes
::
uniset_mutex
pollMutex
;
UniSetTypes
::
uniset_mutex
pollMutex
;
PassiveTimer
ptRecvTimeout
;
PassiveTimer
ptRecvTimeout
;
int
recvTimeout
;
timeout_t
recvTimeout
;
timeout_t
lostTimeout
;
PassiveTimer
ptLostTimeout
;
unsigned
long
lostPackets
;
/*!< счётчик потерянных пакетов */
bool
activated
;
bool
activated
;
ThreadCreator
<
UNetReceiver
>*
r_thr
;
// receive thread
ThreadCreator
<
UNetReceiver
>*
r_thr
;
// receive thread
...
@@ -96,16 +110,19 @@ class UNetReceiver
...
@@ -96,16 +110,19 @@ class UNetReceiver
};
};
typedef
std
::
priority_queue
<
UniSetUDP
::
UDPMessage
,
std
::
vector
<
UniSetUDP
::
UDPMessage
>
,
PacketCompare
>
PacketQueue
;
typedef
std
::
priority_queue
<
UniSetUDP
::
UDPMessage
,
std
::
vector
<
UniSetUDP
::
UDPMessage
>
,
PacketCompare
>
PacketQueue
;
PacketQueue
qpack
;
/*!< очередь принятых пакетов (отсортированных по возрастанию номера пакета) */
PacketQueue
qpack
;
/*!< очередь принятых пакетов (отсортированных по возрастанию номера пакета) */
UniSetUDP
::
UDPMessage
pack
;
/*!< пр
сто буфер для получения очерещ
ного сообщения */
UniSetUDP
::
UDPMessage
pack
;
/*!< пр
осто буфер для получения очереl
ного сообщения */
UniSetTypes
::
uniset_mutex
packMutex
;
/*!< mutex для работы с очередью */
UniSetTypes
::
uniset_mutex
packMutex
;
/*!< mutex для работы с очередью */
long
pnum
;
/*!< текущий номер обработанного сообщения, для проверки непрерывности последовательности пакетов */
unsigned
long
pnum
;
/*!< текущий номер обработанного сообщения, для проверки непрерывности последовательности пакетов */
/*! Минимальный размер очереди.
/*! максимальная разница межд номерами пакетов, при которой считается, что счётчик пакетов
* Предназначен для создания буфера, чтобы обработка сообщений шла
* прошёл через максимум или сбился...
* в порядке возрастания номеров пакетов. Даже если при приёме последовательность нарушалась
*/
*/
int
minBufSize
;
unsigned
long
maxDifferens
;
PacketQueue
qtmp
;
/*!< очередь на время обработки(очистки) основной очереди */
bool
waitClean
;
/*!< флаг означающий, что ждём очистики очереди до конца */
unsigned
long
rnum
;
/*!< текущий номер принятого сообщения, для проверки "переполнения" или "сбоя" счётчика */
int
maxProcessingCount
;
/*! максимальное число обрабатываемых за один раз сообщений */
int
maxProcessingCount
;
/*! максимальное число обрабатываемых за один раз сообщений */
struct
ItemInfo
struct
ItemInfo
...
...
extensions/UDPExchange/UNetSender.cc
View file @
ecf9499b
...
@@ -60,6 +60,11 @@ s_thr(0)
...
@@ -60,6 +60,11 @@ s_thr(0)
}
}
else
else
ic
->
addReadItem
(
sigc
::
mem_fun
(
this
,
&
UNetSender
::
readItem
)
);
ic
->
addReadItem
(
sigc
::
mem_fun
(
this
,
&
UNetSender
::
readItem
)
);
// выставляем поля, которые не меняются
mypack
.
msg
.
header
.
nodeID
=
conf
->
getLocalNode
();
mypack
.
msg
.
header
.
procID
=
shm
->
ID
();
}
}
// -----------------------------------------------------------------------------
// -----------------------------------------------------------------------------
UNetSender
::~
UNetSender
()
UNetSender
::~
UNetSender
()
...
@@ -128,13 +133,7 @@ void UNetSender::send()
...
@@ -128,13 +133,7 @@ void UNetSender::send()
// -----------------------------------------------------------------------------
// -----------------------------------------------------------------------------
void
UNetSender
::
real_send
()
void
UNetSender
::
real_send
()
{
{
// cout << myname << ": send..." << endl;
mypack
.
msg
.
header
.
num
=
packetnum
++
;
UniSetUDP
::
UDPHeader
h
;
h
.
nodeID
=
conf
->
getLocalNode
();
h
.
procID
=
shm
->
ID
();
h
.
dcount
=
mypack
.
msg
.
header
.
dcount
;
h
.
num
=
packetnum
++
;
mypack
.
msg
.
header
=
h
;
// cout << "************* send header: " << mypack.msg.header << endl;
// cout << "************* send header: " << mypack.msg.header << endl;
int
sz
=
mypack
.
byte_size
()
+
sizeof
(
UniSetUDP
::
UDPHeader
);
int
sz
=
mypack
.
byte_size
()
+
sizeof
(
UniSetUDP
::
UDPHeader
);
...
@@ -143,28 +142,7 @@ void UNetSender::real_send()
...
@@ -143,28 +142,7 @@ void UNetSender::real_send()
ssize_t
ret
=
udp
->
send
(
(
char
*
)
&
(
mypack
.
msg
),
sz
);
ssize_t
ret
=
udp
->
send
(
(
char
*
)
&
(
mypack
.
msg
),
sz
);
if
(
ret
<
sz
)
if
(
ret
<
sz
)
{
dlog
[
Debug
::
CRIT
]
<<
myname
<<
"(send): FAILED ret="
<<
ret
<<
" < sizeof="
<<
sz
<<
endl
;
dlog
[
Debug
::
CRIT
]
<<
myname
<<
"(send): FAILED ret="
<<
ret
<<
" < sizeof="
<<
sz
<<
endl
;
return
;
}
// если вышли за границы..
// посылаем несколько одинаковых пакетов с новыми номерами..
// т.к. первый будет откинут (см. UNetReceiver::update)
if
(
packetnum
>=
UniSetUDP
::
MaxPacketNum
)
{
packetnum
=
1
;
for
(
int
i
=
0
;
i
<
3
;
i
++
)
{
mypack
.
msg
.
header
.
num
=
packetnum
++
;
if
(
udp
->
isPending
(
ost
::
Socket
::
pendingOutput
)
)
{
ssize_t
ret
=
udp
->
send
(
(
char
*
)
&
(
mypack
.
msg
),
sz
);
if
(
res
<
sz
)
dlog
[
Debug
::
CRIT
]
<<
myname
<<
"(send): FAILED. ret="
<<
ret
<<
" < sizeof="
<<
sz
<<
endl
;
}
}
}
}
}
// -----------------------------------------------------------------------------
// -----------------------------------------------------------------------------
void
UNetSender
::
start
()
void
UNetSender
::
start
()
...
...
extensions/UDPExchange/UNetSender.h
View file @
ecf9499b
...
@@ -16,11 +16,8 @@
...
@@ -16,11 +16,8 @@
// -----------------------------------------------------------------------------
// -----------------------------------------------------------------------------
/*
/*
* Для защиты от потери пакета при переполнении "номера пакета".
* Для защиты от потери пакета при переполнении "номера пакета".
* UNetReceiver при обнаружении "разырва" в последовательнности, просто игнорирует пакет, обновляет счётчик
* UNetReceiver при обнаружении "разрыва" в последовательнности, просто игнорирует пакет, обновляет счётчик
* и начинает обработку уже со следующего.
* и начинает обработку пока буфер опять не заполнится..
* Соотвественно здесь, реализован следующий механизм: При переходе номера пакета через maxnum,
* в сеть посылается один и тоже пакет данных с номерами идущими подряд.
* В результате первй будет откинут, как идущий "не подряд", а второй - будет обработан.
*/
*/
class
UNetSender
class
UNetSender
{
{
...
@@ -84,7 +81,7 @@ class UNetSender
...
@@ -84,7 +81,7 @@ class UNetSender
typedef
std
::
vector
<
UItem
>
DMap
;
typedef
std
::
vector
<
UItem
>
DMap
;
DMap
dlist
;
DMap
dlist
;
int
maxItem
;
int
maxItem
;
long
packetnum
;
unsigned
long
packetnum
;
ThreadCreator
<
UNetSender
>*
s_thr
;
// send thread
ThreadCreator
<
UNetSender
>*
s_thr
;
// send thread
};
};
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment