Commit c0bfa359 authored by Pavel Vainerman's avatar Pavel Vainerman

[uwebsocket]: refactoring "commmand processing"

parent b37fc34b
...@@ -66,7 +66,6 @@ UWebSocketGate::UWebSocketGate( uniset::ObjectId id, xmlNode* cnode, const strin ...@@ -66,7 +66,6 @@ UWebSocketGate::UWebSocketGate( uniset::ObjectId id, xmlNode* cnode, const strin
sigINT.set<UWebSocketGate, &UWebSocketGate::onTerminate>(this); sigINT.set<UWebSocketGate, &UWebSocketGate::onTerminate>(this);
iocheck.set<UWebSocketGate, &UWebSocketGate::checkMessages>(this); iocheck.set<UWebSocketGate, &UWebSocketGate::checkMessages>(this);
maxMessagesProcessing = conf->getArgPInt("--" + prefix + "max-messages-processing", conf->getField("maxMessagesProcessing"), maxMessagesProcessing); maxMessagesProcessing = conf->getArgPInt("--" + prefix + "max-messages-processing", conf->getField("maxMessagesProcessing"), maxMessagesProcessing);
if( maxMessagesProcessing < 0 ) if( maxMessagesProcessing < 0 )
...@@ -82,6 +81,7 @@ UWebSocketGate::UWebSocketGate( uniset::ObjectId id, xmlNode* cnode, const strin ...@@ -82,6 +81,7 @@ UWebSocketGate::UWebSocketGate( uniset::ObjectId id, xmlNode* cnode, const strin
wsHeartbeatTime_sec = (float)conf->getArgPInt("--" + prefix + "ws-heartbeat-time", it.getProp("wsHeartbeatTimeTime"), int(wsHeartbeatTime_sec * 1000)) / 1000.0; wsHeartbeatTime_sec = (float)conf->getArgPInt("--" + prefix + "ws-heartbeat-time", it.getProp("wsHeartbeatTimeTime"), int(wsHeartbeatTime_sec * 1000)) / 1000.0;
wsSendTime_sec = (float)conf->getArgPInt("--" + prefix + "ws-send-time", it.getProp("wsSendTime"), int(wsSendTime_sec * 1000.0)) / 1000.0; wsSendTime_sec = (float)conf->getArgPInt("--" + prefix + "ws-send-time", it.getProp("wsSendTime"), int(wsSendTime_sec * 1000.0)) / 1000.0;
wsMaxSend = conf->getArgPInt("--" + prefix + "ws-max-send", it.getProp("wsMaxSend"), wsMaxSend); wsMaxSend = conf->getArgPInt("--" + prefix + "ws-max-send", it.getProp("wsMaxSend"), wsMaxSend);
wsMaxCmd = conf->getArgPInt("--" + prefix + "ws-max-cmd", it.getProp("wsMaxCmd"), wsMaxCmd);
httpHost = conf->getArgParam("--" + prefix + "httpserver-host", "localhost"); httpHost = conf->getArgParam("--" + prefix + "httpserver-host", "localhost");
httpPort = conf->getArgPInt("--" + prefix + "httpserver-port", 8081); httpPort = conf->getArgPInt("--" + prefix + "httpserver-port", 8081);
...@@ -223,9 +223,9 @@ std::shared_ptr<UWebSocketGate> UWebSocketGate::init_wsgate( int argc, const cha ...@@ -223,9 +223,9 @@ std::shared_ptr<UWebSocketGate> UWebSocketGate::init_wsgate( int argc, const cha
void UWebSocketGate::help_print() void UWebSocketGate::help_print()
{ {
cout << "Default: prefix='ws'" << endl; cout << "Default: prefix='ws'" << endl;
cout << "--prefix-name name - Имя. Для поиска настроечной секции в configure.xml" << endl; cout << "--prefix-name name - Имя. Для поиска настроечной секции в configure.xml" << endl;
cout << "--uniset-object-size-message-queue num - Размер uniset-очереди сообщений" << endl; cout << "--uniset-object-size-message-queue num - Размер uniset-очереди сообщений" << endl;
cout << "--prefix-msg-check-time msec - Период опроса uniset-очереди сообщений, для обработки новых сообщений. По умолчанию: 10 мсек" << endl; cout << "--prefix-msg-check-time msec - Период опроса uniset-очереди сообщений, для обработки новых сообщений. По умолчанию: 10 мсек" << endl;
cout << "--prefix-max-messages-processing num - Количество uniset-сообщений обрабатывамых за один раз. По умолчанию 50. По умолчанию: 100" << endl; cout << "--prefix-max-messages-processing num - Количество uniset-сообщений обрабатывамых за один раз. По умолчанию 50. По умолчанию: 100" << endl;
cout << "websockets: " << endl; cout << "websockets: " << endl;
...@@ -233,13 +233,14 @@ void UWebSocketGate::help_print() ...@@ -233,13 +233,14 @@ void UWebSocketGate::help_print()
cout << "--prefix-ws-heartbeat-time msec - Период сердцебиения в соединении. По умолчанию: 3000 мсек" << endl; cout << "--prefix-ws-heartbeat-time msec - Период сердцебиения в соединении. По умолчанию: 3000 мсек" << endl;
cout << "--prefix-ws-send-time msec - Период посылки сообщений. По умолчанию: 500 мсек" << endl; cout << "--prefix-ws-send-time msec - Период посылки сообщений. По умолчанию: 500 мсек" << endl;
cout << "--prefix-ws-max num - Максимальное число сообщений посылаемых за один раз. По умолчанию: 200" << endl; cout << "--prefix-ws-max num - Максимальное число сообщений посылаемых за один раз. По умолчанию: 200" << endl;
cout << "--prefix-ws-cmd num - Максимальное число команд обрабатываемых за один раз. По умолчанию: 100" << endl;
cout << "http: " << endl; cout << "http: " << endl;
cout << "--prefix-httpserver-host ip - IP на котором слушает http сервер. По умолчанию: localhost" << endl; cout << "--prefix-httpserver-host ip - IP на котором слушает http сервер. По умолчанию: localhost" << endl;
cout << "--prefix-httpserver-port num - Порт на котором принимать запросы. По умолчанию: 8080" << endl; cout << "--prefix-httpserver-port num - Порт на котором принимать запросы. По умолчанию: 8080" << endl;
cout << "--prefix-httpserver-max-queued num - Размер очереди запросов к http серверу. По умолчанию: 100" << endl; cout << "--prefix-httpserver-max-queued num - Размер очереди запросов к http серверу. По умолчанию: 100" << endl;
cout << "--prefix-httpserver-max-threads num - Разрешённое количество потоков для http-сервера. По умолчанию: 3" << endl; cout << "--prefix-httpserver-max-threads num - Разрешённое количество потоков для http-сервера. По умолчанию: 3" << endl;
cout << "--prefix-httpserver-cors-allow addr - (CORS): Access-Control-Allow-Origin. Default: *" << endl; cout << "--prefix-httpserver-cors-allow addr - (CORS): Access-Control-Allow-Origin. Default: *" << endl;
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
void UWebSocketGate::run( bool async ) void UWebSocketGate::run( bool async )
...@@ -558,15 +559,13 @@ std::shared_ptr<UWebSocketGate::UWebSocket> UWebSocketGate::newWebSocket( Poco:: ...@@ -558,15 +559,13 @@ std::shared_ptr<UWebSocketGate::UWebSocket> UWebSocketGate::newWebSocket( Poco::
ws->setHearbeatTime(wsHeartbeatTime_sec); ws->setHearbeatTime(wsHeartbeatTime_sec);
ws->setSendPeriod(wsSendTime_sec); ws->setSendPeriod(wsSendTime_sec);
ws->setMaxSendCount(wsMaxSend); ws->setMaxSendCount(wsMaxSend);
ws->setMaxCmdCount(wsMaxCmd);
ws->mylog = mylog; ws->mylog = mylog;
for( const auto& i : idlist.getList() ) for( const auto& i : idlist.getList() )
{ {
mylog3 << myname << ": ask add " << i << endl; mylog3 << myname << ": ask sid=" << i << endl;
UWebSocket::sinfo si; ws->ask(i);
si.id = i;
si.cmd = "ask";
ws->add(si);
} }
wsocks.emplace_back(ws); wsocks.emplace_back(ws);
...@@ -778,35 +777,29 @@ void UWebSocketGate::UWebSocket::read( ev::io& io, int revents ) ...@@ -778,35 +777,29 @@ void UWebSocketGate::UWebSocket::read( ev::io& io, int revents )
} }
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
void UWebSocketGate::UWebSocket::add( const sinfo& si ) void UWebSocketGate::UWebSocket::ask( uniset::ObjectId id )
{ {
smap[si.id] = si; sinfo s;
s.id = id;
s.cmd = "ask";
qcmd.push(s);
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
void UWebSocketGate::UWebSocket::del( uniset::ObjectId id ) void UWebSocketGate::UWebSocket::del( uniset::ObjectId id )
{ {
auto s = smap.find(id); sinfo s;
s.id = id;
if( s != smap.end() ) s.cmd = "del";
s->second.cmd = "del"; qcmd.push(s);
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
void UWebSocketGate::UWebSocket::set( uniset::ObjectId id, long value ) void UWebSocketGate::UWebSocket::set( uniset::ObjectId id, long value )
{ {
auto s = smap.find(id); sinfo s;
s.id = id;
if( s != smap.end() ) s.value = value;
{ s.cmd = "set";
s->second.value = value; qcmd.push(s);
s->second.cmd = "set";
return;
}
sinfo si;
si.id = id;
si.value = value;
si.cmd = "set";
smap.emplace(id, si);
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
void UWebSocketGate::UWebSocket::sensorInfo( const uniset::SensorMessage* sm ) void UWebSocketGate::UWebSocket::sensorInfo( const uniset::SensorMessage* sm )
...@@ -833,9 +826,10 @@ void UWebSocketGate::UWebSocket::sensorInfo( const uniset::SensorMessage* sm ) ...@@ -833,9 +826,10 @@ void UWebSocketGate::UWebSocket::sensorInfo( const uniset::SensorMessage* sm )
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
void UWebSocketGate::UWebSocket::doCommand( const std::shared_ptr<UInterface>& ui ) void UWebSocketGate::UWebSocket::doCommand( const std::shared_ptr<UInterface>& ui )
{ {
for( auto&& io : smap ) for( size_t i = 0; i < maxcmd && !qcmd.empty(); i++ )
{ {
auto& s = io.second; auto s = qcmd.front();
qcmd.pop();
try try
{ {
...@@ -848,9 +842,18 @@ void UWebSocketGate::UWebSocket::doCommand( const std::shared_ptr<UInterface>& u ...@@ -848,9 +842,18 @@ void UWebSocketGate::UWebSocket::doCommand( const std::shared_ptr<UInterface>& u
<< endl; << endl;
if( s.cmd == "ask" ) if( s.cmd == "ask" )
{
ui->askSensor(s.id, UniversalIO::UIONotify); ui->askSensor(s.id, UniversalIO::UIONotify);
smap[s.id] = s;
}
else if( s.cmd == "del" ) else if( s.cmd == "del" )
{
ui->askSensor(s.id, UniversalIO::UIODontNotify); ui->askSensor(s.id, UniversalIO::UIODontNotify);
auto it = smap.find(s.id);
if( it != smap.end() )
smap.erase(it);
}
else if( s.cmd == "set" ) else if( s.cmd == "set" )
ui->setValue(s.id, s.value); ui->setValue(s.id, s.value);
...@@ -868,9 +871,18 @@ void UWebSocketGate::UWebSocket::doCommand( const std::shared_ptr<UInterface>& u ...@@ -868,9 +871,18 @@ void UWebSocketGate::UWebSocket::doCommand( const std::shared_ptr<UInterface>& u
void UWebSocketGate::UWebSocket::sendError( sinfo& si, const std::string& err ) void UWebSocketGate::UWebSocket::sendError( sinfo& si, const std::string& err )
{ {
uniset::SensorMessage sm(si.id, 0); uniset::SensorMessage sm(si.id, 0);
// sm.undefined = true;
si.err = err; si.err = err;
sensorInfo(&sm);
if( jbuf.size() > maxsize )
{
mywarn << req->clientAddress().toString() << " lost messages..." << endl;
return;
}
jbuf.emplace(UWebSocketGate::to_json(&sm, err));
if( ioping.is_active() )
ioping.stop();
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
void UWebSocketGate::UWebSocket::onCommand( const string& cmdtxt ) void UWebSocketGate::UWebSocket::onCommand( const string& cmdtxt )
...@@ -902,12 +914,7 @@ void UWebSocketGate::UWebSocket::onCommand( const string& cmdtxt ) ...@@ -902,12 +914,7 @@ void UWebSocketGate::UWebSocket::onCommand( const string& cmdtxt )
auto idlist = uniset::explode(params); auto idlist = uniset::explode(params);
for( const auto& id : idlist.getList() ) for( const auto& id : idlist.getList() )
{ ask(id);
sinfo s;
s.id = id;
s.cmd = "ask";
add(s);
}
// уведомление о новой команде // уведомление о новой команде
cmdsignal->send(); cmdsignal->send();
...@@ -1070,6 +1077,12 @@ void UWebSocketGate::UWebSocket::setMaxSendCount( size_t val ) ...@@ -1070,6 +1077,12 @@ void UWebSocketGate::UWebSocket::setMaxSendCount( size_t val )
maxsend = val; maxsend = val;
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
void UWebSocketGate::UWebSocket::setMaxCmdCount( size_t val )
{
if( val > 0 )
maxcmd = val;
}
// -----------------------------------------------------------------------------
void UWebSocketGate::httpWebSocketPage( std::ostream& ostr, Poco::Net::HTTPServerRequest& req, Poco::Net::HTTPServerResponse& resp ) void UWebSocketGate::httpWebSocketPage( std::ostream& ostr, Poco::Net::HTTPServerRequest& req, Poco::Net::HTTPServerResponse& resp )
{ {
using Poco::Net::HTTPResponse; using Poco::Net::HTTPResponse;
......
...@@ -227,6 +227,7 @@ namespace uniset ...@@ -227,6 +227,7 @@ namespace uniset
double wsHeartbeatTime_sec = { 3.0 }; double wsHeartbeatTime_sec = { 3.0 };
double wsSendTime_sec = { 0.5 }; double wsSendTime_sec = { 0.5 };
size_t wsMaxSend = { 200 }; size_t wsMaxSend = { 200 };
size_t wsMaxCmd = { 100 };
static Poco::JSON::Object::Ptr to_json( const uniset::SensorMessage* sm, const std::string& err ); static Poco::JSON::Object::Ptr to_json( const uniset::SensorMessage* sm, const std::string& err );
...@@ -261,7 +262,7 @@ namespace uniset ...@@ -261,7 +262,7 @@ namespace uniset
long value = { 0 }; // set value long value = { 0 }; // set value
}; };
void add( const sinfo& si ); void ask( uniset::ObjectId id );
void del( uniset::ObjectId id ); void del( uniset::ObjectId id );
void set( uniset::ObjectId id, long value ); void set( uniset::ObjectId id, long value );
void sensorInfo( const uniset::SensorMessage* sm ); void sensorInfo( const uniset::SensorMessage* sm );
...@@ -275,6 +276,7 @@ namespace uniset ...@@ -275,6 +276,7 @@ namespace uniset
void setHearbeatTime( const double& sec ); void setHearbeatTime( const double& sec );
void setSendPeriod( const double& sec ); void setSendPeriod( const double& sec );
void setMaxSendCount( size_t val ); void setMaxSendCount( size_t val );
void setMaxCmdCount( size_t val );
std::shared_ptr<DebugStream> mylog; std::shared_ptr<DebugStream> mylog;
...@@ -287,6 +289,7 @@ namespace uniset ...@@ -287,6 +289,7 @@ namespace uniset
ev::timer iosend; ev::timer iosend;
double send_sec = { 0.5 }; double send_sec = { 0.5 };
size_t maxsend = { 200 }; size_t maxsend = { 200 };
size_t maxcmd = { 100 };
ev::timer ioping; ev::timer ioping;
double ping_sec = { 3.0 }; double ping_sec = { 3.0 };
...@@ -303,6 +306,7 @@ namespace uniset ...@@ -303,6 +306,7 @@ namespace uniset
std::atomic_bool cancelled = { false }; std::atomic_bool cancelled = { false };
std::unordered_map<uniset::ObjectId, sinfo> smap; std::unordered_map<uniset::ObjectId, sinfo> smap;
std::queue<sinfo> qcmd; // очередь команд
Poco::Net::HTTPServerRequest* req; Poco::Net::HTTPServerRequest* req;
Poco::Net::HTTPServerResponse* resp; Poco::Net::HTTPServerResponse* resp;
......
...@@ -57,11 +57,14 @@ TEST_CASE("[UWebSocketGate]: set", "[uwebsocketgate]") ...@@ -57,11 +57,14 @@ TEST_CASE("[UWebSocketGate]: set", "[uwebsocketgate]")
REQUIRE( ui->getValue(2) == 20 ); REQUIRE( ui->getValue(2) == 20 );
REQUIRE( ui->getValue(3) == 30 ); REQUIRE( ui->getValue(3) == 30 );
// char buffer[1024] = {}; cmd = "set:1=11,2=21,3=31";
// int flags; ws.sendFrame(cmd.data(), (int)cmd.size());
// ws.receiveFrame(buffer, sizeof(buffer), flags);
// REQUIRE(flags == WebSocket::FRAME_TEXT); msleep(50);
REQUIRE( ui->getValue(1) == 11 );
REQUIRE( ui->getValue(2) == 21 );
REQUIRE( ui->getValue(3) == 31 );
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
TEST_CASE("[UWebSocketGate]: ask", "[uwebsocketgate]") TEST_CASE("[UWebSocketGate]: ask", "[uwebsocketgate]")
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment