Commit 101761d4 authored by Pavel Vainerman's avatar Pavel Vainerman

make style

parent 070f1f5b
...@@ -41,7 +41,7 @@ class MySQLInterface: ...@@ -41,7 +41,7 @@ class MySQLInterface:
// DBResult listFields( const std::string& table, const std::string& wild ); // DBResult listFields( const std::string& table, const std::string& wild );
virtual bool nconnect( const std::string& host, const std::string& user, const std::string& pswd, virtual bool nconnect( const std::string& host, const std::string& user, const std::string& pswd,
const std::string& dbname, unsigned int port=0 ) override; const std::string& dbname, unsigned int port = 0 ) override;
virtual bool close() override; virtual bool close() override;
bool query_ok( const std::string& q ); bool query_ok( const std::string& q );
......
...@@ -81,7 +81,7 @@ void DBServer_PostgreSQL::sysCommand( const UniSetTypes::SystemMessage* sm ) ...@@ -81,7 +81,7 @@ void DBServer_PostgreSQL::sysCommand( const UniSetTypes::SystemMessage* sm )
switch( sm->command ) switch( sm->command )
{ {
case SystemMessage::StartUp: case SystemMessage::StartUp:
askTimer(FlushInsertBuffer,ibufSyncTimeout); askTimer(FlushInsertBuffer, ibufSyncTimeout);
break; break;
case SystemMessage::Finish: case SystemMessage::Finish:
...@@ -219,8 +219,9 @@ void DBServer_PostgreSQL::flushInsertBuffer() ...@@ -219,8 +219,9 @@ void DBServer_PostgreSQL::flushInsertBuffer()
std::advance(end, delnum); std::advance(end, delnum);
} }
ibuf.erase(beg,end); ibuf.erase(beg, end);
ibufSize -= delnum; ibufSize -= delnum;
if( ibufSize < 0 ) if( ibufSize < 0 )
ibufSize = 0; ibufSize = 0;
...@@ -233,7 +234,7 @@ void DBServer_PostgreSQL::flushInsertBuffer() ...@@ -233,7 +234,7 @@ void DBServer_PostgreSQL::flushInsertBuffer()
dbinfo << myname << "(flushInsertBuffer): write insert buffer[" << ibufSize << "] to DB.." << endl; dbinfo << myname << "(flushInsertBuffer): write insert buffer[" << ibufSize << "] to DB.." << endl;
if( !db->copy("main_history",tblcols,ibuf) ) if( !db->copy("main_history", tblcols, ibuf) )
{ {
dbcrit << myname << "(flushInsertBuffer): error: " << db->error() << endl; dbcrit << myname << "(flushInsertBuffer): error: " << db->error() << endl;
} }
...@@ -322,20 +323,20 @@ void DBServer_PostgreSQL::initDBServer() ...@@ -322,20 +323,20 @@ void DBServer_PostgreSQL::initDBServer()
string dbnode( conf->getArgParam("--" + prefix + "-dbnode", it.getProp("dbnode"))); string dbnode( conf->getArgParam("--" + prefix + "-dbnode", it.getProp("dbnode")));
string dbuser( conf->getArgParam("--" + prefix + "-dbuser", it.getProp("dbuser"))); string dbuser( conf->getArgParam("--" + prefix + "-dbuser", it.getProp("dbuser")));
string dbpass( conf->getArgParam("--" + prefix + "-dbpass", it.getProp("dbpass"))); string dbpass( conf->getArgParam("--" + prefix + "-dbpass", it.getProp("dbpass")));
unsigned int dbport = conf->getArgPInt("--" + prefix + "-dbport", it.getProp("dbport"),5432); unsigned int dbport = conf->getArgPInt("--" + prefix + "-dbport", it.getProp("dbport"), 5432);
ibufMaxSize = conf->getArgPInt("--" + prefix + "-ibuf-maxsize", it.getProp("ibufMaxSize"),2000); ibufMaxSize = conf->getArgPInt("--" + prefix + "-ibuf-maxsize", it.getProp("ibufMaxSize"), 2000);
ibuf.reserve(ibufMaxSize); ibuf.reserve(ibufMaxSize);
ibufSyncTimeout = conf->getArgPInt("--" + prefix + "-ibuf-sync-timeout", it.getProp("ibufSyncTimeout"),15000); ibufSyncTimeout = conf->getArgPInt("--" + prefix + "-ibuf-sync-timeout", it.getProp("ibufSyncTimeout"), 15000);
std::string sfactor = conf->getArg2Param("--" + prefix + "-ibuf-overflow-cleanfactor", it.getProp("ibufOverflowCleanFactor"),"0.5"); std::string sfactor = conf->getArg2Param("--" + prefix + "-ibuf-overflow-cleanfactor", it.getProp("ibufOverflowCleanFactor"), "0.5");
ibufOverflowCleanFactor = atof(sfactor.c_str()); ibufOverflowCleanFactor = atof(sfactor.c_str());
tblMap[UniSetTypes::Message::SensorInfo] = "main_history"; tblMap[UniSetTypes::Message::SensorInfo] = "main_history";
tblMap[UniSetTypes::Message::Confirm] = "main_history"; tblMap[UniSetTypes::Message::Confirm] = "main_history";
PingTime = conf->getArgPInt("--" + prefix + "-pingTime", it.getProp("pingTime"),15000); PingTime = conf->getArgPInt("--" + prefix + "-pingTime", it.getProp("pingTime"), 15000);
ReconnectTime = conf->getArgPInt("--" + prefix + "-reconnectTime", it.getProp("reconnectTime"),30000); ReconnectTime = conf->getArgPInt("--" + prefix + "-reconnectTime", it.getProp("reconnectTime"), 30000);
qbufSize = conf->getArgPInt("--" + prefix + "-buffer-size", it.getProp("bufferSize"), 200); qbufSize = conf->getArgPInt("--" + prefix + "-buffer-size", it.getProp("bufferSize"), 200);
......
...@@ -114,7 +114,7 @@ class DBServer_PostgreSQL: ...@@ -114,7 +114,7 @@ class DBServer_PostgreSQL:
std::mutex mqbuf; std::mutex mqbuf;
// writeBuffer // writeBuffer
const std::list<std::string> tblcols = { "date", "time","time_usec","sensor_id","value","node" }; const std::list<std::string> tblcols = { "date", "time", "time_usec", "sensor_id", "value", "node" };
typedef std::vector<PostgreSQLInterface::Record> InsertBuffer; typedef std::vector<PostgreSQLInterface::Record> InsertBuffer;
InsertBuffer ibuf; InsertBuffer ibuf;
......
...@@ -98,11 +98,12 @@ bool PostgreSQLInterface::copy( const std::string& tblname, const std::list<std: ...@@ -98,11 +98,12 @@ bool PostgreSQLInterface::copy( const std::string& tblname, const std::list<std:
try try
{ {
work w( *(db.get()) ); work w( *(db.get()) );
tablewriter t(w,tblname,cols.begin(),cols.end()); tablewriter t(w, tblname, cols.begin(), cols.end());
t.reserve(data.size()); // size() не дорогая операция для list? t.reserve(data.size()); // size() не дорогая операция для list?
for( const auto& d: data )
t.push_back(d.begin(),d.end()); for( const auto& d : data )
t.push_back(d.begin(), d.end());
t.complete(); t.complete();
w.commit(); w.commit();
......
...@@ -36,7 +36,7 @@ class PostgreSQLInterface: ...@@ -36,7 +36,7 @@ class PostgreSQLInterface:
virtual bool nconnect( const std::string& host, const std::string& user, virtual bool nconnect( const std::string& host, const std::string& user,
const std::string& pswd, const std::string& dbname, const std::string& pswd, const std::string& dbname,
unsigned int port=5432) override; unsigned int port = 5432) override;
virtual bool close() override; virtual bool close() override;
virtual bool isConnection() override; virtual bool isConnection() override;
virtual bool ping() override; // проверка доступности БД virtual bool ping() override; // проверка доступности БД
......
...@@ -14,10 +14,12 @@ int main(int argc, char** argv) ...@@ -14,10 +14,12 @@ int main(int argc, char** argv)
dbname = string(argv[1]); dbname = string(argv[1]);
size_t ver = 1; size_t ver = 1;
if( argc > 2 ) if( argc > 2 )
ver = atoi(argv[2]); ver = atoi(argv[2]);
size_t num = 10000; size_t num = 10000;
if( argc > 3 ) if( argc > 3 )
num = atoi(argv[3]); num = atoi(argv[3]);
...@@ -40,7 +42,7 @@ int main(int argc, char** argv) ...@@ -40,7 +42,7 @@ int main(int argc, char** argv)
if( ver == 1 ) if( ver == 1 )
{ {
for( size_t i=0; i<num; i++ ) for( size_t i = 0; i < num; i++ )
{ {
q << "INSERT INTO main_history(date,time,time_usec,sensor_id,value,node)" q << "INSERT INTO main_history(date,time,time_usec,sensor_id,value,node)"
<< " VALUES(now(),now(),0,7,1,3000);"; << " VALUES(now(),now(),0,7,1,3000);";
...@@ -49,7 +51,8 @@ int main(int argc, char** argv) ...@@ -49,7 +51,8 @@ int main(int argc, char** argv)
else if( ver == 2 ) else if( ver == 2 )
{ {
q << "INSERT INTO main_history(date,time,time_usec,sensor_id,value,node) VALUES"; q << "INSERT INTO main_history(date,time,time_usec,sensor_id,value,node) VALUES";
for( size_t i=0; i<num; i++ )
for( size_t i = 0; i < num; i++ )
{ {
q << "(now(),now(),0,7,1,3000),"; q << "(now(),now(),0,7,1,3000),";
} }
...@@ -58,18 +61,21 @@ int main(int argc, char** argv) ...@@ -58,18 +61,21 @@ int main(int argc, char** argv)
} }
std::chrono::time_point<std::chrono::system_clock> start, end; std::chrono::time_point<std::chrono::system_clock> start, end;
if( ver == 3 ) if( ver == 3 )
{ {
std::list<std::string> cols = { "date", "time","time_usec","sensor_id","value","node" }; std::list<std::string> cols = { "date", "time", "time_usec", "sensor_id", "value", "node" };
PostgreSQLInterface::Data data; PostgreSQLInterface::Data data;
for( size_t i=0; i<num; i++ )
for( size_t i = 0; i < num; i++ )
{ {
PostgreSQLInterface::Record d = { "now()","now()","0","7","1","3000" }; PostgreSQLInterface::Record d = { "now()", "now()", "0", "7", "1", "3000" };
data.push_back(std::move(d)); data.push_back(std::move(d));
} }
start = std::chrono::system_clock::now(); start = std::chrono::system_clock::now();
if( !db.copy("main_history",cols,data) )
if( !db.copy("main_history", cols, data) )
{ {
cerr << "query error: " << db.error() << endl; cerr << "query error: " << db.error() << endl;
db.close(); db.close();
...@@ -79,6 +85,7 @@ int main(int argc, char** argv) ...@@ -79,6 +85,7 @@ int main(int argc, char** argv)
else else
{ {
start = std::chrono::system_clock::now(); start = std::chrono::system_clock::now();
if( !db.insert( std::move(q.str())) ) if( !db.insert( std::move(q.str())) )
{ {
cerr << "query error: " << db.error() << endl; cerr << "query error: " << db.error() << endl;
......
...@@ -186,7 +186,7 @@ class CardList: ...@@ -186,7 +186,7 @@ class CardList:
{ {
public: public:
explicit CardList( size_t size ) : std::vector<ComediInterface*>(size) { } explicit CardList( size_t size ) : std::vector<ComediInterface * >(size) { }
~CardList() ~CardList()
{ {
......
...@@ -139,7 +139,7 @@ class TOR: ...@@ -139,7 +139,7 @@ class TOR:
{ {
public: public:
TOR( ElementID id,size_t numbers = 0, bool st = false ); TOR( ElementID id, size_t numbers = 0, bool st = false );
virtual ~TOR(); virtual ~TOR();
virtual void setIn( size_t num, bool state ) override; virtual void setIn( size_t num, bool state ) override;
......
...@@ -984,6 +984,7 @@ UniSetTypes::SimpleInfo* SharedMemory::getInfo( CORBA::Long userparam ) ...@@ -984,6 +984,7 @@ UniSetTypes::SimpleInfo* SharedMemory::getInfo( CORBA::Long userparam )
inf << i->info << endl; inf << i->info << endl;
inf << vmon.pretty_str() << endl; inf << vmon.pretty_str() << endl;
if( logserv ) if( logserv )
inf << logserv->getShortInfo() << endl; inf << logserv->getShortInfo() << endl;
else else
......
...@@ -29,7 +29,7 @@ class UObject_SK: ...@@ -29,7 +29,7 @@ class UObject_SK:
public UniSetObject public UniSetObject
{ {
public: public:
UObject_SK( UniSetTypes::ObjectId id, xmlNode* node=UniSetTypes::uniset_conf()->getNode("UObject"), const std::string& argprefix="" ); UObject_SK( UniSetTypes::ObjectId id, xmlNode* node = UniSetTypes::uniset_conf()->getNode("UObject"), const std::string& argprefix = "" );
UObject_SK(); UObject_SK();
virtual ~UObject_SK(); virtual ~UObject_SK();
...@@ -43,54 +43,60 @@ class UObject_SK: ...@@ -43,54 +43,60 @@ class UObject_SK:
virtual bool setMsg( UniSetTypes::ObjectId code, bool state = true ); virtual bool setMsg( UniSetTypes::ObjectId code, bool state = true );
inline std::shared_ptr<DebugStream> log(){ return mylog; } inline std::shared_ptr<DebugStream> log()
inline std::shared_ptr<LogAgregator> logAgregator(){ return loga; } {
return mylog;
}
inline std::shared_ptr<LogAgregator> logAgregator()
{
return loga;
}
void init_dlog( std::shared_ptr<DebugStream> d ); void init_dlog( std::shared_ptr<DebugStream> d );
// "синтаксический сахар"..для логов // "синтаксический сахар"..для логов
#ifndef myinfo #ifndef myinfo
#define myinfo if( log()->debugging(Debug::INFO) ) log()->info() #define myinfo if( log()->debugging(Debug::INFO) ) log()->info()
#endif #endif
#ifndef mywarn #ifndef mywarn
#define mywarn if( log()->debugging(Debug::WARN) ) log()->warn() #define mywarn if( log()->debugging(Debug::WARN) ) log()->warn()
#endif #endif
#ifndef mycrit #ifndef mycrit
#define mycrit if( log()->debugging(Debug::CRIT) ) log()->crit() #define mycrit if( log()->debugging(Debug::CRIT) ) log()->crit()
#endif #endif
#ifndef mylog1 #ifndef mylog1
#define mylog1 if( log()->debugging(Debug::LEVEL1) ) log()->level1() #define mylog1 if( log()->debugging(Debug::LEVEL1) ) log()->level1()
#endif #endif
#ifndef mylog2 #ifndef mylog2
#define mylog2 if( log()->debugging(Debug::LEVEL2) ) log()->level2() #define mylog2 if( log()->debugging(Debug::LEVEL2) ) log()->level2()
#endif #endif
#ifndef mylog3 #ifndef mylog3
#define mylog3 if( log()->debugging(Debug::LEVEL3) ) log()->level3() #define mylog3 if( log()->debugging(Debug::LEVEL3) ) log()->level3()
#endif #endif
#ifndef mylog4 #ifndef mylog4
#define mylog4 if( log()->debugging(Debug::LEVEL4) ) log()->level4() #define mylog4 if( log()->debugging(Debug::LEVEL4) ) log()->level4()
#endif #endif
#ifndef mylog5 #ifndef mylog5
#define mylog5 if( log()->debugging(Debug::LEVEL5) ) log()->level5() #define mylog5 if( log()->debugging(Debug::LEVEL5) ) log()->level5()
#endif #endif
#ifndef mylog6 #ifndef mylog6
#define mylog6 if( log()->debugging(Debug::LEVEL6) ) log()->level6() #define mylog6 if( log()->debugging(Debug::LEVEL6) ) log()->level6()
#endif #endif
#ifndef mylog7 #ifndef mylog7
#define mylog7 if( log()->debugging(Debug::LEVEL7) ) log()->level7() #define mylog7 if( log()->debugging(Debug::LEVEL7) ) log()->level7()
#endif #endif
#ifndef mylog8 #ifndef mylog8
#define mylog8 if( log()->debugging(Debug::LEVEL8) ) log()->level8() #define mylog8 if( log()->debugging(Debug::LEVEL8) ) log()->level8()
#endif #endif
#ifndef mylog9 #ifndef mylog9
#define mylog9 if( log()->debugging(Debug::LEVEL9) ) log()->level9() #define mylog9 if( log()->debugging(Debug::LEVEL9) ) log()->level9()
#endif #endif
#ifndef mylogany #ifndef mylogany
#define mylogany log()->any() #define mylogany log()->any()
#endif #endif
#ifndef vmonit #ifndef vmonit
#define vmonit( var ) vmon.add( #var, var ) #define vmonit( var ) vmon.add( #var, var )
#endif #endif
// Вспомогательные функции для удобства логирования // Вспомогательные функции для удобства логирования
// ------------------------------------------------------------ // ------------------------------------------------------------
...@@ -107,16 +113,19 @@ class UObject_SK: ...@@ -107,16 +113,19 @@ class UObject_SK:
\param id - идентификатор датчика \param id - идентификатор датчика
\param showLinkName - TRUE - выводить SensorName, FALSE - не выводить \param showLinkName - TRUE - выводить SensorName, FALSE - не выводить
*/ */
std::string str( UniSetTypes::ObjectId id, bool showLinkName=true ) const; std::string str( UniSetTypes::ObjectId id, bool showLinkName = true ) const;
/*! Вывод значения входа/выхода в формате: in_xxx(SensorName)=val /*! Вывод значения входа/выхода в формате: in_xxx(SensorName)=val
\param id - идентификатор датчика \param id - идентификатор датчика
\param showLinkName - TRUE - выводить SensorName, FALSE - не выводить \param showLinkName - TRUE - выводить SensorName, FALSE - не выводить
*/ */
std::string strval( UniSetTypes::ObjectId id, bool showLinkName=true ) const; std::string strval( UniSetTypes::ObjectId id, bool showLinkName = true ) const;
/*! Вывод состояния внутренних переменных */ /*! Вывод состояния внутренних переменных */
inline std::string dumpVars(){ return std::move(vmon.pretty_str()); } inline std::string dumpVars()
{
return std::move(vmon.pretty_str());
}
// ------------------------------------------------------------ // ------------------------------------------------------------
std::string help(); std::string help();
...@@ -145,13 +154,16 @@ class UObject_SK: ...@@ -145,13 +154,16 @@ class UObject_SK:
virtual void callback() override; virtual void callback() override;
virtual void processingMessage( const UniSetTypes::VoidMessage* msg ) override; virtual void processingMessage( const UniSetTypes::VoidMessage* msg ) override;
virtual void sysCommand( const UniSetTypes::SystemMessage* sm ){}; virtual void sysCommand( const UniSetTypes::SystemMessage* sm ) {};
virtual void askSensors( UniversalIO::UIOCommand cmd ){} virtual void askSensors( UniversalIO::UIOCommand cmd ) {}
virtual void sensorInfo( const UniSetTypes::SensorMessage* sm ) override{} virtual void sensorInfo( const UniSetTypes::SensorMessage* sm ) override {}
virtual void timerInfo( const UniSetTypes::TimerMessage* tm ) override{} virtual void timerInfo( const UniSetTypes::TimerMessage* tm ) override {}
virtual void sigterm( int signo ) override; virtual void sigterm( int signo ) override;
virtual bool activateObject() override; virtual bool activateObject() override;
virtual std::string getMonitInfo(){ return ""; } /*!< пользовательская информация выводимая в getInfo() */ virtual std::string getMonitInfo()
{
return ""; /*!< пользовательская информация выводимая в getInfo() */
}
virtual void testMode( bool state ); virtual void testMode( bool state );
void updatePreviousValues(); void updatePreviousValues();
...@@ -172,7 +184,7 @@ class UObject_SK: ...@@ -172,7 +184,7 @@ class UObject_SK:
int resetMsgTime; int resetMsgTime;
// Выполнение очередного шага программы // Выполнение очередного шага программы
virtual void step(){} virtual void step() {}
int sleep_msec; /*!< пауза между итерациями */ int sleep_msec; /*!< пауза между итерациями */
bool active; bool active;
...@@ -187,9 +199,15 @@ class UObject_SK: ...@@ -187,9 +199,15 @@ class UObject_SK:
xmlNode* confnode; xmlNode* confnode;
/*! получить числовое свойство из конф. файла по привязанной confnode */ /*! получить числовое свойство из конф. файла по привязанной confnode */
int getIntProp(const std::string& name) { return UniSetTypes::uniset_conf()->getIntProp(confnode, name); } int getIntProp(const std::string& name)
{
return UniSetTypes::uniset_conf()->getIntProp(confnode, name);
}
/*! получить текстовое свойство из конф. файла по привязанной confnode */ /*! получить текстовое свойство из конф. файла по привязанной confnode */
inline const std::string getProp(const std::string& name) { return UniSetTypes::uniset_conf()->getProp(confnode, name); } inline const std::string getProp(const std::string& name)
{
return UniSetTypes::uniset_conf()->getProp(confnode, name);
}
timeout_t smReadyTimeout; /*!< время ожидания готовности SM */ timeout_t smReadyTimeout; /*!< время ожидания готовности SM */
std::atomic_bool activated; std::atomic_bool activated;
......
...@@ -23,20 +23,22 @@ int main(int argc, const char** argv) ...@@ -23,20 +23,22 @@ int main(int argc, const char** argv)
act->broadcast( sm.transport_msg() ); act->broadcast( sm.transport_msg() );
act->run(true); act->run(true);
SensorMessage smsg(100,2); SensorMessage smsg(100, 2);
TransportMessage tm( std::move(smsg.transport_msg()) ); TransportMessage tm( std::move(smsg.transport_msg()) );
size_t num = 0; size_t num = 0;
const size_t max = 100000; const size_t max = 100000;
std::chrono::time_point<std::chrono::system_clock> start, end; std::chrono::time_point<std::chrono::system_clock> start, end;
start = std::chrono::system_clock::now(); start = std::chrono::system_clock::now();
for( num=0; num<max; num++ )
for( num = 0; num < max; num++ )
{ {
tp->push(tm); tp->push(tm);
if( tp->isFullQueue() ) if( tp->isFullQueue() )
break; break;
if( num%100 == 0 ) if( num % 100 == 0 )
msleep(50); msleep(50);
} }
......
...@@ -160,7 +160,7 @@ class LogServer: ...@@ -160,7 +160,7 @@ class LogServer:
// (они необходимы для восстановления настроек после завершения всех (!) сессий) // (они необходимы для восстановления настроек после завершения всех (!) сессий)
// т.к. shared_ptr-ов может быть много, то в качестве ключа используем указатель на "реальный объект"(внутри shared_ptr) // т.к. shared_ptr-ов может быть много, то в качестве ключа используем указатель на "реальный объект"(внутри shared_ptr)
// но только для этого(!), пользоваться этим указателем ни в коем случае нельзя (и нужно проверять shared_ptr на существование) // но только для этого(!), пользоваться этим указателем ни в коем случае нельзя (и нужно проверять shared_ptr на существование)
std::unordered_map< DebugStream*,Debug::type > defaultLogLevels; std::unordered_map< DebugStream*, Debug::type > defaultLogLevels;
std::string myname = { "LogServer" }; std::string myname = { "LogServer" };
std::string addr = { "" }; std::string addr = { "" };
......
...@@ -41,7 +41,7 @@ class LogSession ...@@ -41,7 +41,7 @@ class LogSession
// сигнал о приходе команды: std::string func( LogSession*, command, logname ); // сигнал о приходе команды: std::string func( LogSession*, command, logname );
// \return какую-то информацию, которая будет послана client-у. Если return.empty(), то ничего послано не будет. // \return какую-то информацию, которая будет послана client-у. Если return.empty(), то ничего послано не будет.
typedef sigc::signal<std::string,LogSession*,LogServerTypes::Command, const std::string& > LogSessionCommand_Signal; typedef sigc::signal<std::string, LogSession*, LogServerTypes::Command, const std::string& > LogSessionCommand_Signal;
LogSessionCommand_Signal signal_logsession_command(); LogSessionCommand_Signal signal_logsession_command();
// прервать работу // прервать работу
......
...@@ -22,18 +22,18 @@ ...@@ -22,18 +22,18 @@
// -------------------------------------------------------------------------- // --------------------------------------------------------------------------
namespace UniSetTypes namespace UniSetTypes
{ {
// Шаблон для "универсальной инициализации объекта(процесса)". // Шаблон для "универсальной инициализации объекта(процесса)".
// Использование: // Использование:
// auto m = make_object<MyClass>("ObjectId","secname"); // auto m = make_object<MyClass>("ObjectId","secname");
// -- // --
// Где MyClass должен содержать конструктор MyClass( const ObjetctId id, xmlNode* cnode, ...any args.. ); // Где MyClass должен содержать конструктор MyClass( const ObjetctId id, xmlNode* cnode, ...any args.. );
// --------------- // ---------------
// Если secname задан, то ищется: <secname name="ObjectId" ....> // Если secname задан, то ищется: <secname name="ObjectId" ....>
// Если secname не задан, то: <idname name="idname" ...> // Если secname не задан, то: <idname name="idname" ...>
//---------------- //----------------
template<typename T, typename... _Args> template<typename T, typename... _Args>
std::shared_ptr<T> make_object( const std::string& idname, const std::string& secname, _Args&&... __args ) std::shared_ptr<T> make_object( const std::string& idname, const std::string& secname, _Args&& ... __args )
{ {
auto conf = UniSetTypes::uniset_conf(); auto conf = UniSetTypes::uniset_conf();
UniSetTypes::ObjectId id = conf->getObjectID(idname); UniSetTypes::ObjectId id = conf->getObjectID(idname);
...@@ -42,50 +42,52 @@ std::shared_ptr<T> make_object( const std::string& idname, const std::string& se ...@@ -42,50 +42,52 @@ std::shared_ptr<T> make_object( const std::string& idname, const std::string& se
auto xml = conf->getConfXML(); auto xml = conf->getConfXML();
std::string s( (secname.empty() ? idname : secname) ); std::string s( (secname.empty() ? idname : secname) );
xmlNode* cnode = conf->findNode(xml->getFirstNode(),s,idname); xmlNode* cnode = conf->findNode(xml->getFirstNode(), s, idname);
if( cnode == 0 ) if( cnode == 0 )
throw UniSetTypes::SystemError("(make_object<" + string(typeid(T).name()) + ">): Not found xmlnode <" + s + " name='" + idname + "' ... >"); throw UniSetTypes::SystemError("(make_object<" + string(typeid(T).name()) + ">): Not found xmlnode <" + s + " name='" + idname + "' ... >");
std::shared_ptr<T> obj =std::make_shared<T>(id,cnode,std::forward<_Args>(__args)...); std::shared_ptr<T> obj = std::make_shared<T>(id, cnode, std::forward<_Args>(__args)...);
if (obj == nullptr) if (obj == nullptr)
throw UniSetTypes::SystemError("(make_object<T> == nullptr" + string(typeid(T).name())); throw UniSetTypes::SystemError("(make_object<T> == nullptr" + string(typeid(T).name()));
return obj; return obj;
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
// версия с указанием начального xml-узла, с которого ведётся поиск xmlNode // версия с указанием начального xml-узла, с которого ведётся поиск xmlNode
// а ID берётся из поля name="" у найденного xmlnode. // а ID берётся из поля name="" у найденного xmlnode.
template<typename T, typename... _Args> template<typename T, typename... _Args>
std::shared_ptr<T> make_object_x( xmlNode* root, const std::string& secname, _Args&&... __args ) std::shared_ptr<T> make_object_x( xmlNode* root, const std::string& secname, _Args&& ... __args )
{ {
auto conf = UniSetTypes::uniset_conf(); auto conf = UniSetTypes::uniset_conf();
auto xml = conf->getConfXML(); auto xml = conf->getConfXML();
xmlNode* cnode = conf->findNode(root,secname,""); xmlNode* cnode = conf->findNode(root, secname, "");
if( cnode == 0 ) if( cnode == 0 )
throw UniSetTypes::SystemError("(make_object_x<" + string(typeid(T).name()) + ">): Not found xmlnode <" + secname + " ... >"); throw UniSetTypes::SystemError("(make_object_x<" + string(typeid(T).name()) + ">): Not found xmlnode <" + secname + " ... >");
string idname = conf->getProp(cnode,"name"); string idname = conf->getProp(cnode, "name");
UniSetTypes::ObjectId id = conf->getObjectID(idname); UniSetTypes::ObjectId id = conf->getObjectID(idname);
if( id == UniSetTypes::DefaultObjectId ) if( id == UniSetTypes::DefaultObjectId )
throw UniSetTypes::SystemError("(make_object_x<" + string(typeid(T).name()) + ">): Not found ID for '" + idname + "'"); throw UniSetTypes::SystemError("(make_object_x<" + string(typeid(T).name()) + ">): Not found ID for '" + idname + "'");
return std::make_shared<T>(id,cnode,std::forward<_Args>(__args)...); return std::make_shared<T>(id, cnode, std::forward<_Args>(__args)...);
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
// Просто обёртка для удобства вывода сообщений об ошибке в лог "объекта".. // Просто обёртка для удобства вывода сообщений об ошибке в лог "объекта"..
// "по задумке" позволяет не загромаждать код.. // "по задумке" позволяет не загромаждать код..
// T - тип создаваемого объекта // T - тип создаваемого объекта
// M - (master) - класс который создаёт объект (подразумевается, что он UniSetManager) // M - (master) - класс который создаёт объект (подразумевается, что он UniSetManager)
template<typename T, typename M, typename... _Args> template<typename T, typename M, typename... _Args>
std::shared_ptr<T> make_child_object( M* m, const std::string& idname, const std::string& secname, _Args&&... __args ) std::shared_ptr<T> make_child_object( M* m, const std::string& idname, const std::string& secname, _Args&& ... __args )
{ {
try try
{ {
m->log()->info() << m->getName() << "(" << __FUNCTION__ << "): " << "create " << idname << "..." << std::endl; m->log()->info() << m->getName() << "(" << __FUNCTION__ << "): " << "create " << idname << "..." << std::endl;
auto o = UniSetTypes::make_object<T>(idname,secname,std::forward<_Args>(__args)...); auto o = UniSetTypes::make_object<T>(idname, secname, std::forward<_Args>(__args)...);
m->add(o); m->add(o);
m->logAgregator()->add(o->logAgregator()); m->logAgregator()->add(o->logAgregator());
return std::move(o); return std::move(o);
...@@ -95,15 +97,15 @@ std::shared_ptr<T> make_child_object( M* m, const std::string& idname, const std ...@@ -95,15 +97,15 @@ std::shared_ptr<T> make_child_object( M* m, const std::string& idname, const std
m->log()->crit() << m->getName() << "(" << __FUNCTION__ << "): " << "(create " << idname << "): " << ex << std::endl; m->log()->crit() << m->getName() << "(" << __FUNCTION__ << "): " << "(create " << idname << "): " << ex << std::endl;
throw; throw;
} }
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
// Версия использующая make_object_x<> // Версия использующая make_object_x<>
template<typename T, typename M, typename... _Args> template<typename T, typename M, typename... _Args>
std::shared_ptr<T> make_child_object_x( M* m, xmlNode* root, const std::string& secname, _Args&&... __args ) std::shared_ptr<T> make_child_object_x( M* m, xmlNode* root, const std::string& secname, _Args&& ... __args )
{ {
try try
{ {
auto o = UniSetTypes::make_object_x<T>(root,secname,std::forward<_Args>(__args)...); auto o = UniSetTypes::make_object_x<T>(root, secname, std::forward<_Args>(__args)...);
m->add(o); m->add(o);
m->logAgregator()->add(o->logAgregator()); m->logAgregator()->add(o->logAgregator());
return std::move(o); return std::move(o);
...@@ -113,8 +115,8 @@ std::shared_ptr<T> make_child_object_x( M* m, xmlNode* root, const std::string& ...@@ -113,8 +115,8 @@ std::shared_ptr<T> make_child_object_x( M* m, xmlNode* root, const std::string&
m->log()->crit() << m->getName() << "(" << __FUNCTION__ << "): " << "(create " << string(typeid(T).name()) << "): " << ex << std::endl; m->log()->crit() << m->getName() << "(" << __FUNCTION__ << "): " << "(create " << string(typeid(T).name()) << "): " << ex << std::endl;
throw; throw;
} }
} }
// ----------------------------------------------------------------------------------------- // -----------------------------------------------------------------------------------------
} // endof namespace UniSetTypes } // endof namespace UniSetTypes
// ----------------------------------------------------------------------------------------- // -----------------------------------------------------------------------------------------
#endif // UHelpers_H_ #endif // UHelpers_H_
...@@ -192,16 +192,31 @@ namespace ModbusRTU ...@@ -192,16 +192,31 @@ namespace ModbusRTU
ModbusMessage( const ModbusMessage& ) = default; ModbusMessage( const ModbusMessage& ) = default;
ModbusMessage& operator=(const ModbusMessage& ) = default; ModbusMessage& operator=(const ModbusMessage& ) = default;
inline ModbusByte func() const { return pduhead.func; } inline ModbusByte func() const
inline ModbusAddr addr() const { return pduhead.addr; } {
inline ModbusRTU::ModbusData tID() const { return aduhead.tID; } return pduhead.func;
inline ModbusRTU::ModbusData pID() const { return aduhead.pID; } }
inline ModbusRTU::ModbusData aduLen() const { return aduhead.len; } inline ModbusAddr addr() const
{
return pduhead.addr;
}
inline ModbusRTU::ModbusData tID() const
{
return aduhead.tID;
}
inline ModbusRTU::ModbusData pID() const
{
return aduhead.pID;
}
inline ModbusRTU::ModbusData aduLen() const
{
return aduhead.len;
}
unsigned char* buf(); unsigned char* buf();
ModbusRTU::ModbusData len() const; ModbusRTU::ModbusData len() const;
void swapHead(); void swapHead();
void makeHead( ModbusRTU::ModbusData tID, bool noCRC = true, ModbusRTU::ModbusData pID=0 ); void makeHead( ModbusRTU::ModbusData tID, bool noCRC = true, ModbusRTU::ModbusData pID = 0 );
ModbusRTU::ModbusData pduLen() const; ModbusRTU::ModbusData pduLen() const;
ModbusCRC pduCRC( size_t len ) const; ModbusCRC pduCRC( size_t len ) const;
......
...@@ -843,7 +843,7 @@ mbErrCode ModbusClient::recv_pdu( ModbusByte qfunc, ModbusMessage& rbuf, timeout ...@@ -843,7 +843,7 @@ mbErrCode ModbusClient::recv_pdu( ModbusByte qfunc, ModbusMessage& rbuf, timeout
// от начала(включая заголовок) // от начала(включая заголовок)
// и до конца (исключив последний элемент содержащий CRC) // и до конца (исключив последний элемент содержащий CRC)
// int mlen = szModbusHeader + mWrite.szHead()+ mWrite.bcnt; // int mlen = szModbusHeader + mWrite.szHead()+ mWrite.bcnt;
ModbusData tcrc =rbuf.pduCRC(bcnt - szCRC); ModbusData tcrc = rbuf.pduCRC(bcnt - szCRC);
if( tcrc != mWrite.crc ) if( tcrc != mWrite.crc )
{ {
...@@ -1335,6 +1335,7 @@ mbErrCode ModbusClient::send( ModbusMessage& msg ) ...@@ -1335,6 +1335,7 @@ mbErrCode ModbusClient::send( ModbusMessage& msg )
if( dlog->is_warn() ) if( dlog->is_warn() )
dlog->warn() << "(ModbusClient::send): message len=" << msg.len() dlog->warn() << "(ModbusClient::send): message len=" << msg.len()
<< " > MAXLEN=" << msg.maxSizeOfMessage() << endl; << " > MAXLEN=" << msg.maxSizeOfMessage() << endl;
return erPacketTooLong; return erPacketTooLong;
} }
...@@ -1345,7 +1346,7 @@ mbErrCode ModbusClient::send( ModbusMessage& msg ) ...@@ -1345,7 +1346,7 @@ mbErrCode ModbusClient::send( ModbusMessage& msg )
{ {
size_t len = msg.len(); // т.к. swapHead() поменяет size_t len = msg.len(); // т.к. swapHead() поменяет
msg.swapHead(); msg.swapHead();
sendData(msg.buf(),len); sendData(msg.buf(), len);
msg.swapHead(); msg.swapHead();
} }
catch( mbException& ex ) catch( mbException& ex )
......
...@@ -154,7 +154,7 @@ mbErrCode ModbusRTUSlave::realReceive(const std::unordered_set<ModbusAddr>& vmba ...@@ -154,7 +154,7 @@ mbErrCode ModbusRTUSlave::realReceive(const std::unordered_set<ModbusAddr>& vmba
return erTimeOut; return erTimeOut;
} }
std::lock_guard<std::timed_mutex> lk(recvMutex,std::adopt_lock); std::lock_guard<std::timed_mutex> lk(recvMutex, std::adopt_lock);
ModbusMessage buf; ModbusMessage buf;
mbErrCode res = erBadReplyNodeAddress; mbErrCode res = erBadReplyNodeAddress;
......
...@@ -1767,7 +1767,7 @@ mbErrCode ModbusServer::send( ModbusMessage& msg ) ...@@ -1767,7 +1767,7 @@ mbErrCode ModbusServer::send( ModbusMessage& msg )
{ {
size_t len = msg.len(); // т.к. swapHead() поменяет size_t len = msg.len(); // т.к. swapHead() поменяет
msg.swapHead(); msg.swapHead();
sendData(msg.buf(),len); sendData(msg.buf(), len);
msg.swapHead(); // обратно, т.к. потом ещё будет post_send_request msg.swapHead(); // обратно, т.к. потом ещё будет post_send_request
} }
catch( const Exception& ex ) // SystemError catch( const Exception& ex ) // SystemError
......
...@@ -32,16 +32,17 @@ size_t ModbusTCPCore::readNextData(UTCPStream* tcp, ...@@ -32,16 +32,17 @@ size_t ModbusTCPCore::readNextData(UTCPStream* tcp,
#ifdef USE_BUFFER_FOR_READ #ifdef USE_BUFFER_FOR_READ
char* buf = new char[max]; char* buf = new char[max];
if( buf == 0 ) if( buf == 0 )
return 0; return 0;
try try
{ {
ssize_t l = tcp->readData(buf,max, 0, t); ssize_t l = tcp->readData(buf, max, 0, t);
if( l > 0 ) if( l > 0 )
{ {
for( ssize_t k=0; k<l; k++ ) for( ssize_t k = 0; k < l; k++ )
qrecv.push(buf[k]); qrecv.push(buf[k]);
i = l; i = l;
...@@ -53,6 +54,7 @@ size_t ModbusTCPCore::readNextData(UTCPStream* tcp, ...@@ -53,6 +54,7 @@ size_t ModbusTCPCore::readNextData(UTCPStream* tcp,
delete [] buf; delete [] buf;
#else #else
try try
{ {
for( ; i < max; i++ ) for( ; i < max; i++ )
...@@ -70,6 +72,7 @@ size_t ModbusTCPCore::readNextData(UTCPStream* tcp, ...@@ -70,6 +72,7 @@ size_t ModbusTCPCore::readNextData(UTCPStream* tcp,
catch( ost::SockException& e ) catch( ost::SockException& e )
{ {
} }
#endif #endif
...@@ -128,6 +131,7 @@ size_t ModbusTCPCore::readDataFD( int fd, std::queue<unsigned char>& qrecv, size ...@@ -128,6 +131,7 @@ size_t ModbusTCPCore::readDataFD( int fd, std::queue<unsigned char>& qrecv, size
qrecv.push(buf[k]); qrecv.push(buf[k]);
cnt += l; cnt += l;
if( cnt >= max ) if( cnt >= max )
break; break;
} }
...@@ -137,6 +141,7 @@ size_t ModbusTCPCore::readDataFD( int fd, std::queue<unsigned char>& qrecv, size ...@@ -137,6 +141,7 @@ size_t ModbusTCPCore::readDataFD( int fd, std::queue<unsigned char>& qrecv, size
#else #else
size_t i = 0; size_t i = 0;
for( size_t a = 0; a < attempts; a++ ) for( size_t a = 0; a < attempts; a++ )
{ {
for( ; i < max; i++ ) for( ; i < max; i++ )
...@@ -151,6 +156,7 @@ size_t ModbusTCPCore::readDataFD( int fd, std::queue<unsigned char>& qrecv, size ...@@ -151,6 +156,7 @@ size_t ModbusTCPCore::readDataFD( int fd, std::queue<unsigned char>& qrecv, size
qrecv.push(c); qrecv.push(c);
} }
} }
#endif #endif
return ( qrecv.size() >= max ? max : qrecv.size() ); return ( qrecv.size() >= max ? max : qrecv.size() );
......
...@@ -102,7 +102,7 @@ mbErrCode ModbusTCPMaster::query( ModbusAddr addr, ModbusMessage& msg, ...@@ -102,7 +102,7 @@ mbErrCode ModbusTCPMaster::query( ModbusAddr addr, ModbusMessage& msg,
tcp->setTimeout(timeout); tcp->setTimeout(timeout);
msg.makeHead(++nTransaction,crcNoCheckit); msg.makeHead(++nTransaction, crcNoCheckit);
for( unsigned int i = 0; i < 2; i++ ) for( unsigned int i = 0; i < 2; i++ )
{ {
...@@ -157,6 +157,7 @@ mbErrCode ModbusTCPMaster::query( ModbusAddr addr, ModbusMessage& msg, ...@@ -157,6 +157,7 @@ mbErrCode ModbusTCPMaster::query( ModbusAddr addr, ModbusMessage& msg,
if( tcp->isPending(ost::Socket::pendingInput, timeout) ) if( tcp->isPending(ost::Socket::pendingInput, timeout) )
{ {
size_t ret = 0; size_t ret = 0;
while( !ptTimeout.checkTime() ) while( !ptTimeout.checkTime() )
{ {
ret = getNextData((unsigned char*)(&reply.aduhead), sizeof(reply.aduhead)); ret = getNextData((unsigned char*)(&reply.aduhead), sizeof(reply.aduhead));
......
...@@ -61,8 +61,10 @@ ModbusTCPSession::ModbusTCPSession( int sfd, const std::unordered_set<ModbusAddr ...@@ -61,8 +61,10 @@ ModbusTCPSession::ModbusTCPSession( int sfd, const std::unordered_set<ModbusAddr
{ {
ostringstream err; ostringstream err;
err << "(ModbusTCPSession): unknonwn ip(0.0.0.0) client disconnected?!"; err << "(ModbusTCPSession): unknonwn ip(0.0.0.0) client disconnected?!";
if( dlog->is_crit() ) if( dlog->is_crit() )
dlog->crit() << err.str() << endl; dlog->crit() << err.str() << endl;
sock.reset(); sock.reset();
throw SystemError(err.str()); throw SystemError(err.str());
} }
...@@ -76,6 +78,7 @@ ModbusTCPSession::ModbusTCPSession( int sfd, const std::unordered_set<ModbusAddr ...@@ -76,6 +78,7 @@ ModbusTCPSession::ModbusTCPSession( int sfd, const std::unordered_set<ModbusAddr
{ {
ostringstream err; ostringstream err;
err << ex.what(); err << ex.what();
if( dlog->is_crit() ) if( dlog->is_crit() )
dlog->crit() << "(ModbusTCPSession): err: " << err.str() << endl; dlog->crit() << "(ModbusTCPSession): err: " << err.str() << endl;
...@@ -381,7 +384,7 @@ ModbusRTU::mbErrCode ModbusTCPSession::post_send_request( ModbusRTU::ModbusMessa ...@@ -381,7 +384,7 @@ ModbusRTU::mbErrCode ModbusTCPSession::post_send_request( ModbusRTU::ModbusMessa
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
mbErrCode ModbusTCPSession::make_adu_header( ModbusMessage& req ) mbErrCode ModbusTCPSession::make_adu_header( ModbusMessage& req )
{ {
req.makeHead(curQueryHeader.tID,isCRCNoCheckit(),curQueryHeader.pID); req.makeHead(curQueryHeader.tID, isCRCNoCheckit(), curQueryHeader.pID);
return erNoError; return erNoError;
} }
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
......
...@@ -256,6 +256,7 @@ void ModbusMessage::makeHead( ModbusData tID, bool noCRC, ModbusData pID ) ...@@ -256,6 +256,7 @@ void ModbusMessage::makeHead( ModbusData tID, bool noCRC, ModbusData pID )
aduhead.tID = tID; aduhead.tID = tID;
aduhead.pID = pID; aduhead.pID = pID;
aduhead.len = pduLen(); aduhead.len = pduLen();
if( noCRC ) if( noCRC )
aduhead.len -= szCRC; aduhead.len -= szCRC;
} }
...@@ -283,12 +284,14 @@ void ModbusMessage::clear() ...@@ -283,12 +284,14 @@ void ModbusMessage::clear()
std::ostream& ModbusRTU::operator<<(std::ostream& os, const ModbusMessage& m ) std::ostream& ModbusRTU::operator<<(std::ostream& os, const ModbusMessage& m )
{ {
os << m.aduhead << "| "; os << m.aduhead << "| ";
if( m.aduLen() == 0 ) if( m.aduLen() == 0 )
mbPrintMessage(os, (ModbusByte*)(&m.pduhead), sizeof(m.pduhead) + m.dlen); mbPrintMessage(os, (ModbusByte*)(&m.pduhead), sizeof(m.pduhead) + m.dlen);
else else
mbPrintMessage(os, (ModbusByte*)(&m.pduhead), m.aduLen()); mbPrintMessage(os, (ModbusByte*)(&m.pduhead), m.aduLen());
return os; return os;
// return mbPrintMessage(os, (ModbusByte*)(&m), sizeof(m.aduhead) + sizeof(m.pduhead) + m.dlen); // return mbPrintMessage(os, (ModbusByte*)(&m), sizeof(m.aduhead) + sizeof(m.pduhead) + m.dlen);
} }
std::ostream& ModbusRTU::operator<<(std::ostream& os, const ModbusMessage* m ) std::ostream& ModbusRTU::operator<<(std::ostream& os, const ModbusMessage* m )
...@@ -410,7 +413,7 @@ ReadCoilMessage& ReadCoilMessage::operator=( const ModbusMessage& m ) ...@@ -410,7 +413,7 @@ ReadCoilMessage& ReadCoilMessage::operator=( const ModbusMessage& m )
void ReadCoilMessage::init( const ModbusMessage& m ) void ReadCoilMessage::init( const ModbusMessage& m )
{ {
assert( m.pduhead.func == fnReadCoilStatus ); assert( m.pduhead.func == fnReadCoilStatus );
// memset(this, 0, sizeof(*this)); // memset(this, 0, sizeof(*this));
memcpy(this, &m.pduhead, sizeof(m.pduhead)); memcpy(this, &m.pduhead, sizeof(m.pduhead));
memcpy(&start, m.data, szData()); memcpy(&start, m.data, szData());
...@@ -730,7 +733,7 @@ ReadInputStatusMessage& ReadInputStatusMessage::operator=( const ModbusMessage& ...@@ -730,7 +733,7 @@ ReadInputStatusMessage& ReadInputStatusMessage::operator=( const ModbusMessage&
void ReadInputStatusMessage::init( const ModbusMessage& m ) void ReadInputStatusMessage::init( const ModbusMessage& m )
{ {
assert( m.pduhead.func == fnReadInputStatus ); assert( m.pduhead.func == fnReadInputStatus );
memcpy(this, &m.pduhead, sizeof(m.pduhead)+szData()); memcpy(this, &m.pduhead, sizeof(m.pduhead) + szData());
// переворачиваем слова // переворачиваем слова
start = SWAPSHORT(start); start = SWAPSHORT(start);
...@@ -935,7 +938,7 @@ void ReadOutputMessage::init( const ModbusMessage& m ) ...@@ -935,7 +938,7 @@ void ReadOutputMessage::init( const ModbusMessage& m )
{ {
assert( m.pduhead.func == fnReadOutputRegisters ); assert( m.pduhead.func == fnReadOutputRegisters );
//memset(this, 0, sizeof(*this)); //memset(this, 0, sizeof(*this));
memcpy(this, &m.pduhead, sizeof(m.pduhead)+szData()); memcpy(this, &m.pduhead, sizeof(m.pduhead) + szData());
// переворачиваем слова // переворачиваем слова
start = SWAPSHORT(start); start = SWAPSHORT(start);
...@@ -1142,9 +1145,9 @@ ReadInputMessage& ReadInputMessage::operator=( const ModbusMessage& m ) ...@@ -1142,9 +1145,9 @@ ReadInputMessage& ReadInputMessage::operator=( const ModbusMessage& m )
void ReadInputMessage::init( const ModbusMessage& m ) void ReadInputMessage::init( const ModbusMessage& m )
{ {
assert( m.pduhead.func == fnReadInputRegisters ); assert( m.pduhead.func == fnReadInputRegisters );
// memset(this, 0, sizeof(*this)); // memset(this, 0, sizeof(*this));
memcpy(this, &m.pduhead, sizeof(m.pduhead)); memcpy(this, &m.pduhead, sizeof(m.pduhead));
memcpy(&start,m.data,szData()); memcpy(&start, m.data, szData());
// переворачиваем слова // переворачиваем слова
start = SWAPSHORT(start); start = SWAPSHORT(start);
...@@ -1511,7 +1514,7 @@ void ForceCoilsRetMessage::init( const ModbusMessage& m ) ...@@ -1511,7 +1514,7 @@ void ForceCoilsRetMessage::init( const ModbusMessage& m )
{ {
assert( m.pduhead.func == fnForceMultipleCoils ); assert( m.pduhead.func == fnForceMultipleCoils );
// memset(this, 0, sizeof(*this)); // memset(this, 0, sizeof(*this));
// copy not include CRC // copy not include CRC
memcpy(this, &m.pduhead, szModbusHeader + m.dlen); memcpy(this, &m.pduhead, szModbusHeader + m.dlen);
...@@ -1877,7 +1880,7 @@ ForceSingleCoilMessage& ForceSingleCoilMessage::operator=( const ModbusMessage& ...@@ -1877,7 +1880,7 @@ ForceSingleCoilMessage& ForceSingleCoilMessage::operator=( const ModbusMessage&
void ForceSingleCoilMessage::init( const ModbusMessage& m ) void ForceSingleCoilMessage::init( const ModbusMessage& m )
{ {
assert( m.pduhead.func == fnForceSingleCoil ); assert( m.pduhead.func == fnForceSingleCoil );
// memset(this, 0, sizeof(*this)); // memset(this, 0, sizeof(*this));
// копируем данные вместе с CRC // копируем данные вместе с CRC
memcpy(this, &m.pduhead, szModbusHeader + m.dlen + szCRC); memcpy(this, &m.pduhead, szModbusHeader + m.dlen + szCRC);
...@@ -2048,7 +2051,7 @@ WriteSingleOutputMessage& WriteSingleOutputMessage::operator=( const ModbusMessa ...@@ -2048,7 +2051,7 @@ WriteSingleOutputMessage& WriteSingleOutputMessage::operator=( const ModbusMessa
void WriteSingleOutputMessage::init( const ModbusMessage& m ) void WriteSingleOutputMessage::init( const ModbusMessage& m )
{ {
assert( m.pduhead.func == fnWriteOutputSingleRegister ); assert( m.pduhead.func == fnWriteOutputSingleRegister );
// memset(this, 0, sizeof(*this)); // memset(this, 0, sizeof(*this));
// копируем данные вместе с CRC // копируем данные вместе с CRC
memcpy(this, &m.pduhead, szModbusHeader + m.dlen + szCRC); memcpy(this, &m.pduhead, szModbusHeader + m.dlen + szCRC);
...@@ -2120,7 +2123,7 @@ WriteSingleOutputRetMessage& WriteSingleOutputRetMessage::operator=( const Modbu ...@@ -2120,7 +2123,7 @@ WriteSingleOutputRetMessage& WriteSingleOutputRetMessage::operator=( const Modbu
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
void WriteSingleOutputRetMessage::init( const ModbusMessage& m ) void WriteSingleOutputRetMessage::init( const ModbusMessage& m )
{ {
// memset(this, 0, sizeof(*this)); // memset(this, 0, sizeof(*this));
// copy not include CRC // copy not include CRC
memcpy(this, &m.pduhead, szModbusHeader + m.dlen); memcpy(this, &m.pduhead, szModbusHeader + m.dlen);
...@@ -2761,7 +2764,7 @@ JournalCommandMessage::JournalCommandMessage( const ModbusMessage& m ) ...@@ -2761,7 +2764,7 @@ JournalCommandMessage::JournalCommandMessage( const ModbusMessage& m )
JournalCommandMessage& JournalCommandMessage::operator=( const ModbusMessage& m ) JournalCommandMessage& JournalCommandMessage::operator=( const ModbusMessage& m )
{ {
assert( m.pduhead.func == fnJournalCommand ); assert( m.pduhead.func == fnJournalCommand );
// memset(this, 0, sizeof(*this)); // memset(this, 0, sizeof(*this));
memcpy(this, &m.pduhead, sizeof(*this)); // m.len memcpy(this, &m.pduhead, sizeof(*this)); // m.len
// переворачиваем слова // переворачиваем слова
......
...@@ -196,6 +196,7 @@ void LogServer::ioAccept( ev::io& watcher, int revents ) ...@@ -196,6 +196,7 @@ void LogServer::ioAccept( ev::io& watcher, int revents )
{ {
uniset_rwmutex_wrlock l(mutSList); uniset_rwmutex_wrlock l(mutSList);
scount++; scount++;
// на первой сессии запоминаем состояние логов // на первой сессии запоминаем состояние логов
if( scount == 1 ) if( scount == 1 )
saveDefaultLogLevels("ALL"); saveDefaultLogLevels("ALL");
...@@ -258,7 +259,8 @@ string LogServer::getShortInfo() ...@@ -258,7 +259,8 @@ string LogServer::getShortInfo()
inf << "LogServer: " << myname << endl; inf << "LogServer: " << myname << endl;
{ {
uniset_rwmutex_wrlock l(mutSList); uniset_rwmutex_wrlock l(mutSList);
for( const auto& s: slist )
for( const auto& s : slist )
inf << " " << s->getShortInfo() << endl; inf << " " << s->getShortInfo() << endl;
} }
...@@ -271,15 +273,17 @@ void LogServer::saveDefaultLogLevels( const std::string& logname ) ...@@ -271,15 +273,17 @@ void LogServer::saveDefaultLogLevels( const std::string& logname )
mylog.info() << myname << "(saveDefaultLogLevels): SAVE DEFAULT LOG LEVELS.." << endl; mylog.info() << myname << "(saveDefaultLogLevels): SAVE DEFAULT LOG LEVELS.." << endl;
auto alog = dynamic_pointer_cast<LogAgregator>(elog); auto alog = dynamic_pointer_cast<LogAgregator>(elog);
if( alog ) if( alog )
{ {
std::list<LogAgregator::iLog> lst; std::list<LogAgregator::iLog> lst;
if( logname.empty() || logname == "ALL" ) if( logname.empty() || logname == "ALL" )
lst = alog->getLogList(); lst = alog->getLogList();
else else
lst = alog->getLogList(logname); lst = alog->getLogList(logname);
for( auto&& l: lst ) for( auto && l : lst )
defaultLogLevels[l.log.get()] = l.log->level(); defaultLogLevels[l.log.get()] = l.log->level();
} }
else if( elog ) else if( elog )
...@@ -292,17 +296,20 @@ void LogServer::restoreDefaultLogLevels( const std::string& logname ) ...@@ -292,17 +296,20 @@ void LogServer::restoreDefaultLogLevels( const std::string& logname )
mylog.info() << myname << "(restoreDefaultLogLevels): RESTORE DEFAULT LOG LEVELS.." << endl; mylog.info() << myname << "(restoreDefaultLogLevels): RESTORE DEFAULT LOG LEVELS.." << endl;
auto alog = dynamic_pointer_cast<LogAgregator>(elog); auto alog = dynamic_pointer_cast<LogAgregator>(elog);
if( alog ) if( alog )
{ {
std::list<LogAgregator::iLog> lst; std::list<LogAgregator::iLog> lst;
if( logname.empty() || logname == "ALL" ) if( logname.empty() || logname == "ALL" )
lst = alog->getLogList(); lst = alog->getLogList();
else else
lst = alog->getLogList(logname); lst = alog->getLogList(logname);
for( auto&& l: lst ) for( auto && l : lst )
{ {
auto d = defaultLogLevels.find(l.log.get()); auto d = defaultLogLevels.find(l.log.get());
if( d != defaultLogLevels.end() ) if( d != defaultLogLevels.end() )
l.log->level(d->second); l.log->level(d->second);
} }
...@@ -310,6 +317,7 @@ void LogServer::restoreDefaultLogLevels( const std::string& logname ) ...@@ -310,6 +317,7 @@ void LogServer::restoreDefaultLogLevels( const std::string& logname )
else if( elog ) else if( elog )
{ {
auto d = defaultLogLevels.find(elog.get()); auto d = defaultLogLevels.find(elog.get());
if( d != defaultLogLevels.end() ) if( d != defaultLogLevels.end() )
elog->level(d->second); elog->level(d->second);
} }
...@@ -331,9 +339,11 @@ std::string LogServer::onCommand( LogSession* s, LogServerTypes::Command cmd, co ...@@ -331,9 +339,11 @@ std::string LogServer::onCommand( LogSession* s, LogServerTypes::Command cmd, co
s << "List of saved default log levels (filter='" << logname << "')[" << defaultLogLevels.size() << "]: " << endl; s << "List of saved default log levels (filter='" << logname << "')[" << defaultLogLevels.size() << "]: " << endl;
s << "=================================" << endl; s << "=================================" << endl;
auto alog = dynamic_pointer_cast<LogAgregator>(elog); auto alog = dynamic_pointer_cast<LogAgregator>(elog);
if( alog ) // если у нас "агрегатор", то работаем с его списком потоков if( alog ) // если у нас "агрегатор", то работаем с его списком потоков
{ {
std::list<LogAgregator::iLog> lst; std::list<LogAgregator::iLog> lst;
if( logname.empty() || logname == "ALL" ) if( logname.empty() || logname == "ALL" )
lst = alog->getLogList(); lst = alog->getLogList();
else else
...@@ -349,6 +359,7 @@ std::string LogServer::onCommand( LogSession* s, LogServerTypes::Command cmd, co ...@@ -349,6 +359,7 @@ std::string LogServer::onCommand( LogSession* s, LogServerTypes::Command cmd, co
{ {
Debug::type deflevel = Debug::NONE; Debug::type deflevel = Debug::NONE;
auto i = defaultLogLevels.find(l.log.get()); auto i = defaultLogLevels.find(l.log.get());
if( i != defaultLogLevels.end() ) if( i != defaultLogLevels.end() )
deflevel = i->second; deflevel = i->second;
...@@ -359,8 +370,10 @@ std::string LogServer::onCommand( LogSession* s, LogServerTypes::Command cmd, co ...@@ -359,8 +370,10 @@ std::string LogServer::onCommand( LogSession* s, LogServerTypes::Command cmd, co
{ {
Debug::type deflevel = Debug::NONE; Debug::type deflevel = Debug::NONE;
auto i = defaultLogLevels.find(elog.get()); auto i = defaultLogLevels.find(elog.get());
if( i != defaultLogLevels.end() ) if( i != defaultLogLevels.end() )
deflevel = i->second; deflevel = i->second;
s << elog->getLogName() << " [" << Debug::str(deflevel) << " ]" << endl; s << elog->getLogName() << " [" << Debug::str(deflevel) << " ]" << endl;
} }
......
...@@ -54,7 +54,7 @@ LogSession::~LogSession() ...@@ -54,7 +54,7 @@ LogSession::~LogSession()
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
LogSession::LogSession( int sfd, std::shared_ptr<DebugStream>& _log, timeout_t _cmdTimeout, timeout_t _checkConnectionTime ): LogSession::LogSession( int sfd, std::shared_ptr<DebugStream>& _log, timeout_t _cmdTimeout, timeout_t _checkConnectionTime ):
cmdTimeout(_cmdTimeout), cmdTimeout(_cmdTimeout),
checkConnectionTime(_checkConnectionTime/1000.), checkConnectionTime(_checkConnectionTime / 1000.),
peername(""), peername(""),
caddr(""), caddr(""),
log(_log) log(_log)
...@@ -103,24 +103,28 @@ void LogSession::logOnEvent( const std::string& s ) ...@@ -103,24 +103,28 @@ void LogSession::logOnEvent( const std::string& s )
if( cancelled || s.empty() ) if( cancelled || s.empty() )
return; return;
{ // чтобы поменьше удерживать mutex {
// чтобы поменьше удерживать mutex
std::unique_lock<std::mutex> lk(logbuf_mutex); std::unique_lock<std::mutex> lk(logbuf_mutex);
// собираем статистику.. // собираем статистику..
// -------------------------- // --------------------------
if( s.size() < minSizeMsg || minSizeMsg==0 ) if( s.size() < minSizeMsg || minSizeMsg == 0 )
minSizeMsg = s.size(); minSizeMsg = s.size();
if( s.size() > maxSizeMsg ) if( s.size() > maxSizeMsg )
maxSizeMsg = s.size(); maxSizeMsg = s.size();
if( logbuf.size() > maxCount ) if( logbuf.size() > maxCount )
maxCount = logbuf.size(); maxCount = logbuf.size();
// -------------------------- // --------------------------
// проверяем на переполнение.. // проверяем на переполнение..
if( logbuf.size() >= maxRecordsNum ) if( logbuf.size() >= maxRecordsNum )
{ {
numLostMsg++; numLostMsg++;
if( !lostMsg ) if( !lostMsg )
{ {
ostringstream err; ostringstream err;
...@@ -173,6 +177,7 @@ void LogSession::terminate() ...@@ -173,6 +177,7 @@ void LogSession::terminate()
{ {
std::unique_lock<std::mutex> lk(logbuf_mutex); std::unique_lock<std::mutex> lk(logbuf_mutex);
while( !logbuf.empty() ) while( !logbuf.empty() )
logbuf.pop(); logbuf.pop();
} }
...@@ -282,6 +287,7 @@ void LogSession::writeEvent( ev::io& watcher ) ...@@ -282,6 +287,7 @@ void LogSession::writeEvent( ev::io& watcher )
{ {
std::unique_lock<std::mutex> lk(logbuf_mutex); std::unique_lock<std::mutex> lk(logbuf_mutex);
if( logbuf.empty() ) if( logbuf.empty() )
{ {
io.set(EV_NONE); io.set(EV_NONE);
...@@ -479,7 +485,8 @@ void LogSession::cmdProcessing( const string& cmdLogName, const LogServerTypes:: ...@@ -479,7 +485,8 @@ void LogSession::cmdProcessing( const string& cmdLogName, const LogServerTypes::
try try
{ {
std::string ret( std::move(m_command_sig.emit(this,msg.cmd,cmdLogName)) ); std::string ret( std::move(m_command_sig.emit(this, msg.cmd, cmdLogName)) );
if( !ret.empty() ) if( !ret.empty() )
{ {
{ {
...@@ -522,6 +529,7 @@ void LogSession::onCheckConnectionTimer( ev::timer& watcher, int revents ) ...@@ -522,6 +529,7 @@ void LogSession::onCheckConnectionTimer( ev::timer& watcher, int revents )
} }
std::unique_lock<std::mutex> lk(logbuf_mutex); std::unique_lock<std::mutex> lk(logbuf_mutex);
if( !logbuf.empty() ) if( !logbuf.empty() )
{ {
checkConnectionTimer.start( checkConnectionTime ); // restart timer checkConnectionTimer.start( checkConnectionTime ); // restart timer
......
...@@ -124,7 +124,7 @@ static pid_t g_stacktrace_proc_pid = 0; // pid процесса делающег ...@@ -124,7 +124,7 @@ static pid_t g_stacktrace_proc_pid = 0; // pid процесса делающег
#define MAXFRAMES 64 #define MAXFRAMES 64
// выделение специального стека заранее // выделение специального стека заранее
// +60 - это на всякие переменные при обработке stack trace и т.п. // +60 - это на всякие переменные при обработке stack trace и т.п.
static char g_stack_body[(MAXFRAMES+60)*FUNCNAMESIZE]; static char g_stack_body[(MAXFRAMES + 60)*FUNCNAMESIZE];
static char trace_buf[10000]; static char trace_buf[10000];
static stack_t g_sigseg_stack; static stack_t g_sigseg_stack;
static void on_stacktrace_timeout(); // поток для защиты от зависания "процесса создания stack trace" static void on_stacktrace_timeout(); // поток для защиты от зависания "процесса создания stack trace"
...@@ -215,7 +215,7 @@ bool gdb_print_trace() ...@@ -215,7 +215,7 @@ bool gdb_print_trace()
char pid_buf[30]; char pid_buf[30];
sprintf(pid_buf, "%d", getpid()); sprintf(pid_buf, "%d", getpid());
char name_buf[512]; char name_buf[512];
name_buf[readlink("/proc/self/exe", name_buf, 511)]=0; name_buf[readlink("/proc/self/exe", name_buf, 511)] = 0;
TRACELOG << "stack trace: for " << name_buf << " pid=" << pid_buf << endl; TRACELOG << "stack trace: for " << name_buf << " pid=" << pid_buf << endl;
...@@ -230,7 +230,7 @@ bool gdb_print_trace() ...@@ -230,7 +230,7 @@ bool gdb_print_trace()
if( child_pid == 0 ) // CHILD if( child_pid == 0 ) // CHILD
{ {
msleep(300); // пауза чтобы родитель успел подготовиться.. msleep(300); // пауза чтобы родитель успел подготовиться..
dup2(2,1); // redirect output to stderr dup2(2, 1); // redirect output to stderr
if( g_act && !g_act->getAbortScript().empty() ) if( g_act && !g_act->getAbortScript().empty() )
{ {
...@@ -257,7 +257,7 @@ bool gdb_print_trace() ...@@ -257,7 +257,7 @@ bool gdb_print_trace()
g_trace_done = false; g_trace_done = false;
std::thread t(on_stacktrace_timeout); // запускаем поток "защищающий" от зависания процесса создания stack trace std::thread t(on_stacktrace_timeout); // запускаем поток "защищающий" от зависания процесса создания stack trace
waitpid(child_pid,NULL,0); waitpid(child_pid, NULL, 0);
g_trace_done = true; g_trace_done = true;
g_trace_doneevent.notify_all(); g_trace_doneevent.notify_all();
...@@ -281,7 +281,7 @@ static void on_stacktrace_timeout() ...@@ -281,7 +281,7 @@ static void on_stacktrace_timeout()
if( !g_trace_done ) if( !g_trace_done )
{ {
ulogsys << "****** STACK TRACE TIMEOUT.. (kill process) *******" << endl << flush; ulogsys << "****** STACK TRACE TIMEOUT.. (kill process) *******" << endl << flush;
kill(g_stacktrace_proc_pid,SIGKILL); kill(g_stacktrace_proc_pid, SIGKILL);
} }
else else
ulogsys << "****** STACK TRACE guard thread finish ******" << endl << flush; ulogsys << "****** STACK TRACE guard thread finish ******" << endl << flush;
...@@ -523,7 +523,7 @@ void UniSetActivator::init() ...@@ -523,7 +523,7 @@ void UniSetActivator::init()
_noUseGdbForStackTrace = ( findArgParam("--uniset-no-use-gdb-for-stacktrace", conf->getArgc(), conf->getArgv()) != -1 ); _noUseGdbForStackTrace = ( findArgParam("--uniset-no-use-gdb-for-stacktrace", conf->getArgc(), conf->getArgv()) != -1 );
abortScript = conf->getArgParam("--uniset-abort-script",""); abortScript = conf->getArgParam("--uniset-abort-script", "");
orb = conf->getORB(); orb = conf->getORB();
CORBA::Object_var obj = orb->resolve_initial_references("RootPOA"); CORBA::Object_var obj = orb->resolve_initial_references("RootPOA");
...@@ -800,7 +800,7 @@ void UniSetActivator::set_signals(bool ask) ...@@ -800,7 +800,7 @@ void UniSetActivator::set_signals(bool ask)
sigemptyset(&act.sa_mask); sigemptyset(&act.sa_mask);
sigaddset(&act.sa_mask, SIGSEGV); sigaddset(&act.sa_mask, SIGSEGV);
act.sa_flags = 0; act.sa_flags = 0;
// act.sa_flags |= SA_RESTART; // act.sa_flags |= SA_RESTART;
act.sa_flags |= SA_RESETHAND; act.sa_flags |= SA_RESETHAND;
#if 1 #if 1
......
...@@ -130,6 +130,7 @@ void UniSetObject::initObject() ...@@ -130,6 +130,7 @@ void UniSetObject::initObject()
auto conf = uniset_conf(); auto conf = uniset_conf();
int sz = conf->getArgPInt("--uniset-object-size-message-queue", conf->getField("SizeOfMessageQueue"), 1000); int sz = conf->getArgPInt("--uniset-object-size-message-queue", conf->getField("SizeOfMessageQueue"), 1000);
if( sz > 0 ) if( sz > 0 )
setMaxSizeOfMessageQueue(sz); setMaxSizeOfMessageQueue(sz);
...@@ -187,6 +188,7 @@ VoidMessagePtr UniSetObject::receiveMessage() ...@@ -187,6 +188,7 @@ VoidMessagePtr UniSetObject::receiveMessage()
VoidMessagePtr UniSetObject::waitMessage( timeout_t timeMS ) VoidMessagePtr UniSetObject::waitMessage( timeout_t timeMS )
{ {
auto m = receiveMessage(); auto m = receiveMessage();
if( m ) if( m )
return m; return m;
...@@ -323,6 +325,7 @@ void UniSetObject::setThreadPriority( int p ) ...@@ -323,6 +325,7 @@ void UniSetObject::setThreadPriority( int p )
void UniSetObject::push( const TransportMessage& tm ) void UniSetObject::push( const TransportMessage& tm )
{ {
auto vm = make_shared<VoidMessage>(tm); auto vm = make_shared<VoidMessage>(tm);
if( vm->priority == Message::Medium ) if( vm->priority == Message::Medium )
mqueueMedium.push(vm); mqueueMedium.push(vm);
else if( vm->priority == Message::High ) else if( vm->priority == Message::High )
...@@ -556,6 +559,7 @@ void UniSetObject::callback() ...@@ -556,6 +559,7 @@ void UniSetObject::callback()
try try
{ {
auto m = waitMessage(sleepTime); auto m = waitMessage(sleepTime);
if( m ) if( m )
processingMessage(m.get()); processingMessage(m.get());
......
...@@ -44,26 +44,28 @@ void MQAtomic::push( const VoidMessagePtr& vm ) ...@@ -44,26 +44,28 @@ void MQAtomic::push( const VoidMessagePtr& vm )
if( wpos < rpos ) if( wpos < rpos )
{ {
// только надо привести к одному масштабу // только надо привести к одному масштабу
unsigned long w = wpos%SizeOfMessageQueue; unsigned long w = wpos % SizeOfMessageQueue;
unsigned long r = rpos%SizeOfMessageQueue; unsigned long r = rpos % SizeOfMessageQueue;
if( lostStrategy == lostNewData && (r-w) >= SizeOfMessageQueue ) if( lostStrategy == lostNewData && (r - w) >= SizeOfMessageQueue )
{ {
stCountOfLostMessages++; stCountOfLostMessages++;
return; return;
} }
} }
// ----------------------------------------------- // -----------------------------------------------
// сперва надо сдвинуть счётчик (чтобы следующий поток уже писал в новое место) // сперва надо сдвинуть счётчик (чтобы следующий поток уже писал в новое место)
unsigned long w = wpos.fetch_add(1); unsigned long w = wpos.fetch_add(1);
// а потом уже добавлять новое сообщение в "зарезервированное" место // а потом уже добавлять новое сообщение в "зарезервированное" место
mqueue[w%SizeOfMessageQueue] = vm; mqueue[w % SizeOfMessageQueue] = vm;
qpos.fetch_add(1); // теперь увеличиваем реальное количество элементов в очереди qpos.fetch_add(1); // теперь увеличиваем реальное количество элементов в очереди
// ведём статистику // ведём статистику
size_t sz = qpos - rpos; size_t sz = qpos - rpos;
if( sz > stMaxQueueMessages ) if( sz > stMaxQueueMessages )
stMaxQueueMessages = sz; stMaxQueueMessages = sz;
} }
...@@ -90,7 +92,7 @@ VoidMessagePtr MQAtomic::top() ...@@ -90,7 +92,7 @@ VoidMessagePtr MQAtomic::top()
{ {
// сперва надо сдвинуть счётчик (чтобы следующий поток уже работал с следующим значением) // сперва надо сдвинуть счётчик (чтобы следующий поток уже работал с следующим значением)
unsigned long r = rpos.fetch_add(1); unsigned long r = rpos.fetch_add(1);
return mqueue[r%SizeOfMessageQueue]; return mqueue[r % SizeOfMessageQueue];
} }
// Если rpos > qpos, значит qpos уже перешёл через максимум // Если rpos > qpos, значит qpos уже перешёл через максимум
...@@ -98,8 +100,8 @@ VoidMessagePtr MQAtomic::top() ...@@ -98,8 +100,8 @@ VoidMessagePtr MQAtomic::top()
if( rpos > qpos ) // делаем if каждый раз, т.к. qpos может уже поменяться в параллельном потоке if( rpos > qpos ) // делаем if каждый раз, т.к. qpos может уже поменяться в параллельном потоке
{ {
// приводим к одному масштабу // приводим к одному масштабу
unsigned long w = qpos%SizeOfMessageQueue; unsigned long w = qpos % SizeOfMessageQueue;
unsigned long r = rpos%SizeOfMessageQueue; unsigned long r = rpos % SizeOfMessageQueue;
if( lostStrategy == lostOldData && (r - w) >= SizeOfMessageQueue ) if( lostStrategy == lostOldData && (r - w) >= SizeOfMessageQueue )
{ {
...@@ -109,7 +111,7 @@ VoidMessagePtr MQAtomic::top() ...@@ -109,7 +111,7 @@ VoidMessagePtr MQAtomic::top()
// продолжаем читать как обычно // продолжаем читать как обычно
r = rpos.fetch_add(1); r = rpos.fetch_add(1);
return mqueue[r%SizeOfMessageQueue]; return mqueue[r % SizeOfMessageQueue];
} }
return nullptr; return nullptr;
...@@ -155,7 +157,8 @@ void MQAtomic::mqFill( const VoidMessagePtr& v ) ...@@ -155,7 +157,8 @@ void MQAtomic::mqFill( const VoidMessagePtr& v )
{ {
mqueue.reserve(SizeOfMessageQueue); mqueue.reserve(SizeOfMessageQueue);
mqueue.clear(); mqueue.clear();
for( size_t i=0; i<SizeOfMessageQueue; i++ )
for( size_t i = 0; i < SizeOfMessageQueue; i++ )
mqueue.push_back(v); mqueue.push_back(v);
} }
//--------------------------------------------------------------------------- //---------------------------------------------------------------------------
......
...@@ -36,9 +36,10 @@ void MQMutex::push( const VoidMessagePtr& vm ) ...@@ -36,9 +36,10 @@ void MQMutex::push( const VoidMessagePtr& vm )
// проверяем переполнение, только если стратегия "терять новые данные" // проверяем переполнение, только если стратегия "терять новые данные"
// иначе нет смысла проверять, а можно просто писать новые данные затирая старые // иначе нет смысла проверять, а можно просто писать новые данные затирая старые
// (sz+1) - т.к мы смотрим есть ли место для новых данных // (sz+1) - т.к мы смотрим есть ли место для новых данных
if( (sz+1) > SizeOfMessageQueue ) if( (sz + 1) > SizeOfMessageQueue )
{ {
stCountOfLostMessages++; stCountOfLostMessages++;
if( lostStrategy == lostNewData ) if( lostStrategy == lostNewData )
return; return;
...@@ -49,6 +50,7 @@ void MQMutex::push( const VoidMessagePtr& vm ) ...@@ -49,6 +50,7 @@ void MQMutex::push( const VoidMessagePtr& vm )
mqueue.push_back(vm); mqueue.push_back(vm);
sz++; sz++;
if( sz > stMaxQueueMessages ) if( sz > stMaxQueueMessages )
stMaxQueueMessages = sz; stMaxQueueMessages = sz;
} }
......
...@@ -16,13 +16,13 @@ const size_t COUNT = 1000000; // сколько сообщений помест ...@@ -16,13 +16,13 @@ const size_t COUNT = 1000000; // сколько сообщений помест
// поток записи // поток записи
void mq_write_thread() void mq_write_thread()
{ {
SensorMessage smsg(100,2); SensorMessage smsg(100, 2);
TransportMessage tm( std::move(smsg.transport_msg()) ); TransportMessage tm( std::move(smsg.transport_msg()) );
auto vm = make_shared<VoidMessage>(tm); auto vm = make_shared<VoidMessage>(tm);
msleep(100); msleep(100);
for( size_t i=0; i<COUNT; i++ ) for( size_t i = 0; i < COUNT; i++ )
{ {
mq.push(vm); mq.push(vm);
} }
...@@ -40,6 +40,7 @@ int one_test() ...@@ -40,6 +40,7 @@ int one_test()
while( rnum < COUNT ) while( rnum < COUNT )
{ {
auto m = mq.top(); auto m = mq.top();
if( m ) if( m )
rnum++; rnum++;
} }
...@@ -59,16 +60,16 @@ int main(int argc, const char** argv) ...@@ -59,16 +60,16 @@ int main(int argc, const char** argv)
int tnum = 10; int tnum = 10;
// чтобы не происходило переполнение // чтобы не происходило переполнение
mq.setMaxSizeOfMessageQueue(COUNT+1); mq.setMaxSizeOfMessageQueue(COUNT + 1);
// сперва просто проверка что очередь работает. // сперва просто проверка что очередь работает.
{ {
SensorMessage sm(100,2); SensorMessage sm(100, 2);
TransportMessage tm( std::move(sm.transport_msg()) ); TransportMessage tm( std::move(sm.transport_msg()) );
auto vm = make_shared<VoidMessage>(tm); auto vm = make_shared<VoidMessage>(tm);
mq.push(vm); mq.push(vm);
auto msg = mq.top(); auto msg = mq.top();
assert( msg!=nullptr ); assert( msg != nullptr );
SensorMessage sm2( msg.get() ); SensorMessage sm2( msg.get() );
assert( sm.id == sm2.id ); assert( sm.id == sm2.id );
} }
...@@ -76,14 +77,15 @@ int main(int argc, const char** argv) ...@@ -76,14 +77,15 @@ int main(int argc, const char** argv)
vector<int> res; vector<int> res;
res.reserve(tnum); res.reserve(tnum);
for( int i=0; i<tnum; i++ ) for( int i = 0; i < tnum; i++ )
{ {
res.push_back(one_test()); res.push_back(one_test());
} }
// вычисляем среднее // вычисляем среднее
int sum = 0; int sum = 0;
for( auto&& r: res )
for( auto && r : res )
sum += r; sum += r;
float avg = (float)sum / tnum; float avg = (float)sum / tnum;
......
...@@ -12,9 +12,9 @@ class TestUObject: ...@@ -12,9 +12,9 @@ class TestUObject:
public: public:
TestUObject( UniSetTypes::ObjectId id, xmlNode* cnode ): TestUObject( UniSetTypes::ObjectId id, xmlNode* cnode ):
UniSetObject(id){} UniSetObject(id) {}
virtual ~TestUObject(){}; virtual ~TestUObject() {};
// специальные функции для проведения тестирования // специальные функции для проведения тестирования
inline VoidMessagePtr getOneMessage() inline VoidMessagePtr getOneMessage()
...@@ -28,7 +28,7 @@ class TestUObject: ...@@ -28,7 +28,7 @@ class TestUObject:
} }
protected: protected:
TestUObject(){}; TestUObject() {};
}; };
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
#endif // TestUObject_H_ #endif // TestUObject_H_
......
...@@ -11,9 +11,9 @@ ...@@ -11,9 +11,9 @@
#define TEST_MQ_ATOMIC 1 #define TEST_MQ_ATOMIC 1
#ifdef TEST_MQ_ATOMIC #ifdef TEST_MQ_ATOMIC
typedef MQAtomic UMessageQueue; typedef MQAtomic UMessageQueue;
#else #else
typedef MQMutex UMessageQueue; typedef MQMutex UMessageQueue;
#endif #endif
// -------------------------------------------------------------------------- // --------------------------------------------------------------------------
...@@ -44,7 +44,7 @@ using namespace UniSetTypes; ...@@ -44,7 +44,7 @@ using namespace UniSetTypes;
// -------------------------------------------------------------------------- // --------------------------------------------------------------------------
static void pushMessage( UMessageQueue& mq, long id ) static void pushMessage( UMessageQueue& mq, long id )
{ {
SensorMessage sm(id,id); SensorMessage sm(id, id);
sm.consumer = id; // чтобы хоть как-то идентифицировать сообщений, используем поле consumer sm.consumer = id; // чтобы хоть как-то идентифицировать сообщений, используем поле consumer
TransportMessage tm( std::move(sm.transport_msg()) ); TransportMessage tm( std::move(sm.transport_msg()) );
auto vm = make_shared<VoidMessage>(tm); auto vm = make_shared<VoidMessage>(tm);
...@@ -66,10 +66,10 @@ TEST_CASE( "UMessageQueue: simple push/top", "[mqueue]" ) ...@@ -66,10 +66,10 @@ TEST_CASE( "UMessageQueue: simple push/top", "[mqueue]" )
UMessageQueue mq; UMessageQueue mq;
pushMessage(mq,100); pushMessage(mq, 100);
auto msg = mq.top(); auto msg = mq.top();
REQUIRE( msg!=nullptr ); REQUIRE( msg != nullptr );
REQUIRE( msg->consumer == 100 ); REQUIRE( msg->consumer == 100 );
} }
// -------------------------------------------------------------------------- // --------------------------------------------------------------------------
...@@ -82,21 +82,21 @@ TEST_CASE( "UMessageQueue: overflow (lost old data)", "[mqueue]" ) ...@@ -82,21 +82,21 @@ TEST_CASE( "UMessageQueue: overflow (lost old data)", "[mqueue]" )
mq.setLostStrategy( UMessageQueue::lostOldData ); mq.setLostStrategy( UMessageQueue::lostOldData );
pushMessage(mq,100); pushMessage(mq, 100);
REQUIRE( mq.size() == 1 ); REQUIRE( mq.size() == 1 );
pushMessage(mq,110); pushMessage(mq, 110);
REQUIRE( mq.size() == 2 ); REQUIRE( mq.size() == 2 );
pushMessage(mq,120); pushMessage(mq, 120);
REQUIRE( mq.size() == 2 ); REQUIRE( mq.size() == 2 );
auto msg = mq.top(); auto msg = mq.top();
REQUIRE( msg!=nullptr ); REQUIRE( msg != nullptr );
REQUIRE( msg->consumer == 110 ); REQUIRE( msg->consumer == 110 );
msg = mq.top(); msg = mq.top();
REQUIRE( msg!=nullptr ); REQUIRE( msg != nullptr );
REQUIRE( msg->consumer == 120 ); REQUIRE( msg->consumer == 120 );
REQUIRE( mq.getCountOfLostMessages() == 1 ); REQUIRE( mq.getCountOfLostMessages() == 1 );
...@@ -111,31 +111,31 @@ TEST_CASE( "UMessageQueue: overflow (lost new data)", "[mqueue]" ) ...@@ -111,31 +111,31 @@ TEST_CASE( "UMessageQueue: overflow (lost new data)", "[mqueue]" )
mq.setLostStrategy( UMessageQueue::lostNewData ); mq.setLostStrategy( UMessageQueue::lostNewData );
pushMessage(mq,100); pushMessage(mq, 100);
REQUIRE( mq.size() == 1 ); REQUIRE( mq.size() == 1 );
pushMessage(mq,110); pushMessage(mq, 110);
REQUIRE( mq.size() == 2 ); REQUIRE( mq.size() == 2 );
pushMessage(mq,120); pushMessage(mq, 120);
REQUIRE( mq.size() == 2 ); REQUIRE( mq.size() == 2 );
REQUIRE( mq.getCountOfLostMessages() == 1 ); REQUIRE( mq.getCountOfLostMessages() == 1 );
pushMessage(mq,130); pushMessage(mq, 130);
REQUIRE( mq.size() == 2 ); REQUIRE( mq.size() == 2 );
REQUIRE( mq.getCountOfLostMessages() == 2 ); REQUIRE( mq.getCountOfLostMessages() == 2 );
auto msg = mq.top(); auto msg = mq.top();
REQUIRE( msg!=nullptr ); REQUIRE( msg != nullptr );
REQUIRE( msg->consumer == 100 ); REQUIRE( msg->consumer == 100 );
msg = mq.top(); msg = mq.top();
REQUIRE( msg!=nullptr ); REQUIRE( msg != nullptr );
REQUIRE( msg->consumer == 110 ); REQUIRE( msg->consumer == 110 );
msg = mq.top(); msg = mq.top();
REQUIRE( msg==nullptr ); REQUIRE( msg == nullptr );
} }
// -------------------------------------------------------------------------- // --------------------------------------------------------------------------
TEST_CASE( "UMessageQueue: many read", "[mqueue]" ) TEST_CASE( "UMessageQueue: many read", "[mqueue]" )
...@@ -146,17 +146,17 @@ TEST_CASE( "UMessageQueue: many read", "[mqueue]" ) ...@@ -146,17 +146,17 @@ TEST_CASE( "UMessageQueue: many read", "[mqueue]" )
mq.setMaxSizeOfMessageQueue(1); mq.setMaxSizeOfMessageQueue(1);
mq.setLostStrategy( UMessageQueue::lostNewData ); mq.setLostStrategy( UMessageQueue::lostNewData );
pushMessage(mq,100); pushMessage(mq, 100);
REQUIRE( mq.size() == 1 ); REQUIRE( mq.size() == 1 );
auto msg = mq.top(); auto msg = mq.top();
REQUIRE( msg!=nullptr ); REQUIRE( msg != nullptr );
REQUIRE( msg->consumer == 100 ); REQUIRE( msg->consumer == 100 );
for( int i=0; i<5; i++ ) for( int i = 0; i < 5; i++ )
{ {
auto msg = mq.top(); auto msg = mq.top();
REQUIRE( msg==nullptr ); REQUIRE( msg == nullptr );
} }
} }
// -------------------------------------------------------------------------- // --------------------------------------------------------------------------
...@@ -171,15 +171,16 @@ TEST_CASE( "UMessageQueue: correct operation", "[mqueue]" ) ...@@ -171,15 +171,16 @@ TEST_CASE( "UMessageQueue: correct operation", "[mqueue]" )
const size_t num = 1000; const size_t num = 1000;
UMessageQueue mq; UMessageQueue mq;
mq.setMaxSizeOfMessageQueue(num+1); mq.setMaxSizeOfMessageQueue(num + 1);
size_t rnum = 0; size_t rnum = 0;
for( size_t i=0; i<num; i++ )
for( size_t i = 0; i < num; i++ )
{ {
pushMessage(mq,i); pushMessage(mq, i);
// каждые 50 читатем, имитируя реальную работу (чтение между записью) // каждые 50 читатем, имитируя реальную работу (чтение между записью)
if( i%50 ) if( i % 50 )
{ {
auto m = mq.top(); auto m = mq.top();
REQUIRE( m->consumer == rnum ); REQUIRE( m->consumer == rnum );
...@@ -215,9 +216,9 @@ TEST_CASE( "UMessageQueue: overflow index (strategy=lostOldData)", "[mqueue]" ) ...@@ -215,9 +216,9 @@ TEST_CASE( "UMessageQueue: overflow index (strategy=lostOldData)", "[mqueue]" )
mq.set_rpos(max); mq.set_rpos(max);
// При переходе через максимум ничего не должны потерять // При переходе через максимум ничего не должны потерять
pushMessage(mq,100); pushMessage(mq, 100);
pushMessage(mq,110); pushMessage(mq, 110);
pushMessage(mq,120); pushMessage(mq, 120);
auto m = mq.top(); auto m = mq.top();
REQUIRE( m != nullptr ); REQUIRE( m != nullptr );
...@@ -241,9 +242,9 @@ TEST_CASE( "UMessageQueue: lost data (strategy=lostOldData)", "[mqueue]" ) ...@@ -241,9 +242,9 @@ TEST_CASE( "UMessageQueue: lost data (strategy=lostOldData)", "[mqueue]" )
mq.setLostStrategy(MQAtomic::lostOldData); mq.setLostStrategy(MQAtomic::lostOldData);
mq.setMaxSizeOfMessageQueue(2); mq.setMaxSizeOfMessageQueue(2);
pushMessage(mq,100); pushMessage(mq, 100);
pushMessage(mq,110); pushMessage(mq, 110);
pushMessage(mq,120); pushMessage(mq, 120);
auto m = mq.top(); auto m = mq.top();
REQUIRE( m != nullptr ); REQUIRE( m != nullptr );
...@@ -261,9 +262,9 @@ TEST_CASE( "UMessageQueue: lost data (strategy=lostOldData)", "[mqueue]" ) ...@@ -261,9 +262,9 @@ TEST_CASE( "UMessageQueue: lost data (strategy=lostOldData)", "[mqueue]" )
mq.set_rpos(max); mq.set_rpos(max);
// При переходе через максимум ничего не должны потерять // При переходе через максимум ничего не должны потерять
pushMessage(mq,140); pushMessage(mq, 140);
pushMessage(mq,150); pushMessage(mq, 150);
pushMessage(mq,160); pushMessage(mq, 160);
m = mq.top(); m = mq.top();
REQUIRE( m != nullptr ); REQUIRE( m != nullptr );
...@@ -289,9 +290,9 @@ TEST_CASE( "UMessageQueue: overflow index (strategy=lostNewData)", "[mqueue]" ) ...@@ -289,9 +290,9 @@ TEST_CASE( "UMessageQueue: overflow index (strategy=lostNewData)", "[mqueue]" )
mq.set_rpos(max); mq.set_rpos(max);
// При переходе через максимум ничего не должны потерять // При переходе через максимум ничего не должны потерять
pushMessage(mq,100); pushMessage(mq, 100);
pushMessage(mq,110); pushMessage(mq, 110);
pushMessage(mq,120); pushMessage(mq, 120);
auto m = mq.top(); auto m = mq.top();
REQUIRE( m != nullptr ); REQUIRE( m != nullptr );
...@@ -315,9 +316,9 @@ TEST_CASE( "UMessageQueue: lost data (strategy=lostNewData)", "[mqueue]" ) ...@@ -315,9 +316,9 @@ TEST_CASE( "UMessageQueue: lost data (strategy=lostNewData)", "[mqueue]" )
mq.setLostStrategy(MQAtomic::lostNewData); mq.setLostStrategy(MQAtomic::lostNewData);
mq.setMaxSizeOfMessageQueue(2); mq.setMaxSizeOfMessageQueue(2);
pushMessage(mq,100); pushMessage(mq, 100);
pushMessage(mq,110); pushMessage(mq, 110);
pushMessage(mq,120); pushMessage(mq, 120);
auto m = mq.top(); auto m = mq.top();
REQUIRE( m != nullptr ); REQUIRE( m != nullptr );
...@@ -335,9 +336,9 @@ TEST_CASE( "UMessageQueue: lost data (strategy=lostNewData)", "[mqueue]" ) ...@@ -335,9 +336,9 @@ TEST_CASE( "UMessageQueue: lost data (strategy=lostNewData)", "[mqueue]" )
mq.set_rpos(max); mq.set_rpos(max);
// При переходе через максимум ничего не должны потерять // При переходе через максимум ничего не должны потерять
pushMessage(mq,140); pushMessage(mq, 140);
pushMessage(mq,150); pushMessage(mq, 150);
pushMessage(mq,160); pushMessage(mq, 160);
m = mq.top(); m = mq.top();
REQUIRE( m != nullptr ); REQUIRE( m != nullptr );
......
...@@ -23,14 +23,14 @@ void initTest() ...@@ -23,14 +23,14 @@ void initTest()
if( !uobj ) if( !uobj )
{ {
uobj = make_object<TestUObject>("TestUObject1","TestUObject"); uobj = make_object<TestUObject>("TestUObject1", "TestUObject");
REQUIRE( uobj != nullptr ); REQUIRE( uobj != nullptr );
} }
} }
// -------------------------------------------------------------------------- // --------------------------------------------------------------------------
static void pushMessage( long id, Message::Priority p ) static void pushMessage( long id, Message::Priority p )
{ {
SensorMessage sm(id,id); SensorMessage sm(id, id);
sm.priority = p; sm.priority = p;
sm.consumer = id; // чтобы хоть как-то идентифицировать сообщений, используем поле consumer sm.consumer = id; // чтобы хоть как-то идентифицировать сообщений, используем поле consumer
TransportMessage tm( std::move(sm.transport_msg()) ); TransportMessage tm( std::move(sm.transport_msg()) );
...@@ -46,11 +46,11 @@ TEST_CASE( "UObject: priority messages", "[uobject]" ) ...@@ -46,11 +46,11 @@ TEST_CASE( "UObject: priority messages", "[uobject]" )
* Хотя в реальности, оно должно совпадать с id объекта получателя. * Хотя в реальности, оно должно совпадать с id объекта получателя.
*/ */
pushMessage(100,Message::Low); pushMessage(100, Message::Low);
pushMessage(101,Message::Low); pushMessage(101, Message::Low);
pushMessage(200,Message::Medium); pushMessage(200, Message::Medium);
pushMessage(300,Message::High); pushMessage(300, Message::High);
pushMessage(301,Message::High); pushMessage(301, Message::High);
// теперь проверяем что сперва вынули Hi // теперь проверяем что сперва вынули Hi
// но так же контролируем что порядок извлечения правильный // но так же контролируем что порядок извлечения правильный
...@@ -70,7 +70,7 @@ TEST_CASE( "UObject: priority messages", "[uobject]" ) ...@@ -70,7 +70,7 @@ TEST_CASE( "UObject: priority messages", "[uobject]" )
REQUIRE( m->priority == Message::Low ); REQUIRE( m->priority == Message::Low );
REQUIRE( m->consumer == 100 ); REQUIRE( m->consumer == 100 );
pushMessage(201,Message::Medium); pushMessage(201, Message::Medium);
m = uobj->getOneMessage(); m = uobj->getOneMessage();
REQUIRE( m->priority == Message::Medium ); REQUIRE( m->priority == Message::Medium );
REQUIRE( m->consumer == 201 ); REQUIRE( m->consumer == 201 );
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment