Commit f025a2f7 authored by Pavel Vainerman's avatar Pavel Vainerman

(UNET): добавил вывод статистики обмена

(сообщений в секунду, обработано в секунду)
parent fb3e7225
......@@ -78,6 +78,7 @@ UNetReceiver::UNetReceiver( const std::string& s_host, const ost::tpport_t _port
evCheckConnection.set<UNetReceiver, &UNetReceiver::checkConnectionEvent>(this);
evForceUpdate.set<UNetReceiver, &UNetReceiver::forceUpdateEvent>(this);
evStatistic.set<UNetReceiver, &UNetReceiver::statisticsEvent>(this);
}
// -----------------------------------------------------------------------------
UNetReceiver::~UNetReceiver()
......@@ -86,7 +87,6 @@ UNetReceiver::~UNetReceiver()
// -----------------------------------------------------------------------------
void UNetReceiver::setReceiveTimeout( timeout_t msec )
{
uniset_rwmutex_wrlock l(tmMutex);
recvTimeout = msec;
ptRecvTimeout.setTiming(msec);
}
......@@ -158,7 +158,6 @@ void UNetReceiver::setLockUpdate( bool st )
// -----------------------------------------------------------------------------
void UNetReceiver::resetTimeout()
{
uniset_rwmutex_wrlock l(tmMutex);
ptRecvTimeout.reset();
trTimeout.change(false);
}
......@@ -228,6 +227,9 @@ void UNetReceiver::start()
void UNetReceiver::evprepare( const ev::loop_ref& eloop )
{
evForceUpdate.set(eloop);
evStatistic.set(eloop);
evStatistic.start(0, 1.0); // раз в сек
if( !udp )
{
......@@ -260,6 +262,9 @@ void UNetReceiver::evfinish( const ev::loop_ref& eloop )
if( evForceUpdate.is_active() )
evForceUpdate.stop();
if( evStatistic.is_active() )
evStatistic.stop();
//udp->disconnect();
udp = nullptr;
}
......@@ -281,6 +286,24 @@ void UNetReceiver::forceUpdateEvent( ev::timer& watcher, int revents )
// и тем самым заставляем обновить данные в SM (см. update)
}
// -----------------------------------------------------------------------------
void UNetReceiver::statisticsEvent(ev::periodic& tm, int revents)
{
if( EV_ERROR & revents )
{
unetcrit << myname << "(statisticsEvent): EVENT ERROR.." << endl;
return;
}
unetlog9 << myname << "(statisctics):"
<< " recvCount=" << recvCount << "[per sec]"
<< " upCount=" << upCount << "[per sec]"
<< endl;
recvCount = 0;
upCount = 0;
tm.again();
}
// -----------------------------------------------------------------------------
void UNetReceiver::update()
{
UniSetUDP::UDPMessage p;
......@@ -297,7 +320,7 @@ void UNetReceiver::update()
return;
p = qpack.top();
unsigned long sub = labs(p.num - pnum);
size_t sub = labs(p.num - pnum);
if( pnum > 0 )
{
......@@ -358,6 +381,7 @@ void UNetReceiver::update()
k--;
upCount++;
// cerr << myname << "(update): " << p.msg.header << endl;
initDCache(p, !d_cache_init_ok);
......@@ -451,16 +475,11 @@ void UNetReceiver::readEvent( ev::io& watcher )
if( !activated )
return;
bool tout = false;
try
{
if( receive() )
{
uniset_rwmutex_wrlock l(tmMutex);
ptRecvTimeout.reset();
}
}
catch( UniSetTypes::Exception& ex)
{
unetwarn << myname << "(receive): " << ex << std::endl;
......@@ -470,21 +489,14 @@ void UNetReceiver::readEvent( ev::io& watcher )
unetwarn << myname << "(receive): " << e.what() << std::endl;
}
// делаем через промежуточную переменную
// чтобы поскорее освободить mutex
{
uniset_rwmutex_rlock l(tmMutex);
tout = ptRecvTimeout.checkTime();
}
// только если "режим подготовки закончился, то можем генерировать "события"
if( ptPrepare.checkTime() && trTimeout.change(tout) )
if( ptPrepare.checkTime() && trTimeout.change(ptRecvTimeout.checkTime()) )
{
auto w = shared_from_this();
if( w )
{
if( tout )
if( trTimeout.get() )
slEvent(w, evTimeout);
else
slEvent(w, evOK);
......@@ -568,12 +580,6 @@ void UNetReceiver::stop()
// -----------------------------------------------------------------------------
bool UNetReceiver::receive()
{
// if( !udp->isInputReady(recvTimeout) )
// return false;
//udp->UDPReceive::receive((char*)(r_buf.data), sizeof(r_buf.data));
//ssize_t ret = ::recv(udp->getSocket(),r_buf.data,sizeof(r_buf.data),0);
ssize_t ret = udp->receive(r_buf.data, sizeof(r_buf.data));
if( ret < 0 )
......@@ -623,6 +629,7 @@ bool UNetReceiver::receive()
}
rnum = pack.num;
recvCount++;
#if 0
cerr << myname << "(receive): recv DATA OK. ret=" << ret << " sizeof=" << sz
......@@ -647,16 +654,12 @@ bool UNetReceiver::receive()
if( !qpack.empty() )
{
// cerr << myname << "(receive): copy to qtmp..."
// << " header: " << pack.msg.header
// << endl;
qtmp.push(pack);
}
else
{
// cerr << myname << "(receive): copy from qtmp..." << endl;
// очередь освободилась..
// то копируем в неё всё что набралось...
// основная очередь освободилась..
// копируем в неё всё что набралось в qtmp...
while( !qtmp.empty() )
{
qpack.push(qtmp.top());
......@@ -808,7 +811,9 @@ const std::string UNetReceiver::getShortInfo() const
<< " maxDifferens=" << setw(6) << maxDifferens
<< " maxProcessingCount=" << setw(6) << maxProcessingCount
<< " waitClean=" << waitClean
<< " ]";
<< " ]"
<< endl
<< "\t[ recvCount=" << recvCount << " upCount=" << upCount << " per sec ]";
return std::move(s.str());
}
......
......@@ -175,6 +175,7 @@ class UNetReceiver:
void updateEvent( ev::periodic& watcher, int revents );
void checkConnectionEvent( ev::periodic& watcher, int revents );
void forceUpdateEvent( ev::timer& watcher, int revents );
void statisticsEvent( ev::periodic& watcher, int revents );
virtual void evprepare( const ev::loop_ref& eloop ) override;
virtual void evfinish(const ev::loop_ref& eloop ) override;
virtual std::string wname() override
......@@ -213,6 +214,10 @@ class UNetReceiver:
ev::periodic evUpdate;
ev::periodic evCheckConnection;
ev::timer evForceUpdate;
ev::periodic evStatistic;
size_t recvCount = { 0 };
size_t upCount = { 0 };
// делаем loop общим.. одним на всех!
static CommonEventLoop loop;
......@@ -256,7 +261,6 @@ class UNetReceiver:
EventSlot slEvent;
Trigger trTimeout;
UniSetTypes::uniset_rwmutex tmMutex;
struct CacheItem
{
......
#!/bin/sh
SP=50
[ -n "$1" ] && SP="$1"
for p in `seq 50001 50010`; do
uniset2-unet-udp-tester -s 127.255.255.255:$p -x 50 &
uniset2-unet-udp-tester -s 127.255.255.255:$p -x ${SP} &
done
......@@ -140,6 +140,7 @@ static void run_test( size_t max, const std::string& host )
for( size_t i=0; i<max; i++ )
{
auto r = make_shared<UNetReceiver>(host,begPort+i,smiInstance());
r->setLockUpdate(true);
vrecv.emplace_back(r);
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment