Commit faee9b3c authored by Pavel Vainerman's avatar Pavel Vainerman

(ModbusTCPCore): исправил ошибку обработки "закрытия соединения",

убрал за ненадобностью свою версию ожидания на select()
parent 5b61107d
This source diff could not be displayed because it is too large. You can view the blob instead.
...@@ -86,4 +86,14 @@ pkgconfig_DATA = libUniSet2MBTCPMaster.pc libUniSet2RTU.pc ...@@ -86,4 +86,14 @@ pkgconfig_DATA = libUniSet2MBTCPMaster.pc libUniSet2RTU.pc
#all-local: #all-local:
# ln -sf ../ModbusMaster/$(devel_include_HEADERS) ../include # ln -sf ../ModbusMaster/$(devel_include_HEADERS) ../include
noinst_PROGRAMS = mb-perf-test
mb_perf_test_SOURCES = mb-perf-test.cc
mb_perf_test_LDADD = libUniSet2MBTCPMaster.la libMBMaster.la $(top_builddir)/lib/libUniSet2.la \
$(top_builddir)/extensions/SharedMemory/libUniSet2SharedMemory.la \
$(top_builddir)/extensions/lib/libUniSet2Extensions.la \
$(SIGC_LIBS)
mb_perf_test_CXXFLAGS = -I$(top_builddir)/extensions/include -I$(top_builddir)/extensions/SharedMemory $(SIGC_CFLAGS)
include $(top_builddir)/include.mk include $(top_builddir)/include.mk
#!/bin/sh
BEG=6100
for N in `seq 1 200`; do
ID=$((BEG+N))
echo " <item id=\"${ID}\" name=\"MBTCP${N}\"/>"
done
\ No newline at end of file
#!/bin/sh
BEG=10000
STEP=50
MBCNT=100
for M in `seq 1 $MBCNT`; do
R=1
for I in `seq 1 $STEP`; do
ID=$((BEG+I))
# делаем "дырки" между регистрами, чтобы они не шли в одном запросе
R=$((R+2))
echo " <item id=\"$ID\" mbperf=\"$M\" iotype=\"AI\" mbaddr=\"0x01\" mbfunc=\"0x03\" mbreg=\"$R\" mbtype=\"rtu\" name=\"Sensor${ID}_S\" textname=\"test sensor $ID\"/>"
done
BEG=$((BEG+STEP+1))
done
\ No newline at end of file
/*
* Copyright (c) 2015 Pavel Vainerman.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation, version 2.1.
*
* This program is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Lesser Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
// -------------------------------------------------------------------------
#include <sstream>
#include "MBTCPMultiMaster.h"
#include "Configuration.h"
#include "Debug.h"
#include "UniSetActivator.h"
#include "Extensions.h"
// -----------------------------------------------------------------------------
using namespace std;
using namespace UniSetTypes;
using namespace UniSetExtensions;
// -----------------------------------------------------------------------------
int main( int argc, const char** argv )
{
if( argc > 1 && (!strcmp(argv[1], "--help") || !strcmp(argv[1], "-h")) )
{
cout << "--smemory-id objectName - SharedMemory objectID. Default: get from configure..." << endl;
cout << "--confile filename - configuration file. Default: configure.xml" << endl;
cout << endl;
MBTCPMultiMaster::help_print(argc, argv);
return 0;
}
try
{
auto conf = uniset_init( argc, argv );
ObjectId shmID = DefaultObjectId;
string sID = conf->getArgParam("--smemory-id");
if( !sID.empty() )
shmID = conf->getControllerID(sID);
else
shmID = getSharedMemoryID();
if( shmID == DefaultObjectId )
{
cerr << sID << "? SharedMemoryID not found in " << conf->getControllersSection() << " section" << endl;
return 1;
}
auto act = UniSetActivator::Instance();
int count = conf->getArgPInt("--count",50);
for( int i=1; i<=count; i++ )
{
ostringstream prefix;
prefix << "mbtcp" << i;
auto mb = MBTCPMultiMaster::init_mbmaster(argc, argv,shmID,nullptr, prefix.str());
if( !mb )
{
cerr << "(main): " << prefix.str() << " init failed..." << endl;
return 1;
}
act->add(mb);
}
SystemMessage sm(SystemMessage::StartUp);
act->broadcast( sm.transport_msg() );
act->run(false);
return 0;
}
catch( const Exception& ex )
{
cerr << "(mbtcpmultimaster): " << ex << std::endl;
}
catch(...)
{
cerr << "(mbtcpmultimaster): catch ..." << std::endl;
}
return 1;
}
#!/bin/sh
MBPARAM=
for N in `seq 1 100`; do
MBPARAM="$MBPARAM --mbtcp${N}-name MBTCP${N} --mbtcp${N}-confnode MBPerfTestMaster --mbtcp${N}-filter-field mbperf
--mbtcp${N}-filter-value $N --mbtcp${N}-persistent-connection 1 --mbtcp${N}-log-add-levels crit,warn"
done
#echo "$MBPARAM"
#exit 0
./uniset2-start.sh -f ./mb-perf-test --dlog-add-levels crit,warn ${MBPARAM} \
--confile test.xml \
--smemory-id SharedMemory \
$*
...@@ -10,11 +10,12 @@ ...@@ -10,11 +10,12 @@
/*! ModbusTCP core functions */ /*! ModbusTCP core functions */
namespace ModbusTCPCore namespace ModbusTCPCore
{ {
// t - msec (сколько ждать) // Если соединение закрыто (другой стороной), функции выкидывают исключение UniSetTypes::CommFailed
size_t readNextData( UTCPStream* tcp, std::queue<unsigned char>& qrecv, size_t max = 100, timeout_t t = 10 ); // t - msec (сколько ждать)
size_t getNextData( UTCPStream* tcp, std::queue<unsigned char>& qrecv, unsigned char* buf, size_t len, timeout_t t = 10 ); size_t readNextData( UTCPStream* tcp, std::queue<unsigned char>& qrecv, size_t max = 100, timeout_t t = 50 );
ModbusRTU::mbErrCode sendData(UTCPStream* tcp, unsigned char* buf, size_t len, timeout_t t = 10 ); size_t getNextData( UTCPStream* tcp, std::queue<unsigned char>& qrecv, unsigned char* buf, size_t len, timeout_t t = 50 );
ModbusRTU::mbErrCode sendData(UTCPStream* tcp, unsigned char* buf, size_t len, timeout_t t = 50 );
// работа напрямую с сокетом // работа напрямую с сокетом
size_t readDataFD(int fd, std::queue<unsigned char>& qrecv, size_t max = 100, size_t attempts = 1 ); size_t readDataFD(int fd, std::queue<unsigned char>& qrecv, size_t max = 100, size_t attempts = 1 );
......
...@@ -48,6 +48,9 @@ class ModbusTCPMaster: ...@@ -48,6 +48,9 @@ class ModbusTCPMaster:
return port; return port;
} }
void setReadTimeout( timeout_t msec );
timeout_t getReadTimeout() const;
protected: protected:
virtual size_t getNextData(unsigned char* buf, size_t len ) override; virtual size_t getNextData(unsigned char* buf, size_t len ) override;
...@@ -56,9 +59,6 @@ class ModbusTCPMaster: ...@@ -56,9 +59,6 @@ class ModbusTCPMaster:
virtual ModbusRTU::mbErrCode query( ModbusRTU::ModbusAddr addr, ModbusRTU::ModbusMessage& msg, virtual ModbusRTU::mbErrCode query( ModbusRTU::ModbusAddr addr, ModbusRTU::ModbusMessage& msg,
ModbusRTU::ModbusMessage& reply, timeout_t timeout ) override; ModbusRTU::ModbusMessage& reply, timeout_t timeout ) override;
bool waitInput( int timeout_msec );
bool waitOutput( int timeout_msec );
private: private:
//ost::TCPStream* tcp; //ost::TCPStream* tcp;
std::shared_ptr<UTCPStream> tcp; std::shared_ptr<UTCPStream> tcp;
...@@ -70,9 +70,7 @@ class ModbusTCPMaster: ...@@ -70,9 +70,7 @@ class ModbusTCPMaster:
bool force_disconnect = { false }; bool force_disconnect = { false };
int keepAliveTimeout = { 1000 }; int keepAliveTimeout = { 1000 };
// структура для реализации использования select timeout_t readTimeout = { 50 }; // timeout на чтение очередной порции данных
fd_set s_set;
timeval s_timeout;
}; };
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
#endif // ModbusTCPMaster_H_ #endif // ModbusTCPMaster_H_
......
...@@ -384,6 +384,14 @@ mbErrCode ModbusClient::recv( ModbusAddr addr, ModbusByte qfunc, ...@@ -384,6 +384,14 @@ mbErrCode ModbusClient::recv( ModbusAddr addr, ModbusByte qfunc,
{ {
// cout << "(recv): catch TimeOut " << endl; // cout << "(recv): catch TimeOut " << endl;
} }
catch( const UniSetTypes::CommFailed& ex )
{
if( dlog->is_crit() )
dlog->crit() << "(recv): " << ex << endl;
cleanupChannel();
return erTimeOut;
}
catch( const Exception& ex ) // SystemError catch( const Exception& ex ) // SystemError
{ {
if( dlog->is_crit() ) if( dlog->is_crit() )
...@@ -1312,6 +1320,13 @@ mbErrCode ModbusClient::recv_pdu( ModbusByte qfunc, ModbusMessage& rbuf, timeout ...@@ -1312,6 +1320,13 @@ mbErrCode ModbusClient::recv_pdu( ModbusByte qfunc, ModbusMessage& rbuf, timeout
{ {
// cout << "(recv): catch TimeOut " << endl; // cout << "(recv): catch TimeOut " << endl;
} }
catch( const UniSetTypes::CommFailed& ex )
{
if( dlog->is_crit() )
dlog->crit() << "(recv): " << ex << endl;
return erTimeOut;
}
catch( const Exception& ex ) // SystemError catch( const Exception& ex ) // SystemError
{ {
if( dlog->is_crit() ) if( dlog->is_crit() )
......
...@@ -15,6 +15,7 @@ ...@@ -15,6 +15,7 @@
*/ */
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
#include "modbus/ModbusTCPCore.h" #include "modbus/ModbusTCPCore.h"
#include "Exceptions.h"
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
using namespace std; using namespace std;
using namespace ModbusRTU; using namespace ModbusRTU;
...@@ -29,6 +30,7 @@ size_t ModbusTCPCore::readNextData(UTCPStream* tcp, ...@@ -29,6 +30,7 @@ size_t ModbusTCPCore::readNextData(UTCPStream* tcp,
return 0; return 0;
size_t i = 0; size_t i = 0;
bool commfail = false;
#ifdef USE_BUFFER_FOR_READ #ifdef USE_BUFFER_FOR_READ
...@@ -50,9 +52,18 @@ size_t ModbusTCPCore::readNextData(UTCPStream* tcp, ...@@ -50,9 +52,18 @@ size_t ModbusTCPCore::readNextData(UTCPStream* tcp,
i = l; i = l;
} }
// канал закрыт!
if( l == 0 )
commfail = true;
} }
catch( ost::SockException& e ) catch( ost::SockException& e )
{ {
if( e.getSocketError() == ost::Socket::errConnectFailed ||
e.getSocketError() == ost::Socket::errConnectInvalid )
{
commfail = true;
}
} }
delete [] buf; delete [] buf;
...@@ -66,7 +77,13 @@ size_t ModbusTCPCore::readNextData(UTCPStream* tcp, ...@@ -66,7 +77,13 @@ size_t ModbusTCPCore::readNextData(UTCPStream* tcp,
unsigned char c; unsigned char c;
ssize_t l = tcp->readData(&c, sizeof(c), 0, t); ssize_t l = tcp->readData(&c, sizeof(c), 0, t);
if( l <= 0 ) if( l == 0 )
{
commfail = true;
break;
}
if( l < 0 )
break; break;
qrecv.push(c); qrecv.push(c);
...@@ -78,6 +95,8 @@ size_t ModbusTCPCore::readNextData(UTCPStream* tcp, ...@@ -78,6 +95,8 @@ size_t ModbusTCPCore::readNextData(UTCPStream* tcp,
#endif #endif
if( commfail )
throw UniSetTypes::CommFailed();
return i; return i;
} }
...@@ -113,9 +132,12 @@ size_t ModbusTCPCore::getNextData(UTCPStream* tcp, ...@@ -113,9 +132,12 @@ size_t ModbusTCPCore::getNextData(UTCPStream* tcp,
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
size_t ModbusTCPCore::readDataFD( int fd, std::queue<unsigned char>& qrecv, size_t max , size_t attempts ) size_t ModbusTCPCore::readDataFD( int fd, std::queue<unsigned char>& qrecv, size_t max , size_t attempts )
{ {
bool commfail = false;
#ifdef USE_BUFFER_FOR_READ #ifdef USE_BUFFER_FOR_READ
max = std::max(max,(size_t)DEFAULT_BUFFER_SIZE_FOR_READ);
char* buf = new char[max]; char* buf = new char[max];
if( buf == 0 ) if( buf == 0 )
...@@ -138,6 +160,10 @@ size_t ModbusTCPCore::readDataFD( int fd, std::queue<unsigned char>& qrecv, size ...@@ -138,6 +160,10 @@ size_t ModbusTCPCore::readDataFD( int fd, std::queue<unsigned char>& qrecv, size
if( cnt >= max ) if( cnt >= max )
break; break;
} }
// канал закрыт!
if( l == 0 )
commfail = true;
} }
delete [] buf; delete [] buf;
...@@ -153,7 +179,13 @@ size_t ModbusTCPCore::readDataFD( int fd, std::queue<unsigned char>& qrecv, size ...@@ -153,7 +179,13 @@ size_t ModbusTCPCore::readDataFD( int fd, std::queue<unsigned char>& qrecv, size
unsigned char c; unsigned char c;
ssize_t l = ::read(fd, &c, sizeof(c)); ssize_t l = ::read(fd, &c, sizeof(c));
if( l <= 0 ) if( l == 0 )
{
commfail = true;
break;
}
if( l < 0 )
break; break;
qrecv.push(c); qrecv.push(c);
...@@ -162,7 +194,10 @@ size_t ModbusTCPCore::readDataFD( int fd, std::queue<unsigned char>& qrecv, size ...@@ -162,7 +194,10 @@ size_t ModbusTCPCore::readDataFD( int fd, std::queue<unsigned char>& qrecv, size
#endif #endif
return ( qrecv.size() >= max ? max : qrecv.size() ); if( commfail )
throw UniSetTypes::CommFailed();
return std::min(qrecv.size(),max);
} }
// ------------------------------------------------------------------------ // ------------------------------------------------------------------------
size_t ModbusTCPCore::getDataFD( int fd, std::queue<unsigned char>& qrecv, size_t ModbusTCPCore::getDataFD( int fd, std::queue<unsigned char>& qrecv,
......
...@@ -51,7 +51,7 @@ ModbusTCPMaster::~ModbusTCPMaster() ...@@ -51,7 +51,7 @@ ModbusTCPMaster::~ModbusTCPMaster()
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
size_t ModbusTCPMaster::getNextData( unsigned char* buf, size_t len ) size_t ModbusTCPMaster::getNextData( unsigned char* buf, size_t len )
{ {
return ModbusTCPCore::getNextData(tcp.get(), qrecv, buf, len); return ModbusTCPCore::getNextData(tcp.get(), qrecv, buf, len, readTimeout );
} }
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
void ModbusTCPMaster::setChannelTimeout( timeout_t msec ) void ModbusTCPMaster::setChannelTimeout( timeout_t msec )
...@@ -118,8 +118,7 @@ mbErrCode ModbusTCPMaster::query( ModbusAddr addr, ModbusMessage& msg, ...@@ -118,8 +118,7 @@ mbErrCode ModbusTCPMaster::query( ModbusAddr addr, ModbusMessage& msg,
for( size_t i = 0; i < 2; i++ ) for( size_t i = 0; i < 2; i++ )
{ {
//if( tcp->isPending(ost::Socket::pendingOutput, timeout) ) if( tcp->isPending(ost::Socket::pendingOutput, timeout) )
if( waitOutput(timeout) )
{ {
mbErrCode res = send(msg); mbErrCode res = send(msg);
...@@ -165,9 +164,7 @@ mbErrCode ModbusTCPMaster::query( ModbusAddr addr, ModbusMessage& msg, ...@@ -165,9 +164,7 @@ mbErrCode ModbusTCPMaster::query( ModbusAddr addr, ModbusMessage& msg,
tcp->sync(); tcp->sync();
//reply.clear(); if( tcp->isPending(ost::Socket::pendingInput, timeout) )
//if( tcp->isPending(ost::Socket::pendingInput, timeout) )
if( waitInput(timeout) )
{ {
size_t ret = 0; size_t ret = 0;
...@@ -177,10 +174,6 @@ mbErrCode ModbusTCPMaster::query( ModbusAddr addr, ModbusMessage& msg, ...@@ -177,10 +174,6 @@ mbErrCode ModbusTCPMaster::query( ModbusAddr addr, ModbusMessage& msg,
if( ret == sizeof(reply.aduhead) ) if( ret == sizeof(reply.aduhead) )
break; break;
//if( !tcp->isPending(ost::Socket::pendingInput, timeout) )
if( !waitInput(timeout) )
break;
} }
if( ret > 0 && dlog->is_info() ) if( ret > 0 && dlog->is_info() )
...@@ -297,6 +290,13 @@ mbErrCode ModbusTCPMaster::query( ModbusAddr addr, ModbusMessage& msg, ...@@ -297,6 +290,13 @@ mbErrCode ModbusTCPMaster::query( ModbusAddr addr, ModbusMessage& msg,
if( dlog->is_warn() ) if( dlog->is_warn() )
dlog->warn() << "(query): " << err << endl; dlog->warn() << "(query): " << err << endl;
} }
catch( const UniSetTypes::CommFailed& ex )
{
if( dlog->is_crit() )
dlog->crit() << "(query): " << ex << endl;
tcp->forceDisconnect();
}
catch( const Exception& ex ) catch( const Exception& ex )
{ {
if( dlog->is_warn() ) if( dlog->is_warn() )
...@@ -306,59 +306,40 @@ mbErrCode ModbusTCPMaster::query( ModbusAddr addr, ModbusMessage& msg, ...@@ -306,59 +306,40 @@ mbErrCode ModbusTCPMaster::query( ModbusAddr addr, ModbusMessage& msg,
{ {
if( dlog->is_warn() ) if( dlog->is_warn() )
dlog->warn() << "(query): tcp error: " << e.getString() << endl; dlog->warn() << "(query): tcp error: " << e.getString() << endl;
return erTimeOut;
} }
catch( const std::exception& e ) catch( const std::exception& e )
{ {
if( dlog->is_warn() ) if( dlog->is_warn() )
dlog->crit() << "(query): " << e.what() << std::endl; dlog->crit() << "(query): " << e.what() << std::endl;
return erTimeOut;
} }
return erTimeOut; // erHardwareError return erTimeOut; // erHardwareError
} }
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
bool ModbusTCPMaster::waitInput( int msec )
{
if( !tcp )
return false;
// FD_ZERO (&s_set);
// FD_SET (tcp->getSocket(), &s_set);
s_timeout.tv_sec = 0;
s_timeout.tv_usec = 1000*msec;
return ( select( 1+(int)tcp->getSocket(), &s_set, NULL, NULL, &s_timeout) > 0 );
}
// -------------------------------------------------------------------------
bool ModbusTCPMaster::waitOutput( int timeout_msec )
{
if( !tcp )
return false;
// FD_ZERO (&s_set);
// FD_SET (tcp->getSocket(), &s_set);
s_timeout.tv_sec = 0;
s_timeout.tv_usec = 1000*timeout_msec;
//return ( select(FD_SETSIZE, NULL, &s_set, NULL, &s_timeout) > 0 );
return ( select(1+(int)tcp->getSocket(), NULL, &s_set, NULL, &s_timeout) > 0 );
}
// -------------------------------------------------------------------------
void ModbusTCPMaster::cleanInputStream() void ModbusTCPMaster::cleanInputStream()
{ {
unsigned char buf[100]; unsigned char buf[100];
int ret = 0; int ret = 0;
try
{
do do
{ {
ret = getNextData(buf, sizeof(buf)); ret = getNextData(buf, sizeof(buf));
} }
while( ret > 0); while( ret > 0);
}
catch( ... ){}
}
// -------------------------------------------------------------------------
void ModbusTCPMaster::setReadTimeout( timeout_t msec )
{
readTimeout = msec;
}
// -------------------------------------------------------------------------
timeout_t ModbusTCPMaster::getReadTimeout() const
{
return readTimeout;
} }
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
bool ModbusTCPMaster::checkConnection( const std::string& ip, int port, int timeout_msec ) bool ModbusTCPMaster::checkConnection( const std::string& ip, int port, int timeout_msec )
...@@ -401,9 +382,6 @@ void ModbusTCPMaster::reconnect() ...@@ -401,9 +382,6 @@ void ModbusTCPMaster::reconnect()
tcp->setTimeout(replyTimeOut_ms); tcp->setTimeout(replyTimeOut_ms);
tcp->setKeepAliveParams((replyTimeOut_ms > 1000 ? replyTimeOut_ms / 1000 : 1)); tcp->setKeepAliveParams((replyTimeOut_ms > 1000 ? replyTimeOut_ms / 1000 : 1));
tcp->setNoDelay(true); tcp->setNoDelay(true);
FD_ZERO (&s_set);
FD_SET (tcp->getSocket(), &s_set);
} }
catch( const std::exception& e ) catch( const std::exception& e )
{ {
...@@ -459,9 +437,6 @@ void ModbusTCPMaster::connect( ost::InetAddress addr, int _port ) ...@@ -459,9 +437,6 @@ void ModbusTCPMaster::connect( ost::InetAddress addr, int _port )
tcp->setTimeout(replyTimeOut_ms); tcp->setTimeout(replyTimeOut_ms);
tcp->setKeepAliveParams((replyTimeOut_ms > 1000 ? replyTimeOut_ms / 1000 : 1)); tcp->setKeepAliveParams((replyTimeOut_ms > 1000 ? replyTimeOut_ms / 1000 : 1));
tcp->setNoDelay(true); tcp->setNoDelay(true);
FD_ZERO (&s_set);
FD_SET (tcp->getSocket(), &s_set);
} }
catch( const std::exception& e ) catch( const std::exception& e )
{ {
......
...@@ -126,6 +126,7 @@ extensions/ModbusMaster/mtr-conv.cc ...@@ -126,6 +126,7 @@ extensions/ModbusMaster/mtr-conv.cc
extensions/ModbusMaster/mtr-read.cc extensions/ModbusMaster/mtr-read.cc
extensions/ModbusMaster/mtr-setup.cc extensions/ModbusMaster/mtr-setup.cc
extensions/ModbusMaster/multi-main.cc extensions/ModbusMaster/multi-main.cc
extensions/ModbusMaster/mb-perf-test.cc
extensions/ModbusMaster/RTUExchange.cc extensions/ModbusMaster/RTUExchange.cc
extensions/ModbusMaster/rtuexchange.cc extensions/ModbusMaster/rtuexchange.cc
extensions/ModbusMaster/RTUExchange.h extensions/ModbusMaster/RTUExchange.h
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment