Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
U
uniset2
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
1
Issues
1
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
UniSet project repositories
uniset2
Commits
71444865
Commit
71444865
authored
Dec 12, 2010
by
Pavel Vainerman
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Заложил основание для UNet2
parent
212cf7fb
Show whitespace changes
Inline
Side-by-side
Showing
14 changed files
with
2405 additions
and
0 deletions
+2405
-0
Makefile.am
extensions/UNet2/Makefile.am
+45
-0
UNetExchange.cc
extensions/UNet2/UNetExchange.cc
+637
-0
UNetExchange.h
extensions/UNet2/UNetExchange.h
+119
-0
UNetPacket.cc
extensions/UNet2/UNetPacket.cc
+38
-0
UNetPacket.h
extensions/UNet2/UNetPacket.h
+50
-0
UNetReceiver.cc
extensions/UNet2/UNetReceiver.cc
+461
-0
UNetReceiver.h
extensions/UNet2/UNetReceiver.h
+83
-0
UNetSender.cc
extensions/UNet2/UNetSender.cc
+584
-0
UNetSender.h
extensions/UNet2/UNetSender.h
+113
-0
create_links.sh
extensions/UNet2/create_links.sh
+6
-0
start_fg.sh
extensions/UNet2/start_fg.sh
+7
-0
unetexchange.cc
extensions/UNet2/unetexchange.cc
+84
-0
unetreceiver.cc
extensions/UNet2/unetreceiver.cc
+90
-0
unetsender.cc
extensions/UNet2/unetsender.cc
+88
-0
No files found.
extensions/UNet2/Makefile.am
0 → 100644
View file @
71444865
bin_PROGRAMS
=
@PACKAGE@-unet-sender @PACKAGE@-unet-receiver
#@PACKAGE@-unetexchange
lib_LTLIBRARIES
=
libUniSetUNet2.la
libUniSetUNet2_la_LIBADD
=
$(top_builddir)
/lib/libUniSet.la
\
$(top_builddir)
/extensions/SharedMemory/libUniSetSharedMemory.la
\
$(top_builddir)
/extensions/lib/libUniSetExtensions.la
\
$(SIGC_LIBS)
$(COMCPP_LIBS)
libUniSetUNet2_la_CXXFLAGS
=
-I
$(top_builddir)
/extensions/include
-I
$(top_builddir)
/extensions/SharedMemory
$(SIGC_CFLAGS)
$(COMCPP_CFLAGS)
libUniSetUNet2_la_SOURCES
=
UNetPacket.cc UNetSender.cc UNetReceiver.cc
#UNetExchange.cc
#UNetSender.cc
#@PACKAGE@_unetexchange_SOURCES = unetexchange.cc
#@PACKAGE@_unetexchange_LDADD = libUniSetUNet.la $(top_builddir)/lib/libUniSet.la \
# $(top_builddir)/extensions/SharedMemory/libUniSetSharedMemory.la \
# $(top_builddir)/extensions/lib/libUniSetExtensions.la \
# $(SIGC_LIBS) $(COMCPP_LIBS)
#@PACKAGE@_unetexchange_CXXFLAGS = -I$(top_builddir)/extensions/include -I$(top_builddir)/extensions/SharedMemory $(SIGC_CFLAGS) $(COMCPP_CFLAGS)
@PACKAGE@
_unet_sender_SOURCES
=
unetsender.cc
@PACKAGE@
_unet_sender_LDADD
=
libUniSetUNet2.la
$(top_builddir)
/lib/libUniSet.la
\
$(top_builddir)
/extensions/SharedMemory/libUniSetSharedMemory.la
\
$(top_builddir)
/extensions/lib/libUniSetExtensions.la
\
$(SIGC_LIBS)
$(COMCPP_LIBS)
@PACKAGE@
_unet_sender_CXXFLAGS
=
-I
$(top_builddir)
/extensions/include
-I
$(top_builddir)
/extensions/SharedMemory
$(SIGC_CFLAGS)
$(COMCPP_CFLAGS)
@PACKAGE@
_unet_receiver_SOURCES
=
unetreceiver.cc
@PACKAGE@
_unet_receiver_LDADD
=
libUniSetUNet2.la
$(top_builddir)
/lib/libUniSet.la
\
$(top_builddir)
/extensions/SharedMemory/libUniSetSharedMemory.la
\
$(top_builddir)
/extensions/lib/libUniSetExtensions.la
\
$(SIGC_LIBS)
$(COMCPP_LIBS)
@PACKAGE@
_unet_receiver_CXXFLAGS
=
-I
$(top_builddir)
/extensions/include
-I
$(top_builddir)
/extensions/SharedMemory
$(SIGC_CFLAGS)
$(COMCPP_CFLAGS)
# install
#devel_include_HEADERS = *.h
#devel_includedir = $(pkgincludedir)/extensions
#pkgconfigdir = $(libdir)/pkgconfig
#pkgconfig_DATA = libUniSetUNet.pc
all-local
:
ln
-sf
../UNetExchange/
$(devel_include_HEADERS)
../include
extensions/UNet2/UNetExchange.cc
0 → 100644
View file @
71444865
#include <sstream>
#include "Exceptions.h"
#include "Extensions.h"
#include "UNetExchange.h"
// -----------------------------------------------------------------------------
using
namespace
std
;
using
namespace
UniSetTypes
;
using
namespace
UniSetExtensions
;
// -----------------------------------------------------------------------------
UNetExchange
::
UNetExchange
(
UniSetTypes
::
ObjectId
objId
,
UniSetTypes
::
ObjectId
shmId
,
SharedMemory
*
ic
)
:
UniSetObject_LT
(
objId
),
shm
(
0
),
initPause
(
0
),
UNet
(
0
),
activated
(
false
),
dlist
(
100
),
maxItem
(
0
)
{
if
(
objId
==
DefaultObjectId
)
throw
UniSetTypes
::
SystemError
(
"(UNetExchange): objId=-1?!! Use --UNet-name"
);
// xmlNode* cnode = conf->getNode(myname);
cnode
=
conf
->
getNode
(
myname
);
if
(
cnode
==
NULL
)
throw
UniSetTypes
::
SystemError
(
"(UNetExchange): Not find conf-node for "
+
myname
);
shm
=
new
SMInterface
(
shmId
,
&
ui
,
objId
,
ic
);
UniXML_iterator
it
(
cnode
);
// определяем фильтр
s_field
=
conf
->
getArgParam
(
"--UNet-filter-field"
);
s_fvalue
=
conf
->
getArgParam
(
"--UNet-filter-value"
);
dlog
[
Debug
::
INFO
]
<<
myname
<<
"(init): read fileter-field='"
<<
s_field
<<
"' filter-value='"
<<
s_fvalue
<<
"'"
<<
endl
;
// ---------- init RS ----------
// UniXML_iterator it(cnode);
s_host
=
conf
->
getArgParam
(
"--UNet-host"
,
it
.
getProp
(
"host"
));
if
(
s_host
.
empty
()
)
throw
UniSetTypes
::
SystemError
(
myname
+
"(UNetExchange): Unknown host. Use --UNet-host"
);
host
=
s_host
.
c_str
();
buildReceiverList
();
// port = conf->getArgInt("--UNet-port",it.getProp("port"));
if
(
port
<=
0
||
port
==
DefaultObjectId
)
throw
UniSetTypes
::
SystemError
(
myname
+
"(UNetExchange): Unknown port address"
);
if
(
dlog
.
debugging
(
Debug
::
INFO
)
)
dlog
[
Debug
::
INFO
]
<<
"(UNetExchange): UNet set to "
<<
s_host
<<
":"
<<
port
<<
endl
;
try
{
UNet
=
new
ost
::
UNetBroadcast
(
host
,
port
);
}
catch
(
ost
::
SockException
&
e
)
{
ostringstream
s
;
s
<<
e
.
getString
()
<<
": "
<<
e
.
getSystemErrorString
()
<<
endl
;
throw
SystemError
(
s
.
str
());
}
thr
=
new
ThreadCreator
<
UNetExchange
>
(
this
,
&
UNetExchange
::
poll
);
recvTimeout
=
conf
->
getArgPInt
(
"--UNet-recv-timeout"
,
it
.
getProp
(
"recvTimeout"
),
5000
);
sendTimeout
=
conf
->
getArgPInt
(
"--UNet-send-timeout"
,
it
.
getProp
(
"sendTimeout"
),
5000
);
polltime
=
conf
->
getArgPInt
(
"--UNet-polltime"
,
it
.
getProp
(
"polltime"
),
100
);
// -------------------------------
if
(
shm
->
isLocalwork
()
)
{
readConfiguration
();
dlist
.
resize
(
maxItem
);
dlog
[
Debug
::
INFO
]
<<
myname
<<
"(init): dlist size = "
<<
dlist
.
size
()
<<
endl
;
}
else
ic
->
addReadItem
(
sigc
::
mem_fun
(
this
,
&
UNetExchange
::
readItem
)
);
// ********** HEARTBEAT *************
string
heart
=
conf
->
getArgParam
(
"--UNet-heartbeat-id"
,
it
.
getProp
(
"heartbeat_id"
));
if
(
!
heart
.
empty
()
)
{
sidHeartBeat
=
conf
->
getSensorID
(
heart
);
if
(
sidHeartBeat
==
DefaultObjectId
)
{
ostringstream
err
;
err
<<
myname
<<
": не найден идентификатор для датчика 'HeartBeat' "
<<
heart
;
dlog
[
Debug
::
CRIT
]
<<
myname
<<
"(init): "
<<
err
.
str
()
<<
endl
;
throw
SystemError
(
err
.
str
());
}
int
heartbeatTime
=
getHeartBeatTime
();
if
(
heartbeatTime
)
ptHeartBeat
.
setTiming
(
heartbeatTime
);
else
ptHeartBeat
.
setTiming
(
UniSetTimer
::
WaitUpTime
);
maxHeartBeat
=
conf
->
getArgPInt
(
"--UNet-heartbeat-max"
,
it
.
getProp
(
"heartbeat_max"
),
10
);
test_id
=
sidHeartBeat
;
}
else
{
test_id
=
conf
->
getSensorID
(
"TestMode_S"
);
if
(
test_id
==
DefaultObjectId
)
{
ostringstream
err
;
err
<<
myname
<<
"(init): test_id unknown. 'TestMode_S' not found..."
;
dlog
[
Debug
::
CRIT
]
<<
myname
<<
"(init): "
<<
err
.
str
()
<<
endl
;
throw
SystemError
(
err
.
str
());
}
}
dlog
[
Debug
::
INFO
]
<<
myname
<<
"(init): test_id="
<<
test_id
<<
endl
;
activateTimeout
=
conf
->
getArgPInt
(
"--activate-timeout"
,
20000
);
timeout_t
msec
=
conf
->
getArgPInt
(
"--UNet-timeout"
,
it
.
getProp
(
"timeout"
),
3000
);
dlog
[
Debug
::
INFO
]
<<
myname
<<
"(init): UNet-timeout="
<<
msec
<<
" msec"
<<
endl
;
}
// -----------------------------------------------------------------------------
UNetExchange
::~
UNetExchange
()
{
for
(
ReceiverList
::
iterator
it
=
rlist
.
begin
();
it
!=
rlist
.
end
();
it
++
)
delete
(
*
it
);
delete
UNet
;
delete
shm
;
delete
thr
;
}
// -----------------------------------------------------------------------------
void
UNetExchange
::
waitSMReady
()
{
// waiting for SM is ready...
int
ready_timeout
=
conf
->
getArgInt
(
"--UNet-sm-ready-timeout"
,
"15000"
);
if
(
ready_timeout
==
0
)
ready_timeout
=
15000
;
else
if
(
ready_timeout
<
0
)
ready_timeout
=
UniSetTimer
::
WaitUpTime
;
if
(
!
shm
->
waitSMready
(
ready_timeout
,
50
)
)
{
ostringstream
err
;
err
<<
myname
<<
"(waitSMReady): Не дождались готовности SharedMemory к работе в течение "
<<
ready_timeout
<<
" мсек"
;
dlog
[
Debug
::
CRIT
]
<<
err
.
str
()
<<
endl
;
throw
SystemError
(
err
.
str
());
}
}
// -----------------------------------------------------------------------------
void
UNetExchange
::
step
()
{
if
(
!
activated
)
return
;
if
(
sidHeartBeat
!=
DefaultObjectId
&&
ptHeartBeat
.
checkTime
()
)
{
try
{
shm
->
localSaveValue
(
aitHeartBeat
,
sidHeartBeat
,
maxHeartBeat
,
getId
());
ptHeartBeat
.
reset
();
}
catch
(
Exception
&
ex
)
{
dlog
[
Debug
::
CRIT
]
<<
myname
<<
"(step): (hb) "
<<
ex
<<
std
::
endl
;
}
}
}
// -----------------------------------------------------------------------------
void
UNetExchange
::
poll
()
{
dlist
.
resize
(
maxItem
);
dlog
[
Debug
::
INFO
]
<<
myname
<<
"(init): dlist size = "
<<
dlist
.
size
()
<<
endl
;
for
(
ReceiverList
::
iterator
it
=
rlist
.
begin
();
it
!=
rlist
.
end
();
it
++
)
{
(
*
it
)
->
setReceiveTimeout
(
recvTimeout
);
if
(
dlog
.
debugging
(
Debug
::
INFO
)
)
dlog
[
Debug
::
INFO
]
<<
myname
<<
"(poll): start exchange for "
<<
(
*
it
)
->
getName
()
<<
endl
;
(
*
it
)
->
start
();
}
ost
::
IPV4Broadcast
h
=
s_host
.
c_str
();
try
{
UNet
->
setPeer
(
h
,
port
);
}
catch
(
ost
::
SockException
&
e
)
{
ostringstream
s
;
s
<<
e
.
getString
()
<<
": "
<<
e
.
getSystemErrorString
();
dlog
[
Debug
::
CRIT
]
<<
myname
<<
"(poll): "
<<
s
.
str
()
<<
endl
;
throw
SystemError
(
s
.
str
());
}
while
(
activated
)
{
try
{
send
();
}
catch
(
ost
::
SockException
&
e
)
{
cerr
<<
e
.
getString
()
<<
": "
<<
e
.
getSystemErrorString
()
<<
endl
;
}
catch
(
UniSetTypes
::
Exception
&
ex
)
{
cerr
<<
myname
<<
"(step): "
<<
ex
<<
std
::
endl
;
}
catch
(...)
{
cerr
<<
myname
<<
"(step): catch ..."
<<
std
::
endl
;
}
msleep
(
polltime
);
}
cerr
<<
"************* execute FINISH **********"
<<
endl
;
}
// -----------------------------------------------------------------------------
void
UNetExchange
::
send
()
{
cout
<<
myname
<<
": send..."
<<
endl
;
/*
UniSetUNet::UNetHeader h;
h.nodeID = conf->getLocalNode();
h.procID = getId();
h.dcount = mypack.size();
if( UNet->isPending(ost::Socket::pendingOutput) )
{
ssize_t ret = UNet->send((char*)(&h),sizeof(h));
if( ret<(ssize_t)sizeof(h) )
{
cerr << myname << "(send data header): ret=" << ret << " sizeof=" << sizeof(h) << endl;
return;
}
*/
#warning use mutex for list!!!
UniSetUNet
::
UNetMessage
::
UNetDataList
::
iterator
it
=
mypack
.
dlist
.
begin
();
for
(
;
it
!=
mypack
.
dlist
.
end
();
++
it
)
{
// while( !UNet->isPending(ost::Socket::pendingOutput) )
// msleep(30);
cout
<<
myname
<<
"(send): "
<<
(
*
it
)
<<
endl
;
ssize_t
ret
=
UNet
->
send
((
char
*
)(
&
(
*
it
)),
sizeof
(
UniSetUNet
::
UNetData
));
if
(
ret
<
(
ssize_t
)
sizeof
(
UniSetUNet
::
UNetData
)
)
{
cerr
<<
myname
<<
"(send data): ret="
<<
ret
<<
" sizeof="
<<
sizeof
(
UniSetUNet
::
UNetData
)
<<
endl
;
break
;
}
}
// }
}
// -----------------------------------------------------------------------------
void
UNetExchange
::
processingMessage
(
UniSetTypes
::
VoidMessage
*
msg
)
{
try
{
switch
(
msg
->
type
)
{
case
UniSetTypes
:
:
Message
::
SysCommand
:
{
UniSetTypes
::
SystemMessage
sm
(
msg
);
sysCommand
(
&
sm
);
}
break
;
case
Message
:
:
SensorInfo
:
{
SensorMessage
sm
(
msg
);
sensorInfo
(
&
sm
);
}
break
;
default
:
break
;
}
}
catch
(
SystemError
&
ex
)
{
dlog
[
Debug
::
CRIT
]
<<
myname
<<
"(SystemError): "
<<
ex
<<
std
::
endl
;
// throw SystemError(ex);
raise
(
SIGTERM
);
}
catch
(
Exception
&
ex
)
{
dlog
[
Debug
::
CRIT
]
<<
myname
<<
"(processingMessage): "
<<
ex
<<
std
::
endl
;
}
catch
(...)
{
dlog
[
Debug
::
CRIT
]
<<
myname
<<
"(processingMessage): catch ...
\n
"
;
}
}
// -----------------------------------------------------------------------------
void
UNetExchange
::
sysCommand
(
UniSetTypes
::
SystemMessage
*
sm
)
{
switch
(
sm
->
command
)
{
case
SystemMessage
:
:
StartUp
:
{
waitSMReady
();
// подождать пока пройдёт инициализация датчиков
// см. activateObject()
msleep
(
initPause
);
PassiveTimer
ptAct
(
activateTimeout
);
while
(
!
activated
&&
!
ptAct
.
checkTime
()
)
{
cout
<<
myname
<<
"(sysCommand): wait activate..."
<<
endl
;
msleep
(
300
);
if
(
activated
)
break
;
}
if
(
!
activated
)
dlog
[
Debug
::
CRIT
]
<<
myname
<<
"(sysCommand): ************* don`t activate?! ************"
<<
endl
;
{
UniSetTypes
::
uniset_mutex_lock
l
(
mutex_start
,
10000
);
askSensors
(
UniversalIO
::
UIONotify
);
}
thr
->
start
();
}
case
SystemMessage
:
:
FoldUp
:
case
SystemMessage
:
:
Finish
:
askSensors
(
UniversalIO
::
UIODontNotify
);
break
;
case
SystemMessage
:
:
WatchDog
:
{
// ОПТИМИЗАЦИЯ (защита от двойного перезаказа при старте)
// Если идёт локальная работа
// (т.е. UNetExchange запущен в одном процессе с SharedMemory2)
// то обрабатывать WatchDog не надо, т.к. мы и так ждём готовности SM
// при заказе датчиков, а если SM вылетит, то вместе с этим процессом(UNetExchange)
if
(
shm
->
isLocalwork
()
)
break
;
askSensors
(
UniversalIO
::
UIONotify
);
}
break
;
case
SystemMessage
:
:
LogRotate
:
{
// переоткрываем логи
unideb
<<
myname
<<
"(sysCommand): logRotate"
<<
std
::
endl
;
string
fname
=
unideb
.
getLogFile
();
if
(
!
fname
.
empty
()
)
{
unideb
.
logFile
(
fname
);
unideb
<<
myname
<<
"(sysCommand): ***************** UNIDEB LOG ROTATE *****************"
<<
std
::
endl
;
}
dlog
<<
myname
<<
"(sysCommand): logRotate"
<<
std
::
endl
;
fname
=
dlog
.
getLogFile
();
if
(
!
fname
.
empty
()
)
{
dlog
.
logFile
(
fname
);
dlog
<<
myname
<<
"(sysCommand): ***************** dlog LOG ROTATE *****************"
<<
std
::
endl
;
}
}
break
;
default
:
break
;
}
}
// ------------------------------------------------------------------------------------------
void
UNetExchange
::
askSensors
(
UniversalIO
::
UIOCommand
cmd
)
{
if
(
!
shm
->
waitSMworking
(
test_id
,
activateTimeout
,
50
)
)
{
ostringstream
err
;
err
<<
myname
<<
"(askSensors): Не дождались готовности(work) SharedMemory к работе в течение "
<<
activateTimeout
<<
" мсек"
;
dlog
[
Debug
::
CRIT
]
<<
err
.
str
()
<<
endl
;
kill
(
SIGTERM
,
getpid
());
// прерываем (перезапускаем) процесс...
throw
SystemError
(
err
.
str
());
}
DMap
::
iterator
it
=
dlist
.
begin
();
for
(
;
it
!=
dlist
.
end
();
++
it
)
{
try
{
shm
->
askSensor
(
it
->
si
.
id
,
cmd
);
}
catch
(
UniSetTypes
::
Exception
&
ex
)
{
dlog
[
Debug
::
WARN
]
<<
myname
<<
"(askSensors): "
<<
ex
<<
std
::
endl
;
}
catch
(...){}
}
}
// ------------------------------------------------------------------------------------------
void
UNetExchange
::
sensorInfo
(
UniSetTypes
::
SensorMessage
*
sm
)
{
DMap
::
iterator
it
=
dlist
.
begin
();
for
(
;
it
!=
dlist
.
end
();
++
it
)
{
if
(
it
->
si
.
id
==
sm
->
id
)
{
uniset_spin_lock
lock
(
it
->
val_lock
);
it
->
val
=
sm
->
value
;
if
(
it
->
pack_it
!=
mypack
.
dlist
.
end
()
)
it
->
pack_it
->
val
=
sm
->
value
;
}
break
;
}
}
// ------------------------------------------------------------------------------------------
bool
UNetExchange
::
activateObject
()
{
// блокирование обработки Starsp
// пока не пройдёт инициализация датчиков
// см. sysCommand()
{
activated
=
false
;
UniSetTypes
::
uniset_mutex_lock
l
(
mutex_start
,
5000
);
UniSetObject_LT
::
activateObject
();
initIterators
();
activated
=
true
;
}
return
true
;
}
// ------------------------------------------------------------------------------------------
void
UNetExchange
::
sigterm
(
int
signo
)
{
cerr
<<
myname
<<
": ********* SIGTERM("
<<
signo
<<
") ********"
<<
endl
;
activated
=
false
;
UNet
->
disconnect
();
for
(
ReceiverList
::
iterator
it
=
rlist
.
begin
();
it
!=
rlist
.
end
();
it
++
)
(
*
it
)
->
stop
();
UniSetObject_LT
::
sigterm
(
signo
);
}
// ------------------------------------------------------------------------------------------
void
UNetExchange
::
readConfiguration
()
{
#warning Сделать сортировку по диапазонам адресов!!!
// чтобы запрашивать одним запросом, сразу несколько входов...
// readconf_ok = false;
xmlNode
*
root
=
conf
->
getXMLSensorsSection
();
if
(
!
root
)
{
ostringstream
err
;
err
<<
myname
<<
"(readConfiguration): не нашли корневого раздела <sensors>"
;
throw
SystemError
(
err
.
str
());
}
UniXML_iterator
it
(
root
);
if
(
!
it
.
goChildren
()
)
{
std
::
cerr
<<
myname
<<
"(readConfiguration): раздел <sensors> не содержит секций ?!!
\n
"
;
return
;
}
for
(
;
it
.
getCurrent
();
it
.
goNext
()
)
{
if
(
check_item
(
it
)
)
initItem
(
it
);
}
// readconf_ok = true;
}
// ------------------------------------------------------------------------------------------
bool
UNetExchange
::
check_item
(
UniXML_iterator
&
it
)
{
if
(
s_field
.
empty
()
)
return
true
;
// просто проверка на не пустой field
if
(
s_fvalue
.
empty
()
&&
it
.
getProp
(
s_field
).
empty
()
)
return
false
;
// просто проверка что field = value
if
(
!
s_fvalue
.
empty
()
&&
it
.
getProp
(
s_field
)
!=
s_fvalue
)
return
false
;
return
true
;
}
// ------------------------------------------------------------------------------------------
bool
UNetExchange
::
readItem
(
UniXML
&
xml
,
UniXML_iterator
&
it
,
xmlNode
*
sec
)
{
if
(
check_item
(
it
)
)
initItem
(
it
);
return
true
;
}
// ------------------------------------------------------------------------------------------
bool
UNetExchange
::
initItem
(
UniXML_iterator
&
it
)
{
string
sname
(
it
.
getProp
(
"name"
)
);
string
tid
=
it
.
getProp
(
"id"
);
ObjectId
sid
;
if
(
!
tid
.
empty
()
)
{
sid
=
UniSetTypes
::
uni_atoi
(
tid
);
if
(
sid
<=
0
)
sid
=
DefaultObjectId
;
}
else
sid
=
conf
->
getSensorID
(
sname
);
if
(
sid
==
DefaultObjectId
)
{
if
(
dlog
)
dlog
[
Debug
::
CRIT
]
<<
myname
<<
"(readItem): ID not found for "
<<
sname
<<
endl
;
return
false
;
}
UItem
p
;
p
.
si
.
id
=
sid
;
p
.
si
.
node
=
conf
->
getLocalNode
();
mypack
.
addData
(
sid
,
0
);
p
.
pack_it
=
(
mypack
.
dlist
.
end
()
--
);
if
(
maxItem
>=
dlist
.
size
()
)
dlist
.
resize
(
maxItem
+
10
);
dlist
[
maxItem
]
=
p
;
maxItem
++
;
if
(
dlog
.
debugging
(
Debug
::
INFO
)
)
dlog
[
Debug
::
INFO
]
<<
myname
<<
"(initItem): add "
<<
p
<<
endl
;
return
true
;
}
// ------------------------------------------------------------------------------------------
void
UNetExchange
::
initIterators
()
{
DMap
::
iterator
it
=
dlist
.
begin
();
for
(
;
it
!=
dlist
.
end
();
it
++
)
{
shm
->
initDIterator
(
it
->
dit
);
shm
->
initAIterator
(
it
->
ait
);
}
shm
->
initAIterator
(
aitHeartBeat
);
}
// -----------------------------------------------------------------------------
void
UNetExchange
::
help_print
(
int
argc
,
char
*
argv
[]
)
{
cout
<<
"--UNet-polltime msec - Пауза между опросаом карт. По умолчанию 200 мсек."
<<
endl
;
cout
<<
"--UNet-heartbeat-id - Данный процесс связан с указанным аналоговым heartbeat-дачиком."
<<
endl
;
cout
<<
"--UNet-heartbeat-max - Максимальное значение heartbeat-счётчика для данного процесса. По умолчанию 10."
<<
endl
;
cout
<<
"--UNet-ready-timeout - Время ожидания готовности SM к работе, мсек. (-1 - ждать 'вечно')"
<<
endl
;
cout
<<
"--UNet-initPause - Задержка перед инициализацией (время на активизация процесса)"
<<
endl
;
cout
<<
"--UNet-notRespondSensor - датчик связи для данного процесса "
<<
endl
;
cout
<<
"--UNet-sm-ready-timeout - время на ожидание старта SM"
<<
endl
;
cout
<<
" Настройки протокола UNet: "
<<
endl
;
cout
<<
"--UNet-host [ip|hostname] - Адрес сервера"
<<
endl
;
cout
<<
"--UNet-send-timeout - Таймаут на посылку ответа."
<<
endl
;
}
// -----------------------------------------------------------------------------
UNetExchange
*
UNetExchange
::
init_UNetexchange
(
int
argc
,
char
*
argv
[],
UniSetTypes
::
ObjectId
icID
,
SharedMemory
*
ic
)
{
string
name
=
conf
->
getArgParam
(
"--UNet-name"
,
"UNetExchange1"
);
if
(
name
.
empty
()
)
{
cerr
<<
"(UNetexchange): Не задан name'"
<<
endl
;
return
0
;
}
ObjectId
ID
=
conf
->
getObjectID
(
name
);
if
(
ID
==
UniSetTypes
::
DefaultObjectId
)
{
cerr
<<
"(UNetexchange): идентификатор '"
<<
name
<<
"' не найден в конф. файле!"
<<
" в секции "
<<
conf
->
getObjectsSection
()
<<
endl
;
return
0
;
}
dlog
[
Debug
::
INFO
]
<<
"(rsexchange): name = "
<<
name
<<
"("
<<
ID
<<
")"
<<
endl
;
return
new
UNetExchange
(
ID
,
icID
,
ic
);
}
// -----------------------------------------------------------------------------
std
::
ostream
&
operator
<<
(
std
::
ostream
&
os
,
UNetExchange
::
UItem
&
p
)
{
return
os
<<
" sid="
<<
p
.
si
.
id
;
}
// -----------------------------------------------------------------------------
void
UNetExchange
::
buildReceiverList
()
{
xmlNode
*
n
=
conf
->
getXMLNodesSection
();
if
(
!
n
)
{
dlog
[
Debug
::
WARN
]
<<
myname
<<
"(buildReceiverList): <nodes> not found! ignore..."
<<
endl
;
return
;
}
UniXML_iterator
it
(
n
);
if
(
!
it
.
goChildren
()
)
{
dlog
[
Debug
::
WARN
]
<<
myname
<<
"(buildReceiverList): <nodes> is empty?! ignore..."
<<
endl
;
return
;
}
for
(
;
it
.
getCurrent
();
it
.
goNext
()
)
{
ObjectId
n_id
=
conf
->
getNodeID
(
it
.
getProp
(
"name"
)
);
if
(
n_id
==
conf
->
getLocalNode
()
)
{
port
=
it
.
getIntProp
(
"UNet_port"
);
if
(
port
<=
0
)
port
=
n_id
;
dlog
[
Debug
::
INFO
]
<<
myname
<<
"(buildReceiverList): init myport port="
<<
port
<<
endl
;
continue
;
}
int
p
=
it
.
getIntProp
(
"UNet_port"
);
if
(
p
<=
0
)
p
=
n_id
;
if
(
p
==
DefaultObjectId
)
{
dlog
[
Debug
::
WARN
]
<<
myname
<<
"(buildReceiverList): node="
<<
it
.
getProp
(
"name"
)
<<
" unknown port. ignore..."
<<
endl
;
continue
;
}
UNetNReceiver
*
r
=
new
UNetNReceiver
(
p
,
host
,
shm
->
getSMID
(),
shm
->
SM
());
rlist
.
push_back
(
r
);
}
}
// ------------------------------------------------------------------------------------------
extensions/UNet2/UNetExchange.h
0 → 100644
View file @
71444865
#ifndef UNetExchange_H_
#define UNetExchange_H_
// -----------------------------------------------------------------------------
#include <ostream>
#include <string>
#include <vector>
#include <cc++/socket.h>
#include "UniSetObject_LT.h"
#include "Trigger.h"
#include "Mutex.h"
#include "SMInterface.h"
#include "SharedMemory.h"
#include "ThreadCreator.h"
#include "UNetPacket.h"
#include "UNetNReceiver.h"
// -----------------------------------------------------------------------------
class
UNetExchange
:
public
UniSetObject_LT
{
public
:
UNetExchange
(
UniSetTypes
::
ObjectId
objId
,
UniSetTypes
::
ObjectId
shmID
,
SharedMemory
*
ic
=
0
);
virtual
~
UNetExchange
();
/*! глобальная функция для инициализации объекта */
static
UNetExchange
*
init_UNetexchange
(
int
argc
,
char
*
argv
[],
UniSetTypes
::
ObjectId
shmID
,
SharedMemory
*
ic
=
0
);
/*! глобальная функция для вывода help-а */
static
void
help_print
(
int
argc
,
char
*
argv
[]
);
struct
UItem
{
UItem
()
:
val
(
0
)
{}
IOController_i
::
SensorInfo
si
;
IOController
::
AIOStateList
::
iterator
ait
;
IOController
::
DIOStateList
::
iterator
dit
;
UniSetTypes
::
uniset_spin_mutex
val_lock
;
UniSetUNet
::
UNetMessage
::
UNetDataList
::
iterator
pack_it
;
long
val
;
friend
std
::
ostream
&
operator
<<
(
std
::
ostream
&
os
,
UItem
&
p
);
};
protected
:
xmlNode
*
cnode
;
std
::
string
s_field
;
std
::
string
s_fvalue
;
SMInterface
*
shm
;
void
poll
();
void
recv
();
void
send
();
void
step
();
virtual
void
processingMessage
(
UniSetTypes
::
VoidMessage
*
msg
);
void
sysCommand
(
UniSetTypes
::
SystemMessage
*
msg
);
void
sensorInfo
(
UniSetTypes
::
SensorMessage
*
sm
);
void
askSensors
(
UniversalIO
::
UIOCommand
cmd
);
void
waitSMReady
();
virtual
bool
activateObject
();
// действия при завершении работы
virtual
void
sigterm
(
int
signo
);
void
initIterators
();
bool
initItem
(
UniXML_iterator
&
it
);
bool
readItem
(
UniXML
&
xml
,
UniXML_iterator
&
it
,
xmlNode
*
sec
);
void
readConfiguration
();
bool
check_item
(
UniXML_iterator
&
it
);
void
buildReceiverList
();
private
:
UNetExchange
();
bool
initPause
;
UniSetTypes
::
uniset_mutex
mutex_start
;
PassiveTimer
ptHeartBeat
;
UniSetTypes
::
ObjectId
sidHeartBeat
;
int
maxHeartBeat
;
IOController
::
AIOStateList
::
iterator
aitHeartBeat
;
UniSetTypes
::
ObjectId
test_id
;
int
polltime
;
/*!< переодичность обновления данных, [мсек] */
ost
::
UNetBroadcast
*
UNet
;
ost
::
IPV4Host
host
;
ost
::
tpport_t
port
;
std
::
string
s_host
;
UniSetTypes
::
uniset_mutex
pollMutex
;
Trigger
trTimeout
;
int
recvTimeout
;
int
sendTimeout
;
bool
activated
;
int
activateTimeout
;
UniSetUNet
::
UNetMessage
mypack
;
typedef
std
::
vector
<
UItem
>
DMap
;
DMap
dlist
;
int
maxItem
;
typedef
std
::
list
<
UNetNReceiver
*>
ReceiverList
;
ReceiverList
rlist
;
ThreadCreator
<
UNetExchange
>*
thr
;
};
// -----------------------------------------------------------------------------
#endif // UNetExchange_H_
// -----------------------------------------------------------------------------
extensions/UNet2/UNetPacket.cc
0 → 100644
View file @
71444865
#include "UNetPacket.h"
// -----------------------------------------------------------------------------
using
namespace
std
;
using
namespace
UniSetUNet
;
// -----------------------------------------------------------------------------
std
::
ostream
&
UniSetUNet
::
operator
<<
(
std
::
ostream
&
os
,
UniSetUNet
::
UNetHeader
&
p
)
{
return
os
<<
"nodeID="
<<
p
.
nodeID
<<
" procID="
<<
p
.
procID
<<
" dcount="
<<
p
.
dcount
;
}
// -----------------------------------------------------------------------------
std
::
ostream
&
UniSetUNet
::
operator
<<
(
std
::
ostream
&
os
,
UniSetUNet
::
UNetData
&
p
)
{
return
os
<<
"id="
<<
p
.
id
<<
" val="
<<
p
.
val
;
}
// -----------------------------------------------------------------------------
std
::
ostream
&
UniSetUNet
::
operator
<<
(
std
::
ostream
&
os
,
UniSetUNet
::
UNetMessage
&
p
)
{
return
os
;
}
// -----------------------------------------------------------------------------
UNetMessage
::
UNetMessage
()
{
}
// -----------------------------------------------------------------------------
void
UNetMessage
::
addData
(
const
UniSetUNet
::
UNetData
&
dat
)
{
dlist
.
push_back
(
dat
);
}
// -----------------------------------------------------------------------------
void
UNetMessage
::
addData
(
long
id
,
long
val
)
{
UNetData
d
(
id
,
val
);
addData
(
d
);
}
// -----------------------------------------------------------------------------
extensions/UNet2/UNetPacket.h
0 → 100644
View file @
71444865
// $Id: UNetPacket.h,v 1.1 2009/02/10 20:38:27 vpashka Exp $
// -----------------------------------------------------------------------------
#ifndef UNetPacket_H_
#define UNetPacket_H_
// -----------------------------------------------------------------------------
#include <list>
#include <ostream>
#include "UniSetTypes.h"
// -----------------------------------------------------------------------------
namespace
UniSetUNet
{
struct
UNetHeader
{
long
nodeID
;
long
procID
;
long
dcount
;
friend
std
::
ostream
&
operator
<<
(
std
::
ostream
&
os
,
UNetHeader
&
p
);
}
__attribute__
((
packed
));
struct
UNetData
{
UNetData
()
:
id
(
UniSetTypes
::
DefaultObjectId
),
val
(
0
){}
UNetData
(
long
id
,
long
val
)
:
id
(
id
),
val
(
val
){}
long
id
;
long
val
;
friend
std
::
ostream
&
operator
<<
(
std
::
ostream
&
os
,
UNetData
&
p
);
}
__attribute__
((
packed
));
struct
UNetMessage
:
public
UNetHeader
{
UNetMessage
();
void
addData
(
const
UNetData
&
dat
);
void
addData
(
long
id
,
long
val
);
inline
int
size
(){
return
dlist
.
size
();
}
typedef
std
::
list
<
UNetData
>
UNetDataList
;
UNetDataList
dlist
;
friend
std
::
ostream
&
operator
<<
(
std
::
ostream
&
os
,
UNetMessage
&
p
);
};
}
// -----------------------------------------------------------------------------
#endif // UNetPacket_H_
// -----------------------------------------------------------------------------
extensions/UNet2/UNetReceiver.cc
0 → 100644
View file @
71444865
#include <sstream>
#include "Exceptions.h"
#include "Extensions.h"
#include "UNetReceiver.h"
// -----------------------------------------------------------------------------
using
namespace
std
;
using
namespace
UniSetTypes
;
using
namespace
UniSetExtensions
;
// -----------------------------------------------------------------------------
UNetReceiver
::
UNetReceiver
(
UniSetTypes
::
ObjectId
objId
,
UniSetTypes
::
ObjectId
shmId
,
SharedMemory
*
ic
)
:
UniSetObject_LT
(
objId
),
shm
(
0
),
initPause
(
0
),
udp
(
0
),
activated
(
false
)
{
if
(
objId
==
DefaultObjectId
)
throw
UniSetTypes
::
SystemError
(
"(UNetReceiver): objId=-1?!! Use --udp-name"
);
// xmlNode* cnode = conf->getNode(myname);
cnode
=
conf
->
getNode
(
myname
);
if
(
cnode
==
NULL
)
throw
UniSetTypes
::
SystemError
(
"(UNetReceiver): Not find conf-node for "
+
myname
);
shm
=
new
SMInterface
(
shmId
,
&
ui
,
objId
,
ic
);
UniXML_iterator
it
(
cnode
);
// определяем фильтр
s_field
=
conf
->
getArgParam
(
"--udp-filter-field"
);
s_fvalue
=
conf
->
getArgParam
(
"--udp-filter-value"
);
dlog
[
Debug
::
INFO
]
<<
myname
<<
"(init): read fileter-field='"
<<
s_field
<<
"' filter-value='"
<<
s_fvalue
<<
"'"
<<
endl
;
// ---------- init RS ----------
// UniXML_iterator it(cnode);
string
s_host
=
conf
->
getArgParam
(
"--udp-host"
,
it
.
getProp
(
"host"
));
if
(
s_host
.
empty
()
)
throw
UniSetTypes
::
SystemError
(
myname
+
"(UNetReceiver): Unknown host. Use --udp-host"
);
port
=
conf
->
getArgInt
(
"--udp-port"
,
it
.
getProp
(
"port"
));
if
(
port
<=
0
)
throw
UniSetTypes
::
SystemError
(
myname
+
"(UNetReceiver): Unknown port address. Use --udp-port"
);
if
(
dlog
.
debugging
(
Debug
::
INFO
)
)
dlog
[
Debug
::
INFO
]
<<
"(UNetReceiver): UNet set to "
<<
s_host
<<
":"
<<
port
<<
endl
;
host
=
s_host
.
c_str
();
try
{
udp
=
new
ost
::
UNetDuplex
(
host
,
port
);
}
catch
(
ost
::
SockException
&
e
)
{
ostringstream
s
;
s
<<
e
.
getString
()
<<
": "
<<
e
.
getSystemErrorString
()
<<
endl
;
throw
SystemError
(
s
.
str
());
}
thr
=
new
ThreadCreator
<
UNetReceiver
>
(
this
,
&
UNetReceiver
::
poll
);
recvTimeout
=
conf
->
getArgPInt
(
"--udp-recv-timeout"
,
it
.
getProp
(
"recvTimeout"
),
5000
);
polltime
=
conf
->
getArgPInt
(
"--udp-polltime"
,
it
.
getProp
(
"polltime"
),
100
);
// -------------------------------
// ********** HEARTBEAT *************
string
heart
=
conf
->
getArgParam
(
"--udp-heartbeat-id"
,
it
.
getProp
(
"heartbeat_id"
));
if
(
!
heart
.
empty
()
)
{
sidHeartBeat
=
conf
->
getSensorID
(
heart
);
if
(
sidHeartBeat
==
DefaultObjectId
)
{
ostringstream
err
;
err
<<
myname
<<
": не найден идентификатор для датчика 'HeartBeat' "
<<
heart
;
dlog
[
Debug
::
CRIT
]
<<
myname
<<
"(init): "
<<
err
.
str
()
<<
endl
;
throw
SystemError
(
err
.
str
());
}
int
heartbeatTime
=
getHeartBeatTime
();
if
(
heartbeatTime
)
ptHeartBeat
.
setTiming
(
heartbeatTime
);
else
ptHeartBeat
.
setTiming
(
UniSetTimer
::
WaitUpTime
);
maxHeartBeat
=
conf
->
getArgPInt
(
"--udp-heartbeat-max"
,
it
.
getProp
(
"heartbeat_max"
),
10
);
test_id
=
sidHeartBeat
;
}
else
{
test_id
=
conf
->
getSensorID
(
"TestMode_S"
);
if
(
test_id
==
DefaultObjectId
)
{
ostringstream
err
;
err
<<
myname
<<
"(init): test_id unknown. 'TestMode_S' not found..."
;
dlog
[
Debug
::
CRIT
]
<<
myname
<<
"(init): "
<<
err
.
str
()
<<
endl
;
throw
SystemError
(
err
.
str
());
}
}
dlog
[
Debug
::
INFO
]
<<
myname
<<
"(init): test_id="
<<
test_id
<<
endl
;
activateTimeout
=
conf
->
getArgPInt
(
"--activate-timeout"
,
20000
);
timeout_t
msec
=
conf
->
getArgPInt
(
"--udp-timeout"
,
it
.
getProp
(
"timeout"
),
3000
);
dlog
[
Debug
::
INFO
]
<<
myname
<<
"(init): udp-timeout="
<<
msec
<<
" msec"
<<
endl
;
}
// -----------------------------------------------------------------------------
UNetReceiver
::~
UNetReceiver
()
{
delete
udp
;
delete
shm
;
delete
thr
;
}
// -----------------------------------------------------------------------------
void
UNetReceiver
::
waitSMReady
()
{
// waiting for SM is ready...
int
ready_timeout
=
conf
->
getArgInt
(
"--udp-sm-ready-timeout"
,
"15000"
);
if
(
ready_timeout
==
0
)
ready_timeout
=
15000
;
else
if
(
ready_timeout
<
0
)
ready_timeout
=
UniSetTimer
::
WaitUpTime
;
if
(
!
shm
->
waitSMready
(
ready_timeout
,
50
)
)
{
ostringstream
err
;
err
<<
myname
<<
"(waitSMReady): Не дождались готовности SharedMemory к работе в течение "
<<
ready_timeout
<<
" мсек"
;
dlog
[
Debug
::
CRIT
]
<<
err
.
str
()
<<
endl
;
throw
SystemError
(
err
.
str
());
}
}
// -----------------------------------------------------------------------------
/*
void UNetReceiver::timerInfo( TimerMessage *tm )
{
if( tm->id == tmExchange )
step();
}
*/
// -----------------------------------------------------------------------------
void
UNetReceiver
::
step
()
{
// {
// uniset_mutex_lock l(pollMutex,2000);
// poll();
// }
if
(
!
activated
)
return
;
if
(
sidHeartBeat
!=
DefaultObjectId
&&
ptHeartBeat
.
checkTime
()
)
{
try
{
shm
->
localSaveValue
(
aitHeartBeat
,
sidHeartBeat
,
maxHeartBeat
,
getId
());
ptHeartBeat
.
reset
();
}
catch
(
Exception
&
ex
)
{
dlog
[
Debug
::
CRIT
]
<<
myname
<<
"(step): (hb) "
<<
ex
<<
std
::
endl
;
}
}
}
// -----------------------------------------------------------------------------
void
UNetReceiver
::
poll
()
{
try
{
// udp->connect(host,port);
// udp->UNetSocket::setPeer(host,port);
}
catch
(
UniSetTypes
::
Exception
&
ex
)
{
cerr
<<
myname
<<
"(step): "
<<
ex
<<
std
::
endl
;
// reise(SIGTERM);
return
;
}
while
(
activated
)
{
try
{
recv
();
// send();
}
catch
(
ost
::
SockException
&
e
)
{
cerr
<<
e
.
getString
()
<<
": "
<<
e
.
getSystemErrorString
()
<<
endl
;
}
catch
(
UniSetTypes
::
Exception
&
ex
)
{
cerr
<<
myname
<<
"(step): "
<<
ex
<<
std
::
endl
;
}
catch
(...)
{
cerr
<<
myname
<<
"(step): catch ..."
<<
std
::
endl
;
}
msleep
(
polltime
);
}
cerr
<<
"************* execute FINISH **********"
<<
endl
;
}
// -----------------------------------------------------------------------------
void
UNetReceiver
::
recv
()
{
cout
<<
myname
<<
": recv....(timeout="
<<
recvTimeout
<<
")"
<<
endl
;
UniSetUNet
::
UNetHeader
h
;
// receive
if
(
udp
->
isInputReady
(
recvTimeout
)
)
{
ssize_t
ret
=
udp
->
UNetReceive
::
receive
(
&
h
,
sizeof
(
h
));
if
(
ret
<
(
ssize_t
)
sizeof
(
h
)
)
{
cerr
<<
myname
<<
"(receive): ret="
<<
ret
<<
" sizeof="
<<
sizeof
(
h
)
<<
endl
;
return
;
}
cout
<<
myname
<<
"(receive): header: "
<<
h
<<
endl
;
if
(
h
.
dcount
<=
0
)
{
cout
<<
" data=0"
<<
endl
;
return
;
}
UniSetUNet
::
UNetData
d
;
// ignore echo...
#if 0
if( h.nodeID == conf->getLocalNode() && h.procID == getId() )
{
for( int i=0; i<h.dcount;i++ )
{
ssize_t ret = udp->UNetReceive::receive(&d,sizeof(d));
if( ret < (ssize_t)sizeof(d) )
return;
}
return;
}
#endif
for
(
int
i
=
0
;
i
<
h
.
dcount
;
i
++
)
{
ssize_t
ret
=
udp
->
UNetReceive
::
receive
(
&
d
,
sizeof
(
d
));
if
(
ret
<
(
ssize_t
)
sizeof
(
d
)
)
{
cerr
<<
myname
<<
"(receive data "
<<
i
<<
"): ret="
<<
ret
<<
" sizeof="
<<
sizeof
(
d
)
<<
endl
;
break
;
}
cout
<<
myname
<<
"(receive data "
<<
i
<<
"): "
<<
d
<<
endl
;
}
}
// else
// {
// cout << "no InputReady.." << endl;
// }
}
// -----------------------------------------------------------------------------
void
UNetReceiver
::
processingMessage
(
UniSetTypes
::
VoidMessage
*
msg
)
{
try
{
switch
(
msg
->
type
)
{
case
UniSetTypes
:
:
Message
::
SysCommand
:
{
UniSetTypes
::
SystemMessage
sm
(
msg
);
sysCommand
(
&
sm
);
}
break
;
case
Message
:
:
SensorInfo
:
{
SensorMessage
sm
(
msg
);
sensorInfo
(
&
sm
);
}
break
;
default
:
break
;
}
}
catch
(
SystemError
&
ex
)
{
dlog
[
Debug
::
CRIT
]
<<
myname
<<
"(SystemError): "
<<
ex
<<
std
::
endl
;
// throw SystemError(ex);
raise
(
SIGTERM
);
}
catch
(
Exception
&
ex
)
{
dlog
[
Debug
::
CRIT
]
<<
myname
<<
"(processingMessage): "
<<
ex
<<
std
::
endl
;
}
catch
(...)
{
dlog
[
Debug
::
CRIT
]
<<
myname
<<
"(processingMessage): catch ...
\n
"
;
}
}
// -----------------------------------------------------------------------------
void
UNetReceiver
::
sysCommand
(
UniSetTypes
::
SystemMessage
*
sm
)
{
switch
(
sm
->
command
)
{
case
SystemMessage
:
:
StartUp
:
{
waitSMReady
();
// подождать пока пройдёт инициализация датчиков
// см. activateObject()
msleep
(
initPause
);
PassiveTimer
ptAct
(
activateTimeout
);
while
(
!
activated
&&
!
ptAct
.
checkTime
()
)
{
cout
<<
myname
<<
"(sysCommand): wait activate..."
<<
endl
;
msleep
(
300
);
if
(
activated
)
break
;
}
if
(
!
activated
)
dlog
[
Debug
::
CRIT
]
<<
myname
<<
"(sysCommand): ************* don`t activate?! ************"
<<
endl
;
{
UniSetTypes
::
uniset_mutex_lock
l
(
mutex_start
,
10000
);
askSensors
(
UniversalIO
::
UIONotify
);
}
thr
->
start
();
}
case
SystemMessage
:
:
FoldUp
:
case
SystemMessage
:
:
Finish
:
askSensors
(
UniversalIO
::
UIODontNotify
);
break
;
case
SystemMessage
:
:
WatchDog
:
{
// ОПТИМИЗАЦИЯ (защита от двойного перезаказа при старте)
// Если идёт локальная работа
// (т.е. UNetReceiver запущен в одном процессе с SharedMemory2)
// то обрабатывать WatchDog не надо, т.к. мы и так ждём готовности SM
// при заказе датчиков, а если SM вылетит, то вместе с этим процессом(UNetReceiver)
if
(
shm
->
isLocalwork
()
)
break
;
askSensors
(
UniversalIO
::
UIONotify
);
}
break
;
case
SystemMessage
:
:
LogRotate
:
{
// переоткрываем логи
unideb
<<
myname
<<
"(sysCommand): logRotate"
<<
std
::
endl
;
string
fname
=
unideb
.
getLogFile
();
if
(
!
fname
.
empty
()
)
{
unideb
.
logFile
(
fname
);
unideb
<<
myname
<<
"(sysCommand): ***************** UNIDEB LOG ROTATE *****************"
<<
std
::
endl
;
}
dlog
<<
myname
<<
"(sysCommand): logRotate"
<<
std
::
endl
;
fname
=
dlog
.
getLogFile
();
if
(
!
fname
.
empty
()
)
{
dlog
.
logFile
(
fname
);
dlog
<<
myname
<<
"(sysCommand): ***************** dlog LOG ROTATE *****************"
<<
std
::
endl
;
}
}
break
;
default
:
break
;
}
}
// ------------------------------------------------------------------------------------------
void
UNetReceiver
::
askSensors
(
UniversalIO
::
UIOCommand
cmd
)
{
if
(
!
shm
->
waitSMworking
(
test_id
,
activateTimeout
,
50
)
)
{
ostringstream
err
;
err
<<
myname
<<
"(askSensors): Не дождались готовности(work) SharedMemory к работе в течение "
<<
activateTimeout
<<
" мсек"
;
dlog
[
Debug
::
CRIT
]
<<
err
.
str
()
<<
endl
;
kill
(
SIGTERM
,
getpid
());
// прерываем (перезапускаем) процесс...
throw
SystemError
(
err
.
str
());
}
}
// ------------------------------------------------------------------------------------------
void
UNetReceiver
::
sensorInfo
(
UniSetTypes
::
SensorMessage
*
sm
)
{
}
// ------------------------------------------------------------------------------------------
bool
UNetReceiver
::
activateObject
()
{
// блокирование обработки Starsp
// пока не пройдёт инициализация датчиков
// см. sysCommand()
{
activated
=
false
;
UniSetTypes
::
uniset_mutex_lock
l
(
mutex_start
,
5000
);
UniSetObject_LT
::
activateObject
();
initIterators
();
activated
=
true
;
}
return
true
;
}
// ------------------------------------------------------------------------------------------
void
UNetReceiver
::
sigterm
(
int
signo
)
{
cerr
<<
myname
<<
": ********* SIGTERM("
<<
signo
<<
") ********"
<<
endl
;
activated
=
false
;
udp
->
disconnect
();
UniSetObject_LT
::
sigterm
(
signo
);
}
// ------------------------------------------------------------------------------------------
void
UNetReceiver
::
initIterators
()
{
shm
->
initAIterator
(
aitHeartBeat
);
}
// -----------------------------------------------------------------------------
void
UNetReceiver
::
help_print
(
int
argc
,
char
*
argv
[]
)
{
cout
<<
"--udp-polltime msec - Пауза между опросаом карт. По умолчанию 200 мсек."
<<
endl
;
cout
<<
"--udp-heartbeat-id - Данный процесс связан с указанным аналоговым heartbeat-дачиком."
<<
endl
;
cout
<<
"--udp-heartbeat-max - Максимальное значение heartbeat-счётчика для данного процесса. По умолчанию 10."
<<
endl
;
cout
<<
"--udp-ready-timeout - Время ожидания готовности SM к работе, мсек. (-1 - ждать 'вечно')"
<<
endl
;
cout
<<
"--udp-initPause - Задержка перед инициализацией (время на активизация процесса)"
<<
endl
;
cout
<<
"--udp-notRespondSensor - датчик связи для данного процесса "
<<
endl
;
cout
<<
"--udp-sm-ready-timeout - время на ожидание старта SM"
<<
endl
;
cout
<<
" Настройки протокола RS: "
<<
endl
;
cout
<<
"--udp-dev devname - файл устройства"
<<
endl
;
cout
<<
"--udp-speed - Скорость обмена (9600,19920,38400,57600,115200)."
<<
endl
;
cout
<<
"--udp-my-addr - адрес текущего узла"
<<
endl
;
cout
<<
"--udp-recv-timeout - Таймаут на ожидание ответа."
<<
endl
;
}
// -----------------------------------------------------------------------------
UNetReceiver
*
UNetReceiver
::
init_udpreceiver
(
int
argc
,
char
*
argv
[],
UniSetTypes
::
ObjectId
icID
,
SharedMemory
*
ic
)
{
string
name
=
conf
->
getArgParam
(
"--udp-name"
,
"UNetReceiver1"
);
if
(
name
.
empty
()
)
{
cerr
<<
"(udpexchange): Не задан name'"
<<
endl
;
return
0
;
}
ObjectId
ID
=
conf
->
getObjectID
(
name
);
if
(
ID
==
UniSetTypes
::
DefaultObjectId
)
{
cerr
<<
"(udpexchange): идентификатор '"
<<
name
<<
"' не найден в конф. файле!"
<<
" в секции "
<<
conf
->
getObjectsSection
()
<<
endl
;
return
0
;
}
dlog
[
Debug
::
INFO
]
<<
"(rsexchange): name = "
<<
name
<<
"("
<<
ID
<<
")"
<<
endl
;
return
new
UNetReceiver
(
ID
,
icID
,
ic
);
}
// -----------------------------------------------------------------------------
extensions/UNet2/UNetReceiver.h
0 → 100644
View file @
71444865
#ifndef UNetReceiver_H_
#define UNetReceiver_H_
// -----------------------------------------------------------------------------
#include <ostream>
#include <string>
#include <vector>
#include <cc++/socket.h>
#include "UniSetObject_LT.h"
#include "Trigger.h"
#include "Mutex.h"
#include "SMInterface.h"
#include "SharedMemory.h"
#include "ThreadCreator.h"
#include "UNetPacket.h"
// -----------------------------------------------------------------------------
class
UNetReceiver
:
public
UniSetObject_LT
{
public
:
UNetReceiver
(
UniSetTypes
::
ObjectId
objId
,
UniSetTypes
::
ObjectId
shmID
,
SharedMemory
*
ic
=
0
);
virtual
~
UNetReceiver
();
/*! глобальная функция для инициализации объекта */
static
UNetReceiver
*
init_udpreceiver
(
int
argc
,
char
*
argv
[],
UniSetTypes
::
ObjectId
shmID
,
SharedMemory
*
ic
=
0
);
/*! глобальная функция для вывода help-а */
static
void
help_print
(
int
argc
,
char
*
argv
[]
);
protected
:
xmlNode
*
cnode
;
std
::
string
s_field
;
std
::
string
s_fvalue
;
SMInterface
*
shm
;
void
poll
();
void
recv
();
void
step
();
virtual
void
processingMessage
(
UniSetTypes
::
VoidMessage
*
msg
);
void
sysCommand
(
UniSetTypes
::
SystemMessage
*
msg
);
void
sensorInfo
(
UniSetTypes
::
SensorMessage
*
sm
);
void
askSensors
(
UniversalIO
::
UIOCommand
cmd
);
void
waitSMReady
();
virtual
bool
activateObject
();
// действия при завершении работы
virtual
void
sigterm
(
int
signo
);
void
initIterators
();
private
:
UNetReceiver
();
bool
initPause
;
UniSetTypes
::
uniset_mutex
mutex_start
;
PassiveTimer
ptHeartBeat
;
UniSetTypes
::
ObjectId
sidHeartBeat
;
int
maxHeartBeat
;
IOController
::
AIOStateList
::
iterator
aitHeartBeat
;
UniSetTypes
::
ObjectId
test_id
;
int
polltime
;
/*!< переодичность обновления данных, [мсек] */
ost
::
UNetDuplex
*
udp
;
ost
::
IPV4Host
host
;
ost
::
tpport_t
port
;
UniSetTypes
::
uniset_mutex
pollMutex
;
Trigger
trTimeout
;
int
recvTimeout
;
bool
activated
;
int
activateTimeout
;
ThreadCreator
<
UNetReceiver
>*
thr
;
};
// -----------------------------------------------------------------------------
#endif // UNetReceiver_H_
// -----------------------------------------------------------------------------
extensions/UNet2/UNetSender.cc
0 → 100644
View file @
71444865
#include <sstream>
#include "Exceptions.h"
#include "Extensions.h"
#include "UNetSender.h"
// -----------------------------------------------------------------------------
using
namespace
std
;
using
namespace
UniSetTypes
;
using
namespace
UniSetExtensions
;
// -----------------------------------------------------------------------------
UNetSender
::
UNetSender
(
UniSetTypes
::
ObjectId
objId
,
UniSetTypes
::
ObjectId
shmId
,
SharedMemory
*
ic
)
:
UniSetObject_LT
(
objId
),
shm
(
0
),
initPause
(
0
),
tcp
(
0
),
activated
(
false
),
dlist
(
100
),
maxItem
(
0
)
{
if
(
objId
==
DefaultObjectId
)
throw
UniSetTypes
::
SystemError
(
"(UNetSender): objId=-1?!! Use --unet-name"
);
// xmlNode* cnode = conf->getNode(myname);
cnode
=
conf
->
getNode
(
myname
);
if
(
cnode
==
NULL
)
throw
UniSetTypes
::
SystemError
(
"(UNetSender): Not find conf-node for "
+
myname
);
shm
=
new
SMInterface
(
shmId
,
&
ui
,
objId
,
ic
);
UniXML_iterator
it
(
cnode
);
// определяем фильтр
s_field
=
conf
->
getArgParam
(
"--unet-filter-field"
);
s_fvalue
=
conf
->
getArgParam
(
"--unet-filter-value"
);
dlog
[
Debug
::
INFO
]
<<
myname
<<
"(init): read fileter-field='"
<<
s_field
<<
"' filter-value='"
<<
s_fvalue
<<
"'"
<<
endl
;
// ---------- init RS ----------
// UniXML_iterator it(cnode);
string
s_host
=
conf
->
getArgParam
(
"--unet-host"
,
it
.
getProp
(
"host"
));
if
(
s_host
.
empty
()
)
throw
UniSetTypes
::
SystemError
(
myname
+
"(UNetSender): Unknown host. Use --unet-host"
);
port
=
conf
->
getArgInt
(
"--unet-port"
,
it
.
getProp
(
"port"
));
if
(
port
<=
0
)
throw
UniSetTypes
::
SystemError
(
myname
+
"(UNetSender): Unknown port address. Use --unet-port"
);
bool
no_broadcast
=
conf
->
getArgInt
(
"--unet-nobroadcast"
,
it
.
getProp
(
"no_broadcast"
));
host
=
s_host
.
c_str
();
if
(
dlog
.
debugging
(
Debug
::
INFO
)
)
dlog
[
Debug
::
INFO
]
<<
"(UNetSender): UNet set to "
<<
s_host
<<
":"
<<
port
<<
" broadcast="
<<
broadcast
<<
endl
;
try
{
if
(
no_broadcast
)
tcp
=
new
ost
::
TCPSocket
();
else
tcp
=
new
ost
::
TCPBroadcast
(
host
,
port
);
}
catch
(
ost
::
SockException
&
e
)
{
ostringstream
s
;
s
<<
e
.
getString
()
<<
": "
<<
e
.
getSystemErrorString
();
dlog
[
Debug
::
CRIT
]
<<
myname
<<
"(init): "
<<
s
.
str
()
<<
endl
;
throw
SystemError
(
s
.
str
());
}
thr
=
new
ThreadCreator
<
UNetSender
>
(
this
,
&
UNetSender
::
poll
);
sendTimeout
=
conf
->
getArgPInt
(
"--unet-send-timeout"
,
it
.
getProp
(
"sendTimeout"
),
5000
);
sendtime
=
conf
->
getArgPInt
(
"--unet-sendtime"
,
it
.
getProp
(
"sendtime"
),
100
);
// -------------------------------
if
(
shm
->
isLocalwork
()
)
{
readConfiguration
();
dlist
.
resize
(
maxItem
);
dlog
[
Debug
::
INFO
]
<<
myname
<<
"(init): dlist size = "
<<
dlist
.
size
()
<<
endl
;
}
else
ic
->
addReadItem
(
sigc
::
mem_fun
(
this
,
&
UNetSender
::
readItem
)
);
// ********** HEARTBEAT *************
string
heart
=
conf
->
getArgParam
(
"--unet-heartbeat-id"
,
it
.
getProp
(
"heartbeat_id"
));
if
(
!
heart
.
empty
()
)
{
sidHeartBeat
=
conf
->
getSensorID
(
heart
);
if
(
sidHeartBeat
==
DefaultObjectId
)
{
ostringstream
err
;
err
<<
myname
<<
": не найден идентификатор для датчика 'HeartBeat' "
<<
heart
;
dlog
[
Debug
::
CRIT
]
<<
myname
<<
"(init): "
<<
err
.
str
()
<<
endl
;
throw
SystemError
(
err
.
str
());
}
int
heartbeatTime
=
getHeartBeatTime
();
if
(
heartbeatTime
)
ptHeartBeat
.
setTiming
(
heartbeatTime
);
else
ptHeartBeat
.
setTiming
(
UniSetTimer
::
WaitUpTime
);
maxHeartBeat
=
conf
->
getArgPInt
(
"--unet-heartbeat-max"
,
it
.
getProp
(
"heartbeat_max"
),
10
);
test_id
=
sidHeartBeat
;
}
else
{
test_id
=
conf
->
getSensorID
(
"TestMode_S"
);
if
(
test_id
==
DefaultObjectId
)
{
ostringstream
err
;
err
<<
myname
<<
"(init): test_id unknown. 'TestMode_S' not found..."
;
dlog
[
Debug
::
CRIT
]
<<
myname
<<
"(init): "
<<
err
.
str
()
<<
endl
;
throw
SystemError
(
err
.
str
());
}
}
dlog
[
Debug
::
INFO
]
<<
myname
<<
"(init): test_id="
<<
test_id
<<
endl
;
activateTimeout
=
conf
->
getArgPInt
(
"--activate-timeout"
,
20000
);
timeout_t
msec
=
conf
->
getArgPInt
(
"--unet-timeout"
,
it
.
getProp
(
"timeout"
),
3000
);
dlog
[
Debug
::
INFO
]
<<
myname
<<
"(init): udp-timeout="
<<
msec
<<
" msec"
<<
endl
;
}
// -----------------------------------------------------------------------------
UNetSender
::~
UNetSender
()
{
delete
tcp
;
delete
shm
;
delete
thr
;
}
// -----------------------------------------------------------------------------
void
UNetSender
::
waitSMReady
()
{
// waiting for SM is ready...
int
ready_timeout
=
conf
->
getArgInt
(
"--unet-sm-ready-timeout"
,
"15000"
);
if
(
ready_timeout
==
0
)
ready_timeout
=
15000
;
else
if
(
ready_timeout
<
0
)
ready_timeout
=
UniSetTimer
::
WaitUpTime
;
if
(
!
shm
->
waitSMready
(
ready_timeout
,
50
)
)
{
ostringstream
err
;
err
<<
myname
<<
"(waitSMReady): Не дождались готовности SharedMemory к работе в течение "
<<
ready_timeout
<<
" мсек"
;
dlog
[
Debug
::
CRIT
]
<<
err
.
str
()
<<
endl
;
throw
SystemError
(
err
.
str
());
}
}
// -----------------------------------------------------------------------------
void
UNetSender
::
step
()
{
if
(
!
activated
)
return
;
if
(
sidHeartBeat
!=
DefaultObjectId
&&
ptHeartBeat
.
checkTime
()
)
{
try
{
shm
->
localSaveValue
(
aitHeartBeat
,
sidHeartBeat
,
maxHeartBeat
,
getId
());
ptHeartBeat
.
reset
();
}
catch
(
Exception
&
ex
)
{
dlog
[
Debug
::
CRIT
]
<<
myname
<<
"(step): (hb) "
<<
ex
<<
std
::
endl
;
}
}
}
// -----------------------------------------------------------------------------
void
UNetSender
::
poll
()
{
dlist
.
resize
(
maxItem
);
dlog
[
Debug
::
INFO
]
<<
myname
<<
"(init): dlist size = "
<<
dlist
.
size
()
<<
endl
;
ost
::
Thread
::
setException
(
ost
::
Thread
::
throwException
);
// cerr << "create new tcp..." << endl;
tcp
=
new
ost
::
TCPStream
(
iaddr
.
c_str
());
tcp
->
setTimeout
(
500
);
try
{
tcp
->
setPeer
(
host
,
port
);
}
catch
(
ost
::
SockException
&
e
)
{
ostringstream
s
;
s
<<
e
.
getString
()
<<
": "
<<
e
.
getSystemErrorString
();
dlog
[
Debug
::
CRIT
]
<<
myname
<<
"(poll): "
<<
s
.
str
()
<<
endl
;
throw
SystemError
(
s
.
str
());
}
while
(
activated
)
{
try
{
send
();
}
catch
(
ost
::
SockException
&
e
)
{
cerr
<<
e
.
getString
()
<<
": "
<<
e
.
getSystemErrorString
()
<<
endl
;
}
catch
(
UniSetTypes
::
Exception
&
ex
)
{
cerr
<<
myname
<<
"(step): "
<<
ex
<<
std
::
endl
;
}
catch
(...)
{
cerr
<<
myname
<<
"(step): catch ..."
<<
std
::
endl
;
}
msleep
(
sendtime
);
}
cerr
<<
"************* execute FINISH **********"
<<
endl
;
}
// -----------------------------------------------------------------------------
void
UNetSender
::
send
()
{
cout
<<
myname
<<
": send..."
<<
endl
;
UniSetUNet
::
UNetHeader
h
;
h
.
nodeID
=
conf
->
getLocalNode
();
h
.
procID
=
getId
();
h
.
dcount
=
mypack
.
size
();
// receive
ssize_t
ret
=
tcp
->
send
((
char
*
)(
&
h
),
sizeof
(
h
));
if
(
ret
<
(
ssize_t
)
sizeof
(
h
)
)
{
cerr
<<
myname
<<
"(send data header): ret="
<<
ret
<<
" sizeof="
<<
sizeof
(
h
)
<<
endl
;
return
;
}
#warning use mutex for list!!!
UniSetUNet
::
UNetMessage
::
UNetDataList
::
iterator
it
=
mypack
.
dlist
.
begin
();
for
(
;
it
!=
mypack
.
dlist
.
end
();
++
it
)
{
cout
<<
myname
<<
"(send): "
<<
(
*
it
)
<<
endl
;
ssize_t
ret
=
tcp
->
send
((
char
*
)(
&
(
*
it
)),
sizeof
(
*
it
));
if
(
ret
<
(
ssize_t
)
sizeof
(
*
it
)
)
{
cerr
<<
myname
<<
"(send data): ret="
<<
ret
<<
" sizeof="
<<
sizeof
(
*
it
)
<<
endl
;
break
;
}
}
}
// -----------------------------------------------------------------------------
void
UNetSender
::
processingMessage
(
UniSetTypes
::
VoidMessage
*
msg
)
{
try
{
switch
(
msg
->
type
)
{
case
UniSetTypes
:
:
Message
::
SysCommand
:
{
UniSetTypes
::
SystemMessage
sm
(
msg
);
sysCommand
(
&
sm
);
}
break
;
case
Message
:
:
SensorInfo
:
{
SensorMessage
sm
(
msg
);
sensorInfo
(
&
sm
);
}
break
;
default
:
break
;
}
}
catch
(
SystemError
&
ex
)
{
dlog
[
Debug
::
CRIT
]
<<
myname
<<
"(SystemError): "
<<
ex
<<
std
::
endl
;
// throw SystemError(ex);
raise
(
SIGTERM
);
}
catch
(
Exception
&
ex
)
{
dlog
[
Debug
::
CRIT
]
<<
myname
<<
"(processingMessage): "
<<
ex
<<
std
::
endl
;
}
catch
(...)
{
dlog
[
Debug
::
CRIT
]
<<
myname
<<
"(processingMessage): catch ...
\n
"
;
}
}
// -----------------------------------------------------------------------------
void
UNetSender
::
sysCommand
(
UniSetTypes
::
SystemMessage
*
sm
)
{
switch
(
sm
->
command
)
{
case
SystemMessage
:
:
StartUp
:
{
waitSMReady
();
// подождать пока пройдёт инициализация датчиков
// см. activateObject()
msleep
(
initPause
);
PassiveTimer
ptAct
(
activateTimeout
);
while
(
!
activated
&&
!
ptAct
.
checkTime
()
)
{
cout
<<
myname
<<
"(sysCommand): wait activate..."
<<
endl
;
msleep
(
300
);
if
(
activated
)
break
;
}
if
(
!
activated
)
dlog
[
Debug
::
CRIT
]
<<
myname
<<
"(sysCommand): ************* don`t activate?! ************"
<<
endl
;
{
UniSetTypes
::
uniset_mutex_lock
l
(
mutex_start
,
10000
);
askSensors
(
UniversalIO
::
UIONotify
);
}
thr
->
start
();
}
case
SystemMessage
:
:
FoldUp
:
case
SystemMessage
:
:
Finish
:
askSensors
(
UniversalIO
::
UIODontNotify
);
break
;
case
SystemMessage
:
:
WatchDog
:
{
// ОПТИМИЗАЦИЯ (защита от двойного перезаказа при старте)
// Если идёт локальная работа
// (т.е. UNetSender запущен в одном процессе с SharedMemory2)
// то обрабатывать WatchDog не надо, т.к. мы и так ждём готовности SM
// при заказе датчиков, а если SM вылетит, то вместе с этим процессом(UNetSender)
if
(
shm
->
isLocalwork
()
)
break
;
askSensors
(
UniversalIO
::
UIONotify
);
}
break
;
case
SystemMessage
:
:
LogRotate
:
{
// переоткрываем логи
unideb
<<
myname
<<
"(sysCommand): logRotate"
<<
std
::
endl
;
string
fname
=
unideb
.
getLogFile
();
if
(
!
fname
.
empty
()
)
{
unideb
.
logFile
(
fname
);
unideb
<<
myname
<<
"(sysCommand): ***************** UNIDEB LOG ROTATE *****************"
<<
std
::
endl
;
}
dlog
<<
myname
<<
"(sysCommand): logRotate"
<<
std
::
endl
;
fname
=
dlog
.
getLogFile
();
if
(
!
fname
.
empty
()
)
{
dlog
.
logFile
(
fname
);
dlog
<<
myname
<<
"(sysCommand): ***************** dlog LOG ROTATE *****************"
<<
std
::
endl
;
}
}
break
;
default
:
break
;
}
}
// ------------------------------------------------------------------------------------------
void
UNetSender
::
askSensors
(
UniversalIO
::
UIOCommand
cmd
)
{
if
(
!
shm
->
waitSMworking
(
test_id
,
activateTimeout
,
50
)
)
{
ostringstream
err
;
err
<<
myname
<<
"(askSensors): Не дождались готовности(work) SharedMemory к работе в течение "
<<
activateTimeout
<<
" мсек"
;
dlog
[
Debug
::
CRIT
]
<<
err
.
str
()
<<
endl
;
kill
(
SIGTERM
,
getpid
());
// прерываем (перезапускаем) процесс...
throw
SystemError
(
err
.
str
());
}
DMap
::
iterator
it
=
dlist
.
begin
();
for
(
;
it
!=
dlist
.
end
();
++
it
)
{
try
{
shm
->
askSensor
(
it
->
si
.
id
,
cmd
);
}
catch
(
UniSetTypes
::
Exception
&
ex
)
{
dlog
[
Debug
::
WARN
]
<<
myname
<<
"(askSensors): "
<<
ex
<<
std
::
endl
;
}
catch
(...){}
}
}
// ------------------------------------------------------------------------------------------
void
UNetSender
::
sensorInfo
(
UniSetTypes
::
SensorMessage
*
sm
)
{
DMap
::
iterator
it
=
dlist
.
begin
();
for
(
;
it
!=
dlist
.
end
();
++
it
)
{
if
(
it
->
si
.
id
==
sm
->
id
)
{
uniset_spin_lock
lock
(
it
->
val_lock
);
it
->
val
=
sm
->
value
;
if
(
it
->
pack_it
!=
mypack
.
dlist
.
end
()
)
it
->
pack_it
->
val
=
sm
->
value
;
}
break
;
}
}
// ------------------------------------------------------------------------------------------
bool
UNetSender
::
activateObject
()
{
// блокирование обработки StarUp
// пока не пройдёт инициализация датчиков
// см. sysCommand()
{
activated
=
false
;
UniSetTypes
::
uniset_mutex_lock
l
(
mutex_start
,
5000
);
UniSetObject_LT
::
activateObject
();
initIterators
();
activated
=
true
;
}
return
true
;
}
// ------------------------------------------------------------------------------------------
void
UNetSender
::
sigterm
(
int
signo
)
{
cerr
<<
myname
<<
": ********* SIGTERM("
<<
signo
<<
") ********"
<<
endl
;
activated
=
false
;
tcp
->
disconnect
();
UniSetObject_LT
::
sigterm
(
signo
);
}
// ------------------------------------------------------------------------------------------
void
UNetSender
::
readConfiguration
()
{
#warning Сделать сортировку по диапазонам адресов!!!
// чтобы запрашивать одним запросом, сразу несколько входов...
// readconf_ok = false;
xmlNode
*
root
=
conf
->
getXMLSensorsSection
();
if
(
!
root
)
{
ostringstream
err
;
err
<<
myname
<<
"(readConfiguration): не нашли корневого раздела <sensors>"
;
throw
SystemError
(
err
.
str
());
}
UniXML_iterator
it
(
root
);
if
(
!
it
.
goChildren
()
)
{
std
::
cerr
<<
myname
<<
"(readConfiguration): раздел <sensors> не содержит секций ?!!
\n
"
;
return
;
}
for
(
;
it
.
getCurrent
();
it
.
goNext
()
)
{
if
(
check_item
(
it
)
)
initItem
(
it
);
}
// readconf_ok = true;
}
// ------------------------------------------------------------------------------------------
bool
UNetSender
::
check_item
(
UniXML_iterator
&
it
)
{
if
(
s_field
.
empty
()
)
return
true
;
// просто проверка на не пустой field
if
(
s_fvalue
.
empty
()
&&
it
.
getProp
(
s_field
).
empty
()
)
return
false
;
// просто проверка что field = value
if
(
!
s_fvalue
.
empty
()
&&
it
.
getProp
(
s_field
)
!=
s_fvalue
)
return
false
;
return
true
;
}
// ------------------------------------------------------------------------------------------
bool
UNetSender
::
readItem
(
UniXML
&
xml
,
UniXML_iterator
&
it
,
xmlNode
*
sec
)
{
if
(
check_item
(
it
)
)
initItem
(
it
);
return
true
;
}
// ------------------------------------------------------------------------------------------
bool
UNetSender
::
initItem
(
UniXML_iterator
&
it
)
{
string
sname
(
it
.
getProp
(
"name"
)
);
string
tid
=
it
.
getProp
(
"id"
);
ObjectId
sid
;
if
(
!
tid
.
empty
()
)
{
sid
=
UniSetTypes
::
uni_atoi
(
tid
);
if
(
sid
<=
0
)
sid
=
DefaultObjectId
;
}
else
sid
=
conf
->
getSensorID
(
sname
);
if
(
sid
==
DefaultObjectId
)
{
if
(
dlog
)
dlog
[
Debug
::
CRIT
]
<<
myname
<<
"(readItem): ID not found for "
<<
sname
<<
endl
;
return
false
;
}
UItem
p
;
p
.
si
.
id
=
sid
;
p
.
si
.
node
=
conf
->
getLocalNode
();
mypack
.
addData
(
sid
,
0
);
p
.
pack_it
=
(
mypack
.
dlist
.
end
()
--
);
if
(
maxItem
>=
dlist
.
size
()
)
dlist
.
resize
(
maxItem
+
10
);
dlist
[
maxItem
]
=
p
;
maxItem
++
;
if
(
dlog
.
debugging
(
Debug
::
INFO
)
)
dlog
[
Debug
::
INFO
]
<<
myname
<<
"(initItem): add "
<<
p
<<
endl
;
return
true
;
}
// ------------------------------------------------------------------------------------------
void
UNetSender
::
initIterators
()
{
DMap
::
iterator
it
=
dlist
.
begin
();
for
(
;
it
!=
dlist
.
end
();
it
++
)
{
shm
->
initDIterator
(
it
->
dit
);
shm
->
initAIterator
(
it
->
ait
);
}
shm
->
initAIterator
(
aitHeartBeat
);
}
// -----------------------------------------------------------------------------
void
UNetSender
::
help_print
(
int
argc
,
char
*
argv
[]
)
{
cout
<<
"--unet-sendtime msec - Пауза между опросами. По умолчанию 200 мсек."
<<
endl
;
cout
<<
"--unet-heartbeat-id - Данный процесс связан с указанным аналоговым heartbeat-дачиком."
<<
endl
;
cout
<<
"--unet-heartbeat-max - Максимальное значение heartbeat-счётчика для данного процесса. По умолчанию 10."
<<
endl
;
cout
<<
"--unet-ready-timeout - Время ожидания готовности SM к работе, мсек. (-1 - ждать 'вечно')"
<<
endl
;
cout
<<
"--unet-initPause - Задержка перед инициализацией (время на активизация процесса)"
<<
endl
;
cout
<<
"--unet-sm-ready-timeout - время на ожидание старта SM"
<<
endl
;
cout
<<
" Настройки протокола UNet: "
<<
endl
;
cout
<<
"--unet-host [ip|hostname] - Адрес сервера"
<<
endl
;
cout
<<
"--unet-port - Порт."
<<
endl
;
cout
<<
"--unet-send-timeout - Таймаут на посылку ответа."
<<
endl
;
}
// -----------------------------------------------------------------------------
UNetSender
*
UNetSender
::
init_udpsender
(
int
argc
,
char
*
argv
[],
UniSetTypes
::
ObjectId
icID
,
SharedMemory
*
ic
)
{
string
name
=
conf
->
getArgParam
(
"--unet-name"
,
"UNetSender1"
);
if
(
name
.
empty
()
)
{
cerr
<<
"(UNetSender): Не задан name'"
<<
endl
;
return
0
;
}
ObjectId
ID
=
conf
->
getObjectID
(
name
);
if
(
ID
==
UniSetTypes
::
DefaultObjectId
)
{
cerr
<<
"(UNetSender): идентификатор '"
<<
name
<<
"' не найден в конф. файле!"
<<
" в секции "
<<
conf
->
getObjectsSection
()
<<
endl
;
return
0
;
}
dlog
[
Debug
::
INFO
]
<<
"(rsexchange): name = "
<<
name
<<
"("
<<
ID
<<
")"
<<
endl
;
return
new
UNetSender
(
ID
,
icID
,
ic
);
}
// -----------------------------------------------------------------------------
std
::
ostream
&
operator
<<
(
std
::
ostream
&
os
,
UNetSender
::
UItem
&
p
)
{
return
os
<<
" sid="
<<
p
.
si
.
id
;
}
// -----------------------------------------------------------------------------
extensions/UNet2/UNetSender.h
0 → 100644
View file @
71444865
#ifndef UNetSender_H_
#define UNetSender_H_
// -----------------------------------------------------------------------------
#include <ostream>
#include <string>
#include <vector>
#include <cc++/socket.h>
#include "UniSetObject_LT.h"
#include "Trigger.h"
#include "Mutex.h"
#include "SMInterface.h"
#include "SharedMemory.h"
#include "ThreadCreator.h"
#include "UNetPacket.h"
// -----------------------------------------------------------------------------
class
UNetSender
:
public
UniSetObject_LT
{
public
:
UNetSender
(
UniSetTypes
::
ObjectId
objId
,
UniSetTypes
::
ObjectId
shmID
,
SharedMemory
*
ic
=
0
);
virtual
~
UNetSender
();
/*! глобальная функция для инициализации объекта */
static
UNetSender
*
init_udpsender
(
int
argc
,
char
*
argv
[],
UniSetTypes
::
ObjectId
shmID
,
SharedMemory
*
ic
=
0
);
/*! глобальная функция для вывода help-а */
static
void
help_print
(
int
argc
,
char
*
argv
[]
);
struct
UItem
{
UItem
()
:
val
(
0
)
{}
IOController_i
::
SensorInfo
si
;
IOController
::
AIOStateList
::
iterator
ait
;
IOController
::
DIOStateList
::
iterator
dit
;
UniSetTypes
::
uniset_spin_mutex
val_lock
;
UniSetUNet
::
UNetMessage
::
UNetDataList
::
iterator
pack_it
;
long
val
;
friend
std
::
ostream
&
operator
<<
(
std
::
ostream
&
os
,
UItem
&
p
);
};
protected
:
xmlNode
*
cnode
;
std
::
string
s_field
;
std
::
string
s_fvalue
;
SMInterface
*
shm
;
void
poll
();
void
recv
();
void
send
();
void
step
();
virtual
void
processingMessage
(
UniSetTypes
::
VoidMessage
*
msg
);
void
sysCommand
(
UniSetTypes
::
SystemMessage
*
msg
);
void
sensorInfo
(
UniSetTypes
::
SensorMessage
*
sm
);
void
askSensors
(
UniversalIO
::
UIOCommand
cmd
);
void
waitSMReady
();
virtual
bool
activateObject
();
// действия при завершении работы
virtual
void
sigterm
(
int
signo
);
void
initIterators
();
bool
initItem
(
UniXML_iterator
&
it
);
bool
readItem
(
UniXML
&
xml
,
UniXML_iterator
&
it
,
xmlNode
*
sec
);
void
readConfiguration
();
bool
check_item
(
UniXML_iterator
&
it
);
private
:
UNetSender
();
bool
initPause
;
UniSetTypes
::
uniset_mutex
mutex_start
;
PassiveTimer
ptHeartBeat
;
UniSetTypes
::
ObjectId
sidHeartBeat
;
int
maxHeartBeat
;
IOController
::
AIOStateList
::
iterator
aitHeartBeat
;
UniSetTypes
::
ObjectId
test_id
;
int
sendtime
;
/*!< переодичность посылки данных, [мсек] */
ost
::
TCPStream
*
tcp
;
// ost::TCPSocket* tcp;
ost
::
IPV4Host
host
;
ost
::
tpport_t
port
;
UniSetTypes
::
uniset_mutex
sendMutex
;
Trigger
trTimeout
;
int
sendTimeout
;
bool
activated
;
int
activateTimeout
;
UniSetUNet
::
UNetMessage
mypack
;
typedef
std
::
vector
<
UItem
>
DMap
;
DMap
dlist
;
int
maxItem
;
ThreadCreator
<
UNetSender
>*
thr
;
};
// -----------------------------------------------------------------------------
#endif // UNetSender_H_
// -----------------------------------------------------------------------------
extensions/UNet2/create_links.sh
0 → 100755
View file @
71444865
#!/bin/sh
ln
-s
-f
../../Utilities/scripts/uniset-start.sh
ln
-s
-f
../../Utilities/scripts/uniset-stop.sh stop.sh
ln
-s
-f
../../Utilities/scripts/uniset-functions.sh
ln
-s
-f
../../conf/test.xml test.xml
extensions/UNet2/start_fg.sh
0 → 100755
View file @
71444865
#!/bin/sh
uniset-start.sh
-f
./uniset-udpexchange
--udp-name
UDPExchange
--udp-host
192.168.56.255
\
--udp-broadcast
1
--udp-polltime
1000
\
--confile
test.xml
\
--dlog-add-levels
info,crit,warn
# --udp-filter-field udp --udp-filter-value 1 \
extensions/UNet2/unetexchange.cc
0 → 100644
View file @
71444865
#include <sstream>
#include "ObjectsActivator.h"
#include "Extensions.h"
#include "UDPExchange.h"
// -----------------------------------------------------------------------------
using
namespace
std
;
using
namespace
UniSetTypes
;
using
namespace
UniSetExtensions
;
// -----------------------------------------------------------------------------
int
main
(
int
argc
,
char
**
argv
)
{
try
{
if
(
argc
>
1
&&
(
!
strcmp
(
argv
[
1
],
"--help"
)
||
!
strcmp
(
argv
[
1
],
"-h"
))
)
{
cout
<<
"--smemory-id objectName - SharedMemory objectID. Default: read from <SharedMemory>"
<<
endl
;
cout
<<
"--confile filename - configuration file. Default: configure.xml"
<<
endl
;
cout
<<
"--udp-logfile filename - logfilename. Default: udpexchange.log"
<<
endl
;
cout
<<
endl
;
UDPExchange
::
help_print
(
argc
,
argv
);
return
0
;
}
string
confile
=
UniSetTypes
::
getArgParam
(
"--confile"
,
argc
,
argv
,
"configure.xml"
);
conf
=
new
Configuration
(
argc
,
argv
,
confile
);
string
logfilename
(
conf
->
getArgParam
(
"--udp-logfile"
));
if
(
logfilename
.
empty
()
)
logfilename
=
"udpexchange.log"
;
conf
->
initDebug
(
dlog
,
"dlog"
);
std
::
ostringstream
logname
;
string
dir
(
conf
->
getLogDir
());
logname
<<
dir
<<
logfilename
;
unideb
.
logFile
(
logname
.
str
()
);
dlog
.
logFile
(
logname
.
str
()
);
ObjectId
shmID
=
DefaultObjectId
;
string
sID
=
conf
->
getArgParam
(
"--smemory-id"
);
if
(
!
sID
.
empty
()
)
shmID
=
conf
->
getControllerID
(
sID
);
else
shmID
=
getSharedMemoryID
();
if
(
shmID
==
DefaultObjectId
)
{
cerr
<<
sID
<<
"? SharedMemoryID not found in "
<<
conf
->
getControllersSection
()
<<
" section"
<<
endl
;
return
1
;
}
UDPExchange
*
rs
=
UDPExchange
::
init_udpexchange
(
argc
,
argv
,
shmID
);
if
(
!
rs
)
{
dlog
[
Debug
::
CRIT
]
<<
"(udpexchange): init не прошёл..."
<<
endl
;
return
1
;
}
ObjectsActivator
act
;
act
.
addObject
(
static_cast
<
class
UniSetObject
*>
(
rs
));
SystemMessage
sm
(
SystemMessage
::
StartUp
);
act
.
broadcast
(
sm
.
transport_msg
()
);
unideb
(
Debug
::
ANY
)
<<
"
\n\n\n
"
;
unideb
[
Debug
::
ANY
]
<<
"(main): -------------- UDP Exchange START -------------------------
\n\n
"
;
dlog
(
Debug
::
ANY
)
<<
"
\n\n\n
"
;
dlog
[
Debug
::
ANY
]
<<
"(main): -------------- UDP Exchange START -------------------------
\n\n
"
;
act
.
run
(
false
);
// msleep(500);
// rs->execute();
}
catch
(
Exception
&
ex
)
{
dlog
[
Debug
::
CRIT
]
<<
"(udpexchange): "
<<
ex
<<
std
::
endl
;
}
catch
(...)
{
dlog
[
Debug
::
CRIT
]
<<
"(udpexchange): catch ..."
<<
std
::
endl
;
}
return
0
;
}
extensions/UNet2/unetreceiver.cc
0 → 100644
View file @
71444865
#include <sstream>
#include "ObjectsActivator.h"
#include "Extensions.h"
#include "UDPReceiver.h"
// -----------------------------------------------------------------------------
using
namespace
std
;
using
namespace
UniSetTypes
;
using
namespace
UniSetExtensions
;
// -----------------------------------------------------------------------------
int
main
(
int
argc
,
char
**
argv
)
{
try
{
if
(
argc
>
1
&&
(
!
strcmp
(
argv
[
1
],
"--help"
)
||
!
strcmp
(
argv
[
1
],
"-h"
))
)
{
cout
<<
"--smemory-id objectName - SharedMemory objectID. Default: read from <SharedMemory>"
<<
endl
;
cout
<<
"--confile filename - configuration file. Default: configure.xml"
<<
endl
;
cout
<<
"--udp-logfile filename - logfilename. Default: udpexchange.log"
<<
endl
;
cout
<<
endl
;
UDPReceiver
::
help_print
(
argc
,
argv
);
return
0
;
}
string
confile
=
UniSetTypes
::
getArgParam
(
"--confile"
,
argc
,
argv
,
"configure.xml"
);
conf
=
new
Configuration
(
argc
,
argv
,
confile
);
string
logfilename
(
conf
->
getArgParam
(
"--udp-logfile"
));
if
(
logfilename
.
empty
()
)
logfilename
=
"udpexchange.log"
;
conf
->
initDebug
(
dlog
,
"dlog"
);
std
::
ostringstream
logname
;
string
dir
(
conf
->
getLogDir
());
logname
<<
dir
<<
logfilename
;
unideb
.
logFile
(
logname
.
str
()
);
dlog
.
logFile
(
logname
.
str
()
);
ObjectId
shmID
=
DefaultObjectId
;
string
sID
=
conf
->
getArgParam
(
"--smemory-id"
);
if
(
!
sID
.
empty
()
)
shmID
=
conf
->
getControllerID
(
sID
);
else
shmID
=
getSharedMemoryID
();
if
(
shmID
==
DefaultObjectId
)
{
cerr
<<
sID
<<
"? SharedMemoryID not found in "
<<
conf
->
getControllersSection
()
<<
" section"
<<
endl
;
return
1
;
}
UDPReceiver
*
udp
=
UDPReceiver
::
init_udpreceiver
(
argc
,
argv
,
shmID
);
if
(
!
udp
)
{
dlog
[
Debug
::
CRIT
]
<<
"(udpreceiver): init не прошёл..."
<<
endl
;
return
1
;
}
ObjectsActivator
act
;
act
.
addObject
(
static_cast
<
class
UniSetObject
*>
(
udp
));
SystemMessage
sm
(
SystemMessage
::
StartUp
);
act
.
broadcast
(
sm
.
transport_msg
()
);
unideb
(
Debug
::
ANY
)
<<
"
\n\n\n
"
;
unideb
[
Debug
::
ANY
]
<<
"(main): -------------- UDPRecevier START -------------------------
\n\n
"
;
dlog
(
Debug
::
ANY
)
<<
"
\n\n\n
"
;
dlog
[
Debug
::
ANY
]
<<
"(main): -------------- UDPReceiver START -------------------------
\n\n
"
;
act
.
run
(
false
);
// msleep(500);
// rs->execute();
}
catch
(
Exception
&
ex
)
{
dlog
[
Debug
::
CRIT
]
<<
"(udpexchange): "
<<
ex
<<
std
::
endl
;
}
catch
(
ost
::
SockException
&
e
)
{
ostringstream
s
;
s
<<
e
.
getString
()
<<
": "
<<
e
.
getSystemErrorString
();
dlog
[
Debug
::
CRIT
]
<<
s
.
str
()
<<
endl
;
}
catch
(...)
{
dlog
[
Debug
::
CRIT
]
<<
"(udpexchange): catch ..."
<<
std
::
endl
;
}
return
0
;
}
extensions/UNet2/unetsender.cc
0 → 100644
View file @
71444865
#include <sstream>
#include "ObjectsActivator.h"
#include "Extensions.h"
#include "UDPSender.h"
// -----------------------------------------------------------------------------
using
namespace
std
;
using
namespace
UniSetTypes
;
using
namespace
UniSetExtensions
;
// -----------------------------------------------------------------------------
int
main
(
int
argc
,
char
**
argv
)
{
try
{
if
(
argc
>
1
&&
(
!
strcmp
(
argv
[
1
],
"--help"
)
||
!
strcmp
(
argv
[
1
],
"-h"
))
)
{
cout
<<
"--smemory-id objectName - SharedMemory objectID. Default: read from <SharedMemory>"
<<
endl
;
cout
<<
"--confile filename - configuration file. Default: configure.xml"
<<
endl
;
cout
<<
"--udp-logfile filename - logfilename. Default: udpexchange.log"
<<
endl
;
cout
<<
endl
;
UDPSender
::
help_print
(
argc
,
argv
);
return
0
;
}
string
confile
=
UniSetTypes
::
getArgParam
(
"--confile"
,
argc
,
argv
,
"configure.xml"
);
conf
=
new
Configuration
(
argc
,
argv
,
confile
);
string
logfilename
(
conf
->
getArgParam
(
"--udp-logfile"
));
if
(
logfilename
.
empty
()
)
logfilename
=
"udpexchange.log"
;
conf
->
initDebug
(
dlog
,
"dlog"
);
std
::
ostringstream
logname
;
string
dir
(
conf
->
getLogDir
());
logname
<<
dir
<<
logfilename
;
unideb
.
logFile
(
logname
.
str
()
);
dlog
.
logFile
(
logname
.
str
()
);
ObjectId
shmID
=
DefaultObjectId
;
string
sID
=
conf
->
getArgParam
(
"--smemory-id"
);
if
(
!
sID
.
empty
()
)
shmID
=
conf
->
getControllerID
(
sID
);
else
shmID
=
getSharedMemoryID
();
if
(
shmID
==
DefaultObjectId
)
{
cerr
<<
sID
<<
"? SharedMemoryID not found in "
<<
conf
->
getControllersSection
()
<<
" section"
<<
endl
;
return
1
;
}
UDPSender
*
udp
=
UDPSender
::
init_udpsender
(
argc
,
argv
,
shmID
);
if
(
!
udp
)
{
dlog
[
Debug
::
CRIT
]
<<
"(udpsender): init не прошёл..."
<<
endl
;
return
1
;
}
ObjectsActivator
act
;
act
.
addObject
(
static_cast
<
class
UniSetObject
*>
(
udp
));
SystemMessage
sm
(
SystemMessage
::
StartUp
);
act
.
broadcast
(
sm
.
transport_msg
()
);
unideb
(
Debug
::
ANY
)
<<
"
\n\n\n
"
;
unideb
[
Debug
::
ANY
]
<<
"(main): -------------- UDPSender START -------------------------
\n\n
"
;
dlog
(
Debug
::
ANY
)
<<
"
\n\n\n
"
;
dlog
[
Debug
::
ANY
]
<<
"(main): -------------- UDPSender START -------------------------
\n\n
"
;
act
.
run
(
false
);
// msleep(500);
// rs->execute();
}
catch
(
Exception
&
ex
)
{
dlog
[
Debug
::
CRIT
]
<<
"(udpsender): "
<<
ex
<<
std
::
endl
;
}
catch
(
ost
::
SockException
&
e
)
{
dlog
[
Debug
::
CRIT
]
<<
"(udpsender): "
<<
e
.
getSystemErrorString
()
<<
endl
;
}
catch
(...)
{
dlog
[
Debug
::
CRIT
]
<<
"(udpsender): catch ..."
<<
std
::
endl
;
}
return
0
;
}
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment