Commit 8dbb4dcf authored by Pavel Vainerman's avatar Pavel Vainerman

[uwebsocketgate]: prototype

parent f228ac53
......@@ -48,6 +48,7 @@
</LogDB>
<BackendOpenTSDB name="BackendOpenTSDB" host="localhost" filter_field="tsdb" filter_value="1" tags="host=localhost uniset=1" prefix="uniset"/>
<UWebSocketGate name="UWebSocketGate1"/>
<settings>
......@@ -5502,6 +5503,7 @@
<item id="6088" name="TestGroup"/>
<item id="6089" name="MQTTPublisher1"/>
<item id="6090" name="BackendOpenTSDB"/>
<item id="6091" name="UWebSocketGate1"/>
<item id="6101" name="MBTCP1"/>
<item id="6102" name="MBTCP2"/>
<item id="6103" name="MBTCP3"/>
......
......@@ -593,6 +593,7 @@ AC_CONFIG_FILES([Makefile
extensions/LogDB/tests/Makefile
extensions/HttpResolver/Makefile
extensions/HttpResolver/tests/Makefile
extensions/UWebSocketGate/Makefile
testsuite/Makefile
wrappers/Makefile
wrappers/python/lib/Makefile
......
......@@ -7,7 +7,7 @@ SUBDIRS = lib include SharedMemory SharedMemory/tests IOControl IOControl/tests
ModbusMaster ModbusSlave SMViewer UniNetwork UNetUDP UNetUDP/tests \
DBServer-MySQL DBServer-SQLite DBServer-PostgreSQL MQTTPublisher \
RRDServer tests ModbusMaster/tests ModbusSlave/tests LogDB LogDB/tests \
Backend-OpenTSDB HttpResolver HttpResolver/tests
Backend-OpenTSDB HttpResolver HttpResolver/tests UWebSocketGate
pkgconfigdir = $(libdir)/pkgconfig
......
bin_PROGRAMS = @PACKAGE@-wsgate
@PACKAGE@_wsgate_LDADD = $(top_builddir)/lib/libUniSet2.la
@PACKAGE@_wsgate_SOURCES = UWebSocketGate.cc main.cc
include $(top_builddir)/include.mk
/*
* Copyright (c) 2017 Pavel Vainerman.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation, version 2.1.
*
* This program is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Lesser Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
// --------------------------------------------------------------------------
/*! \file
* \author Pavel Vainerman
*/
// --------------------------------------------------------------------------
#include <sstream>
#include <iomanip>
#include <unistd.h>
#include "unisetstd.h"
#include <Poco/Net/NetException.h>
#include <Poco/Net/WebSocket.h>
#include "ujson.h"
#include "UWebSocketGate.h"
#include "Configuration.h"
#include "Exceptions.h"
#include "UHelpers.h"
#include "Debug.h"
#include "UniXML.h"
#include "ORepHelpers.h"
#include "UWebSocketGateSugar.h"
#include "SMonitor.h"
// --------------------------------------------------------------------------
using namespace uniset;
using namespace std;
// --------------------------------------------------------------------------
UWebSocketGate::UWebSocketGate( uniset::ObjectId id, xmlNode* cnode, const string& prefix ):
UniSetObject(id)
{
offThread(); // отключаем поток обработки, потому-что будем обрабатывать сами
auto conf = uniset_conf();
mylog = make_shared<DebugStream>();
mylog->setLogName(myname);
{
ostringstream s;
s << prefix << "log";
conf->initLogStream(mylog, s.str());
}
UniXML::iterator it(cnode);
maxwsocks = conf->getArgPInt("--" + prefix + "ws-max", it.getProp("wsMax"), maxwsocks);
wscmd = make_shared<ev::async>();
wsactivate.set<UWebSocketGate, &UWebSocketGate::onActivate>(this);
wscmd->set<UWebSocketGate, &UWebSocketGate::onCommand>(this);
sigTERM.set<UWebSocketGate, &UWebSocketGate::onTerminate>(this);
sigQUIT.set<UWebSocketGate, &UWebSocketGate::onTerminate>(this);
sigINT.set<UWebSocketGate, &UWebSocketGate::onTerminate>(this);
iocheck.set<UWebSocketGate, &UWebSocketGate::checkMessages>(this);
#ifndef DISABLE_REST_API
wsHeartbeatTime_sec = (float)conf->getArgPInt("--" + prefix + "ws-heartbeat-time", it.getProp("wsPingTime"), wsHeartbeatTime_sec) / 1000.0;
wsSendTime_sec = (float)conf->getArgPInt("--" + prefix + "ws-send-time", it.getProp("wsSendTime"), wsSendTime_sec) / 1000.0;
wsMaxSend = conf->getArgPInt("--" + prefix + "ws-max-send", it.getProp("wsMaxSend"), wsMaxSend);
httpHost = conf->getArgParam("--" + prefix + "httpserver-host", "localhost");
httpPort = conf->getArgPInt("--" + prefix + "httpserver-port", 8080);
httpCORS_allow = conf->getArgParam("--" + prefix + "httpserver-cors-allow", "*");
mylog1 << myname << "(init): http server parameters " << httpHost << ":" << httpPort << endl;
Poco::Net::SocketAddress sa(httpHost, httpPort);
try
{
Poco::Net::HTTPServerParams* httpParams = new Poco::Net::HTTPServerParams;
int maxQ = conf->getArgPInt("--" + prefix + "httpserver-max-queued", it.getProp("httpMaxQueued"), 100);
int maxT = conf->getArgPInt("--" + prefix + "httpserver-max-threads", it.getProp("httpMaxThreads"), 3);
httpParams->setMaxQueued(maxQ);
httpParams->setMaxThreads(maxT);
httpserv = std::make_shared<Poco::Net::HTTPServer>(new UWebSocketGateRequestHandlerFactory(this), Poco::Net::ServerSocket(sa), httpParams );
}
catch( std::exception& ex )
{
std::stringstream err;
err << myname << "(init): " << httpHost << ":" << httpPort << " ERROR: " << ex.what();
throw uniset::SystemError(err.str());
}
#endif
}
//--------------------------------------------------------------------------------------------
UWebSocketGate::~UWebSocketGate()
{
if( evIsActive() )
evstop();
if( httpserv )
httpserv->stop();
}
//--------------------------------------------------------------------------------------------
void UWebSocketGate::onTerminate( ev::sig& evsig, int revents )
{
if( EV_ERROR & revents )
{
mycrit << myname << "(onTerminate): invalid event" << endl;
return;
}
myinfo << myname << "(onTerminate): terminate..." << endl;
evsig.stop();
//evsig.loop.break_loop();
try
{
httpserv->stop();
}
catch( std::exception& ex )
{
myinfo << myname << "(onTerminate): " << ex.what() << endl;
}
try
{
evstop();
}
catch( std::exception& ex )
{
myinfo << myname << "(onTerminate): " << ex.what() << endl;
}
}
//--------------------------------------------------------------------------------------------
void UWebSocketGate::checkMessages( ev::timer& t, int revents )
{
if( EV_ERROR & revents )
return;
auto m = receiveMessage();
if( m )
processingMessage(m.get());
}
//--------------------------------------------------------------------------------------------
void UWebSocketGate::sensorInfo( const SensorMessage* sm )
{
uniset_rwmutex_wrlock lock(wsocksMutex);
for( auto&& s : wsocks )
s->sensorInfo(sm);
}
//--------------------------------------------------------------------------------------------
UWebSocketGate::RespondFormat UWebSocketGate::from_string(const string& str)
{
if( str == "json" )
return RespondFormat::JSON;
if( str == "txt" )
return RespondFormat::TXT;
if( str == "raw" )
return RespondFormat::RAW;
return RespondFormat::UNKNOWN;
}
//--------------------------------------------------------------------------------------------
UTCPCore::Buffer* UWebSocketGate::format( const SensorMessage* sm, const std::string& err, const RespondFormat fmt )
{
if( fmt == RespondFormat::JSON )
return to_json(sm, err);
if( fmt == RespondFormat::TXT )
return to_txt(sm, err);
if( fmt == RespondFormat::RAW )
return to_json(sm, err);
return to_json(sm, err);
}
//--------------------------------------------------------------------------------------------
UTCPCore::Buffer* UWebSocketGate::to_json( const SensorMessage* sm, const std::string& err )
{
Poco::JSON::Object::Ptr json = new Poco::JSON::Object();
json->set("error", err);
json->set("id", sm->id);
json->set("value", sm->value);
json->set("name", uniset::ORepHelpers::getShortName(uniset_conf()->oind->getMapName(sm->id)));
json->set("sm_tv_sec", sm->sm_tv.tv_sec);
json->set("sm_tv_nsec", sm->sm_tv.tv_nsec);
json->set("type", uniset::iotype2str(sm->sensor_type));
json->set("undefined", sm->undefined );
json->set("supplier", sm->supplier );
json->set("tv_sec", sm->tm.tv_sec);
json->set("tv_nsec", sm->tm.tv_nsec);
json->set("node", sm->node);
Poco::JSON::Object::Ptr calibr = uniset::json::make_child(json, "calibration");
calibr->set("cmin", sm->ci.minCal);
calibr->set("cmax", sm->ci.maxCal);
calibr->set("rmin", sm->ci.minRaw);
calibr->set("rmax", sm->ci.maxRaw);
calibr->set("precision", sm->ci.precision);
ostringstream out;
json->stringify(out);
return new UTCPCore::Buffer(out.str());
}
//--------------------------------------------------------------------------------------------
UTCPCore::Buffer* UWebSocketGate::to_txt( const SensorMessage* sm, const std::string& err )
{
ostringstream out;
if( err.empty() )
out << SMonitor::printEvent(sm) << endl;
else
{
out << uniset::timeToString(sm->sm_tv.tv_sec)
<< "(" << setw(9) << sm->sm_tv.tv_nsec << ")"
<< " id=" << sm->id
<< " error=" << err
<< endl;
}
return new UTCPCore::Buffer(out.str());
}
//--------------------------------------------------------------------------------------------
UTCPCore::Buffer* UWebSocketGate::to_raw( const SensorMessage* sm, const std::string& err )
{
return new UTCPCore::Buffer( (const unsigned char*)(sm), sizeof(*sm) );
}
//--------------------------------------------------------------------------------------------
std::shared_ptr<UWebSocketGate> UWebSocketGate::init_wsgate( int argc, const char* const* argv, const std::string& prefix )
{
string name = uniset::getArgParam("--" + prefix + "name", argc, argv, "UWebSocketGate");
if( name.empty() )
{
cerr << "(UWebSocketGate): Unknown name. Use --" << prefix << "name" << endl;
return nullptr;
}
return uniset::make_object<UWebSocketGate>(name, "UWebSocketGate", prefix);
}
// -----------------------------------------------------------------------------
void UWebSocketGate::help_print()
{
cout << "Default: prefix='ws'" << endl;
cout << "--prefix-name name - Имя. Для поиска настроечной секции в configure.xml" << endl;
cout << "websockets: " << endl;
cout << "--prefix-ws-max num - Максимальное количество websocket-ов" << endl;
cout << "--prefix-ws-heartbeat-time msec - Период сердцебиения в соединении. По умолчанию: 3000 мсек" << endl;
cout << "--prefix-ws-send-time msec - Период посылки сообщений. По умолчанию: 500 мсек" << endl;
cout << "--prefix-ws-max num - Максимальное число сообщений посылаемых за один раз. По умолчанию: 200" << endl;
cout << "http: " << endl;
cout << "--prefix-httpserver-host ip - IP на котором слушает http сервер. По умолчанию: localhost" << endl;
cout << "--prefix-httpserver-port num - Порт на котором принимать запросы. По умолчанию: 8080" << endl;
cout << "--prefix-httpserver-max-queued num - Размер очереди запросов к http серверу. По умолчанию: 100" << endl;
cout << "--prefix-httpserver-max-threads num - Разрешённое количество потоков для http-сервера. По умолчанию: 3" << endl;
cout << "--prefix-httpserver-cors-allow addr - (CORS): Access-Control-Allow-Origin. Default: *" << endl;
}
// -----------------------------------------------------------------------------
void UWebSocketGate::run( bool async )
{
if( httpserv )
httpserv->start();
if( async )
async_evrun();
else
evrun();
}
// -----------------------------------------------------------------------------
void UWebSocketGate::evfinish()
{
wsactivate.stop();
iocheck.stop();
wscmd->stop();
}
// -----------------------------------------------------------------------------
void UWebSocketGate::evprepare()
{
wsactivate.set(loop);
wsactivate.start();
wscmd->set(loop);
wscmd->start();
sigTERM.set(loop);
sigTERM.start(SIGTERM);
sigQUIT.set(loop);
sigQUIT.start(SIGQUIT);
sigINT.set(loop);
sigINT.start(SIGINT);
iocheck.set(loop);
iocheck.start(0, check_sec);
}
// -----------------------------------------------------------------------------
void UWebSocketGate::onActivate( ev::async& watcher, int revents )
{
if (EV_ERROR & revents)
{
mycrit << myname << "(UWebSocketGate::onActivate): invalid event" << endl;
return;
}
uniset_rwmutex_rlock lk(wsocksMutex);
for( const auto& s : wsocks )
{
if( !s->isActive() )
{
s->doCommand(ui);
s->set(loop, wscmd);
}
}
}
// -----------------------------------------------------------------------------
void UWebSocketGate::onCommand( ev::async& watcher, int revents )
{
if (EV_ERROR & revents)
{
mycrit << myname << "(UWebSocketGate::onCommand): invalid event" << endl;
return;
}
uniset_rwmutex_rlock lk(wsocksMutex);
for( const auto& s : wsocks )
s->doCommand(ui);
}
// -----------------------------------------------------------------------------
#ifndef DISABLE_REST_API
// -----------------------------------------------------------------------------
class UWebSocketGateRequestHandler:
public Poco::Net::HTTPRequestHandler
{
public:
UWebSocketGateRequestHandler( UWebSocketGate* l ): wsgate(l) {}
virtual void handleRequest( Poco::Net::HTTPServerRequest& request,
Poco::Net::HTTPServerResponse& response ) override
{
wsgate->handleRequest(request, response);
}
private:
UWebSocketGate* wsgate;
};
// -----------------------------------------------------------------------------
class UWebSocketGateWebSocketRequestHandler:
public Poco::Net::HTTPRequestHandler
{
public:
UWebSocketGateWebSocketRequestHandler( UWebSocketGate* l ): wsgate(l) {}
virtual void handleRequest( Poco::Net::HTTPServerRequest& request,
Poco::Net::HTTPServerResponse& response ) override
{
wsgate->onWebSocketSession(request, response);
}
private:
UWebSocketGate* wsgate;
};
// -----------------------------------------------------------------------------
Poco::Net::HTTPRequestHandler* UWebSocketGate::UWebSocketGateRequestHandlerFactory::createRequestHandler( const Poco::Net::HTTPServerRequest& req )
{
if( req.find("Upgrade") != req.end() && Poco::icompare(req["Upgrade"], "websocket") == 0 )
return new UWebSocketGateWebSocketRequestHandler(wsgate);
return new UWebSocketGateRequestHandler(wsgate);
}
// -----------------------------------------------------------------------------
void UWebSocketGate::makeResponseAccessHeader( Poco::Net::HTTPServerResponse& resp )
{
resp.set("Access-Control-Allow-Methods", "GET");
resp.set("Access-Control-Allow-Request-Method", "*");
resp.set("Access-Control-Allow-Origin", httpCORS_allow /* req.get("Origin") */);
// header('Access-Control-Allow-Credentials: true');
// header('Access-Control-Allow-Headers: Origin, X-Requested-With, Content-Type, Accept, Authorization');
}
// -----------------------------------------------------------------------------
void UWebSocketGate::handleRequest( Poco::Net::HTTPServerRequest& req, Poco::Net::HTTPServerResponse& resp )
{
using Poco::Net::HTTPResponse;
std::ostream& out = resp.send();
makeResponseAccessHeader(resp);
// В этой версии API поддерживается только GET
if( req.getMethod() != "GET" )
{
auto jdata = respError(resp, HTTPResponse::HTTP_BAD_REQUEST, "method must be 'GET'");
jdata->stringify(out);
out.flush();
return;
}
resp.setContentType("text/json");
Poco::URI uri(req.getURI());
mylog3 << req.getHost() << ": query: " << uri.getQuery() << endl;
std::vector<std::string> seg;
uri.getPathSegments(seg);
// проверка подключения к страничке со списком websocket-ов
if( !seg.empty() && seg[0] == "wsgate" )
{
if( seg.size() > 2 )
{
if( seg[1] == "json" || seg[1] == "txt" || seg[1] == "raw" )
{
ostringstream params;
params << seg[2] << "&format=" << seg[1];
httpWebSocketConnectPage(out, req, resp, params.str());
}
else
{
auto jdata = respError(resp, HTTPResponse::HTTP_BAD_REQUEST, "Unknown format. Must be [json,txt,raw]");
jdata->stringify(out);
out.flush();
}
return;
}
if( seg.size() > 1 )
{
if( seg[1] == "json" || seg[1] == "txt" || seg[1] == "raw" )
{
ostringstream params;
auto qp = uri.getQueryParameters();
for( const auto& p : qp )
{
params << p.first;
if( !p.second.empty() )
params << "=" << p.second;
params << "&";
}
params << "format=" << seg[1];
httpWebSocketConnectPage(out, req, resp, params.str());
}
else
httpWebSocketConnectPage(out, req, resp, seg[1]);
return;
}
}
// default page
httpWebSocketPage(out, req, resp);
out.flush();
}
// -----------------------------------------------------------------------------
Poco::JSON::Object::Ptr UWebSocketGate::respError( Poco::Net::HTTPServerResponse& resp,
Poco::Net::HTTPResponse::HTTPStatus estatus,
const string& message )
{
makeResponseAccessHeader(resp);
resp.setStatus(estatus);
resp.setContentType("text/json");
Poco::JSON::Object::Ptr jdata = new Poco::JSON::Object();
jdata->set("error", resp.getReasonForStatus(resp.getStatus()));
jdata->set("ecode", (int)resp.getStatus());
jdata->set("message", message);
return jdata;
}
// -----------------------------------------------------------------------------
void UWebSocketGate::onWebSocketSession(Poco::Net::HTTPServerRequest& req, Poco::Net::HTTPServerResponse& resp)
{
using Poco::Net::WebSocket;
using Poco::Net::WebSocketException;
using Poco::Net::HTTPResponse;
using Poco::Net::HTTPServerRequest;
std::vector<std::string> seg;
makeResponseAccessHeader(resp);
Poco::URI uri(req.getURI());
uri.getPathSegments(seg);
mylog3 << req.getHost() << ": WSOCKET: " << uri.getQuery() << endl;
// example: ws://host:port/wsgate/?s1,s2,s3,s4&format=[json,txt,raw]
if( seg.empty() || seg[0] != "wsgate" )
{
resp.setStatus(HTTPResponse::HTTP_BAD_REQUEST);
resp.setContentType("text/html");
resp.setStatusAndReason(HTTPResponse::HTTP_BAD_REQUEST);
resp.setContentLength(0);
std::ostream& err = resp.send();
err << "Bad request. Must be: ws://host:port/wsgate/?s1,s2,s3,s4&format=[json,txt,raw]";
err.flush();
return;
}
{
uniset_rwmutex_rlock lk(wsocksMutex);
if( wsocks.size() >= maxwsocks )
{
resp.setStatus(HTTPResponse::HTTP_SERVICE_UNAVAILABLE);
resp.setContentType("text/html");
resp.setStatusAndReason(HTTPResponse::HTTP_SERVICE_UNAVAILABLE);
resp.setContentLength(0);
std::ostream& err = resp.send();
err << "Error: exceeding the maximum number of open connections (" << maxwsocks << ")";
err.flush();
return;
}
}
auto qp = uri.getQueryParameters();
auto ws = newWebSocket(&req, &resp, qp);
if( !ws )
{
mywarn << myname << "(onWebSocketSession): failed create socket.." << endl;
return;
}
UWebSocketGuard lk(ws, this);
mylog3 << myname << "(onWebSocketSession): start session for " << req.clientAddress().toString() << endl;
// т.к. вся работа происходит в eventloop
// то здесь просто ждём..
ws->waitCompletion();
mylog3 << myname << "(onWebSocketSession): finish session for " << req.clientAddress().toString() << endl;
}
// -----------------------------------------------------------------------------
bool UWebSocketGate::activateObject()
{
bool ret = UniSetObject::activateObject();
if( ret )
run(true);
return ret;
}
// -----------------------------------------------------------------------------
std::shared_ptr<UWebSocketGate::UWebSocket> UWebSocketGate::newWebSocket( Poco::Net::HTTPServerRequest* req,
Poco::Net::HTTPServerResponse* resp, const Poco::URI::QueryParameters& qp )
{
using Poco::Net::WebSocket;
using Poco::Net::WebSocketException;
using Poco::Net::HTTPResponse;
using Poco::Net::HTTPServerRequest;
std::shared_ptr<UWebSocket> ws;
RespondFormat fmt = RespondFormat::JSON;
std::string slist("");
for( const auto& p : qp )
{
// обрабатываем только первый встреченный параметр
if( p.first == "format" )
fmt = from_string(p.second);
else if( p.second.empty() && !p.first.empty() )
slist += ("," + p.first);
}
if( qp.size() == 1 && qp[0].first.empty() )
slist = qp[0].first;
// auto idlist = uniset::explode(slist);
#warning DEBUG
auto idlist = uniset::explode("34,23,54");
if( idlist.empty() )
{
resp->setStatus(HTTPResponse::HTTP_BAD_REQUEST);
resp->setContentType("text/html");
resp->setStatusAndReason(HTTPResponse::HTTP_BAD_REQUEST);
resp->setContentLength(0);
std::ostream& err = resp->send();
err << "Error: no list of sensors for '" << slist << "'. Use: http://host:port/wsgate/?s1,s2,s3";
err.flush();
mywarn << myname << "(newWebSocket): error: no list of sensors for '" << slist << "'" << endl;
return nullptr;
}
{
uniset_rwmutex_wrlock lock(wsocksMutex);
ws = make_shared<UWebSocket>(req, resp);
ws->setHearbeatTime(wsHeartbeatTime_sec);
ws->setSendPeriod(wsSendTime_sec);
ws->setMaxSendCount(wsMaxSend);
ws->mylog = mylog;
ws->setRespondFormat(fmt);
for( const auto& i : idlist.getList() )
{
mylog3 << myname << ": add " << i << endl;
UWebSocket::sinfo si;
si.id = i;
si.cmd = "ask";
ws->add(si);
}
wsocks.emplace_back(ws);
}
// wsocksMutex надо отпустить, прежде чем посылать сигнал
// т.к. в обработчике происходит его захват
wsactivate.send();
return ws;
}
// -----------------------------------------------------------------------------
void UWebSocketGate::delWebSocket(std::shared_ptr<UWebSocket>& ws )
{
uniset_rwmutex_wrlock lock(wsocksMutex);
for( auto it = wsocks.begin(); it != wsocks.end(); it++ )
{
if( (*it).get() == ws.get() )
{
mylog3 << myname << ": delete websocket " << endl;
wsocks.erase(it);
return;
}
}
}
// -----------------------------------------------------------------------------
const std::string UWebSocketGate::UWebSocket::ping_str = { "." };
UWebSocketGate::UWebSocket::UWebSocket(Poco::Net::HTTPServerRequest* _req,
Poco::Net::HTTPServerResponse* _resp):
Poco::Net::WebSocket(*_req, *_resp),
req(_req),
resp(_resp)
{
setBlocking(false);
cancelled = false;
// т.к. создание websocket-а происходит в другом потоке
// то активация и привязка к loop происходит в функции set()
// вызываемой из eventloop
ioping.set<UWebSocketGate::UWebSocket, &UWebSocketGate::UWebSocket::ping>(this);
iosend.set<UWebSocketGate::UWebSocket, &UWebSocketGate::UWebSocket::send>(this);
iorecv.set<UWebSocketGate::UWebSocket, &UWebSocketGate::UWebSocket::read>(this);
maxsize = maxsend * 10; // пока так
setReceiveTimeout( uniset::PassiveTimer::millisecToPoco(recvTimeout));
}
// -----------------------------------------------------------------------------
UWebSocketGate::UWebSocket::~UWebSocket()
{
if( !cancelled )
term();
// удаляем всё что осталось
while(!wbuf.empty())
{
delete wbuf.front();
wbuf.pop();
}
}
// -----------------------------------------------------------------------------
bool UWebSocketGate::UWebSocket::isActive()
{
return iosend.is_active();
}
// -----------------------------------------------------------------------------
void UWebSocketGate::UWebSocket::set(ev::dynamic_loop& loop, std::shared_ptr<ev::async> a )
{
iosend.set(loop);
ioping.set(loop);
iosend.start(0, send_sec);
ioping.start(ping_sec, ping_sec);
iorecv.set(loop);
iorecv.start(sockfd(), ev::READ);
cmdsignal = a;
}
// -----------------------------------------------------------------------------
void UWebSocketGate::UWebSocket::send( ev::timer& t, int revents )
{
if( EV_ERROR & revents )
return;
for( size_t i = 0; !wbuf.empty() && i < maxsend && !cancelled; i++ )
write();
// read(iorecv,revents);
}
// -----------------------------------------------------------------------------
void UWebSocketGate::UWebSocket::ping( ev::timer& t, int revents )
{
if( EV_ERROR & revents )
return;
if( cancelled )
return;
if( !wbuf.empty() )
{
ioping.stop();
return;
}
wbuf.emplace(new UTCPCore::Buffer(ping_str));
if( ioping.is_active() )
ioping.stop();
}
// -----------------------------------------------------------------------------
void UWebSocketGate::UWebSocket::read( ev::io& io, int revents )
{
if( EV_ERROR & revents )
return;
if( !(revents & EV_READ) )
return;
using Poco::Net::WebSocket;
using Poco::Net::WebSocketException;
using Poco::Net::HTTPResponse;
using Poco::Net::HTTPServerRequest;
int flags = 0; // WebSocket::FRAME_FLAG_FIN | WebSocket::FRAME_OP_TEXT;
try
{
if( available() <= 0 )
return;
int n = receiveFrame(rbuf, sizeof(rbuf), flags);
// int n = receiveBytes(rbuf, sizeof(rbuf));
if( n <= 0 )
return;
const std::string cmd(rbuf, n);
if( cmd == ping_str )
return;
onCommand(cmd);
// откладываем ping, т.к. что-то в канале и так было
ioping.start(ping_sec, ping_sec);
}
catch( WebSocketException& exc )
{
switch( exc.code() )
{
case WebSocket::WS_ERR_HANDSHAKE_UNSUPPORTED_VERSION:
resp->set("Sec-WebSocket-Version", WebSocket::WEBSOCKET_VERSION);
case WebSocket::WS_ERR_NO_HANDSHAKE:
case WebSocket::WS_ERR_HANDSHAKE_NO_VERSION:
case WebSocket::WS_ERR_HANDSHAKE_NO_KEY:
resp->setStatusAndReason(HTTPResponse::HTTP_BAD_REQUEST);
resp->setContentLength(0);
resp->send();
break;
}
}
catch( const Poco::Net::NetException& e )
{
mylog3 << "(websocket):NetException: "
<< req->clientAddress().toString()
<< " error: " << e.displayText()
<< endl;
}
catch( Poco::IOException& ex )
{
mylog3 << "(websocket): IOException: "
<< req->clientAddress().toString()
<< " error: " << ex.displayText()
<< endl;
}
}
// -----------------------------------------------------------------------------
void UWebSocketGate::UWebSocket::add( const sinfo& si )
{
smap[si.id] = si;
}
// -----------------------------------------------------------------------------
void UWebSocketGate::UWebSocket::del( uniset::ObjectId id )
{
auto s = smap.find(id);
if( s != smap.end() )
s->second.cmd = "del";
}
// -----------------------------------------------------------------------------
void UWebSocketGate::UWebSocket::set( uniset::ObjectId id, long value )
{
auto s = smap.find(id);
if( s != smap.end() )
{
s->second.value = value;
s->second.cmd = "set";
}
}
// -----------------------------------------------------------------------------
void UWebSocketGate::UWebSocket::sensorInfo( const uniset::SensorMessage* sm )
{
if( cancelled )
return;
auto s = smap.find(sm->id);
if( s == smap.end() )
return;
if( wbuf.size() > maxsize )
{
mywarn << req->clientAddress().toString() << " lost messages..." << endl;
return;
}
wbuf.emplace(UWebSocketGate::format(sm, s->second.err, fmt));
if( ioping.is_active() )
ioping.stop();
}
// -----------------------------------------------------------------------------
void UWebSocketGate::UWebSocket::doCommand( const std::shared_ptr<UInterface>& ui )
{
for( auto&& io : smap )
{
auto& s = io.second;
try
{
if( s.cmd == "" )
continue;
if( s.cmd == "ask" )
ui->askSensor(s.id, UniversalIO::UIONotify);
else if( s.cmd == "del" )
ui->askSensor(s.id, UniversalIO::UIODontNotify);
else if( s.cmd == "set" )
ui->setValue(s.id, s.value);
s.err = "";
s.cmd = "";
}
catch( std::exception& ex )
{
mycrit << "(UWebSocket::doCommand): " << ex.what() << endl;
sendError(s, ex.what());
}
}
}
// -----------------------------------------------------------------------------
void UWebSocketGate::UWebSocket::sendError( sinfo& si, const std::string& err )
{
uniset::SensorMessage sm(si.id, 0);
// sm.undefined = true;
si.err = err;
sensorInfo(&sm);
}
// -----------------------------------------------------------------------------
void UWebSocketGate::UWebSocket::onCommand( const string& cmdtxt )
{
const string cmd = cmdtxt.substr(0, 3);
const string params = cmdtxt.substr(4, cmdtxt.size());
myinfo << "(websocket): " << req->clientAddress().toString()
<< "(" << cmd << "): " << params << endl;
if( cmd == "set" )
{
myinfo << "(websocket): " << req->clientAddress().toString()
<< "(set): " << params << endl;
auto idlist = uniset::getSInfoList(params, uniset_conf());
for( const auto& i : idlist )
set(i.si.id, i.val);
cmdsignal->send();
}
else if( cmd == "ask" )
{
myinfo << "(websocket): " << req->clientAddress().toString()
<< "(ask): " << params << endl;
auto idlist = uniset::explode(params);
for( const auto& id : idlist.getList() )
{
sinfo s;
s.id = id;
s.cmd = "ask";
add(s);
}
// даём команду на перезаказ датчиков
cmdsignal->send();
}
else if( cmd == "del" )
{
myinfo << "(websocket): " << req->clientAddress().toString()
<< "(del): " << params << endl;
auto idlist = uniset::explode(params);
for( const auto& id : idlist.getList() )
del(id);
// даём команду на перезаказ датчиков
cmdsignal->send();
}
}
// -----------------------------------------------------------------------------
void UWebSocketGate::UWebSocket::write()
{
UTCPCore::Buffer* msg = 0;
if( wbuf.empty() )
{
if( !ioping.is_active() )
ioping.start(ping_sec, ping_sec);
return;
}
msg = wbuf.front();
if( !msg )
return;
using Poco::Net::WebSocket;
using Poco::Net::WebSocketException;
using Poco::Net::HTTPResponse;
using Poco::Net::HTTPServerRequest;
int flags = WebSocket::FRAME_TEXT;
if( msg->len == 1 ) // это пинг состоящий из "."
flags = WebSocket::FRAME_FLAG_FIN | WebSocket::FRAME_OP_PING;
try
{
ssize_t ret = sendFrame(msg->dpos(), msg->nbytes(), flags);
if( ret < 0 )
{
mylog3 << "(websocket): " << req->clientAddress().toString()
<< " write to socket error(" << errno << "): " << strerror(errno) << endl;
if( errno == EPIPE || errno == EBADF )
{
mylog3 << "(websocket): "
<< req->clientAddress().toString()
<< " write error.. terminate session.." << endl;
term();
}
return;
}
msg->pos += ret;
if( msg->nbytes() == 0 )
{
wbuf.pop();
delete msg;
}
if( !wbuf.empty() )
{
if( ioping.is_active() )
ioping.stop();
}
else
{
if( !ioping.is_active() )
ioping.start(ping_sec, ping_sec);
}
return;
}
catch( WebSocketException& exc )
{
cerr << "(sendFrame): ERROR: " << exc.displayText() << endl;
switch( exc.code() )
{
case WebSocket::WS_ERR_HANDSHAKE_UNSUPPORTED_VERSION:
resp->set("Sec-WebSocket-Version", WebSocket::WEBSOCKET_VERSION);
case WebSocket::WS_ERR_NO_HANDSHAKE:
case WebSocket::WS_ERR_HANDSHAKE_NO_VERSION:
case WebSocket::WS_ERR_HANDSHAKE_NO_KEY:
resp->setStatusAndReason(HTTPResponse::HTTP_BAD_REQUEST);
resp->setContentLength(0);
resp->send();
break;
}
}
catch( const Poco::Net::NetException& e )
{
mylog3 << "(websocket):NetException: "
<< req->clientAddress().toString()
<< " error: " << e.displayText()
<< endl;
}
catch( Poco::IOException& ex )
{
mylog3 << "(websocket): IOException: "
<< req->clientAddress().toString()
<< " error: " << ex.displayText()
<< endl;
}
term();
}
// -----------------------------------------------------------------------------
void UWebSocketGate::UWebSocket::term()
{
if( cancelled )
return;
cancelled = true;
ioping.stop();
iosend.stop();
iorecv.stop();
finish.notify_all();
}
// -----------------------------------------------------------------------------
void UWebSocketGate::UWebSocket::waitCompletion()
{
std::unique_lock<std::mutex> lk(finishmut);
while( !cancelled )
finish.wait(lk);
}
// -----------------------------------------------------------------------------
void UWebSocketGate::UWebSocket::setHearbeatTime( const double& sec )
{
if( sec > 0 )
ping_sec = sec;
}
// -----------------------------------------------------------------------------
void UWebSocketGate::UWebSocket::setSendPeriod ( const double& sec )
{
if( sec > 0 )
send_sec = sec;
}
// -----------------------------------------------------------------------------
void UWebSocketGate::UWebSocket::setMaxSendCount( size_t val )
{
if( val > 0 )
maxsend = val;
}
// -----------------------------------------------------------------------------
void UWebSocketGate::UWebSocket::setRespondFormat( UWebSocketGate::RespondFormat f )
{
fmt = f;
}
// -----------------------------------------------------------------------------
void UWebSocketGate::httpWebSocketPage( std::ostream& ostr, Poco::Net::HTTPServerRequest& req, Poco::Net::HTTPServerResponse& resp )
{
using Poco::Net::HTTPResponse;
resp.setChunkedTransferEncoding(true);
resp.setContentType("text/html");
// resp.
ostr << "<html>" << endl;
ostr << "<head>" << endl;
ostr << "<title>" << myname << ": test page</title>" << endl;
ostr << "<meta http-equiv=\"Content-Type\" content=\"text/html; charset=UTF-8\">" << endl;
ostr << "</head>" << endl;
ostr << "<body>" << endl;
ostr << "<h1>select sensors:</h1>" << endl;
ostr << "<ul>" << endl;
ostr << " <li><a target='_blank' href=\"http://"
<< req.serverAddress().toString()
<< "/wsgate/json\">42,30,1042 [json]</a></li>"
<< endl;
ostr << " <li><a target='_blank' href=\"http://"
<< req.serverAddress().toString()
<< "/wsgate/txt\">42,30,1042 [txt]</a></li>"
<< endl;
ostr << "</ul>" << endl;
ostr << "</body>" << endl;
}
// -----------------------------------------------------------------------------
void UWebSocketGate::httpWebSocketConnectPage( ostream& ostr,
Poco::Net::HTTPServerRequest& req,
Poco::Net::HTTPServerResponse& resp,
const std::string& params )
{
resp.setChunkedTransferEncoding(true);
resp.setContentType("text/html");
// code base on example from
// https://github.com/pocoproject/poco/blob/developNet/samples/WebSocketServer/src/WebSocketServer.cpp
ostr << "<html>" << endl;
ostr << "<head>" << endl;
ostr << "<title>" << myname << ": sensors event</title>" << endl;
ostr << "<meta http-equiv=\"Content-Type\" content=\"text/html; charset=UTF-8\">" << endl;
ostr << "<script type=\"text/javascript\">" << endl;
ostr << "logscrollStopped = false;" << endl;
ostr << "" << endl;
ostr << "function clickScroll()" << endl;
ostr << "{" << endl;
ostr << " if( logscrollStopped )" << endl;
ostr << " logscrollStopped = false;" << endl;
ostr << " else" << endl;
ostr << " logscrollStopped = true;" << endl;
ostr << "}" << endl;
ostr << "function LogAutoScroll()" << endl;
ostr << "{" << endl;
ostr << " if( logscrollStopped == false )" << endl;
ostr << " {" << endl;
ostr << " document.getElementById('end').scrollIntoView();" << endl;
ostr << " }" << endl;
ostr << "}" << endl;
ostr << "" << endl;
ostr << "function WebSocketCreate()" << endl;
ostr << "{" << endl;
ostr << " if (\"WebSocket\" in window)" << endl;
ostr << " {" << endl;
ostr << " var ws = new WebSocket(\"ws://" << req.serverAddress().toString() << "/wsgate/\");" << endl;
ostr << "setInterval(send_cmd, 1000);" << endl;
ostr << " var l = document.getElementById('logname');" << endl;
ostr << " l.innerHTML = '*'" << endl;
ostr << "ws.onopen = function() {" << endl;
// ostr << "ws.send(\"set:33=44,344=45\")" << endl;
ostr << "};" << endl;
ostr << " ws.onmessage = function(evt)" << endl;
ostr << " {" << endl;
ostr << " var p = document.getElementById('logs');" << endl;
ostr << " if( evt.data != '.' ) {" << endl;
ostr << " p.innerHTML = p.innerHTML + \"</br>\"+evt.data" << endl;
ostr << " LogAutoScroll();" << endl;
ostr << " }" << endl;
ostr << " };" << endl;
ostr << " ws.onclose = function()" << endl;
ostr << " { " << endl;
ostr << " alert(\"WebSocket closed.\");" << endl;
ostr << " };" << endl;
ostr << " }" << endl;
ostr << " else" << endl;
ostr << " {" << endl;
ostr << " alert(\"This browser does not support WebSockets.\");" << endl;
ostr << " }" << endl;
ostr << "function send_cmd() {" << endl;
ostr << " ws.send( 'set:12,32,34' );" << endl;
ostr << "}" << endl;
ostr << "}" << endl;
ostr << "</script>" << endl;
ostr << "<style media='all' type='text/css'>" << endl;
ostr << ".logs {" << endl;
ostr << " font-family: 'Liberation Mono', 'DejaVu Sans Mono', 'Courier New', monospace;" << endl;
ostr << " padding-top: 30px;" << endl;
ostr << "}" << endl;
ostr << "" << endl;
ostr << ".logtitle {" << endl;
ostr << " position: fixed;" << endl;
ostr << " top: 0;" << endl;
ostr << " left: 0;" << endl;
ostr << " padding: 10px;" << endl;
ostr << " width: 100%;" << endl;
ostr << " height: 25px;" << endl;
ostr << " background-color: green;" << endl;
ostr << " border-top: 2px solid;" << endl;
ostr << " border-bottom: 2px solid;" << endl;
ostr << " border-color: white;" << endl;
ostr << "}" << endl;
ostr << "</style>" << endl;
ostr << "</head>" << endl;
ostr << "<body style='background: #111111; color: #ececec;' onload=\"javascript:WebSocketCreate()\">" << endl;
ostr << "<h4><div onclick='javascritpt:clickScroll()' id='logname' class='logtitle'></div></h4>" << endl;
ostr << "<div id='logs' class='logs'></div>" << endl;
ostr << "<p><div id='end' style='display: hidden;'>&nbsp;</div></p>" << endl;
ostr << "</body>" << endl;
}
// -----------------------------------------------------------------------------
#endif
// -----------------------------------------------------------------------------
/*
* Copyright (c) 2017 Pavel Vainerman.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation, version 2.1.
*
* This program is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Lesser Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
// --------------------------------------------------------------------------
/*! \file
* \author Pavel Vainerman
*/
// --------------------------------------------------------------------------
#ifndef UWebSocketGate_H_
#define UWebSocketGate_H_
// --------------------------------------------------------------------------
#include <queue>
#include <memory>
#include <mutex>
#include <condition_variable>
#include <chrono>
#include <ev++.h>
#include <sigc++/sigc++.h>
#include <Poco/JSON/Object.h>
#include <Poco/Net/WebSocket.h>
#include "UniSetTypes.h"
#include "LogAgregator.h"
#include "UniSetObject.h"
#include "DebugStream.h"
#include "EventLoopServer.h"
#include "UTCPStream.h"
#include "UHttpRequestHandler.h"
#include "UHttpServer.h"
#include "UTCPCore.h"
// -------------------------------------------------------------------------
namespace uniset
{
//------------------------------------------------------------------------------------------
/*!
\page page_UWebSocketGate Шлюз для подключения через WebSocket (UWebSocketGate)
- \ref sec_UWebSocketGate_Comm
- \ref sec_UWebSocketGate_Conf
- \ref sec_UWebSocketGate_Command
\section sec_UWebSocketGate_Comm Общее описание работы UWebSocketGate
UWebSocketGate это сервис, который позволяет подключаться через websocket и получать события
об изменнии датчиков, а так же изменять состояние (см. \ref sec_UWebSocketGate_Command).
Подключение к websocket-у доступно по адресу:
\code
ws://host:port/wsgate/?s1,s2,s3,s4&format=[json,txt,raw]
\endcode
Помимо этого UWebSocketGate работает в режиме мониторинга изменений датчиков.
Для этого достаточно зайти на страничку по адресу:
\code
http://host:port/wsgate/?s1,s2,s3,s4&format=[json,txt,raw]
\endcode
\section sec_UWebSocketGate_Conf Конфигурирование UWebSocketGate
Для конфигурования необходимо создать секцию вида:
\code
<UWebSocketGate name="UWebSocketGate" .../>
\endcode
Количество создаваемых websocket-ов можно ограничить при помощи параметра maxWebsockets (--prefix-ws-max).
\section sec_UWebSocketGate_DETAIL UWebSocketGate: Технические детали
Вся релизация построена на "однопоточном" eventloop. Если датчики долго не меняются, то периодически посылается "ping" сообщение.
\section sec_UWebSocketGate_Command Команды
Через websocket можно посылать команды.
На текущий момент формат команды строковый.
Т.е. для подачи команды, необходимо послать просто строку.
Поддерживаются следующие команды:
- "set:id1=val1,id2=val2,name3=val4,..." - выставить значение датчиков
- "ask:id1,id2,name3,..." - подписаться на уведомления об изменении датчиков (sensorInfo)
- "del:id1,id2,name3,..." - отказаться от уведомления об изменении датчиков
\todo Разобраться с "ping" сообщением для формата json..
\todo Настройка check_sec из командной строки и configure.xml
*/
class UWebSocketGate:
public UniSetObject,
public EventLoopServer
#ifndef DISABLE_REST_API
, public Poco::Net::HTTPRequestHandler
#endif
{
public:
UWebSocketGate( uniset::ObjectId id, xmlNode* cnode, const std::string& prefix );
virtual ~UWebSocketGate();
/*! глобальная функция для инициализации объекта */
static std::shared_ptr<UWebSocketGate> init_wsgate( int argc, const char* const* argv, const std::string& prefix = "logdb-" );
/*! глобальная функция для вывода help-а */
static void help_print();
inline std::shared_ptr<DebugStream> log()
{
return mylog;
}
#ifndef DISABLE_REST_API
virtual void handleRequest( Poco::Net::HTTPServerRequest& req, Poco::Net::HTTPServerResponse& resp ) override;
void onWebSocketSession( Poco::Net::HTTPServerRequest& req, Poco::Net::HTTPServerResponse& resp );
#endif
protected:
class UWebSocket;
virtual bool activateObject() override;
void run( bool async );
virtual void evfinish() override;
virtual void evprepare() override;
void onCheckBuffer( ev::timer& t, int revents );
void onActivate( ev::async& watcher, int revents ) ;
void onCommand( ev::async& watcher, int revents );
#ifndef DISABLE_REST_API
void httpWebSocketPage( std::ostream& out, Poco::Net::HTTPServerRequest& req, Poco::Net::HTTPServerResponse& resp );
void httpWebSocketConnectPage(std::ostream& out, Poco::Net::HTTPServerRequest& req,
Poco::Net::HTTPServerResponse& resp, const std::string& params );
std::shared_ptr<UWebSocket> newWebSocket(Poco::Net::HTTPServerRequest* req, Poco::Net::HTTPServerResponse* resp, const Poco::URI::QueryParameters& qp );
void delWebSocket( std::shared_ptr<UWebSocket>& ws );
Poco::JSON::Object::Ptr respError( Poco::Net::HTTPServerResponse& resp, Poco::Net::HTTPResponse::HTTPStatus s, const std::string& message );
void makeResponseAccessHeader( Poco::Net::HTTPServerResponse& resp );
#endif
ev::sig sigTERM;
ev::sig sigQUIT;
ev::sig sigINT;
void onTerminate( ev::sig& evsig, int revents );
ev::async wsactivate; // активация WebSocket-ов
std::shared_ptr<ev::async> wscmd;
void checkMessages( ev::timer& t, int revents );
virtual void sensorInfo( const uniset::SensorMessage* sm ) override;
ev::timer iocheck;
double check_sec = { 0.3 };
std::shared_ptr<DebugStream> mylog;
#ifndef DISABLE_REST_API
std::shared_ptr<Poco::Net::HTTPServer> httpserv;
std::string httpHost = { "" };
int httpPort = { 0 };
std::string httpCORS_allow = { "*" };
double wsHeartbeatTime_sec = { 3.0 };
double wsSendTime_sec = { 0.5 };
size_t wsMaxSend = { 200 };
enum class RespondFormat
{
UNKNOWN,
JSON,
TXT,
RAW
};
RespondFormat from_string( const std::string& str );
static UTCPCore::Buffer* format( const uniset::SensorMessage* sm, const std::string& err, const RespondFormat fmt );
static UTCPCore::Buffer* to_json( const uniset::SensorMessage* sm, const std::string& err );
static UTCPCore::Buffer* to_txt( const uniset::SensorMessage* sm, const std::string& err );
static UTCPCore::Buffer* to_raw( const uniset::SensorMessage* sm, const std::string& err );
/*! класс реализует работу с websocket через eventloop
* Из-за того, что поступление логов может быть достаточно быстрым
* чтобы не "завалить" браузер кучей сообщений,
* сделана посылка не по факту приёма сообщения, а раз в send_sec,
* не более maxsend сообщений.
* \todo websocket: может стоит объединять сообщения в одну посылку (пока считаю преждевременной оптимизацией)
*/
class UWebSocket:
public Poco::Net::WebSocket
{
public:
UWebSocket( Poco::Net::HTTPServerRequest* req,
Poco::Net::HTTPServerResponse* resp);
virtual ~UWebSocket();
bool isActive();
void set( ev::dynamic_loop& loop, std::shared_ptr<ev::async> a );
void send( ev::timer& t, int revents );
void ping( ev::timer& t, int revents );
void read( ev::io& io, int revents );
struct sinfo
{
std::string err; // ошибка при работе с датчиком (например при заказе)
uniset::ObjectId id = { uniset::DefaultObjectId };
std::string cmd = "";
long value = { 0 }; // set value
};
void add( const sinfo& si );
void del( uniset::ObjectId id );
void set( uniset::ObjectId id, long value );
void sensorInfo( const uniset::SensorMessage* sm );
void doCommand( const std::shared_ptr<UInterface>& ui );
void term();
void waitCompletion();
// настройка
void setHearbeatTime( const double& sec );
void setSendPeriod( const double& sec );
void setMaxSendCount( size_t val );
void setRespondFormat( RespondFormat f );
std::shared_ptr<DebugStream> mylog;
protected:
void write();
void sendError( sinfo& si, const std::string& err );
void onCommand( const std::string& cmd );
ev::timer iosend;
double send_sec = { 0.5 };
size_t maxsend = { 200 };
ev::timer ioping;
double ping_sec = { 3.0 };
static const std::string ping_str;
ev::io iorecv;
char rbuf[512]; //! \todo сделать настраиваемым или применить Poco::FIFOBuffer
timeout_t recvTimeout = { 200 }; // msec
std::shared_ptr<ev::async> cmdsignal;
std::mutex finishmut;
std::condition_variable finish;
std::atomic_bool cancelled = { false };
std::unordered_map<uniset::ObjectId, sinfo> smap;
RespondFormat fmt = { RespondFormat::JSON };
Poco::Net::HTTPServerRequest* req;
Poco::Net::HTTPServerResponse* resp;
// очередь данных на посылку..
std::queue<UTCPCore::Buffer*> wbuf;
size_t maxsize; // рассчитывается сходя из max_send (см. конструктор)
};
class UWebSocketGuard
{
public:
UWebSocketGuard( std::shared_ptr<UWebSocket>& s, UWebSocketGate* g ):
ws(s), wsgate(g) {}
~UWebSocketGuard()
{
wsgate->delWebSocket(ws);
}
private:
std::shared_ptr<UWebSocket> ws;
UWebSocketGate* wsgate;
};
friend class UWebSocketGuard;
std::list<std::shared_ptr<UWebSocket>> wsocks;
uniset::uniset_rwmutex wsocksMutex;
size_t maxwsocks = { 50 }; // максимальное количество websocket-ов
class UWebSocketGateRequestHandlerFactory:
public Poco::Net::HTTPRequestHandlerFactory
{
public:
UWebSocketGateRequestHandlerFactory( UWebSocketGate* l ): wsgate(l) {}
virtual ~UWebSocketGateRequestHandlerFactory() {}
virtual Poco::Net::HTTPRequestHandler* createRequestHandler( const Poco::Net::HTTPServerRequest& req ) override;
private:
UWebSocketGate* wsgate;
};
#endif
private:
};
// ----------------------------------------------------------------------------------
} // end of namespace uniset
//------------------------------------------------------------------------------------------
#endif
#ifndef UWebSocketGateSugar_H_
#define UWebSocketGateSugar_H_
// "синтаксический сахар"..
#ifndef myinfo
#define myinfo if( mylog->debugging(Debug::INFO) ) mylog->info()
#endif
#ifndef mywarn
#define mywarn if( mylog->debugging(Debug::WARN) ) mylog->warn()
#endif
#ifndef mycrit
#define mycrit if( mylog->debugging(Debug::CRIT) ) mylog->crit()
#endif
#ifndef mylog1
#define mylog1 if( mylog->debugging(Debug::LEVEL1) ) mylog->level1()
#endif
#ifndef mylog2
#define mylog2 if( mylog->debugging(Debug::LEVEL2) ) mylog->level2()
#endif
#ifndef mylog3
#define mylog3 if( mylog->debugging(Debug::LEVEL3) ) mylog->level3()
#endif
#ifndef mylog4
#define mylog4 if( mylog->debugging(Debug::LEVEL4) ) mylog->level4()
#endif
#ifndef mylog5
#define mylog5 if( mylog->debugging(Debug::LEVEL5) ) mylog->level5()
#endif
#ifndef mylog6
#define mylog6 if( mylog->debugging(Debug::LEVEL6) ) mylog->level6()
#endif
#ifndef mylog7
#define mylog7 if( mylog->debugging(Debug::LEVEL7) ) mylog->level7()
#endif
#ifndef mylog8
#define mylog8 if( mylog->debugging(Debug::LEVEL8) ) mylog->level8()
#endif
#ifndef mylog9
#define mylog9 if( mylog->debugging(Debug::LEVEL9) ) mylog->level9()
#endif
#ifndef mylogany
#define mylogany mylog->any()
#endif
#endif
#include "Configuration.h"
#include "UWebSocketGate.h"
#include "Configuration.h"
#include "UniSetActivator.h"
// --------------------------------------------------------------------------
using namespace uniset;
using namespace std;
// --------------------------------------------------------------------------
int main(int argc, char** argv)
{
// std::ios::sync_with_stdio(false);
try
{
if( argc > 1 && (!strcmp(argv[1], "--help") || !strcmp(argv[1], "-h")) )
{
cout << "--confile filename - configuration file. Default: configure.xml" << endl;
UWebSocketGate::help_print();
return 0;
}
auto conf = uniset_init(argc, argv);
auto ws = UWebSocketGate::init_wsgate(argc, argv, "ws-");
if( !ws )
return 1;
auto act = UniSetActivator::Instance();
act->add(ws);
SystemMessage sm(SystemMessage::StartUp);
act->broadcast( sm.transport_msg() );
act->run(false);
return 0;
}
catch( const std::exception& ex )
{
cerr << "(UWebSocketGate::main): " << ex.what() << endl;
}
catch(...)
{
cerr << "(UWebSocketGate::main): catch ..." << endl;
}
return 1;
}
#!/bin/sh
ulimit -Sc 1000000
uniset2-start.sh -f ./uniset2-wsgate --confile test.xml --ws-name UWebSocketGate1 --ws-log-add-levels any $*
../../conf/test.xml
\ No newline at end of file
../../Utilities/scripts/uniset2-functions.sh
\ No newline at end of file
../../Utilities/scripts/uniset2-start.sh
\ No newline at end of file
../../Utilities/scripts/uniset2-stop.sh
\ No newline at end of file
......@@ -33,6 +33,8 @@ namespace uniset
~SMonitor();
// -----
static std::string printEvent( const uniset::SensorMessage* sm );
protected:
virtual void sysCommand( const uniset::SystemMessage* sm ) override;
virtual void sensorInfo( const uniset::SensorMessage* si ) override;
......
......@@ -95,25 +95,33 @@ void SMonitor::sysCommand( const SystemMessage* sm )
}
}
// ------------------------------------------------------------------------------------------
void SMonitor::sensorInfo( const SensorMessage* si )
std::string SMonitor::printEvent( const uniset::SensorMessage* sm )
{
auto conf = uniset_conf();
ostringstream s;
string s_sup("");
if( si->supplier == uniset::AdminID )
if( sm->supplier == uniset::AdminID )
s_sup = "uniset-admin";
else
s_sup = ORepHelpers::getShortName(conf->oind->getMapName(si->supplier));
cout << "(" << setw(6) << si->id << "):"
<< "[(" << std::right << setw(5) << si->supplier << ")"
<< std::left << setw(20) << s_sup << "] "
<< std::right << setw(8) << timeToString(si->sm_tv.tv_sec, ":")
<< "(" << setw(6) << si->sm_tv.tv_nsec << "): "
<< std::right << setw(45) << conf->oind->getMapName(si->id)
<< " value:" << std::right << setw(9) << si->value
<< " fvalue:" << std::right << setw(12) << ( (float)si->value / pow(10.0, si->ci.precision) ) << endl;
s_sup = ORepHelpers::getShortName(conf->oind->getMapName(sm->supplier));
s << "(" << setw(6) << sm->id << "):"
<< "[(" << std::right << setw(5) << sm->supplier << ")"
<< std::left << setw(20) << s_sup << "] "
<< std::right << setw(8) << timeToString(sm->sm_tv.tv_sec, ":")
<< "(" << setw(6) << sm->sm_tv.tv_nsec << "): "
<< std::right << setw(45) << conf->oind->getMapName(sm->id)
<< " value:" << std::right << setw(9) << sm->value
<< " fvalue:" << std::right << setw(12) << ( (float)sm->value / pow(10.0, sm->ci.precision) ) << endl;
return s.str();
}
// ------------------------------------------------------------------------------------------
void SMonitor::sensorInfo( const SensorMessage* si )
{
cout << printEvent(si) << endl;
if( !script.empty() )
{
......@@ -124,7 +132,7 @@ void SMonitor::sensorInfo( const SensorMessage* si )
if( script[0] == '.' || script[0] == '/' )
cmd << script;
else
cmd << conf->getBinDir() << script;
cmd << uniset_conf()->getBinDir() << script;
cmd << " " << si->id << " " << si->value << " " << si->sm_tv.tv_sec << " " << si->sm_tv.tv_nsec;
......
......@@ -295,6 +295,11 @@
./extensions/UniNetwork/UniExchange.cc
./extensions/UniNetwork/UniExchange.h
./extensions/UniNetwork/uninet.cc
./extensions/UWebSocketGate/main.cc
./extensions/UWebSocketGate/Makefile.am
./extensions/UWebSocketGate/UWebSocketGate.cc
./extensions/UWebSocketGate/UWebSocketGate.h
./extensions/UWebSocketGate/UWebSocketGateSugar.h
./IDL/Makefile.am
./IDL/Processes/Makefile.am
./IDL/UniSetTypes/Makefile.am
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment