Commit bff519c6 authored by Pavel Vainerman's avatar Pavel Vainerman

(unet): receiver refactoring: remove "thread strategy",

used circular buffer instead priority_queue and some other minor optimizations.
parent 2a40c0e2
if HAVE_TESTS
noinst_PROGRAMS = tests-with-sm urecv-perf-test
#noinst_PROGRAMS = urecv-perf-test
tests_with_sm_SOURCES = tests_with_sm.cc test_unetudp.cc
tests_with_sm_LDADD = $(top_builddir)/lib/libUniSet2.la $(top_builddir)/extensions/lib/libUniSet2Extensions.la \
......
......@@ -97,44 +97,6 @@ void send( UniSetUDP::UDPMessage& pack, int tout = 2000 )
REQUIRE( ret == s_buf.len );
}
// -----------------------------------------------------------------------------
TEST_CASE("[UNetUDP]: queue sort", "[unetudp][packetqueue]")
{
UNetReceiver::PacketQueue q;
UniSetUDP::UDPMessage m1;
m1.num = 10;
UniSetUDP::UDPMessage m2;
m2.num = 11;
UniSetUDP::UDPMessage m3;
m3.num = 13;
UniSetUDP::UDPMessage m4;
m4.num = 100;
// специально складываем в обратном порядке
// чтобы проверить "сортировку"
q.push(m1);
q.push(m3);
q.push(m2);
q.push(m4);
UniSetUDP::UDPMessage t = q.top();
REQUIRE( t.num == 10 );
q.pop();
t = q.top();
REQUIRE( t.num == 11 );
q.pop();
t = q.top();
REQUIRE( t.num == 13 );
q.pop();
t = q.top();
REQUIRE( t.num == 100 );
q.pop();
}
// -----------------------------------------------------------------------------
TEST_CASE("[UNetUDP]: UDPMessage", "[unetudp][udpmessage]")
{
SECTION("UDPMessage::isFull()")
......
#!/bin/sh
# '--' - нужен для отделения аргументов catch, от наших..
cd ../../../Utilities/Admin/
./uniset2-start.sh -f ./create_links.sh
./uniset2-start.sh -f ./create
./uniset2-start.sh -f ./exist | grep -q UNISET_PLC/Controllers || exit 1
cd -
./uniset2-start.sh -f ./tests-with-sm $* -- --confile unetudp-test-configure.xml --e-startup-pause 10 \
--unet-name UNetExchange --unet-filter-field unet --unet-filter-value 1 --unet-maxdifferense 5 \
--unet-recv-timeout 1000 --unet-sendpause 500 --unet-update-strategy thread
#--unet-log-add-levels any
AT_SETUP([UNetUDP tests (with SM)(thread)])
AT_CHECK([$abs_top_builddir/testsuite/at-test-launch.sh $abs_top_builddir/extensions/UNetUDP/tests tests_with_sm_thread.sh],[0],[ignore],[ignore])
AT_CLEANUP
AT_SETUP([UNetUDP tests (with SM)(evloop)])
AT_CHECK([$abs_top_builddir/testsuite/at-test-launch.sh $abs_top_builddir/extensions/UNetUDP/tests tests_with_sm_evloop.sh],[0],[ignore],[ignore])
AT_CLEANUP
......
......@@ -32,173 +32,176 @@ shared_ptr<SMInterface> smiInstance()
// --------------------------------------------------------------------------
static void run_senders( size_t max, const std::string& s_host, size_t count = 50, timeout_t usecpause = 50 )
{
std::vector< std::shared_ptr<UDPSocketU> > vsend;
vsend.reserve(max);
cout << "Run " << max << " senders (" << s_host << ")" << endl;
// make sendesrs..
for( size_t i = 0; i < max; i++ )
{
try
{
auto s = make_shared<UDPSocketU>(s_host, begPort + i);
vsend.emplace_back(s);
}
catch( Poco::Net::NetException& e )
{
cerr << "(run_senders): " << e.displayText() << " (" << s_host << ")" << endl;
throw;
}
catch( std::exception& ex)
{
cerr << "(run_senders): " << ex.what() << endl;
throw;
}
}
UniSetUDP::UDPMessage mypack;
mypack.nodeID = 100;
mypack.procID = 100;
for( size_t i = 0; i < count; i++ )
{
UniSetUDP::UDPAData d(i, i);
mypack.addAData(d);
}
for( size_t i = 0; i < count; i++ )
mypack.addDData(i, i);
for( size_t i = 0; i < max; i++ )
{
try
{
if( vsend[i] )
vsend[i]->connect( Poco::Net::SocketAddress(s_host, begPort + i) );
}
catch( Poco::Net::NetException& e )
{
cerr << "(run_senders): " << e.message() << " (" << s_host << ")" << endl;
throw;
}
catch( std::exception& ex)
{
cerr << "(run_senders): " << ex.what() << endl;
throw;
}
}
size_t packetnum = 0;
UniSetUDP::UDPPacket s_buf;
size_t nc = 1;
while( nc ) // -V654
{
mypack.num = packetnum++;
// при переходе черех максимум (UniSetUDP::MaxPacketNum)
// пакет опять должен иметь номер "1"
if( packetnum == 0 )
packetnum = 1;
for( auto&& udp : vsend )
{
try
{
if( udp->poll(100000, Poco::Net::Socket::SELECT_WRITE) )
{
mypack.transport_msg(s_buf);
size_t ret = udp->sendBytes((char*)&s_buf.data, s_buf.len);
if( ret < s_buf.len )
cerr << "(send): FAILED ret=" << ret << " < sizeof=" << s_buf.len << endl;
}
}
catch( Poco::Net::NetException& e )
{
cerr << "(send): " << e.message() << " (" << s_host << ")" << endl;
}
catch( ... )
{
cerr << "(send): catch ..." << endl;
}
}
std::this_thread::sleep_for(std::chrono::microseconds(usecpause));
}
std::vector< std::shared_ptr<UDPSocketU> > vsend;
vsend.reserve(max);
cout << "Run " << max << " senders (" << s_host << ")" << endl;
// make sendesrs..
for( size_t i = 0; i < max; i++ )
{
try
{
cout << "create sender: " << s_host << ":" << begPort + i << endl;
auto s = make_shared<UDPSocketU>(s_host, begPort + i);
s->setBroadcast(true);
vsend.emplace_back(s);
}
catch( Poco::Net::NetException& e )
{
cerr << "(run_senders): " << e.displayText() << " (" << s_host << ")" << endl;
throw;
}
catch( std::exception& ex)
{
cerr << "(run_senders): " << ex.what() << endl;
throw;
}
}
UniSetUDP::UDPMessage mypack;
mypack.nodeID = 100;
mypack.procID = 100;
for( size_t i = 0; i < count; i++ )
{
UniSetUDP::UDPAData d(i, i);
mypack.addAData(d);
}
for( size_t i = 0; i < count; i++ )
mypack.addDData(i, i);
for( size_t i = 0; i < max; i++ )
{
try
{
if( vsend[i] )
vsend[i]->connect( Poco::Net::SocketAddress(s_host, begPort + i) );
}
catch( Poco::Net::NetException& e )
{
cerr << "(run_senders): " << e.message() << " (" << s_host << ")" << endl;
throw;
}
catch( std::exception& ex)
{
cerr << "(run_senders): " << ex.what() << endl;
throw;
}
}
size_t packetnum = 0;
UniSetUDP::UDPPacket s_buf;
size_t nc = 1;
while( nc ) // -V654
{
mypack.num = packetnum++;
// при переходе черех максимум (UniSetUDP::MaxPacketNum)
// пакет опять должен иметь номер "1"
if( packetnum == 0 )
packetnum = 1;
for( auto&& udp : vsend )
{
try
{
if( udp->poll(100000, Poco::Net::Socket::SELECT_WRITE) )
{
mypack.transport_msg(s_buf);
size_t ret = udp->sendBytes((char*)&s_buf.data, s_buf.len);
if( ret < s_buf.len )
cerr << "(send): FAILED ret=" << ret << " < sizeof=" << s_buf.len << endl;
}
}
catch( Poco::Net::NetException& e )
{
cerr << "(send): " << e.message() << " (" << s_host << ")" << endl;
}
catch( ... )
{
cerr << "(send): catch ..." << endl;
}
}
std::this_thread::sleep_for(std::chrono::microseconds(usecpause));
}
}
// --------------------------------------------------------------------------
static void run_test( size_t max, const std::string& host )
{
std::vector< std::shared_ptr<UNetReceiver> > vrecv;
vrecv.reserve(max);
// make receivers..
for( size_t i = 0; i < max; i++ )
{
auto r = make_shared<UNetReceiver>(host, begPort + i, smiInstance());
r->setLockUpdate(true);
vrecv.emplace_back(r);
}
size_t count = 0;
// Run receivers..
for( auto&& r : vrecv )
{
if( r )
{
count++;
r->start();
}
}
cerr << "RUn " << count << " receivers..." << endl;
// wait..
pause();
for( auto&& r : vrecv )
{
if(r)
r->stop();
}
std::vector< std::shared_ptr<UNetReceiver> > vrecv;
vrecv.reserve(max);
// make receivers..
for( size_t i = 0; i < max; i++ )
{
cout << "create receiver: " << host << ":" << begPort + i << endl;
auto r = make_shared<UNetReceiver>(host, begPort + i, smiInstance());
//r->setLockUpdate(true);
vrecv.emplace_back(r);
}
size_t count = 0;
// Run receivers..
for( auto&& r : vrecv )
{
if( r )
{
count++;
r->start();
}
}
cerr << "RUN " << count << " receivers..." << endl;
// wait..
pause();
for( auto&& r : vrecv )
{
if(r)
r->stop();
}
}
// --------------------------------------------------------------------------
int main(int argc, char* argv[] )
{
std::string host = "127.255.255.255";
try
{
auto conf = uniset_init(argc, argv);
if( argc > 1 && !strcmp(argv[1], "s") )
run_senders(10, host);
else
run_test(10, host);
return 0;
}
catch( const SystemError& err )
{
cerr << "(urecv-perf-test): " << err << endl;
}
catch( const uniset::Exception& ex )
{
cerr << "(urecv-perf-test): " << ex << endl;
}
catch( const std::exception& e )
{
cerr << "(tests_with_sm): " << e.what() << endl;
}
catch(...)
{
cerr << "(urecv-perf-test): catch(...)" << endl;
}
return 1;
std::string host = "127.255.255.255";
try
{
auto conf = uniset_init(argc, argv);
if( argc > 1 && !strcmp(argv[1], "s") )
run_senders(1, host);
else
run_test(1, host);
return 0;
}
catch( const SystemError& err )
{
cerr << "(urecv-perf-test): " << err << endl;
}
catch( const uniset::Exception& ex )
{
cerr << "(urecv-perf-test): " << ex << endl;
}
catch( const std::exception& e )
{
cerr << "(tests_with_sm): " << e.what() << endl;
}
catch(...)
{
cerr << "(urecv-perf-test): catch(...)" << endl;
}
return 1;
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment