Commit c530d43c authored by Pavel Vainerman's avatar Pavel Vainerman

added new function 'pushMessage(string)' to UniSetObject

update uniset2-admin, UInterface, DBServers (MySQL,PGSQL,SQLLite)
parent e37b4e1b
......@@ -52,6 +52,14 @@
/*! Функция посылки сообщения объекту */
void push( in uniset::TransportMessage msg );
/*! Функция посылки текстового сообщения объекту */
void pushMessage( in string msg
, in uniset::Timespec tm
, in uniset::ProducerInfo pi
, in long priority
, in long consumer
);
};
//}; // end of module UniSet
......
......@@ -77,11 +77,25 @@
ObjectId node; /*!< узел, на котором он находится */
};
/*! Информация о "поставщике" события */
struct ProducerInfo
{
ObjectId id; /*!< идентификатор заказчика */
ObjectId node; /*!< узел, на котором он находится */
};
/*! Список идентификаторов */
typedef sequence<ObjectId> IDSeq;
/*! время (posix timespec) */
struct Timespec
{
unsigned long long sec; /* Seconds. */
unsigned long nsec; /* nanoseconds. */
};
};// end of module uniset
......
......@@ -55,6 +55,7 @@ static struct option longopts[] =
{ "verbose", no_argument, 0, 'v' },
{ "quiet", no_argument, 0, 'q' },
{ "csv", required_argument, 0, 'z' },
{ "sendText", required_argument, 0, 'm' },
{ NULL, 0, 0, 0 }
};
......@@ -77,6 +78,7 @@ int getState( const string& args, UInterface& ui );
int getCalibrate( const string& args, UInterface& ui );
int oinfo(const string& args, UInterface& ui , const string& userparam );
int apiRequest( const string& args, UInterface& ui, const string& query );
void sendText( const string& args, UInterface& ui, const string& txt );
// --------------------------------------------------------------------------
static void print_help(int width, const string& cmd, const string& help, const string& tab = " " )
{
......@@ -123,6 +125,7 @@ static void usage()
print_help(36, "-v|--verbose", "Подробный вывод логов.\n");
print_help(36, "-q|--quiet", "Выводит только результат.\n");
print_help(36, "-z|--csv", "Вывести результат (getValue) в виде val1,val2,val3...\n");
print_help(36, "-m|--sendText id1@node1,id2@node2,id3,.. text", "Послать объектам текстовое сообщение text\n");
cout << endl;
}
......@@ -148,7 +151,7 @@ int main(int argc, char** argv)
while(1)
{
opt = getopt_long(argc, argv, "hc:beosfur:l:i::x:g:w:y:p:vqz:a:", longopts, &optindex);
opt = getopt_long(argc, argv, "hc:beosfur:l:i::x:g:w:y:p:vqz:a:m:", longopts, &optindex);
if( opt == -1 )
break;
......@@ -300,7 +303,6 @@ int main(int argc, char** argv)
commandToAll(conf->getControllersSection(), rep, (Command)cmd);
commandToAll(conf->getObjectsSection(), rep, (Command)cmd);
}
return 0;
case 'r': //--configure
......@@ -329,7 +331,6 @@ int main(int argc, char** argv)
if( verb )
cout << "(finish): done" << endl;
}
return 0;
case 'l': //--logrotate
......@@ -367,9 +368,29 @@ int main(int argc, char** argv)
commandToAll(conf->getControllersSection(), rep, (Command)cmd);
commandToAll(conf->getObjectsSection(), rep, (Command)cmd);
// cout<<"(foldUp): done"<<endl;
return 0;
}
return 0;
case 'm': //--sendText
{
// смотрим второй параметр
if( checkArg(optind, argc, argv) == 0 )
{
if( !quiet )
cerr << "admin(sendText): Unknown 'text'. Use: id,name,name2@nodeX text" << endl;
return 1;
}
auto conf = uniset_init(argc, argv, conffile);
UInterface ui(conf);
ui.initBackId(uniset::AdminID);
std::string txt = string(argv[optind]);
sendText(optarg, ui, txt);
return 0;
}
break;
case '?':
default:
......@@ -1090,6 +1111,34 @@ int apiRequest( const string& args, UInterface& ui, const string& query )
}
// --------------------------------------------------------------------------------------
void sendText( const string& args, UInterface& ui, const string& txt )
{
auto conf = uniset_conf();
auto sl = uniset::getObjectsList( args, conf );
for( auto && it : sl )
{
if( it.node == DefaultObjectId )
it.node = conf->getLocalNode();
try
{
ui.sendText(it.id, txt, it.node);
}
catch( const std::exception& ex )
{
if( !quiet )
cerr << "std::exception: " << ex.what() << endl;
}
catch(...)
{
if( !quiet )
cerr << "Unknown exception.." << endl;
}
}
}
// --------------------------------------------------------------------------------------
void errDoNotResolve( const std::string& oname )
{
if( verb )
......
......@@ -18,6 +18,7 @@ ln -s -f admin.sh getCalibrate
ln -s -f admin.sh help
ln -s -f admin.sh oinfo
ln -s -f admin.sh apiRequest
ln -s -f admin.sh sendText
ln -s -f ../../Utilities/scripts/uniset2-start.sh
ln -s -f ../../Utilities/scripts/uniset2-stop.sh stop.sh
......
......@@ -66,8 +66,8 @@ int main( int argc, char** argv )
if( t != UniversalIO::AI && t != UniversalIO::AO )
{
cerr << endl << "Неверный типа датчика '" << t << "' для id='" << it.fname << "'. Тип должен быть AI или AO." << endl << endl;
return 1;
cerr << endl << "WARNING! Неверный типа датчика '" << t << "' для id='" << it.fname << "'. Тип должен быть AI или AO." << endl << endl;
// return 1;
}
if( it.si.node == DefaultObjectId )
......
......@@ -467,6 +467,12 @@ void <xsl:value-of select="$CLASSNAME"/>_SK::processingMessage( const uniset::Vo
break;
default:
<xsl:choose>
<xsl:when test="normalize-space($BASECLASS)='UniSetObject'"> UniSetObject::processingMessage(_msg);</xsl:when>
<xsl:when test="normalize-space($BASECLASS)='UniSetManager'"> UniSetManager::processingMessage(_msg);</xsl:when>
<xsl:when test="normalize-space($BASECLASS)!=''"> <xsl:value-of select="normalize-space($BASECLASS)"/>::processingMessage(_msg);</xsl:when>
<xsl:when test="normalize-space($BASECLASS)=''"> UniSetObject::processingMessage(_msg);</xsl:when>
</xsl:choose>
break;
}
}
......
......@@ -3,10 +3,10 @@
# See doc: http://www.gnu.org/software/hello/manual/autoconf/Generic-Programs.html
# AC_PREREQ(2.59)
AC_INIT([uniset2], [2.7.0], pv@etersoft.ru)
AC_INIT([uniset2], [2.8.0], pv@etersoft.ru)
AM_INIT_AUTOMAKE(AC_PACKAGE_NAME,AC_PACKAGE_VERSION)
LIBVER=2:7:0
LIBVER=2:8:0
AC_SUBST(LIBVER)
# AC_CONFIG_MACRO_DIR([m4])
......
......@@ -121,6 +121,47 @@ void DBServer_MySQL::confirmInfo( const uniset::ConfirmMessage* cem )
}
}
//--------------------------------------------------------------------------------------------
void DBServer_MySQL::onTextMessage( const TextMessage* msg )
{
try
{
// если время не было выставлено (указываем время сохранения в БД)
if( !msg->tm.tv_sec )
{
// Выдаём CRIT, но тем не менее сохраняем в БД
dbcrit << myname << "(insert_main_messages): UNKNOWN TIMESTAMP! (tm.tv_sec=0)"
<< " for msg='" << msg->txt << "'"
<< " supplier=" << uniset_conf()->oind->getMapName(msg->supplier)
<< endl;
}
ostringstream data;
data << "INSERT INTO " << tblName(msg->type)
<< "(date, time, time_usec, text, node) VALUES( '"
<< dateToString(msg->tm.tv_sec, "-") << "','" // date
<< timeToString(msg->tm.tv_sec, ":") << "','" // time
<< msg->tm.tv_nsec << "','" // time_usec
<< msg->txt << "','" // text
<< msg->node << "')"; // node
dbinfo << myname << "(insert_main_messages): " << data.str() << endl;
if( !writeToBase(data.str()) )
{
dbcrit << myname << "(insert_main_messages): error: " << db->error() << endl;
}
}
catch( const uniset::Exception& ex )
{
dbcrit << myname << "(insert_main_messages): " << ex << endl;
}
catch( const std::exception& ex )
{
dbcrit << myname << "(insert_main_messages): catch: " << ex.what() << endl;
}
}
//--------------------------------------------------------------------------------------------
bool DBServer_MySQL::writeToBase( const std::string& query )
{
dbinfo << myname << "(writeToBase): " << query << endl;
......@@ -274,6 +315,7 @@ void DBServer_MySQL::initDBServer()
tblMap[uniset::Message::SensorInfo] = "main_history";
tblMap[uniset::Message::Confirm] = "main_history";
tblMap[uniset::Message::TextMessage] = "main_messages";
PingTime = conf->getPIntProp(node, "pingTime", PingTime);
ReconnectTime = conf->getPIntProp(node, "reconnectTime", ReconnectTime);
......
......@@ -129,6 +129,19 @@ namespace uniset
CONSTRAINT `sensor_id_refs_id_436bab5e` FOREIGN KEY (`sensor_id`) REFERENCES `main_sensor` (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
DROP TABLE IF EXISTS `main_messages`;
CREATE TABLE `main_messages` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`date` date NOT NULL,
`time` time NOT NULL,
`time_usec` int(10) unsigned NOT NULL,
`text` text NOT NULL,
`node` int(10) unsigned NOT NULL,
`confirm` int(11) DEFAULT NULL,
PRIMARY KEY (`id`),
KEY `main_messages_key` (date,time,node)
) ENGINE=InnoDB DEFAULT CHARSET=utf8
\endcode
\warning Временно, для обратной совместимости поле 'time_usec' в таблицах оставлено с таким названием,
......@@ -168,6 +181,7 @@ namespace uniset
virtual void sysCommand( const uniset::SystemMessage* sm ) override;
virtual void sensorInfo( const uniset::SensorMessage* sm ) override;
virtual void confirmInfo( const uniset::ConfirmMessage* cmsg ) override;
virtual void onTextMessage( const uniset::TextMessage* msg ) override;
virtual std::string getMonitInfo( const std::string& params ) override;
bool writeToBase( const std::string& query );
......
......@@ -122,6 +122,36 @@ void DBServer_PostgreSQL::confirmInfo( const uniset::ConfirmMessage* cem )
}
}
//--------------------------------------------------------------------------------------------
void DBServer_PostgreSQL::onTextMessage( const TextMessage* msg )
{
try
{
ostringstream data;
data << "INSERT INTO " << tblName(msg->type)
<< "(date, time, time_usec, text, node) VALUES( '"
<< dateToString(msg->tm.tv_sec, "-") << "','" // date
<< timeToString(msg->tm.tv_sec, ":") << "','" // time
<< msg->tm.tv_nsec << "','" // time_usec
<< msg->txt << "','" // text
<< msg->node << "')"; // node
dbinfo << myname << "(insert_main_messages): " << data.str() << endl;
if( !writeToBase(data.str()) )
{
dbcrit << myname << "(insert_main_messages): error: " << db->error() << endl;
}
}
catch( const uniset::Exception& ex )
{
dbcrit << myname << "(insert_main_messages): " << ex << endl;
}
catch( ... )
{
dbcrit << myname << "(insert_main_messages): catch..." << endl;
}
}
//--------------------------------------------------------------------------------------------
bool DBServer_PostgreSQL::writeToBase( const string& query )
{
dbinfo << myname << "(writeToBase): " << query << endl;
......@@ -330,6 +360,7 @@ void DBServer_PostgreSQL::initDBServer()
tblMap[uniset::Message::SensorInfo] = "main_history";
tblMap[uniset::Message::Confirm] = "main_history";
tblMap[uniset::Message::TextMessage] = "main_messages";
PingTime = conf->getArgPInt("--" + prefix + "-pingTime", it.getProp("pingTime"), PingTime);
ReconnectTime = conf->getArgPInt("--" + prefix + "-reconnectTime", it.getProp("reconnectTime"), ReconnectTime);
......
......@@ -87,6 +87,7 @@ namespace uniset
virtual void sysCommand( const uniset::SystemMessage* sm ) override;
virtual void sensorInfo( const uniset::SensorMessage* sm ) override;
virtual void confirmInfo( const uniset::ConfirmMessage* cmsg ) override;
virtual void onTextMessage( const uniset::TextMessage* msg ) override;
virtual bool deactivateObject() override;
virtual std::string getMonitInfo( const std::string& params ) override;
......
......@@ -3,4 +3,5 @@
ulimit -Sc 1000000
uniset2-start.sh -f ./uniset2-pgsql-dbserver --confile test.xml --name DBServer1 \
--pgsql-dbserver-buffer-size 100 $*
--pgsql-dbserver-buffer-size 100 \
--pgsql-log-add-levels any $*
......@@ -71,14 +71,16 @@ void DBServer_SQLite::sysCommand( const uniset::SystemMessage* sm )
case SystemMessage::Finish:
{
activate = false;
db->close();
if(db)
db->close();
}
break;
case SystemMessage::FoldUp:
{
activate = false;
db->close();
if(db)
db->close();
}
break;
......@@ -117,6 +119,36 @@ void DBServer_SQLite::confirmInfo( const uniset::ConfirmMessage* cem )
}
}
//--------------------------------------------------------------------------------------------
void DBServer_SQLite::onTextMessage( const TextMessage* msg )
{
try
{
ostringstream data;
data << "INSERT INTO " << tblName(msg->type)
<< "(date, time, time_usec, text, node) VALUES( '"
<< dateToString(msg->tm.tv_sec, "-") << "','" // date
<< timeToString(msg->tm.tv_sec, ":") << "','" // time
<< msg->tm.tv_nsec << "','" // time_usec
<< msg->txt << "','" // text
<< msg->node << "')"; // node
dbinfo << myname << "(insert_main_messages): " << data.str() << endl;
if( !writeToBase(data.str()) )
{
dbcrit << myname << "(insert_main_messages): error: " << db->error() << endl;
}
}
catch( const uniset::Exception& ex )
{
dbcrit << myname << "(insert_main_messages): " << ex << endl;
}
catch( ... )
{
dbcrit << myname << "(insert_main_messages): catch..." << endl;
}
}
//--------------------------------------------------------------------------------------------
bool DBServer_SQLite::writeToBase( const string& query )
{
dbinfo << myname << "(writeToBase): " << query << endl;
......@@ -159,6 +191,9 @@ void DBServer_SQLite::flushBuffer()
{
uniset_rwmutex_wrlock l(mqbuf);
if( !db || !connect_ok )
return;
// Сперва пробуем очистить всё что накопилось в очереди до этого...
while( !qbuf.empty() )
{
......@@ -253,6 +288,7 @@ void DBServer_SQLite::initDBServer()
tblMap[uniset::Message::SensorInfo] = "main_history";
tblMap[uniset::Message::Confirm] = "main_history";
tblMap[uniset::Message::TextMessage] = "main_messages";
PingTime = conf->getPIntProp(node, "pingTime", PingTime);
ReconnectTime = conf->getPIntProp(node, "reconnectTime", ReconnectTime);
......
......@@ -129,6 +129,19 @@ namespace uniset
CONSTRAINT `sensor_id_refs_id_436bab5e` FOREIGN KEY (`sensor_id`) REFERENCES `main_sensor` (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
DROP TABLE IF EXISTS `main_messages`;
CREATE TABLE `main_messages` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`date` date NOT NULL,
`time` time NOT NULL,
`time_usec` int(10) unsigned NOT NULL,
`text` text NOT NULL,
`node` int(10) unsigned NOT NULL,
`confirm` int(11) DEFAULT NULL,
PRIMARY KEY (`id`),
KEY `main_messages_key` (date,time,node)
) ENGINE=InnoDB DEFAULT CHARSET=utf8
\endcode
\warning Временно, для обратной совместимости поле 'time_usec' в таблицах оставлено с таким названием,
......@@ -168,6 +181,7 @@ namespace uniset
virtual void sysCommand( const uniset::SystemMessage* sm ) override;
virtual void sensorInfo( const uniset::SensorMessage* sm ) override;
virtual void confirmInfo( const uniset::ConfirmMessage* cmsg ) override;
virtual void onTextMessage( const uniset::TextMessage* msg ) override;
virtual std::string getMonitInfo( const std::string& params ) override;
bool writeToBase( const std::string& query );
......
......@@ -3,6 +3,7 @@
ulimit -Sc 1000000
uniset2-start.sh -f ./uniset2-sqlite-dbserver --confile test.xml --name DBServer1 \
--sqlite-log-add-levels any \
--ulog-add-levels info,crit,warn,level9,system \
--dbserver-buffer-size 100
--sqlite-buffer-size 100
......@@ -56,6 +56,11 @@ void TestObject::sensorInfo( const SensorMessage* sm )
}
}
// -----------------------------------------------------------------------------
void TestObject::onTextMessage( const TextMessage* msg )
{
lastText = msg->txt;
}
// -----------------------------------------------------------------------------
void TestObject::stopHeartbeat()
{
maxHeartBeat = 0;
......@@ -102,4 +107,9 @@ bool TestObject::isFullQueue()
{
return (getCountOfLostMessages() > 0);
}
string TestObject::getLastTextMessage() const
{
return lastText;
}
// -----------------------------------------------------------------------------
......@@ -36,12 +36,14 @@ class TestObject:
bool isEmptyQueue();
bool isFullQueue();
std::string getLastTextMessage() const;
protected:
TestObject();
virtual void sysCommand( const uniset::SystemMessage* sm ) override;
virtual void sensorInfo( const uniset::SensorMessage* sm ) override;
virtual void onTextMessage( const uniset::TextMessage* msg ) override;
private:
bool evntIsOK = { false };
......@@ -49,6 +51,7 @@ class TestObject:
bool monotonicFailed = { false };
long lostMessages = { false };
long lastValue = { 0 };
std::string lastText = { "" };
};
// -----------------------------------------------------------------------------
#endif // _TestObject_H_
......
......@@ -311,3 +311,14 @@ TEST_CASE("[SM]: monitonic sensor message", "[sm][monitonic]")
// cerr << std::string(si->info) << endl;
}
// -----------------------------------------------------------------------------
TEST_CASE("[SM]: sendText", "[sm][sendText]")
{
InitTest();
std::string txt = "Hello world";
ui->sendText(obj->getId(), txt);
msleep(300);
REQUIRE( obj->getLastTextMessage() == txt );
}
......@@ -8,7 +8,7 @@
ВСЕ ВАШИ ИЗМЕНЕНИЯ БУДУТ ПОТЕРЯНЫ.
*/
// --------------------------------------------------------------------------
// generate timestamp: 2018-12-25+03:00
// generate timestamp: 2019-02-02+03:00
// -----------------------------------------------------------------------------
#ifndef UObject_SK_H_
#define UObject_SK_H_
......
......@@ -11,7 +11,7 @@
ВСЕ ВАШИ ИЗМЕНЕНИЯ БУДУТ ПОТЕРЯНЫ.
*/
// --------------------------------------------------------------------------
// generate timestamp: 2018-12-25+03:00
// generate timestamp: 2019-02-02+03:00
// -----------------------------------------------------------------------------
#include <memory>
#include <iomanip>
......@@ -407,6 +407,7 @@ void UObject_SK::processingMessage( const uniset::VoidMessage* _msg )
break;
default:
UniSetObject::processingMessage(_msg);
break;
}
}
......
......@@ -97,6 +97,13 @@ TEST_CASE("UInterface", "[UInterface]")
REQUIRE_NOTHROW( ui.send(sid, tm) );
}
SECTION( "sendText" )
{
TransportMessage tm( SensorMessage(sid, 10).transport_msg() );
REQUIRE_NOTHROW( ui.send(sid, tm) );
}
SECTION( "wait..exist.." )
{
CHECK( ui.waitReady(sid, 200, 50) );
......@@ -227,3 +234,4 @@ TEST_CASE("UInterface", "[UInterface]")
CHECK( ci.precision == ci2.precision );
}
}
// -----------------------------------------------------------------------------
......@@ -24,6 +24,7 @@
// --------------------------------------------------------------------------
#include <time.h> // for timespec
#include <cstring>
#include <memory>
#include <string>
#include <ostream>
#include "UniSetTypes.h"
......@@ -41,6 +42,7 @@ namespace uniset
SysCommand, // Сообщение содержит системную команду
Confirm, // Сообщение содержит подтверждение
Timer, // Сообщения о срабатывании таймера
TextMessage, // текстовое сообщение
TheLastFieldOfTypeOfMessage // Обязательно оставьте последним
};
......@@ -271,6 +273,30 @@ namespace uniset
ConfirmMessage() noexcept;
};
// ------------------------------------------------------------------------
/*! текстовое сообщение */
class TextMessage : public VoidMessage
{
public:
TextMessage( TextMessage&& ) noexcept = default;
TextMessage& operator=(TextMessage&& ) noexcept = default;
TextMessage( const TextMessage& ) noexcept = default;
TextMessage& operator=( const TextMessage& ) noexcept = default;
TextMessage( const VoidMessage* msg ) noexcept;
TextMessage();
TextMessage( const char* msg,
const ::uniset::Timespec& tm,
const ::uniset::ProducerInfo& pi,
Priority prior = Message::Medium,
ObjectId cons = uniset::DefaultObjectId );
std::shared_ptr<VoidMessage> toLocalVoidMessage() const;
std::string txt;
};
}
// --------------------------------------------------------------------------
#endif // MessageType_H_
......@@ -233,6 +233,8 @@ namespace uniset
/*! посылка сообщения msg объекту name на узел node */
void send( const uniset::ObjectId name, const uniset::TransportMessage& msg, uniset::ObjectId node );
void send( const uniset::ObjectId name, const uniset::TransportMessage& msg);
void sendText(const uniset::ObjectId name, const std::string& text, const uniset::ObjectId node = uniset::DefaultObjectId );
void sendText(const uniset::ObjectId name, const uniset::TextMessage& msg, const uniset::ObjectId node = uniset::DefaultObjectId );
// ---------------------------------------------------------------
// Вспомогательный класс для кэширования ссылок на удалённые объекты
......
......@@ -107,6 +107,13 @@ namespace uniset
//! поместить сообщение в очередь
virtual void push( const uniset::TransportMessage& msg ) override;
//! поместить текстовое сообщение в очередь
virtual void pushMessage( const char* msg,
const ::uniset::Timespec& tm,
const ::uniset::ProducerInfo& pi,
::CORBA::Long priority,
::CORBA::Long consumer ) override;
#ifndef DISABLE_REST_API
// HTTP API
virtual Poco::JSON::Object::Ptr httpGet( const Poco::URI::QueryParameters& p ) override;
......@@ -136,6 +143,7 @@ namespace uniset
virtual void sysCommand( const uniset::SystemMessage* sm ) {}
virtual void sensorInfo( const uniset::SensorMessage* sm ) {}
virtual void timerInfo( const uniset::TimerMessage* tm ) {}
virtual void onTextMessage( const uniset::TextMessage* tm ) {}
/*! Получить сообщение */
VoidMessagePtr receiveMessage();
......
......@@ -175,6 +175,9 @@ namespace uniset
struct timespec to_timespec( const std::chrono::system_clock::duration& d ); /*!< конвертирование std::chrono в posix timespec */
struct timespec now_to_timespec(); /*!< получение текущего времени */
uniset::Timespec_var to_uniset_timespec( const std::chrono::system_clock::duration& d );
uniset::Timespec_var now_to_uniset_timespec(); /*!< получение текущего времени */
inline bool operator==( const struct timespec& r1, const struct timespec& r2 )
{
return ( r1.tv_sec == r2.tv_sec && r1.tv_nsec == r2.tv_nsec );
......
......@@ -45,6 +45,9 @@ namespace uniset
if( type == Message::Timer )
return "Timer";
if( type == Message::TextMessage )
return "TextMessage";
if( type == Message::Unused )
return "Unused";
......@@ -66,7 +69,6 @@ namespace uniset
}
//--------------------------------------------------------------------------------------------
VoidMessage::VoidMessage( const TransportMessage& tm ) noexcept:
Message(1) // вызываем dummy-конструктор, который не инициализирует данные (оптимизация)
{
......@@ -77,6 +79,7 @@ namespace uniset
VoidMessage::VoidMessage() noexcept
{
assert(sizeof(VoidMessage) >= sizeof(uniset::RawDataOfTransportMessage));
}
......@@ -223,7 +226,59 @@ namespace uniset
type = Message::Confirm;
priority = in_priority;
}
//--------------------------------------------------------------------------------------------
TextMessage::TextMessage( const VoidMessage* vmsg ) noexcept
: VoidMessage(1) // dummy constructor
{
assert(vmsg->type == Message::TextMessage);
auto m = static_cast<const TextMessage*>(vmsg);
if( m )
{
type = m->type;
priority = m->priority;
node = m->node;
tm = m->tm;
consumer = m->consumer;
supplier = m->supplier;
txt = m->txt;
}
}
//--------------------------------------------------------------------------------------------
TextMessage::TextMessage()
{
type = Message::TextMessage;
}
TextMessage::TextMessage( const char* msg,
const uniset::Timespec& tm,
const ::uniset::ProducerInfo& pi,
Priority prior,
ObjectId cons)
{
type = Message::TextMessage;
this->node = pi.node;
this->supplier = pi.id;
this->priority = prior;
this->consumer = cons;
this->tm.tv_sec = tm.sec;
this->tm.tv_nsec = tm.nsec;
txt = std::string(msg);
}
//--------------------------------------------------------------------------------------------
std::shared_ptr<VoidMessage> TextMessage::toLocalVoidMessage() const
{
uniset::ProducerInfo pi;
pi.id = supplier;
pi.node = node;
uniset::Timespec ts;
ts.sec = tm.tv_sec;
ts.nsec = tm.tv_nsec;
auto tmsg = std::make_shared<TextMessage>(txt.c_str(), ts, pi, priority, consumer);
return std::static_pointer_cast<VoidMessage>(tmsg);
}
//--------------------------------------------------------------------------------------------
} // end of namespace uniset
//--------------------------------------------------------------------------------------------
......@@ -1019,6 +1019,152 @@ namespace uniset
{
send(name, msg, uconf->getLocalNode());
}
// ------------------------------------------------------------------------------------------------------------
void UInterface::sendText( const ObjectId name, const std::string& txt, const ObjectId node )
{
if ( name == uniset::DefaultObjectId )
throw uniset::ORepFailed("UI(sendText): ERROR: id=uniset::DefaultObjectId");
uniset::ObjectId onode = (node == uniset::DefaultObjectId) ? uconf->getLocalNode() : node;
uniset::Timespec_var ts = uniset::now_to_uniset_timespec();
uniset::ProducerInfo_var pi;
pi->id = myid;
pi->node = uconf->getLocalNode();
try
{
CORBA::Object_var oref;
try
{
oref = rcache.resolve(name, onode);
}
catch( const uniset::NameNotFound& ) {}
for (size_t i = 0; i < uconf->getRepeatCount(); i++)
{
try
{
if( CORBA::is_nil(oref) )
oref = resolve( name, onode );
UniSetObject_i_var obj = UniSetObject_i::_narrow(oref);
obj->pushMessage(txt.c_str(), ts, pi, Message::Medium, uniset::DefaultObjectId);
return;
}
catch( const CORBA::TRANSIENT& ) {}
catch( const CORBA::OBJECT_NOT_EXIST& ) {}
catch( const CORBA::SystemException& ) {}
msleep(uconf->getRepeatTimeout());
oref = CORBA::Object::_nil();
}
}
catch( const uniset::ORepFailed )
{
rcache.erase(name, onode);
throw uniset::IOBadParam(set_err("UI(sendText): resolve failed ", name, onode));
}
catch( const CORBA::NO_IMPLEMENT )
{
rcache.erase(name, onode);
throw uniset::IOBadParam(set_err("UI(sendText): method no implement", name, onode));
}
catch( const CORBA::OBJECT_NOT_EXIST )
{
rcache.erase(name, onode);
throw uniset::IOBadParam(set_err("UI(sendText): object not exist", name, onode));
}
catch( const CORBA::COMM_FAILURE& )
{
// ошибка системы коммуникации
// uwarn << "UI(sendText): ошибка системы коммуникации" << endl;
}
catch( const CORBA::SystemException& )
{
// ошибка системы коммуникации
// uwarn << "UI(sendText): CORBA::SystemException" << endl;
}
rcache.erase(name, onode);
throw uniset::TimeOut(set_err("UI(sendText): Timeout", name, onode));
}
// ------------------------------------------------------------------------------------------------------------
void UInterface::sendText( const ObjectId name, const TextMessage& msg, const ObjectId node )
{
if ( name == uniset::DefaultObjectId )
throw uniset::ORepFailed("UI(sendText): ERROR: id=uniset::DefaultObjectId");
uniset::ObjectId onode = (node == uniset::DefaultObjectId) ? uconf->getLocalNode() : node;
uniset::Timespec_var ts;
ts->sec = msg.tm.tv_sec;
ts->nsec = msg.tm.tv_nsec;
uniset::ProducerInfo_var pi;
pi->id = msg.supplier;
pi->node = msg.node;
try
{
CORBA::Object_var oref;
try
{
oref = rcache.resolve(name, onode);
}
catch( const uniset::NameNotFound& ) {}
for (size_t i = 0; i < uconf->getRepeatCount(); i++)
{
try
{
if( CORBA::is_nil(oref) )
oref = resolve( name, onode );
UniSetObject_i_var obj = UniSetObject_i::_narrow(oref);
obj->pushMessage(msg.txt.c_str(),ts, pi, msg.priority, msg.consumer);
return;
}
catch( const CORBA::TRANSIENT& ) {}
catch( const CORBA::OBJECT_NOT_EXIST& ) {}
catch( const CORBA::SystemException& ) {}
msleep(uconf->getRepeatTimeout());
oref = CORBA::Object::_nil();
}
}
catch( const uniset::ORepFailed )
{
rcache.erase(name, node);
throw uniset::IOBadParam(set_err("UI(sendText): resolve failed ", name, node));
}
catch( const CORBA::NO_IMPLEMENT )
{
rcache.erase(name, node);
throw uniset::IOBadParam(set_err("UI(sendText): method no implement", name, node));
}
catch( const CORBA::OBJECT_NOT_EXIST )
{
rcache.erase(name, node);
throw uniset::IOBadParam(set_err("UI(sendText): object not exist", name, node));
}
catch( const CORBA::COMM_FAILURE& )
{
// ошибка системы коммуникации
// uwarn << "UI(sendText): ошибка системы коммуникации" << endl;
}
catch( const CORBA::SystemException& )
{
// ошибка системы коммуникации
// uwarn << "UI(sendText): CORBA::SystemException" << endl;
}
rcache.erase(name, node);
throw uniset::TimeOut(set_err("UI(sendText): Timeout", name, node));
}
// ------------------------------------------------------------------------------------------------------------
IOController_i::ShortIOInfo UInterface::getTimeChange( const uniset::ObjectId id, const uniset::ObjectId node ) const
......
......@@ -400,6 +400,27 @@ namespace uniset
termWaiting();
}
// ------------------------------------------------------------------------------------------
void UniSetObject::pushMessage( const char* msg,
const ::uniset::Timespec& tm,
const ::uniset::ProducerInfo& pi,
::CORBA::Long priority,
::CORBA::Long consumer )
{
uniset::TextMessage tmsg(msg, tm, pi, (uniset::Message::Priority)priority, consumer);
auto vm = tmsg.toLocalVoidMessage();
if( vm->priority == Message::Medium )
mqueueMedium.push(vm);
else if( vm->priority == Message::High )
mqueueHi.push(vm);
else if( vm->priority == Message::Low )
mqueueLow.push(vm);
else // на всякий по умолчанию medium
mqueueMedium.push(vm);
termWaiting();
}
// ------------------------------------------------------------------------------------------
#ifndef DISABLE_REST_API
Poco::JSON::Object::Ptr UniSetObject::httpGet( const Poco::URI::QueryParameters& p )
{
......@@ -845,6 +866,13 @@ namespace uniset
sysCommand( reinterpret_cast<const SystemMessage*>(msg) );
break;
case Message::TextMessage:
{
TextMessage tm(msg);
onTextMessage( &tm );
break;
}
default:
break;
}
......
......@@ -664,6 +664,28 @@ timespec uniset::now_to_timespec()
return to_timespec(d);
}
// -------------------------------------------------------------------------
uniset::Timespec_var uniset::now_to_uniset_timespec()
{
auto d = std::chrono::system_clock::now().time_since_epoch();
return to_uniset_timespec(d);
}
// -------------------------------------------------------------------------
uniset::Timespec_var uniset::to_uniset_timespec( const chrono::system_clock::duration& d )
{
uniset::Timespec_var ts;
if( d.count() == 0 )
ts->sec = ts->nsec = 0;
else
{
std::chrono::seconds const sec = std::chrono::duration_cast<std::chrono::seconds>(d);
ts->sec = sec.count();
ts->nsec = std::chrono::duration_cast<std::chrono::nanoseconds>(d - sec).count();
}
return ts;
}
// -------------------------------------------------------------------------
char uniset::checkBadSymbols( const string& str )
{
for ( const auto& c : str )
......@@ -704,4 +726,3 @@ uniset::KeyType uniset::key( const IOController_i::SensorInfo& si )
return key(si.id, si.node);
}
// ---------------------------------------------------------------------------------------------------------------
......@@ -230,3 +230,55 @@ TEST_CASE("ConfirmMessage", "[basic][message types][ConfirmMessage]" )
}
}
// ---------------------------------------------------------------
TEST_CASE("TextMessage", "[basic][message types][TextMessage]" )
{
CHECK( uniset_conf() != nullptr );
auto conf = uniset_conf();
SECTION("Default consturctor")
{
TextMessage tm;
CHECK( tm.type == Message::TextMessage );
CHECK( tm.priority == Message::Medium );
CHECK( tm.node == conf->getLocalNode() );
CHECK( tm.supplier == DefaultObjectId );
CHECK( tm.consumer == DefaultObjectId );
CHECK( tm.txt == "" );
}
SECTION("TextMessage from network")
{
std::string txt="Hello world";
::uniset::Timespec tspec;
tspec.sec = 10;
tspec.nsec = 100;
::uniset::ProducerInfo pi;
pi.id = 30;
pi.node = conf->getLocalNode();
ObjectId consumer = 40;
TextMessage tm(txt.c_str(), tspec, pi, uniset::Message::High, consumer );
REQUIRE( tm.consumer == consumer );
REQUIRE( tm.node == pi.node );
REQUIRE( tm.supplier == pi.id );
REQUIRE( tm.txt == txt );
REQUIRE( tm.tm.tv_sec == tspec.sec );
REQUIRE( tm.tm.tv_nsec == tspec.nsec );
auto vm = tm.toLocalVoidMessage();
REQUIRE( vm->type == Message::TextMessage );
TextMessage tm2(vm.get());
REQUIRE( tm.consumer == consumer );
REQUIRE( tm.node == pi.node );
REQUIRE( tm.supplier == pi.id );
REQUIRE( tm.txt == txt );
REQUIRE( tm.tm.tv_sec == tspec.sec );
REQUIRE( tm.tm.tv_nsec == tspec.nsec );
}
}
// ---------------------------------------------------------------
......@@ -44,6 +44,8 @@ TEST_CASE("UInterface", "[UInterface]")
REQUIRE_THROWS_AS( ui.getTimeChange(sid, -20), uniset::Exception );
REQUIRE_THROWS_AS( ui.getTimeChange(sid, DefaultObjectId), uniset::Exception );
REQUIRE_THROWS_AS( ui.getTimeChange(sid, conf->getLocalNode()), uniset::Exception );
REQUIRE_THROWS_AS( ui.sendText(testOID, "hello"), uniset::Exception );
REQUIRE_THROWS_AS( ui.sendText(testOID, "hello", -20), uniset::Exception );
CHECK_FALSE( ui.isExist(sid) );
CHECK_FALSE( ui.isExist(sid, DefaultObjectId) );
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment