Commit 5dbbca92 authored by Pavel Vainerman's avatar Pavel Vainerman

(LogDB): накидал предварительную реализацию поддержки WebSocket

parent 1e99958b
......@@ -23,6 +23,7 @@
#include "unisetstd.h"
#include <Poco/Net/NetException.h>
#include <Poco/Net/WebSocket.h>
#include "ujson.h"
#include "LogDB.h"
#include "Configuration.h"
......@@ -61,6 +62,7 @@ LogDB::LogDB( const string& name , const string& prefix ):
connectionTimer.set<LogDB, &LogDB::onTimer>(this);
checkBufferTimer.set<LogDB, &LogDB::onCheckBuffer>(this);
pingWebSockets.set<LogDB, &LogDB::onPingWebSockets>(this);
UniXML::iterator sit(cnode);
......@@ -156,7 +158,7 @@ LogDB::LogDB( const string& name , const string& prefix ):
/*! \FIXME: доделать конфигурирование параметров */
Poco::Net::HTTPServerParams* httpParams = new Poco::Net::HTTPServerParams;
httpParams->setMaxQueued(100);
httpParams->setMaxThreads(1);
httpParams->setMaxThreads(3);
httpserv = std::make_shared<Poco::Net::HTTPServer>(this, Poco::Net::ServerSocket(sa), httpParams );
}
catch( std::exception& ex )
......@@ -249,17 +251,20 @@ void LogDB::evfinish()
{
connectionTimer.stop();
checkBufferTimer.stop();
pingWebSockets.stop();
}
// -----------------------------------------------------------------------------
void LogDB::evprepare()
{
connectionTimer.set(loop);
checkBufferTimer.set(loop);
pingWebSockets.set(loop);
if( tmConnection_msec != UniSetTimer::WaitUpTime )
connectionTimer.start(0, tmConnection_sec);
checkBufferTimer.start(0, tmCheckBuffer_sec);
pingWebSockets.start(0, tmPingWebSockets_sec);
}
// -----------------------------------------------------------------------------
void LogDB::onTimer( ev::timer& t, int revents )
......@@ -503,8 +508,28 @@ class LogDBRequestHandler:
LogDB* logdb;
};
// -----------------------------------------------------------------------------
class LogDBWebSocketRequestHandler:
public Poco::Net::HTTPRequestHandler
{
public:
LogDBWebSocketRequestHandler( LogDB* l ): logdb(l) {}
virtual void handleRequest( Poco::Net::HTTPServerRequest& request,
Poco::Net::HTTPServerResponse& response ) override
{
logdb->onWebSocketSession(request, response);
}
private:
LogDB* logdb;
};
// -----------------------------------------------------------------------------
Poco::Net::HTTPRequestHandler* LogDB::createRequestHandler( const Poco::Net::HTTPServerRequest& req )
{
if( req.find("Upgrade") != req.end() && Poco::icompare(req["Upgrade"], "websocket") == 0 )
return new LogDBWebSocketRequestHandler(this);
return new LogDBRequestHandler(this);
}
// -----------------------------------------------------------------------------
......@@ -817,5 +842,282 @@ string LogDB::qLast( const string& p )
return "";
}
// -----------------------------------------------------------------------------
void LogDB::onWebSocketSession(Poco::Net::HTTPServerRequest& req, Poco::Net::HTTPServerResponse& resp)
{
using Poco::Net::WebSocket;
using Poco::Net::WebSocketException;
using Poco::Net::HTTPResponse;
using Poco::Net::HTTPServerRequest;
std::vector<std::string> seg;
Poco::URI uri(req.getURI());
uri.getPathSegments(seg);
// example: http://host:port/logdb/ws/logname
if( seg.size() < 3
|| seg[0] != "logdb"
|| seg[1] != "ws"
|| seg[2].empty())
{
resp.setStatus(HTTPResponse::HTTP_BAD_REQUEST);
resp.setContentType("text/html");
resp.setStatusAndReason(HTTPResponse::HTTP_BAD_REQUEST);
resp.setContentLength(0);
std::ostream& err = resp.send();
err << "Bad request. Must be: http://host:port/logdb/ws/logname";
err.flush();
return;
}
auto ws = newWebSocket(req, resp, seg[2]);
if( !ws )
return;
LogWebSocketGuard lk(ws, this);
char buf[100];
while( true )
{
try
{
std::string txt = ws->get();
if( txt.empty() )
continue;
int flags = WebSocket::FRAME_TEXT;
if( txt == "." )
flags = WebSocket::FRAME_FLAG_FIN | WebSocket::FRAME_OP_PING;
ssize_t ret = ws->sendFrame(txt.data(), txt.size(), flags);
if( ret < 0 )
{
dbwarn << myname << "(websocket): " << req.clientAddress().toString()
<< " write to socket error(" << errno << "): " << strerror(errno) << endl;
if( errno == EPIPE || errno == EBADF )
{
dbwarn << myname << "(websocket): "
<< req.clientAddress().toString()
<< " write error.. terminate session.." << endl;
}
return;
}
// проверяем соединение
// ssize_t n = ws->receiveFrame(buf, sizeof(buf), flags);
// cerr << "read from websocket: " << n << endl;
// if( (flags & WebSocket::FRAME_OP_BITMASK) == WebSocket::FRAME_OP_CLOSE )
// {
// dbwarn << myname << "(websocket): "
// << req.clientAddress().toString()
// << " connection closed.." << endl;
// return;
// }
continue;
}
catch( WebSocketException& exc )
{
switch( exc.code() )
{
case WebSocket::WS_ERR_HANDSHAKE_UNSUPPORTED_VERSION:
resp.set("Sec-WebSocket-Version", WebSocket::WEBSOCKET_VERSION);
case WebSocket::WS_ERR_NO_HANDSHAKE:
case WebSocket::WS_ERR_HANDSHAKE_NO_VERSION:
case WebSocket::WS_ERR_HANDSHAKE_NO_KEY:
resp.setStatusAndReason(HTTPResponse::HTTP_BAD_REQUEST);
resp.setContentLength(0);
resp.send();
break;
}
}
catch( const Poco::Net::NetException& e )
{
dbwarn << myname << "(websocket):NetException: "
<< req.clientAddress().toString()
<< " error: " << e.displayText()
<< endl;
}
catch( Poco::IOException& ex )
{
dbwarn << myname << "(websocket): IOException: "
<< req.clientAddress().toString()
<< " error: " << ex.displayText()
<< endl;
}
return;
}
}
// -----------------------------------------------------------------------------
std::shared_ptr<LogDB::LogWebSocket> LogDB::newWebSocket( Poco::Net::HTTPServerRequest& req,
Poco::Net::HTTPServerResponse& resp,
const std::string& logname )
{
using Poco::Net::WebSocket;
using Poco::Net::WebSocketException;
using Poco::Net::HTTPResponse;
using Poco::Net::HTTPServerRequest;
std::shared_ptr<Log> log;
for( const auto& s : logservers )
{
if( s->name == logname )
{
log = s;
break;
}
}
if( !log )
{
resp.setStatus(HTTPResponse::HTTP_BAD_REQUEST);
resp.setContentType("text/html");
resp.setStatusAndReason(HTTPResponse::HTTP_NOT_FOUND);
std::ostream& err = resp.send();
err << "Not found '" << logname << "'";
err.flush();
return nullptr;
}
uniset_rwmutex_wrlock lock(wsocksMutex);
std::shared_ptr<LogWebSocket> ws = make_shared<LogWebSocket>(req, resp, log);
wsocks.emplace_back(ws);
return ws;
}
// -----------------------------------------------------------------------------
void LogDB::delWebSocket( std::shared_ptr<LogWebSocket>& ws )
{
uniset_rwmutex_wrlock lock(wsocksMutex);
ws->term(); // надо вызывать под mutex-ом, т.к. там идёт вызов
for( auto it = wsocks.begin(); it != wsocks.end(); it++ )
{
if( (*it).get() == ws.get() )
{
dblog3 << myname << ": delete websocket "
<< endl;
wsocks.erase(it);
return;
}
}
}
// -----------------------------------------------------------------------------
void LogDB::onPingWebSockets( ev::timer& t, int revents )
{
if (EV_ERROR & revents)
{
dbcrit << myname << "(onPingWebSockets): invalid event" << endl;
return;
}
uniset_rwmutex_rlock lk(wsocksMutex);
for( const auto& s : wsocks )
s->ping();
}
// -----------------------------------------------------------------------------
LogDB::LogWebSocket::LogWebSocket(Poco::Net::HTTPServerRequest& req,
Poco::Net::HTTPServerResponse& resp,
std::shared_ptr<Log>& _log):
Poco::Net::WebSocket(req, resp)
// log(_log)
{
con = _log->signal_on_read().connect( sigc::mem_fun(*this, &LogWebSocket::add));
}
// -----------------------------------------------------------------------------
LogDB::LogWebSocket::~LogWebSocket()
{
if( !cancelled )
term();
}
// -----------------------------------------------------------------------------
std::string LogDB::LogWebSocket::get()
{
if( cancelled )
return "";
std::string ret;
{
uniset_rwmutex_wrlock lk(mqmut);
if( !mqueue.empty() )
{
ret = mqueue.front();
mqueue.pop();
return ret;
}
}
{
std::unique_lock<std::mutex> lk(mut);
event.wait(lk);
}
if( cancelled )
return "";
uniset_rwmutex_wrlock lk(mqmut);
if( !mqueue.empty() )
{
ret = mqueue.front();
mqueue.pop();
}
return ret;
}
// -----------------------------------------------------------------------------
void LogDB::LogWebSocket::ping()
{
if( cancelled )
return;
{
uniset_rwmutex_wrlock lk(mqmut);
// в качестве ping-а просто добавляем фейковое сообщение '.'
mqueue.push(".");
}
event.notify_all();
}
// -----------------------------------------------------------------------------
void LogDB::LogWebSocket::add( LogDB::Log* log, const string& txt )
{
if( cancelled )
return;
{
uniset_rwmutex_wrlock lk(mqmut);
mqueue.push(txt);
}
event.notify_all();
}
// -----------------------------------------------------------------------------
void LogDB::LogWebSocket::term()
{
cancelled = true;
con.disconnect(); // вот тут не безпасный вызов, т.е. sigc там в других потоках используется
event.notify_all();
}
// -----------------------------------------------------------------------------
#endif
// -----------------------------------------------------------------------------
......@@ -23,10 +23,13 @@
// --------------------------------------------------------------------------
#include <queue>
#include <memory>
#include <mutex>
#include <condition_variable>
#include <chrono>
#include <ev++.h>
#include <sigc++/sigc++.h>
#include <Poco/JSON/Object.h>
#include <Poco/Net/WebSocket.h>
#include "UniSetTypes.h"
#include "LogAgregator.h"
#include "DebugStream.h"
......@@ -93,9 +96,13 @@ namespace uniset
\todo Добавить ротацию БД
\todo Сделать настройку, для формата даты и времени при выгрузке из БД (при формировании json).
\todo Возможно в /logs стоит в ответе сразу возвращать и общее количество в БД (это один лишний запрос, каждый раз).
\todo Встроить поддержку websocket
\todo Возможно в последствии оптимизировать таблицы (нормализовать) если будет тормозить. Сейчас пока прототип.
\todo Пока не очень эффективная работа с датой и временем (заодно подумать всё-таки в чём хранить)
\todo WebSocket: Сделать запись через UTCPCore::Buffer, чтобы не терять данные при записи в сокет
\todo WebSocket: Доделать ограничение на размер буфера для каждого сокета
\todo WebSocket: доделать настройку всевозможных timeout-ов
\todo WebSocket: доделать проверку соединения
\todo WebSocket: сделать ограничение на максимальное количество соединений (websocket)
*/
class LogDB:
public EventLoopServer
......@@ -123,17 +130,20 @@ namespace uniset
#ifndef DISABLE_REST_API
Poco::Net::HTTPRequestHandler* createRequestHandler( const Poco::Net::HTTPServerRequest& req );
virtual void handleRequest( Poco::Net::HTTPServerRequest& req, Poco::Net::HTTPServerResponse& resp ) override;
void onWebSocketSession( Poco::Net::HTTPServerRequest& req, Poco::Net::HTTPServerResponse& resp );
#endif
protected:
class Log;
class LogWebSocket;
virtual void evfinish() override;
virtual void evprepare() override;
void onTimer( ev::timer& t, int revents );
void onCheckBuffer( ev::timer& t, int revents );
void addLog( Log* log, const std::string& txt );
#ifndef DISABLE_REST_API
Poco::JSON::Object::Ptr respError( Poco::Net::HTTPServerResponse& resp, Poco::Net::HTTPResponse::HTTPStatus s, const std::string& message );
Poco::JSON::Object::Ptr httpGetRequest( const std::string& cmd, const Poco::URI::QueryParameters& p );
......@@ -144,6 +154,10 @@ namespace uniset
// формирование условия where для строки XX[m|h|d|M]
// XX m - минут, h-часов, d-дней, M - месяцев
static std::string qLast( const std::string& p );
std::shared_ptr<LogWebSocket> newWebSocket( Poco::Net::HTTPServerRequest& req, Poco::Net::HTTPServerResponse& resp, const std::string& logname );
void delWebSocket( std::shared_ptr<LogWebSocket>& ws );
void onPingWebSockets( ev::timer& t, int revents );
#endif
std::string myname;
std::unique_ptr<SQLiteInterface> db;
......@@ -172,7 +186,7 @@ namespace uniset
bool isConnected() const;
void ioprepare( ev::dynamic_loop& loop );
void event( ev::io& watcher, int revents );
void read( ev::io& watcher);
void read( ev::io& watcher );
void write( ev::io& io );
void close();
......@@ -203,11 +217,73 @@ namespace uniset
ev::timer checkBufferTimer;
double tmCheckBuffer_sec = { 1.0 };
ev::timer pingWebSockets;
double tmPingWebSockets_sec = { 3.0 };
#ifndef DISABLE_REST_API
std::shared_ptr<Poco::Net::HTTPServer> httpserv;
std::string httpHost = { "" };
int httpPort = { 0 };
class LogWebSocket:
public Poco::Net::WebSocket
{
public:
LogWebSocket(Poco::Net::HTTPServerRequest& req,
Poco::Net::HTTPServerResponse& resp,
std::shared_ptr<Log>& log);
virtual ~LogWebSocket();
// получение очередного сообщения
// (с засыпанием в случае отсутствия сообщения в очереди)
std::string get();
// вызывается из потока eventloop..
void ping();
// вызывается из потока eventloop..
void add( Log* log, const std::string& txt );
// надо вызывать только из потока eventloop
// т.к. идёт обращение sigc::connection
void term();
protected:
std::mutex mut;
std::condition_variable event;
uniset::uniset_rwmutex mqmut;
std::queue<std::string> mqueue;
std::atomic_bool cancelled = { false };
sigc::connection con;
// std::shared_ptr<Log> log;
};
class LogWebSocketGuard
{
public:
LogWebSocketGuard( std::shared_ptr<LogWebSocket>& s, LogDB* l ):
ws(s), logdb(l) {}
~LogWebSocketGuard()
{
logdb->delWebSocket(ws);
}
private:
std::shared_ptr<LogWebSocket> ws;
LogDB* logdb;
};
friend class LogWebSocketGuard;
std::list<std::shared_ptr<LogWebSocket>> wsocks;
uniset::uniset_rwmutex wsocksMutex;
#endif
private:
......
<html>
<head>
<title>WebSocketServer</title>
<script type="text/javascript">
function WebSocketTest()
{
if ("WebSocket" in window)
{
var ws = new WebSocket("ws://localhost:8888/logdb/ws/logserver1");
ws.onopen = function()
{
ws.send("Hello, world!");
};
ws.onmessage = ws.onmessage = function(evt)
{
var p = document.getElementById('logs');
if( evt.data != '.' ) {
p.innerHTML = p.innerHTML + "<p>"+evt.data+"</p>";
}
};
ws.onclose = function()
{
alert("WebSocket closed.");
};
}
else
{
alert("This browser does not support WebSockets.");
}
}
</script>
</head>
<body>
<h1>WebSocket Server</h1>
<p><a href="javascript:WebSocketTest()">Run WebSocket</a></p>
<div id="logs"></div>
</body>
</html>
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment