Commit 44d7bfad authored by Pavel Vainerman's avatar Pavel Vainerman

[uwebsocket]: read buffer increased to 32kB (by session),

close socket processing, added error "payload to big"
parent 3c41a56e
...@@ -217,6 +217,16 @@ Poco::JSON::Object::Ptr UWebSocketGate::to_json( const SensorMessage* sm, const ...@@ -217,6 +217,16 @@ Poco::JSON::Object::Ptr UWebSocketGate::to_json( const SensorMessage* sm, const
return json; return json;
} }
//-------------------------------------------------------------------------------------------- //--------------------------------------------------------------------------------------------
Poco::JSON::Object::Ptr UWebSocketGate::error_to_json( const std::string& err )
{
Poco::JSON::Object::Ptr json = new Poco::JSON::Object();
json->set("type", "Error");
json->set("message", err);
return json;
}
//--------------------------------------------------------------------------------------------
std::shared_ptr<UWebSocketGate> UWebSocketGate::init_wsgate( int argc, const char* const* argv, const std::string& prefix ) std::shared_ptr<UWebSocketGate> UWebSocketGate::init_wsgate( int argc, const char* const* argv, const std::string& prefix )
{ {
string name = uniset::getArgParam("--" + prefix + "name", argc, argv, "UWebSocketGate"); string name = uniset::getArgParam("--" + prefix + "name", argc, argv, "UWebSocketGate");
...@@ -548,21 +558,6 @@ std::shared_ptr<UWebSocketGate::UWebSocket> UWebSocketGate::newWebSocket( Poco:: ...@@ -548,21 +558,6 @@ std::shared_ptr<UWebSocketGate::UWebSocket> UWebSocketGate::newWebSocket( Poco::
auto idlist = uniset::explode(slist); auto idlist = uniset::explode(slist);
// if( idlist.empty() )
// {
// resp->setStatus(HTTPResponse::HTTP_BAD_REQUEST);
// resp->setContentType("text/html");
// resp->setStatusAndReason(HTTPResponse::HTTP_BAD_REQUEST);
// resp->setContentLength(0);
// std::ostream& err = resp->send();
// err << "Error: no list of sensors for '" << slist << "'. Use: http://host:port/wsgate/?s1,s2,s3";
// err.flush();
// mywarn << myname << "(newWebSocket): error: no list of sensors for '" << slist << "'" << endl;
// return nullptr;
// }
{ {
uniset_rwmutex_wrlock lock(wsocksMutex); uniset_rwmutex_wrlock lock(wsocksMutex);
ws = make_shared<UWebSocket>(req, resp); ws = make_shared<UWebSocket>(req, resp);
...@@ -602,7 +597,7 @@ void UWebSocketGate::delWebSocket(std::shared_ptr<UWebSocket>& ws ) ...@@ -602,7 +597,7 @@ void UWebSocketGate::delWebSocket(std::shared_ptr<UWebSocket>& ws )
} }
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
const std::string UWebSocketGate::UWebSocket::ping_str = { "{\"data\": [{\"type\": \"Ping\"}]}" }; const std::string UWebSocketGate::UWebSocket::ping_str = "{\"data\": [{\"type\": \"Ping\"}]}";
UWebSocketGate::UWebSocket::UWebSocket(Poco::Net::HTTPServerRequest* _req, UWebSocketGate::UWebSocket::UWebSocket(Poco::Net::HTTPServerRequest* _req,
Poco::Net::HTTPServerResponse* _resp): Poco::Net::HTTPServerResponse* _resp):
...@@ -610,6 +605,7 @@ UWebSocketGate::UWebSocket::UWebSocket(Poco::Net::HTTPServerRequest* _req, ...@@ -610,6 +605,7 @@ UWebSocketGate::UWebSocket::UWebSocket(Poco::Net::HTTPServerRequest* _req,
req(_req), req(_req),
resp(_resp) resp(_resp)
{ {
memset(rbuf, 0, sizeof(rbuf));
setBlocking(false); setBlocking(false);
cancelled = false; cancelled = false;
...@@ -623,7 +619,8 @@ UWebSocketGate::UWebSocket::UWebSocket(Poco::Net::HTTPServerRequest* _req, ...@@ -623,7 +619,8 @@ UWebSocketGate::UWebSocket::UWebSocket(Poco::Net::HTTPServerRequest* _req,
maxsize = maxsend * 10; // пока так maxsize = maxsend * 10; // пока так
setReceiveTimeout( uniset::PassiveTimer::millisecToPoco(recvTimeout)); setReceiveTimeout(uniset::PassiveTimer::millisecToPoco(recvTimeout));
setMaxPayloadSize(sizeof(rbuf));
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
UWebSocketGate::UWebSocket::~UWebSocket() UWebSocketGate::UWebSocket::~UWebSocket()
...@@ -740,16 +737,29 @@ void UWebSocketGate::UWebSocket::read( ev::io& io, int revents ) ...@@ -740,16 +737,29 @@ void UWebSocketGate::UWebSocket::read( ev::io& io, int revents )
return; return;
int n = receiveFrame(rbuf, sizeof(rbuf), flags); int n = receiveFrame(rbuf, sizeof(rbuf), flags);
// int n = receiveBytes(rbuf, sizeof(rbuf));
if( n <= 0 ) if( n <= 0 )
return; return;
const std::string cmd(rbuf, n); if( (flags & WebSocket::FRAME_OP_BITMASK) == WebSocket::FRAME_OP_PING)
{
sendFrame(rbuf, n, WebSocket::FRAME_OP_PONG);
return;
}
if( cmd == ping_str ) if( (flags & WebSocket::FRAME_OP_BITMASK) == WebSocket::FRAME_OP_CLOSE )
return; return;
if( n == sizeof(rbuf) )
{
ostringstream err;
err << "Payload too big. Must be < " << sizeof(rbuf) << " bytes";
sendError(err.str());
return;
}
const std::string cmd(rbuf, n);
onCommand(cmd); onCommand(cmd);
// откладываем ping, т.к. что-то в канале и так было // откладываем ping, т.к. что-то в канале и так было
...@@ -785,6 +795,17 @@ void UWebSocketGate::UWebSocket::read( ev::io& io, int revents ) ...@@ -785,6 +795,17 @@ void UWebSocketGate::UWebSocket::read( ev::io& io, int revents )
<< " error: " << ex.displayText() << " error: " << ex.displayText()
<< endl; << endl;
} }
catch( Poco::TimeoutException& ex )
{
// it is ok
}
catch( std::exception& ex )
{
mylog3 << "(websocket): std::exception: "
<< req->clientAddress().toString()
<< " error: " << ex.what()
<< endl;
}
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
void UWebSocketGate::UWebSocket::ask( uniset::ObjectId id ) void UWebSocketGate::UWebSocket::ask( uniset::ObjectId id )
...@@ -931,6 +952,20 @@ void UWebSocketGate::UWebSocket::sendResponse( sinfo& si ) ...@@ -931,6 +952,20 @@ void UWebSocketGate::UWebSocket::sendResponse( sinfo& si )
ioping.stop(); ioping.stop();
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
void UWebSocketGate::UWebSocket::sendError( const std::string& msg )
{
if( jbuf.size() > maxsize )
{
mywarn << req->clientAddress().toString() << " lost messages..." << endl;
return;
}
jbuf.emplace(UWebSocketGate::error_to_json(msg));
if( ioping.is_active() )
ioping.stop();
}
// -----------------------------------------------------------------------------
void UWebSocketGate::UWebSocket::onCommand( const string& cmdtxt ) void UWebSocketGate::UWebSocket::onCommand( const string& cmdtxt )
{ {
if( cmdtxt.size() < 5 ) if( cmdtxt.size() < 5 )
...@@ -1110,6 +1145,13 @@ void UWebSocketGate::UWebSocket::write() ...@@ -1110,6 +1145,13 @@ void UWebSocketGate::UWebSocket::write()
<< " error: " << ex.displayText() << " error: " << ex.displayText()
<< endl; << endl;
} }
catch( std::exception& ex )
{
mylog3 << "(websocket): std::exception: "
<< req->clientAddress().toString()
<< " error: " << ex.what()
<< endl;
}
term(); term();
} }
......
...@@ -167,6 +167,16 @@ namespace uniset ...@@ -167,6 +167,16 @@ namespace uniset
- "ask:id1,id2,name3,..." - подписаться на уведомления об изменении датчиков (sensorInfo) - "ask:id1,id2,name3,..." - подписаться на уведомления об изменении датчиков (sensorInfo)
- "del:id1,id2,name3,..." - отказаться от уведомления об изменении датчиков - "del:id1,id2,name3,..." - отказаться от уведомления об изменении датчиков
- "get:id1,id2,name3,..." - получить текущее значение датчиков (разовое сообщение ShortSensorInfo) - "get:id1,id2,name3,..." - получить текущее значение датчиков (разовое сообщение ShortSensorInfo)
Если длинна команды превышает допустимое значение, то возвращается ошибка
\code
{
"data": [
{"type": "Error", "message": "Payload to big"}
]
}
\endcode
*/ */
class UWebSocketGate: class UWebSocketGate:
public UniSetObject, public UniSetObject,
...@@ -246,6 +256,7 @@ namespace uniset ...@@ -246,6 +256,7 @@ namespace uniset
size_t wsMaxCmd = { 100 }; size_t wsMaxCmd = { 100 };
static Poco::JSON::Object::Ptr to_json( const uniset::SensorMessage* sm, const std::string& err ); static Poco::JSON::Object::Ptr to_json( const uniset::SensorMessage* sm, const std::string& err );
static Poco::JSON::Object::Ptr error_to_json( const std::string& err );
/*! класс реализует работу с websocket через eventloop /*! класс реализует работу с websocket через eventloop
* Из-за того, что поступление логов может быть достаточно быстрым * Из-за того, что поступление логов может быть достаточно быстрым
...@@ -305,6 +316,7 @@ namespace uniset ...@@ -305,6 +316,7 @@ namespace uniset
void sendResponse( sinfo& si ); void sendResponse( sinfo& si );
void sendShortResponse( sinfo& si ); void sendShortResponse( sinfo& si );
void onCommand( const std::string& cmd ); void onCommand( const std::string& cmd );
void sendError( const std::string& message );
ev::timer iosend; ev::timer iosend;
double send_sec = { 0.5 }; double send_sec = { 0.5 };
...@@ -316,7 +328,7 @@ namespace uniset ...@@ -316,7 +328,7 @@ namespace uniset
static const std::string ping_str; static const std::string ping_str;
ev::io iorecv; ev::io iorecv;
char rbuf[512]; //! \todo сделать настраиваемым или применить Poco::FIFOBuffer char rbuf[32 * 1024]; //! \todo сделать настраиваемым или применить Poco::FIFOBuffer
timeout_t recvTimeout = { 200 }; // msec timeout_t recvTimeout = { 200 }; // msec
std::shared_ptr<ev::async> cmdsignal; std::shared_ptr<ev::async> cmdsignal;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment