Commit d1460952 authored by Pavel Vainerman's avatar Pavel Vainerman

backported to p9 as 2.9.4-alt0.M90P.0.3 (with rpmbph script)

parents d910b649 9f246b4b
......@@ -27,7 +27,7 @@
Name: libuniset2
Version: 2.9.4
Release: alt0.M90P.0.2
Release: alt0.M90P.0.3
Summary: UniSet - library for building distributed industrial control systems
License: LGPL-2.1
......@@ -576,9 +576,12 @@ rm -f %buildroot%_docdir/%oname/html/*.md5
# history of current unpublished changes
%changelog
* Fri Mar 12 2021 Pavel Vainerman <pv@altlinux.ru> 2.9.4-alt0.M90P.0.2
* Mon Mar 15 2021 Pavel Vainerman <pv@altlinux.ru> 2.9.4-alt0.M90P.0.3
- backport to ALTLinux p9 (by rpmbph script)
* Mon Mar 15 2021 Pavel Vainerman <pv@altlinux.ru> 2.9.4-alt0.3
- [uwebsocket]: refactoring
* Fri Mar 12 2021 Pavel Vainerman <pv@altlinux.ru> 2.9.4-alt0.2
- test build for websocketgate
......
......@@ -83,6 +83,8 @@ UWebSocketGate::UWebSocketGate( uniset::ObjectId id, xmlNode* cnode, const strin
wsMaxSend = conf->getArgPInt("--" + prefix + "max-send", it.getProp("wsMaxSend"), wsMaxSend);
wsMaxCmd = conf->getArgPInt("--" + prefix + "max-cmd", it.getProp("wsMaxCmd"), wsMaxCmd);
mylog1 << myname << "maxSend=" << wsMaxSend << " maxCmd=" << wsMaxCmd << endl;
httpHost = conf->getArgParam("--" + prefix + "httpserver-host", "localhost");
httpPort = conf->getArgPInt("--" + prefix + "httpserver-port", 8081);
httpCORS_allow = conf->getArgParam("--" + prefix + "httpserver-cors-allow", "*");
......@@ -179,6 +181,30 @@ void UWebSocketGate::sensorInfo( const SensorMessage* sm )
s->sensorInfo(sm);
}
//--------------------------------------------------------------------------------------------
uniset::SimpleInfo* UWebSocketGate::getInfo( const char* userparam )
{
uniset::SimpleInfo_var i = UniSetObject::getInfo(userparam);
ostringstream inf;
inf << i->info << endl;
// inf << vmon.pretty_str() << endl;
inf << endl;
{
uniset_rwmutex_wrlock lock(wsocksMutex);
inf << "websockets[" << wsocks.size() << "]: " << endl;
for( auto&& s : wsocks )
{
inf << " " << s->getInfo() << endl;
}
}
i->info = inf.str().c_str();
return i._retn();
}
//--------------------------------------------------------------------------------------------
Poco::JSON::Object::Ptr UWebSocketGate::UWebSocket::to_short_json( sinfo* si )
{
Poco::JSON::Object::Ptr json = new Poco::JSON::Object();
......@@ -217,6 +243,16 @@ Poco::JSON::Object::Ptr UWebSocketGate::to_json( const SensorMessage* sm, const
return json;
}
//--------------------------------------------------------------------------------------------
Poco::JSON::Object::Ptr UWebSocketGate::error_to_json( const std::string& err )
{
Poco::JSON::Object::Ptr json = new Poco::JSON::Object();
json->set("type", "Error");
json->set("message", err);
return json;
}
//--------------------------------------------------------------------------------------------
std::shared_ptr<UWebSocketGate> UWebSocketGate::init_wsgate( int argc, const char* const* argv, const std::string& prefix )
{
string name = uniset::getArgParam("--" + prefix + "name", argc, argv, "UWebSocketGate");
......@@ -242,7 +278,7 @@ void UWebSocketGate::help_print()
cout << "--prefix-max num - Максимальное количество websocket-ов" << endl;
cout << "--prefix-heartbeat-time msec - Период сердцебиения в соединении. По умолчанию: 3000 мсек" << endl;
cout << "--prefix-send-time msec - Период посылки сообщений. По умолчанию: 500 мсек" << endl;
cout << "--prefix-max-send num - Максимальное число сообщений посылаемых за один раз. По умолчанию: 200" << endl;
cout << "--prefix-max-send num - Максимальное число сообщений посылаемых за один раз. По умолчанию: 5000" << endl;
cout << "--prefix-max-cmd num - Максимальное число команд обрабатываемых за один раз. По умолчанию: 200" << endl;
cout << "http: " << endl;
......@@ -548,21 +584,6 @@ std::shared_ptr<UWebSocketGate::UWebSocket> UWebSocketGate::newWebSocket( Poco::
auto idlist = uniset::explode(slist);
// if( idlist.empty() )
// {
// resp->setStatus(HTTPResponse::HTTP_BAD_REQUEST);
// resp->setContentType("text/html");
// resp->setStatusAndReason(HTTPResponse::HTTP_BAD_REQUEST);
// resp->setContentLength(0);
// std::ostream& err = resp->send();
// err << "Error: no list of sensors for '" << slist << "'. Use: http://host:port/wsgate/?s1,s2,s3";
// err.flush();
// mywarn << myname << "(newWebSocket): error: no list of sensors for '" << slist << "'" << endl;
// return nullptr;
// }
{
uniset_rwmutex_wrlock lock(wsocksMutex);
ws = make_shared<UWebSocket>(req, resp);
......@@ -602,7 +623,7 @@ void UWebSocketGate::delWebSocket(std::shared_ptr<UWebSocket>& ws )
}
}
// -----------------------------------------------------------------------------
const std::string UWebSocketGate::UWebSocket::ping_str = { "{\"data\": [{\"type\": \"Ping\"}]}" };
const std::string UWebSocketGate::UWebSocket::ping_str = "{\"data\": [{\"type\": \"Ping\"}]}";
UWebSocketGate::UWebSocket::UWebSocket(Poco::Net::HTTPServerRequest* _req,
Poco::Net::HTTPServerResponse* _resp):
......@@ -610,6 +631,7 @@ UWebSocketGate::UWebSocket::UWebSocket(Poco::Net::HTTPServerRequest* _req,
req(_req),
resp(_resp)
{
memset(rbuf, 0, sizeof(rbuf));
setBlocking(false);
cancelled = false;
......@@ -621,9 +643,10 @@ UWebSocketGate::UWebSocket::UWebSocket(Poco::Net::HTTPServerRequest* _req,
iosend.set<UWebSocketGate::UWebSocket, &UWebSocketGate::UWebSocket::send>(this);
iorecv.set<UWebSocketGate::UWebSocket, &UWebSocketGate::UWebSocket::read>(this);
maxsize = maxsend * 10; // пока так
maxsize = maxsend * Kbuf;
setReceiveTimeout( uniset::PassiveTimer::millisecToPoco(recvTimeout));
setReceiveTimeout(uniset::PassiveTimer::millisecToPoco(recvTimeout));
setMaxPayloadSize(sizeof(rbuf));
}
// -----------------------------------------------------------------------------
UWebSocketGate::UWebSocket::~UWebSocket()
......@@ -639,6 +662,21 @@ UWebSocketGate::UWebSocket::~UWebSocket()
}
}
// -----------------------------------------------------------------------------
std::string UWebSocketGate::UWebSocket::getInfo() const noexcept
{
ostringstream inf;
inf << req->clientAddress().toString()
<< ": jbuf=" << jbuf.size()
<< " wbuf=" << wbuf.size()
<< " ping_sec=" << ping_sec
<< " maxsend=" << maxsend
<< " send_sec=" << send_sec
<< " maxcmd=" << maxcmd;
return inf.str();
}
// -----------------------------------------------------------------------------
bool UWebSocketGate::UWebSocket::isActive()
{
return iosend.is_active();
......@@ -727,6 +765,9 @@ void UWebSocketGate::UWebSocket::read( ev::io& io, int revents )
if( !(revents & EV_READ) )
return;
if( cancelled )
return;
using Poco::Net::WebSocket;
using Poco::Net::WebSocketException;
using Poco::Net::HTTPResponse;
......@@ -740,16 +781,36 @@ void UWebSocketGate::UWebSocket::read( ev::io& io, int revents )
return;
int n = receiveFrame(rbuf, sizeof(rbuf), flags);
// int n = receiveBytes(rbuf, sizeof(rbuf));
if( n <= 0 )
return;
const std::string cmd(rbuf, n);
if( (flags & WebSocket::FRAME_OP_BITMASK) == WebSocket::FRAME_OP_PING)
{
sendFrame(rbuf, n, WebSocket::FRAME_OP_PONG);
return;
}
if( cmd == ping_str )
if( (flags & WebSocket::FRAME_OP_BITMASK) == WebSocket::FRAME_OP_PONG)
return;
if( (flags & WebSocket::FRAME_OP_BITMASK) == WebSocket::FRAME_OP_CLOSE )
{
term();
return;
}
if( n == sizeof(rbuf) )
{
ostringstream err;
err << "Payload too big. Must be < " << sizeof(rbuf) << " bytes";
sendError(err.str());
return;
}
const std::string cmd(rbuf, n);
onCommand(cmd);
// откладываем ping, т.к. что-то в канале и так было
......@@ -785,6 +846,17 @@ void UWebSocketGate::UWebSocket::read( ev::io& io, int revents )
<< " error: " << ex.displayText()
<< endl;
}
catch( Poco::TimeoutException& ex )
{
// it is ok
}
catch( std::exception& ex )
{
mylog3 << "(websocket): std::exception: "
<< req->clientAddress().toString()
<< " error: " << ex.what()
<< endl;
}
}
// -----------------------------------------------------------------------------
void UWebSocketGate::UWebSocket::ask( uniset::ObjectId id )
......@@ -832,7 +904,7 @@ void UWebSocketGate::UWebSocket::sensorInfo( const uniset::SensorMessage* sm )
if( jbuf.size() > maxsize )
{
mywarn << req->clientAddress().toString() << " lost messages..." << endl;
mywarn << req->clientAddress().toString() << " lost messages...(maxsize=" << maxsize << ")" << endl;
return;
}
......@@ -905,7 +977,7 @@ void UWebSocketGate::UWebSocket::sendShortResponse( sinfo& si )
{
if( jbuf.size() > maxsize )
{
mywarn << req->clientAddress().toString() << " lost messages..." << endl;
mywarn << req->clientAddress().toString() << "(sendShortResponse): lost messages (maxsize=" << maxsize << ")" << endl;
return;
}
......@@ -921,7 +993,7 @@ void UWebSocketGate::UWebSocket::sendResponse( sinfo& si )
if( jbuf.size() > maxsize )
{
mywarn << req->clientAddress().toString() << " lost messages..." << endl;
mywarn << req->clientAddress().toString() << "(sendResponse): lost messages (maxsize=" << maxsize << ")" << endl;
return;
}
......@@ -931,12 +1003,35 @@ void UWebSocketGate::UWebSocket::sendResponse( sinfo& si )
ioping.stop();
}
// -----------------------------------------------------------------------------
void UWebSocketGate::UWebSocket::sendError( const std::string& msg )
{
if( jbuf.size() > maxsize )
{
mywarn << req->clientAddress().toString() << "(sendError): lost messages (maxsize=" << maxsize << ")" << endl;
return;
}
jbuf.emplace(UWebSocketGate::error_to_json(msg));
if( ioping.is_active() )
ioping.stop();
}
// -----------------------------------------------------------------------------
void UWebSocketGate::UWebSocket::onCommand( const string& cmdtxt )
{
if( cmdtxt.size() < 5 )
{
mylog3 << "(websocket): " << req->clientAddress().toString()
<< " error: bad command format '" << cmdtxt << "'. Len must be > 4" << endl;
sendError("Unknown command. Command must be > 4 bytes");
return;
}
const string cmd = cmdtxt.substr(0, 3);
const string params = cmdtxt.substr(4, cmdtxt.size());
const string params = cmdtxt.substr(4);
myinfo << "(websocket): " << req->clientAddress().toString()
myinfo << "(websocket)(command): " << req->clientAddress().toString()
<< "(" << cmd << "): " << params << endl;
if( cmd == "set" )
......@@ -1094,6 +1189,13 @@ void UWebSocketGate::UWebSocket::write()
<< " error: " << ex.displayText()
<< endl;
}
catch( std::exception& ex )
{
mylog3 << "(websocket): std::exception: "
<< req->clientAddress().toString()
<< " error: " << ex.what()
<< endl;
}
term();
}
......@@ -1133,7 +1235,10 @@ void UWebSocketGate::UWebSocket::setSendPeriod ( const double& sec )
void UWebSocketGate::UWebSocket::setMaxSendCount( size_t val )
{
if( val > 0 )
{
maxsend = val;
maxsize = maxsend * Kbuf;
}
}
// -----------------------------------------------------------------------------
void UWebSocketGate::UWebSocket::setMaxCmdCount( size_t val )
......
......@@ -167,6 +167,19 @@ namespace uniset
- "ask:id1,id2,name3,..." - подписаться на уведомления об изменении датчиков (sensorInfo)
- "del:id1,id2,name3,..." - отказаться от уведомления об изменении датчиков
- "get:id1,id2,name3,..." - получить текущее значение датчиков (разовое сообщение ShortSensorInfo)
Если длина команды превышает допустимое значение, то возвращается ошибка
\code
{
"data": [
{"type": "Error", "message": "Payload to big"}
]
}
\endcode
\warning Под хранение сообщений для отправки выделяется Kbuf*maxSend. Kbuf в текущей реализации равен 10.
Т.е. если настроено maxSend=5000 сообщений, то буфер сможет максимально хранить 50000 сообщений.
*/
class UWebSocketGate:
public UniSetObject,
......@@ -228,6 +241,7 @@ namespace uniset
void checkMessages( ev::timer& t, int revents );
virtual void sensorInfo( const uniset::SensorMessage* sm ) override;
virtual uniset::SimpleInfo* getInfo( const char* userparam = 0 ) override;
ev::timer iocheck;
double check_sec = { 0.05 };
int maxMessagesProcessing = { 100 };
......@@ -242,10 +256,11 @@ namespace uniset
double wsHeartbeatTime_sec = { 3.0 };
double wsSendTime_sec = { 0.5 };
size_t wsMaxSend = { 200 };
size_t wsMaxCmd = { 100 };
size_t wsMaxSend = { 5000 };
size_t wsMaxCmd = { 200 };
static Poco::JSON::Object::Ptr to_json( const uniset::SensorMessage* sm, const std::string& err );
static Poco::JSON::Object::Ptr error_to_json( const std::string& err );
/*! класс реализует работу с websocket через eventloop
* Из-за того, что поступление логов может быть достаточно быстрым
......@@ -263,6 +278,8 @@ namespace uniset
virtual ~UWebSocket();
std::string getInfo() const noexcept;
bool isActive();
void set( ev::dynamic_loop& loop, std::shared_ptr<ev::async> a );
......@@ -305,18 +322,20 @@ namespace uniset
void sendResponse( sinfo& si );
void sendShortResponse( sinfo& si );
void onCommand( const std::string& cmd );
void sendError( const std::string& message );
ev::timer iosend;
double send_sec = { 0.5 };
size_t maxsend = { 200 };
size_t maxcmd = { 100 };
size_t maxsend = { 5000 };
size_t maxcmd = { 200 };
const int Kbuf = { 10 }; // коэффициент для буфера сообщений (maxsend умножается на Kbuf)
ev::timer ioping;
double ping_sec = { 3.0 };
static const std::string ping_str;
ev::io iorecv;
char rbuf[512]; //! \todo сделать настраиваемым или применить Poco::FIFOBuffer
char rbuf[32 * 1024]; //! \todo сделать настраиваемым или применить Poco::FIFOBuffer
timeout_t recvTimeout = { 200 }; // msec
std::shared_ptr<ev::async> cmdsignal;
......
......@@ -2,4 +2,4 @@
ulimit -Sc 1000000
uniset2-start.sh -f ./uniset2-wsgate --confile test.xml --ws-name UWebSocketGate1 --ws-log-add-levels any --ws-max-cmd 3 $*
uniset2-start.sh -f ./uniset2-wsgate --confile test.xml --ws-name UWebSocketGate1 --ws-log-add-levels any $*
......@@ -48,8 +48,8 @@ TEST_CASE("[DigitalFilter]: median", "[DigitalFilter][median]")
REQUIRE( df.currentMedian() == 50 );
DigitalFilter df1;
DigitalFilter df10;
DigitalFilter df1(1);
DigitalFilter df10(10);
vector<long> dat = {0, 234, 356, 344, 234, 320, 250, 250, 250, 250, 250, 250, 250, 251, 252, 251, 252, 252, 250};
for( auto v : dat )
......@@ -58,8 +58,14 @@ TEST_CASE("[DigitalFilter]: median", "[DigitalFilter][median]")
df10.median(v);
}
REQUIRE( df1.currentMedian() == 252 );
REQUIRE( df10.currentMedian() == 252 );
REQUIRE( df1.currentMedian() == 250 );
REQUIRE( df10.currentMedian() == 251 );
DigitalFilter d(1);
d.median(1);
d.median(2);
d.median(3);
REQUIRE( d.currentMedian() == 3 );
}
// -----------------------------------------------------------------------------
TEST_CASE("[DigitalFilter]: filter1", "[DigitalFilter][filter1]")
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment