Commit 4aad5897 authored by Pavel Vainerman's avatar Pavel Vainerman

(ModbusSlave): работа над многопоточным ModbusTCPSlave

parent 72bc51ee
......@@ -79,6 +79,14 @@
<MBSlave2 addr="0x3">
</MBSlave2>
<MBMultiSlave1 addr="0x31" aftersend-pause="0" name="MBMultiSlave1" poll_time="200" reply_timeout="60">
<clients>
<item ip="localhost"/>
</clients>
</MBMultiSlave1>
<MBSlave1 addr="0x31" aftersend-pause="0" dev="/dev/ttyS0" levels="info,warn,crit" name="MBSlave1" poll_time="200" reply_timeout="60" speed="9600">
<filelist>
<!-- Список файлов разрешённых для передачи по modbus
......@@ -318,6 +326,7 @@
<item id="6060" name="TestProc49"/>
<item id="6061" name="TestProc50"/>
<item id="6062" name="MBSlave2"/>
<item id="6063" name="MBMultiSlave1"/>
</objects>
</ObjectsMap>
<messages idfromfile="1" name="messages">
......
......@@ -11,8 +11,7 @@ using namespace UniSetTypes;
using namespace UniSetExtensions;
using namespace ModbusRTU;
// -----------------------------------------------------------------------------
MBSlave::MBSlave( UniSetTypes::ObjectId objId, UniSetTypes::ObjectId shmId,
SharedMemory* ic, string prefix ):
MBSlave::MBSlave( UniSetTypes::ObjectId objId, UniSetTypes::ObjectId shmId, SharedMemory* ic, const string& prefix ):
UniSetObject_LT(objId),
mbslot(0),
shm(0),
......@@ -888,7 +887,7 @@ void MBSlave::help_print( int argc, const char* const* argv )
}
// -----------------------------------------------------------------------------
MBSlave* MBSlave::init_mbslave( int argc, const char* const* argv, UniSetTypes::ObjectId icID, SharedMemory* ic,
string prefix )
const string& prefix )
{
string name = conf->getArgParam("--" + prefix + "-name","MBSlave1");
if( name.empty() )
......
......@@ -23,13 +23,13 @@ class MBSlave:
public UniSetObject_LT
{
public:
MBSlave( UniSetTypes::ObjectId objId, UniSetTypes::ObjectId shmID, SharedMemory* ic=0, std::string prefix="mbs" );
MBSlave( UniSetTypes::ObjectId objId, UniSetTypes::ObjectId shmID, SharedMemory* ic=0, const std::string& prefix="mbs" );
virtual ~MBSlave();
/*! глобальная функция для инициализации объекта */
static MBSlave* init_mbslave( int argc, const char* const* argv,
UniSetTypes::ObjectId shmID, SharedMemory* ic=0,
std::string prefix="mbs" );
UniSetTypes::ObjectId shmID, SharedMemory* ic=0,
const std::string& prefix="mbs" );
/*! глобальная функция для вывода help-а */
static void help_print( int argc, const char* const* argv );
......@@ -140,15 +140,15 @@ class MBSlave:
virtual void sensorInfo( const UniSetTypes::SensorMessage* sm ) override;
void askSensors( UniversalIO::UIOCommand cmd );
void waitSMReady();
void execute_rtu();
void execute_tcp();
virtual void execute_rtu();
virtual void execute_tcp();
virtual bool activateObject() override;
// действия при завершении работы
virtual void sigterm( int signo ) override;
void initIterators();
virtual void initIterators();
bool initItem( UniXML_iterator& it );
bool readItem( const UniXML& xml, UniXML_iterator& it, xmlNode* sec );
......@@ -162,7 +162,7 @@ class MBSlave:
ModbusRTU::mbErrCode real_read_it( IOMap::iterator& it, ModbusRTU::ModbusData& val );
ModbusRTU::mbErrCode real_write_it( IOMap::iterator& it, ModbusRTU::ModbusData& val );
private:
MBSlave();
bool initPause;
UniSetTypes::uniset_rwmutex mutex_start;
......
#include <cmath>
#include <sstream>
#include "Exceptions.h"
#include "Extensions.h"
#include "MBTCPMultiSlave.h"
#include "modbus/ModbusRTUSlaveSlot.h"
#include "modbus/ModbusTCPServerSlot.h"
// -----------------------------------------------------------------------------
using namespace std;
using namespace UniSetTypes;
using namespace UniSetExtensions;
using namespace ModbusRTU;
// -----------------------------------------------------------------------------
MBTCPMultiSlave::MBTCPMultiSlave( UniSetTypes::ObjectId objId, UniSetTypes::ObjectId shmId, SharedMemory* ic, const string& prefix ):
MBSlave(objId,shmId,ic,prefix)
{
cnode = conf->getNode(myname);
if( cnode == NULL )
throw UniSetTypes::SystemError("(MBSlave): Not found conf-node for " + myname );
UniXML::iterator it(cnode);
waitTimeout = conf->getArgInt("--" + prefix + "-wait-timeout",it.getProp("waitTimeout"));
if( waitTimeout == 0 )
waitTimeout = 4000;
UniXML::iterator cit(it);
if( cit.find("clients") && cit.goChildren() )
{
for( ;cit; cit++ )
{
ClientInfo c;
c.iaddr = cit.getProp("ip");
if( c.iaddr.empty() )
{
ostringstream err;
err << myname << "(init): Unknown ip=''";
dcrit << err.str() << endl;
throw SystemError(err.str());
}
// resolve (если получиться)
ost::InetAddress ia(c.iaddr.c_str());
c.iaddr = string( ia.getHostname() );
if( !cit.getProp("respond").empty() )
{
c.respond_s = conf->getSensorID(cit.getProp("respond"));
if( c.respond_s == DefaultObjectId )
{
ostringstream err;
err << myname << "(init): Not found sensor ID for " << cit.getProp("respond");
dcrit << err.str() << endl;
throw SystemError(err.str());
}
}
if( !cit.getProp("askcount").empty() )
{
c.askcount_s = conf->getSensorID(cit.getProp("askcount"));
if( c.askcount_s == DefaultObjectId )
{
ostringstream err;
err << myname << "(init): Not found sensor ID for " << cit.getProp("askcount");
dcrit << err.str() << endl;
throw SystemError(err.str());
}
}
c.invert = cit.getIntProp("invert");
if( !cit.getProp("timeout").empty() )
{
c.tout = cit.getIntProp("timeout");
c.ptTimeout.setTiming(c.tout);
}
cmap[c.iaddr] = c;
dinfo << myname << "(init): add client: " << c.iaddr << " respond=" << c.respond_s << " askcount=" << c.askcount_s << endl;
}
}
}
// -----------------------------------------------------------------------------
MBTCPMultiSlave::~MBTCPMultiSlave()
{
}
// -----------------------------------------------------------------------------
void MBTCPMultiSlave::help_print( int argc, const char* const* argv )
{
MBSlave::help_print(argc,argv);
}
// -----------------------------------------------------------------------------
MBTCPMultiSlave* MBTCPMultiSlave::init_mbslave( int argc, const char* const* argv, UniSetTypes::ObjectId icID, SharedMemory* ic,
const string& prefix )
{
string name = conf->getArgParam("--" + prefix + "-name","MBSlave1");
if( name.empty() )
{
cerr << "(mbslave): Не задан name'" << endl;
return 0;
}
ObjectId ID = conf->getObjectID(name);
if( ID == UniSetTypes::DefaultObjectId )
{
cerr << "(mbslave): идентификатор '" << name
<< "' не найден в конф. файле!"
<< " в секции " << conf->getObjectsSection() << endl;
return 0;
}
dinfo << "(mbslave): name = " << name << "(" << ID << ")" << endl;
return new MBTCPMultiSlave(ID,icID,ic,prefix);
}
// -----------------------------------------------------------------------------
void MBTCPMultiSlave::execute_tcp()
{
ModbusTCPServerSlot* sslot = dynamic_cast<ModbusTCPServerSlot*>(mbslot);
if( !sslot )
{
dcrit << myname << "(execute_tcp): DYNAMIC CAST ERROR (mbslot --> ModbusTCPServerSlot)" << std::endl;
raise(SIGTERM);
return;
}
if( dlog.debugging(Debug::LEVEL9) )
sslot->setLog(dlog);
for( auto &i: cmap )
i.second.ptTimeout.reset();
while(1)
{
try
{
sslot->waitQuery( addr, waitTimeout );
// Обновляем информацию по соединениям
sess.clear();
sslot->getSessions(sess);
for( auto& s: sess )
{
cerr << " find " << s.iaddr << endl;
auto i = cmap.find( s.iaddr );
if( i!=cmap.end() )
{
// если ещё в списке, значит отвечает (т.е. сбрасываем таймер)
if( i->second.tout == 0 )
i->second.ptTimeout.setTiming( waitTimeout );
else
i->second.ptTimeout.reset();
i->second.askCount = s.askCount;
}
}
// а теперь проходим по списку и выставляем датчики..
for( auto &it: cmap )
{
ClientInfo& c(it.second);
if( dlog.is_level4() )
dlog4 << myname << "(work): " << c.iaddr << " resp=" << (c.invert ? c.ptTimeout.checkTime() : !c.ptTimeout.checkTime())
<< " askcount=" << c.askCount
<< endl;
if( c.respond_s != DefaultObjectId )
{
try
{
bool st = c.invert ? c.ptTimeout.checkTime() : !c.ptTimeout.checkTime();
shm->localSetValue(c.respond_it,c.respond_s,st,getId());
}
catch(Exception& ex)
{
dcrit << myname << "(execute_tcp): " << ex << std::endl;
}
}
if( c.askcount_s != DefaultObjectId )
{
try
{
shm->localSetValue(c.askcount_it,c.askcount_s, c.askCount,getId());
}
catch(Exception& ex)
{
dcrit << myname << "(execute_tcp): " << ex << std::endl;
}
}
}
#if 0
if( res!=ModbusRTU::erTimeOut )
ptTimeout.reset();
// собираем статистику обмена
if( prev!=ModbusRTU::erTimeOut )
{
// с проверкой на переполнение
askCount = askCount>=numeric_limits<long>::max() ? 0 : askCount+1;
if( res!=ModbusRTU::erNoError )
errmap[res]++;
}
prev = res;
if( res!=ModbusRTU::erNoError && res!=ModbusRTU::erTimeOut )
dlog[Debug::WARN] << myname << "(execute_tcp): " << ModbusRTU::mbErr2Str(res) << endl;
#endif
if( !activated )
continue;
if( sidHeartBeat!=DefaultObjectId && ptHeartBeat.checkTime() )
{
try
{
shm->localSetValue(itHeartBeat,sidHeartBeat,maxHeartBeat,getId());
ptHeartBeat.reset();
}
catch(Exception& ex)
{
dcrit << myname << "(execute_tcp): (hb) " << ex << std::endl;
}
}
if( respond_id != DefaultObjectId )
{
bool state = ptTimeout.checkTime() ? false : true;
if( respond_invert )
state ^= true;
try
{
shm->localSetValue(itRespond,respond_id,(state ? 1 : 0),getId());
}
catch(Exception& ex)
{
dcrit << myname << "(execute_rtu): (respond) " << ex << std::endl;
}
}
if( askcount_id!=DefaultObjectId )
{
try
{
shm->localSetValue(itAskCount,askcount_id,askCount,getId());
}
catch(Exception& ex)
{
dcrit << myname << "(execute_rtu): (askCount) " << ex << std::endl;
}
}
}
catch(...){}
}
}
// -----------------------------------------------------------------------------
void MBTCPMultiSlave::initIterators()
{
MBSlave::initIterators();
for( auto &i: cmap )
i.second.initIterators(shm);
}
// -----------------------------------------------------------------------------
// -----------------------------------------------------------------------------
#ifndef _MBTCPMultiSlave_H_
#define _MBTCPMultiSlave_H_
// -----------------------------------------------------------------------------
#include <map>
#include "MBSlave.h"
#include "modbus/ModbusTCPServer.h"
// -----------------------------------------------------------------------------
/*!
<MBTCPMultiSlave ....>
<clients>
<item ip="" respond="" invert="1" askcount=""/>
<item ip="" respond="" invert="1" askcount=""/>
<item ip="" respond="" invert="1" askcount=""/>
</clients>
</MBTCPMultiSlave>
*/
// -----------------------------------------------------------------------------
/*! Реализация многоптоточного slave-интерфейса */
class MBTCPMultiSlave:
public MBSlave
{
public:
MBTCPMultiSlave( UniSetTypes::ObjectId objId, UniSetTypes::ObjectId shmID, SharedMemory* ic=0, const std::string& prefix="mbs" );
virtual ~MBTCPMultiSlave();
/*! глобальная функция для инициализации объекта */
static MBTCPMultiSlave* init_mbslave( int argc, const char* const* argv,
UniSetTypes::ObjectId shmID, SharedMemory* ic=0,
const std::string& prefix="mbs" );
/*! глобальная функция для вывода help-а */
static void help_print( int argc, const char* const* argv );
protected:
virtual void execute_tcp() override;
virtual void initIterators() override;
timeout_t waitTimeout;
ModbusTCPServer::Sessions sess; /*!< список открытых сессий */
struct ClientInfo
{
ClientInfo():iaddr(""),respond_s(UniSetTypes::DefaultObjectId),invert(false),
askCount(0),askcount_s(UniSetTypes::DefaultObjectId){ ptTimeout.setTiming(0); }
std::string iaddr;
UniSetTypes::ObjectId respond_s;
IOController::IOStateList::iterator respond_it;
bool invert;
PassiveTimer ptTimeout;
timeout_t tout;
long askCount;
UniSetTypes::ObjectId askcount_s;
IOController::IOStateList::iterator askcount_it;
inline void initIterators( SMInterface* shm )
{
shm->initIterator( respond_it );
shm->initIterator( askcount_it );
}
};
typedef std::map<const std::string,ClientInfo> ClientsMap;
ClientsMap cmap;
};
// -----------------------------------------------------------------------------
#endif // _MBTCPMultiSlave_H_
// -----------------------------------------------------------------------------
bin_PROGRAMS = @PACKAGE@-mbslave
bin_PROGRAMS = @PACKAGE@-mbslave @PACKAGE@-mbtcp-multislave
# не забывайте править версию в2.pc-файле
UMBS_VER=@LIBVER@
......@@ -10,7 +10,7 @@ libUniSet2MBSlave_la_LIBADD = $(top_builddir)/lib/libUniSet2.la \
$(top_builddir)/extensions/lib/libUniSet2Extensions.la \
$(SIGC_LIBS)
libUniSet2MBSlave_la_CXXFLAGS = -I$(top_builddir)/extensions/include -I$(top_builddir)/extensions/SharedMemory $(SIGC_CFLAGS)
libUniSet2MBSlave_la_SOURCES = MBSlave.cc
libUniSet2MBSlave_la_SOURCES = MBSlave.cc MBTCPMultiSlave.cc
@PACKAGE@_mbslave_SOURCES = mbslave.cc
@PACKAGE@_mbslave_LDADD = libUniSet2MBSlave.la $(top_builddir)/lib/libUniSet2.la \
......@@ -19,6 +19,13 @@ libUniSet2MBSlave_la_SOURCES = MBSlave.cc
$(SIGC_LIBS)
@PACKAGE@_mbslave_CXXFLAGS = -I$(top_builddir)/extensions/include -I$(top_builddir)/extensions/SharedMemory $(SIGC_CFLAGS)
@PACKAGE@_mbtcp_multislave_SOURCES = mbtcp-multislave.cc
@PACKAGE@_mbtcp_multislave_LDADD = libUniSet2MBSlave.la $(top_builddir)/lib/libUniSet2.la \
$(top_builddir)/extensions/SharedMemory/libUniSet2SharedMemory.la \
$(top_builddir)/extensions/lib/libUniSet2Extensions.la \
$(SIGC_LIBS)
@PACKAGE@_mbtcp_multislave_CXXFLAGS = -I$(top_builddir)/extensions/include -I$(top_builddir)/extensions/SharedMemory $(SIGC_CFLAGS)
# install
devel_include_HEADERS = *.h
devel_includedir = $(pkgincludedir)/extensions
......
// --------------------------------------------------------------------------
#include <sys/wait.h>
#include <sstream>
#include <string>
#include <cc++/socket.h>
#include "MBTCPMultiSlave.h"
#include "Configuration.h"
#include "Debug.h"
#include "UniSetActivator.h"
#include "Extensions.h"
// --------------------------------------------------------------------------
using namespace UniSetTypes;
using namespace UniSetExtensions;
using namespace std;
// --------------------------------------------------------------------------
int main(int argc, const char **argv)
{
if( argc>1 && (!strcmp(argv[1],"--help") || !strcmp(argv[1],"-h")) )
{
cout << "--smemory-id objectName - SharedMemory objectID. Default: autodetect" << endl;
cout << "--confile filename - configuration file. Default: configure.xml" << endl;
cout << "--mbs-logfile filename - logfilename. Default: mbslave.log" << endl;
cout << endl;
MBSlave::help_print(argc,argv);
return 0;
}
try
{
string confile = UniSetTypes::getArgParam( "--confile", argc, argv, "configure.xml" );
conf = new Configuration(argc, argv,confile);
string logfilename(conf->getArgParam("--mbs-logfile"));
if( logfilename.empty() )
logfilename = "mbslave.log";
std::ostringstream logname;
string dir(conf->getLogDir());
logname << dir << logfilename;
ulog.logFile( logname.str() );
dlog.logFile( logname.str() );
conf->initDebug(dlog,"dlog");
ObjectId shmID = DefaultObjectId;
string sID = conf->getArgParam("--smemory-id");
if( !sID.empty() )
shmID = conf->getControllerID(sID);
else
shmID = getSharedMemoryID();
if( shmID == DefaultObjectId )
{
cerr << sID << "? SharedMemoryID not found in " << conf->getControllersSection() << " section" << endl;
return 1;
}
MBTCPMultiSlave* s = MBTCPMultiSlave::init_mbslave(argc,argv,shmID);
if( !s )
{
dcrit << "(mbslave): init не прошёл..." << endl;
return 1;
}
UniSetActivator* act = UniSetActivator::Instance();
act->addObject(static_cast<class UniSetObject*>(s));
SystemMessage sm(SystemMessage::StartUp);
act->broadcast( sm.transport_msg() );
ulog << "\n\n\n";
ulog << "(main): -------------- MBTCPMultiSlave START -------------------------\n\n";
dlog << "\n\n\n";
dlog << "(main): -------------- MBTCPMultiSlave START -------------------------\n\n";
act->run(false);
while( waitpid(-1, 0, 0) > 0 );
return 0;
}
catch( SystemError& err )
{
dcrit << "(mbslave): " << err << endl;
}
catch( Exception& ex )
{
dcrit << "(mbslave): " << ex << endl;
}
catch( std::exception& e )
{
dcrit << "(mbslave): " << e.what() << endl;
}
catch(...)
{
dcrit << "(mbslave): catch(...)" << endl;
}
while( waitpid(-1, 0, 0) > 0 );
return 1;
}
// --------------------------------------------------------------------------
#!/bin/sh
uniset2-start.sh -f ./uniset2-mbtcp-multislave --confile test.xml --dlog-add-levels level3,level4 \
--smemory-id SharedMemory \
--mbs-name MBMultiSlave1 --mbs-type TCP --mbs-inet-addr 127.0.0.1 --mbs-inet-port 2048 --mbs-reg-from-id 1 --mbs-my-addr 0x01 \
# --mbs-force 1
#--mbs-reg-from-id 1 \
#--mbs-filter-field CAN2sender --mbs-filter-value SYSTSNode \
\ No newline at end of file
../../conf/test.xml
\ No newline at end of file
......@@ -7,27 +7,28 @@
// -------------------------------------------------------------------------
namespace ModbusRTU
{
/*! Ошибки обмена
/*! Ошибки обмена
все ошибки > InternalErrorCode в сеть не посылаются...
*/
enum mbErrCode
{
erNoError = 0, /*!< нет ошибок */
erUnExpectedPacketType = 1, /*!< Неожидаемый тип пакета (ошибка кода функции) */
erBadDataAddress = 2, /*!< адрес запрещен к опросу или не существует */
erBadDataValue = 3, /*!< недопустимое значение */
erHardwareError = 4, /*!< ошибка оборудования */
erAnknowledge = 5, /*!< запрос принят в исполнению, но ещё не выполнен */
erSlaveBusy = 6, /*!< контроллер занят длительной операцией (повторить запрос позже) */
erOperationFailed = 7, /*!< запрашиваемая функция запрещена конфигурацией контроллера */
erMemoryParityError = 8, /*!< ошибка паритета при чтении памяти */
erNoError = 0, /*!< нет ошибок */
erUnExpectedPacketType = 1, /*!< Неожидаемый тип пакета (ошибка кода функции) */
erBadDataAddress = 2, /*!< адрес запрещен к опросу или не существует */
erBadDataValue = 3, /*!< недопустимое значение */
erHardwareError = 4, /*!< ошибка оборудования */
erAnknowledge = 5, /*!< запрос принят в исполнению, но ещё не выполнен */
erSlaveBusy = 6, /*!< контроллер занят длительной операцией (повторить запрос позже) */
erOperationFailed = 7, /*!< запрашиваемая функция запрещена конфигурацией контроллера */
erMemoryParityError = 8, /*!< ошибка паритета при чтении памяти */
erInternalErrorCode = 10, /*!< коды ошибок используемые для внутренней работы */
erInvalidFormat = 11, /*!< неправильный формат */
erBadCheckSum = 12, /*!< У пакета не сошлась контрольная сумма */
erBadReplyNodeAddress = 13, /*!< Ответ на запрос адресован не мне или от станции,которую не спрашивали */
erTimeOut = 14, /*!< Тайм-аут при приеме ответа */
erPacketTooLong = 15 /*!< пакет длинее буфера приема */
erInternalErrorCode = 10, /*!< коды ошибок используемые для внутренней работы */
erInvalidFormat = 11, /*!< неправильный формат */
erBadCheckSum = 12, /*!< У пакета не сошлась контрольная сумма */
erBadReplyNodeAddress = 13, /*!< Ответ на запрос адресован не мне или от станции,которую не спрашивали */
erTimeOut = 14, /*!< Тайм-аут при приеме ответа */
erPacketTooLong = 15, /*!< пакет длинее буфера приема */
erSessionClosed = 16 /*!< соединение закрыто */
};
// ---------------------------------------------------------------------
......@@ -42,7 +43,7 @@ namespace ModbusRTU
mbException( ModbusRTU::mbErrCode err ):
UniSetTypes::Exception(mbErr2Str(err)),err(err){}
ModbusRTU::mbErrCode err;
friend std::ostream& operator<<(std::ostream& os, mbException& ex )
......
......@@ -11,6 +11,7 @@
#include "PassiveTimer.h"
#include "ModbusTypes.h"
#include "ModbusServer.h"
#include "ModbusTCPSession.h"
// -------------------------------------------------------------------------
/*! ModbusTCP server */
......@@ -22,7 +23,20 @@ class ModbusTCPServer:
ModbusTCPServer( ost::InetAddress &ia, int port=502 );
virtual ~ModbusTCPServer();
virtual ModbusRTU::mbErrCode receive( ModbusRTU::ModbusAddr addr, timeout_t msecTimeout ) override;
/*! Однопоточная обработка (каждый запрос последовательно), с разрывом соединения в конце */
virtual ModbusRTU::mbErrCode receive( ModbusRTU::ModbusAddr mbaddr, timeout_t msecTimeout ) override;
/*! Многопоточная обработка (создаётся по потоку для каждого "клиента")
\ return TRUE - если запр*ос пришёл
\return FALSE - если timeout
*/
virtual bool waitQuery( ModbusRTU::ModbusAddr mbaddr, timeout_t msec = UniSetTimer::WaitUpTime );
void setMaxSessions( unsigned int num );
inline unsigned int getMaxSessions(){ return maxSessions; }
/*! текущее количество подключений */
unsigned getCountSessions();
inline void setIgnoreAddrMode( bool st ){ ignoreAddr = st; }
inline bool getIgnoreAddrMode(){ return ignoreAddr; }
......@@ -32,10 +46,23 @@ class ModbusTCPServer:
virtual void terminate() override;
// Сбор статистики по соединениям...
struct SessionInfo
{
SessionInfo( const std::string& a, unsigned int ask ):iaddr(a),askCount(ask){}
std::string iaddr;
unsigned int askCount;
};
typedef std::list<SessionInfo> Sessions;
void getSessions( Sessions& lst );
protected:
virtual ModbusRTU::mbErrCode pre_send_request( ModbusRTU::ModbusMessage& request );
// virtual ModbusRTU::mbErrCode post_send_request( ModbusRTU::ModbusMessage& request );
virtual ModbusRTU::mbErrCode pre_send_request( ModbusRTU::ModbusMessage& request ) override;
virtual ModbusRTU::mbErrCode post_send_request( ModbusRTU::ModbusMessage& request ) override;
// realisation (see ModbusServer.h)
virtual int getNextData( unsigned char* buf, int len ) override;
......@@ -43,6 +70,7 @@ class ModbusTCPServer:
virtual ModbusRTU::mbErrCode sendData( unsigned char* buf, int len ) override;
virtual ModbusRTU::mbErrCode tcp_processing( ost::TCPStream& tcp, ModbusTCP::MBAPHeader& mhead );
void sessionFinished( ModbusTCPSession* s );
ost::tpport_t port;
ost::TCPStream tcp;
......@@ -50,8 +78,15 @@ class ModbusTCPServer:
std::queue<unsigned char> qrecv;
ModbusTCP::MBAPHeader curQueryHeader;
typedef std::list<ModbusTCPSession*> SessionList;
UniSetTypes::uniset_mutex sMutex;
SessionList slist;
bool ignoreAddr;
unsigned int maxSessions;
unsigned int sessCount;
private:
};
......
// -------------------------------------------------------------------------
#ifndef ModbusTCPSession_H_
#define ModbusTCPSession_H_
// -------------------------------------------------------------------------
#include <string>
#include <queue>
#include <cc++/socket.h>
#include "ModbusServerSlot.h"
#include "ModbusServer.h"
#include "PassiveTimer.h"
// -------------------------------------------------------------------------
class ModbusTCPSession:
public ModbusServerSlot,
public ModbusServer,
public ost::TCPSession
{
public:
ModbusTCPSession( ost::TCPSocket &server, ModbusRTU::ModbusAddr mbaddr );
virtual ~ModbusTCPSession();
void cleanInputStream();
virtual void cleanupChannel(){ cleanInputStream(); }
virtual void terminate();
virtual ModbusRTU::mbErrCode receive( ModbusRTU::ModbusAddr addr, timeout_t msecTimeout );
typedef sigc::slot<void, ModbusTCPSession*> FinalSlot;
void connectFinalSession( FinalSlot sl );
unsigned int getAskCount();
inline std::string getClientAddress(){ return caddr; }
protected:
virtual void run();
virtual void final();
virtual int getNextData( unsigned char* buf, int len );
virtual void setChannelTimeout( timeout_t msec );
virtual ModbusRTU::mbErrCode sendData( unsigned char* buf, int len );
virtual ModbusRTU::mbErrCode tcp_processing( ost::TCPStream& tcp, ModbusTCP::MBAPHeader& mhead );
virtual ModbusRTU::mbErrCode pre_send_request( ModbusRTU::ModbusMessage& request );
virtual ModbusRTU::mbErrCode post_send_request( ModbusRTU::ModbusMessage& request );
virtual ModbusRTU::mbErrCode readCoilStatus( ModbusRTU::ReadCoilMessage& query,
ModbusRTU::ReadCoilRetMessage& reply );
virtual ModbusRTU::mbErrCode readInputStatus( ModbusRTU::ReadInputStatusMessage& query,
ModbusRTU::ReadInputStatusRetMessage& reply );
virtual ModbusRTU::mbErrCode readOutputRegisters( ModbusRTU::ReadOutputMessage& query,
ModbusRTU::ReadOutputRetMessage& reply );
virtual ModbusRTU::mbErrCode readInputRegisters( ModbusRTU::ReadInputMessage& query,
ModbusRTU::ReadInputRetMessage& reply );
virtual ModbusRTU::mbErrCode forceSingleCoil( ModbusRTU::ForceSingleCoilMessage& query,
ModbusRTU::ForceSingleCoilRetMessage& reply );
virtual ModbusRTU::mbErrCode writeOutputSingleRegister( ModbusRTU::WriteSingleOutputMessage& query,
ModbusRTU::WriteSingleOutputRetMessage& reply );
virtual ModbusRTU::mbErrCode forceMultipleCoils( ModbusRTU::ForceCoilsMessage& query,
ModbusRTU::ForceCoilsRetMessage& reply );
virtual ModbusRTU::mbErrCode writeOutputRegisters( ModbusRTU::WriteOutputMessage& query,
ModbusRTU::WriteOutputRetMessage& reply );
virtual ModbusRTU::mbErrCode diagnostics( ModbusRTU::DiagnosticMessage& query,
ModbusRTU::DiagnosticRetMessage& reply );
virtual ModbusRTU::mbErrCode read4314( ModbusRTU::MEIMessageRDI& query,
ModbusRTU::MEIMessageRetRDI& reply );
virtual ModbusRTU::mbErrCode journalCommand( ModbusRTU::JournalCommandMessage& query,
ModbusRTU::JournalCommandRetMessage& reply );
virtual ModbusRTU::mbErrCode setDateTime( ModbusRTU::SetDateTimeMessage& query,
ModbusRTU::SetDateTimeRetMessage& reply );
virtual ModbusRTU::mbErrCode remoteService( ModbusRTU::RemoteServiceMessage& query,
ModbusRTU::RemoteServiceRetMessage& reply );
virtual ModbusRTU::mbErrCode fileTransfer( ModbusRTU::FileTransferMessage& query,
ModbusRTU::FileTransferRetMessage& reply );
private:
std::queue<unsigned char> qrecv;
ModbusTCP::MBAPHeader curQueryHeader;
ModbusRTU::ModbusAddr addr;
PassiveTimer ptTimeout;
ModbusRTU::ModbusMessage buf;
bool ignoreAddr;
std::string peername;
std::string caddr;
FinalSlot slFin;
// статистика
UniSetTypes::uniset_rwmutex mAsk;
unsigned int askCount;
};
// -------------------------------------------------------------------------
#endif // ModbusTCPSession_H_
// -------------------------------------------------------------------------
......@@ -2,7 +2,7 @@
# This file is part of the UniSet library #
############################################################################
noinst_LTLIBRARIES = libModbus.la
libModbus_la_SOURCES = ModbusTypes.cc ModbusHelpers.cc \
libModbus_la_SOURCES = ModbusTypes.cc ModbusHelpers.cc ModbusTCPSession.cc UTCPStream.cc \
ModbusClient.cc ModbusServer.cc ModbusServerSlot.cc \
ModbusRTUSlave.cc ModbusRTUSlaveSlot.cc ModbusRTUMaster.cc \
ModbusTCPCore.cc ModbusTCPServer.cc ModbusTCPServerSlot.cc ModbusTCPMaster.cc TCPCheck.cc
......
......@@ -11,7 +11,9 @@ using namespace UniSetTypes;
ModbusTCPServer::ModbusTCPServer( ost::InetAddress &ia, int port ):
TCPSocket(ia,port),
iaddr(ia),
ignoreAddr(false)
ignoreAddr(false),
maxSessions(10),
sessCount(0)
{
setCRCNoCheckit(true);
}
......@@ -22,6 +24,90 @@ ModbusTCPServer::~ModbusTCPServer()
terminate();
}
// -------------------------------------------------------------------------
void ModbusTCPServer::setMaxSessions( unsigned int num )
{
if( num < sessCount )
{
uniset_mutex_lock l(sMutex);
int k = sessCount - num;
int d = 0;
for( SessionList::reverse_iterator i=slist.rbegin(); d<k && i!=slist.rend(); ++i,d++ )
delete *i;
sessCount = num;
}
maxSessions = num;
}
// -------------------------------------------------------------------------
unsigned ModbusTCPServer::getCountSessions()
{
return sessCount;
}
// -------------------------------------------------------------------------
bool ModbusTCPServer::waitQuery( ModbusRTU::ModbusAddr mbaddr, timeout_t msec )
{
if( msec == 0 )
msec = UniSetTimer::WaitUpTime;
cerr << "*** sessCount=" << sessCount << " maxSess=" << maxSessions << endl;
if( sessCount >= maxSessions )
return false;
try
{
cerr << "*** wait connection: " << msec << " msec" << endl;
if( isPendingConnection(msec) )
{
ModbusTCPSession* s = new ModbusTCPSession(*this,mbaddr);
s->connectReadCoil( sigc::mem_fun(this, &ModbusTCPServer::readCoilStatus) );
s->connectReadInputStatus( sigc::mem_fun(this, &ModbusTCPServer::readInputStatus) );
s->connectReadOutput( sigc::mem_fun(this, &ModbusTCPServer::readOutputRegisters) );
s->connectReadInput( sigc::mem_fun(this, &ModbusTCPServer::readInputRegisters) );
s->connectForceSingleCoil( sigc::mem_fun(this, &ModbusTCPServer::forceSingleCoil) );
s->connectForceCoils( sigc::mem_fun(this, &ModbusTCPServer::forceMultipleCoils) );
s->connectWriteOutput( sigc::mem_fun(this, &ModbusTCPServer::writeOutputRegisters) );
s->connectWriteSingleOutput( sigc::mem_fun(this, &ModbusTCPServer::writeOutputSingleRegister) );
s->connectMEIRDI( sigc::mem_fun(this, &ModbusTCPServer::read4314) );
s->connectSetDateTime( sigc::mem_fun(this, &ModbusTCPServer::setDateTime) );
s->connectDiagnostics( sigc::mem_fun(this, &ModbusTCPServer::diagnostics) );
s->connectFileTransfer( sigc::mem_fun(this, &ModbusTCPServer::fileTransfer) );
s->connectJournalCommand( sigc::mem_fun(this, &ModbusTCPServer::journalCommand) );
s->connectRemoteService( sigc::mem_fun(this, &ModbusTCPServer::remoteService) );
s->connectFileTransfer( sigc::mem_fun(this, &ModbusTCPServer::fileTransfer) );
s->setAfterSendPause(aftersend_msec);
s->setReplyTimeout(replyTimeout_ms);
s->setRecvTimeout(recvTimeOut_ms);
s->setSleepPause(sleepPause_usec);
s->setCleanBeforeSend(cleanBeforeSend);
s->setLog(dlog);
s->connectFinalSession( sigc::mem_fun(this, &ModbusTCPServer::sessionFinished) );
{
uniset_mutex_lock l(sMutex);
slist.push_back(s);
sessCount++;
}
s->detach();
return true;
}
}
catch( ost::Exception& e )
{
if( dlog.debugging(Debug::WARN) )
dlog[Debug::WARN] << "(ModbusTCPServer): " << e.what() << endl;
}
return false;
}
// -------------------------------------------------------------------------
mbErrCode ModbusTCPServer::receive( ModbusRTU::ModbusAddr addr, timeout_t timeout )
{
PassiveTimer ptTimeout(timeout);
......@@ -208,6 +294,12 @@ mbErrCode ModbusTCPServer::tcp_processing( ost::TCPStream& tcp, ModbusTCP::MBAPH
return erNoError;
}
// -------------------------------------------------------------------------
ModbusRTU::mbErrCode ModbusTCPServer::post_send_request( ModbusRTU::ModbusMessage& request )
{
tcp << endl;
return erNoError;
}
// -------------------------------------------------------------------------
mbErrCode ModbusTCPServer::pre_send_request( ModbusMessage& request )
{
if( !tcp.isConnected() )
......@@ -252,3 +344,27 @@ void ModbusTCPServer::terminate()
tcp.disconnect();
}
// -------------------------------------------------------------------------
void ModbusTCPServer::sessionFinished( ModbusTCPSession* s )
{
uniset_mutex_lock l(sMutex);
for( auto i=slist.begin(); i!=slist.end(); ++i )
{
if( (*i) == s )
{
slist.erase(i);
sessCount--;
break;
}
}
}
// -------------------------------------------------------------------------
void ModbusTCPServer::getSessions( Sessions& lst )
{
uniset_mutex_lock l(sMutex);
for( auto &i: slist )
{
SessionInfo inf( i->getClientAddress(), i->getAskCount() );
lst.push_back(inf);
}
}
// -------------------------------------------------------------------------
......@@ -12,7 +12,7 @@ using namespace std;
// -------------------------------------------------------------------------
UTCPStream::~UTCPStream()
{
}
// -------------------------------------------------------------------------
UTCPStream::UTCPStream():
......
......@@ -139,6 +139,19 @@ int main( int argc, const char **argv )
{
try
{
{
uniset_mutex m("testmutex");
{
uniset_mutex_lock l(m);
msleep(20);
}
{
uniset_mutex_lock l(m,100);
msleep(50);
}
}
#if 1
{
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment