Commit 90a33e6d authored by Pavel Vainerman's avatar Pavel Vainerman Committed by Pavel Vainerman

[uwebsocket]: supported json format only, optimization: send messages by batch

parent 9816f6ac
......@@ -73,12 +73,12 @@ UWebSocketGate::UWebSocketGate( uniset::ObjectId id, xmlNode* cnode, const strin
setMaxSizeOfMessageQueue(sz);
#ifndef DISABLE_REST_API
wsHeartbeatTime_sec = (float)conf->getArgPInt("--" + prefix + "ws-heartbeat-time", it.getProp("wsPingTime"), int(wsHeartbeatTime_sec * 1000)) / 1000.0;
wsHeartbeatTime_sec = (float)conf->getArgPInt("--" + prefix + "ws-heartbeat-time", it.getProp("wsHeartbeatTimeTime"), int(wsHeartbeatTime_sec * 1000)) / 1000.0;
wsSendTime_sec = (float)conf->getArgPInt("--" + prefix + "ws-send-time", it.getProp("wsSendTime"), int(wsSendTime_sec * 1000.0)) / 1000.0;
wsMaxSend = conf->getArgPInt("--" + prefix + "ws-max-send", it.getProp("wsMaxSend"), wsMaxSend);
httpHost = conf->getArgParam("--" + prefix + "httpserver-host", "localhost");
httpPort = conf->getArgPInt("--" + prefix + "httpserver-port", 8080);
httpPort = conf->getArgPInt("--" + prefix + "httpserver-port", 8081);
httpCORS_allow = conf->getArgParam("--" + prefix + "httpserver-cors-allow", "*");
mylog1 << myname << "(init): http server parameters " << httpHost << ":" << httpPort << endl;
......@@ -162,39 +162,13 @@ void UWebSocketGate::sensorInfo( const SensorMessage* sm )
{
uniset_rwmutex_wrlock lock(wsocksMutex);
mylog5 << myname << "(sensorInfo): sid=" << sm->id << " val=" << sm->value << endl;
for( auto&& s : wsocks )
s->sensorInfo(sm);
}
//--------------------------------------------------------------------------------------------
UWebSocketGate::RespondFormat UWebSocketGate::from_string(const string& str)
{
if( str == "json" )
return RespondFormat::JSON;
if( str == "txt" )
return RespondFormat::TXT;
if( str == "raw" )
return RespondFormat::RAW;
return RespondFormat::UNKNOWN;
}
//--------------------------------------------------------------------------------------------
UTCPCore::Buffer* UWebSocketGate::format( const SensorMessage* sm, const std::string& err, const RespondFormat fmt )
{
if( fmt == RespondFormat::JSON )
return to_json(sm, err);
if( fmt == RespondFormat::TXT )
return to_txt(sm, err);
if( fmt == RespondFormat::RAW )
return to_json(sm, err);
return to_json(sm, err);
}
//--------------------------------------------------------------------------------------------
UTCPCore::Buffer* UWebSocketGate::to_json( const SensorMessage* sm, const std::string& err )
Poco::JSON::Object::Ptr UWebSocketGate::to_json( const SensorMessage* sm, const std::string& err )
{
Poco::JSON::Object::Ptr json = new Poco::JSON::Object();
......@@ -205,7 +179,7 @@ UTCPCore::Buffer* UWebSocketGate::to_json( const SensorMessage* sm, const std::s
json->set("name", uniset::ORepHelpers::getShortName(uniset_conf()->oind->getMapName(sm->id)));
json->set("sm_tv_sec", sm->sm_tv.tv_sec);
json->set("sm_tv_nsec", sm->sm_tv.tv_nsec);
json->set("type", uniset::iotype2str(sm->sensor_type));
json->set("sm_type", uniset::iotype2str(sm->sensor_type));
json->set("undefined", sm->undefined );
json->set("supplier", sm->supplier );
json->set("tv_sec", sm->tm.tv_sec);
......@@ -219,32 +193,7 @@ UTCPCore::Buffer* UWebSocketGate::to_json( const SensorMessage* sm, const std::s
calibr->set("rmax", sm->ci.maxRaw);
calibr->set("precision", sm->ci.precision);
ostringstream out;
json->stringify(out);
return new UTCPCore::Buffer(out.str());
}
//--------------------------------------------------------------------------------------------
UTCPCore::Buffer* UWebSocketGate::to_txt( const SensorMessage* sm, const std::string& err )
{
ostringstream out;
if( err.empty() )
out << SMonitor::printEvent(sm) << endl;
else
{
out << uniset::timeToString(sm->sm_tv.tv_sec)
<< "(" << setw(9) << sm->sm_tv.tv_nsec << ")"
<< " id=" << sm->id
<< " error=" << err
<< endl;
}
return new UTCPCore::Buffer(out.str());
}
//--------------------------------------------------------------------------------------------
UTCPCore::Buffer* UWebSocketGate::to_raw( const SensorMessage* sm, const std::string& err )
{
return new UTCPCore::Buffer( (const unsigned char*)(sm), sizeof(*sm) );
return json;
}
//--------------------------------------------------------------------------------------------
std::shared_ptr<UWebSocketGate> UWebSocketGate::init_wsgate( int argc, const char* const* argv, const std::string& prefix )
......@@ -435,54 +384,31 @@ void UWebSocketGate::handleRequest( Poco::Net::HTTPServerRequest& req, Poco::Net
// проверка подключения к страничке со списком websocket-ов
if( !seg.empty() && seg[0] == "wsgate" )
{
if( seg.size() > 2 )
{
if( seg[1] == "json" || seg[1] == "txt" || seg[1] == "raw" )
{
ostringstream params;
params << seg[2] << "&format=" << seg[1];
httpWebSocketConnectPage(out, req, resp, params.str());
}
else
{
auto jdata = respError(resp, HTTPResponse::HTTP_BAD_REQUEST, "Unknown format. Must be [json,txt,raw]");
jdata->stringify(out);
out.flush();
}
return;
}
if( seg.size() > 1 )
{
if( seg[1] == "json" || seg[1] == "txt" || seg[1] == "raw" )
{
ostringstream params;
auto qp = uri.getQueryParameters();
int i = 0;
for( const auto& p : qp )
{
if( i > 0 )
params << "&";
params << p.first;
if( !p.second.empty() )
params << "=" << p.second;
params << "&";
i++;
}
params << "format=" << seg[1];
httpWebSocketConnectPage(out, req, resp, params.str());
}
else
httpWebSocketConnectPage(out, req, resp, seg[1]);
out.flush();
return;
}
}
// default page
httpWebSocketPage(out, req, resp);
out.flush();
}
// -----------------------------------------------------------------------------
......@@ -517,7 +443,7 @@ void UWebSocketGate::onWebSocketSession(Poco::Net::HTTPServerRequest& req, Poco:
mylog3 << req.getHost() << ": WSOCKET: " << uri.getQuery() << endl;
// example: ws://host:port/wsgate/?s1,s2,s3,s4&format=[json,txt,raw]
// example: ws://host:port/wsgate/?s1,s2,s3,s4
if( seg.empty() || seg[0] != "wsgate" )
{
resp.setStatus(HTTPResponse::HTTP_BAD_REQUEST);
......@@ -525,7 +451,7 @@ void UWebSocketGate::onWebSocketSession(Poco::Net::HTTPServerRequest& req, Poco:
resp.setStatusAndReason(HTTPResponse::HTTP_BAD_REQUEST);
resp.setContentLength(0);
std::ostream& err = resp.send();
err << "Bad request. Must be: ws://host:port/wsgate/?s1,s2,s3,s4&format=[json,txt,raw]";
err << "Bad request. Must be: ws://host:port/wsgate/?s1,s2,s3,s4";
err.flush();
return;
}
......@@ -586,16 +512,11 @@ std::shared_ptr<UWebSocketGate::UWebSocket> UWebSocketGate::newWebSocket( Poco::
std::shared_ptr<UWebSocket> ws;
RespondFormat fmt = RespondFormat::JSON;
std::string slist("");
for( const auto& p : qp )
{
// обрабатываем только первый встреченный параметр
if( p.first == "format" )
fmt = from_string(p.second);
else if( p.second.empty() && !p.first.empty() )
if( p.second.empty() && !p.first.empty() )
slist += ("," + p.first);
}
......@@ -627,11 +548,9 @@ std::shared_ptr<UWebSocketGate::UWebSocket> UWebSocketGate::newWebSocket( Poco::
ws->setMaxSendCount(wsMaxSend);
ws->mylog = mylog;
ws->setRespondFormat(fmt);
for( const auto& i : idlist.getList() )
{
mylog3 << myname << ": add " << i << endl;
mylog3 << myname << ": ask add " << i << endl;
UWebSocket::sinfo si;
si.id = i;
si.cmd = "ask";
......@@ -662,7 +581,7 @@ void UWebSocketGate::delWebSocket(std::shared_ptr<UWebSocket>& ws )
}
}
// -----------------------------------------------------------------------------
const std::string UWebSocketGate::UWebSocket::ping_str = { "." };
const std::string UWebSocketGate::UWebSocket::ping_str = { "{\"data\": [{\"type\": \"Ping\"}]}" };
UWebSocketGate::UWebSocket::UWebSocket(Poco::Net::HTTPServerRequest* _req,
Poco::Net::HTTPServerResponse* _resp):
......@@ -723,10 +642,40 @@ void UWebSocketGate::UWebSocket::send( ev::timer& t, int revents )
if( EV_ERROR & revents )
return;
if( !jbuf.empty() )
{
// сперва формируем очередной пакет(поток байт) из накопившихся данных для отправки
ostringstream out;
out << "{\"data\":[";
size_t i = 0;
for( ; !jbuf.empty() && !cancelled; i++ )
{
if( i > 0 )
out << ",";
auto json = jbuf.front();
jbuf.pop();
if( !json )
continue;
json->stringify(out);
}
out << "]}";
wbuf.emplace( new UTCPCore::Buffer(std::move(out.str())) );
mylog4 << req->clientAddress().toString() << "(write): batch " << i << " objects" << endl;
}
// реальная посылка данных
for( size_t i = 0; !wbuf.empty() && i < maxsend && !cancelled; i++ )
{
write();
// read(iorecv,revents);
}
}
// -----------------------------------------------------------------------------
void UWebSocketGate::UWebSocket::ping( ev::timer& t, int revents )
......@@ -851,13 +800,13 @@ void UWebSocketGate::UWebSocket::sensorInfo( const uniset::SensorMessage* sm )
if( s == smap.end() )
return;
if( wbuf.size() > maxsize )
if( jbuf.size() > maxsize )
{
mywarn << req->clientAddress().toString() << " lost messages..." << endl;
return;
}
wbuf.emplace(UWebSocketGate::format(sm, s->second.err, fmt));
jbuf.emplace(UWebSocketGate::to_json(sm, s->second.err));
if( ioping.is_active() )
ioping.stop();
......@@ -874,6 +823,11 @@ void UWebSocketGate::UWebSocket::doCommand( const std::shared_ptr<UInterface>& u
if( s.cmd == "" )
continue;
mylog3 << req->clientAddress().toString() << "(doCommand): "
<< s.cmd << " sid=" << s.id
<< " value=" << s.value
<< endl;
if( s.cmd == "ask" )
ui->askSensor(s.id, UniversalIO::UIONotify);
else if( s.cmd == "del" )
......@@ -918,6 +872,7 @@ void UWebSocketGate::UWebSocket::onCommand( const string& cmdtxt )
for( const auto& i : idlist )
set(i.si.id, i.val);
// уведомление о новой команде
cmdsignal->send();
}
else if( cmd == "ask" )
......@@ -935,7 +890,7 @@ void UWebSocketGate::UWebSocket::onCommand( const string& cmdtxt )
add(s);
}
// даём команду на перезаказ датчиков
// уведомление о новой команде
cmdsignal->send();
}
else if( cmd == "del" )
......@@ -948,7 +903,7 @@ void UWebSocketGate::UWebSocket::onCommand( const string& cmdtxt )
for( const auto& id : idlist.getList() )
del(id);
// даём команду на перезаказ датчиков
// уведомление о новой команде
cmdsignal->send();
}
}
......@@ -977,7 +932,7 @@ void UWebSocketGate::UWebSocket::write()
int flags = WebSocket::FRAME_TEXT;
if( msg->len == 1 ) // это пинг состоящий из "."
if( msg->len == ping_str.size() )
flags = WebSocket::FRAME_FLAG_FIN | WebSocket::FRAME_OP_PING;
try
......@@ -1096,11 +1051,6 @@ void UWebSocketGate::UWebSocket::setMaxSendCount( size_t val )
maxsend = val;
}
// -----------------------------------------------------------------------------
void UWebSocketGate::UWebSocket::setRespondFormat( UWebSocketGate::RespondFormat f )
{
fmt = f;
}
// -----------------------------------------------------------------------------
void UWebSocketGate::httpWebSocketPage( std::ostream& ostr, Poco::Net::HTTPServerRequest& req, Poco::Net::HTTPServerResponse& resp )
{
using Poco::Net::HTTPResponse;
......@@ -1120,12 +1070,7 @@ void UWebSocketGate::httpWebSocketPage( std::ostream& ostr, Poco::Net::HTTPServe
ostr << " <li><a target='_blank' href=\"http://"
<< req.serverAddress().toString()
<< "/wsgate/json\">42,30,1042 [json]</a></li>"
<< endl;
ostr << " <li><a target='_blank' href=\"http://"
<< req.serverAddress().toString()
<< "/wsgate/txt\">42,30,1042 [txt]</a></li>"
<< "/wsgate/?42,30,1042\">42,30,1042</a></li>"
<< endl;
ostr << "</ul>" << endl;
......@@ -1169,7 +1114,7 @@ void UWebSocketGate::httpWebSocketConnectPage( ostream& ostr,
ostr << "{" << endl;
ostr << " if (\"WebSocket\" in window)" << endl;
ostr << " {" << endl;
ostr << " var ws = new WebSocket(\"ws://" << req.serverAddress().toString() << "/wsgate/\");" << endl;
ostr << " var ws = new WebSocket(\"ws://" << req.serverAddress().toString() << "/wsgate/?" << params << "\");" << endl;
ostr << "setInterval(send_cmd, 1000);" << endl;
ostr << " var l = document.getElementById('logname');" << endl;
ostr << " l.innerHTML = '*'" << endl;
......@@ -1196,7 +1141,7 @@ void UWebSocketGate::httpWebSocketConnectPage( ostream& ostr,
ostr << " }" << endl;
ostr << "function send_cmd() {" << endl;
ostr << " ws.send( 'set:12,32,34' );" << endl;
// ostr << " ws.send( 'set:12,32,34' );" << endl;
ostr << "}" << endl;
ostr << "}" << endl;
......
......@@ -55,13 +55,13 @@ namespace uniset
об изменнии датчиков, а так же изменять состояние (см. \ref sec_UWebSocketGate_Command).
Подключение к websocket-у доступно по адресу:
\code
ws://host:port/wsgate/?s1,s2,s3,s4&format=[json,txt,raw]
ws://host:port/wsgate/
\endcode
Помимо этого UWebSocketGate работает в режиме мониторинга изменений датчиков.
Для этого достаточно зайти на страничку по адресу:
\code
http://host:port/wsgate/?s1,s2,s3,s4&format=[json,txt,raw]
http://host:port/wsgate/?s1,s2,s3,s4
\endcode
\section sec_UWebSocketGate_Conf Конфигурирование UWebSocketGate
......@@ -75,6 +75,73 @@ namespace uniset
\section sec_UWebSocketGate_DETAIL UWebSocketGate: Технические детали
Вся релизация построена на "однопоточном" eventloop. Если датчики долго не меняются, то периодически посылается "ping" сообщение.
\section sec_UWebSocketGate_Messages Сообщения
Общий формат сообщений
\code
{
"data": [
{
"type": "SensorInfo",
...
},
{
"type": "SensorInfo",
...
},
{
"type": "OtherType",
...
},
{
"type": "YetAnotherType",
...
},
]}
\endcode
Example
\code
{
"data": [
{
"type": "SensorInfo",
"tv_nsec": 343079769,
"tv_sec": 1614521238,
"value": 63
"sm_tv_nsec": 976745544,
"sm_tv_sec": 1614520060,
"sm_type": "AI",
"error": "",
"id": 10,
"name": "AI_AS",
"node": 3000,
"supplier": 5003,
"undefined": false,
"calibration": {
"cmax": 0,
"cmin": 0,
"precision": 3,
"rmax": 0,
"rmin": 0
},
}]
}
\endcode
\section sec_UWebSocketGate_Ping Ping
Для того, чтобы соединение не закрывалось при отсутствии данных, каждые ping_sec посылается
специальное сообщение
\code
{
"data": [
{"type": "Ping"}
]
}
\endcode
По умолчанию каждый 3 секунды, но время можно задавать параметром "wsHeartbeatTime" (msec)
или аргументом командной строки
--prefix-ws-heartbeat-time msec
\section sec_UWebSocketGate_Command Команды
Через websocket можно посылать команды.
На текущий момент формат команды строковый.
......@@ -84,9 +151,6 @@ namespace uniset
- "set:id1=val1,id2=val2,name3=val4,..." - выставить значение датчиков
- "ask:id1,id2,name3,..." - подписаться на уведомления об изменении датчиков (sensorInfo)
- "del:id1,id2,name3,..." - отказаться от уведомления об изменении датчиков
\todo Разобраться с "ping" сообщением для формата json..
\todo Настройка check_sec из командной строки и configure.xml
*/
class UWebSocketGate:
public UniSetObject,
......@@ -149,7 +213,7 @@ namespace uniset
void checkMessages( ev::timer& t, int revents );
virtual void sensorInfo( const uniset::SensorMessage* sm ) override;
ev::timer iocheck;
double check_sec = { 0.3 };
double check_sec = { 0.01 };
std::shared_ptr<DebugStream> mylog;
......@@ -163,20 +227,7 @@ namespace uniset
double wsSendTime_sec = { 0.5 };
size_t wsMaxSend = { 200 };
enum class RespondFormat
{
UNKNOWN,
JSON,
TXT,
RAW
};
RespondFormat from_string( const std::string& str );
static UTCPCore::Buffer* format( const uniset::SensorMessage* sm, const std::string& err, const RespondFormat fmt );
static UTCPCore::Buffer* to_json( const uniset::SensorMessage* sm, const std::string& err );
static UTCPCore::Buffer* to_txt( const uniset::SensorMessage* sm, const std::string& err );
static UTCPCore::Buffer* to_raw( const uniset::SensorMessage* sm, const std::string& err );
static Poco::JSON::Object::Ptr to_json( const uniset::SensorMessage* sm, const std::string& err );
/*! класс реализует работу с websocket через eventloop
* Из-за того, что поступление логов может быть достаточно быстрым
......@@ -223,7 +274,6 @@ namespace uniset
void setHearbeatTime( const double& sec );
void setSendPeriod( const double& sec );
void setMaxSendCount( size_t val );
void setRespondFormat( RespondFormat f );
std::shared_ptr<DebugStream> mylog;
......@@ -252,13 +302,15 @@ namespace uniset
std::atomic_bool cancelled = { false };
std::unordered_map<uniset::ObjectId, sinfo> smap;
RespondFormat fmt = { RespondFormat::JSON };
Poco::Net::HTTPServerRequest* req;
Poco::Net::HTTPServerResponse* resp;
// очередь json-на отправку
std::queue<Poco::JSON::Object::Ptr> jbuf;
// очередь данных на посылку..
std::queue<UTCPCore::Buffer*> wbuf;
std::queue<uniset::UTCPCore::Buffer*> wbuf;
size_t maxsize; // рассчитывается сходя из max_send (см. конструктор)
};
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment