Commit cc7c1c60 authored by Pavel Vainerman's avatar Pavel Vainerman

(LogDB): перевёл работу WebSocket на eventloop.

parent 5dbbca92
...@@ -872,100 +872,24 @@ void LogDB::onWebSocketSession(Poco::Net::HTTPServerRequest& req, Poco::Net::HTT ...@@ -872,100 +872,24 @@ void LogDB::onWebSocketSession(Poco::Net::HTTPServerRequest& req, Poco::Net::HTT
return; return;
} }
auto ws = newWebSocket(req, resp, seg[2]); auto ws = newWebSocket(&req, &resp, seg[2]);
if( !ws ) if( !ws )
return; return;
LogWebSocketGuard lk(ws, this); LogWebSocketGuard lk(ws, this);
char buf[100]; dblog3 << myname << "(onWebSocketSession): start session for " << req.clientAddress().toString() << endl;
while( true ) // т.к. вся работа происходит в eventloop
{ // то здесь просто ждём..
try ws->waitCompletion();
{
std::string txt = ws->get();
if( txt.empty() )
continue;
int flags = WebSocket::FRAME_TEXT;
if( txt == "." )
flags = WebSocket::FRAME_FLAG_FIN | WebSocket::FRAME_OP_PING;
ssize_t ret = ws->sendFrame(txt.data(), txt.size(), flags);
if( ret < 0 )
{
dbwarn << myname << "(websocket): " << req.clientAddress().toString()
<< " write to socket error(" << errno << "): " << strerror(errno) << endl;
if( errno == EPIPE || errno == EBADF )
{
dbwarn << myname << "(websocket): "
<< req.clientAddress().toString()
<< " write error.. terminate session.." << endl;
}
return;
}
// проверяем соединение
// ssize_t n = ws->receiveFrame(buf, sizeof(buf), flags);
// cerr << "read from websocket: " << n << endl;
// if( (flags & WebSocket::FRAME_OP_BITMASK) == WebSocket::FRAME_OP_CLOSE )
// {
// dbwarn << myname << "(websocket): "
// << req.clientAddress().toString()
// << " connection closed.." << endl;
// return;
// }
continue;
}
catch( WebSocketException& exc )
{
switch( exc.code() )
{
case WebSocket::WS_ERR_HANDSHAKE_UNSUPPORTED_VERSION:
resp.set("Sec-WebSocket-Version", WebSocket::WEBSOCKET_VERSION);
case WebSocket::WS_ERR_NO_HANDSHAKE:
case WebSocket::WS_ERR_HANDSHAKE_NO_VERSION:
case WebSocket::WS_ERR_HANDSHAKE_NO_KEY:
resp.setStatusAndReason(HTTPResponse::HTTP_BAD_REQUEST);
resp.setContentLength(0);
resp.send();
break;
}
}
catch( const Poco::Net::NetException& e )
{
dbwarn << myname << "(websocket):NetException: "
<< req.clientAddress().toString()
<< " error: " << e.displayText()
<< endl;
}
catch( Poco::IOException& ex )
{
dbwarn << myname << "(websocket): IOException: "
<< req.clientAddress().toString()
<< " error: " << ex.displayText()
<< endl;
}
return;
}
dblog3 << myname << "(onWebSocketSession): finish session for " << req.clientAddress().toString() << endl;
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
std::shared_ptr<LogDB::LogWebSocket> LogDB::newWebSocket( Poco::Net::HTTPServerRequest& req, std::shared_ptr<LogDB::LogWebSocket> LogDB::newWebSocket( Poco::Net::HTTPServerRequest* req,
Poco::Net::HTTPServerResponse& resp, Poco::Net::HTTPServerResponse* resp,
const std::string& logname ) const std::string& logname )
{ {
using Poco::Net::WebSocket; using Poco::Net::WebSocket;
...@@ -986,10 +910,10 @@ std::shared_ptr<LogDB::LogWebSocket> LogDB::newWebSocket( Poco::Net::HTTPServerR ...@@ -986,10 +910,10 @@ std::shared_ptr<LogDB::LogWebSocket> LogDB::newWebSocket( Poco::Net::HTTPServerR
if( !log ) if( !log )
{ {
resp.setStatus(HTTPResponse::HTTP_BAD_REQUEST); resp->setStatus(HTTPResponse::HTTP_BAD_REQUEST);
resp.setContentType("text/html"); resp->setContentType("text/html");
resp.setStatusAndReason(HTTPResponse::HTTP_NOT_FOUND); resp->setStatusAndReason(HTTPResponse::HTTP_NOT_FOUND);
std::ostream& err = resp.send(); std::ostream& err = resp->send();
err << "Not found '" << logname << "'"; err << "Not found '" << logname << "'";
err.flush(); err.flush();
return nullptr; return nullptr;
...@@ -997,8 +921,7 @@ std::shared_ptr<LogDB::LogWebSocket> LogDB::newWebSocket( Poco::Net::HTTPServerR ...@@ -997,8 +921,7 @@ std::shared_ptr<LogDB::LogWebSocket> LogDB::newWebSocket( Poco::Net::HTTPServerR
uniset_rwmutex_wrlock lock(wsocksMutex); uniset_rwmutex_wrlock lock(wsocksMutex);
std::shared_ptr<LogWebSocket> ws = make_shared<LogWebSocket>(req, resp, log, loop);
std::shared_ptr<LogWebSocket> ws = make_shared<LogWebSocket>(req, resp, log);
wsocks.emplace_back(ws); wsocks.emplace_back(ws);
return ws; return ws;
} }
...@@ -1007,8 +930,6 @@ void LogDB::delWebSocket( std::shared_ptr<LogWebSocket>& ws ) ...@@ -1007,8 +930,6 @@ void LogDB::delWebSocket( std::shared_ptr<LogWebSocket>& ws )
{ {
uniset_rwmutex_wrlock lock(wsocksMutex); uniset_rwmutex_wrlock lock(wsocksMutex);
ws->term(); // надо вызывать под mutex-ом, т.к. там идёт вызов
for( auto it = wsocks.begin(); it != wsocks.end(); it++ ) for( auto it = wsocks.begin(); it != wsocks.end(); it++ )
{ {
if( (*it).get() == ws.get() ) if( (*it).get() == ws.get() )
...@@ -1035,88 +956,171 @@ void LogDB::onPingWebSockets( ev::timer& t, int revents ) ...@@ -1035,88 +956,171 @@ void LogDB::onPingWebSockets( ev::timer& t, int revents )
s->ping(); s->ping();
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
LogDB::LogWebSocket::LogWebSocket(Poco::Net::HTTPServerRequest& req, LogDB::LogWebSocket::LogWebSocket(Poco::Net::HTTPServerRequest* _req,
Poco::Net::HTTPServerResponse& resp, Poco::Net::HTTPServerResponse* _resp,
std::shared_ptr<Log>& _log): std::shared_ptr<Log>& _log,
Poco::Net::WebSocket(req, resp) ev::dynamic_loop& loop ):
Poco::Net::WebSocket(*_req, *_resp),
req(_req),
resp(_resp)
// log(_log) // log(_log)
{ {
setBlocking(false);
con = _log->signal_on_read().connect( sigc::mem_fun(*this, &LogWebSocket::add)); con = _log->signal_on_read().connect( sigc::mem_fun(*this, &LogWebSocket::add));
io.set(loop);
io.set<LogDB::LogWebSocket, &LogDB::LogWebSocket::event>(this);
io.start();
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
LogDB::LogWebSocket::~LogWebSocket() LogDB::LogWebSocket::~LogWebSocket()
{ {
if( !cancelled ) if( !cancelled )
term(); term();
// удаляем всё что осталось
while(!wbuf.empty())
{
delete wbuf.front();
wbuf.pop();
}
}
// -----------------------------------------------------------------------------
void LogDB::LogWebSocket::event( ev::io& watcher, int revents )
{
if( EV_ERROR & revents )
return;
if( revents & EV_WRITE )
write(watcher);
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
std::string LogDB::LogWebSocket::get() void LogDB::LogWebSocket::ping()
{ {
if( cancelled ) if( cancelled )
return ""; return;
wbuf.emplace(new UTCPCore::Buffer("."));
io.set(ev::WRITE);
}
// -----------------------------------------------------------------------------
void LogDB::LogWebSocket::add( LogDB::Log* log, const string& txt )
{
if( cancelled || txt.empty())
return;
wbuf.emplace(new UTCPCore::Buffer(txt));
io.set(ev::WRITE);
}
// -----------------------------------------------------------------------------
void LogDB::LogWebSocket::write( ev::io& w )
{
UTCPCore::Buffer* msg = 0;
if( wbuf.empty() )
{
io.set(EV_NONE);
return;
}
msg = wbuf.front();
if( !msg )
return;
using Poco::Net::WebSocket;
using Poco::Net::WebSocketException;
using Poco::Net::HTTPResponse;
using Poco::Net::HTTPServerRequest;
std::string ret; int flags = WebSocket::FRAME_TEXT;
if( msg->len == 1 ) // это пинг состоящий из "."
flags = WebSocket::FRAME_FLAG_FIN | WebSocket::FRAME_OP_PING;
try
{ {
uniset_rwmutex_wrlock lk(mqmut); ssize_t ret = sendFrame(msg->dpos(), msg->nbytes(), flags);
if( !mqueue.empty() ) if( ret < 0 )
{ {
ret = mqueue.front(); cerr << "(websocket): " << req->clientAddress().toString()
mqueue.pop(); << " write to socket error(" << errno << "): " << strerror(errno) << endl;
return ret;
if( errno == EPIPE || errno == EBADF )
{
cerr << "(websocket): "
<< req->clientAddress().toString()
<< " write error.. terminate session.." << endl;
term();
}
return;
} }
}
{ msg->pos += ret;
std::unique_lock<std::mutex> lk(mut);
event.wait(lk);
}
if( cancelled ) if( msg->nbytes() == 0 )
return ""; {
wbuf.pop();
delete msg;
}
uniset_rwmutex_wrlock lk(mqmut); if( !wbuf.empty() )
io.set(EV_WRITE);
if( !mqueue.empty() ) return;
}
catch( WebSocketException& exc )
{ {
ret = mqueue.front(); switch( exc.code() )
mqueue.pop(); {
case WebSocket::WS_ERR_HANDSHAKE_UNSUPPORTED_VERSION:
resp->set("Sec-WebSocket-Version", WebSocket::WEBSOCKET_VERSION);
case WebSocket::WS_ERR_NO_HANDSHAKE:
case WebSocket::WS_ERR_HANDSHAKE_NO_VERSION:
case WebSocket::WS_ERR_HANDSHAKE_NO_KEY:
resp->setStatusAndReason(HTTPResponse::HTTP_BAD_REQUEST);
resp->setContentLength(0);
resp->send();
break;
}
} }
catch( const Poco::Net::NetException& e )
return ret;
}
// -----------------------------------------------------------------------------
void LogDB::LogWebSocket::ping()
{
if( cancelled )
return;
{ {
uniset_rwmutex_wrlock lk(mqmut); cerr << "(websocket):NetException: "
// в качестве ping-а просто добавляем фейковое сообщение '.' << req->clientAddress().toString()
mqueue.push("."); << " error: " << e.displayText()
<< endl;
} }
event.notify_all(); catch( Poco::IOException& ex )
{
cerr << "(websocket): IOException: "
<< req->clientAddress().toString()
<< " error: " << ex.displayText()
<< endl;
}
term();
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
void LogDB::LogWebSocket::add( LogDB::Log* log, const string& txt ) void LogDB::LogWebSocket::term()
{ {
if( cancelled ) if( cancelled )
return; return;
{ cancelled = true;
uniset_rwmutex_wrlock lk(mqmut); con.disconnect();
mqueue.push(txt); io.stop();
} finish.notify_all();
event.notify_all();
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
void LogDB::LogWebSocket::term() void LogDB::LogWebSocket::waitCompletion()
{ {
cancelled = true; std::unique_lock<std::mutex> lk(finishmut);
con.disconnect(); // вот тут не безпасный вызов, т.е. sigc там в других потоках используется while( !cancelled )
event.notify_all(); finish.wait(lk);
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
#endif #endif
......
...@@ -62,7 +62,7 @@ namespace uniset ...@@ -62,7 +62,7 @@ namespace uniset
Ожидается что контролируемых логов будет не очень много (максимум несколько десятков) Ожидается что контролируемых логов будет не очень много (максимум несколько десятков)
и каждый лог будет генерировать не более 2-5 мегабайт записей. Поэтому sqlite должно хватить. и каждый лог будет генерировать не более 2-5 мегабайт записей. Поэтому sqlite должно хватить.
\section sec_LogDB_Conf Конфигурирвание LogDB \section sec_LogDB_Conf Конфигурирование LogDB
<LogDB name="LogDB" ...> <LogDB name="LogDB" ...>
<logserver name="" ip=".." port=".." cmd=".." description=".."/> <logserver name="" ip=".." port=".." cmd=".." description=".."/>
...@@ -92,13 +92,28 @@ namespace uniset ...@@ -92,13 +92,28 @@ namespace uniset
/count?logname - Получить текущее количество записей /count?logname - Получить текущее количество записей
\section sec_LogDB_WEBSOCK LogDB Поддержка web socket
В LogDB встроена возможность realtime чтения логов, через websocket.
Подключение (создание) сокета происходит по адресу
\code
ws://host:port/logdb/ws/logname
\endcode
Где \a logname - это имя логсервера от которого мы хотим получать логи (см. \ref sec_LogDB_Conf).
\section sec_LogDB_DETAIL LogDB Технические детали
Вся релизация построена на "однопоточном" eventloop. В нём происходит,
чтение данных от логсерверов, посылка сообщений в websockets.
При этом обработка запросов REST API реалиуется отдельными потоками контролируемыми libpoco.
\todo Добавить настройки таймаутов, размера буфера, размера для резервирования под строку, количество потоков для http и т.п. \todo Добавить настройки таймаутов, размера буфера, размера для резервирования под строку, количество потоков для http и т.п.
\todo Добавить ротацию БД \todo Добавить ротацию БД
\todo Сделать настройку, для формата даты и времени при выгрузке из БД (при формировании json). \todo Сделать настройку, для формата даты и времени при выгрузке из БД (при формировании json).
\todo Возможно в /logs стоит в ответе сразу возвращать и общее количество в БД (это один лишний запрос, каждый раз). \todo Возможно в /logs стоит в ответе сразу возвращать и общее количество в БД (это один лишний запрос, каждый раз).
\todo Возможно в последствии оптимизировать таблицы (нормализовать) если будет тормозить. Сейчас пока прототип. \todo Возможно в последствии оптимизировать таблицы (нормализовать) если будет тормозить. Сейчас пока прототип.
\todo Пока не очень эффективная работа с датой и временем (заодно подумать всё-таки в чём хранить) \todo Пока не очень эффективная работа с датой и временем (заодно подумать всё-таки в чём хранить)
\todo WebSocket: Сделать запись через UTCPCore::Buffer, чтобы не терять данные при записи в сокет
\todo WebSocket: Доделать ограничение на размер буфера для каждого сокета \todo WebSocket: Доделать ограничение на размер буфера для каждого сокета
\todo WebSocket: доделать настройку всевозможных timeout-ов \todo WebSocket: доделать настройку всевозможных timeout-ов
\todo WebSocket: доделать проверку соединения \todo WebSocket: доделать проверку соединения
...@@ -155,7 +170,7 @@ namespace uniset ...@@ -155,7 +170,7 @@ namespace uniset
// XX m - минут, h-часов, d-дней, M - месяцев // XX m - минут, h-часов, d-дней, M - месяцев
static std::string qLast( const std::string& p ); static std::string qLast( const std::string& p );
std::shared_ptr<LogWebSocket> newWebSocket( Poco::Net::HTTPServerRequest& req, Poco::Net::HTTPServerResponse& resp, const std::string& logname ); std::shared_ptr<LogWebSocket> newWebSocket(Poco::Net::HTTPServerRequest* req, Poco::Net::HTTPServerResponse* resp, const std::string& logname );
void delWebSocket( std::shared_ptr<LogWebSocket>& ws ); void delWebSocket( std::shared_ptr<LogWebSocket>& ws );
void onPingWebSockets( ev::timer& t, int revents ); void onPingWebSockets( ev::timer& t, int revents );
#endif #endif
...@@ -229,37 +244,41 @@ namespace uniset ...@@ -229,37 +244,41 @@ namespace uniset
public Poco::Net::WebSocket public Poco::Net::WebSocket
{ {
public: public:
LogWebSocket(Poco::Net::HTTPServerRequest& req, LogWebSocket(Poco::Net::HTTPServerRequest* req,
Poco::Net::HTTPServerResponse& resp, Poco::Net::HTTPServerResponse* resp,
std::shared_ptr<Log>& log); std::shared_ptr<Log>& log,
ev::dynamic_loop& loop );
virtual ~LogWebSocket(); virtual ~LogWebSocket();
// получение очередного сообщения void event( ev::io& watcher, int revents );
// (с засыпанием в случае отсутствия сообщения в очереди)
std::string get();
// вызывается из потока eventloop..
void ping(); void ping();
// вызывается из потока eventloop..
void add( Log* log, const std::string& txt ); void add( Log* log, const std::string& txt );
// надо вызывать только из потока eventloop
// т.к. идёт обращение sigc::connection
void term(); void term();
void waitCompletion();
protected: protected:
std::mutex mut;
std::condition_variable event;
uniset::uniset_rwmutex mqmut; void write( ev::io& w );
std::queue<std::string> mqueue;
ev::io io;
std::mutex finishmut;
std::condition_variable finish;
std::atomic_bool cancelled = { false }; std::atomic_bool cancelled = { false };
sigc::connection con; sigc::connection con; // подписка на появление логов..
// std::shared_ptr<Log> log;
Poco::Net::HTTPServerRequest* req;
Poco::Net::HTTPServerResponse* resp;
// очередь данных на посылку..
std::queue<UTCPCore::Buffer*> wbuf;
}; };
class LogWebSocketGuard class LogWebSocketGuard
......
<html> <html>
<!--
Base on example from
https://github.com/pocoproject/poco/blob/develop/Net/samples/WebSocketServer/src/WebSocketServer.cpp
-->
<head> <head>
<title>WebSocketServer</title> <title>WebSocketServer</title>
<script type="text/javascript"> <script type="text/javascript">
...@@ -7,10 +11,10 @@ ...@@ -7,10 +11,10 @@
if ("WebSocket" in window) if ("WebSocket" in window)
{ {
var ws = new WebSocket("ws://localhost:8888/logdb/ws/logserver1"); var ws = new WebSocket("ws://localhost:8888/logdb/ws/logserver1");
ws.onopen = function() // ws.onopen = function()
{ // {
ws.send("Hello, world!"); // ws.send("Hello, world!");
}; // };
ws.onmessage = ws.onmessage = function(evt) ws.onmessage = ws.onmessage = function(evt)
{ {
...@@ -33,8 +37,8 @@ ...@@ -33,8 +37,8 @@
</script> </script>
</head> </head>
<body> <body>
<h1>WebSocket Server</h1> <h1>Test uniset logdb websocket</h1>
<p><a href="javascript:WebSocketTest()">Run WebSocket</a></p> <p><a href="javascript:WebSocketTest()">Connect to 'logserver1'</a></p>
<div id="logs"></div> <div id="logs"></div>
</body> </body>
</html> </html>
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment