Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
U
uniset2
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
1
Issues
1
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
UniSet project repositories
uniset2
Commits
f8916917
Commit
f8916917
authored
Dec 27, 2010
by
Pavel Vainerman
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Перенёс UNet2 в каталог UNet2
parent
27589353
Hide whitespace changes
Inline
Side-by-side
Showing
13 changed files
with
648 additions
and
1531 deletions
+648
-1531
Makefile.am
extensions/UNet2/Makefile.am
+10
-29
UNetExchange.cc
extensions/UNet2/UNetExchange.cc
+129
-336
UNetExchange.h
extensions/UNet2/UNetExchange.h
+17
-49
UNetPacket.cc
extensions/UNet2/UNetPacket.cc
+0
-38
UNetPacket.h
extensions/UNet2/UNetPacket.h
+0
-50
UNetReceiver.cc
extensions/UNet2/UNetReceiver.cc
+275
-358
UNetReceiver.h
extensions/UNet2/UNetReceiver.h
+106
-42
UNetSender.cc
extensions/UNet2/UNetSender.cc
+63
-383
UNetSender.h
extensions/UNet2/UNetSender.h
+28
-51
start_fg.sh
extensions/UNet2/start_fg.sh
+2
-3
unetexchange.cc
extensions/UNet2/unetexchange.cc
+18
-14
unetreceiver.cc
extensions/UNet2/unetreceiver.cc
+0
-90
unetsender.cc
extensions/UNet2/unetsender.cc
+0
-88
No files found.
extensions/UNet2/Makefile.am
View file @
f8916917
bin_PROGRAMS
=
@PACKAGE@-unet-sender @PACKAGE@-unet-receiver
#@PACKAGE@-unetexchange
bin_PROGRAMS
=
@PACKAGE@-unetexchange
lib_LTLIBRARIES
=
libUniSetUNet2.la
libUniSetUNet2_la_LIBADD
=
$(top_builddir)
/lib/libUniSet.la
\
...
...
@@ -7,39 +6,21 @@ libUniSetUNet2_la_LIBADD = $(top_builddir)/lib/libUniSet.la \
$(top_builddir)
/extensions/lib/libUniSetExtensions.la
\
$(SIGC_LIBS)
$(COMCPP_LIBS)
libUniSetUNet2_la_CXXFLAGS
=
-I
$(top_builddir)
/extensions/include
-I
$(top_builddir)
/extensions/SharedMemory
$(SIGC_CFLAGS)
$(COMCPP_CFLAGS)
libUniSetUNet2_la_SOURCES
=
UNetPacket.cc UNetSender.cc UNetReceiver.cc
#UNetExchange.cc
libUniSetUNet2_la_SOURCES
=
UDPPacket.cc UNetReceiver.cc UNetSender.cc UNetExchange.cc
#UNetSender.cc
#@PACKAGE@_unetexchange_SOURCES = unetexchange.cc
#@PACKAGE@_unetexchange_LDADD = libUniSetUNet.la $(top_builddir)/lib/libUniSet.la \
# $(top_builddir)/extensions/SharedMemory/libUniSetSharedMemory.la \
# $(top_builddir)/extensions/lib/libUniSetExtensions.la \
# $(SIGC_LIBS) $(COMCPP_LIBS)
#@PACKAGE@_unetexchange_CXXFLAGS = -I$(top_builddir)/extensions/include -I$(top_builddir)/extensions/SharedMemory $(SIGC_CFLAGS) $(COMCPP_CFLAGS)
@PACKAGE@
_unet_sender_SOURCES
=
unetsender.cc
@PACKAGE@
_unet_sender_LDADD
=
libUniSetUNet2.la
$(top_builddir)
/lib/libUniSet.la
\
@PACKAGE@
_unetexchange_SOURCES
=
unetexchange.cc
@PACKAGE@
_unetexchange_LDADD
=
libUniSetUNet2.la
$(top_builddir)
/lib/libUniSet.la
\
$(top_builddir)
/extensions/SharedMemory/libUniSetSharedMemory.la
\
$(top_builddir)
/extensions/lib/libUniSetExtensions.la
\
$(SIGC_LIBS)
$(COMCPP_LIBS)
@PACKAGE@
_unet_sender_CXXFLAGS
=
-I
$(top_builddir)
/extensions/include
-I
$(top_builddir)
/extensions/SharedMemory
$(SIGC_CFLAGS)
$(COMCPP_CFLAGS)
@PACKAGE@
_unet_receiver_SOURCES
=
unetreceiver.cc
@PACKAGE@
_unet_receiver_LDADD
=
libUniSetUNet2.la
$(top_builddir)
/lib/libUniSet.la
\
$(top_builddir)
/extensions/SharedMemory/libUniSetSharedMemory.la
\
$(top_builddir)
/extensions/lib/libUniSetExtensions.la
\
$(SIGC_LIBS)
$(COMCPP_LIBS)
@PACKAGE@
_unet_receiver_CXXFLAGS
=
-I
$(top_builddir)
/extensions/include
-I
$(top_builddir)
/extensions/SharedMemory
$(SIGC_CFLAGS)
$(COMCPP_CFLAGS)
@PACKAGE@
_unetexchange_CXXFLAGS
=
-I
$(top_builddir)
/extensions/include
-I
$(top_builddir)
/extensions/SharedMemory
$(SIGC_CFLAGS)
$(COMCPP_CFLAGS)
# install
#
devel_include_HEADERS = *.h
#
devel_includedir = $(pkgincludedir)/extensions
devel_include_HEADERS
=
*
.h
devel_includedir
=
$(pkgincludedir)
/extensions
#
pkgconfigdir = $(libdir)/pkgconfig
#pkgconfig_DATA = libUniSetUNet
.pc
pkgconfigdir
=
$(libdir)
/pkgconfig
pkgconfig_DATA
=
libUniSetUNet2
.pc
all-local
:
ln
-sf
../U
Net
Exchange/
$(devel_include_HEADERS)
../include
ln
-sf
../U
DP
Exchange/
$(devel_include_HEADERS)
../include
extensions/UNet2/UNetExchange.cc
View file @
f8916917
...
...
@@ -6,80 +6,98 @@
using
namespace
std
;
using
namespace
UniSetTypes
;
using
namespace
UniSetExtensions
;
// -----------------------------------------------------------------------------
UNetExchange
::
UNetExchange
(
UniSetTypes
::
ObjectId
objId
,
UniSetTypes
::
ObjectId
shmId
,
SharedMemory
*
ic
)
:
UniSetObject_LT
(
objId
),
shm
(
0
),
initPause
(
0
),
UNet
(
0
),
activated
(
false
),
dlist
(
100
),
maxItem
(
0
)
no_sender
(
false
),
sender
(
0
)
{
if
(
objId
==
DefaultObjectId
)
throw
UniSetTypes
::
SystemError
(
"(UNetExchange): objId=-1?!! Use --
UN
et-name"
);
throw
UniSetTypes
::
SystemError
(
"(UNetExchange): objId=-1?!! Use --
un
et-name"
);
// xmlNode* cnode = conf->getNode(myname);
cnode
=
conf
->
getNode
(
myname
);
if
(
cnode
==
NULL
)
throw
UniSetTypes
::
SystemError
(
"(UNetExchange): Not f
i
nd conf-node for "
+
myname
);
throw
UniSetTypes
::
SystemError
(
"(UNetExchange): Not f
ou
nd conf-node for "
+
myname
);
shm
=
new
SMInterface
(
shmId
,
&
ui
,
objId
,
ic
);
UniXML_iterator
it
(
cnode
);
// определяем фильтр
s_field
=
conf
->
getArgParam
(
"--
UN
et-filter-field"
);
s_fvalue
=
conf
->
getArgParam
(
"--
UN
et-filter-value"
);
s_field
=
conf
->
getArgParam
(
"--
un
et-filter-field"
);
s_fvalue
=
conf
->
getArgParam
(
"--
un
et-filter-value"
);
dlog
[
Debug
::
INFO
]
<<
myname
<<
"(init): read fileter-field='"
<<
s_field
<<
"' filter-value='"
<<
s_fvalue
<<
"'"
<<
endl
;
// ---------- init RS ----------
// UniXML_iterator it(cnode);
s_host
=
conf
->
getArgParam
(
"--UNet-host"
,
it
.
getProp
(
"host"
));
if
(
s_host
.
empty
()
)
throw
UniSetTypes
::
SystemError
(
myname
+
"(UNetExchange): Unknown host. Use --UNet-host"
);
host
=
s_host
.
c_str
();
int
recvTimeout
=
conf
->
getArgPInt
(
"--unet-recv-timeout"
,
it
.
getProp
(
"recvTimeout"
),
5000
);
int
lostTimeout
=
conf
->
getArgPInt
(
"--unet-lost-timeout"
,
it
.
getProp
(
"lostTimeout"
),
recvTimeout
);
int
recvpause
=
conf
->
getArgPInt
(
"--unet-recvpause"
,
it
.
getProp
(
"recvpause"
),
10
);
int
sendpause
=
conf
->
getArgPInt
(
"--unet-sendpause"
,
it
.
getProp
(
"sendpause"
),
150
);
int
updatepause
=
conf
->
getArgPInt
(
"--unet-updatepause"
,
it
.
getProp
(
"updatepause"
),
100
);
steptime
=
conf
->
getArgPInt
(
"--unet-steptime"
,
it
.
getProp
(
"steptime"
),
1000
);
int
maxDiff
=
conf
->
getArgPInt
(
"--unet-maxdifferense"
,
it
.
getProp
(
"maxDifferense"
),
1000
);
int
maxProcessingCount
=
conf
->
getArgPInt
(
"--unet-maxprocessingcount"
,
it
.
getProp
(
"maxProcessingCount"
),
100
);
buildReceiverList
(
);
no_sender
=
conf
->
getArgInt
(
"--unet-nosender"
,
it
.
getProp
(
"nosender"
)
);
// port = conf->getArgInt("--UNet-port",it.getProp("port")
);
if
(
port
<=
0
||
port
==
DefaultObjectId
)
throw
UniSetTypes
::
SystemError
(
myname
+
"(UNetExchange): Unknown port address"
);
xmlNode
*
nodes
=
conf
->
getXMLNodesSection
(
);
if
(
!
nodes
)
throw
UniSetTypes
::
SystemError
(
"(UNetExchange): Not found <nodes>"
);
if
(
dlog
.
debugging
(
Debug
::
INFO
)
)
dlog
[
Debug
::
INFO
]
<<
"(UNetExchange): UNet set to "
<<
s_host
<<
":"
<<
port
<<
endl
;
UniXML_iterator
n_it
(
nodes
);
if
(
!
n_it
.
goChildren
()
)
throw
UniSetTypes
::
SystemError
(
"(UNetExchange): Items not found for <nodes>"
);
try
for
(
;
n_it
.
getCurrent
();
n_it
.
goNext
()
)
{
UNet
=
new
ost
::
UNetBroadcast
(
host
,
port
);
}
catch
(
ost
::
SockException
&
e
)
{
ostringstream
s
;
s
<<
e
.
getString
()
<<
": "
<<
e
.
getSystemErrorString
()
<<
endl
;
throw
SystemError
(
s
.
str
());
}
string
h
(
n_it
.
getProp
(
"ip"
));
if
(
!
n_it
.
getProp
(
"unet_ip"
).
empty
()
)
h
=
n_it
.
getProp
(
"unet_ip"
);
thr
=
new
ThreadCreator
<
UNetExchange
>
(
this
,
&
UNetExchange
::
poll
);
int
p
=
n_it
.
getIntProp
(
"id"
);
if
(
!
n_it
.
getProp
(
"unet_port"
).
empty
()
)
p
=
n_it
.
getIntProp
(
"unet_port"
);
recvTimeout
=
conf
->
getArgPInt
(
"--UNet-recv-timeout"
,
it
.
getProp
(
"recvTimeout"
),
5000
);
sendTimeout
=
conf
->
getArgPInt
(
"--UNet-send-timeout"
,
it
.
getProp
(
"sendTimeout"
),
5000
);
polltime
=
conf
->
getArgPInt
(
"--UNet-polltime"
,
it
.
getProp
(
"polltime"
),
100
);
string
n
(
n_it
.
getProp
(
"name"
));
if
(
n
==
conf
->
getLocalNodeName
()
)
{
dlog
[
Debug
::
INFO
]
<<
myname
<<
"(init): init sender.. my node "
<<
n_it
.
getProp
(
"name"
)
<<
endl
;
sender
=
new
UNetSender
(
h
,
p
,
shm
,
s_field
,
s_fvalue
,
ic
);
sender
->
setSendPause
(
sendpause
);
continue
;
}
// -------------------------------
if
(
shm
->
isLocalwork
()
)
{
readConfiguration
();
dlist
.
resize
(
maxItem
);
dlog
[
Debug
::
INFO
]
<<
myname
<<
"(init): dlist size = "
<<
dlist
.
size
()
<<
endl
;
if
(
!
n_it
.
getProp
(
"unet_ignore"
).
empty
()
)
{
dlog
[
Debug
::
INFO
]
<<
myname
<<
"(init): unet_ignore.. for "
<<
n_it
.
getProp
(
"name"
)
<<
endl
;
continue
;
}
dlog
[
Debug
::
INFO
]
<<
myname
<<
"(init): add UNetReceiver for "
<<
h
<<
":"
<<
p
<<
endl
;
if
(
checkExistUNetHost
(
h
,
p
)
)
{
dlog
[
Debug
::
INFO
]
<<
myname
<<
"(init): "
<<
h
<<
":"
<<
p
<<
" already added! Ignore.."
<<
endl
;
continue
;
}
UNetReceiver
*
r
=
new
UNetReceiver
(
h
,
p
,
shm
);
r
->
setReceiveTimeout
(
recvTimeout
);
r
->
setLostTimeout
(
lostTimeout
);
r
->
setReceivePause
(
recvpause
);
r
->
setUpdatePause
(
updatepause
);
r
->
setMaxDifferens
(
maxDiff
);
r
->
setMaxProcessingCount
(
maxProcessingCount
);
recvlist
.
push_back
(
r
);
}
else
ic
->
addReadItem
(
sigc
::
mem_fun
(
this
,
&
UNetExchange
::
readItem
)
);
// -------------------------------
// ********** HEARTBEAT *************
string
heart
=
conf
->
getArgParam
(
"--
UN
et-heartbeat-id"
,
it
.
getProp
(
"heartbeat_id"
));
string
heart
=
conf
->
getArgParam
(
"--
un
et-heartbeat-id"
,
it
.
getProp
(
"heartbeat_id"
));
if
(
!
heart
.
empty
()
)
{
sidHeartBeat
=
conf
->
getSensorID
(
heart
);
...
...
@@ -97,7 +115,7 @@ maxItem(0)
else
ptHeartBeat
.
setTiming
(
UniSetTimer
::
WaitUpTime
);
maxHeartBeat
=
conf
->
getArgPInt
(
"--
UN
et-heartbeat-max"
,
it
.
getProp
(
"heartbeat_max"
),
10
);
maxHeartBeat
=
conf
->
getArgPInt
(
"--
un
et-heartbeat-max"
,
it
.
getProp
(
"heartbeat_max"
),
10
);
test_id
=
sidHeartBeat
;
}
else
...
...
@@ -116,25 +134,48 @@ maxItem(0)
activateTimeout
=
conf
->
getArgPInt
(
"--activate-timeout"
,
20000
);
timeout_t
msec
=
conf
->
getArgPInt
(
"--
UN
et-timeout"
,
it
.
getProp
(
"timeout"
),
3000
);
timeout_t
msec
=
conf
->
getArgPInt
(
"--
un
et-timeout"
,
it
.
getProp
(
"timeout"
),
3000
);
dlog
[
Debug
::
INFO
]
<<
myname
<<
"(init):
UNet
-timeout="
<<
msec
<<
" msec"
<<
endl
;
dlog
[
Debug
::
INFO
]
<<
myname
<<
"(init):
udp
-timeout="
<<
msec
<<
" msec"
<<
endl
;
}
// -----------------------------------------------------------------------------
UNetExchange
::~
UNetExchange
()
{
for
(
ReceiverList
::
iterator
it
=
r
list
.
begin
();
it
!=
rlist
.
end
();
it
++
)
for
(
ReceiverList
::
iterator
it
=
r
ecvlist
.
begin
();
it
!=
recvlist
.
end
();
++
it
)
delete
(
*
it
);
delete
UNet
;
delete
sender
;
delete
shm
;
delete
thr
;
}
// -----------------------------------------------------------------------------
bool
UNetExchange
::
checkExistUNetHost
(
const
std
::
string
addr
,
ost
::
tpport_t
port
)
{
ost
::
IPV4Address
a1
(
addr
.
c_str
());
for
(
ReceiverList
::
iterator
it
=
recvlist
.
begin
();
it
!=
recvlist
.
end
();
++
it
)
{
if
(
(
*
it
)
->
getAddress
()
==
a1
.
getAddress
()
&&
(
*
it
)
->
getPort
()
==
port
)
return
true
;
}
return
false
;
}
// -----------------------------------------------------------------------------
void
UNetExchange
::
startReceivers
()
{
for
(
ReceiverList
::
iterator
it
=
recvlist
.
begin
();
it
!=
recvlist
.
end
();
++
it
)
(
*
it
)
->
start
();
}
// -----------------------------------------------------------------------------
void
UNetExchange
::
initSender
(
const
std
::
string
s_host
,
const
ost
::
tpport_t
port
,
UniXML_iterator
&
it
)
{
if
(
no_sender
)
return
;
}
// -----------------------------------------------------------------------------
void
UNetExchange
::
waitSMReady
()
{
// waiting for SM is ready...
int
ready_timeout
=
conf
->
getArgInt
(
"--
UN
et-sm-ready-timeout"
,
"15000"
);
int
ready_timeout
=
conf
->
getArgInt
(
"--
un
et-sm-ready-timeout"
,
"15000"
);
if
(
ready_timeout
==
0
)
ready_timeout
=
15000
;
else
if
(
ready_timeout
<
0
)
...
...
@@ -149,6 +190,15 @@ void UNetExchange::waitSMReady()
}
}
// -----------------------------------------------------------------------------
void
UNetExchange
::
timerInfo
(
TimerMessage
*
tm
)
{
if
(
!
activated
)
return
;
if
(
tm
->
id
==
tmStep
)
step
();
}
// -----------------------------------------------------------------------------
void
UNetExchange
::
step
()
{
if
(
!
activated
)
...
...
@@ -163,99 +213,11 @@ void UNetExchange::step()
}
catch
(
Exception
&
ex
)
{
dlog
[
Debug
::
CRIT
]
<<
myname
<<
"(step): (hb) "
<<
ex
<<
std
::
endl
;
}
}
}
// -----------------------------------------------------------------------------
void
UNetExchange
::
poll
()
{
dlist
.
resize
(
maxItem
);
dlog
[
Debug
::
INFO
]
<<
myname
<<
"(init): dlist size = "
<<
dlist
.
size
()
<<
endl
;
for
(
ReceiverList
::
iterator
it
=
rlist
.
begin
();
it
!=
rlist
.
end
();
it
++
)
{
(
*
it
)
->
setReceiveTimeout
(
recvTimeout
);
if
(
dlog
.
debugging
(
Debug
::
INFO
)
)
dlog
[
Debug
::
INFO
]
<<
myname
<<
"(poll): start exchange for "
<<
(
*
it
)
->
getName
()
<<
endl
;
(
*
it
)
->
start
();
}
ost
::
IPV4Broadcast
h
=
s_host
.
c_str
();
try
{
UNet
->
setPeer
(
h
,
port
);
}
catch
(
ost
::
SockException
&
e
)
{
ostringstream
s
;
s
<<
e
.
getString
()
<<
": "
<<
e
.
getSystemErrorString
();
dlog
[
Debug
::
CRIT
]
<<
myname
<<
"(poll): "
<<
s
.
str
()
<<
endl
;
throw
SystemError
(
s
.
str
());
}
while
(
activated
)
{
try
{
send
();
}
catch
(
ost
::
SockException
&
e
)
{
cerr
<<
e
.
getString
()
<<
": "
<<
e
.
getSystemErrorString
()
<<
endl
;
}
catch
(
UniSetTypes
::
Exception
&
ex
)
{
cerr
<<
myname
<<
"(step): "
<<
ex
<<
std
::
endl
;
dlog
[
Debug
::
CRIT
]
<<
myname
<<
"(step): (hb) "
<<
ex
<<
std
::
endl
;
}
catch
(...)
{
cerr
<<
myname
<<
"(step): catch ..."
<<
std
::
endl
;
}
msleep
(
polltime
);
}
cerr
<<
"************* execute FINISH **********"
<<
endl
;
}
// -----------------------------------------------------------------------------
void
UNetExchange
::
send
()
{
cout
<<
myname
<<
": send..."
<<
endl
;
/*
UniSetUNet::UNetHeader h;
h.nodeID = conf->getLocalNode();
h.procID = getId();
h.dcount = mypack.size();
if( UNet->isPending(ost::Socket::pendingOutput) )
{
ssize_t ret = UNet->send((char*)(&h),sizeof(h));
if( ret<(ssize_t)sizeof(h) )
{
cerr << myname << "(send data header): ret=" << ret << " sizeof=" << sizeof(h) << endl;
return;
}
*/
#warning use mutex for list!!!
UniSetUNet
::
UNetMessage
::
UNetDataList
::
iterator
it
=
mypack
.
dlist
.
begin
();
for
(
;
it
!=
mypack
.
dlist
.
end
();
++
it
)
{
// while( !UNet->isPending(ost::Socket::pendingOutput) )
// msleep(30);
cout
<<
myname
<<
"(send): "
<<
(
*
it
)
<<
endl
;
ssize_t
ret
=
UNet
->
send
((
char
*
)(
&
(
*
it
)),
sizeof
(
UniSetUNet
::
UNetData
));
if
(
ret
<
(
ssize_t
)
sizeof
(
UniSetUNet
::
UNetData
)
)
{
cerr
<<
myname
<<
"(send data): ret="
<<
ret
<<
" sizeof="
<<
sizeof
(
UniSetUNet
::
UNetData
)
<<
endl
;
break
;
}
}
// }
}
// -----------------------------------------------------------------------------
void
UNetExchange
::
processingMessage
(
UniSetTypes
::
VoidMessage
*
msg
)
{
...
...
@@ -277,6 +239,13 @@ void UNetExchange::processingMessage(UniSetTypes::VoidMessage *msg)
}
break
;
case
Message
:
:
Timer
:
{
TimerMessage
tm
(
msg
);
timerInfo
(
&
tm
);
}
break
;
default
:
break
;
}
...
...
@@ -297,7 +266,7 @@ void UNetExchange::processingMessage(UniSetTypes::VoidMessage *msg)
}
}
// -----------------------------------------------------------------------------
void
UNetExchange
::
sysCommand
(
UniSetTypes
::
SystemMessage
*
sm
)
void
UNetExchange
::
sysCommand
(
UniSetTypes
::
SystemMessage
*
sm
)
{
switch
(
sm
->
command
)
{
...
...
@@ -324,7 +293,10 @@ void UNetExchange::sysCommand(UniSetTypes::SystemMessage *sm)
UniSetTypes
::
uniset_mutex_lock
l
(
mutex_start
,
10000
);
askSensors
(
UniversalIO
::
UIONotify
);
}
thr
->
start
();
askTimer
(
tmStep
,
steptime
);
startReceivers
();
if
(
sender
)
sender
->
start
();
}
case
SystemMessage
:
:
FoldUp
:
...
...
@@ -385,36 +357,12 @@ void UNetExchange::askSensors( UniversalIO::UIOCommand cmd )
kill
(
SIGTERM
,
getpid
());
// прерываем (перезапускаем) процесс...
throw
SystemError
(
err
.
str
());
}
DMap
::
iterator
it
=
dlist
.
begin
();
for
(
;
it
!=
dlist
.
end
();
++
it
)
{
try
{
shm
->
askSensor
(
it
->
si
.
id
,
cmd
);
}
catch
(
UniSetTypes
::
Exception
&
ex
)
{
dlog
[
Debug
::
WARN
]
<<
myname
<<
"(askSensors): "
<<
ex
<<
std
::
endl
;
}
catch
(...){}
}
}
// ------------------------------------------------------------------------------------------
void
UNetExchange
::
sensorInfo
(
UniSetTypes
::
SensorMessage
*
sm
)
{
DMap
::
iterator
it
=
dlist
.
begin
();
for
(
;
it
!=
dlist
.
end
();
++
it
)
{
if
(
it
->
si
.
id
==
sm
->
id
)
{
uniset_spin_lock
lock
(
it
->
val_lock
);
it
->
val
=
sm
->
value
;
if
(
it
->
pack_it
!=
mypack
.
dlist
.
end
()
)
it
->
pack_it
->
val
=
sm
->
value
;
}
break
;
}
if
(
sender
)
sender
->
update
(
sm
->
id
,
sm
->
value
);
}
// ------------------------------------------------------------------------------------------
bool
UNetExchange
::
activateObject
()
...
...
@@ -437,201 +385,46 @@ void UNetExchange::sigterm( int signo )
{
cerr
<<
myname
<<
": ********* SIGTERM("
<<
signo
<<
") ********"
<<
endl
;
activated
=
false
;
UNet
->
disconnect
();
for
(
ReceiverList
::
iterator
it
=
rlist
.
begin
();
it
!=
rlist
.
end
();
it
++
)
(
*
it
)
->
stop
();
UniSetObject_LT
::
sigterm
(
signo
);
}
// ------------------------------------------------------------------------------------------
void
UNetExchange
::
readConfiguration
()
{
#warning Сделать сортировку по диапазонам адресов!!!
// чтобы запрашивать одним запросом, сразу несколько входов...
// readconf_ok = false;
xmlNode
*
root
=
conf
->
getXMLSensorsSection
();
if
(
!
root
)
{
ostringstream
err
;
err
<<
myname
<<
"(readConfiguration): не нашли корневого раздела <sensors>"
;
throw
SystemError
(
err
.
str
());
}
UniXML_iterator
it
(
root
);
if
(
!
it
.
goChildren
()
)
{
std
::
cerr
<<
myname
<<
"(readConfiguration): раздел <sensors> не содержит секций ?!!
\n
"
;
return
;
}
for
(
;
it
.
getCurrent
();
it
.
goNext
()
)
{
if
(
check_item
(
it
)
)
initItem
(
it
);
}
// readconf_ok = true;
}
// ------------------------------------------------------------------------------------------
bool
UNetExchange
::
check_item
(
UniXML_iterator
&
it
)
{
if
(
s_field
.
empty
()
)
return
true
;
// просто проверка на не пустой field
if
(
s_fvalue
.
empty
()
&&
it
.
getProp
(
s_field
).
empty
()
)
return
false
;
// просто проверка что field = value
if
(
!
s_fvalue
.
empty
()
&&
it
.
getProp
(
s_field
)
!=
s_fvalue
)
return
false
;
return
true
;
}
// ------------------------------------------------------------------------------------------
bool
UNetExchange
::
readItem
(
UniXML
&
xml
,
UniXML_iterator
&
it
,
xmlNode
*
sec
)
{
if
(
check_item
(
it
)
)
initItem
(
it
);
return
true
;
}
// ------------------------------------------------------------------------------------------
bool
UNetExchange
::
initItem
(
UniXML_iterator
&
it
)
{
string
sname
(
it
.
getProp
(
"name"
)
);
string
tid
=
it
.
getProp
(
"id"
);
ObjectId
sid
;
if
(
!
tid
.
empty
()
)
{
sid
=
UniSetTypes
::
uni_atoi
(
tid
);
if
(
sid
<=
0
)
sid
=
DefaultObjectId
;
}
else
sid
=
conf
->
getSensorID
(
sname
);
if
(
sid
==
DefaultObjectId
)
{
if
(
dlog
)
dlog
[
Debug
::
CRIT
]
<<
myname
<<
"(readItem): ID not found for "
<<
sname
<<
endl
;
return
false
;
}
UItem
p
;
p
.
si
.
id
=
sid
;
p
.
si
.
node
=
conf
->
getLocalNode
();
mypack
.
addData
(
sid
,
0
);
p
.
pack_it
=
(
mypack
.
dlist
.
end
()
--
);
if
(
maxItem
>=
dlist
.
size
()
)
dlist
.
resize
(
maxItem
+
10
);
dlist
[
maxItem
]
=
p
;
maxItem
++
;
if
(
dlog
.
debugging
(
Debug
::
INFO
)
)
dlog
[
Debug
::
INFO
]
<<
myname
<<
"(initItem): add "
<<
p
<<
endl
;
return
true
;
}
// ------------------------------------------------------------------------------------------
void
UNetExchange
::
initIterators
()
{
DMap
::
iterator
it
=
dlist
.
begin
();
for
(
;
it
!=
dlist
.
end
();
it
++
)
{
shm
->
initDIterator
(
it
->
dit
);
shm
->
initAIterator
(
it
->
ait
);
}
shm
->
initAIterator
(
aitHeartBeat
);
}
// -----------------------------------------------------------------------------
void
UNetExchange
::
help_print
(
int
argc
,
char
*
argv
[]
)
{
cout
<<
"--UNet-polltime msec - Пауза между опросаом карт. По умолчанию 200 мсек."
<<
endl
;
cout
<<
"--UNet-heartbeat-id - Данный процесс связан с указанным аналоговым heartbeat-дачиком."
<<
endl
;
cout
<<
"--UNet-heartbeat-max - Максимальное значение heartbeat-счётчика для данного процесса. По умолчанию 10."
<<
endl
;
cout
<<
"--UNet-ready-timeout - Время ожидания готовности SM к работе, мсек. (-1 - ждать 'вечно')"
<<
endl
;
cout
<<
"--UNet-initPause - Задержка перед инициализацией (время на активизация процесса)"
<<
endl
;
cout
<<
"--UNet-notRespondSensor - датчик связи для данного процесса "
<<
endl
;
cout
<<
"--UNet-sm-ready-timeout - время на ожидание старта SM"
<<
endl
;
cout
<<
" Настройки протокола UNet: "
<<
endl
;
cout
<<
"--UNet-host [ip|hostname] - Адрес сервера"
<<
endl
;
cout
<<
"--UNet-send-timeout - Таймаут на посылку ответа."
<<
endl
;
cout
<<
"--unet-recvpause msec - Пауза между получением пакетов. По умолчанию 10 мсек."
<<
endl
;
cout
<<
"--unet-updatepause msec - Пауза между обновлением данных в SM. По умолчанию 100 мсек."
<<
endl
;
cout
<<
"--unet-heartbeat-id - Данный процесс связан с указанным аналоговым heartbeat-дачиком."
<<
endl
;
cout
<<
"--unet-heartbeat-max - Максимальное значение heartbeat-счётчика для данного процесса. По умолчанию 10."
<<
endl
;
cout
<<
"--unet-ready-timeout - Время ожидания готовности SM к работе, мсек. (-1 - ждать 'вечно')"
<<
endl
;
cout
<<
"--unet-initPause - Задержка перед инициализацией (время на активизация процесса)"
<<
endl
;
cout
<<
"--unet-notRespondSensor - датчик связи для данного процесса "
<<
endl
;
cout
<<
"--unet-sm-ready-timeout - время на ожидание старта SM"
<<
endl
;
cout
<<
" Настройки протокола RS: "
<<
endl
;
}
// -----------------------------------------------------------------------------
UNetExchange
*
UNetExchange
::
init_
UN
etexchange
(
int
argc
,
char
*
argv
[],
UniSetTypes
::
ObjectId
icID
,
SharedMemory
*
ic
)
UNetExchange
*
UNetExchange
::
init_
un
etexchange
(
int
argc
,
char
*
argv
[],
UniSetTypes
::
ObjectId
icID
,
SharedMemory
*
ic
)
{
string
name
=
conf
->
getArgParam
(
"--
UN
et-name"
,
"UNetExchange1"
);
string
name
=
conf
->
getArgParam
(
"--
un
et-name"
,
"UNetExchange1"
);
if
(
name
.
empty
()
)
{
cerr
<<
"(
UN
etexchange): Не задан name'"
<<
endl
;
cerr
<<
"(
un
etexchange): Не задан name'"
<<
endl
;
return
0
;
}
ObjectId
ID
=
conf
->
getObjectID
(
name
);
if
(
ID
==
UniSetTypes
::
DefaultObjectId
)
{
cerr
<<
"(
UN
etexchange): идентификатор '"
<<
name
cerr
<<
"(
un
etexchange): идентификатор '"
<<
name
<<
"' не найден в конф. файле!"
<<
" в секции "
<<
conf
->
getObjectsSection
()
<<
endl
;
return
0
;
}
dlog
[
Debug
::
INFO
]
<<
"(
rs
exchange): name = "
<<
name
<<
"("
<<
ID
<<
")"
<<
endl
;
dlog
[
Debug
::
INFO
]
<<
"(
unet
exchange): name = "
<<
name
<<
"("
<<
ID
<<
")"
<<
endl
;
return
new
UNetExchange
(
ID
,
icID
,
ic
);
}
// -----------------------------------------------------------------------------
std
::
ostream
&
operator
<<
(
std
::
ostream
&
os
,
UNetExchange
::
UItem
&
p
)
{
return
os
<<
" sid="
<<
p
.
si
.
id
;
}
// -----------------------------------------------------------------------------
void
UNetExchange
::
buildReceiverList
()
{
xmlNode
*
n
=
conf
->
getXMLNodesSection
();
if
(
!
n
)
{
dlog
[
Debug
::
WARN
]
<<
myname
<<
"(buildReceiverList): <nodes> not found! ignore..."
<<
endl
;
return
;
}
UniXML_iterator
it
(
n
);
if
(
!
it
.
goChildren
()
)
{
dlog
[
Debug
::
WARN
]
<<
myname
<<
"(buildReceiverList): <nodes> is empty?! ignore..."
<<
endl
;
return
;
}
for
(
;
it
.
getCurrent
();
it
.
goNext
()
)
{
ObjectId
n_id
=
conf
->
getNodeID
(
it
.
getProp
(
"name"
)
);
if
(
n_id
==
conf
->
getLocalNode
()
)
{
port
=
it
.
getIntProp
(
"UNet_port"
);
if
(
port
<=
0
)
port
=
n_id
;
dlog
[
Debug
::
INFO
]
<<
myname
<<
"(buildReceiverList): init myport port="
<<
port
<<
endl
;
continue
;
}
int
p
=
it
.
getIntProp
(
"UNet_port"
);
if
(
p
<=
0
)
p
=
n_id
;
if
(
p
==
DefaultObjectId
)
{
dlog
[
Debug
::
WARN
]
<<
myname
<<
"(buildReceiverList): node="
<<
it
.
getProp
(
"name"
)
<<
" unknown port. ignore..."
<<
endl
;
continue
;
}
UNetNReceiver
*
r
=
new
UNetNReceiver
(
p
,
host
,
shm
->
getSMID
(),
shm
->
SM
());
rlist
.
push_back
(
r
);
}
}
// ------------------------------------------------------------------------------------------
extensions/UNet2/UNetExchange.h
View file @
f8916917
...
...
@@ -3,7 +3,7 @@
// -----------------------------------------------------------------------------
#include <ostream>
#include <string>
#include <
vector
>
#include <
queue
>
#include <cc++/socket.h>
#include "UniSetObject_LT.h"
#include "Trigger.h"
...
...
@@ -11,8 +11,8 @@
#include "SMInterface.h"
#include "SharedMemory.h"
#include "ThreadCreator.h"
#include "UNet
Packet
.h"
#include "UNet
NReceiv
er.h"
#include "UNet
Receiver
.h"
#include "UNet
Send
er.h"
// -----------------------------------------------------------------------------
class
UNetExchange
:
public
UniSetObject_LT
...
...
@@ -22,27 +22,13 @@ class UNetExchange:
virtual
~
UNetExchange
();
/*! глобальная функция для инициализации объекта */
static
UNetExchange
*
init_
UN
etexchange
(
int
argc
,
char
*
argv
[],
static
UNetExchange
*
init_
un
etexchange
(
int
argc
,
char
*
argv
[],
UniSetTypes
::
ObjectId
shmID
,
SharedMemory
*
ic
=
0
);
/*! глобальная функция для вывода help-а */
static
void
help_print
(
int
argc
,
char
*
argv
[]
);
struct
UItem
{
UItem
()
:
val
(
0
)
{}
IOController_i
::
SensorInfo
si
;
IOController
::
AIOStateList
::
iterator
ait
;
IOController
::
DIOStateList
::
iterator
dit
;
UniSetTypes
::
uniset_spin_mutex
val_lock
;
UniSetUNet
::
UNetMessage
::
UNetDataList
::
iterator
pack_it
;
long
val
;
friend
std
::
ostream
&
operator
<<
(
std
::
ostream
&
os
,
UItem
&
p
);
};
bool
checkExistUNetHost
(
const
std
::
string
host
,
ost
::
tpport_t
port
);
protected
:
...
...
@@ -51,16 +37,12 @@ class UNetExchange:
std
::
string
s_fvalue
;
SMInterface
*
shm
;
void
poll
();
void
recv
();
void
send
();
void
step
();
virtual
void
processingMessage
(
UniSetTypes
::
VoidMessage
*
msg
);
void
sysCommand
(
UniSetTypes
::
SystemMessage
*
msg
);
void
sensorInfo
(
UniSetTypes
::
SensorMessage
*
sm
);
void
timerInfo
(
UniSetTypes
::
TimerMessage
*
tm
);
void
askSensors
(
UniversalIO
::
UIOCommand
cmd
);
void
waitSMReady
();
...
...
@@ -70,12 +52,13 @@ class UNetExchange:
virtual
void
sigterm
(
int
signo
);
void
initIterators
();
bool
initItem
(
UniXML_iterator
&
it
);
bool
readItem
(
UniXML
&
xml
,
UniXML_iterator
&
it
,
xmlNode
*
sec
);
void
startReceivers
(
);
void
initSender
(
const
std
::
string
host
,
const
ost
::
tpport_t
port
,
UniXML_iterator
&
it
);
void
readConfiguration
();
bool
check_item
(
UniXML_iterator
&
it
);
void
buildReceiverList
();
enum
Timer
{
tmStep
};
private
:
UNetExchange
();
...
...
@@ -88,31 +71,16 @@ class UNetExchange:
IOController
::
AIOStateList
::
iterator
aitHeartBeat
;
UniSetTypes
::
ObjectId
test_id
;
int
polltime
;
/*!< переодичность обновления данных, [мсек] */
ost
::
UNetBroadcast
*
UNet
;
ost
::
IPV4Host
host
;
ost
::
tpport_t
port
;
std
::
string
s_host
;
UniSetTypes
::
uniset_mutex
pollMutex
;
Trigger
trTimeout
;
int
recvTimeout
;
int
sendTimeout
;
int
steptime
;
/*!< периодичность вызова step, [мсек] */
bool
activated
;
int
activateTimeout
;
UniSetUNet
::
UNetMessage
mypack
;
typedef
std
::
vector
<
UItem
>
DMap
;
DMap
dlist
;
int
maxItem
;
typedef
std
::
list
<
UNetNReceiver
*>
ReceiverList
;
ReceiverList
rlist
;
typedef
std
::
list
<
UNetReceiver
*>
ReceiverList
;
ReceiverList
recvlist
;
ThreadCreator
<
UNetExchange
>*
thr
;
bool
no_sender
;
/*!< флаг отключения посылки сообщений */
UNetSender
*
sender
;
};
// -----------------------------------------------------------------------------
#endif // UNetExchange_H_
...
...
extensions/UNet2/UNetPacket.cc
deleted
100644 → 0
View file @
27589353
#include "UNetPacket.h"
// -----------------------------------------------------------------------------
using
namespace
std
;
using
namespace
UniSetUNet
;
// -----------------------------------------------------------------------------
std
::
ostream
&
UniSetUNet
::
operator
<<
(
std
::
ostream
&
os
,
UniSetUNet
::
UNetHeader
&
p
)
{
return
os
<<
"nodeID="
<<
p
.
nodeID
<<
" procID="
<<
p
.
procID
<<
" dcount="
<<
p
.
dcount
;
}
// -----------------------------------------------------------------------------
std
::
ostream
&
UniSetUNet
::
operator
<<
(
std
::
ostream
&
os
,
UniSetUNet
::
UNetData
&
p
)
{
return
os
<<
"id="
<<
p
.
id
<<
" val="
<<
p
.
val
;
}
// -----------------------------------------------------------------------------
std
::
ostream
&
UniSetUNet
::
operator
<<
(
std
::
ostream
&
os
,
UniSetUNet
::
UNetMessage
&
p
)
{
return
os
;
}
// -----------------------------------------------------------------------------
UNetMessage
::
UNetMessage
()
{
}
// -----------------------------------------------------------------------------
void
UNetMessage
::
addData
(
const
UniSetUNet
::
UNetData
&
dat
)
{
dlist
.
push_back
(
dat
);
}
// -----------------------------------------------------------------------------
void
UNetMessage
::
addData
(
long
id
,
long
val
)
{
UNetData
d
(
id
,
val
);
addData
(
d
);
}
// -----------------------------------------------------------------------------
extensions/UNet2/UNetPacket.h
deleted
100644 → 0
View file @
27589353
// $Id: UNetPacket.h,v 1.1 2009/02/10 20:38:27 vpashka Exp $
// -----------------------------------------------------------------------------
#ifndef UNetPacket_H_
#define UNetPacket_H_
// -----------------------------------------------------------------------------
#include <list>
#include <ostream>
#include "UniSetTypes.h"
// -----------------------------------------------------------------------------
namespace
UniSetUNet
{
struct
UNetHeader
{
long
nodeID
;
long
procID
;
long
dcount
;
friend
std
::
ostream
&
operator
<<
(
std
::
ostream
&
os
,
UNetHeader
&
p
);
}
__attribute__
((
packed
));
struct
UNetData
{
UNetData
()
:
id
(
UniSetTypes
::
DefaultObjectId
),
val
(
0
){}
UNetData
(
long
id
,
long
val
)
:
id
(
id
),
val
(
val
){}
long
id
;
long
val
;
friend
std
::
ostream
&
operator
<<
(
std
::
ostream
&
os
,
UNetData
&
p
);
}
__attribute__
((
packed
));
struct
UNetMessage
:
public
UNetHeader
{
UNetMessage
();
void
addData
(
const
UNetData
&
dat
);
void
addData
(
long
id
,
long
val
);
inline
int
size
(){
return
dlist
.
size
();
}
typedef
std
::
list
<
UNetData
>
UNetDataList
;
UNetDataList
dlist
;
friend
std
::
ostream
&
operator
<<
(
std
::
ostream
&
os
,
UNetMessage
&
p
);
};
}
// -----------------------------------------------------------------------------
#endif // UNetPacket_H_
// -----------------------------------------------------------------------------
extensions/UNet2/UNetReceiver.cc
View file @
f8916917
...
...
@@ -7,455 +7,372 @@ using namespace std;
using
namespace
UniSetTypes
;
using
namespace
UniSetExtensions
;
// -----------------------------------------------------------------------------
UNetReceiver
::
UNetReceiver
(
UniSetTypes
::
ObjectId
objId
,
UniSetTypes
::
ObjectId
shmId
,
SharedMemory
*
ic
)
:
UniSetObject_LT
(
objId
),
shm
(
0
),
initPause
(
0
),
udp
(
0
),
activated
(
false
)
bool
UNetReceiver
::
PacketCompare
::
operator
()(
const
UniSetUDP
::
UDPMessage
&
lhs
,
const
UniSetUDP
::
UDPMessage
&
rhs
)
const
{
if
(
objId
==
DefaultObjectId
)
throw
UniSetTypes
::
SystemError
(
"(UNetReceiver): objId=-1?!! Use --udp-name"
);
// xmlNode* cnode = conf->getNode(myname);
cnode
=
conf
->
getNode
(
myname
);
if
(
cnode
==
NULL
)
throw
UniSetTypes
::
SystemError
(
"(UNetReceiver): Not find conf-node for "
+
myname
);
shm
=
new
SMInterface
(
shmId
,
&
ui
,
objId
,
ic
);
UniXML_iterator
it
(
cnode
);
// определяем фильтр
s_field
=
conf
->
getArgParam
(
"--udp-filter-field"
);
s_fvalue
=
conf
->
getArgParam
(
"--udp-filter-value"
);
dlog
[
Debug
::
INFO
]
<<
myname
<<
"(init): read fileter-field='"
<<
s_field
<<
"' filter-value='"
<<
s_fvalue
<<
"'"
<<
endl
;
// ---------- init RS ----------
// UniXML_iterator it(cnode);
string
s_host
=
conf
->
getArgParam
(
"--udp-host"
,
it
.
getProp
(
"host"
));
if
(
s_host
.
empty
()
)
throw
UniSetTypes
::
SystemError
(
myname
+
"(UNetReceiver): Unknown host. Use --udp-host"
);
// if( lhs.msg.header.num == rhs.msg.header.num )
// return (lhs.msg < rhs.msg);
port
=
conf
->
getArgInt
(
"--udp-port"
,
it
.
getProp
(
"port"
));
if
(
port
<=
0
)
throw
UniSetTypes
::
SystemError
(
myname
+
"(UNetReceiver): Unknown port address. Use --udp-port"
);
if
(
dlog
.
debugging
(
Debug
::
INFO
)
)
dlog
[
Debug
::
INFO
]
<<
"(UNetReceiver): UNet set to "
<<
s_host
<<
":"
<<
port
<<
endl
;
return
lhs
.
msg
.
header
.
num
>
rhs
.
msg
.
header
.
num
;
}
// ------------------------------------------------------------------------------------------
UNetReceiver
::
UNetReceiver
(
const
std
::
string
s_host
,
const
ost
::
tpport_t
port
,
SMInterface
*
smi
)
:
shm
(
smi
),
recvpause
(
10
),
updatepause
(
100
),
udp
(
0
),
recvTimeout
(
5000
),
lostTimeout
(
5000
),
lostPackets
(
0
),
activated
(
false
),
r_thr
(
0
),
u_thr
(
0
),
pnum
(
0
),
maxDifferens
(
1000
),
waitClean
(
false
),
rnum
(
0
),
maxProcessingCount
(
100
),
icache
(
200
),
cache_init_ok
(
false
)
{
{
ostringstream
s
;
s
<<
"("
<<
s_host
<<
":"
<<
port
<<
")"
;
myname
=
s
.
str
();
}
host
=
s_host
.
c_str
();
try
{
udp
=
new
ost
::
UNetDuplex
(
host
,
port
);
// ost::IPV4Cidr ci(s_host.c_str());
// addr = ci.getBroadcast();
// cerr << "****************** addr: " << addr << endl;
addr
=
s_host
.
c_str
();
udp
=
new
ost
::
UDPDuplex
(
addr
,
port
);
}
catch
(
ost
::
SockException
&
e
)
{
ostringstream
s
;
s
<<
e
.
getString
()
<<
": "
<<
e
.
getSystemErrorString
()
<<
endl
;
throw
SystemError
(
s
.
str
());
}
s
<<
e
.
getString
()
<<
": "
<<
e
.
getSystemErrorString
();
dlog
[
Debug
::
CRIT
]
<<
myname
<<
"(init): "
<<
s
.
str
()
<<
std
::
endl
;
thr
=
new
ThreadCreator
<
UNetReceiver
>
(
this
,
&
UNetReceiver
::
poll
);
recvTimeout
=
conf
->
getArgPInt
(
"--udp-recv-timeout"
,
it
.
getProp
(
"recvTimeout"
),
5000
);
polltime
=
conf
->
getArgPInt
(
"--udp-polltime"
,
it
.
getProp
(
"polltime"
),
100
);
// -------------------------------
// ********** HEARTBEAT *************
string
heart
=
conf
->
getArgParam
(
"--udp-heartbeat-id"
,
it
.
getProp
(
"heartbeat_id"
));
if
(
!
heart
.
empty
()
)
{
sidHeartBeat
=
conf
->
getSensorID
(
heart
);
if
(
sidHeartBeat
==
DefaultObjectId
)
{
ostringstream
err
;
err
<<
myname
<<
": не найден идентификатор для датчика 'HeartBeat' "
<<
heart
;
dlog
[
Debug
::
CRIT
]
<<
myname
<<
"(init): "
<<
err
.
str
()
<<
endl
;
throw
SystemError
(
err
.
str
());
}
int
heartbeatTime
=
getHeartBeatTime
();
if
(
heartbeatTime
)
ptHeartBeat
.
setTiming
(
heartbeatTime
);
else
ptHeartBeat
.
setTiming
(
UniSetTimer
::
WaitUpTime
);
maxHeartBeat
=
conf
->
getArgPInt
(
"--udp-heartbeat-max"
,
it
.
getProp
(
"heartbeat_max"
),
10
);
test_id
=
sidHeartBeat
;
}
else
{
test_id
=
conf
->
getSensorID
(
"TestMode_S"
);
if
(
test_id
==
DefaultObjectId
)
{
ostringstream
err
;
err
<<
myname
<<
"(init): test_id unknown. 'TestMode_S' not found..."
;
dlog
[
Debug
::
CRIT
]
<<
myname
<<
"(init): "
<<
err
.
str
()
<<
endl
;
throw
SystemError
(
err
.
str
());
}
throw
SystemError
(
s
.
str
());
}
dlog
[
Debug
::
INFO
]
<<
myname
<<
"(init): test_id="
<<
test_id
<<
endl
;
r_thr
=
new
ThreadCreator
<
UNetReceiver
>
(
this
,
&
UNetReceiver
::
receive
);
u_thr
=
new
ThreadCreator
<
UNetReceiver
>
(
this
,
&
UNetReceiver
::
update
);
activateTimeout
=
conf
->
getArgPInt
(
"--activate-timeout"
,
20000
);
timeout_t
msec
=
conf
->
getArgPInt
(
"--udp-timeout"
,
it
.
getProp
(
"timeout"
),
3000
);
dlog
[
Debug
::
INFO
]
<<
myname
<<
"(init): udp-timeout="
<<
msec
<<
" msec"
<<
endl
;
ptRecvTimeout
.
setTiming
(
recvTimeout
);
}
// -----------------------------------------------------------------------------
UNetReceiver
::~
UNetReceiver
()
{
delete
r_thr
;
delete
u_thr
;
delete
udp
;
delete
shm
;
delete
thr
;
}
// -----------------------------------------------------------------------------
void
UNetReceiver
::
waitSMReady
(
)
void
UNetReceiver
::
setReceiveTimeout
(
timeout_t
msec
)
{
// waiting for SM is ready...
int
ready_timeout
=
conf
->
getArgInt
(
"--udp-sm-ready-timeout"
,
"15000"
);
if
(
ready_timeout
==
0
)
ready_timeout
=
15000
;
else
if
(
ready_timeout
<
0
)
ready_timeout
=
UniSetTimer
::
WaitUpTime
;
if
(
!
shm
->
waitSMready
(
ready_timeout
,
50
)
)
{
ostringstream
err
;
err
<<
myname
<<
"(waitSMReady): Не дождались готовности SharedMemory к работе в течение "
<<
ready_timeout
<<
" мсек"
;
dlog
[
Debug
::
CRIT
]
<<
err
.
str
()
<<
endl
;
throw
SystemError
(
err
.
str
());
}
recvTimeout
=
msec
;
ptRecvTimeout
.
setTiming
(
msec
);
}
// -----------------------------------------------------------------------------
/*
void UNetReceiver::timerInfo( TimerMessage *tm )
void
UNetReceiver
::
setLostTimeout
(
timeout_t
msec
)
{
if( tm->id == tmExchange )
step(
);
lostTimeout
=
msec
;
ptLostTimeout
.
setTiming
(
msec
);
}
*/
// -----------------------------------------------------------------------------
void
UNetReceiver
::
step
()
void
UNetReceiver
::
setReceivePause
(
timeout_t
msec
)
{
recvpause
=
msec
;
}
// -----------------------------------------------------------------------------
void
UNetReceiver
::
setUpdatePause
(
timeout_t
msec
)
{
updatepause
=
msec
;
}
// -----------------------------------------------------------------------------
void
UNetReceiver
::
setMaxProcessingCount
(
int
set
)
{
maxProcessingCount
=
set
;
}
// -----------------------------------------------------------------------------
void
UNetReceiver
::
setMaxDifferens
(
unsigned
long
set
)
{
maxDifferens
=
set
;
}
// -----------------------------------------------------------------------------
void
UNetReceiver
::
start
()
{
// {
// uniset_mutex_lock l(pollMutex,2000);
// poll();
// }
if
(
!
activated
)
return
;
if
(
sidHeartBeat
!=
DefaultObjectId
&&
ptHeartBeat
.
checkTime
()
)
{
try
{
shm
->
localSaveValue
(
aitHeartBeat
,
sidHeartBeat
,
maxHeartBeat
,
getId
());
ptHeartBeat
.
reset
();
}
catch
(
Exception
&
ex
)
{
dlog
[
Debug
::
CRIT
]
<<
myname
<<
"(step): (hb) "
<<
ex
<<
std
::
endl
;
}
activated
=
true
;
u_thr
->
start
();
r_thr
->
start
();
}
}
// -----------------------------------------------------------------------------
void
UNetReceiver
::
poll
()
void
UNetReceiver
::
update
()
{
try
{
// udp->connect(host,port);
// udp->UNetSocket::setPeer(host,port);
}
catch
(
UniSetTypes
::
Exception
&
ex
)
{
cerr
<<
myname
<<
"(step): "
<<
ex
<<
std
::
endl
;
// reise(SIGTERM);
return
;
}
while
(
activated
)
cerr
<<
"******************* udpate start"
<<
endl
;
while
(
activated
)
{
try
{
recv
();
// send();
}
catch
(
ost
::
SockException
&
e
)
{
cerr
<<
e
.
getString
()
<<
": "
<<
e
.
getSystemErrorString
()
<<
endl
;
real_update
();
}
catch
(
UniSetTypes
::
Exception
&
ex
)
{
cerr
<<
myname
<<
"(step
): "
<<
ex
<<
std
::
endl
;
dlog
[
Debug
::
CRIT
]
<<
myname
<<
"(update
): "
<<
ex
<<
std
::
endl
;
}
catch
(...)
{
cerr
<<
myname
<<
"(step
): catch ..."
<<
std
::
endl
;
dlog
[
Debug
::
CRIT
]
<<
myname
<<
"(update
): catch ..."
<<
std
::
endl
;
}
msleep
(
polltim
e
);
msleep
(
updatepaus
e
);
}
cerr
<<
"************* execute FINISH **********"
<<
endl
;
}
// -----------------------------------------------------------------------------
void
UNetReceiver
::
re
cv
()
void
UNetReceiver
::
re
al_update
()
{
cout
<<
myname
<<
": recv....(timeout="
<<
recvTimeout
<<
")"
<<
endl
;
UniSetUNet
::
UNetHeader
h
;
// receive
if
(
udp
->
isInputReady
(
recvTimeout
)
)
UniSetUDP
::
UDPMessage
p
;
// обрабатываем пока, очередь либо не опустеет
// либо обнаружится "дырка" в последовательности
// но при этом обрабатываем не больше maxProcessingCount
// за один раз..
int
k
=
maxProcessingCount
;
while
(
k
>
0
)
{
ssize_t
ret
=
udp
->
UNetReceive
::
receive
(
&
h
,
sizeof
(
h
));
if
(
ret
<
(
ssize_t
)
sizeof
(
h
)
)
{
cerr
<<
myname
<<
"(receive): ret="
<<
ret
<<
" sizeof="
<<
sizeof
(
h
)
<<
endl
;
return
;
}
{
// lock qpack
uniset_mutex_lock
l
(
packMutex
);
if
(
qpack
.
empty
()
)
return
;
p
=
qpack
.
top
();
unsigned
long
sub
=
labs
(
p
.
msg
.
header
.
num
-
pnum
);
if
(
pnum
>
0
)
{
// если sub > maxDifferens
// значит это просто "разрыв"
// и нам ждать lostTimeout не надо
// сразу начинаем обрабатывать новые пакеты
// а если > 1 && < maxDifferens
// значит это временная "дырка"
// и надо подождать lostTimeout
// чтобы констатировать потерю пакета..
if
(
sub
>
1
&&
sub
<
maxDifferens
)
{
if
(
!
ptLostTimeout
.
checkTime
()
)
return
;
lostPackets
++
;
}
else
if
(
p
.
msg
.
header
.
num
==
pnum
)
{
/* а что делать если идут повторные пакеты ?!
* для надёжности лучше обрабатывать..
* для "оптимизации".. лучше игнорировать
*/
qpack
.
pop
();
// пока выбрали вариант "оптимизации"
continue
;
}
}
cout
<<
myname
<<
"(receive): header: "
<<
h
<<
endl
;
if
(
h
.
dcount
<=
0
)
{
cout
<<
" data=0"
<<
endl
;
return
;
}
ptLostTimeout
.
reset
();
// удаляем из очереди, только если
// всё в порядке с последовательностью..
qpack
.
pop
();
pnum
=
p
.
msg
.
header
.
num
;
}
// unlock qpack
k
--
;
// cerr << myname << "(update): " << p.msg.header << endl;
initCache
(
p
,
!
cache_init_ok
);
UniSetUNet
::
UNetData
d
;
// ignore echo...
#if 0
if( h.nodeID == conf->getLocalNode() && h.procID == getId() )
for
(
size_t
i
=
0
;
i
<
p
.
msg
.
header
.
dcount
;
i
++
)
{
for( int i=0; i<h.dcount;i++ )
try
{
ssize_t ret = udp->UNetReceive::receive(&d,sizeof(d));
if( ret < (ssize_t)sizeof(d) )
return;
UniSetUDP
::
UDPData
&
d
=
p
.
msg
.
dat
[
i
];
ItemInfo
&
ii
(
icache
[
i
]);
if
(
ii
.
id
!=
d
.
id
)
{
dlog
[
Debug
::
WARN
]
<<
myname
<<
"(update): reinit cache for sid="
<<
d
.
id
<<
endl
;
ii
.
id
=
d
.
id
;
shm
->
initAIterator
(
ii
.
ait
);
shm
->
initDIterator
(
ii
.
dit
);
}
if
(
ii
.
iotype
==
UniversalIO
::
DigitalInput
)
shm
->
localSaveState
(
ii
.
dit
,
d
.
id
,
d
.
val
,
shm
->
ID
());
else
if
(
ii
.
iotype
==
UniversalIO
::
AnalogInput
)
shm
->
localSaveValue
(
ii
.
ait
,
d
.
id
,
d
.
val
,
shm
->
ID
());
else
if
(
ii
.
iotype
==
UniversalIO
::
AnalogOutput
)
shm
->
localSetValue
(
ii
.
ait
,
d
.
id
,
d
.
val
,
shm
->
ID
());
else
if
(
ii
.
iotype
==
UniversalIO
::
DigitalOutput
)
shm
->
localSetState
(
ii
.
dit
,
d
.
id
,
d
.
val
,
shm
->
ID
());
else
dlog
[
Debug
::
CRIT
]
<<
myname
<<
"(update): Unknown iotype for sid="
<<
d
.
id
<<
endl
;
}
return;
}
#endif
for
(
int
i
=
0
;
i
<
h
.
dcount
;
i
++
)
{
ssize_t
ret
=
udp
->
UNetReceive
::
receive
(
&
d
,
sizeof
(
d
));
if
(
ret
<
(
ssize_t
)
sizeof
(
d
)
)
catch
(
UniSetTypes
::
Exception
&
ex
)
{
cerr
<<
myname
<<
"(receive data "
<<
i
<<
"): ret="
<<
ret
<<
" sizeof="
<<
sizeof
(
d
)
<<
endl
;
break
;
dlog
[
Debug
::
CRIT
]
<<
myname
<<
"(update): "
<<
ex
<<
std
::
endl
;
}
catch
(...)
{
dlog
[
Debug
::
CRIT
]
<<
myname
<<
"(update): catch ..."
<<
std
::
endl
;
}
cout
<<
myname
<<
"(receive data "
<<
i
<<
"): "
<<
d
<<
endl
;
}
}
// else
// {
// cout << "no InputReady.." << endl;
// }
}
// -----------------------------------------------------------------------------
void
UNetReceiver
::
processingMessage
(
UniSetTypes
::
VoidMessage
*
msg
)
void
UNetReceiver
::
receive
(
)
{
try
cerr
<<
"******************* receive start"
<<
endl
;
ptRecvTimeout
.
setTiming
(
recvTimeout
);
while
(
activated
)
{
switch
(
msg
->
type
)
try
{
case
UniSetTypes
:
:
Message
::
SysCommand
:
{
UniSetTypes
::
SystemMessage
sm
(
msg
);
sysCommand
(
&
sm
);
}
break
;
case
Message
:
:
SensorInfo
:
{
SensorMessage
sm
(
msg
);
sensorInfo
(
&
sm
);
}
break
;
default
:
break
;
if
(
recv
()
)
ptRecvTimeout
.
reset
();
}
catch
(
ost
::
SockException
&
e
)
{
cerr
<<
e
.
getString
()
<<
": "
<<
e
.
getSystemErrorString
()
<<
endl
;
}
catch
(
UniSetTypes
::
Exception
&
ex
)
{
cerr
<<
myname
<<
"(poll): "
<<
ex
<<
std
::
endl
;
}
catch
(...)
{
cerr
<<
myname
<<
"(poll): catch ..."
<<
std
::
endl
;
}
msleep
(
recvpause
);
}
catch
(
SystemError
&
ex
)
{
dlog
[
Debug
::
CRIT
]
<<
myname
<<
"(SystemError): "
<<
ex
<<
std
::
endl
;
// throw SystemError(ex);
raise
(
SIGTERM
);
}
catch
(
Exception
&
ex
)
{
dlog
[
Debug
::
CRIT
]
<<
myname
<<
"(processingMessage): "
<<
ex
<<
std
::
endl
;
}
catch
(...)
{
dlog
[
Debug
::
CRIT
]
<<
myname
<<
"(processingMessage): catch ...
\n
"
;
}
cerr
<<
"************* execute FINISH **********"
<<
endl
;
}
// -----------------------------------------------------------------------------
void
UNetReceiver
::
sysCommand
(
UniSetTypes
::
SystemMessage
*
sm
)
bool
UNetReceiver
::
recv
(
)
{
switch
(
sm
->
command
)
if
(
!
udp
->
isInputReady
(
recvTimeout
)
)
return
false
;
ssize_t
ret
=
udp
->
UDPReceive
::
receive
(
&
(
pack
.
msg
),
sizeof
(
pack
.
msg
));
if
(
ret
<
sizeof
(
UniSetUDP
::
UDPHeader
)
)
{
case
SystemMessage
:
:
StartUp
:
{
waitSMReady
();
dlog
[
Debug
::
CRIT
]
<<
myname
<<
"(receive): FAILED header ret="
<<
ret
<<
" sizeof="
<<
sizeof
(
UniSetUDP
::
UDPHeader
)
<<
endl
;
return
false
;
}
// подождать пока пройдёт инициализация датчиков
// см. activateObject()
msleep
(
initPause
);
PassiveTimer
ptAct
(
activateTimeout
);
while
(
!
activated
&&
!
ptAct
.
checkTime
()
)
{
cout
<<
myname
<<
"(sysCommand): wait activate..."
<<
endl
;
msleep
(
300
);
if
(
activated
)
break
;
}
ssize_t
sz
=
pack
.
msg
.
header
.
dcount
*
sizeof
(
UniSetUDP
::
UDPData
)
+
sizeof
(
UniSetUDP
::
UDPHeader
);
if
(
ret
<
sz
)
{
dlog
[
Debug
::
CRIT
]
<<
myname
<<
"(receive): FAILED data ret="
<<
ret
<<
" sizeof="
<<
sz
<<
" packnum="
<<
pack
.
msg
.
header
.
num
<<
endl
;
return
false
;
}
if
(
!
activated
)
dlog
[
Debug
::
CRIT
]
<<
myname
<<
"(sysCommand): ************* don`t activate?! ************"
<<
endl
;
{
UniSetTypes
::
uniset_mutex_lock
l
(
mutex_start
,
10000
);
askSensors
(
UniversalIO
::
UIONotify
);
}
thr
->
start
();
if
(
rnum
>
0
&&
labs
(
pack
.
msg
.
header
.
num
-
rnum
)
>
maxDifferens
)
{
/* А что делать если мы уже ждём и ещё не "разгребли предыдущее".. а тут уже повторный "разрыв"
* Можно откинуть всё.. что сложили во временную очередь и заново "копить" (но тогда теряем информацию)
* А можно породолжать складывать во временную, но тогда есть риск "никогда" не разгрести временную
* очередь, при "частых обрывах". Потому-что update будет на каждом разрыве ждать ещё lostTimeout..
*/
// Пока выбираю.. чистить qtmp. Это будет соотвествовать логике работы с картами у которых ограничен буфер приёма.
// Обычно "кольцевой". Т.е. если не успели обработать и "вынуть" из буфера информацию.. он будет переписан новыми данными
if
(
waitClean
)
{
dlog
[
Debug
::
CRIT
]
<<
myname
<<
"(receive): reset qtmp.."
<<
endl
;
while
(
!
qtmp
.
empty
()
)
qtmp
.
pop
();
}
case
SystemMessage
:
:
FoldUp
:
case
SystemMessage
:
:
Finish
:
askSensors
(
UniversalIO
::
UIODontNotify
);
break
;
waitClean
=
true
;
}
rnum
=
pack
.
msg
.
header
.
num
;
// cerr << myname << "(receive): recv DATA OK. ret=" << ret << " sizeof=" << sz
// << " header: " << pack.msg.header
// << " waitClean=" << waitClean
// << endl;
case
SystemMessage
:
:
WatchDog
:
{
// lock qpack
uniset_mutex_lock
l
(
packMutex
,
500
);
if
(
!
waitClean
)
{
// ОПТИМИЗАЦИЯ (защита от двойного перезаказа при старте)
// Если идёт локальная работа
// (т.е. UNetReceiver запущен в одном процессе с SharedMemory2)
// то обрабатывать WatchDog не надо, т.к. мы и так ждём готовности SM
// при заказе датчиков, а если SM вылетит, то вместе с этим процессом(UNetReceiver)
if
(
shm
->
isLocalwork
()
)
break
;
askSensors
(
UniversalIO
::
UIONotify
);
qpack
.
push
(
pack
);
return
true
;
}
break
;
case
SystemMessage
:
:
LogRotate
:
if
(
!
qpack
.
empty
()
)
{
// cerr << myname << "(receive): copy to qtmp..."
// << " header: " << pack.msg.header
// << endl;
qtmp
.
push
(
pack
);
}
else
{
// переоткрываем логи
unideb
<<
myname
<<
"(sysCommand): logRotate"
<<
std
::
endl
;
string
fname
=
unideb
.
getLogFile
();
if
(
!
fname
.
empty
()
)
// cerr << myname << "(receive): copy from qtmp..." << endl;
// очередь освободилась..
// то копируем в неё всё что набралось...
while
(
!
qtmp
.
empty
()
)
{
unideb
.
logFile
(
fname
);
unideb
<<
myname
<<
"(sysCommand): ***************** UNIDEB LOG ROTATE *****************"
<<
std
::
endl
;
qpack
.
push
(
qtmp
.
top
()
);
qtmp
.
pop
()
;
}
dlog
<<
myname
<<
"(sysCommand): logRotate"
<<
std
::
endl
;
fname
=
dlog
.
getLogFile
();
if
(
!
fname
.
empty
()
)
{
dlog
.
logFile
(
fname
);
dlog
<<
myname
<<
"(sysCommand): ***************** dlog LOG ROTATE *****************"
<<
std
::
endl
;
}
// не забываем и текущий поместить в очередь..
qpack
.
push
(
pack
);
waitClean
=
false
;
}
break
;
default
:
break
;
}
}
// ------------------------------------------------------------------------------------------
void
UNetReceiver
::
askSensors
(
UniversalIO
::
UIOCommand
cmd
)
{
if
(
!
shm
->
waitSMworking
(
test_id
,
activateTimeout
,
50
)
)
{
ostringstream
err
;
err
<<
myname
<<
"(askSensors): Не дождались готовности(work) SharedMemory к работе в течение "
<<
activateTimeout
<<
" мсек"
;
dlog
[
Debug
::
CRIT
]
<<
err
.
str
()
<<
endl
;
kill
(
SIGTERM
,
getpid
());
// прерываем (перезапускаем) процесс...
throw
SystemError
(
err
.
str
());
}
}
// ------------------------------------------------------------------------------------------
void
UNetReceiver
::
sensorInfo
(
UniSetTypes
::
SensorMessage
*
sm
)
{
}
// ------------------------------------------------------------------------------------------
bool
UNetReceiver
::
activateObject
()
{
// блокирование обработки Starsp
// пока не пройдёт инициализация датчиков
// см. sysCommand()
{
activated
=
false
;
UniSetTypes
::
uniset_mutex_lock
l
(
mutex_start
,
5000
);
UniSetObject_LT
::
activateObject
();
initIterators
();
activated
=
true
;
}
}
// unlock qpack
return
true
;
}
// ------------------------------------------------------------------------------------------
void
UNetReceiver
::
sigterm
(
int
signo
)
{
cerr
<<
myname
<<
": ********* SIGTERM("
<<
signo
<<
") ********"
<<
endl
;
activated
=
false
;
udp
->
disconnect
();
UniSetObject_LT
::
sigterm
(
signo
);
}
// ------------------------------------------------------------------------------------------
void
UNetReceiver
::
initIterators
()
{
shm
->
initAIterator
(
aitHeartBeat
);
}
// -----------------------------------------------------------------------------
void
UNetReceiver
::
help_print
(
int
argc
,
char
*
argv
[]
)
void
UNetReceiver
::
initIterators
(
)
{
cout
<<
"--udp-polltime msec - Пауза между опросаом карт. По умолчанию 200 мсек."
<<
endl
;
cout
<<
"--udp-heartbeat-id - Данный процесс связан с указанным аналоговым heartbeat-дачиком."
<<
endl
;
cout
<<
"--udp-heartbeat-max - Максимальное значение heartbeat-счётчика для данного процесса. По умолчанию 10."
<<
endl
;
cout
<<
"--udp-ready-timeout - Время ожидания готовности SM к работе, мсек. (-1 - ждать 'вечно')"
<<
endl
;
cout
<<
"--udp-initPause - Задержка перед инициализацией (время на активизация процесса)"
<<
endl
;
cout
<<
"--udp-notRespondSensor - датчик связи для данного процесса "
<<
endl
;
cout
<<
"--udp-sm-ready-timeout - время на ожидание старта SM"
<<
endl
;
cout
<<
" Настройки протокола RS: "
<<
endl
;
cout
<<
"--udp-dev devname - файл устройства"
<<
endl
;
cout
<<
"--udp-speed - Скорость обмена (9600,19920,38400,57600,115200)."
<<
endl
;
cout
<<
"--udp-my-addr - адрес текущего узла"
<<
endl
;
cout
<<
"--udp-recv-timeout - Таймаут на ожидание ответа."
<<
endl
;
for
(
ItemVec
::
iterator
it
=
icache
.
begin
();
it
!=
icache
.
end
();
++
it
)
{
shm
->
initAIterator
(
it
->
ait
);
shm
->
initDIterator
(
it
->
dit
);
}
}
// -----------------------------------------------------------------------------
UNetReceiver
*
UNetReceiver
::
init_udpreceiver
(
int
argc
,
char
*
argv
[],
UniSetTypes
::
ObjectId
icID
,
SharedMemory
*
ic
)
void
UNetReceiver
::
initCache
(
UniSetUDP
::
UDPMessage
&
pack
,
bool
force
)
{
string
name
=
conf
->
getArgParam
(
"--udp-name"
,
"UNetReceiver1"
);
if
(
name
.
empty
()
)
{
cerr
<<
"(udpexchange): Не задан name'
"
<<
endl
;
return
0
;
}
ObjectId
ID
=
conf
->
getObjectID
(
name
);
if
(
ID
==
UniSetTypes
::
DefaultObjectId
)
{
cerr
<<
"(udpexchange): идентификатор '"
<<
name
<<
"' не найден в конф. файле!"
<<
" в секции "
<<
conf
->
getObjectsSection
()
<<
endl
;
return
0
;
}
dlog
[
Debug
::
INFO
]
<<
"(rsexchange): name = "
<<
name
<<
"("
<<
ID
<<
")"
<<
endl
;
return
new
UNetReceiver
(
ID
,
icID
,
ic
);
if
(
!
force
&&
pack
.
msg
.
header
.
dcount
==
icache
.
size
()
)
return
;
dlog
[
Debug
::
INFO
]
<<
myname
<<
": init icache..
"
<<
endl
;
cache_init_ok
=
true
;
icache
.
resize
(
pack
.
msg
.
header
.
dcount
);
for
(
size_t
i
=
0
;
i
<
icache
.
size
();
i
++
)
{
ItemInfo
&
d
(
icache
[
i
]);
if
(
d
.
id
!=
pack
.
msg
.
dat
[
i
].
id
)
{
d
.
id
=
pack
.
msg
.
dat
[
i
].
id
;
d
.
iotype
=
conf
->
getIOType
(
d
.
id
)
;
shm
->
initAIterator
(
d
.
ait
);
shm
->
initDIterator
(
d
.
dit
);
}
}
}
// -----------------------------------------------------------------------------
extensions/UNet2/UNetReceiver.h
View file @
f8916917
...
...
@@ -3,7 +3,7 @@
// -----------------------------------------------------------------------------
#include <ostream>
#include <string>
#include <
vector
>
#include <
queue
>
#include <cc++/socket.h>
#include "UniSetObject_LT.h"
#include "Trigger.h"
...
...
@@ -11,72 +11,136 @@
#include "SMInterface.h"
#include "SharedMemory.h"
#include "ThreadCreator.h"
#include "U
Net
Packet.h"
#include "U
DP
Packet.h"
// -----------------------------------------------------------------------------
class
UNetReceiver
:
public
UniSetObject_LT
/* Основная идея: сделать проверку очерёдности пакетов, но при этом использовать UDP.
* ===============
* Собственно реализация сделана так:
* В данных передаётся номер пакета. На случай если несколько пакетов придут не в той последовательности
* что были посланы, сделана очередь с приоритетом. В качестве приориета используется номер пакета
* (чем меньше тем старше). При этом обработка ведётся только тех пакетов, которые идут "подряд",
* как только встречается "дырка" происходит ожидание её "заполения". Если в течение времени (lostTimeout)
* "дырка" не исчезает, увеличивается счётчик потерянных пакетов и обработка продолжается дальше..
* Всё это реализовано в функции UNetReceiver::real_update()
*
* КЭШ
* ===
* Для оптимизации работы с SM, т.к. в пакетах приходят только пары [id,value] сделан кэш итераторов.
* Кэш расчитан на то, что принимаемые пакеты всегда имеют одну и ту же длину и последовательность.
* Идея проста: сделан вектор размером с количество принимаемых данных. В векторе хранятся итераторы (и всё что необходимо).
* Порядокый номер данных в пакете является индексом в кэше.
* Для защиты от изменения поседовательности внутри пакета, в кэше хранится ID сохраняемого датчика, и если он не совпадёт с тем,
* ID который пришёл в пакете - элемент кэша обновляется.
* Если количество пришедших данных не совпадают с размером кэша.. кэш обновляется.
*
* Обработка сбоя или переполнения счётчика пакетов(перехода через максимум)
* =========================================================================
* Для защиты от сбоя счётика сделана следующая логика:
* Если номер очередного пришедшего пакета отличается от последнего обработанного на maxDifferens, то считается,
* что произошёл сбой счётчика и происходит ожидание пока функция update, не обработает основную очередь полностью.
* При этом принимаемые пакеты складываются во временную очередь qtmp. Как только основная очередь пустеет,
* в неё копируется всё накопленное во временной очереди..и опять идёт штатная обработка.
* Если во время "ожидания" опять происходит "разрыв" в номерах пакетов, то временная очередь чиститься
* и данные которые в ней были теряются! Аналог ограниченного буфера (у любых карт), когда новые данные
* затирают старые, если их не успели вынуть и обработать.
* \todo Сделать защиту от бесконечного ожидания "очистки" основной очереди.
*/
// -----------------------------------------------------------------------------
class
UNetReceiver
{
public
:
UNetReceiver
(
UniSetTypes
::
ObjectId
objId
,
UniSetTypes
::
ObjectId
shmID
,
SharedMemory
*
ic
=
0
);
virtual
~
UNetReceiver
();
UNetReceiver
(
const
std
::
string
host
,
const
ost
::
tpport_t
port
,
SMInterface
*
smi
);
~
UNetReceiver
();
/*! глобальная функция для инициализации объекта */
static
UNetReceiver
*
init_udpreceiver
(
int
argc
,
char
*
argv
[],
UniSetTypes
::
ObjectId
shmID
,
SharedMemory
*
ic
=
0
);
void
start
();
/*! глобальная функция для вывода help-а */
static
void
help_print
(
int
argc
,
char
*
argv
[]
);
void
receive
();
void
update
(
);
protected
:
inline
bool
isRecvOK
(){
return
ptRecvTimeout
.
checkTime
();
}
inline
unsigned
long
getLostPacketsNum
(){
return
lostPackets
;
}
xmlNode
*
cnode
;
std
::
string
s_field
;
std
::
string
s_fvalue
;
void
setReceiveTimeout
(
timeout_t
msec
);
void
setReceivePause
(
timeout_t
msec
);
void
setUpdatePause
(
timeout_t
msec
);
void
setLostTimeout
(
timeout_t
msec
);
void
setMaxDifferens
(
unsigned
long
set
);
SMInterface
*
shm
;
void
setMaxProcessingCount
(
int
set
)
;
void
poll
();
void
recv
();
void
step
();
inline
ost
::
IPV4Address
getAddress
(){
return
addr
;
}
inline
ost
::
tpport_t
getPort
(){
return
port
;
}
virtual
void
processingMessage
(
UniSetTypes
::
VoidMessage
*
msg
);
void
sysCommand
(
UniSetTypes
::
SystemMessage
*
msg
);
void
sensorInfo
(
UniSetTypes
::
SensorMessage
*
sm
);
void
askSensors
(
UniversalIO
::
UIOCommand
cmd
);
void
waitSMReady
();
protected
:
virtual
bool
activateObject
()
;
SMInterface
*
shm
;
// действия при завершении работы
virtual
void
sigterm
(
int
signo
);
bool
recv
();
void
step
();
void
real_update
();
void
initIterators
();
private
:
UNetReceiver
();
bool
initPause
;
UniSetTypes
::
uniset_mutex
mutex_start
;
PassiveTimer
ptHeartBeat
;
UniSetTypes
::
ObjectId
sidHeartBeat
;
int
maxHeartBeat
;
IOController
::
AIOStateList
::
iterator
aitHeartBeat
;
UniSetTypes
::
ObjectId
test_id
;
int
recvpause
;
/*!< пауза меджду приёмами пакетов, [мсек] */
int
updatepause
;
/*!< переодичность обновления данных в SM, [мсек] */
int
polltime
;
/*!< переодичность обновления данных, [мсек] */
ost
::
UNetDuplex
*
udp
;
ost
::
IPV4Host
host
;
ost
::
UDPReceive
*
udp
;
ost
::
IPV4Address
addr
;
ost
::
tpport_t
port
;
std
::
string
myname
;
UniSetTypes
::
uniset_mutex
pollMutex
;
Trigger
trTimeout
;
int
recvTimeout
;
PassiveTimer
ptRecvTimeout
;
timeout_t
recvTimeout
;
timeout_t
lostTimeout
;
PassiveTimer
ptLostTimeout
;
unsigned
long
lostPackets
;
/*!< счётчик потерянных пакетов */
bool
activated
;
int
activateTimeout
;
ThreadCreator
<
UNetReceiver
>*
thr
;
ThreadCreator
<
UNetReceiver
>*
r_thr
;
// receive thread
ThreadCreator
<
UNetReceiver
>*
u_thr
;
// update thread
// функция определения приоритетного сообщения для обработки
struct
PacketCompare
:
public
std
::
binary_function
<
UniSetUDP
::
UDPMessage
,
UniSetUDP
::
UDPMessage
,
bool
>
{
bool
operator
()(
const
UniSetUDP
::
UDPMessage
&
lhs
,
const
UniSetUDP
::
UDPMessage
&
rhs
)
const
;
};
typedef
std
::
priority_queue
<
UniSetUDP
::
UDPMessage
,
std
::
vector
<
UniSetUDP
::
UDPMessage
>
,
PacketCompare
>
PacketQueue
;
PacketQueue
qpack
;
/*!< очередь принятых пакетов (отсортированных по возрастанию номера пакета) */
UniSetUDP
::
UDPMessage
pack
;
/*!< просто буфер для получения очереlного сообщения */
UniSetTypes
::
uniset_mutex
packMutex
;
/*!< mutex для работы с очередью */
unsigned
long
pnum
;
/*!< текущий номер обработанного сообщения, для проверки непрерывности последовательности пакетов */
/*! максимальная разница межд номерами пакетов, при которой считается, что счётчик пакетов
* прошёл через максимум или сбился...
*/
unsigned
long
maxDifferens
;
PacketQueue
qtmp
;
/*!< очередь на время обработки(очистки) основной очереди */
bool
waitClean
;
/*!< флаг означающий, что ждём очистики очереди до конца */
unsigned
long
rnum
;
/*!< текущий номер принятого сообщения, для проверки "переполнения" или "сбоя" счётчика */
int
maxProcessingCount
;
/*! максимальное число обрабатываемых за один раз сообщений */
struct
ItemInfo
{
long
id
;
IOController
::
AIOStateList
::
iterator
ait
;
IOController
::
DIOStateList
::
iterator
dit
;
UniversalIO
::
IOTypes
iotype
;
};
typedef
std
::
vector
<
ItemInfo
>
ItemVec
;
ItemVec
icache
;
/*!< кэш итераторов */
bool
cache_init_ok
;
void
initCache
(
UniSetUDP
::
UDPMessage
&
pack
,
bool
force
=
false
);
};
// -----------------------------------------------------------------------------
#endif // UNetReceiver_H_
...
...
extensions/UNet2/UNetSender.cc
View file @
f8916917
...
...
@@ -7,71 +7,49 @@ using namespace std;
using
namespace
UniSetTypes
;
using
namespace
UniSetExtensions
;
// -----------------------------------------------------------------------------
UNetSender
::
UNetSender
(
UniSetTypes
::
ObjectId
objId
,
UniSetTypes
::
ObjectId
shmId
,
SharedMemory
*
ic
)
:
UniSetObject_LT
(
objId
),
shm
(
0
),
initPause
(
0
),
tcp
(
0
),
UNetSender
::
UNetSender
(
const
std
::
string
s_host
,
const
ost
::
tpport_t
port
,
SMInterface
*
smi
,
const
std
::
string
s_f
,
const
std
::
string
s_val
,
SharedMemory
*
ic
)
:
s_field
(
s_f
),
s_fvalue
(
s_val
),
shm
(
smi
),
s_host
(
s_host
),
sendpause
(
150
),
activated
(
false
),
dlist
(
100
),
maxItem
(
0
)
maxItem
(
0
),
packetnum
(
1
),
s_thr
(
0
)
{
if
(
objId
==
DefaultObjectId
)
throw
UniSetTypes
::
SystemError
(
"(UNetSender): objId=-1?!! Use --unet-name"
);
// xmlNode* cnode = conf->getNode(myname);
cnode
=
conf
->
getNode
(
myname
);
if
(
cnode
==
NULL
)
throw
UniSetTypes
::
SystemError
(
"(UNetSender): Not find conf-node for "
+
myname
);
shm
=
new
SMInterface
(
shmId
,
&
ui
,
objId
,
ic
);
UniXML_iterator
it
(
cnode
);
{
ostringstream
s
;
s
<<
"("
<<
s_host
<<
":"
<<
port
<<
")"
;
myname
=
s
.
str
();
}
// определяем фильтр
s_field
=
conf
->
getArgParam
(
"--unet
-filter-field"
);
s_fvalue
=
conf
->
getArgParam
(
"--unet
-filter-value"
);
// s_field = conf->getArgParam("--udp
-filter-field");
// s_fvalue = conf->getArgParam("--udp
-filter-value");
dlog
[
Debug
::
INFO
]
<<
myname
<<
"(init): read fileter-field='"
<<
s_field
<<
"' filter-value='"
<<
s_fvalue
<<
"'"
<<
endl
;
// ---------- init RS ----------
// UniXML_iterator it(cnode);
string
s_host
=
conf
->
getArgParam
(
"--unet-host"
,
it
.
getProp
(
"host"
));
if
(
s_host
.
empty
()
)
throw
UniSetTypes
::
SystemError
(
myname
+
"(UNetSender): Unknown host. Use --unet-host"
);
port
=
conf
->
getArgInt
(
"--unet-port"
,
it
.
getProp
(
"port"
));
if
(
port
<=
0
)
throw
UniSetTypes
::
SystemError
(
myname
+
"(UNetSender): Unknown port address. Use --unet-port"
);
bool
no_broadcast
=
conf
->
getArgInt
(
"--unet-nobroadcast"
,
it
.
getProp
(
"no_broadcast"
));
host
=
s_host
.
c_str
();
if
(
dlog
.
debugging
(
Debug
::
INFO
)
)
dlog
[
Debug
::
INFO
]
<<
"(UNetSender): UNet set to "
<<
s_host
<<
":"
<<
port
<<
" broadcast="
<<
broadcast
<<
endl
;
dlog
[
Debug
::
INFO
]
<<
"(UNetSender): UDP set to "
<<
s_host
<<
":"
<<
port
<<
endl
;
try
{
if
(
no_broadcast
)
tcp
=
new
ost
::
TCPSocket
();
else
tcp
=
new
ost
::
TCPBroadcast
(
host
,
port
);
addr
=
s_host
.
c_str
();
udp
=
new
ost
::
UDPBroadcast
(
addr
,
port
);
}
catch
(
ost
::
SockException
&
e
)
{
ostringstream
s
;
s
<<
e
.
getString
()
<<
": "
<<
e
.
getSystemErrorString
();
dlog
[
Debug
::
CRIT
]
<<
myname
<<
"(init): "
<<
s
.
str
()
<<
endl
;
s
<<
e
.
getString
()
<<
": "
<<
e
.
getSystemErrorString
()
<<
endl
;
throw
SystemError
(
s
.
str
());
}
thr
=
new
ThreadCreator
<
UNetSender
>
(
this
,
&
UNetSender
::
poll
);
sendTimeout
=
conf
->
getArgPInt
(
"--unet-send-timeout"
,
it
.
getProp
(
"sendTimeout"
),
5000
);
sendtime
=
conf
->
getArgPInt
(
"--unet-sendtime"
,
it
.
getProp
(
"sendtime"
),
100
);
s_thr
=
new
ThreadCreator
<
UNetSender
>
(
this
,
&
UNetSender
::
send
);
// -------------------------------
if
(
shm
->
isLocalwork
()
)
...
...
@@ -83,107 +61,42 @@ maxItem(0)
else
ic
->
addReadItem
(
sigc
::
mem_fun
(
this
,
&
UNetSender
::
readItem
)
);
// ********** HEARTBEAT *************
string
heart
=
conf
->
getArgParam
(
"--unet-heartbeat-id"
,
it
.
getProp
(
"heartbeat_id"
));
if
(
!
heart
.
empty
()
)
{
sidHeartBeat
=
conf
->
getSensorID
(
heart
);
if
(
sidHeartBeat
==
DefaultObjectId
)
{
ostringstream
err
;
err
<<
myname
<<
": не найден идентификатор для датчика 'HeartBeat' "
<<
heart
;
dlog
[
Debug
::
CRIT
]
<<
myname
<<
"(init): "
<<
err
.
str
()
<<
endl
;
throw
SystemError
(
err
.
str
());
}
int
heartbeatTime
=
getHeartBeatTime
();
if
(
heartbeatTime
)
ptHeartBeat
.
setTiming
(
heartbeatTime
);
else
ptHeartBeat
.
setTiming
(
UniSetTimer
::
WaitUpTime
);
maxHeartBeat
=
conf
->
getArgPInt
(
"--unet-heartbeat-max"
,
it
.
getProp
(
"heartbeat_max"
),
10
);
test_id
=
sidHeartBeat
;
}
else
{
test_id
=
conf
->
getSensorID
(
"TestMode_S"
);
if
(
test_id
==
DefaultObjectId
)
{
ostringstream
err
;
err
<<
myname
<<
"(init): test_id unknown. 'TestMode_S' not found..."
;
dlog
[
Debug
::
CRIT
]
<<
myname
<<
"(init): "
<<
err
.
str
()
<<
endl
;
throw
SystemError
(
err
.
str
());
}
}
dlog
[
Debug
::
INFO
]
<<
myname
<<
"(init): test_id="
<<
test_id
<<
endl
;
activateTimeout
=
conf
->
getArgPInt
(
"--activate-timeout"
,
20000
);
timeout_t
msec
=
conf
->
getArgPInt
(
"--unet-timeout"
,
it
.
getProp
(
"timeout"
),
3000
);
dlog
[
Debug
::
INFO
]
<<
myname
<<
"(init): udp-timeout="
<<
msec
<<
" msec"
<<
endl
;
// выставляем поля, которые не меняются
mypack
.
msg
.
header
.
nodeID
=
conf
->
getLocalNode
();
mypack
.
msg
.
header
.
procID
=
shm
->
ID
()
;
}
// -----------------------------------------------------------------------------
UNetSender
::~
UNetSender
()
{
delete
tcp
;
delete
s_thr
;
delete
udp
;
delete
shm
;
delete
thr
;
}
// -----------------------------------------------------------------------------
void
UNetSender
::
waitSMReady
()
{
// waiting for SM is ready...
int
ready_timeout
=
conf
->
getArgInt
(
"--unet-sm-ready-timeout"
,
"15000"
);
if
(
ready_timeout
==
0
)
ready_timeout
=
15000
;
else
if
(
ready_timeout
<
0
)
ready_timeout
=
UniSetTimer
::
WaitUpTime
;
if
(
!
shm
->
waitSMready
(
ready_timeout
,
50
)
)
{
ostringstream
err
;
err
<<
myname
<<
"(waitSMReady): Не дождались готовности SharedMemory к работе в течение "
<<
ready_timeout
<<
" мсек"
;
dlog
[
Debug
::
CRIT
]
<<
err
.
str
()
<<
endl
;
throw
SystemError
(
err
.
str
());
}
}
// -----------------------------------------------------------------------------
void
UNetSender
::
step
(
)
void
UNetSender
::
update
(
UniSetTypes
::
ObjectId
id
,
long
value
)
{
if
(
!
activated
)
return
;
if
(
sidHeartBeat
!=
DefaultObjectId
&&
ptHeartBeat
.
checkTime
()
)
DMap
::
iterator
it
=
dlist
.
begin
();
for
(
;
it
!=
dlist
.
end
();
++
it
)
{
try
if
(
it
->
si
.
id
==
id
)
{
shm
->
localSaveValue
(
aitHeartBeat
,
sidHeartBeat
,
maxHeartBeat
,
getId
());
ptHeartBeat
.
reset
();
}
catch
(
Exception
&
ex
)
{
dlog
[
Debug
::
CRIT
]
<<
myname
<<
"(step): (hb) "
<<
ex
<<
std
::
endl
;
uniset_spin_lock
lock
(
it
->
val_lock
);
it
->
val
=
value
;
}
break
;
}
}
// -----------------------------------------------------------------------------
void
UNetSender
::
poll
()
void
UNetSender
::
send
()
{
dlist
.
resize
(
maxItem
);
dlog
[
Debug
::
INFO
]
<<
myname
<<
"(init): dlist size = "
<<
dlist
.
size
()
<<
endl
;
ost
::
Thread
::
setException
(
ost
::
Thread
::
throwException
);
// cerr << "create new tcp..." << endl;
tcp
=
new
ost
::
TCPStream
(
iaddr
.
c_str
());
tcp
->
setTimeout
(
500
);
/*
ost::IPV4Broadcast h = s_host.c_str();
try
{
tcp
->
setPeer
(
host
,
port
);
udp->setPeer(h
,port);
}
catch( ost::SockException& e )
{
...
...
@@ -192,13 +105,12 @@ void UNetSender::poll()
dlog[Debug::CRIT] << myname << "(poll): " << s.str() << endl;
throw SystemError(s.str());
}
*/
while
(
activated
)
{
try
{
send
();
real_
send
();
}
catch
(
ost
::
SockException
&
e
)
{
...
...
@@ -206,251 +118,59 @@ void UNetSender::poll()
}
catch
(
UniSetTypes
::
Exception
&
ex
)
{
cerr
<<
myname
<<
"(s
tep
): "
<<
ex
<<
std
::
endl
;
cerr
<<
myname
<<
"(s
end
): "
<<
ex
<<
std
::
endl
;
}
catch
(...)
{
cerr
<<
myname
<<
"(s
tep
): catch ..."
<<
std
::
endl
;
cerr
<<
myname
<<
"(s
end
): catch ..."
<<
std
::
endl
;
}
msleep
(
send
tim
e
);
msleep
(
send
paus
e
);
}
cerr
<<
"************* execute FINISH **********"
<<
endl
;
}
// -----------------------------------------------------------------------------
void
UNetSender
::
send
()
void
UNetSender
::
real_
send
()
{
cout
<<
myname
<<
": send..."
<<
endl
;
mypack
.
msg
.
header
.
num
=
packetnum
++
;
UniSetUNet
::
UNetHeader
h
;
h
.
nodeID
=
conf
->
getLocalNode
();
h
.
procID
=
getId
();
h
.
dcount
=
mypack
.
size
();
// receive
ssize_t
ret
=
tcp
->
send
((
char
*
)(
&
h
),
sizeof
(
h
));
if
(
ret
<
(
ssize_t
)
sizeof
(
h
)
)
{
cerr
<<
myname
<<
"(send data header): ret="
<<
ret
<<
" sizeof="
<<
sizeof
(
h
)
<<
endl
;
return
;
}
#warning use mutex for list!!!
UniSetUNet
::
UNetMessage
::
UNetDataList
::
iterator
it
=
mypack
.
dlist
.
begin
();
for
(
;
it
!=
mypack
.
dlist
.
end
();
++
it
)
{
cout
<<
myname
<<
"(send): "
<<
(
*
it
)
<<
endl
;
ssize_t
ret
=
tcp
->
send
((
char
*
)(
&
(
*
it
)),
sizeof
(
*
it
));
if
(
ret
<
(
ssize_t
)
sizeof
(
*
it
)
)
{
cerr
<<
myname
<<
"(send data): ret="
<<
ret
<<
" sizeof="
<<
sizeof
(
*
it
)
<<
endl
;
break
;
}
}
}
// -----------------------------------------------------------------------------
void
UNetSender
::
processingMessage
(
UniSetTypes
::
VoidMessage
*
msg
)
{
try
{
switch
(
msg
->
type
)
{
case
UniSetTypes
:
:
Message
::
SysCommand
:
{
UniSetTypes
::
SystemMessage
sm
(
msg
);
sysCommand
(
&
sm
);
}
break
;
if
(
packetnum
>
UniSetUDP
::
MaxPacketNum
)
packetnum
=
1
;
case
Message
:
:
SensorInfo
:
{
SensorMessage
sm
(
msg
);
sensorInfo
(
&
sm
);
}
break
;
// cout << "************* send header: " << mypack.msg.header << endl;
int
sz
=
mypack
.
byte_size
()
+
sizeof
(
UniSetUDP
::
UDPHeader
);
if
(
!
udp
->
isPending
(
ost
::
Socket
::
pendingOutput
)
)
return
;
default
:
break
;
}
}
catch
(
SystemError
&
ex
)
{
dlog
[
Debug
::
CRIT
]
<<
myname
<<
"(SystemError): "
<<
ex
<<
std
::
endl
;
// throw SystemError(ex);
raise
(
SIGTERM
);
}
catch
(
Exception
&
ex
)
{
dlog
[
Debug
::
CRIT
]
<<
myname
<<
"(processingMessage): "
<<
ex
<<
std
::
endl
;
}
catch
(...)
{
dlog
[
Debug
::
CRIT
]
<<
myname
<<
"(processingMessage): catch ...
\n
"
;
}
ssize_t
ret
=
udp
->
send
(
(
char
*
)
&
(
mypack
.
msg
),
sz
);
if
(
ret
<
sz
)
dlog
[
Debug
::
CRIT
]
<<
myname
<<
"(send): FAILED ret="
<<
ret
<<
" < sizeof="
<<
sz
<<
endl
;
}
// -----------------------------------------------------------------------------
void
UNetSender
::
sysCommand
(
UniSetTypes
::
SystemMessage
*
sm
)
{
switch
(
sm
->
command
)
{
case
SystemMessage
:
:
StartUp
:
{
waitSMReady
();
// подождать пока пройдёт инициализация датчиков
// см. activateObject()
msleep
(
initPause
);
PassiveTimer
ptAct
(
activateTimeout
);
while
(
!
activated
&&
!
ptAct
.
checkTime
()
)
{
cout
<<
myname
<<
"(sysCommand): wait activate..."
<<
endl
;
msleep
(
300
);
if
(
activated
)
break
;
}
if
(
!
activated
)
dlog
[
Debug
::
CRIT
]
<<
myname
<<
"(sysCommand): ************* don`t activate?! ************"
<<
endl
;
{
UniSetTypes
::
uniset_mutex_lock
l
(
mutex_start
,
10000
);
askSensors
(
UniversalIO
::
UIONotify
);
}
thr
->
start
();
}
case
SystemMessage
:
:
FoldUp
:
case
SystemMessage
:
:
Finish
:
askSensors
(
UniversalIO
::
UIODontNotify
);
break
;
case
SystemMessage
:
:
WatchDog
:
{
// ОПТИМИЗАЦИЯ (защита от двойного перезаказа при старте)
// Если идёт локальная работа
// (т.е. UNetSender запущен в одном процессе с SharedMemory2)
// то обрабатывать WatchDog не надо, т.к. мы и так ждём готовности SM
// при заказе датчиков, а если SM вылетит, то вместе с этим процессом(UNetSender)
if
(
shm
->
isLocalwork
()
)
break
;
askSensors
(
UniversalIO
::
UIONotify
);
}
break
;
case
SystemMessage
:
:
LogRotate
:
{
// переоткрываем логи
unideb
<<
myname
<<
"(sysCommand): logRotate"
<<
std
::
endl
;
string
fname
=
unideb
.
getLogFile
();
if
(
!
fname
.
empty
()
)
{
unideb
.
logFile
(
fname
);
unideb
<<
myname
<<
"(sysCommand): ***************** UNIDEB LOG ROTATE *****************"
<<
std
::
endl
;
}
dlog
<<
myname
<<
"(sysCommand): logRotate"
<<
std
::
endl
;
fname
=
dlog
.
getLogFile
();
if
(
!
fname
.
empty
()
)
{
dlog
.
logFile
(
fname
);
dlog
<<
myname
<<
"(sysCommand): ***************** dlog LOG ROTATE *****************"
<<
std
::
endl
;
}
}
break
;
default
:
break
;
}
}
// ------------------------------------------------------------------------------------------
void
UNetSender
::
askSensors
(
UniversalIO
::
UIOCommand
cmd
)
{
if
(
!
shm
->
waitSMworking
(
test_id
,
activateTimeout
,
50
)
)
{
ostringstream
err
;
err
<<
myname
<<
"(askSensors): Не дождались готовности(work) SharedMemory к работе в течение "
<<
activateTimeout
<<
" мсек"
;
dlog
[
Debug
::
CRIT
]
<<
err
.
str
()
<<
endl
;
kill
(
SIGTERM
,
getpid
());
// прерываем (перезапускаем) процесс...
throw
SystemError
(
err
.
str
());
}
DMap
::
iterator
it
=
dlist
.
begin
();
for
(
;
it
!=
dlist
.
end
();
++
it
)
{
try
{
shm
->
askSensor
(
it
->
si
.
id
,
cmd
);
}
catch
(
UniSetTypes
::
Exception
&
ex
)
{
dlog
[
Debug
::
WARN
]
<<
myname
<<
"(askSensors): "
<<
ex
<<
std
::
endl
;
}
catch
(...){}
}
}
// ------------------------------------------------------------------------------------------
void
UNetSender
::
sensorInfo
(
UniSetTypes
::
SensorMessage
*
sm
)
{
DMap
::
iterator
it
=
dlist
.
begin
();
for
(
;
it
!=
dlist
.
end
();
++
it
)
{
if
(
it
->
si
.
id
==
sm
->
id
)
{
uniset_spin_lock
lock
(
it
->
val_lock
);
it
->
val
=
sm
->
value
;
if
(
it
->
pack_it
!=
mypack
.
dlist
.
end
()
)
it
->
pack_it
->
val
=
sm
->
value
;
}
break
;
}
}
// ------------------------------------------------------------------------------------------
bool
UNetSender
::
activateObject
()
void
UNetSender
::
start
()
{
// блокирование обработки StarUp
// пока не пройдёт инициализация датчиков
// см. sysCommand()
if
(
!
activated
)
{
activated
=
false
;
UniSetTypes
::
uniset_mutex_lock
l
(
mutex_start
,
5000
);
UniSetObject_LT
::
activateObject
();
initIterators
();
activated
=
true
;
s_thr
->
start
();
}
return
true
;
}
// ------------------------------------------------------------------------------------------
void
UNetSender
::
sigterm
(
int
signo
)
{
cerr
<<
myname
<<
": ********* SIGTERM("
<<
signo
<<
") ********"
<<
endl
;
activated
=
false
;
tcp
->
disconnect
();
UniSetObject_LT
::
sigterm
(
signo
);
}
// -----------------------------------------------------------------------------
-------------
// -----------------------------------------------------------------------------
void
UNetSender
::
readConfiguration
()
{
#warning Сделать сортировку по диапазонам адресов!!!
// чтобы запрашивать одним запросом, сразу несколько входов...
// readconf_ok = false;
xmlNode
*
root
=
conf
->
getXMLSensorsSection
();
if
(
!
root
)
{
ostringstream
err
;
err
<<
myname
<<
"(readConfiguration):
не нашли корневого раздела
<sensors>"
;
err
<<
myname
<<
"(readConfiguration):
not found
<sensors>"
;
throw
SystemError
(
err
.
str
());
}
UniXML_iterator
it
(
root
);
if
(
!
it
.
goChildren
()
)
{
std
::
cerr
<<
myname
<<
"(readConfiguration):
раздел <sensors> не содержит секций ?!!
\n
"
;
std
::
cerr
<<
myname
<<
"(readConfiguration):
empty <sensors>?!!"
<<
endl
;
return
;
}
...
...
@@ -459,8 +179,6 @@ void UNetSender::readConfiguration()
if
(
check_item
(
it
)
)
initItem
(
it
);
}
// readconf_ok = true;
}
// ------------------------------------------------------------------------------------------
bool
UNetSender
::
check_item
(
UniXML_iterator
&
it
)
...
...
@@ -514,9 +232,9 @@ bool UNetSender::initItem( UniXML_iterator& it )
p
.
si
.
id
=
sid
;
p
.
si
.
node
=
conf
->
getLocalNode
();
mypack
.
addData
(
sid
,
0
);
p
.
pack_i
t
=
(
mypack
.
dlist
.
end
()
--
)
;
p
.
pack_i
nd
=
mypack
.
size
()
-
1
;
if
(
maxItem
>=
dlist
.
size
()
)
if
(
maxItem
>=
mypack
.
size
()
)
dlist
.
resize
(
maxItem
+
10
);
dlist
[
maxItem
]
=
p
;
...
...
@@ -537,44 +255,6 @@ void UNetSender::initIterators()
shm
->
initDIterator
(
it
->
dit
);
shm
->
initAIterator
(
it
->
ait
);
}
shm
->
initAIterator
(
aitHeartBeat
);
}
// -----------------------------------------------------------------------------
void
UNetSender
::
help_print
(
int
argc
,
char
*
argv
[]
)
{
cout
<<
"--unet-sendtime msec - Пауза между опросами. По умолчанию 200 мсек."
<<
endl
;
cout
<<
"--unet-heartbeat-id - Данный процесс связан с указанным аналоговым heartbeat-дачиком."
<<
endl
;
cout
<<
"--unet-heartbeat-max - Максимальное значение heartbeat-счётчика для данного процесса. По умолчанию 10."
<<
endl
;
cout
<<
"--unet-ready-timeout - Время ожидания готовности SM к работе, мсек. (-1 - ждать 'вечно')"
<<
endl
;
cout
<<
"--unet-initPause - Задержка перед инициализацией (время на активизация процесса)"
<<
endl
;
cout
<<
"--unet-sm-ready-timeout - время на ожидание старта SM"
<<
endl
;
cout
<<
" Настройки протокола UNet: "
<<
endl
;
cout
<<
"--unet-host [ip|hostname] - Адрес сервера"
<<
endl
;
cout
<<
"--unet-port - Порт."
<<
endl
;
cout
<<
"--unet-send-timeout - Таймаут на посылку ответа."
<<
endl
;
}
// -----------------------------------------------------------------------------
UNetSender
*
UNetSender
::
init_udpsender
(
int
argc
,
char
*
argv
[],
UniSetTypes
::
ObjectId
icID
,
SharedMemory
*
ic
)
{
string
name
=
conf
->
getArgParam
(
"--unet-name"
,
"UNetSender1"
);
if
(
name
.
empty
()
)
{
cerr
<<
"(UNetSender): Не задан name'"
<<
endl
;
return
0
;
}
ObjectId
ID
=
conf
->
getObjectID
(
name
);
if
(
ID
==
UniSetTypes
::
DefaultObjectId
)
{
cerr
<<
"(UNetSender): идентификатор '"
<<
name
<<
"' не найден в конф. файле!"
<<
" в секции "
<<
conf
->
getObjectsSection
()
<<
endl
;
return
0
;
}
dlog
[
Debug
::
INFO
]
<<
"(rsexchange): name = "
<<
name
<<
"("
<<
ID
<<
")"
<<
endl
;
return
new
UNetSender
(
ID
,
icID
,
ic
);
}
// -----------------------------------------------------------------------------
std
::
ostream
&
operator
<<
(
std
::
ostream
&
os
,
UNetSender
::
UItem
&
p
)
...
...
extensions/UNet2/UNetSender.h
View file @
f8916917
...
...
@@ -11,21 +11,21 @@
#include "SMInterface.h"
#include "SharedMemory.h"
#include "ThreadCreator.h"
#include "UNetPacket.h"
#include "UDPPacket.h"
#include "UDPNReceiver.h"
// -----------------------------------------------------------------------------
class
UNetSender
:
public
UniSetObject_LT
/*
* Для защиты от потери пакета при переполнении "номера пакета".
* UNetReceiver при обнаружении "разрыва" в последовательнности, просто игнорирует пакет, обновляет счётчик
* и начинает обработку пока буфер опять не заполнится..
*/
class
UNetSender
{
public
:
UNetSender
(
UniSetTypes
::
ObjectId
objId
,
UniSetTypes
::
ObjectId
shmID
,
SharedMemory
*
ic
=
0
);
virtual
~
UNetSender
(
);
UNetSender
(
const
std
::
string
host
,
const
ost
::
tpport_t
port
,
SMInterface
*
smi
,
const
std
::
string
s_field
=
""
,
const
std
::
string
s_fvalue
=
""
,
SharedMemory
*
ic
=
0
);
/*! глобальная функция для инициализации объекта */
static
UNetSender
*
init_udpsender
(
int
argc
,
char
*
argv
[],
UniSetTypes
::
ObjectId
shmID
,
SharedMemory
*
ic
=
0
);
/*! глобальная функция для вывода help-а */
static
void
help_print
(
int
argc
,
char
*
argv
[]
);
~
UNetSender
();
struct
UItem
{
...
...
@@ -37,36 +37,26 @@ class UNetSender:
IOController
::
AIOStateList
::
iterator
ait
;
IOController
::
DIOStateList
::
iterator
dit
;
UniSetTypes
::
uniset_spin_mutex
val_lock
;
UniSetUNet
::
UNetMessage
::
UNetDataList
::
iterator
pack_it
;
int
pack_ind
;
long
val
;
friend
std
::
ostream
&
operator
<<
(
std
::
ostream
&
os
,
UItem
&
p
);
};
protected
:
xmlNode
*
cnode
;
std
::
string
s_field
;
std
::
string
s_fvalue
;
SMInterface
*
shm
;
void
start
();
void
poll
();
void
recv
();
void
send
();
void
real_send
();
void
update
(
UniSetTypes
::
ObjectId
id
,
long
value
);
void
step
();
inline
void
setSendPause
(
int
msec
){
sendpause
=
msec
;
}
virtual
void
processingMessage
(
UniSetTypes
::
VoidMessage
*
msg
);
void
sysCommand
(
UniSetTypes
::
SystemMessage
*
msg
);
void
sensorInfo
(
UniSetTypes
::
SensorMessage
*
sm
);
void
askSensors
(
UniversalIO
::
UIOCommand
cmd
);
void
waitSMReady
();
protected
:
virtual
bool
activateObject
();
std
::
string
s_field
;
std
::
string
s_fvalue
;
// действия при завершении работы
virtual
void
sigterm
(
int
signo
);
SMInterface
*
shm
;
void
initIterators
();
bool
initItem
(
UniXML_iterator
&
it
);
...
...
@@ -77,36 +67,23 @@ class UNetSender:
private
:
UNetSender
();
bool
initPause
;
UniSetTypes
::
uniset_mutex
mutex_start
;
PassiveTimer
ptHeartBeat
;
UniSetTypes
::
ObjectId
sidHeartBeat
;
int
maxHeartBeat
;
IOController
::
AIOStateList
::
iterator
aitHeartBeat
;
UniSetTypes
::
ObjectId
test_id
;
int
sendtime
;
/*!< переодичность посылки данных, [мсек] */
ost
::
TCPStream
*
tcp
;
// ost::TCPSocket* tcp;
ost
::
IPV4Host
host
;
ost
::
UDPBroadcast
*
udp
;
ost
::
IPV4Address
addr
;
ost
::
tpport_t
port
;
std
::
string
s_host
;
UniSetTypes
::
uniset_mutex
sendMutex
;
Trigger
trTimeout
;
int
sendTimeout
;
std
::
string
myname
;
int
sendpause
;
bool
activated
;
int
activateTimeout
;
UniSetU
Net
::
UNet
Message
mypack
;
UniSetU
DP
::
UDP
Message
mypack
;
typedef
std
::
vector
<
UItem
>
DMap
;
DMap
dlist
;
int
maxItem
;
unsigned
long
packetnum
;
ThreadCreator
<
UNetSender
>*
thr
;
ThreadCreator
<
UNetSender
>*
s_thr
;
// send thread
};
// -----------------------------------------------------------------------------
#endif // UNetSender_H_
...
...
extensions/UNet2/start_fg.sh
View file @
f8916917
#!/bin/sh
uniset-start.sh
-f
./uniset-udpexchange
--udp-name
UDPExchange
--udp-host
192.168.56.255
\
--udp-broadcast
1
--udp-polltime
1000
\
uniset-start.sh
-f
./uniset-unetexchange
--unet-name
UNetExchange
\
--confile
test.xml
\
--unet-filter-field
rs
--unet-filter-value
2
--unet-maxdifferense
40
\
--dlog-add-levels
info,crit,warn
# --udp-filter-field udp --udp-filter-value 1 \
extensions/UNet2/unetexchange.cc
View file @
f8916917
#include <sstream>
#include "ObjectsActivator.h"
#include "Extensions.h"
#include "U
DP
Exchange.h"
#include "U
Net
Exchange.h"
// -----------------------------------------------------------------------------
using
namespace
std
;
using
namespace
UniSetTypes
;
...
...
@@ -15,16 +15,16 @@ int main( int argc, char** argv )
{
cout
<<
"--smemory-id objectName - SharedMemory objectID. Default: read from <SharedMemory>"
<<
endl
;
cout
<<
"--confile filename - configuration file. Default: configure.xml"
<<
endl
;
cout
<<
"--u
dp
-logfile filename - logfilename. Default: udpexchange.log"
<<
endl
;
cout
<<
"--u
net
-logfile filename - logfilename. Default: udpexchange.log"
<<
endl
;
cout
<<
endl
;
U
DP
Exchange
::
help_print
(
argc
,
argv
);
U
Net
Exchange
::
help_print
(
argc
,
argv
);
return
0
;
}
string
confile
=
UniSetTypes
::
getArgParam
(
"--confile"
,
argc
,
argv
,
"configure.xml"
);
conf
=
new
Configuration
(
argc
,
argv
,
confile
);
string
logfilename
(
conf
->
getArgParam
(
"--u
dp
-logfile"
));
string
logfilename
(
conf
->
getArgParam
(
"--u
net
-logfile"
));
if
(
logfilename
.
empty
()
)
logfilename
=
"udpexchange.log"
;
...
...
@@ -49,35 +49,39 @@ int main( int argc, char** argv )
return
1
;
}
U
DPExchange
*
rs
=
UDPExchange
::
init_udp
exchange
(
argc
,
argv
,
shmID
);
if
(
!
rs
)
U
NetExchange
*
unet
=
UNetExchange
::
init_unet
exchange
(
argc
,
argv
,
shmID
);
if
(
!
unet
)
{
dlog
[
Debug
::
CRIT
]
<<
"(u
dpexchange): init не прошёл.
.."
<<
endl
;
dlog
[
Debug
::
CRIT
]
<<
"(u
netexchange): init failed
.."
<<
endl
;
return
1
;
}
ObjectsActivator
act
;
act
.
addObject
(
static_cast
<
class
UniSetObject
*>
(
rs
));
act
.
addObject
(
static_cast
<
class
UniSetObject
*>
(
unet
));
SystemMessage
sm
(
SystemMessage
::
StartUp
);
act
.
broadcast
(
sm
.
transport_msg
()
);
unideb
(
Debug
::
ANY
)
<<
"
\n\n\n
"
;
unideb
[
Debug
::
ANY
]
<<
"(main): -------------- UDP
Exchange
START -------------------------
\n\n
"
;
unideb
[
Debug
::
ANY
]
<<
"(main): -------------- UDP
Recevier
START -------------------------
\n\n
"
;
dlog
(
Debug
::
ANY
)
<<
"
\n\n\n
"
;
dlog
[
Debug
::
ANY
]
<<
"(main): -------------- UDP
Exchange
START -------------------------
\n\n
"
;
dlog
[
Debug
::
ANY
]
<<
"(main): -------------- UDP
Receiver
START -------------------------
\n\n
"
;
act
.
run
(
false
);
// msleep(500);
// rs->execute();
}
catch
(
Exception
&
ex
)
{
dlog
[
Debug
::
CRIT
]
<<
"(udpexchange): "
<<
ex
<<
std
::
endl
;
dlog
[
Debug
::
CRIT
]
<<
"(unetexchange): "
<<
ex
<<
std
::
endl
;
}
catch
(
ost
::
SockException
&
e
)
{
ostringstream
s
;
s
<<
e
.
getString
()
<<
": "
<<
e
.
getSystemErrorString
();
dlog
[
Debug
::
CRIT
]
<<
s
.
str
()
<<
endl
;
}
catch
(...)
{
dlog
[
Debug
::
CRIT
]
<<
"(u
dp
exchange): catch ..."
<<
std
::
endl
;
dlog
[
Debug
::
CRIT
]
<<
"(u
net
exchange): catch ..."
<<
std
::
endl
;
}
return
0
;
...
...
extensions/UNet2/unetreceiver.cc
deleted
100644 → 0
View file @
27589353
#include <sstream>
#include "ObjectsActivator.h"
#include "Extensions.h"
#include "UDPReceiver.h"
// -----------------------------------------------------------------------------
using
namespace
std
;
using
namespace
UniSetTypes
;
using
namespace
UniSetExtensions
;
// -----------------------------------------------------------------------------
int
main
(
int
argc
,
char
**
argv
)
{
try
{
if
(
argc
>
1
&&
(
!
strcmp
(
argv
[
1
],
"--help"
)
||
!
strcmp
(
argv
[
1
],
"-h"
))
)
{
cout
<<
"--smemory-id objectName - SharedMemory objectID. Default: read from <SharedMemory>"
<<
endl
;
cout
<<
"--confile filename - configuration file. Default: configure.xml"
<<
endl
;
cout
<<
"--udp-logfile filename - logfilename. Default: udpexchange.log"
<<
endl
;
cout
<<
endl
;
UDPReceiver
::
help_print
(
argc
,
argv
);
return
0
;
}
string
confile
=
UniSetTypes
::
getArgParam
(
"--confile"
,
argc
,
argv
,
"configure.xml"
);
conf
=
new
Configuration
(
argc
,
argv
,
confile
);
string
logfilename
(
conf
->
getArgParam
(
"--udp-logfile"
));
if
(
logfilename
.
empty
()
)
logfilename
=
"udpexchange.log"
;
conf
->
initDebug
(
dlog
,
"dlog"
);
std
::
ostringstream
logname
;
string
dir
(
conf
->
getLogDir
());
logname
<<
dir
<<
logfilename
;
unideb
.
logFile
(
logname
.
str
()
);
dlog
.
logFile
(
logname
.
str
()
);
ObjectId
shmID
=
DefaultObjectId
;
string
sID
=
conf
->
getArgParam
(
"--smemory-id"
);
if
(
!
sID
.
empty
()
)
shmID
=
conf
->
getControllerID
(
sID
);
else
shmID
=
getSharedMemoryID
();
if
(
shmID
==
DefaultObjectId
)
{
cerr
<<
sID
<<
"? SharedMemoryID not found in "
<<
conf
->
getControllersSection
()
<<
" section"
<<
endl
;
return
1
;
}
UDPReceiver
*
udp
=
UDPReceiver
::
init_udpreceiver
(
argc
,
argv
,
shmID
);
if
(
!
udp
)
{
dlog
[
Debug
::
CRIT
]
<<
"(udpreceiver): init не прошёл..."
<<
endl
;
return
1
;
}
ObjectsActivator
act
;
act
.
addObject
(
static_cast
<
class
UniSetObject
*>
(
udp
));
SystemMessage
sm
(
SystemMessage
::
StartUp
);
act
.
broadcast
(
sm
.
transport_msg
()
);
unideb
(
Debug
::
ANY
)
<<
"
\n\n\n
"
;
unideb
[
Debug
::
ANY
]
<<
"(main): -------------- UDPRecevier START -------------------------
\n\n
"
;
dlog
(
Debug
::
ANY
)
<<
"
\n\n\n
"
;
dlog
[
Debug
::
ANY
]
<<
"(main): -------------- UDPReceiver START -------------------------
\n\n
"
;
act
.
run
(
false
);
// msleep(500);
// rs->execute();
}
catch
(
Exception
&
ex
)
{
dlog
[
Debug
::
CRIT
]
<<
"(udpexchange): "
<<
ex
<<
std
::
endl
;
}
catch
(
ost
::
SockException
&
e
)
{
ostringstream
s
;
s
<<
e
.
getString
()
<<
": "
<<
e
.
getSystemErrorString
();
dlog
[
Debug
::
CRIT
]
<<
s
.
str
()
<<
endl
;
}
catch
(...)
{
dlog
[
Debug
::
CRIT
]
<<
"(udpexchange): catch ..."
<<
std
::
endl
;
}
return
0
;
}
extensions/UNet2/unetsender.cc
deleted
100644 → 0
View file @
27589353
#include <sstream>
#include "ObjectsActivator.h"
#include "Extensions.h"
#include "UDPSender.h"
// -----------------------------------------------------------------------------
using
namespace
std
;
using
namespace
UniSetTypes
;
using
namespace
UniSetExtensions
;
// -----------------------------------------------------------------------------
int
main
(
int
argc
,
char
**
argv
)
{
try
{
if
(
argc
>
1
&&
(
!
strcmp
(
argv
[
1
],
"--help"
)
||
!
strcmp
(
argv
[
1
],
"-h"
))
)
{
cout
<<
"--smemory-id objectName - SharedMemory objectID. Default: read from <SharedMemory>"
<<
endl
;
cout
<<
"--confile filename - configuration file. Default: configure.xml"
<<
endl
;
cout
<<
"--udp-logfile filename - logfilename. Default: udpexchange.log"
<<
endl
;
cout
<<
endl
;
UDPSender
::
help_print
(
argc
,
argv
);
return
0
;
}
string
confile
=
UniSetTypes
::
getArgParam
(
"--confile"
,
argc
,
argv
,
"configure.xml"
);
conf
=
new
Configuration
(
argc
,
argv
,
confile
);
string
logfilename
(
conf
->
getArgParam
(
"--udp-logfile"
));
if
(
logfilename
.
empty
()
)
logfilename
=
"udpexchange.log"
;
conf
->
initDebug
(
dlog
,
"dlog"
);
std
::
ostringstream
logname
;
string
dir
(
conf
->
getLogDir
());
logname
<<
dir
<<
logfilename
;
unideb
.
logFile
(
logname
.
str
()
);
dlog
.
logFile
(
logname
.
str
()
);
ObjectId
shmID
=
DefaultObjectId
;
string
sID
=
conf
->
getArgParam
(
"--smemory-id"
);
if
(
!
sID
.
empty
()
)
shmID
=
conf
->
getControllerID
(
sID
);
else
shmID
=
getSharedMemoryID
();
if
(
shmID
==
DefaultObjectId
)
{
cerr
<<
sID
<<
"? SharedMemoryID not found in "
<<
conf
->
getControllersSection
()
<<
" section"
<<
endl
;
return
1
;
}
UDPSender
*
udp
=
UDPSender
::
init_udpsender
(
argc
,
argv
,
shmID
);
if
(
!
udp
)
{
dlog
[
Debug
::
CRIT
]
<<
"(udpsender): init не прошёл..."
<<
endl
;
return
1
;
}
ObjectsActivator
act
;
act
.
addObject
(
static_cast
<
class
UniSetObject
*>
(
udp
));
SystemMessage
sm
(
SystemMessage
::
StartUp
);
act
.
broadcast
(
sm
.
transport_msg
()
);
unideb
(
Debug
::
ANY
)
<<
"
\n\n\n
"
;
unideb
[
Debug
::
ANY
]
<<
"(main): -------------- UDPSender START -------------------------
\n\n
"
;
dlog
(
Debug
::
ANY
)
<<
"
\n\n\n
"
;
dlog
[
Debug
::
ANY
]
<<
"(main): -------------- UDPSender START -------------------------
\n\n
"
;
act
.
run
(
false
);
// msleep(500);
// rs->execute();
}
catch
(
Exception
&
ex
)
{
dlog
[
Debug
::
CRIT
]
<<
"(udpsender): "
<<
ex
<<
std
::
endl
;
}
catch
(
ost
::
SockException
&
e
)
{
dlog
[
Debug
::
CRIT
]
<<
"(udpsender): "
<<
e
.
getSystemErrorString
()
<<
endl
;
}
catch
(...)
{
dlog
[
Debug
::
CRIT
]
<<
"(udpsender): catch ..."
<<
std
::
endl
;
}
return
0
;
}
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment