Commit 6de90468 authored by Pavel Vainerman's avatar Pavel Vainerman

(UNetReceiver): первая версия реализации на libev

parent 69067405
......@@ -189,7 +189,7 @@
priority - приоритет сообщения об изменении данного датчика
textname - текстовое имя датчика
-->
<nodes port="2809" unet_broadcast_ip="192.168.1.255" unet_broadcast_ip2="192.168.122.255">
<nodes port="2809" unet_broadcast_ip="127.255.255.255" unet_broadcast_ip2="192.168.56.255">
<item id="3000" dbserver="DBServer1" infserver="InfoServer" ip="127.0.0.1" name="LocalhostNode" textname="Локальный узел" unet_ignore="0" unet_port="2048">
<iocards>
<item card="1" name="DI32"/>
......@@ -198,8 +198,8 @@
</iocards>
</item>
<item id="3001" infserver="InfoServer" ip="127.0.0.1" name="LocalhostNode1" textname="Локальный узел" unet_ignore="0" unet_port="2049"/>
<item id="3002" ip="192.168.56.10" name="Node1" textname="Node1" unet_ignore="0" unet_ip="192.168.56.255" unet_port="3001" unet_respond_id="Input1_S" unet_respond_invert="1"/>
<item id="3003" ip="192.168.56.11" name="Node2" textname="Node2" unet_ignore="0" unet_ip="192.168.56.255" unet_port="3002"/>
<item id="3002" ip="192.168.56.10" name="Node1" textname="Node1" unet_ignore="0" unet_port="3001" unet_respond_id="Input1_S" unet_respond_invert="1"/>
<item id="3003" ip="192.168.56.11" name="Node2" textname="Node2" unet_ignore="0" unet_port="3002"/>
</nodes>
<!-- ************************ Датчики ********************** -->
<sensors name="Sensors">
......
......@@ -276,7 +276,6 @@ void SharedMemory::sysCommand( const SystemMessage* sm )
// ------------------------------------------------------------------------------------------
bool SharedMemory::deactivateObject()
{
cerr << myname << "************* deactivateObject()..." << endl;
workready = false;
// if( logserv && logserv->isRunning() )
// logserv->terminate();
......
......@@ -124,21 +124,10 @@ UNetExchange::UNetExchange(UniSetTypes::ObjectId objId, UniSetTypes::ObjectId sh
continue;
// Если указано поле unet_broadcast_ip непосредственно у узла - берём его
// если указано общий broadcast ip для всех узлов - берём его
string h("");
string h2("");
// если не указано берём общий broadcast_ip
if( !default_ip.empty() )
h = default_ip;
if( !n_it.getProp("unet_broadcast_ip").empty() )
h = n_it.getProp("unet_broadcast_ip");
if( !default_ip2.empty() )
h2 = default_ip2;
if( !n_it.getProp("unet_broadcast_ip2").empty() )
h2 = n_it.getProp("unet_broadcast_ip2");
string h = { n_it.getProp2("unet_broadcast_ip",default_ip) };
string h2 = { n_it.getProp2("unet_broadcast_ip2",default_ip2) };
if( h.empty() )
{
......@@ -153,15 +142,10 @@ UNetExchange::UNetExchange(UniSetTypes::ObjectId objId, UniSetTypes::ObjectId sh
// Если указано поле unet_port - используем его
// Иначе port = идентификатору узла
int p = n_it.getIntProp("id");
if( !n_it.getProp("unet_port").empty() )
p = n_it.getIntProp("unet_port");
int p2 = p; // по умолчанию порт на втором канале такой же как на первом
int p = n_it.getPIntProp("unet_port",n_it.getIntProp("id"));
if( !n_it.getProp("unet_port2").empty() )
p2 = n_it.getIntProp("unet_port2");
// по умолчанию порт на втором канале такой же как на первом (если не задан отдельно)
int p2 = n_it.getPIntProp("unet_port2",p);
string n(n_it.getProp("name"));
......@@ -323,8 +307,8 @@ UNetExchange::UNetExchange(UniSetTypes::ObjectId objId, UniSetTypes::ObjectId sh
}
}
unetinfo << myname << "(init): (node='" << n << "') add receiver "
<< h2 << ":" << p2 << endl;
unetinfo << myname << "(init): (node='" << n << "') add basic receiver "
<< h << ":" << p << endl;
auto r = make_shared<UNetReceiver>(h, p, shm);
loga->add(r->getLog());
......
......@@ -73,11 +73,9 @@ UNetReceiver::UNetReceiver( const std::string& s_host, const ost::tpport_t port,
try
{
// ost::IPV4Cidr ci(s_host.c_str());
// addr = ci.getBroadcast();
// cerr << "****************** addr: " << addr << endl;
addr = s_host.c_str();
udp = make_shared<ost::UDPDuplex>(addr, port);
udp = make_shared<UDPDuplexU>(addr, port);
udp->setReceiveCompletion(false); // делаем неблокирующее чтение (нужно для libev)
}
catch( const std::exception& e )
{
......@@ -94,8 +92,10 @@ UNetReceiver::UNetReceiver( const std::string& s_host, const ost::tpport_t port,
throw SystemError(s.str());
}
r_thr = make_shared< ThreadCreator<UNetReceiver> >(this, &UNetReceiver::receive);
u_thr = make_shared< ThreadCreator<UNetReceiver> >(this, &UNetReceiver::update);
//r_thr = make_shared< ThreadCreator<UNetReceiver> >(this, &UNetReceiver::receive);
//u_thr = make_shared< ThreadCreator<UNetReceiver> >(this, &UNetReceiver::update);
evReceive.set<UNetReceiver, &UNetReceiver::callback>(this);
evUpdate.set<UNetReceiver, &UNetReceiver::updateEvent>(this);
ptRecvTimeout.setTiming(recvTimeout);
ptPrepare.setTiming(prepareTime);
......@@ -132,6 +132,9 @@ void UNetReceiver::setReceivePause( timeout_t msec )
void UNetReceiver::setUpdatePause( timeout_t msec )
{
updatepause = msec;
updateTime = (double)updatepause/1000.0;
if( evUpdate.is_active() )
evUpdate.start(updateTime);
}
// -----------------------------------------------------------------------------
void UNetReceiver::setMaxProcessingCount( int set )
......@@ -178,69 +181,25 @@ void UNetReceiver::start()
if( !activated )
{
activated = true;
u_thr->start();
r_thr->start();
//u_thr->start();
//r_thr->start();
evReceive.start(udp->getReceiveSocket(),ev::READ);
evUpdate.start( updateTime );
evloop = DefaultEventLoop::inst();
evloop->run( this, true );
}
else
forceUpdate();
}
// -----------------------------------------------------------------------------
void UNetReceiver::update()
{
unetinfo << myname << "(update): start.." << endl;
while(activated)
{
try
{
real_update();
}
catch( UniSetTypes::Exception& ex)
{
unetcrit << myname << "(update): " << ex << std::endl;
}
catch(...)
{
unetcrit << myname << "(update): catch ..." << std::endl;
}
if( sidRespond != DefaultObjectId )
{
try
{
bool r = respondInvert ? !isRecvOK() : isRecvOK();
shm->localSetValue(itRespond, sidRespond, ( r ? 1 : 0 ), shm->ID());
}
catch( const Exception& ex )
{
unetcrit << myname << "(step): (respond) " << ex << std::endl;
}
}
if( sidLostPackets != DefaultObjectId )
{
try
{
shm->localSetValue(itLostPackets, sidLostPackets, getLostPacketsNum(), shm->ID());
}
catch( const Exception& ex )
{
unetcrit << myname << "(step): (lostPackets) " << ex << std::endl;
}
}
msleep(updatepause);
}
}
// -----------------------------------------------------------------------------
void UNetReceiver::forceUpdate()
{
uniset_rwmutex_wrlock l(packMutex);
pnum = 0; // сбрасываем запомненый номер последнего обработанного пакета
// и тем самым заставляем обновить данные в SM (см. real_update)
// и тем самым заставляем обновить данные в SM (см. update)
}
// -----------------------------------------------------------------------------
void UNetReceiver::real_update()
void UNetReceiver::update()
{
UniSetUDP::UDPMessage p;
// обрабатываем, пока очередь либо не опустеет,
......@@ -403,72 +362,121 @@ void UNetReceiver::real_update()
}
}
}
// -----------------------------------------------------------------------------
void UNetReceiver::stop()
void UNetReceiver::callback( ev::io& watcher, int revents )
{
activated = false;
if( EV_ERROR & revents )
{
unetcrit << myname << "(callback): EVENT ERROR.." << endl;
return;
}
if( revents & EV_READ )
readEvent(watcher);
}
// -----------------------------------------------------------------------------
void UNetReceiver::receive()
void UNetReceiver::readEvent( ev::io& watcher )
{
unetinfo << myname << ": ******************* receive start" << endl;
if( !activated )
return;
cerr << "******** readEvent..." << endl;
bool tout = false;
try
{
uniset_rwmutex_wrlock l(tmMutex);
ptRecvTimeout.setTiming(recvTimeout);
if( receive() )
{
uniset_rwmutex_wrlock l(tmMutex);
ptRecvTimeout.reset();
}
}
catch( UniSetTypes::Exception& ex)
{
unetwarn << myname << "(receive): " << ex << std::endl;
}
catch( const std::exception& e )
{
unetwarn << myname << "(receive): " << e.what() << std::endl;
}
bool tout = false;
// делаем через промежуточную переменную
// чтобы поскорее освободить mutex
{
uniset_rwmutex_rlock l(tmMutex);
tout = ptRecvTimeout.checkTime();
}
// только если "режим подготовки закончился, то можем генерировать "события"
if( ptPrepare.checkTime() && trTimeout.change(tout) )
{
if( tout )
slEvent(shared_from_this(), evTimeout);
else
slEvent(shared_from_this(), evOK);
}
}
// -----------------------------------------------------------------------------
void UNetReceiver::updateEvent(ev::periodic& tm, int revents )
{
if( EV_ERROR & revents )
{
unetcrit << myname << "(callback): EVENT ERROR.." << endl;
return;
}
if( !activated )
return;
while( activated )
// взводим
tm.start(updateTime);
// собственно обработка события
try
{
update();
}
catch( std::exception& ex )
{
unetcrit << myname << "(update): " << ex.what() << std::endl;
}
if( sidRespond != DefaultObjectId )
{
try
{
if( recv() )
{
uniset_rwmutex_wrlock l(tmMutex);
ptRecvTimeout.reset();
}
}
catch( UniSetTypes::Exception& ex)
{
unetwarn << myname << "(receive): " << ex << std::endl;
bool r = respondInvert ? !isRecvOK() : isRecvOK();
shm->localSetValue(itRespond, sidRespond, ( r ? 1 : 0 ), shm->ID());
}
catch( const std::exception& e )
catch( const std::exception& ex )
{
unetwarn << myname << "(receive): " << e.what() << std::endl;
unetcrit << myname << "(step): (respond) " << ex.what() << std::endl;
}
}
/*
catch(...)
{
unetwarn << myname << "(receive): catch ..." << std::endl;
}
*/
// делаем через промежуточную переменную
// чтобы поскорее освободить mutex
if( sidLostPackets != DefaultObjectId )
{
try
{
uniset_rwmutex_rlock l(tmMutex);
tout = ptRecvTimeout.checkTime();
shm->localSetValue(itLostPackets, sidLostPackets, getLostPacketsNum(), shm->ID());
}
// только если "режим подготовки закончился, то можем генерировать "события"
if( ptPrepare.checkTime() && trTimeout.change(tout) )
catch( const std::exception& ex )
{
if( tout )
slEvent(shared_from_this(), evTimeout);
else
slEvent(shared_from_this(), evOK);
unetcrit << myname << "(step): (lostPackets) " << ex.what() << std::endl;
}
msleep(recvpause);
}
unetinfo << myname << ": ************* receive FINISH **********" << endl;
}
// -----------------------------------------------------------------------------
bool UNetReceiver::recv()
void UNetReceiver::stop()
{
activated = false;
evReceive.stop();
evUpdate.stop();
if( evloop )
evloop->terminate(this);
}
// -----------------------------------------------------------------------------
bool UNetReceiver::receive()
{
if( !udp->isInputReady(recvTimeout) )
return false;
......
......@@ -24,13 +24,15 @@
#include <unordered_map>
#include <cc++/socket.h>
#include <sigc++/sigc++.h>
#include <ev++.h>
#include "UniSetObject.h"
#include "Trigger.h"
#include "Mutex.h"
#include "SMInterface.h"
#include "SharedMemory.h"
#include "ThreadCreator.h"
#include "UDPPacket.h"
#include "DefaultEventLoop.h"
#include "UDPCore.h"
// -----------------------------------------------------------------------------
/* Основная идея: сделать проверку очерёдности пакетов, но при этом использовать UDP.
* ===============
......@@ -75,7 +77,8 @@
*/
// -----------------------------------------------------------------------------
class UNetReceiver:
public std::enable_shared_from_this<UNetReceiver>
public std::enable_shared_from_this<UNetReceiver>,
public EventWatcher
{
public:
UNetReceiver( const std::string& host, const ost::tpport_t port, const std::shared_ptr<SMInterface>& smi );
......@@ -84,9 +87,6 @@ class UNetReceiver:
void start();
void stop();
void receive();
void update();
inline const std::string getName() const
{
return myname;
......@@ -155,9 +155,12 @@ class UNetReceiver:
const std::shared_ptr<SMInterface> shm;
std::shared_ptr<DebugStream> unetlog;
bool recv();
bool receive();
void step();
void real_update();
void update();
void callback( ev::io& watcher, int revents );
void readEvent( ev::io& watcher );
void updateEvent( ev::periodic& watcher, int revents );
void initIterators();
......@@ -181,10 +184,14 @@ class UNetReceiver:
timeout_t recvpause = { 10 }; /*!< пауза меджду приёмами пакетов, [мсек] */
timeout_t updatepause = { 100 }; /*!< переодичность обновления данных в SM, [мсек] */
std::shared_ptr<ost::UDPReceive> udp;
std::shared_ptr<UDPDuplexU> udp;
ost::IPV4Address addr;
ost::tpport_t port = { 0 };
std::string myname;
ev::io evReceive;
ev::periodic evUpdate;
std::shared_ptr<DefaultEventLoop> evloop;
double updateTime = { 0.0 };
UniSetTypes::uniset_rwmutex pollMutex;
PassiveTimer ptRecvTimeout;
......@@ -203,9 +210,6 @@ class UNetReceiver:
std::atomic_bool activated = { false };
std::shared_ptr< ThreadCreator<UNetReceiver> > r_thr; // receive thread
std::shared_ptr< ThreadCreator<UNetReceiver> > u_thr; // update thread
PacketQueue qpack; /*!< очередь принятых пакетов (отсортированных по возрастанию номера пакета) */
UniSetUDP::UDPMessage pack; /*!< просто буфер для получения очередного сообщения */
UniSetUDP::UDPPacket r_buf;
......
// -------------------------------------------------------------------------
#ifndef UDPReceiveU_H_
#define UDPReceiveU_H_
// -------------------------------------------------------------------------
#include <cc++/socket.h>
// -------------------------------------------------------------------------
// обёртка над ost::UDPReceive, чтобы достучаться до "сырого сокета"
// для дальнейшего использования с libev..
class UDPReceiveU:
public ost::UDPReceive
{
public:
SOCKET getSocket(){ return ost::UDPReceive::so; }
};
// -------------------------------------------------------------------------
// обёртка над ost::UDPReceive, чтобы достучаться до "сырого сокета"
// для дальнейшего использования с libev..
class UDPDuplexU:
public ost::UDPDuplex
{
public:
UDPDuplexU(const ost::IPV4Address &bind, ost::tpport_t port):
ost::UDPDuplex(bind,port)
{}
SOCKET getReceiveSocket(){ return ost::UDPReceive::so; }
void setReceiveCompletion( bool set ){ ost::UDPReceive::setCompletion(set); }
};
// -------------------------------------------------------------------------
#endif // UDPReceiveU_H_
// -------------------------------------------------------------------------
......@@ -313,6 +313,7 @@ include/UTCPCore.h
include/UTCPStream.h
include/UTCPSocket.h
include/USocket.h
include/UDPCore.h
include/WDTInterface.h
lib/Makefile.am
python/examples/test.xml
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment