Commit fc4f125f authored by Pavel Vainerman's avatar Pavel Vainerman

[Переход на libPoco]: (UNetUDP): добился прохождения теста

parent c127f9e5
...@@ -16,6 +16,7 @@ ...@@ -16,6 +16,7 @@
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
#include <sstream> #include <sstream>
#include <iomanip> #include <iomanip>
#include <Poco/Net/NetException.h>
#include "Exceptions.h" #include "Exceptions.h"
#include "Extensions.h" #include "Extensions.h"
#include "UNetReceiver.h" #include "UNetReceiver.h"
...@@ -43,6 +44,7 @@ UNetReceiver::UNetReceiver(const std::string& s_host, int _port, const std::shar ...@@ -43,6 +44,7 @@ UNetReceiver::UNetReceiver(const std::string& s_host, int _port, const std::shar
recvpause(10), recvpause(10),
updatepause(100), updatepause(100),
port(_port), port(_port),
saddr(s_host,_port),
recvTimeout(5000), recvTimeout(5000),
prepareTime(2000), prepareTime(2000),
lostTimeout(200), /* 2*updatepause */ lostTimeout(200), /* 2*updatepause */
...@@ -170,7 +172,8 @@ bool UNetReceiver::createConnection( bool throwEx ) ...@@ -170,7 +172,8 @@ bool UNetReceiver::createConnection( bool throwEx )
try try
{ {
udp = make_shared<UDPReceiveU>(addr, port); udp = make_shared<UDPReceiveU>(addr, port);
udp->setCompletion(false); // делаем неблокирующее чтение (нужно для libev) //udp = make_shared<UDPReceiveU>();
udp->setBlocking(false); // делаем неблокирующее чтение (нужно для libev)
evReceive.set<UNetReceiver, &UNetReceiver::callback>(this); evReceive.set<UNetReceiver, &UNetReceiver::callback>(this);
if( evCheckConnection.is_active() ) if( evCheckConnection.is_active() )
...@@ -624,25 +627,34 @@ void UNetReceiver::stop() ...@@ -624,25 +627,34 @@ void UNetReceiver::stop()
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
bool UNetReceiver::receive() bool UNetReceiver::receive()
{ {
ssize_t ret = udp->receiveBytes(r_buf.data, sizeof(r_buf.data)); try
if( ret < 0 )
{ {
unetcrit << myname << "(receive): recv err(" << errno << "): " << strerror(errno) << endl; ssize_t ret = udp->receiveBytes(r_buf.data, sizeof(r_buf.data));
return false; //ssize_t ret = udp->receiveFrom(r_buf.data, sizeof(r_buf.data),saddr);
}
if( ret == 0 ) if( ret < 0 )
{ {
unetwarn << myname << "(receive): disconnected?!... recv 0 byte.." << endl; unetcrit << myname << "(receive): recv err(" << errno << "): " << strerror(errno) << endl;
return false; return false;
} }
if( ret == 0 )
{
unetwarn << myname << "(receive): disconnected?!... recv 0 byte.." << endl;
return false;
}
size_t sz = UniSetUDP::UDPMessage::getMessage(pack, r_buf); size_t sz = UniSetUDP::UDPMessage::getMessage(pack, r_buf);
if( sz == 0 ) if( sz == 0 )
{
unetcrit << myname << "(receive): FAILED RECEIVE DATA ret=" << ret << endl;
return false;
}
}
catch( Poco::Net::NetException& ex )
{ {
unetcrit << myname << "(receive): FAILED RECEIVE DATA ret=" << ret << endl; unetcrit << myname << "(receive): recv err: " << ex.displayText() << endl;
return false; return false;
} }
......
...@@ -208,6 +208,7 @@ class UNetReceiver: ...@@ -208,6 +208,7 @@ class UNetReceiver:
std::shared_ptr<UDPReceiveU> udp; std::shared_ptr<UDPReceiveU> udp;
std::string addr; std::string addr;
int port = { 0 }; int port = { 0 };
Poco::Net::SocketAddress saddr;
std::string myname; std::string myname;
ev::io evReceive; ev::io evReceive;
ev::periodic evCheckConnection; ev::periodic evCheckConnection;
......
...@@ -35,6 +35,7 @@ UNetSender::UNetSender(const std::string& _host, const int _port, const std::sha ...@@ -35,6 +35,7 @@ UNetSender::UNetSender(const std::string& _host, const int _port, const std::sha
shm(smi), shm(smi),
port(_port), port(_port),
s_host(_host), s_host(_host),
saddr(_host,_port),
sendpause(150), sendpause(150),
packsendpause(5), packsendpause(5),
activated(false), activated(false),
...@@ -111,8 +112,11 @@ bool UNetSender::createConnection( bool throwEx ) ...@@ -111,8 +112,11 @@ bool UNetSender::createConnection( bool throwEx )
try try
{ {
udp = make_shared<UDPSocketU>(addr, port); //udp = make_shared<UDPSocketU>(addr, port);
udp = make_shared<UDPSocketU>();
udp->setBroadcast(true); udp->setBroadcast(true);
udp->setSendTimeout(writeTimeout*1000);
// udp->setNoDelay(true);
} }
catch( const std::exception& e ) catch( const std::exception& e )
{ {
...@@ -301,14 +305,20 @@ void UNetSender::real_send( UniSetUDP::UDPMessage& mypack ) ...@@ -301,14 +305,20 @@ void UNetSender::real_send( UniSetUDP::UDPMessage& mypack )
if( packetnum == 0 ) if( packetnum == 0 )
packetnum = 1; packetnum = 1;
if( !udp || !udp->poll(writeTimeout, Poco::Net::Socket::SELECT_WRITE) ) if( !udp || !udp->poll(writeTimeout*1000, Poco::Net::Socket::SELECT_WRITE) )
return; return;
mypack.transport_msg(s_msg); mypack.transport_msg(s_msg);
size_t ret = udp->sendBytes( (char*)s_msg.data, s_msg.len ); try
{
if( ret < s_msg.len ) size_t ret = udp->sendTo((char*)s_msg.data, s_msg.len, saddr);
unetcrit << myname << "(real_send): FAILED ret=" << ret << " < sizeof=" << s_msg.len << endl; if( ret < s_msg.len )
unetcrit << myname << "(real_send): FAILED ret=" << ret << " < sizeof=" << s_msg.len << endl;
}
catch( Poco::Net::NetException& ex )
{
unetcrit << myname << "(real_send): error: " << ex.displayText() << endl;
}
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
void UNetSender::stop() void UNetSender::stop()
......
...@@ -175,6 +175,7 @@ class UNetSender ...@@ -175,6 +175,7 @@ class UNetSender
std::string addr; std::string addr;
int port = { 0 }; int port = { 0 };
std::string s_host = { "" }; std::string s_host = { "" };
Poco::Net::SocketAddress saddr;
std::string myname = { "" }; std::string myname = { "" };
timeout_t sendpause = { 150 }; timeout_t sendpause = { 150 };
......
...@@ -23,6 +23,7 @@ static int s_port = 3003; // Node2 ...@@ -23,6 +23,7 @@ static int s_port = 3003; // Node2
static int s_nodeID = 3003; static int s_nodeID = 3003;
static int s_procID = 123; static int s_procID = 123;
static int s_numpack = 1; static int s_numpack = 1;
static Poco::Net::SocketAddress s_addr(host,s_port);
static ObjectId node2_respond_s = 12; static ObjectId node2_respond_s = 12;
static ObjectId node2_lostpackets_as = 13; static ObjectId node2_lostpackets_as = 13;
static int maxDifferense = 5; // см. unetudp-test-configure.xml --unet-maxdifferense static int maxDifferense = 5; // см. unetudp-test-configure.xml --unet-maxdifferense
...@@ -48,7 +49,7 @@ void InitTest() ...@@ -48,7 +49,7 @@ void InitTest()
if( udp_s == nullptr ) if( udp_s == nullptr )
{ {
udp_s = make_shared<UDPSocketU>(host, s_port); udp_s = make_shared<UDPSocketU>(); //(host, s_port);
udp_s->setBroadcast(true); udp_s->setBroadcast(true);
} }
} }
...@@ -62,7 +63,7 @@ static UniSetUDP::UDPMessage receive( unsigned int pnum = 0, timeout_t tout = 20 ...@@ -62,7 +63,7 @@ static UniSetUDP::UDPMessage receive( unsigned int pnum = 0, timeout_t tout = 20
while( ncycle > 0 ) while( ncycle > 0 )
{ {
if( !udp_r->poll(tout,Poco::Net::Socket::SELECT_READ) ) if( !udp_r->poll(tout*1000,Poco::Net::Socket::SELECT_READ) )
break; break;
size_t ret = udp_r->receiveBytes(&(buf.data), sizeof(buf.data) ); size_t ret = udp_r->receiveBytes(&(buf.data), sizeof(buf.data) );
...@@ -80,7 +81,7 @@ static UniSetUDP::UDPMessage receive( unsigned int pnum = 0, timeout_t tout = 20 ...@@ -80,7 +81,7 @@ static UniSetUDP::UDPMessage receive( unsigned int pnum = 0, timeout_t tout = 20
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
void send( UniSetUDP::UDPMessage& pack, int tout = 2000 ) void send( UniSetUDP::UDPMessage& pack, int tout = 2000 )
{ {
CHECK( udp_s->poll(tout,Poco::Net::Socket::SELECT_WRITE) ); CHECK( udp_s->poll(tout*1000,Poco::Net::Socket::SELECT_WRITE) );
pack.nodeID = s_nodeID; pack.nodeID = s_nodeID;
pack.procID = s_procID; pack.procID = s_procID;
...@@ -88,7 +89,7 @@ void send( UniSetUDP::UDPMessage& pack, int tout = 2000 ) ...@@ -88,7 +89,7 @@ void send( UniSetUDP::UDPMessage& pack, int tout = 2000 )
UniSetUDP::UDPPacket s_buf; UniSetUDP::UDPPacket s_buf;
pack.transport_msg(s_buf); pack.transport_msg(s_buf);
size_t ret = udp_s->sendBytes((char*)&s_buf.data, s_buf.len); size_t ret = udp_s->sendTo((char*)&s_buf.data, s_buf.len,s_addr);
REQUIRE( ret == s_buf.len ); REQUIRE( ret == s_buf.len );
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
......
...@@ -107,7 +107,7 @@ static void run_senders( size_t max, const std::string& s_host, size_t count = 5 ...@@ -107,7 +107,7 @@ static void run_senders( size_t max, const std::string& s_host, size_t count = 5
{ {
try try
{ {
if( udp->poll(100,Poco::Net::Socket::SELECT_WRITE) ) if( udp->poll(100000,Poco::Net::Socket::SELECT_WRITE) )
{ {
mypack.transport_msg(s_buf); mypack.transport_msg(s_buf);
size_t ret = udp->sendBytes((char*)&s_buf.data, s_buf.len); size_t ret = udp->sendBytes((char*)&s_buf.data, s_buf.len);
......
...@@ -228,7 +228,7 @@ int main(int argc, char* argv[]) ...@@ -228,7 +228,7 @@ int main(int argc, char* argv[])
npack = 0; npack = 0;
} }
if( !udp.poll(tout,Poco::Net::Socket::SELECT_READ) ) if( !udp.poll(tout*1000,Poco::Net::Socket::SELECT_READ) )
{ {
cout << "(recv): Timeout.." << endl; cout << "(recv): Timeout.." << endl;
continue; continue;
...@@ -326,7 +326,7 @@ int main(int argc, char* argv[]) ...@@ -326,7 +326,7 @@ int main(int argc, char* argv[])
try try
{ {
if( udp->poll(tout,Poco::Net::Socket::SELECT_WRITE) ) if( udp->poll(tout*1000,Poco::Net::Socket::SELECT_WRITE) )
{ {
mypack.transport_msg(s_buf); mypack.transport_msg(s_buf);
......
...@@ -4,7 +4,7 @@ ...@@ -4,7 +4,7 @@
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
#include <Poco/Net/DatagramSocket.h> #include <Poco/Net/DatagramSocket.h>
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
// различные классы-обёртки, чтобы достучаться до "сырого сокета" и других функций // Классы-обёртки, чтобы достучаться до "сырого сокета" и других функций
// необходимых при использовании с libev.. // необходимых при использовании с libev..
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
class UDPSocketU: class UDPSocketU:
...@@ -12,6 +12,10 @@ class UDPSocketU: ...@@ -12,6 +12,10 @@ class UDPSocketU:
{ {
public: public:
UDPSocketU():
Poco::Net::DatagramSocket(Poco::Net::IPAddress::IPv4)
{}
UDPSocketU( const std::string& bind, int port ): UDPSocketU( const std::string& bind, int port ):
Poco::Net::DatagramSocket(Poco::Net::SocketAddress(bind, port),true) Poco::Net::DatagramSocket(Poco::Net::SocketAddress(bind, port),true)
{} {}
...@@ -29,6 +33,10 @@ class UDPReceiveU: ...@@ -29,6 +33,10 @@ class UDPReceiveU:
{ {
public: public:
UDPReceiveU():
Poco::Net::DatagramSocket(Poco::Net::IPAddress::IPv4)
{}
UDPReceiveU( const std::string& bind, int port): UDPReceiveU( const std::string& bind, int port):
Poco::Net::DatagramSocket(Poco::Net::SocketAddress(bind, port),true) Poco::Net::DatagramSocket(Poco::Net::SocketAddress(bind, port),true)
{} {}
...@@ -39,31 +47,6 @@ class UDPReceiveU: ...@@ -39,31 +47,6 @@ class UDPReceiveU:
{ {
return Poco::Net::DatagramSocket::sockfd(); return Poco::Net::DatagramSocket::sockfd();
} }
inline void setCompletion( bool set )
{
Poco::Net::DatagramSocket::setBlocking(set);
}
};
// -------------------------------------------------------------------------
class UDPDuplexU:
public Poco::Net::DatagramSocket
{
public:
UDPDuplexU(const std::string& bind, int port):
Poco::Net::DatagramSocket(Poco::Net::SocketAddress(bind, port),true)
{}
virtual ~UDPDuplexU() {}
int getReceiveSocket()
{
return Poco::Net::DatagramSocket::sockfd();;
}
void setReceiveCompletion( bool set )
{
Poco::Net::DatagramSocket::setBlocking(set);
}
}; };
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
#endif // UDPReceiveU_H_ #endif // UDPReceiveU_H_
......
...@@ -20,7 +20,7 @@ bool run_test_server() ...@@ -20,7 +20,7 @@ bool run_test_server()
while( !cancel ) while( !cancel )
{ {
if( sock.poll(500,Poco::Net::Socket::SELECT_READ) ) if( sock.poll(500000,Poco::Net::Socket::SELECT_READ) )
{ {
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment