Commit 256ad9f4 authored by Pavel Vainerman's avatar Pavel Vainerman

(MBSlave): добавил сигналы в ModbusTCPServer для возможности делать

какие-то действия непосредственно при получении запроса, а также добавил ведение статистики обмена непосредственн в ModbusServer.
parent 3b0f410b
......@@ -283,6 +283,12 @@
</item>
<item id="112" iotype="DI" name="MQTT_DI_S" textname="MQTT test DI" mqtt="1"/>
<item id="2052" mbtcp="3" mbaddr="1" mbreg="263" iotype="AI" name="AIForThresholds" textname="Тестовый регистр для проверки пороговых датчиков"/>
<item id="2053" mbtcp="3" mbaddr="1" iotype="DI" name="Threshold1" textname="Порог N1" threshold_aid="AIForThresholds" lowlimit="20" hilimit="25" />
<item id="2054" mbtcp="3" mbaddr="1" iotype="DI" name="Threshold2" textname="Порог N2" threshold_aid="AIForThresholds" lowlimit="20" hilimit="25" threshold_invert="1"/>
<item id="2055" mbtcp="3" mbaddr="2" mbreg="263" iotype="AI" name="AIMBADD2" textname="Тестовый регистр для проверки чтения по второму устройству"/>
</sensors>
<thresholds name="thresholds">
<sensor iotype="AI" name="AI_AS">
......
......@@ -3,10 +3,10 @@
# See doc: http://www.gnu.org/software/hello/manual/autoconf/Generic-Programs.html
# AC_PREREQ(2.59)
AC_INIT([uniset2], [2.2.1], pv@etersoft.ru)
AC_INIT([uniset2], [2.3.0], pv@etersoft.ru)
AM_INIT_AUTOMAKE(AC_PACKAGE_NAME,AC_PACKAGE_VERSION)
LIBVER=2:2:1
LIBVER=2:3:0
AC_SUBST(LIBVER)
# AC_CONFIG_MACRO_DIR([m4])
......
......@@ -2128,9 +2128,7 @@ bool MBExchange::initRSProperty( RSProperty& p, UniXML::iterator& it )
// тогда заносим его в отдельный список
if( p.t_ai != DefaultObjectId )
{
// испольуем конструктор копирования
// IOBase b( *(static_cast<IOBase*>(&p)));
thrlist.push_back( std::move(p) );
thrlist.emplace_back( std::move(p) );
return true;
}
......
......@@ -16,7 +16,7 @@ using namespace UniSetExtensions;
// --------------------------------------------------------------------------
std::shared_ptr<SharedMemory> shm;
// --------------------------------------------------------------------------
int main(int argc, const char* argv[] )
int main( int argc, const char* argv[] )
{
try
{
......
......@@ -550,8 +550,6 @@ void MBSlave::execute_rtu()
{
auto rscomm = dynamic_pointer_cast<ModbusRTUSlaveSlot>(mbslot);
ModbusRTU::mbErrCode prev = erNoError;
// ждём чтобы прошла инициализация
// потому-что нужно чтобы наполнилась таблица адресов (vaddr)
if( !shm->isLocalwork() )
......@@ -583,18 +581,6 @@ void MBSlave::execute_rtu()
if( res != ModbusRTU::erTimeOut )
ptTimeout.reset();
// собираем статистику обмена
if( prev != ModbusRTU::erTimeOut )
{
// с проверкой на переполнение
askCount = askCount >= numeric_limits<long>::max() ? 0 : askCount + 1;
if( res != ModbusRTU::erNoError )
++errmap[res];
}
prev = res;
if( res != ModbusRTU::erNoError && res != ModbusRTU::erTimeOut )
mbwarn << myname << "(execute_rtu): " << ModbusRTU::mbErr2Str(res) << endl;
......@@ -602,12 +588,7 @@ void MBSlave::execute_rtu()
continue;
updateStatistics();
for( auto && rmap : iomap )
{
for( auto && it : rmap.second )
IOBase::processingThreshold(&it.second, shm, force);
}
updateThresholds();
}
catch( std::exception& ex )
{
......@@ -642,8 +623,11 @@ void MBSlave::execute_tcp()
// Чтобы не создавать отдельный поток для tcpserver (или для обновления статистики)
// воспользуемся таймером tcpserver-а..
tcpserver->setTimer(updateStatTime);
tcpserver->signal_timer().connect( sigc::mem_fun(this, &MBSlave::updateTCPStatistics) );
tcpserver->setTimer(updateStatTime);
// для обновления пороговых датчиков
tcpserver->signal_post_receive().connect( sigc::mem_fun(this, &MBSlave::postReceiveEvent) );
mbinfo << myname << "(execute_tcp): run tcpserver.." << endl;
......@@ -728,10 +712,21 @@ void MBSlave::updateTCPStatistics()
return;
// Обновляем информацию по соединениям
sess.clear();
tcpserver->getSessions(sess);
{
uniset_mutex_lock l(sessMutex);
sess.clear();
tcpserver->getSessions(sess);
}
askCount = tcpserver->getAskCount();
for( auto && s : sess )
// если список сессий не пустой.. значит связь есть..
if( !sess.empty() )
ptTimeout.reset(); // см. updateStatistics()
// суммарное количество по всем
askCount = 0;
for( const auto& s : sess )
{
if( !activated || cancelled )
return;
......@@ -799,6 +794,26 @@ void MBSlave::updateTCPStatistics()
}
}
// -------------------------------------------------------------------------
void MBSlave::updateThresholds()
{
for( auto&& i: thrlist )
{
try
{
IOBase::processingThreshold(&i, shm, force);
}
catch( std::exception& ex )
{
mbwarn << myname << "(updateThresholds): " << ex.what() << endl;
}
}
}
// -------------------------------------------------------------------------
void MBSlave::postReceiveEvent( ModbusRTU::mbErrCode res )
{
updateThresholds();
}
// -------------------------------------------------------------------------
void MBSlave::sysCommand( const UniSetTypes::SystemMessage* sm )
{
switch( sm->command )
......@@ -1091,6 +1106,13 @@ bool MBSlave::initItem( UniXML::iterator& it )
if( !IOBase::initItem( static_cast<IOBase*>(&p), it, shm, prop_prefix, false, mblog, myname) )
return false;
if( p.t_ai != DefaultObjectId )
{
// это пороговый датчик.. вносим его в список
thrlist.emplace_back(std::move(p));
return true;
}
std::string s_mbaddr = IOBase::initProp(it, "mbaddr", prop_prefix, false, default_mbaddr);
if( s_mbaddr.empty() )
......@@ -2441,6 +2463,10 @@ ModbusRTU::mbErrCode MBSlave::forceSingleCoil( ModbusRTU::ForceSingleCoilMessage
ModbusRTU::mbErrCode MBSlave::diagnostics( ModbusRTU::DiagnosticMessage& query,
ModbusRTU::DiagnosticRetMessage& reply )
{
auto mbserver = dynamic_pointer_cast<ModbusServer>(mbslot);
if( !mbserver )
return ModbusRTU::erHardwareError;
if( query.subf == ModbusRTU::subEcho )
{
reply = query;
......@@ -2450,7 +2476,7 @@ ModbusRTU::mbErrCode MBSlave::diagnostics( ModbusRTU::DiagnosticMessage& query,
if( query.subf == ModbusRTU::dgBusErrCount )
{
reply = query;
reply.data[0] = errmap[ModbusRTU::erBadCheckSum];
reply.data[0] = mbserver->getErrCount(ModbusRTU::erBadCheckSum);
return ModbusRTU::erNoError;
}
......@@ -2464,15 +2490,15 @@ ModbusRTU::mbErrCode MBSlave::diagnostics( ModbusRTU::DiagnosticMessage& query,
if( query.subf == ModbusRTU::dgSlaveNAKCount )
{
reply = query;
reply.data[0] = errmap[erOperationFailed];
reply.data[0] = mbserver->getErrCount(erOperationFailed);
return ModbusRTU::erNoError;
}
if( query.subf == ModbusRTU::dgClearCounters )
{
askCount = 0;
errmap[erOperationFailed] = 0;
errmap[ModbusRTU::erBadCheckSum] = 0;
mbserver->resetAskCounter();
mbserver->resetErrCount(erOperationFailed,0);
mbserver->resetErrCount(ModbusRTU::erBadCheckSum,0);
// другие счётчики пока не сбрасываем..
reply = query;
return ModbusRTU::erNoError;
......@@ -2552,7 +2578,13 @@ UniSetTypes::SimpleInfo* MBSlave::getInfo( CORBA::Long userparam )
if( tcpserver )
{
inf << "TCP Clients: " << endl;
size_t snum = 0;
{
uniset_mutex_lock l(sessMutex);
snum = sess.size();
}
inf << "TCP Clients[" << snum << "]:" << endl;
for( const auto& m : cmap )
inf << " " << m.second.getShortInfo() << endl;
......
......@@ -302,19 +302,6 @@
*/
// -----------------------------------------------------------------------------
namespace std
{
template<>
class hash<ModbusRTU::mbErrCode>
{
public:
size_t operator()(const ModbusRTU::mbErrCode& e) const
{
return std::hash<int>()((int)e);
}
};
}
// -----------------------------------------------------------------------------
/*! Реализация slave-интерфейса */
class MBSlave:
public UniSetObject
......@@ -471,6 +458,11 @@ class MBSlave:
IOMap iomap; /*!< список входов/выходов по адресам */
// т.к. пороговые датчики не связаны напрямую с обменом, создаём для них отдельный список
// и отдельно его проверяем потом
typedef std::list<IOBase> ThresholdList;
ThresholdList thrlist;
std::shared_ptr<ModbusServerSlot> mbslot;
std::unordered_set<ModbusRTU::ModbusAddr> vaddr; /*!< адреса данного узла */
std::string default_mbaddr = { "" };
......@@ -490,6 +482,8 @@ class MBSlave:
virtual void execute_tcp();
virtual void updateStatistics();
virtual void updateTCPStatistics();
virtual void updateThresholds();
virtual void postReceiveEvent( ModbusRTU::mbErrCode res );
virtual bool activateObject() override;
virtual bool deactivateObject() override;
......@@ -542,8 +536,6 @@ class MBSlave:
PassiveTimer ptTimeout;
long askCount = { 0 };
typedef std::unordered_map<ModbusRTU::mbErrCode, unsigned int> ExchangeErrorMap;
ExchangeErrorMap errmap; /*!< статистика обмена */
std::atomic_bool activated = { false };
std::atomic_bool cancelled = { false };
......@@ -588,6 +580,7 @@ class MBSlave:
timeout_t sessTimeout; /*!< таймаут на сессию */
timeout_t updateStatTime;
ModbusTCPServer::Sessions sess; /*!< список открытых сессий */
UniSetTypes::uniset_mutex sessMutex;
unsigned int sessMaxNum;
std::shared_ptr<ModbusTCPServerSlot> tcpserver;
......
......@@ -199,9 +199,13 @@
<item id="2050" mbs="1" mbreg="259" iotype="AO" vtype="F2" name="Test_CountWrite10_F2" textname="Тестовый регистр для 0x10(F2)"/>
<item id="2051" mbs="1" mbreg="261" iotype="AO" vtype="F2" precision="1" name="Test_CountWrite10_F2prec" textname="Тестовый регистр для 0x10(F2)(prec)"/>
<item id="2053" mbs="1" mbaddr="0x02" mbreg="130" default="1" iotype="AO" name="Test_SlaveAddr2" textname="Тестовый регистр для проверки работы со вторым адресом (mbreg)"/>
<!-- thresholds -->
<item id="2054" mbs="1" mbreg="263" iotype="AI" name="AIForThresholds" textname="Тестовый регистр для проверки пороговых датчиков"/>
<item id="2055" mbs="1" iotype="DI" name="Threshold1" textname="Порог N1" threshold_aid="AIForThresholds" lowlimit="20" hilimit="25" />
<item id="2056" mbs="1" iotype="DI" name="Threshold2" textname="Порог N2" threshold_aid="AIForThresholds" lowlimit="20" hilimit="25" threshold_invert="1"/>
<item id="10000" iotype="DI" name="TestMode_S" textname="Тестовый датчик"/>
......
......@@ -1184,11 +1184,9 @@ TEST_CASE("(0x10): write register outputs or memories [F2](precision)", "[modbus
// -------------------------------------------------------------
TEST_CASE("Multi adress check", "[modbus][mbslave][multiaddress]")
{
using namespace VTypes;
InitTest();
ModbusRTU::ModbusData tREG = 130;
int num = 10;
ModbusRTU::ReadOutputRetMessage ret = mb->read03(slaveaddr, tREG, 1);
REQUIRE( ret.data[0] == 1 );
......@@ -1197,4 +1195,35 @@ TEST_CASE("Multi adress check", "[modbus][mbslave][multiaddress]")
REQUIRE( ret2.data[0] == 1 );
}
// -------------------------------------------------------------
TEST_CASE("Thresholds", "[modbus][mbslave][thresholds]")
{
InitTest();
ModbusRTU::ModbusData tREG = 263;
// формируем порог
ModbusRTU::WriteSingleOutputRetMessage ret = mb->write06(slaveaddr, tREG, 26);
REQUIRE( ret.start == tREG );
REQUIRE( ret.data == 26 );
REQUIRE( ui->getValue(2054) == 26 );
REQUIRE( ui->getValue(2055) == 1 );
REQUIRE( ui->getValue(2056) == 0 );
// отпускаем порог
ret = mb->write06(slaveaddr, tREG, 19);
REQUIRE( ret.start == tREG );
REQUIRE( ret.data == 19 );
REQUIRE( ui->getValue(2054) == 19 );
REQUIRE( ui->getValue(2055) == 0 );
REQUIRE( ui->getValue(2056) == 1 );
// формируем порог
ret = mb->write06(slaveaddr, tREG, 500);
REQUIRE( ret.start == tREG );
REQUIRE( ret.data == 500 );
REQUIRE( ui->getValue(2054) == 500 );
REQUIRE( ui->getValue(2055) == 1 );
REQUIRE( ui->getValue(2056) == 0 );
}
// -------------------------------------------------------------
/*! \todo Доделать тесты на считывание с разными prop_prefix.. */
......@@ -32,8 +32,6 @@ class ModbusRTUSlave:
void setSpeed( const std::string& s );
ComPort::Speed getSpeed();
virtual ModbusRTU::mbErrCode receive( const std::unordered_set<ModbusRTU::ModbusAddr>& vmbaddr, timeout_t msecTimeout ) override;
virtual void cleanupChannel() override
{
if(port) port->cleanupChannel();
......@@ -44,6 +42,8 @@ class ModbusRTUSlave:
protected:
virtual ModbusRTU::mbErrCode realReceive( const std::unordered_set<ModbusRTU::ModbusAddr>& vmbaddr, timeout_t msecTimeout ) override;
// realisation (see ModbusServer.h)
virtual size_t getNextData( unsigned char* buf, int len ) override;
virtual void setChannelTimeout( timeout_t msec ) override;
......
......@@ -4,6 +4,8 @@
// -------------------------------------------------------------------------
#include <string>
#include <unordered_set>
#include <unordered_map>
#include <sigc++/sigc++.h>
#include "Debug.h"
#include "Mutex.h"
......@@ -11,6 +13,19 @@
#include "PassiveTimer.h"
#include "ModbusTypes.h"
// -------------------------------------------------------------------------
namespace std
{
template<>
class hash<ModbusRTU::mbErrCode>
{
public:
size_t operator()(const ModbusRTU::mbErrCode& e) const
{
return std::hash<int>()(e);
}
};
}
// -------------------------------------------------------------------------
/*! Modbus server interface */
class ModbusServer
{
......@@ -32,11 +47,25 @@ class ModbusServer
\param msecTimeout - время ожидания прихода очередного сообщения в мсек.
\return Возвращает код ошибки из ModbusRTU::mbErrCode
*/
virtual ModbusRTU::mbErrCode receive( const std::unordered_set<ModbusRTU::ModbusAddr>& vaddr, timeout_t msecTimeout ) = 0;
ModbusRTU::mbErrCode receive( const std::unordered_set<ModbusRTU::ModbusAddr>& vaddr, timeout_t msecTimeout );
// версия с "одним" адресом
virtual ModbusRTU::mbErrCode receive_one( const ModbusRTU::ModbusAddr addr, timeout_t msec );
// ---------------------------------------------------------------------------------------
// сигналы по обработке событий приёма сообщения.
// ---------------------------------------------------------------------------------------
// сигнал вызова receive, ДО обработки realReceive()
// \return ModbusRTU::errNoError, тогда обработка продолжиться.
typedef sigc::signal<ModbusRTU::mbErrCode,const std::unordered_set<ModbusRTU::ModbusAddr>, timeout_t> PreReceiveSignal;
PreReceiveSignal signal_pre_receive();
// сигнал после обработки realReceive()
typedef sigc::signal<void,ModbusRTU::mbErrCode> PostReceiveSignal;
PostReceiveSignal signal_post_receive();
// ---------------------------------------------------------------------------------------
/*! Проверка входит ли данный адрес в список
* \param vaddr - вектор адресов
* \param addr - адрес который ищем
......@@ -115,8 +144,21 @@ class ModbusServer
virtual bool isAcive() = 0;
// ------------ Статистика ---------------
typedef std::unordered_map<ModbusRTU::mbErrCode, size_t> ExchangeErrorMap;
ExchangeErrorMap getErrorMap();
size_t getErrCount( ModbusRTU::mbErrCode e );
size_t resetErrCount( ModbusRTU::mbErrCode e, size_t set=0 );
inline size_t getAskCount() { return askCount; }
void resetAskCounter();
protected:
/*! реализация получения очередного сообщения */
virtual ModbusRTU::mbErrCode realReceive( const std::unordered_set<ModbusRTU::ModbusAddr>& vaddr, timeout_t msecTimeout ) = 0;
/*! Обработка запроса на чтение данных (0x01).
\param query - запрос
\param reply - ответ. Заполняется в обработчике.
......@@ -244,7 +286,6 @@ class ModbusServer
virtual ModbusRTU::mbErrCode sendData( unsigned char* buf, int len ) = 0;
/*! set timeout for receive data */
virtual void setChannelTimeout( timeout_t msec ) = 0;
......@@ -281,6 +322,13 @@ class ModbusServer
std::shared_ptr<DebugStream> dlog;
// статистика сервера
size_t askCount;
ExchangeErrorMap errmap; /*!< статистика ошибок обмена */
PreReceiveSignal m_pre_signal;
PostReceiveSignal m_post_signal;
private:
};
......
......@@ -31,16 +31,13 @@ class ModbusTCPServer:
ModbusTCPServer( ost::InetAddress& ia, int port = 502 );
virtual ~ModbusTCPServer();
// функция receive пока не поддерживается...
virtual ModbusRTU::mbErrCode receive( const std::unordered_set<ModbusRTU::ModbusAddr>& vaddr, timeout_t msecTimeout ) override;
/*! Запуск сервера
* \param thread - создавать ли отдельный поток
*/
void run( const std::unordered_set<ModbusRTU::ModbusAddr>& vmbaddr, bool thread = false );
void setMaxSessions( unsigned int num );
inline unsigned int getMaxSessions()
void setMaxSessions( size_t num );
inline size_t getMaxSessions()
{
return maxSessions;
}
......@@ -53,7 +50,7 @@ class ModbusTCPServer:
}
/*! текущее количество подключений */
unsigned getCountSessions();
size_t getCountSessions();
inline void setIgnoreAddrMode( bool st )
{
......@@ -108,6 +105,9 @@ class ModbusTCPServer:
protected:
// функция receive пока не поддерживается...
virtual ModbusRTU::mbErrCode realReceive( const std::unordered_set<ModbusRTU::ModbusAddr>& vaddr, timeout_t msecTimeout ) override;
virtual void mainLoop();
virtual void ioAccept(ev::io& watcher, int revents);
void onTimer( ev::timer& t, int revents );
......@@ -139,8 +139,8 @@ class ModbusTCPServer:
bool ignoreAddr = { false };
unsigned int maxSessions = { 100 };
unsigned int sessCount = { 0 };
size_t maxSessions = { 100 };
size_t sessCount = { 0 };
timeout_t sessTimeout = { 10000 }; // msec
......@@ -157,6 +157,9 @@ class ModbusTCPServer:
double tmTime = { 0.0 };
private:
// транслирование сигналов от Sessions..
void postReceiveEvent( ModbusRTU::mbErrCode res );
ModbusRTU::mbErrCode preReceiveEvent( const std::unordered_set<ModbusRTU::ModbusAddr> vaddr, timeout_t tout );
std::atomic_bool cancelled;
};
......
......@@ -37,20 +37,13 @@ class ModbusTCPSession:
void cleanInputStream();
virtual void cleanupChannel() override
{
cleanInputStream();
}
virtual void cleanupChannel() override;
virtual void terminate() override;
virtual ModbusRTU::mbErrCode receive( const std::unordered_set<ModbusRTU::ModbusAddr>& vmbaddr, timeout_t msecTimeout ) override;
typedef sigc::slot<void, ModbusTCPSession*> FinalSlot;
void connectFinalSession( FinalSlot sl );
unsigned int getAskCount();
inline std::string getClientAddress()
{
return caddr;
......@@ -58,11 +51,15 @@ class ModbusTCPSession:
void setSessionTimeout( double t );
// запуск обработки входящих запросов
void run();
virtual bool isAcive() override;
protected:
virtual ModbusRTU::mbErrCode realReceive( const std::unordered_set<ModbusRTU::ModbusAddr>& vmbaddr, timeout_t msecTimeout ) override;
// -------------------------------------------
// author:
// Buffer class - allow for output buffering such that it can be written out into async pieces
......@@ -173,10 +170,6 @@ class ModbusTCPSession:
FinalSlot slFin;
std::atomic_bool cancelled = { false };
// статистика
UniSetTypes::uniset_rwmutex mAsk;
unsigned int askCount = { 0 };
};
// -------------------------------------------------------------------------
#endif // ModbusTCPSession_H_
......
......@@ -79,52 +79,7 @@ ModbusRTUSlave::~ModbusRTUSlave()
delete port;
}
// -------------------------------------------------------------------------
mbErrCode ModbusRTUSlave::receive(const std::unordered_set<ModbusAddr>& vmbaddr, timeout_t timeout )
{
uniset_mutex_lock lck(recvMutex, timeout);
if( !lck.lock_ok() )
{
if( dlog->is_crit() )
dlog->crit() << "(ModbusRTUSlave::receive): Don`t lock " << recvMutex << endl;
return erTimeOut;
}
ModbusMessage buf;
mbErrCode res = erBadReplyNodeAddress;
do
{
res = recv(vmbaddr, buf, timeout);
if( res != erNoError && res != erBadReplyNodeAddress )
{
// Если ошибка подразумевает посылку ответа с сообщением об ошибке
// то посылаем
if( res < erInternalErrorCode )
{
ErrorRetMessage em( buf.addr, buf.func, res );
buf = em.transport_msg();
send(buf);
printProcessingTime();
}
if( aftersend_msec > 0 )
msleep(aftersend_msec);
// usleep(10000);
return res;
}
// если полученный пакет адресован
// не данному узлу (и не широковещательный)
// то ждать следующий...
}
while( res == erBadReplyNodeAddress );
return processing(buf);
}
// --------------------------------------------------------------------------------
ComPort::Speed ModbusRTUSlave::getSpeed()
{
......@@ -191,3 +146,51 @@ bool ModbusRTUSlave::isAcive()
return false;
}
// -------------------------------------------------------------------------
mbErrCode ModbusRTUSlave::realReceive(const std::unordered_set<ModbusAddr>& vmbaddr, timeout_t timeout )
{
{
uniset_mutex_lock lck(recvMutex, timeout);
if( !lck.lock_ok() )
{
if( dlog->is_crit() )
dlog->crit() << "(ModbusRTUSlave::receive): Don`t lock " << recvMutex << endl;
return erTimeOut;
}
ModbusMessage buf;
mbErrCode res = erBadReplyNodeAddress;
do
{
res = recv(vmbaddr, buf, timeout);
if( res != erNoError && res != erBadReplyNodeAddress )
{
// Если ошибка подразумевает посылку ответа с сообщением об ошибке
// то посылаем
if( res < erInternalErrorCode )
{
ErrorRetMessage em( buf.addr, buf.func, res );
buf = em.transport_msg();
send(buf);
printProcessingTime();
}
if( aftersend_msec > 0 )
msleep(aftersend_msec);
return res;
}
// если полученный пакет адресован
// не данному узлу (и не широковещательный)
// то ждать следующий...
}
while( res == erBadReplyNodeAddress );
return processing(buf);
}
}
// -------------------------------------------------------------------------
......@@ -48,7 +48,6 @@ ModbusServer::ModbusServer():
dlog->addLevel(Debug::CRIT);
dlog->addLevel(Debug::INFO);
*/
}
// -------------------------------------------------------------------------
......@@ -577,7 +576,6 @@ mbErrCode ModbusServer::processing( ModbusMessage& buf )
printProcessingTime();
return erUnExpectedPacketType;
}
// -------------------------------------------------------------------------
mbErrCode ModbusServer::recv( const std::unordered_set<ModbusRTU::ModbusAddr>& vaddr, ModbusMessage& rbuf, timeout_t timeout )
{
......@@ -1521,13 +1519,56 @@ std::unordered_set<ModbusAddr> ModbusServer::addr2vaddr(ModbusAddr& mbaddr)
return std::move(v);
}
// -------------------------------------------------------------------------
mbErrCode ModbusServer::receive(const std::unordered_set<ModbusAddr>& vaddr, timeout_t msecTimeout)
{
// приходиться делать специальное исключение для кода erSessionClosed
// т.к. это означает, что клиент ничего не посылал (read=0) и отвалился (закрыл канал).
// т.е. собственно события receive и не было..
// Это актуально для TCPServe-а.. (но вообще это получается какой-то архитектурный изъян)
mbErrCode ret = erNoError;
if( !m_pre_signal.empty() )
{
ret = m_pre_signal.emit(vaddr,msecTimeout);
if( ret != erNoError && ret != erSessionClosed )
{
errmap[ret] +=1;
return ret;
}
}
ret = realReceive(vaddr,msecTimeout);
// собираем статистику..
if( ret != erTimeOut && ret != erSessionClosed )
askCount++;
if( ret != erNoError && ret != erSessionClosed )
errmap[ret] +=1;
if( ret != erSessionClosed )
m_post_signal.emit(ret);
return ret;
}
// -------------------------------------------------------------------------
mbErrCode ModbusServer::receive_one( ModbusAddr a, timeout_t msec )
{
auto v = addr2vaddr(a);
return receive(v, msec);
}
// -------------------------------------------------------------------------
ModbusServer::PreReceiveSignal ModbusServer::signal_pre_receive()
{
return m_pre_signal;
}
// -------------------------------------------------------------------------
ModbusServer::PostReceiveSignal ModbusServer::signal_post_receive()
{
return m_post_signal;
}
// -------------------------------------------------------------------------
void ModbusServer::initLog( UniSetTypes::Configuration* conf,
const std::string& lname, const string& logfile )
{
......@@ -1618,6 +1659,37 @@ ModbusRTU::mbErrCode ModbusServer::replyFileTransfer( const std::string& fname,
return ModbusRTU::erNoError;
}
// -------------------------------------------------------------------------
ModbusServer::ExchangeErrorMap ModbusServer::getErrorMap()
{
ExchangeErrorMap m(errmap);
return std::move(m);
}
// -------------------------------------------------------------------------
size_t ModbusServer::getErrCount( mbErrCode e )
{
auto i = errmap.find(e);
if( i == errmap.end() )
return 0;
return i->second;
}
// -------------------------------------------------------------------------
size_t ModbusServer::resetErrCount( mbErrCode e, size_t set )
{
auto i = errmap.find(e);
if( i == errmap.end() )
return 0;
size_t ret = i->second;
i->second = set;
return ret;
}
// -------------------------------------------------------------------------
void ModbusServer::resetAskCounter()
{
askCount = 0;
}
// -------------------------------------------------------------------------
ModbusRTU::mbErrCode ModbusServer::replySetDateTime( ModbusRTU::SetDateTimeMessage& query,
ModbusRTU::SetDateTimeRetMessage& reply,
std::shared_ptr<DebugStream> dlog )
......
......@@ -67,7 +67,7 @@ void ModbusTCPServer::setMaxSessions( unsigned int num )
maxSessions = num;
}
// -------------------------------------------------------------------------
unsigned ModbusTCPServer::getCountSessions()
size_t ModbusTCPServer::getCountSessions()
{
return sessCount;
}
......@@ -280,6 +280,8 @@ void ModbusTCPServer::ioAccept(ev::io& watcher, int revents)
s->setSleepPause(sleepPause_usec);
s->setCleanBeforeSend(cleanBeforeSend);
s->setSessionTimeout( (double)sessTimeout / 1000. );
s->signal_post_receive().connect( sigc::mem_fun(this, &ModbusTCPServer::postReceiveEvent) );
s->signal_pre_receive().connect( sigc::mem_fun(this, &ModbusTCPServer::preReceiveEvent) );
s->setLog(dlog);
s->connectFinalSession( sigc::mem_fun(this, &ModbusTCPServer::sessionFinished) );
......@@ -323,8 +325,21 @@ void ModbusTCPServer::onTimer( ev::timer& t, int revents )
}
// -------------------------------------------------------------------------
mbErrCode ModbusTCPServer::receive( const std::unordered_set<ModbusAddr>& vaddr, timeout_t msecTimeout )
mbErrCode ModbusTCPServer::realReceive( const std::unordered_set<ModbusAddr>& vaddr, timeout_t msecTimeout )
{
return ModbusRTU::erOperationFailed;
}
// -------------------------------------------------------------------------
void ModbusTCPServer::postReceiveEvent( mbErrCode res )
{
m_post_signal.emit(res);
}
// -------------------------------------------------------------------------
mbErrCode ModbusTCPServer::preReceiveEvent(const std::unordered_set<ModbusAddr> vaddr, timeout_t tout)
{
if( m_pre_signal.empty() )
return ModbusRTU::erNoError;
return m_pre_signal.emit(vaddr,tout);
}
// -------------------------------------------------------------------------
......@@ -46,8 +46,7 @@ ModbusTCPSession::ModbusTCPSession( int sfd, const std::unordered_set<ModbusAddr
timeout(timeout),
peername(""),
caddr(""),
cancelled(false),
askCount(0)
cancelled(false)
{
try
{
......@@ -82,12 +81,6 @@ ModbusTCPSession::ModbusTCPSession( int sfd, const std::unordered_set<ModbusAddr
ioTimeout.set<ModbusTCPSession, &ModbusTCPSession::onTimeout>(this);
}
// -------------------------------------------------------------------------
unsigned int ModbusTCPSession::getAskCount()
{
uniset_rwmutex_rlock l(mAsk);
return askCount;
}
void ModbusTCPSession::setSessionTimeout( double t )
{
sessTimeout = t;
......@@ -155,19 +148,8 @@ void ModbusTCPSession::onTimeout( ev::timer& watcher, int revents )
// -------------------------------------------------------------------------
void ModbusTCPSession::readEvent( ev::io& watcher )
{
ModbusRTU::mbErrCode res = receive(vaddr, timeout);
if( res == erSessionClosed )
{
if( receive(vaddr, timeout) == erSessionClosed )
cancelled = true;
return;
}
if( res != erTimeOut )
{
uniset_rwmutex_wrlock l(mAsk);
askCount++;
}
}
// -------------------------------------------------------------------------
void ModbusTCPSession::writeEvent( ev::io& watcher )
......@@ -196,7 +178,7 @@ void ModbusTCPSession::writeEvent( ev::io& watcher )
}
}
// -------------------------------------------------------------------------
ModbusRTU::mbErrCode ModbusTCPSession::receive( const std::unordered_set<ModbusAddr>& vmbaddr, timeout_t msec )
ModbusRTU::mbErrCode ModbusTCPSession::realReceive( const std::unordered_set<ModbusAddr>& vmbaddr, timeout_t msec )
{
ModbusRTU::mbErrCode res = erTimeOut;
ptTimeout.setTiming(msec);
......@@ -396,6 +378,11 @@ void ModbusTCPSession::cleanInputStream()
while( ret > 0);
}
// -------------------------------------------------------------------------
void ModbusTCPSession::cleanupChannel()
{
cleanInputStream();
}
// -------------------------------------------------------------------------
void ModbusTCPSession::terminate()
{
if( dlog->is_info() )
......
......@@ -22,4 +22,5 @@ src/Log
Utilities/codegen
Utilities/codegen/tests
Utilities/MBTester
Utilities/NullController
\ No newline at end of file
Utilities/NullController
/usr/include/sigc++-2.0/
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment