Commit 3b0f410b authored by Pavel Vainerman's avatar Pavel Vainerman

Перевод ModbusTCPServer и libev на использование TCPSocket

из commoncpp (ModbusTCPServer): сделал io-таймер (чтобы уйти от создания лишних потоков) Перенёс MBTCPPersistentSlave в MBSlave и удалил "отдельный" класс.
parent 62b4f673
......@@ -53,9 +53,8 @@ MBTCPServer::MBTCPServer(const std::unordered_set<ModbusAddr>& myaddr, const str
sslot->connectRemoteService( sigc::mem_fun(this, &MBTCPServer::remoteService) );
sslot->connectFileTransfer( sigc::mem_fun(this, &MBTCPServer::fileTransfer) );
sslot->setRecvTimeout(6000);
// sslot->setAfterSendPause(afterSend);
sslot->setReplyTimeout(10000);
// sslot->setRecvTimeout(6000);
// sslot->setReplyTimeout(10000);
// build file list...
}
......@@ -74,29 +73,7 @@ void MBTCPServer::setLog(std::shared_ptr<DebugStream>& dlog )
// -------------------------------------------------------------------------
void MBTCPServer::execute()
{
// Работа...
while(1)
{
ModbusRTU::mbErrCode res = sslot->receive( vaddr, UniSetTimer::WaitUpTime );
#if 0
// собираем статистику обмена
if( prev != ModbusRTU::erTimeOut )
{
// с проверкой на переполнение
askCount = askCount >= numeric_limits<long>::max() ? 0 : askCount + 1;
if( res != ModbusRTU::erNoError )
++errmap[res];
prev = res;
}
#endif
if( verbose && res != ModbusRTU::erNoError && res != ModbusRTU::erTimeOut )
cerr << "(execute::receive): " << ModbusRTU::mbErr2Str(res) << endl;
}
sslot->run( vaddr, false );
}
// -------------------------------------------------------------------------
void MBTCPServer::sigterm( int signo )
......
......@@ -79,7 +79,6 @@ MBTCPTestServer::MBTCPTestServer( const std::unordered_set<ModbusAddr>& _vaddr,
sslot->connectFileTransfer( sigc::mem_fun(this, &MBTCPTestServer::fileTransfer) );
sslot->setRecvTimeout(6000);
// sslot->setAfterSendPause(afterSend);
sslot->setReplyTimeout(10000);
// build file list...
......@@ -114,32 +113,7 @@ void MBTCPTestServer::runThread()
void MBTCPTestServer::execute()
{
isrunning = true;
// cerr << "******************** MBTCPTestServer(" << myname << ") running... " << endl;
// Работа...
while(1)
{
ModbusRTU::mbErrCode res = sslot->receive( vaddr, UniSetTimer::WaitUpTime );
#if 0
// собираем статистику обмена
if( prev != ModbusRTU::erTimeOut )
{
// с проверкой на переполнение
askCount = askCount >= numeric_limits<long>::max() ? 0 : askCount + 1;
if( res != ModbusRTU::erNoError )
++errmap[res];
prev = res;
}
#endif
if( verbose && res != ModbusRTU::erNoError && res != ModbusRTU::erTimeOut )
cerr << "(execute::receive): " << ModbusRTU::mbErr2Str(res) << endl;
}
sslot->run( vaddr );
isrunning = false;
}
// -------------------------------------------------------------------------
......
......@@ -29,6 +29,8 @@
#include "UniSetObject.h"
#include "modbus/ModbusTypes.h"
#include "modbus/ModbusServerSlot.h"
#include "modbus/ModbusTCPServer.h"
#include "modbus/ModbusTCPServerSlot.h"
#include "PassiveTimer.h"
#include "Trigger.h"
#include "Mutex.h"
......@@ -54,6 +56,7 @@
- \ref sec_MBSlave_FileTransfer
- \ref sec_MBSlave_MEIRDI
- \ref sec_MBSlave_DIAG
- \ref sec_MBSlave_TCP
\section sec_MBSlave_Comm Общее описание Modbus slave
Класс реализует базовые функции для протокола Modbus в slave режиме. Реализацию Modbus RTU - см. RTUExchange.
......@@ -286,6 +289,16 @@
\section sec_MBSlave_DIAG Диагностические функции (0x08)
\section sec_MBSlave_TCP Настройка TCPServer
\code
<MBTCPPersistentSlave ....sesscount="">
<clients>
<item ip="" respond="" invert="1" askcount=""/>
<item ip="" respond="" invert="1" askcount=""/>
<item ip="" respond="" invert="1" askcount=""/>
</clients>
</MBTCPPersistentSlave>
\endcode
*/
// -----------------------------------------------------------------------------
......@@ -475,6 +488,8 @@ class MBSlave:
void waitSMReady();
virtual void execute_rtu();
virtual void execute_tcp();
virtual void updateStatistics();
virtual void updateTCPStatistics();
virtual bool activateObject() override;
virtual bool deactivateObject() override;
......@@ -565,6 +580,53 @@ class MBSlave:
std::string logserv_host = {""};
int logserv_port = {0};
VMonitor vmon;
// ----------------------------------------------------------------------------
// TCPServer section..
void initTCPClients( UniXML::iterator confnode );
timeout_t sessTimeout; /*!< таймаут на сессию */
timeout_t updateStatTime;
ModbusTCPServer::Sessions sess; /*!< список открытых сессий */
unsigned int sessMaxNum;
std::shared_ptr<ModbusTCPServerSlot> tcpserver;
struct ClientInfo
{
ClientInfo(): iaddr(""), respond_s(UniSetTypes::DefaultObjectId), invert(false),
askCount(0), askcount_s(UniSetTypes::DefaultObjectId)
{
ptTimeout.setTiming(0);
}
std::string iaddr;
UniSetTypes::ObjectId respond_s = { UniSetTypes::DefaultObjectId };
IOController::IOStateList::iterator respond_it;
bool invert = { false };
PassiveTimer ptTimeout;
timeout_t tout = { 2000 };
long askCount;
UniSetTypes::ObjectId askcount_s = { UniSetTypes::DefaultObjectId };
IOController::IOStateList::iterator askcount_it;
inline void initIterators( const std::shared_ptr<SMInterface>& shm )
{
shm->initIterator( respond_it );
shm->initIterator( askcount_it );
}
const std::string getShortInfo() const;
};
typedef std::unordered_map<std::string, ClientInfo> ClientsMap;
ClientsMap cmap;
UniSetTypes::ObjectId sesscount_id;
IOController::IOStateList::iterator sesscount_it;
std::atomic_bool tcpCancelled = { true };
};
// -----------------------------------------------------------------------------
#endif // _MBSlave_H_
......
// -----------------------------------------------------------------------------
#ifndef _MBTCPPersistentSlave_H_
#define _MBTCPPersistentSlave_H_
// -----------------------------------------------------------------------------
#include <unordered_map>
#include "MBSlave.h"
#include "modbus/ModbusTCPServer.h"
// -----------------------------------------------------------------------------
/*!
<MBTCPPersistentSlave ....sesscount="">
<clients>
<item ip="" respond="" invert="1" askcount=""/>
<item ip="" respond="" invert="1" askcount=""/>
<item ip="" respond="" invert="1" askcount=""/>
</clients>
</MBTCPPersistentSlave>
*/
// -----------------------------------------------------------------------------
/*! Реализация многоптоточного slave-интерфейса */
class MBTCPPersistentSlave:
public MBSlave
{
public:
MBTCPPersistentSlave( UniSetTypes::ObjectId objId, UniSetTypes::ObjectId shmID, const std::shared_ptr<SharedMemory>& ic = nullptr, const std::string& prefix = "mbs" );
virtual ~MBTCPPersistentSlave();
/*! глобальная функция для инициализации объекта */
static std::shared_ptr<MBTCPPersistentSlave> init_mbslave( int argc, const char* const* argv,
UniSetTypes::ObjectId shmID, const std::shared_ptr<SharedMemory>& ic = nullptr,
const std::string& prefix = "mbs" );
/*! глобальная функция для вывода help-а */
static void help_print( int argc, const char* const* argv );
UniSetTypes::SimpleInfo* getInfo( CORBA::Long userparam ) override;
protected:
virtual void execute_tcp() override;
virtual void initIterators() override;
virtual bool deactivateObject() override;
virtual void sigterm( int signo ) override;
timeout_t sessTimeout; /*!< таймаут на сессию */
timeout_t waitTimeout;
ModbusTCPServer::Sessions sess; /*!< список открытых сессий */
unsigned int sessMaxNum;
PassiveTimer ptUpdateInfo;
struct ClientInfo
{
ClientInfo(): iaddr(""), respond_s(UniSetTypes::DefaultObjectId), invert(false),
askCount(0), askcount_s(UniSetTypes::DefaultObjectId)
{
ptTimeout.setTiming(0);
}
std::string iaddr;
UniSetTypes::ObjectId respond_s = { UniSetTypes::DefaultObjectId };
IOController::IOStateList::iterator respond_it;
bool invert = { false };
PassiveTimer ptTimeout;
timeout_t tout = { 2000 };
long askCount;
UniSetTypes::ObjectId askcount_s = { UniSetTypes::DefaultObjectId };
IOController::IOStateList::iterator askcount_it;
inline void initIterators( const std::shared_ptr<SMInterface>& shm )
{
shm->initIterator( respond_it );
shm->initIterator( askcount_it );
}
const std::string getShortInfo() const;
};
typedef std::unordered_map<std::string, ClientInfo> ClientsMap;
ClientsMap cmap;
UniSetTypes::ObjectId sesscount_id;
IOController::IOStateList::iterator sesscount_it;
};
// -----------------------------------------------------------------------------
#endif // _MBTCPPersistentSlave_H_
// -----------------------------------------------------------------------------
bin_PROGRAMS = @PACKAGE@-mbslave @PACKAGE@-mbtcp-persistentslave
bin_PROGRAMS = @PACKAGE@-mbslave
# не забывайте править версию в2.pc-файле
UMBS_VER=@LIBVER@
......@@ -10,7 +10,7 @@ libUniSet2MBSlave_la_LIBADD = $(top_builddir)/lib/libUniSet2.la \
$(top_builddir)/extensions/lib/libUniSet2Extensions.la \
$(SIGC_LIBS) $(COMCPP_LIBS)
libUniSet2MBSlave_la_CXXFLAGS = -I$(top_builddir)/extensions/include -I$(top_builddir)/extensions/SharedMemory $(SIGC_CFLAGS) $(COMCPP_CFLAGS)
libUniSet2MBSlave_la_SOURCES = MBSlave.cc MBTCPPersistentSlave.cc
libUniSet2MBSlave_la_SOURCES = MBSlave.cc
@PACKAGE@_mbslave_SOURCES = mbslave.cc
@PACKAGE@_mbslave_LDADD = libUniSet2MBSlave.la $(top_builddir)/lib/libUniSet2.la \
......@@ -19,13 +19,6 @@ libUniSet2MBSlave_la_SOURCES = MBSlave.cc MBTCPPersistentSlave.cc
$(SIGC_LIBS) $(COMCPP_LIBS)
@PACKAGE@_mbslave_CXXFLAGS = -I$(top_builddir)/extensions/include -I$(top_builddir)/extensions/SharedMemory $(SIGC_CFLAGS) $(COMCPP_CFLAGS)
@PACKAGE@_mbtcp_persistentslave_SOURCES = mbtcp-persistentslave.cc
@PACKAGE@_mbtcp_persistentslave_LDADD = libUniSet2MBSlave.la $(top_builddir)/lib/libUniSet2.la \
$(top_builddir)/extensions/SharedMemory/libUniSet2SharedMemory.la \
$(top_builddir)/extensions/lib/libUniSet2Extensions.la \
$(SIGC_LIBS) $(COMCPP_LIBS)
@PACKAGE@_mbtcp_persistentslave_CXXFLAGS = -I$(top_builddir)/extensions/include -I$(top_builddir)/extensions/SharedMemory $(SIGC_CFLAGS) $(COMCPP_CFLAGS)
# install
devel_include_HEADERS = *.h
devel_includedir = $(pkgincludedir)/extensions
......
......@@ -16,27 +16,32 @@ using namespace std;
// Buffer class - allow for output buffering such that it can be written out
// into async pieces
//
struct Buffer {
char *data;
struct Buffer
{
char* data;
ssize_t len;
ssize_t pos;
Buffer(const char *bytes, ssize_t nbytes) {
Buffer(const char* bytes, ssize_t nbytes)
{
pos = 0;
len = nbytes;
data = new char[nbytes];
memcpy(data, bytes, nbytes);
}
virtual ~Buffer() {
virtual ~Buffer()
{
delete [] data;
}
char *dpos() {
char* dpos()
{
return data + pos;
}
ssize_t nbytes() {
ssize_t nbytes()
{
return len - pos;
}
};
......@@ -44,7 +49,8 @@ struct Buffer {
//
// A single instance of a non-blocking Echo handler
//
class EchoInstance {
class EchoInstance
{
private:
ev::io io;
static int total_clients;
......@@ -55,27 +61,30 @@ class EchoInstance {
std::list<Buffer*> write_queue;
// Generic callback
void idle_callback(ev::idle &watcher, int revents) {
void idle_callback(ev::idle& watcher, int revents)
{
cerr << "idle..." << endl;
/*
/*
if (!write_queue.empty())
write_data(sfd);
else
{
io_idle.stop();
}
*/
*/
// io.set(ev::READ|ev::WRITE);
// io_idle.stop();
}
// Generic callback
void callback(ev::io &watcher, int revents) {
void callback(ev::io& watcher, int revents)
{
cerr << "call..." << endl;
if (EV_ERROR & revents) {
if (EV_ERROR & revents)
{
perror("got invalid event");
return;
}
......@@ -84,20 +93,26 @@ class EchoInstance {
read_cb(watcher);
#if 0
// if (revents & EV_WRITE)
// write_cb(watcher);
// if( !write_queue.empty() )
// write_cb(watcher);
// if (revents & EV_WRITE)
// write_cb(watcher);
// if( !write_queue.empty() )
// write_cb(watcher);
// io.set(ev::READ);
// io.set(ev::READ);
#else
if (revents & EV_WRITE)
write_cb(watcher);
if (write_queue.empty()) {
if (write_queue.empty())
{
io.set(ev::READ);
} else {
io.set(ev::READ|ev::WRITE);
}
else
{
io.set(ev::READ | ev::WRITE);
}
#endif
cerr << "events: " << revents << " active: " << io.is_active()
<< " is_pending: " << io.is_pending()
......@@ -107,13 +122,16 @@ class EchoInstance {
}
// Socket is writable
void write_cb(ev::io &watcher) {
void write_cb(ev::io& watcher)
{
write_data(watcher.fd);
}
void write_data(int wfd) {
void write_data(int wfd)
{
if (write_queue.empty()) {
if (write_queue.empty())
{
//io.set(ev::READ);
return;
}
......@@ -121,18 +139,23 @@ class EchoInstance {
Buffer* buffer = write_queue.front();
ssize_t written = write(wfd, buffer->dpos(), buffer->nbytes());
if (written < 0) {
if (written < 0)
{
perror("write error");
return;
}
buffer->pos += written;
if (buffer->nbytes() == 0) {
if (buffer->nbytes() == 0)
{
write_queue.pop_front();
delete buffer;
}
if (write_queue.empty()) {
if (write_queue.empty())
{
//io.set(ev::READ);
}
......@@ -140,7 +163,8 @@ class EchoInstance {
}
// Receive message from client socket
void read_cb(ev::io &watcher) {
void read_cb(ev::io& watcher)
{
char buffer[1024];
ssize_t nread = recv(watcher.fd, buffer, sizeof(buffer), 0);
......@@ -148,15 +172,19 @@ class EchoInstance {
cout << "read: " << nread << endl;
if (nread < 0) {
if (nread < 0)
{
perror("read error");
return;
}
if (nread == 0) {
if (nread == 0)
{
// Gack - we're deleting ourself inside of ourself!
delete this; // (pv): ??!!!
} else {
}
else
{
// Send message bach to the client
write_queue.push_back(new Buffer(buffer, nread));
//io_idle.start();
......@@ -164,7 +192,8 @@ class EchoInstance {
}
// effictivly a close and a destroy
virtual ~EchoInstance() {
virtual ~EchoInstance()
{
// Stop and free watcher if client socket is closing
io.stop();
//io_idle.stop();
......@@ -175,7 +204,8 @@ class EchoInstance {
}
public:
EchoInstance(int s) : sfd(s) {
EchoInstance(int s) : sfd(s)
{
fcntl(s, F_SETFL, fcntl(s, F_GETFL, 0) | O_NONBLOCK);
printf("Got connection\n");
......@@ -190,38 +220,53 @@ class EchoInstance {
}
};
class EchoServer {
class EchoServer
{
private:
ev::io io;
ev::sig sio;
ev::timer tm;
int s;
public:
void io_accept(ev::io &watcher, int revents) {
if (EV_ERROR & revents) {
void io_timer( ev::timer& t, int revents )
{
cerr << "************* TIMEOUT ***" << endl;
t.start(5.0);
t.
}
void io_accept(ev::io& watcher, int revents)
{
if (EV_ERROR & revents)
{
perror("got invalid event");
return;
}
struct sockaddr_in client_addr;
socklen_t client_len = sizeof(client_addr);
int client_sd = accept(watcher.fd, (struct sockaddr *)&client_addr, &client_len);
int client_sd = accept(watcher.fd, (struct sockaddr*)&client_addr, &client_len);
if (client_sd < 0) {
if (client_sd < 0)
{
perror("accept error");
return;
}
EchoInstance *client = new EchoInstance(client_sd);
EchoInstance* client = new EchoInstance(client_sd);
}
static void signal_cb(ev::sig &signal, int revents) {
static void signal_cb(ev::sig& signal, int revents)
{
signal.loop.break_loop();
}
EchoServer(int port) {
EchoServer(int port)
{
printf("Listening on port %d\n", port);
struct sockaddr_in addr;
......@@ -232,13 +277,15 @@ class EchoServer {
addr.sin_port = htons(port);
addr.sin_addr.s_addr = INADDR_ANY;
if (bind(s, (struct sockaddr *)&addr, sizeof(addr)) != 0) {
if (bind(s, (struct sockaddr*)&addr, sizeof(addr)) != 0)
{
perror("bind");
}
fcntl(s, F_SETFL, fcntl(s, F_GETFL, 0) | O_NONBLOCK);
int on = 1;
if (setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)))
perror("Could not set socket %d option for reusability: \n");
......@@ -249,9 +296,13 @@ class EchoServer {
sio.set<&EchoServer::signal_cb>();
sio.start(SIGINT);
tm.set<EchoServer, &EchoServer::io_timer>(this);
tm.start(5.0);
}
virtual ~EchoServer() {
virtual ~EchoServer()
{
shutdown(s, SHUT_RDWR);
close(s);
}
......@@ -259,7 +310,7 @@ class EchoServer {
int EchoInstance::total_clients = 0;
int main(int argc, char **argv)
int main(int argc, char** argv)
{
int port = 8192;
......
......@@ -12,34 +12,39 @@ using namespace std;
// Read/write buffer max length
static const size_t MAX_BUF = 512;
typedef struct {
typedef struct
{
struct event ev;
char buf[MAX_BUF];
size_t offset;
size_t size;
} ConnectionData;
void on_connect(int fd, short event, void *arg);
void client_read(int fd, short event, void *arg);
void client_write(int fd, short event, void *arg);
void on_connect(int fd, short event, void* arg);
void client_read(int fd, short event, void* arg);
void client_write(int fd, short event, void* arg);
int main(int argc, char **argv)
int main(int argc, char** argv)
{
// Check arguments
if (argc < 3) {
if (argc < 3)
{
std::cout << "Run with options: <ip address> <port>" << std::endl;
return 1;
}
// Create server socket
int server_sock = socket(AF_INET, SOCK_STREAM, 0);
if (server_sock == -1) {
if (server_sock == -1)
{
std::cerr << "Failed to create socket" << std::endl;
return 1;
}
sockaddr_in sa;
int on = 1;
char * ip_addr = argv[1];
char* ip_addr = argv[1];
short port = atoi(argv[2]);
sa.sin_family = AF_INET;
......@@ -47,19 +52,22 @@ int main(int argc, char **argv)
sa.sin_addr.s_addr = inet_addr(ip_addr);
// Set option SO_REUSEADDR to reuse same host:port in a short time
if (setsockopt(server_sock, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) == -1) {
if (setsockopt(server_sock, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) == -1)
{
std::cerr << "Failed to set option SO_REUSEADDR" << std::endl;
return 1;
}
// Bind server socket to ip:port
if (bind(server_sock, reinterpret_cast<const sockaddr*>(&sa), sizeof(sa)) == -1) {
if (bind(server_sock, reinterpret_cast<const sockaddr*>(&sa), sizeof(sa)) == -1)
{
std::cerr << "Failed to bind server socket" << std::endl;
return 1;
}
// Make server to listen
if (listen(server_sock, 10) == -1) {
if (listen(server_sock, 10) == -1)
{
std::cerr << "Failed to make server listen" << std::endl;
return 1;
}
......@@ -69,7 +77,7 @@ int main(int argc, char **argv)
// Initialize
event_init();
// Set connection callback (on_connect()) to read event on server socket
event_set(&evserver_sock, server_sock, EV_READ|EV_PERSIST, on_connect, &evserver_sock);
event_set(&evserver_sock, server_sock, EV_READ | EV_PERSIST, on_connect, &evserver_sock);
// Add server event without timeout
event_add(&evserver_sock, NULL);
......@@ -80,14 +88,16 @@ int main(int argc, char **argv)
}
// Handle new connection {{{
void on_connect(int fd, short event, void *arg)
void on_connect(int fd, short event, void* arg)
{
sockaddr_in client_addr;
socklen_t len = 0;
// Accept incoming connection
int sock = accept(fd, reinterpret_cast<sockaddr*>(&client_addr), &len);
if (sock < 1) {
if (sock < 1)
{
return;
}
......@@ -95,7 +105,7 @@ void on_connect(int fd, short event, void *arg)
// Set read callback to client socket
ConnectionData * data = new ConnectionData;
ConnectionData* data = new ConnectionData;
event_set(&data->ev, sock, EV_READ, client_read, data);
// Reschedule server event
event_add(reinterpret_cast<struct event*>(arg), NULL);
......@@ -105,21 +115,27 @@ void on_connect(int fd, short event, void *arg)
//}}}
// Handle client request {{{
void client_read(int fd, short event, void *arg)
void client_read(int fd, short event, void* arg)
{
cerr << "*** (READ): RESCHEDULE..." << endl;
ConnectionData * data = reinterpret_cast<ConnectionData*>(arg);
if (!data) {
ConnectionData* data = reinterpret_cast<ConnectionData*>(arg);
if (!data)
{
close(fd);
return;
}
int len = read(fd, data->buf, MAX_BUF - 1);
if (len < 1) {
if (len < 1)
{
close(fd);
delete data;
return;
}
data->buf[len] = 0;
data->size = len;
data->offset = 0;
......@@ -131,16 +147,21 @@ void client_read(int fd, short event, void *arg)
//}}}
// Handle client responce {{{
void client_write(int fd, short event, void *arg)
void client_write(int fd, short event, void* arg)
{
ConnectionData * data = reinterpret_cast<ConnectionData*>(arg);
if (!data) {
ConnectionData* data = reinterpret_cast<ConnectionData*>(arg);
if (!data)
{
close(fd);
return;
}
// Send data to client
int len = write(fd, data->buf + data->offset, data->size - data->offset);
if (len < data->size - data->offset) {
if (len < data->size - data->offset)
{
// Failed to send rest data, need to reschedule
data->offset += len;
event_set(&data->ev, fd, EV_WRITE, client_write, data);
......@@ -148,8 +169,9 @@ void client_write(int fd, short event, void *arg)
event_add(&data->ev, NULL);
return;
}
// close(fd);
// delete data;
// close(fd);
// delete data;
cerr << "*** (WRITE): RESCHEDULE..." << endl;
......
......@@ -14,9 +14,10 @@
#define EPOLL_ARRAY_SIZE 64
void sprint_buffer(const char *buffer, int size)
void sprint_buffer(const char* buffer, int size)
{
int i;
for (i = 0; i < size; i++)
{
if (isprint(buffer[i]))
......@@ -26,7 +27,7 @@ void sprint_buffer(const char *buffer, int size)
}
}
int main(int argc, char *argv[])
int main(int argc, char* argv[])
{
int sd, efd, clientsd, fd;
struct sockaddr_in bindaddr, peeraddr;
......@@ -50,6 +51,7 @@ int main(int argc, char *argv[])
}
sd = socket(AF_INET, SOCK_STREAM, 0);
if (sd < 0)
{
printf("Could not create new socket: %m\n");
......@@ -73,10 +75,10 @@ int main(int argc, char *argv[])
}
bindaddr.sin_addr.s_addr = htonl(INADDR_ANY);
bindaddr.sin_family= AF_INET;
bindaddr.sin_family = AF_INET;
bindaddr.sin_port = htons(port);
if (bind(sd, (struct sockaddr *) &bindaddr, sizeof(struct sockaddr_in)) < 0)
if (bind(sd, (struct sockaddr*) &bindaddr, sizeof(struct sockaddr_in)) < 0)
{
printf("Could not bind socket %d to address 'INADDR_ANY' and port %u: %m", sd, port);
close(sd);
......@@ -131,7 +133,8 @@ int main(int argc, char *argv[])
{
printf("EPoll on %d fds failed: %m\n", pollsize);
goto cleanup;
} else
}
else
{
printf("Closing socket with sd %d\n", fd);
shutdown(fd, SHUT_RDWR);
......@@ -146,7 +149,8 @@ int main(int argc, char *argv[])
{
printf("EPoll on %d fds failed: %m\n", pollsize);
goto cleanup;
} else
}
else
{
printf("Closing socket with sd %d\n", fd);
shutdown(fd, SHUT_RDWR);
......@@ -161,7 +165,8 @@ int main(int argc, char *argv[])
{
printf("EPoll on %d fds failed: %m\n", pollsize);
goto cleanup;
} else
}
else
{
printf("Closing socket with sd %d\n", fd);
shutdown(fd, SHUT_RDWR);
......@@ -175,6 +180,7 @@ int main(int argc, char *argv[])
if (fd != sd)
{
rc = snprintf(buffer, sizeof(buffer), "Hello socket %d from server socket %d!\n", fd, sd);
while ((rc = send(fd, buffer, rc, 0)) < 0)
{
if ((fd < 0) && (errno != EINTR))
......@@ -194,7 +200,8 @@ int main(int argc, char *argv[])
shutdown(fd, SHUT_RDWR);
close(fd);
continue;
} else if (rc > 0)
}
else if (rc > 0)
{
printf("Sent '");
sprint_buffer(buffer, rc);
......@@ -217,7 +224,7 @@ int main(int argc, char *argv[])
{
if (fd == sd)
{
while ((clientsd = accept(sd, (struct sockaddr *) &peeraddr, &salen)) < 0)
while ((clientsd = accept(sd, (struct sockaddr*) &peeraddr, &salen)) < 0)
{
if ((clientsd < 0) && (errno != EINTR))
{
......@@ -229,7 +236,8 @@ int main(int argc, char *argv[])
if (inet_ntop(AF_INET, &peeraddr.sin_addr.s_addr, buffer, sizeof(buffer)) != NULL)
{
printf("Accepted connection from %s:%u, assigned new sd %d\n", buffer, ntohs(peeraddr.sin_port), clientsd);
} else
}
else
{
printf("Failed to convert address from binary to text form: %m\n");
}
......@@ -245,7 +253,8 @@ int main(int argc, char *argv[])
}
pollsize++;
} else
}
else
{
while ((rc = recv(fd, buffer, sizeof(buffer), 0)) < 0)
{
......@@ -266,7 +275,8 @@ int main(int argc, char *argv[])
shutdown(fd, SHUT_RDWR);
close(fd);
continue;
} else if (rc > 0)
}
else if (rc > 0)
{
printf("Received '");
sprint_buffer(buffer, rc);
......
#!/bin/sh
uniset2-start.sh -f ./uniset2-mbtcp-persistentslave --confile test.xml --dlog-add-levels any --mbs-log-add-levels any --mbs-exchangelog-add-levels any \
--smemory-id SharedMemory \
--mbs-name MBMultiSlave1 --mbs-type TCP --mbs-inet-addr 127.0.0.1 --mbs-inet-port 2048 --mbs-reg-from-id 1 --mbs-my-addr 0x01 \
$*
# --mbs-force 1
#--mbs-reg-from-id 1 \
#--mbs-filter-field CAN2sender --mbs-filter-value SYSTSNode \
\ No newline at end of file
......@@ -38,6 +38,7 @@ static void InitTest()
ost::InetAddress ia(addr.c_str());
mb->setTimeout(2000);
REQUIRE_NOTHROW( mb->connect(ia, port) );
msleep(5000);
}
}
// -----------------------------------------------------------------------------
......
......@@ -24,7 +24,6 @@
#include "Extensions.h"
#include "RTUExchange.h"
#include "MBSlave.h"
#include "MBTCPPersistentSlave.h"
#include "MBTCPMaster.h"
#include "SharedMemory.h"
//#include "UniExchange.h"
......@@ -209,35 +208,6 @@ int main( int argc, const char** argv )
act->add(unet);
}
// ------------- MBTCPMultiSlave --------------
for( unsigned int i = 0; i < MaxAddNum; i++ )
{
stringstream s;
s << "--add-mbmultislave";
if( i > 0 ) s << i;
bool add_mbslave = findArgParam(s.str(), argc, argv) != -1;
if( add_mbslave )
{
stringstream p;
p << "mbms";
if( i > 0 ) p << i;
dinfo << "(smemory-plus): add MBTCPMultiSlave(" << p.str() << ")" << endl;
auto mbs = MBTCPPersistentSlave::init_mbslave(argc, argv, shm->getId(), shm, p.str());
if( !mbs )
return 1;
act->add(mbs);
}
}
// ---------------------------------------
// попытка решить вопрос с "зомби" процессами
signal( SIGCHLD, on_sigchild );
......
// -------------------------------------------------------------------------
#ifndef UTCPCore_H_
#define UTCPCore_H_
// -------------------------------------------------------------------------
#include <cc++/thread.h> // ..for timeout_t
// -------------------------------------------------------------------------
namespace UTCPCore
{
bool setKeepAliveParams( int sock, timeout_t timeout_sec = 5, int conn_keepcnt = 1, int keepintvl = 2 );
}
// -------------------------------------------------------------------------
#endif // UTCPCore_H_
// -------------------------------------------------------------------------
// -------------------------------------------------------------------------
#ifndef UTCPSocket_H_
#define UTCPSocket_H_
// -------------------------------------------------------------------------
#include <string>
#include <cc++/socket.h>
// -------------------------------------------------------------------------
class UTCPSocket:
public ost::TCPSocket
{
public:
// dup and accept...raw socket
UTCPSocket( int sock );
// hname = "host:port"
UTCPSocket(const std::string& hname, unsigned backlog = 5, unsigned mss = 536 );
UTCPSocket(const ost::IPV4Address& bind, ost::tpport_t port, unsigned backlog = 5, unsigned mss = 536 );
virtual ~UTCPSocket();
// set keepalive params
// return true if OK
bool setKeepAliveParams( timeout_t timeout_sec = 5, int conn_keepcnt = 1, int keepintvl = 2 );
/*!
* Enable/disable delaying packets (Nagle algorithm)
*
* @return true on success.
* @param enable disable Nagle algorithm when set to true.
*/
bool setNoDelay( bool enable );
void setCompletion( bool set )
{
ost::TCPSocket::setCompletion(set);
}
int getSocket();
// --------------------------------------------------------------------
// Пришлось вынести эти функции read/write[Data] в public
// т.к. они сразу "посылают" данные в канал, в отличие от operator<<
// который у TCPStream (или std::iostream?) буферизует их и из-за этого
// не позволяет работать с отправкой коротких сообщений
// --------------------------------------------------------------------
ssize_t writeData( const void* buf, size_t len, timeout_t t = 0 );
ssize_t readData( void* buf, size_t len, char separator = 0, timeout_t t = 0 );
protected:
void init( bool throwflag = false );
private:
};
// -------------------------------------------------------------------------
#endif // UTCPSocket_H_
// -------------------------------------------------------------------------
......@@ -55,8 +55,8 @@ class UTCPStream:
// который у TCPStream (или std::iostream?) буферизует их и из-за этого
// не позволяет работать с отправкой коротких сообщений
// --------------------------------------------------------------------
ssize_t writeData( const void* buf, size_t len, timeout_t t=0 );
ssize_t readData( void * buf,size_t len,char separator=0,timeout_t t=0 );
ssize_t writeData( const void* buf, size_t len, timeout_t t = 0 );
ssize_t readData( void* buf, size_t len, char separator = 0, timeout_t t = 0 );
int getSocket();
......
......@@ -40,6 +40,7 @@ class ModbusRTUSlave:
}
virtual void terminate() override;
virtual bool isAcive() override;
protected:
......
......@@ -20,7 +20,10 @@ class ModbusServer
void initLog( UniSetTypes::Configuration* conf, const std::string& name, const std::string& logfile = "" );
void setLog( std::shared_ptr<DebugStream> dlog );
inline std::shared_ptr<DebugStream> log(){ return dlog; }
inline std::shared_ptr<DebugStream> log()
{
return dlog;
}
std::unordered_set<ModbusRTU::ModbusAddr> addr2vaddr( ModbusRTU::ModbusAddr& mbaddr );
......@@ -110,6 +113,8 @@ class ModbusServer
virtual void cleanupChannel() {}
virtual void terminate() {}
virtual bool isAcive() = 0;
protected:
/*! Обработка запроса на чтение данных (0x01).
......
......@@ -12,13 +12,13 @@ namespace ModbusTCPCore
{
// t - msec (сколько ждать)
size_t readNextData( UTCPStream* tcp, std::queue<unsigned char>& qrecv, size_t max = 100, timeout_t t=10 );
size_t getNextData( UTCPStream* tcp, std::queue<unsigned char>& qrecv, unsigned char* buf, size_t len, timeout_t t=10 );
ModbusRTU::mbErrCode sendData(UTCPStream* tcp, unsigned char* buf, size_t len, timeout_t t=10 );
size_t readNextData( UTCPStream* tcp, std::queue<unsigned char>& qrecv, size_t max = 100, timeout_t t = 10 );
size_t getNextData( UTCPStream* tcp, std::queue<unsigned char>& qrecv, unsigned char* buf, size_t len, timeout_t t = 10 );
ModbusRTU::mbErrCode sendData(UTCPStream* tcp, unsigned char* buf, size_t len, timeout_t t = 10 );
// работа напрямую с сокетом
size_t readDataFD(int fd, std::queue<unsigned char>& qrecv, size_t max = 100, size_t attempts=1 );
size_t getDataFD( int fd, std::queue<unsigned char>& qrecv, unsigned char* buf, size_t len, size_t attempts=1 );
size_t readDataFD(int fd, std::queue<unsigned char>& qrecv, size_t max = 100, size_t attempts = 1 );
size_t getDataFD( int fd, std::queue<unsigned char>& qrecv, unsigned char* buf, size_t len, size_t attempts = 1 );
ModbusRTU::mbErrCode sendDataFD( int fd, unsigned char* buf, size_t len );
}
// -------------------------------------------------------------------------
......
......@@ -6,6 +6,7 @@
#include <queue>
#include <cc++/socket.h>
#include <ev++.h>
#include <sigc++/sigc++.h>
#include "Mutex.h"
#include "Debug.h"
#include "Configuration.h"
......@@ -13,9 +14,16 @@
#include "ModbusTypes.h"
#include "ModbusServer.h"
#include "ModbusTCPSession.h"
#include "UTCPStream.h"
#include "ThreadCreator.h"
#include "UTCPSocket.h"
// -------------------------------------------------------------------------
/*! ModbusTCP server */
/*! ModbusTCPServer
* Реализация сервера на основе libev. Подерживается "много" соединений (постоянных).
* Хоть класс и наследуется от ModbusServer на самом деле он не реализует его функции,
* каждое соединение обслуживается классом ModbusTCPSession.
* Но собственно реализаия функций одна на всех, это следует учитывать при реализации обработчиков,
* т.к.из многих "соединений" будут вызываться одни и теже обработатчики.
*/
class ModbusTCPServer:
public ModbusServer
{
......@@ -23,11 +31,13 @@ class ModbusTCPServer:
ModbusTCPServer( ost::InetAddress& ia, int port = 502 );
virtual ~ModbusTCPServer();
/*! Однопоточная обработка (каждый запрос последовательно), с разрывом соединения в конце */
virtual ModbusRTU::mbErrCode receive( const std::unordered_set<ModbusRTU::ModbusAddr>& vmbaddr, timeout_t msecTimeout ) override;
// функция receive пока не поддерживается...
virtual ModbusRTU::mbErrCode receive( const std::unordered_set<ModbusRTU::ModbusAddr>& vaddr, timeout_t msecTimeout ) override;
/*! Поддержка большого количества соединений (постоянных) */
virtual void mainLoop( const std::unordered_set<ModbusRTU::ModbusAddr>& vmbaddr );
/*! Запуск сервера
* \param thread - создавать ли отдельный поток
*/
void run( const std::unordered_set<ModbusRTU::ModbusAddr>& vmbaddr, bool thread = false );
void setMaxSessions( unsigned int num );
inline unsigned int getMaxSessions()
......@@ -54,13 +64,7 @@ class ModbusTCPServer:
return ignoreAddr;
}
void cleanInputStream();
virtual void cleanupChannel() override
{
cleanInputStream();
}
virtual void terminate() override;
virtual void terminate();
// Сбор статистики по соединениям...
struct SessionInfo
......@@ -84,41 +88,73 @@ class ModbusTCPServer:
return port;
}
protected:
virtual bool isAcive() override;
// -------------------------------------------------
// Таймер.
// Т.к. mainLoop() "бесконечный", то сделана возможность
// подключиться к сигналу "таймера", например для обновления статистики по сессиям
// \warning Следует иметь ввиду, что обработчик сигнала, должен быть максимально коротким
// т.к. на это время останавливается работа основного потока (mainLoop)
// -------------------------------------------------
typedef sigc::signal<void> TimerSignal;
TimerSignal signal_timer();
virtual void ioAccept(ev::io &watcher, int revents);
virtual ModbusRTU::mbErrCode pre_send_request( ModbusRTU::ModbusMessage& request ) override;
virtual ModbusRTU::mbErrCode post_send_request( ModbusRTU::ModbusMessage& request ) override;
void setTimer( timeout_t msec );
inline timeout_t getTimer()
{
return tmTime;
}
// realisation (see ModbusServer.h)
virtual size_t getNextData( unsigned char* buf, int len ) override;
virtual void setChannelTimeout( timeout_t msec ) override;
virtual ModbusRTU::mbErrCode sendData( unsigned char* buf, int len ) override;
protected:
virtual void mainLoop();
virtual void ioAccept(ev::io& watcher, int revents);
void onTimer( ev::timer& t, int revents );
virtual ModbusRTU::mbErrCode tcp_processing( UTCPStream& tcp, ModbusTCP::MBAPHeader& mhead );
void sessionFinished( ModbusTCPSession* s );
virtual size_t getNextData( unsigned char* buf, int len ) override
{
return 0;
}
virtual ModbusRTU::mbErrCode sendData( unsigned char* buf, int len ) override
{
return ModbusRTU::erHardwareError;
}
/*! set timeout for receive data */
virtual void setChannelTimeout( timeout_t msec ) override {};
ost::tpport_t port = { 0 };
ost::InetAddress iaddr;
std::string myname;
std::queue<unsigned char> qrecv;
ModbusTCP::MBAPHeader curQueryHeader;
typedef std::list<ModbusTCPSession*> SessionList;
UniSetTypes::uniset_mutex sMutex;
typedef std::list<ModbusTCPSession*> SessionList;
SessionList slist;
bool ignoreAddr = { false };
unsigned int maxSessions = { 5 };
unsigned int maxSessions = { 100 };
unsigned int sessCount = { 0 };
timeout_t sessTimeout = { 10000 }; // msec
ev::default_loop* evloop = { 0 };
std::shared_ptr<ev::default_loop> evloop;
ev::io io;
int sock = { -1 };
std::shared_ptr<UTCPSocket> sock;
ev::timer ioTimer;
const std::unordered_set<ModbusRTU::ModbusAddr>* vmbaddr;
std::shared_ptr< ThreadCreator<ModbusTCPServer> > thrMainLoop;
TimerSignal m_timer_signal;
timeout_t tmTime_msec = { TIMEOUT_INF }; // время по умолчанию для таймера (TimerSignal)
double tmTime = { 0.0 };
private:
......
......@@ -9,6 +9,7 @@
#include "ModbusServerSlot.h"
#include "ModbusServer.h"
#include "PassiveTimer.h"
#include "USocket.h"
// -------------------------------------------------------------------------
/*!
* \brief The ModbusTCPSession class
......@@ -36,13 +37,13 @@ class ModbusTCPSession:
void cleanInputStream();
virtual void cleanupChannel()
virtual void cleanupChannel() override
{
cleanInputStream();
}
virtual void terminate();
virtual void terminate() override;
virtual ModbusRTU::mbErrCode receive( const std::unordered_set<ModbusRTU::ModbusAddr>& vmbaddr, timeout_t msecTimeout );
virtual ModbusRTU::mbErrCode receive( const std::unordered_set<ModbusRTU::ModbusAddr>& vmbaddr, timeout_t msecTimeout ) override;
typedef sigc::slot<void, ModbusTCPSession*> FinalSlot;
......@@ -55,7 +56,10 @@ class ModbusTCPSession:
return caddr;
}
void setSessionTimeout( double t );
void run();
virtual bool isAcive() override;
protected:
......@@ -64,11 +68,11 @@ class ModbusTCPSession:
// Buffer class - allow for output buffering such that it can be written out into async pieces
struct Buffer
{
unsigned char *data;
unsigned char* data;
ssize_t len;
ssize_t pos;
Buffer( const unsigned char *bytes, ssize_t nbytes )
Buffer( const unsigned char* bytes, ssize_t nbytes )
{
pos = 0;
len = nbytes;
......@@ -81,7 +85,7 @@ class ModbusTCPSession:
delete [] data;
}
unsigned char *dpos()
unsigned char* dpos()
{
return data + pos;
}
......@@ -92,18 +96,18 @@ class ModbusTCPSession:
}
};
void callback( ev::io &watcher, int revents );
void idleCallback( ev::idle &watcher, int revents );
virtual void readEvent( ev::io &watcher );
virtual void writeEvent( ev::io &watcher );
void callback( ev::io& watcher, int revents );
void onTimeout( ev::timer& watcher, int revents );
virtual void readEvent( ev::io& watcher );
virtual void writeEvent( ev::io& watcher );
virtual void final();
virtual size_t getNextData( unsigned char* buf, int len ) override;
virtual void setChannelTimeout( timeout_t msec );
virtual ModbusRTU::mbErrCode sendData( unsigned char* buf, int len ) override;
virtual ModbusRTU::mbErrCode tcp_processing(ModbusTCP::MBAPHeader& mhead );
virtual ModbusRTU::mbErrCode pre_send_request( ModbusRTU::ModbusMessage& request );
virtual ModbusRTU::mbErrCode post_send_request( ModbusRTU::ModbusMessage& request );
virtual ModbusRTU::mbErrCode pre_send_request( ModbusRTU::ModbusMessage& request ) override;
virtual ModbusRTU::mbErrCode post_send_request( ModbusRTU::ModbusMessage& request ) override;
virtual ModbusRTU::mbErrCode readCoilStatus( ModbusRTU::ReadCoilMessage& query,
ModbusRTU::ReadCoilRetMessage& reply );
......@@ -156,9 +160,10 @@ class ModbusTCPSession:
ModbusRTU::ModbusMessage buf;
ev::io io;
int sfd;
std::shared_ptr<USocket> sock;
std::queue<Buffer*> qsend;
ev::idle idle;
ev::timer ioTimeout;
double sessTimeout = { 10.0 };
bool ignoreAddr = { false };
std::string peername = { "" };
......
......@@ -185,3 +185,9 @@ void ModbusRTUSlave::terminate()
}
catch(...) {}
}
// -------------------------------------------------------------------------
bool ModbusRTUSlave::isAcive()
{
return false;
}
// -------------------------------------------------------------------------
......@@ -32,7 +32,7 @@ size_t ModbusTCPCore::readNextData(UTCPStream* tcp,
for( ; i < max; i++ )
{
unsigned char c;
ssize_t l = tcp->readData(&c, sizeof(c),0,t);
ssize_t l = tcp->readData(&c, sizeof(c), 0, t);
if( l <= 0 )
break;
......@@ -80,7 +80,8 @@ size_t ModbusTCPCore::readDataFD( int fd, std::queue<unsigned char>& qrecv, size
{
size_t i = 0;
#if 1
for( size_t a=0; a<attempts; a++ )
for( size_t a = 0; a < attempts; a++ )
{
for( ; i < max; i++ )
{
......@@ -94,19 +95,22 @@ size_t ModbusTCPCore::readDataFD( int fd, std::queue<unsigned char>& qrecv, size
qrecv.push(c);
}
}
#else
char* buf = new char[max];
ssize_t l = 0;
for( size_t a=0; a<attempts; a++ )
for( size_t a = 0; a < attempts; a++ )
{
l = ::read(fd, buf, sizeof(buf));
if( l > 0 )
break;
}
if( l > 0 )
{
for( int i=0; i<l; i++ )
for( int i = 0; i < l; i++ )
qrecv.push(buf[i]);
}
......@@ -148,7 +152,8 @@ mbErrCode ModbusTCPCore::sendData( UTCPStream* tcp, unsigned char* buf, size_t l
try
{
ssize_t l = tcp->writeData(buf,len,t);
ssize_t l = tcp->writeData(buf, len, t);
if( l == len )
return erNoError;
}
......@@ -166,9 +171,11 @@ mbErrCode ModbusTCPCore::sendData( UTCPStream* tcp, unsigned char* buf, size_t l
// -------------------------------------------------------------------------
mbErrCode ModbusTCPCore::sendDataFD( int fd, unsigned char* buf, size_t len )
{
ssize_t l = ::write(fd,buf,len);
ssize_t l = ::write(fd, buf, len);
if( l == len )
return erNoError;
return erHardwareError;
}
// -------------------------------------------------------------------------
......@@ -132,7 +132,7 @@ mbErrCode ModbusTCPMaster::query( ModbusAddr addr, ModbusMessage& msg,
{
if( tcp->isPending(ost::Socket::pendingOutput, timeout) )
{
tcp->writeData(&mh,sizeof(mh));
tcp->writeData(&mh, sizeof(mh));
// send PDU
mbErrCode res = send(msg);
......
......@@ -33,26 +33,44 @@ using namespace UniSetTypes;
ModbusTCPSession::~ModbusTCPSession()
{
cancelled = true;
if( io.is_active() )
io.stop();
if( idle.is_active() )
idle.stop();
if( sfd > 0 )
close(sfd);
if( ioTimeout.is_active() )
ioTimeout.stop();
}
// -------------------------------------------------------------------------
ModbusTCPSession::ModbusTCPSession(int sock, const std::unordered_set<ModbusAddr>& a, timeout_t timeout ):
ModbusTCPSession::ModbusTCPSession( int sfd, const std::unordered_set<ModbusAddr>& a, timeout_t timeout ):
vaddr(a),
timeout(timeout),
sfd(sock),
peername(""),
caddr(""),
cancelled(false),
askCount(0)
{
fcntl(sfd, F_SETFL, fcntl(sfd, F_GETFL, 0) | O_NONBLOCK);
try
{
sock = make_shared<USocket>(sfd);
ost::tpport_t p;
ost::InetAddress iaddr = sock->getIPV4Peer(&p);
// resolve..
caddr = string( iaddr.getHostname() );
ostringstream s;
s << iaddr << ":" << p;
peername = s.str();
}
catch( const ost::SockException& ex )
{
ostringstream err;
err << ex.what();
dlog->crit() << "(ModbusTCPSession): err: " << err.str() << endl;
throw SystemError(err.str());
}
sock->setCompletion(false); // fcntl(sfd, F_SETFL, fcntl(sfd, F_GETFL, 0) | O_NONBLOCK);
setCRCNoCheckit(true);
timeout_t tout = timeout / 1000;
......@@ -61,7 +79,7 @@ ModbusTCPSession::ModbusTCPSession(int sock, const std::unordered_set<ModbusAddr
tout = 3;
io.set<ModbusTCPSession, &ModbusTCPSession::callback>(this);
idle.set<ModbusTCPSession, &ModbusTCPSession::idleCallback>(this);
ioTimeout.set<ModbusTCPSession, &ModbusTCPSession::onTimeout>(this);
}
// -------------------------------------------------------------------------
unsigned int ModbusTCPSession::getAskCount()
......@@ -69,22 +87,36 @@ unsigned int ModbusTCPSession::getAskCount()
uniset_rwmutex_rlock l(mAsk);
return askCount;
}
void ModbusTCPSession::setSessionTimeout( double t )
{
sessTimeout = t;
if( ioTimeout.is_active() )
ioTimeout.start(t);
}
// -------------------------------------------------------------------------
void ModbusTCPSession::run()
{
if( dlog->is_info() )
dlog->info() << peername << "(run): run session.." << endl;
io.start(sfd, ev::READ);
//idle.start();
io.start(sock->getSocket(), ev::READ);
ioTimeout.start(sessTimeout);
}
// -------------------------------------------------------------------------
bool ModbusTCPSession::isAcive()
{
return io.is_active();
}
// -------------------------------------------------------------------------
void ModbusTCPSession::callback( ev::io &watcher, int revents )
void ModbusTCPSession::callback( ev::io& watcher, int revents )
{
if( EV_ERROR & revents )
{
if( dlog->is_crit() )
dlog->crit() << peername << "(callback): EVENT ERROR.." << endl;
return;
}
......@@ -97,25 +129,31 @@ void ModbusTCPSession::callback( ev::io &watcher, int revents )
if( qsend.empty() )
io.set(ev::READ);
else
io.set(ev::READ|ev::WRITE);
io.set(ev::READ | ev::WRITE);
if( cancelled )
{
if( dlog->is_info() )
dlog->info() << peername << ": stop session... disconnect.." << endl;
io.stop();
//close(sfd);
idle.stop();
ioTimeout.stop();
final();
delete this;
}
else
ioTimeout.start(sessTimeout); // restart timer..
}
// -------------------------------------------------------------------------
void ModbusTCPSession::idleCallback(ev::idle& watcher, int revents)
void ModbusTCPSession::onTimeout( ev::timer& watcher, int revents )
{
cerr << "idle..." << endl;
if( dlog->is_info() )
dlog->info() << peername << ": timeout connection activity.." << endl;
terminate();
}
// -------------------------------------------------------------------------
void ModbusTCPSession::readEvent( ev::io &watcher )
void ModbusTCPSession::readEvent( ev::io& watcher )
{
ModbusRTU::mbErrCode res = receive(vaddr, timeout);
......@@ -140,14 +178,17 @@ void ModbusTCPSession::writeEvent( ev::io& watcher )
Buffer* buffer = qsend.front();
ssize_t ret = write(watcher.fd, buffer->dpos(), buffer->nbytes());
if( ret < 0 )
{
if( dlog->is_warn() )
dlog->warn() << peername << "(writeEvent): write to socket error: " << strerror(errno) << endl;
return;
}
buffer->pos += ret;
if( buffer->nbytes() == 0 )
{
qsend.pop();
......@@ -229,20 +270,20 @@ void ModbusTCPSession::final()
// -------------------------------------------------------------------------
mbErrCode ModbusTCPSession::sendData( unsigned char* buf, int len )
{
qsend.push( new Buffer(buf,len) );
qsend.push( new Buffer(buf, len) );
return erNoError;
}
// -------------------------------------------------------------------------
size_t ModbusTCPSession::getNextData( unsigned char* buf, int len )
{
ssize_t res = ModbusTCPCore::getDataFD( sfd, qrecv, buf, len );
ssize_t res = ModbusTCPCore::getDataFD( sock->getSocket(), qrecv, buf, len );
if( res > 0 )
return res;
if( res < 0 )
{
if( errno!=EAGAIN && dlog->is_warn() )
if( errno != EAGAIN && dlog->is_warn() )
dlog->warn() << peername << "(getNextData): read from socket error(" << errno << "): " << strerror(errno) << endl;
return 0;
......@@ -295,12 +336,13 @@ mbErrCode ModbusTCPSession::tcp_processing( ModbusTCP::MBAPHeader& mhead )
return erInvalidFormat;
}
len = ModbusTCPCore::readDataFD( sfd, qrecv, mhead.len );
len = ModbusTCPCore::readDataFD( sock->getSocket(), qrecv, mhead.len );
if( len == 0 )
{
// делаем ещё одну попытку чтения через некоторое время
msleep(5);
len = ModbusTCPCore::readDataFD( sfd, qrecv, mhead.len );
len = ModbusTCPCore::readDataFD( sock->getSocket(), qrecv, mhead.len );
}
if( len < mhead.len )
......@@ -336,7 +378,7 @@ mbErrCode ModbusTCPSession::pre_send_request( ModbusMessage& request )
dlog->info(false) << endl;
}
sendData((unsigned char*)(&curQueryHeader),sizeof(curQueryHeader));
sendData((unsigned char*)(&curQueryHeader), sizeof(curQueryHeader));
curQueryHeader.swapdata();
return erNoError;
......@@ -356,15 +398,17 @@ void ModbusTCPSession::cleanInputStream()
// -------------------------------------------------------------------------
void ModbusTCPSession::terminate()
{
ModbusServer::terminate();
if( dlog->is_info() )
dlog->info() << peername << "(terminate)..." << endl;
cancelled = true;
io.stop();
idle.stop();
close(sfd);
ioTimeout.stop();
ModbusServer::terminate();
sock.reset(); // close..
final();
}
// -------------------------------------------------------------------------
mbErrCode ModbusTCPSession::readCoilStatus( ReadCoilMessage& query,
......@@ -505,8 +549,7 @@ ModbusRTU::mbErrCode ModbusTCPSession::fileTransfer( ModbusRTU::FileTransferMess
// -------------------------------------------------------------------------
void ModbusTCPSession::setChannelTimeout( timeout_t msec )
{
// setTimeout(msec);
#warning NOT YET!!
setSessionTimeout(msec);
}
// -------------------------------------------------------------------------
void ModbusTCPSession::connectFinalSession( FinalSlot sl )
......
......@@ -2,7 +2,7 @@
# This file is part of the UniSet library #
############################################################################
noinst_LTLIBRARIES = libTCP.la
libTCP_la_SOURCES = UTCPStream.cc TCPCheck.cc
libTCP_la_SOURCES = UTCPCore.cc UTCPStream.cc USocket.cc UTCPSocket.cc TCPCheck.cc
libTCP_la_CXXFLAGS = $(SIGC_CFLAGS) $(COMCPP_CFLAGS)
libTCP_la_LIBADD = $(SIGC_LIBS) $(COMCPP_LIBS)
......
#include "USocket.h"
#include "UTCPCore.h"
// -------------------------------------------------------------------------
using namespace std;
// -------------------------------------------------------------------------
USocket::~USocket()
{
endSocket();
}
// -------------------------------------------------------------------------
USocket::USocket( int sock ):
Socket(accept(sock, NULL, NULL))
{
init();
}
// -------------------------------------------------------------------------
bool USocket::setKeepAliveParams( timeout_t timeout_sec, int keepcnt, int keepintvl )
{
return UTCPCore::setKeepAliveParams(so, timeout_sec, keepcnt, keepintvl);
}
// -------------------------------------------------------------------------
int USocket::getSocket()
{
return so;
}
// -------------------------------------------------------------------------
void USocket::init( bool throwflag )
{
setError(throwflag);
setKeepAlive(true);
setLinger(true);
setKeepAliveParams();
}
// -------------------------------------------------------------------------
#include "UTCPCore.h"
// glibc..
#include <netinet/tcp.h>
// -------------------------------------------------------------------------
using namespace std;
// -------------------------------------------------------------------------
bool UTCPCore::setKeepAliveParams( int fd, timeout_t timeout_sec, int keepcnt, int keepintvl )
{
int enable = 1;
bool ok = true;
if( setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, (void*)&enable, sizeof(enable)) == -1 )
ok = false;
if( setsockopt(fd, SOL_TCP, TCP_KEEPCNT, (void*) &keepcnt, sizeof(keepcnt)) == -1 )
ok = false;
if( setsockopt(fd, SOL_TCP, TCP_KEEPINTVL, (void*) &keepintvl, sizeof (keepintvl)) == -1 )
ok = false;
if( setsockopt(fd, SOL_TCP, TCP_KEEPIDLE, (void*) &timeout_sec, sizeof (timeout_sec)) == -1 )
ok = false;
return ok;
}
// -------------------------------------------------------------------------
#include <iostream>
#include <string>
#include <fcntl.h>
#include <errno.h>
#include <cstring>
#include <cc++/socket.h>
#include "UTCPSocket.h"
#include "PassiveTimer.h"
#include "UniSetTypes.h"
#include "UTCPCore.h"
// -------------------------------------------------------------------------
using namespace std;
// -------------------------------------------------------------------------
UTCPSocket::~UTCPSocket()
{
endSocket();
// shutdown(so, SHUT_RDWR);
}
// -------------------------------------------------------------------------
UTCPSocket::UTCPSocket( int sock ):
TCPSocket(NULL)
{
struct sockaddr_in client_addr;
socklen_t client_len = sizeof(client_addr);
so = accept(sock, (struct sockaddr*)&client_addr, &client_len);
if( so < 0 )
{
endSocket();
error(errConnectRejected);
return;
}
Socket::state = CONNECTED;
}
// -------------------------------------------------------------------------
UTCPSocket::UTCPSocket( const std::string& hname, unsigned backlog, unsigned mss ):
TCPSocket(hname.c_str(), backlog, mss)
{
init();
}
// -------------------------------------------------------------------------
UTCPSocket::UTCPSocket(const ost::IPV4Address& bind, ost::tpport_t port, unsigned backlog, unsigned mss):
TCPSocket(bind, port, backlog, mss)
{
init();
}
// -------------------------------------------------------------------------
bool UTCPSocket::setKeepAliveParams(timeout_t timeout_sec, int keepcnt, int keepintvl )
{
return UTCPCore::setKeepAliveParams(so, timeout_sec, keepcnt, keepintvl);
}
// -------------------------------------------------------------------------
bool UTCPSocket::setNoDelay(bool enable)
{
return ( TCPSocket::setNoDelay(enable) == 0 );
}
// -------------------------------------------------------------------------
int UTCPSocket::getSocket()
{
return so;
}
// -------------------------------------------------------------------------
void UTCPSocket::init( bool throwflag )
{
setError(throwflag);
setKeepAlive(true);
setLinger(true);
setKeepAliveParams();
}
// -------------------------------------------------------------------------
ssize_t UTCPSocket::writeData(const void* buf, size_t len, timeout_t t)
{
return TCPSocket::writeData(buf, len, t);
}
// -------------------------------------------------------------------------
ssize_t UTCPSocket::readData(void* buf, size_t len, char separator, timeout_t t)
{
return TCPSocket::readData(buf, len, separator, t);
}
// -------------------------------------------------------------------------
......@@ -23,9 +23,7 @@
#include "UTCPStream.h"
#include "PassiveTimer.h"
#include "UniSetTypes.h"
// glibc..
#include <netinet/tcp.h>
#include "UTCPCore.h"
// -------------------------------------------------------------------------
using namespace std;
// -------------------------------------------------------------------------
......@@ -41,23 +39,7 @@ UTCPStream::UTCPStream():
// -------------------------------------------------------------------------
bool UTCPStream::setKeepAliveParams(timeout_t timeout_sec, int keepcnt, int keepintvl )
{
SOCKET fd = TCPStream::so;
int enable = 1;
bool ok = true;
if( setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, (void*)&enable, sizeof(enable)) == -1 )
ok = false;
if( setsockopt(fd, SOL_TCP, TCP_KEEPCNT, (void*) &keepcnt, sizeof(keepcnt)) == -1 )
ok = false;
if( setsockopt(fd, SOL_TCP, TCP_KEEPINTVL, (void*) &keepintvl, sizeof (keepintvl)) == -1 )
ok = false;
if( setsockopt(fd, SOL_TCP, TCP_KEEPIDLE, (void*) &timeout_sec, sizeof (timeout_sec)) == -1 )
ok = false;
return ok;
return UTCPCore::setKeepAliveParams(so, timeout_sec, keepcnt, keepintvl);
}
// -------------------------------------------------------------------------
bool UTCPStream::isSetLinger()
......@@ -80,12 +62,12 @@ bool UTCPStream::setNoDelay(bool enable)
// -------------------------------------------------------------------------
ssize_t UTCPStream::writeData(const void* buf, size_t len, timeout_t t)
{
return TCPStream::writeData(buf,len,t);
return TCPStream::writeData(buf, len, t);
}
// -------------------------------------------------------------------------
ssize_t UTCPStream::readData(void* buf, size_t len, char separator, timeout_t t)
{
return TCPStream::readData(buf,len,separator,t);
return TCPStream::readData(buf, len, separator, t);
}
// -------------------------------------------------------------------------
int UTCPStream::getSocket()
......
......@@ -34,6 +34,7 @@ bool DBNetInterface::connect( const std::string& param )
dbname = param.substr(prev, pos - prev);
break;
}
return nconnect( host, user, pswd, dbname );
}
//--------------------------------------------------------------------------------------------
......
......@@ -146,9 +146,6 @@ extensions/ModbusSlave/Makefile.am
extensions/ModbusSlave/MBSlave.cc
extensions/ModbusSlave/mbslave.cc
extensions/ModbusSlave/MBSlave.h
extensions/ModbusSlave/mbtcp-persistentslave.cc
extensions/ModbusSlave/MBTCPPersistentSlave.cc
extensions/ModbusSlave/MBTCPPersistentSlave.h
extensions/ModbusSlave/test.xml
extensions/RRDServer/libUniSet2RRDServer.pc.in
extensions/RRDServer/main.cc
......@@ -311,7 +308,10 @@ include/UniSetManager.h
include/UniSetObject.h
include/UniSetTypes.h
include/UniXML.h
include/UTCPCore.h
include/UTCPStream.h
include/UTCPSocket.h
include/USocket.h
include/WDTInterface.h
lib/Makefile.am
python/examples/test.xml
......@@ -349,7 +349,10 @@ src/Communications/Modbus/ModbusTCPTypes.cc
src/Communications/Modbus/ModbusTypes.cc
src/Communications/TCP/Makefile.am
src/Communications/TCP/TCPCheck.cc
src/Communications/TCP/UTCPCore.cc
src/Communications/TCP/UTCPStream.cc
src/Communications/TCP/UTCPSocket.cc
src/Communications/TCP/USocket.cc
src/Communications/ComPort.cc
src/Communications/ComPort485F.cc
src/Communications/Makefile.am
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment