Commit 62b4f673 authored by Pavel Vainerman's avatar Pavel Vainerman

Глобальная переделка UTCPStream и перевод ModbusTCPXXX на использование

writeData/readData (решение проблеммы буферизации). (ModbusTCPServer): первая версия реализации на libev
parent d0c3b37b
...@@ -41,6 +41,7 @@ PKG_CHECK_MODULES(XML, libxml-2.0) ...@@ -41,6 +41,7 @@ PKG_CHECK_MODULES(XML, libxml-2.0)
PKG_CHECK_MODULES(OMNI, omniORB4) PKG_CHECK_MODULES(OMNI, omniORB4)
PKG_CHECK_MODULES(SIGC, sigc++-2.0) PKG_CHECK_MODULES(SIGC, sigc++-2.0)
PKG_CHECK_MODULES(COMCPP, libccgnu2) PKG_CHECK_MODULES(COMCPP, libccgnu2)
PKG_CHECK_MODULES(EV, libev)
#check sqlite support #check sqlite support
...@@ -384,6 +385,7 @@ AC_CONFIG_FILES([Makefile ...@@ -384,6 +385,7 @@ AC_CONFIG_FILES([Makefile
extensions/ModbusSlave/Makefile extensions/ModbusSlave/Makefile
extensions/ModbusSlave/libUniSet2MBSlave.pc extensions/ModbusSlave/libUniSet2MBSlave.pc
extensions/ModbusSlave/tests/Makefile extensions/ModbusSlave/tests/Makefile
extensions/ModbusSlave/epoll-test/Makefile
extensions/LogicProcessor/Makefile extensions/LogicProcessor/Makefile
extensions/LogicProcessor/libUniSet2LogicProcessor.pc extensions/LogicProcessor/libUniSet2LogicProcessor.pc
extensions/LogicProcessor/tests/Makefile extensions/LogicProcessor/tests/Makefile
......
...@@ -51,15 +51,13 @@ MBSlave::MBSlave(UniSetTypes::ObjectId objId, UniSetTypes::ObjectId shmId, const ...@@ -51,15 +51,13 @@ MBSlave::MBSlave(UniSetTypes::ObjectId objId, UniSetTypes::ObjectId shmId, const
auto conf = uniset_conf(); auto conf = uniset_conf();
mutex_start.setName(myname + "_mutex_start"); mutex_start.setName(myname + "_mutex_start");
mblog = make_shared<DebugStream>();
mblog->setLogName(myname);
conf->initLogStream(mblog, prefix + "-log");
loga = make_shared<LogAgregator>(myname + "-loga"); loga = make_shared<LogAgregator>(myname + "-loga");
loga->add(mblog);
loga->add(ulog()); loga->add(ulog());
loga->add(dlog()); loga->add(dlog());
mblog = loga->create(myname);
conf->initLogStream(mblog, prefix + "-log");
// xmlNode* cnode = conf->getNode(myname); // xmlNode* cnode = conf->getNode(myname);
string conf_name(conf->getArgParam("--" + prefix + "-confnode", myname)); string conf_name(conf->getArgParam("--" + prefix + "-confnode", myname));
...@@ -172,12 +170,18 @@ MBSlave::MBSlave(UniSetTypes::ObjectId objId, UniSetTypes::ObjectId shmId, const ...@@ -172,12 +170,18 @@ MBSlave::MBSlave(UniSetTypes::ObjectId objId, UniSetTypes::ObjectId shmId, const
rs->setRecvTimeout(2000); rs->setRecvTimeout(2000);
rs->setAfterSendPause(aftersend_pause); rs->setAfterSendPause(aftersend_pause);
rs->setReplyTimeout(reply_tout); rs->setReplyTimeout(reply_tout);
rs->setLog(mblog); // rs->setLog(mblog);
mbslot = std::static_pointer_cast<ModbusServerSlot>(rs); mbslot = std::static_pointer_cast<ModbusServerSlot>(rs);
thr = make_shared< ThreadCreator<MBSlave> >(this, &MBSlave::execute_rtu); thr = make_shared< ThreadCreator<MBSlave> >(this, &MBSlave::execute_rtu);
thr->setFinalAction(this, &MBSlave::finalThread); thr->setFinalAction(this, &MBSlave::finalThread);
mbinfo << myname << "(init): type=RTU dev=" << dev << " speed=" << speed << endl; mbinfo << myname << "(init): type=RTU dev=" << dev << " speed=" << speed << endl;
ostringstream n;
n << prefix << "-exchangelog";
auto l = loga->create(n.str());
rs->setLog(l);
conf->initLogStream(l, prefix + "-exchangelog");
} }
else if( stype == "TCP" ) else if( stype == "TCP" )
{ {
...@@ -201,8 +205,11 @@ MBSlave::MBSlave(UniSetTypes::ObjectId objId, UniSetTypes::ObjectId shmId, const ...@@ -201,8 +205,11 @@ MBSlave::MBSlave(UniSetTypes::ObjectId objId, UniSetTypes::ObjectId shmId, const
thr->setFinalAction(this, &MBSlave::finalThread); thr->setFinalAction(this, &MBSlave::finalThread);
mbinfo << myname << "(init): init TCP connection ok. " << " inet=" << iaddr << " port=" << port << endl; mbinfo << myname << "(init): init TCP connection ok. " << " inet=" << iaddr << " port=" << port << endl;
if( mblog->is_level9() ) ostringstream n;
mbtcp->setLog(mblog); n << prefix << "-exchangelog";
auto l = loga->create(n.str());
mbtcp->setLog(l);
conf->initLogStream(l, prefix + "-exchangelog");
} }
else else
throw UniSetTypes::SystemError(myname + "(MBSlave): Unknown slave type. Use: --" + prefix + "-type [RTU|TCP]"); throw UniSetTypes::SystemError(myname + "(MBSlave): Unknown slave type. Use: --" + prefix + "-type [RTU|TCP]");
......
...@@ -179,8 +179,6 @@ void MBTCPPersistentSlave::execute_tcp() ...@@ -179,8 +179,6 @@ void MBTCPPersistentSlave::execute_tcp()
return; return;
} }
auto l = loga->create(myname + "-exchangelog");
sslot->setLog(l);
for( auto && i : cmap ) for( auto && i : cmap )
i.second.ptTimeout.reset(); i.second.ptTimeout.reset();
...@@ -189,13 +187,14 @@ void MBTCPPersistentSlave::execute_tcp() ...@@ -189,13 +187,14 @@ void MBTCPPersistentSlave::execute_tcp()
mbinfo << myname << "(execute_tcp): thread running.." << endl; mbinfo << myname << "(execute_tcp): thread running.." << endl;
while( !cancelled ) while( !cancelled )
{ {
try try
{ {
sslot->waitQuery( vaddr, waitTimeout ); #warning MBTCPPersistentSlave: Из этой функции нет возврата..
sslot->mainLoop( vaddr );
// ========== В текущей реализации КОД НИЖЕ не будет вызван... по крайней мере пока не завершится процесс ====
// если слишком быстро обработали запрос // если слишком быстро обработали запрос
// то ничего не делаем.. // то ничего не делаем..
......
bin_PROGRAMS = mb-epoll-test boost_client
#mb_epoll_test_SOURCES = mbslave-epoll.cc
#mb_epoll_test_LDADD = -levent
mb_epoll_test_SOURCES = ev++example1.cc
mb_epoll_test_LDADD = -lev
boost_client_SOURCES = boost_client2.cc
boost_client_LDADD = -lboost_system -lboost_thread
# libevent.pc
\ No newline at end of file
#include <fcntl.h>
#include <string.h>
#include <stdlib.h>
#include <ev++.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <resolv.h>
#include <errno.h>
#include <list>
#include <unistd.h>
#include <iostream>
using namespace std;
//
// Buffer class - allow for output buffering such that it can be written out
// into async pieces
//
struct Buffer {
char *data;
ssize_t len;
ssize_t pos;
Buffer(const char *bytes, ssize_t nbytes) {
pos = 0;
len = nbytes;
data = new char[nbytes];
memcpy(data, bytes, nbytes);
}
virtual ~Buffer() {
delete [] data;
}
char *dpos() {
return data + pos;
}
ssize_t nbytes() {
return len - pos;
}
};
//
// A single instance of a non-blocking Echo handler
//
class EchoInstance {
private:
ev::io io;
static int total_clients;
int sfd;
//ev::idle io_idle;
// Buffers that are pending write
std::list<Buffer*> write_queue;
// Generic callback
void idle_callback(ev::idle &watcher, int revents) {
cerr << "idle..." << endl;
/*
if (!write_queue.empty())
write_data(sfd);
else
{
io_idle.stop();
}
*/
// io.set(ev::READ|ev::WRITE);
// io_idle.stop();
}
// Generic callback
void callback(ev::io &watcher, int revents) {
cerr << "call..." << endl;
if (EV_ERROR & revents) {
perror("got invalid event");
return;
}
if (revents & EV_READ)
read_cb(watcher);
#if 0
// if (revents & EV_WRITE)
// write_cb(watcher);
// if( !write_queue.empty() )
// write_cb(watcher);
// io.set(ev::READ);
#else
if (revents & EV_WRITE)
write_cb(watcher);
if (write_queue.empty()) {
io.set(ev::READ);
} else {
io.set(ev::READ|ev::WRITE);
}
#endif
cerr << "events: " << revents << " active: " << io.is_active()
<< " is_pending: " << io.is_pending()
<< " total: " << total_clients
<< endl;
}
// Socket is writable
void write_cb(ev::io &watcher) {
write_data(watcher.fd);
}
void write_data(int wfd) {
if (write_queue.empty()) {
//io.set(ev::READ);
return;
}
Buffer* buffer = write_queue.front();
ssize_t written = write(wfd, buffer->dpos(), buffer->nbytes());
if (written < 0) {
perror("write error");
return;
}
buffer->pos += written;
if (buffer->nbytes() == 0) {
write_queue.pop_front();
delete buffer;
}
if (write_queue.empty()) {
//io.set(ev::READ);
}
cout << "write: " << written << endl;
}
// Receive message from client socket
void read_cb(ev::io &watcher) {
char buffer[1024];
ssize_t nread = recv(watcher.fd, buffer, sizeof(buffer), 0);
//ssize_t nread = read(watcher.fd, buffer, sizeof(buffer));
cout << "read: " << nread << endl;
if (nread < 0) {
perror("read error");
return;
}
if (nread == 0) {
// Gack - we're deleting ourself inside of ourself!
delete this; // (pv): ??!!!
} else {
// Send message bach to the client
write_queue.push_back(new Buffer(buffer, nread));
//io_idle.start();
}
}
// effictivly a close and a destroy
virtual ~EchoInstance() {
// Stop and free watcher if client socket is closing
io.stop();
//io_idle.stop();
close(sfd);
printf("%d client(s) disconnected.\n", --total_clients);
}
public:
EchoInstance(int s) : sfd(s) {
fcntl(s, F_SETFL, fcntl(s, F_GETFL, 0) | O_NONBLOCK);
printf("Got connection\n");
total_clients++;
io.set<EchoInstance, &EchoInstance::callback>(this);
io.start(s, ev::READ);
//io_idle.set<EchoInstance, &EchoInstance::idle_callback>(this);
//io_idle.start();
}
};
class EchoServer {
private:
ev::io io;
ev::sig sio;
int s;
public:
void io_accept(ev::io &watcher, int revents) {
if (EV_ERROR & revents) {
perror("got invalid event");
return;
}
struct sockaddr_in client_addr;
socklen_t client_len = sizeof(client_addr);
int client_sd = accept(watcher.fd, (struct sockaddr *)&client_addr, &client_len);
if (client_sd < 0) {
perror("accept error");
return;
}
EchoInstance *client = new EchoInstance(client_sd);
}
static void signal_cb(ev::sig &signal, int revents) {
signal.loop.break_loop();
}
EchoServer(int port) {
printf("Listening on port %d\n", port);
struct sockaddr_in addr;
s = socket(PF_INET, SOCK_STREAM, 0);
addr.sin_family = AF_INET;
addr.sin_port = htons(port);
addr.sin_addr.s_addr = INADDR_ANY;
if (bind(s, (struct sockaddr *)&addr, sizeof(addr)) != 0) {
perror("bind");
}
fcntl(s, F_SETFL, fcntl(s, F_GETFL, 0) | O_NONBLOCK);
int on = 1;
if (setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)))
perror("Could not set socket %d option for reusability: \n");
listen(s, 5);
io.set<EchoServer, &EchoServer::io_accept>(this);
io.start(s, ev::READ);
sio.set<&EchoServer::signal_cb>();
sio.start(SIGINT);
}
virtual ~EchoServer() {
shutdown(s, SHUT_RDWR);
close(s);
}
};
int EchoInstance::total_clients = 0;
int main(int argc, char **argv)
{
int port = 8192;
if (argc > 1)
port = atoi(argv[1]);
ev::default_loop loop;
EchoServer echo(port);
loop.run(0 /* ev::EPOLL */);
return 0;
}
#include <event.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <iostream>
using namespace std;
// Read/write buffer max length
static const size_t MAX_BUF = 512;
typedef struct {
struct event ev;
char buf[MAX_BUF];
size_t offset;
size_t size;
} ConnectionData;
void on_connect(int fd, short event, void *arg);
void client_read(int fd, short event, void *arg);
void client_write(int fd, short event, void *arg);
int main(int argc, char **argv)
{
// Check arguments
if (argc < 3) {
std::cout << "Run with options: <ip address> <port>" << std::endl;
return 1;
}
// Create server socket
int server_sock = socket(AF_INET, SOCK_STREAM, 0);
if (server_sock == -1) {
std::cerr << "Failed to create socket" << std::endl;
return 1;
}
sockaddr_in sa;
int on = 1;
char * ip_addr = argv[1];
short port = atoi(argv[2]);
sa.sin_family = AF_INET;
sa.sin_port = htons(port);
sa.sin_addr.s_addr = inet_addr(ip_addr);
// Set option SO_REUSEADDR to reuse same host:port in a short time
if (setsockopt(server_sock, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) == -1) {
std::cerr << "Failed to set option SO_REUSEADDR" << std::endl;
return 1;
}
// Bind server socket to ip:port
if (bind(server_sock, reinterpret_cast<const sockaddr*>(&sa), sizeof(sa)) == -1) {
std::cerr << "Failed to bind server socket" << std::endl;
return 1;
}
// Make server to listen
if (listen(server_sock, 10) == -1) {
std::cerr << "Failed to make server listen" << std::endl;
return 1;
}
// Init events
struct event evserver_sock;
// Initialize
event_init();
// Set connection callback (on_connect()) to read event on server socket
event_set(&evserver_sock, server_sock, EV_READ|EV_PERSIST, on_connect, &evserver_sock);
// Add server event without timeout
event_add(&evserver_sock, NULL);
// Dispatch events
event_dispatch();
return 0;
}
// Handle new connection {{{
void on_connect(int fd, short event, void *arg)
{
sockaddr_in client_addr;
socklen_t len = 0;
// Accept incoming connection
int sock = accept(fd, reinterpret_cast<sockaddr*>(&client_addr), &len);
if (sock < 1) {
return;
}
cerr << "*** (ON CONNECT): ..." << endl;
// Set read callback to client socket
ConnectionData * data = new ConnectionData;
event_set(&data->ev, sock, EV_READ, client_read, data);
// Reschedule server event
event_add(reinterpret_cast<struct event*>(arg), NULL);
// Schedule client event
event_add(&data->ev, NULL);
}
//}}}
// Handle client request {{{
void client_read(int fd, short event, void *arg)
{
cerr << "*** (READ): RESCHEDULE..." << endl;
ConnectionData * data = reinterpret_cast<ConnectionData*>(arg);
if (!data) {
close(fd);
return;
}
int len = read(fd, data->buf, MAX_BUF - 1);
if (len < 1) {
close(fd);
delete data;
return;
}
data->buf[len] = 0;
data->size = len;
data->offset = 0;
// Set write callback to client socket
event_set(&data->ev, fd, EV_WRITE, client_write, data);
// Schedule client event
event_add(&data->ev, NULL);
}
//}}}
// Handle client responce {{{
void client_write(int fd, short event, void *arg)
{
ConnectionData * data = reinterpret_cast<ConnectionData*>(arg);
if (!data) {
close(fd);
return;
}
// Send data to client
int len = write(fd, data->buf + data->offset, data->size - data->offset);
if (len < data->size - data->offset) {
// Failed to send rest data, need to reschedule
data->offset += len;
event_set(&data->ev, fd, EV_WRITE, client_write, data);
// Schedule client event
event_add(&data->ev, NULL);
return;
}
// close(fd);
// delete data;
cerr << "*** (WRITE): RESCHEDULE..." << endl;
event_set(&data->ev, fd, EV_READ, client_read, data);
event_add(&data->ev, NULL);
}
//}}}
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <ctype.h>
#include <unistd.h>
#include <errno.h>
#include <fcntl.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <sys/epoll.h>
#define EPOLL_ARRAY_SIZE 64
void sprint_buffer(const char *buffer, int size)
{
int i;
for (i = 0; i < size; i++)
{
if (isprint(buffer[i]))
printf("%c", buffer[i]);
else
printf("\\0x%02X", buffer[i]);
}
}
int main(int argc, char *argv[])
{
int sd, efd, clientsd, fd;
struct sockaddr_in bindaddr, peeraddr;
socklen_t salen = sizeof(peeraddr);
int pollsize = 1;
struct epoll_event ev;
struct epoll_event epoll_events[EPOLL_ARRAY_SIZE];
uint32_t events;
unsigned short port = 20000;
int i, rval, on = 1;
ssize_t rc;
char buffer[1024];
efd = epoll_create(pollsize);
if (efd < 0)
{
printf("Could not create the epoll fd: %m");
return 1;
}
sd = socket(AF_INET, SOCK_STREAM, 0);
if (sd < 0)
{
printf("Could not create new socket: %m\n");
return 2;
}
printf("New socket created with sd %d\n", sd);
if (fcntl(sd, F_SETFL, O_NONBLOCK))
{
printf("Could not make the socket non-blocking: %m\n");
close(sd);
return 3;
}
if (setsockopt(sd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)))
{
printf("Could not set socket %d option for reusability: %m\n", sd);
close(sd);
return 4;
}
bindaddr.sin_addr.s_addr = htonl(INADDR_ANY);
bindaddr.sin_family= AF_INET;
bindaddr.sin_port = htons(port);
if (bind(sd, (struct sockaddr *) &bindaddr, sizeof(struct sockaddr_in)) < 0)
{
printf("Could not bind socket %d to address 'INADDR_ANY' and port %u: %m", sd, port);
close(sd);
return 5;
}
else
{
printf("Bound socket %d to address 'INADDR_ANY' and port %u\n", sd, port);
}
if (listen(sd, SOMAXCONN))
{
printf("Could not start listening on server socket %d: %m\n", sd);
goto cleanup;
}
else
{
printf("Server socket %d started listening to address 'INADDR_ANY' and port %u\n", sd, port);
}
ev.events = EPOLLIN;
ev.data.u64 = 0LL;
ev.data.fd = sd;
if (epoll_ctl(efd, EPOLL_CTL_ADD, sd, &ev) < 0)
{
printf("Couldn't add server socket %d to epoll set: %m\n", sd);
goto cleanup;
}
for (;;)
{
printf("Starting epoll_wait on %d fds\n", pollsize);
while ((rval = epoll_wait(efd, epoll_events, EPOLL_ARRAY_SIZE, -1)) < 0)
{
if ((rval < 0) && (errno != EINTR))
{
printf("EPoll on %d fds failed: %m\n", pollsize);
goto cleanup;
}
}
for (i = 0; i < rval; i++)
{
events = epoll_events[i].events;
fd = epoll_events[i].data.fd;
if (events & EPOLLERR)
{
if (fd == sd)
{
printf("EPoll on %d fds failed: %m\n", pollsize);
goto cleanup;
} else
{
printf("Closing socket with sd %d\n", fd);
shutdown(fd, SHUT_RDWR);
close(fd);
continue;
}
}
if (events & EPOLLHUP)
{
if (fd == sd)
{
printf("EPoll on %d fds failed: %m\n", pollsize);
goto cleanup;
} else
{
printf("Closing socket with sd %d\n", fd);
shutdown(fd, SHUT_RDWR);
close(fd);
continue;
}
}
if (events & EPOLLRDHUP)
{
if (fd == sd)
{
printf("EPoll on %d fds failed: %m\n", pollsize);
goto cleanup;
} else
{
printf("Closing socket with sd %d\n", fd);
shutdown(fd, SHUT_RDWR);
close(fd);
continue;
}
}
if (events & EPOLLOUT)
{
if (fd != sd)
{
rc = snprintf(buffer, sizeof(buffer), "Hello socket %d from server socket %d!\n", fd, sd);
while ((rc = send(fd, buffer, rc, 0)) < 0)
{
if ((fd < 0) && (errno != EINTR))
{
printf("Send to socket %d failed: %m\n", fd);
pollsize--;
shutdown(fd, SHUT_RDWR);
close(fd);
continue;
}
}
if (rc == 0)
{
printf("Closing socket with sd %d\n", fd);
pollsize--;
shutdown(fd, SHUT_RDWR);
close(fd);
continue;
} else if (rc > 0)
{
printf("Sent '");
sprint_buffer(buffer, rc);
printf("' to socket with sd %d\n", fd);
ev.events = EPOLLIN;
ev.data.u64 = 0LL;
ev.data.fd = fd;
if (epoll_ctl(efd, EPOLL_CTL_MOD, fd, &ev) < 0)
{
printf("Couldn't modify client socket %d in epoll set: %m\n", fd);
goto cleanup;
}
}
}
}
if (events & EPOLLIN)
{
if (fd == sd)
{
while ((clientsd = accept(sd, (struct sockaddr *) &peeraddr, &salen)) < 0)
{
if ((clientsd < 0) && (errno != EINTR))
{
printf("Accept on socket %d failed: %m\n", sd);
goto cleanup;
}
}
if (inet_ntop(AF_INET, &peeraddr.sin_addr.s_addr, buffer, sizeof(buffer)) != NULL)
{
printf("Accepted connection from %s:%u, assigned new sd %d\n", buffer, ntohs(peeraddr.sin_port), clientsd);
} else
{
printf("Failed to convert address from binary to text form: %m\n");
}
ev.events = EPOLLIN;
ev.data.u64 = 0LL;
ev.data.fd = clientsd;
if (epoll_ctl(efd, EPOLL_CTL_ADD, clientsd, &ev) < 0)
{
printf("Couldn't add client socket %d to epoll set: %m\n", clientsd);
goto cleanup;
}
pollsize++;
} else
{
while ((rc = recv(fd, buffer, sizeof(buffer), 0)) < 0)
{
if ((fd < 0) && (errno != EINTR))
{
printf("Receive from socket %d failed: %m\n", fd);
pollsize--;
shutdown(fd, SHUT_RDWR);
close(fd);
continue;
}
}
if (rc == 0)
{
printf("Closing socket with sd %d\n", fd);
pollsize--;
shutdown(fd, SHUT_RDWR);
close(fd);
continue;
} else if (rc > 0)
{
printf("Received '");
sprint_buffer(buffer, rc);
printf("' from socket with sd %d\n", fd);
ev.events = EPOLLIN | EPOLLOUT;
ev.data.u64 = 0LL;
ev.data.fd = fd;
if (epoll_ctl(efd, EPOLL_CTL_MOD, fd, &ev) < 0)
{
printf("Couldn't modify client socket %d in epoll set: %m\n", fd);
goto cleanup;
}
}
}
}
}
}
cleanup:
shutdown(sd, SHUT_RDWR);
close(sd);
return 0;
}
\ No newline at end of file
#!/bin/sh #!/bin/sh
uniset2-start.sh -f ./uniset2-mbtcp-persistentslave --confile test.xml --dlog-add-levels level3,level4 \ uniset2-start.sh -f ./uniset2-mbtcp-persistentslave --confile test.xml --dlog-add-levels any --mbs-log-add-levels any --mbs-exchangelog-add-levels any \
--smemory-id SharedMemory \ --smemory-id SharedMemory \
--mbs-name MBMultiSlave1 --mbs-type TCP --mbs-inet-addr 127.0.0.1 --mbs-inet-port 2048 --mbs-reg-from-id 1 --mbs-my-addr 0x01 \ --mbs-name MBMultiSlave1 --mbs-type TCP --mbs-inet-addr 127.0.0.1 --mbs-inet-port 2048 --mbs-reg-from-id 1 --mbs-my-addr 0x01 \
$* $*
......
...@@ -41,6 +41,25 @@ class UTCPStream: ...@@ -41,6 +41,25 @@ class UTCPStream:
bool isSetLinger(); bool isSetLinger();
void forceDisconnect(); // disconnect() без ожидания (с отключением SO_LINGER) void forceDisconnect(); // disconnect() без ожидания (с отключением SO_LINGER)
/*!
* Enable/disable delaying packets (Nagle algorithm)
*
* @return true on success.
* @param enable disable Nagle algorithm when set to true.
*/
bool setNoDelay(bool enable);
// --------------------------------------------------------------------
// Пришлось вынести эти функции read/write[Data] в public
// т.к. они сразу "посылают" данные в канал, в отличие от operator<<
// который у TCPStream (или std::iostream?) буферизует их и из-за этого
// не позволяет работать с отправкой коротких сообщений
// --------------------------------------------------------------------
ssize_t writeData( const void* buf, size_t len, timeout_t t=0 );
ssize_t readData( void * buf,size_t len,char separator=0,timeout_t t=0 );
int getSocket();
protected: protected:
private: private:
......
...@@ -44,7 +44,7 @@ class ModbusRTUSlave: ...@@ -44,7 +44,7 @@ class ModbusRTUSlave:
protected: protected:
// realisation (see ModbusServer.h) // realisation (see ModbusServer.h)
virtual int getNextData( unsigned char* buf, int len ) override; virtual size_t getNextData( unsigned char* buf, int len ) override;
virtual void setChannelTimeout( timeout_t msec ) override; virtual void setChannelTimeout( timeout_t msec ) override;
virtual ModbusRTU::mbErrCode sendData( unsigned char* buf, int len ) override; virtual ModbusRTU::mbErrCode sendData( unsigned char* buf, int len ) override;
......
...@@ -20,7 +20,7 @@ class ModbusServer ...@@ -20,7 +20,7 @@ class ModbusServer
void initLog( UniSetTypes::Configuration* conf, const std::string& name, const std::string& logfile = "" ); void initLog( UniSetTypes::Configuration* conf, const std::string& name, const std::string& logfile = "" );
void setLog( std::shared_ptr<DebugStream> dlog ); void setLog( std::shared_ptr<DebugStream> dlog );
inline std::shared_ptr<DebugStream> log(){ return dlog; }
std::unordered_set<ModbusRTU::ModbusAddr> addr2vaddr( ModbusRTU::ModbusAddr& mbaddr ); std::unordered_set<ModbusRTU::ModbusAddr> addr2vaddr( ModbusRTU::ModbusAddr& mbaddr );
...@@ -229,13 +229,13 @@ class ModbusServer ...@@ -229,13 +229,13 @@ class ModbusServer
virtual ModbusRTU::mbErrCode fileTransfer( ModbusRTU::FileTransferMessage& query, virtual ModbusRTU::mbErrCode fileTransfer( ModbusRTU::FileTransferMessage& query,
ModbusRTU::FileTransferRetMessage& reply ) = 0; ModbusRTU::FileTransferRetMessage& reply ) = 0;
/*! get next data block from channel ot recv buffer /*! get next data block from channel or recv buffer
\param begin - get from position \param begin - get from position
\param buf - buffer for data \param buf - buffer for data
\param len - size of buf \param len - size of buf
\return real data lenght ( must be <= len ) \return real data lenght ( must be <= len )
*/ */
virtual int getNextData( unsigned char* buf, int len ) = 0; virtual size_t getNextData( unsigned char* buf, int len ) = 0;
virtual ModbusRTU::mbErrCode sendData( unsigned char* buf, int len ) = 0; virtual ModbusRTU::mbErrCode sendData( unsigned char* buf, int len ) = 0;
......
...@@ -5,13 +5,21 @@ ...@@ -5,13 +5,21 @@
#include <queue> #include <queue>
#include <cc++/socket.h> #include <cc++/socket.h>
#include "ModbusRTUErrors.h" #include "ModbusRTUErrors.h"
#include "UTCPStream.h"
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
/*! ModbusTCP core functions */ /*! ModbusTCP core functions */
namespace ModbusTCPCore namespace ModbusTCPCore
{ {
size_t readNextData( ost::TCPStream* tcp, std::queue<unsigned char>& qrecv, int max = 100 ); // t - msec (сколько ждать)
size_t getNextData( ost::TCPStream* tcp, std::queue<unsigned char>& qrecv, unsigned char* buf, size_t len );
ModbusRTU::mbErrCode sendData( ost::TCPStream* tcp, unsigned char* buf, size_t len ); size_t readNextData( UTCPStream* tcp, std::queue<unsigned char>& qrecv, size_t max = 100, timeout_t t=10 );
size_t getNextData( UTCPStream* tcp, std::queue<unsigned char>& qrecv, unsigned char* buf, size_t len, timeout_t t=10 );
ModbusRTU::mbErrCode sendData(UTCPStream* tcp, unsigned char* buf, size_t len, timeout_t t=10 );
// работа напрямую с сокетом
size_t readDataFD(int fd, std::queue<unsigned char>& qrecv, size_t max = 100, size_t attempts=1 );
size_t getDataFD( int fd, std::queue<unsigned char>& qrecv, unsigned char* buf, size_t len, size_t attempts=1 );
ModbusRTU::mbErrCode sendDataFD( int fd, unsigned char* buf, size_t len );
} }
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
#endif // ModbusTCPCore_H_ #endif // ModbusTCPCore_H_
......
...@@ -5,6 +5,7 @@ ...@@ -5,6 +5,7 @@
#include <string> #include <string>
#include <queue> #include <queue>
#include <cc++/socket.h> #include <cc++/socket.h>
#include <ev++.h>
#include "Mutex.h" #include "Mutex.h"
#include "Debug.h" #include "Debug.h"
#include "Configuration.h" #include "Configuration.h"
...@@ -12,12 +13,11 @@ ...@@ -12,12 +13,11 @@
#include "ModbusTypes.h" #include "ModbusTypes.h"
#include "ModbusServer.h" #include "ModbusServer.h"
#include "ModbusTCPSession.h" #include "ModbusTCPSession.h"
#include "UTCPStream.h"
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
/*! ModbusTCP server */ /*! ModbusTCP server */
class ModbusTCPServer: class ModbusTCPServer:
public ModbusServer, public ModbusServer
public ost::TCPSocket
{ {
public: public:
ModbusTCPServer( ost::InetAddress& ia, int port = 502 ); ModbusTCPServer( ost::InetAddress& ia, int port = 502 );
...@@ -26,11 +26,8 @@ class ModbusTCPServer: ...@@ -26,11 +26,8 @@ class ModbusTCPServer:
/*! Однопоточная обработка (каждый запрос последовательно), с разрывом соединения в конце */ /*! Однопоточная обработка (каждый запрос последовательно), с разрывом соединения в конце */
virtual ModbusRTU::mbErrCode receive( const std::unordered_set<ModbusRTU::ModbusAddr>& vmbaddr, timeout_t msecTimeout ) override; virtual ModbusRTU::mbErrCode receive( const std::unordered_set<ModbusRTU::ModbusAddr>& vmbaddr, timeout_t msecTimeout ) override;
/*! Многопоточная обработка (создаётся по потоку для каждого "клиента") /*! Поддержка большого количества соединений (постоянных) */
\return TRUE - если запрос пришёл virtual void mainLoop( const std::unordered_set<ModbusRTU::ModbusAddr>& vmbaddr );
\return FALSE - если timeout
*/
virtual bool waitQuery( const std::unordered_set<ModbusRTU::ModbusAddr>& vmbaddr, timeout_t msec = UniSetTimer::WaitUpTime );
void setMaxSessions( unsigned int num ); void setMaxSessions( unsigned int num );
inline unsigned int getMaxSessions() inline unsigned int getMaxSessions()
...@@ -89,19 +86,19 @@ class ModbusTCPServer: ...@@ -89,19 +86,19 @@ class ModbusTCPServer:
protected: protected:
virtual void ioAccept(ev::io &watcher, int revents);
virtual ModbusRTU::mbErrCode pre_send_request( ModbusRTU::ModbusMessage& request ) override; virtual ModbusRTU::mbErrCode pre_send_request( ModbusRTU::ModbusMessage& request ) override;
virtual ModbusRTU::mbErrCode post_send_request( ModbusRTU::ModbusMessage& request ) override; virtual ModbusRTU::mbErrCode post_send_request( ModbusRTU::ModbusMessage& request ) override;
// realisation (see ModbusServer.h) // realisation (see ModbusServer.h)
virtual int getNextData( unsigned char* buf, int len ) override; virtual size_t getNextData( unsigned char* buf, int len ) override;
virtual void setChannelTimeout( timeout_t msec ) override; virtual void setChannelTimeout( timeout_t msec ) override;
virtual ModbusRTU::mbErrCode sendData( unsigned char* buf, int len ) override; virtual ModbusRTU::mbErrCode sendData( unsigned char* buf, int len ) override;
virtual ModbusRTU::mbErrCode tcp_processing( ost::TCPStream& tcp, ModbusTCP::MBAPHeader& mhead ); virtual ModbusRTU::mbErrCode tcp_processing( UTCPStream& tcp, ModbusTCP::MBAPHeader& mhead );
void sessionFinished( ModbusTCPSession* s ); void sessionFinished( ModbusTCPSession* s );
ost::tpport_t port = { 0 }; ost::tpport_t port = { 0 };
ost::TCPStream tcp;
ost::InetAddress iaddr; ost::InetAddress iaddr;
std::queue<unsigned char> qrecv; std::queue<unsigned char> qrecv;
ModbusTCP::MBAPHeader curQueryHeader; ModbusTCP::MBAPHeader curQueryHeader;
...@@ -117,6 +114,12 @@ class ModbusTCPServer: ...@@ -117,6 +114,12 @@ class ModbusTCPServer:
timeout_t sessTimeout = { 10000 }; // msec timeout_t sessTimeout = { 10000 }; // msec
ev::default_loop* evloop = { 0 };
ev::io io;
int sock = { -1 };
const std::unordered_set<ModbusRTU::ModbusAddr>* vmbaddr;
private: private:
std::atomic_bool cancelled; std::atomic_bool cancelled;
......
...@@ -5,19 +5,33 @@ ...@@ -5,19 +5,33 @@
#include <string> #include <string>
#include <queue> #include <queue>
#include <unordered_map> #include <unordered_map>
#include <cc++/socket.h> #include <ev++.h>
#include "ModbusServerSlot.h" #include "ModbusServerSlot.h"
#include "ModbusServer.h" #include "ModbusServer.h"
#include "PassiveTimer.h" #include "PassiveTimer.h"
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
/*!
* \brief The ModbusTCPSession class
* Класс рассчитан на совместную работу с ModbusTCPServer, т.к. построен на основе libev,
* и главный цикл (default_loop) находиться там.
*
* Текущая реализация не доведена до совершенства использования "событий".
* И рассчитывает, что данные от клиента приходят все сразу, а так как сокеты не блокирующие,
* то попыток чтения делается несколько с небольшой паузой, что нехорошо, т.к. отнимает время у
* других "клиентов", ведь сервер по сути однопоточный (!)
* Альтернативной реализацией могло быть быть.. чтение по событиям и складывание в отдельную очередь,
* а обработку делать по мере достаточного накопления данных во входной очереди, но это требует асинхронный
* парсинг данных протокола modbus (т.е. мы анализируем очередной байт и решаем сколько нам нужно ещё
* "подождать" данных.. чтобы пойти на следующий шаг), это в результате будет слишком сложная реализация.
* В конце-концов пока нет рассчёта на >1000 подключений (хотя libev позволяет держать >10k).
*/
class ModbusTCPSession: class ModbusTCPSession:
public ModbusServerSlot, public ModbusServerSlot,
public ModbusServer, public ModbusServer
public ost::TCPSession
{ {
public: public:
ModbusTCPSession( ost::TCPSocket& server, const std::unordered_set<ModbusRTU::ModbusAddr>& vmbaddr, timeout_t timeout ); ModbusTCPSession( int sock, const std::unordered_set<ModbusRTU::ModbusAddr>& vmbaddr, timeout_t timeout );
virtual ~ModbusTCPSession(); virtual ~ModbusTCPSession();
void cleanInputStream(); void cleanInputStream();
...@@ -41,20 +55,56 @@ class ModbusTCPSession: ...@@ -41,20 +55,56 @@ class ModbusTCPSession:
return caddr; return caddr;
} }
void setKeepAliveParams( timeout_t timeout_sec = 3, int conn_keepcnt = 2, int keepintvl = 2 ); void run();
protected: protected:
virtual void run();
// -------------------------------------------
// author:
// Buffer class - allow for output buffering such that it can be written out into async pieces
struct Buffer
{
unsigned char *data;
ssize_t len;
ssize_t pos;
Buffer( const unsigned char *bytes, ssize_t nbytes )
{
pos = 0;
len = nbytes;
data = new unsigned char[nbytes];
memcpy(data, bytes, nbytes);
}
virtual ~Buffer()
{
delete [] data;
}
unsigned char *dpos()
{
return data + pos;
}
ssize_t nbytes()
{
return len - pos;
}
};
void callback( ev::io &watcher, int revents );
void idleCallback( ev::idle &watcher, int revents );
virtual void readEvent( ev::io &watcher );
virtual void writeEvent( ev::io &watcher );
virtual void final(); virtual void final();
virtual int getNextData( unsigned char* buf, int len ); virtual size_t getNextData( unsigned char* buf, int len ) override;
virtual void setChannelTimeout( timeout_t msec ); virtual void setChannelTimeout( timeout_t msec );
virtual ModbusRTU::mbErrCode sendData( unsigned char* buf, int len ); virtual ModbusRTU::mbErrCode sendData( unsigned char* buf, int len ) override;
virtual ModbusRTU::mbErrCode tcp_processing( ost::TCPStream& tcp, ModbusTCP::MBAPHeader& mhead ); virtual ModbusRTU::mbErrCode tcp_processing(ModbusTCP::MBAPHeader& mhead );
virtual ModbusRTU::mbErrCode pre_send_request( ModbusRTU::ModbusMessage& request ); virtual ModbusRTU::mbErrCode pre_send_request( ModbusRTU::ModbusMessage& request );
virtual ModbusRTU::mbErrCode post_send_request( ModbusRTU::ModbusMessage& request ); virtual ModbusRTU::mbErrCode post_send_request( ModbusRTU::ModbusMessage& request );
virtual ModbusRTU::mbErrCode readCoilStatus( ModbusRTU::ReadCoilMessage& query, virtual ModbusRTU::mbErrCode readCoilStatus( ModbusRTU::ReadCoilMessage& query,
ModbusRTU::ReadCoilRetMessage& reply ); ModbusRTU::ReadCoilRetMessage& reply );
...@@ -105,6 +155,11 @@ class ModbusTCPSession: ...@@ -105,6 +155,11 @@ class ModbusTCPSession:
timeout_t timeout = { 0 }; timeout_t timeout = { 0 };
ModbusRTU::ModbusMessage buf; ModbusRTU::ModbusMessage buf;
ev::io io;
int sfd;
std::queue<Buffer*> qsend;
ev::idle idle;
bool ignoreAddr = { false }; bool ignoreAddr = { false };
std::string peername = { "" }; std::string peername = { "" };
......
...@@ -7,8 +7,8 @@ libModbus_la_SOURCES = ModbusTypes.cc ModbusHelpers.cc ModbusTCPSession.cc \ ...@@ -7,8 +7,8 @@ libModbus_la_SOURCES = ModbusTypes.cc ModbusHelpers.cc ModbusTCPSession.cc \
ModbusRTUSlave.cc ModbusRTUSlaveSlot.cc ModbusRTUMaster.cc \ ModbusRTUSlave.cc ModbusRTUSlaveSlot.cc ModbusRTUMaster.cc \
ModbusTCPCore.cc ModbusTCPServer.cc ModbusTCPServerSlot.cc ModbusTCPMaster.cc ModbusTCPCore.cc ModbusTCPServer.cc ModbusTCPServerSlot.cc ModbusTCPMaster.cc
libModbus_la_CXXFLAGS = -I$(top_builddir)/include/Communications/modbus $(SIGC_CFLAGS) $(COMCPP_CFLAGS) libModbus_la_CXXFLAGS = -I$(top_builddir)/include/Communications/modbus $(SIGC_CFLAGS) $(COMCPP_CFLAGS) $(EV_CFLAGS)
#libModbus_la_LIBADD = $(top_builddir)/src/Communications/TCP/libTCP.la $(SIGC_LIBS) $(COMCPP_LIBS) #libModbus_la_LIBADD = $(top_builddir)/src/Communications/TCP/libTCP.la $(SIGC_LIBS) $(COMCPP_LIBS)
libModbus_la_LIBADD = $(SIGC_LIBS) $(COMCPP_LIBS) libModbus_la_LIBADD = $(SIGC_LIBS) $(COMCPP_LIBS) $(EV_LIBS)
include $(top_builddir)/include.mk include $(top_builddir)/include.mk
...@@ -147,7 +147,7 @@ void ModbusRTUSlave::setSpeed( const std::string& s ) ...@@ -147,7 +147,7 @@ void ModbusRTUSlave::setSpeed( const std::string& s )
port->setSpeed(s); port->setSpeed(s);
} }
// -------------------------------------------------------------------------------- // --------------------------------------------------------------------------------
int ModbusRTUSlave::getNextData( unsigned char* buf, int len ) size_t ModbusRTUSlave::getNextData( unsigned char* buf, int len )
{ {
// if( !port ) return 0; // if( !port ) return 0;
return port->receiveBlock(buf, len); return port->receiveBlock(buf, len);
......
...@@ -48,6 +48,7 @@ ModbusServer::ModbusServer(): ...@@ -48,6 +48,7 @@ ModbusServer::ModbusServer():
dlog->addLevel(Debug::CRIT); dlog->addLevel(Debug::CRIT);
dlog->addLevel(Debug::INFO); dlog->addLevel(Debug::INFO);
*/ */
} }
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
......
...@@ -19,31 +19,37 @@ ...@@ -19,31 +19,37 @@
using namespace std; using namespace std;
using namespace ModbusRTU; using namespace ModbusRTU;
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
size_t ModbusTCPCore::readNextData( ost::TCPStream* tcp, size_t ModbusTCPCore::readNextData(UTCPStream* tcp,
std::queue<unsigned char>& qrecv, int max ) std::queue<unsigned char>& qrecv, size_t max, timeout_t t )
{ {
if( !tcp || !tcp->isConnected() ) if( !tcp || !tcp->isConnected() )
return 0; return 0;
int i = 0; size_t i = 0;
for( ; i < max; i++ ) try
{ {
char c; for( ; i < max; i++ )
tcp->read(&c, sizeof(c)); {
unsigned char c;
ssize_t l = tcp->readData(&c, sizeof(c),0,t);
if( tcp->gcount() <= 0 ) if( l <= 0 )
break; break;
qrecv.push( (unsigned char)(c) ); qrecv.push(c);
}
}
catch( ost::SockException& e )
{
} }
return i; return i;
} }
// ------------------------------------------------------------------------ // ------------------------------------------------------------------------
size_t ModbusTCPCore::getNextData(ost::TCPStream* tcp, size_t ModbusTCPCore::getNextData(UTCPStream* tcp,
std::queue<unsigned char>& qrecv, std::queue<unsigned char>& qrecv,
unsigned char* buf, size_t len ) unsigned char* buf, size_t len, timeout_t t )
{ {
if( !tcp || !tcp->isConnected() ) if( !tcp || !tcp->isConnected() )
return 0; return 0;
...@@ -53,9 +59,74 @@ size_t ModbusTCPCore::getNextData(ost::TCPStream* tcp, ...@@ -53,9 +59,74 @@ size_t ModbusTCPCore::getNextData(ost::TCPStream* tcp,
if( len <= 0 ) if( len <= 0 )
len = 7; len = 7;
int ret = ModbusTCPCore::readNextData(tcp, qrecv, len); size_t ret = ModbusTCPCore::readNextData(tcp, qrecv, len, t);
if( ret == 0 )
return 0;
}
size_t i = 0;
for( ; i < len && !qrecv.empty(); i++ )
{
buf[i] = qrecv.front();
qrecv.pop();
}
return i;
}
// -------------------------------------------------------------------------
size_t ModbusTCPCore::readDataFD( int fd, std::queue<unsigned char>& qrecv, size_t max , size_t attempts )
{
size_t i = 0;
#if 1
for( size_t a=0; a<attempts; a++ )
{
for( ; i < max; i++ )
{
// читаем один символ за раз..
unsigned char c;
ssize_t l = ::read(fd, &c, sizeof(c));
if( l <= 0 )
break;
qrecv.push(c);
}
}
#else
char* buf = new char[max];
ssize_t l = 0;
for( size_t a=0; a<attempts; a++ )
{
l = ::read(fd, buf, sizeof(buf));
if( l > 0 )
break;
}
if( l > 0 )
{
for( int i=0; i<l; i++ )
qrecv.push(buf[i]);
}
delete [] buf;
#endif
return ( qrecv.size() >= max ? max : qrecv.size() );
}
// ------------------------------------------------------------------------
size_t ModbusTCPCore::getDataFD( int fd, std::queue<unsigned char>& qrecv,
unsigned char* buf, size_t len, size_t attempts )
{
if( qrecv.empty() || qrecv.size() < len )
{
if( len == 0 )
len = 7;
size_t ret = ModbusTCPCore::readDataFD(fd, qrecv, len, attempts);
if( ret <= 0 ) if( ret == 0 )
return 0; return 0;
} }
...@@ -70,17 +141,16 @@ size_t ModbusTCPCore::getNextData(ost::TCPStream* tcp, ...@@ -70,17 +141,16 @@ size_t ModbusTCPCore::getNextData(ost::TCPStream* tcp,
return i; return i;
} }
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
mbErrCode ModbusTCPCore::sendData(ost::TCPStream* tcp, unsigned char* buf, size_t len ) mbErrCode ModbusTCPCore::sendData( UTCPStream* tcp, unsigned char* buf, size_t len, timeout_t t )
{ {
if( !tcp || !tcp->isConnected() ) if( !tcp || !tcp->isConnected() )
return erTimeOut; return erTimeOut;
try try
{ {
for( size_t i = 0; i < len; i++ ) ssize_t l = tcp->writeData(buf,len,t);
(*tcp) << buf[i]; if( l == len )
return erNoError;
return erNoError;
} }
catch( ost::SockException& e ) catch( ost::SockException& e )
{ {
...@@ -94,3 +164,11 @@ mbErrCode ModbusTCPCore::sendData(ost::TCPStream* tcp, unsigned char* buf, size_ ...@@ -94,3 +164,11 @@ mbErrCode ModbusTCPCore::sendData(ost::TCPStream* tcp, unsigned char* buf, size_
return erHardwareError; return erHardwareError;
} }
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
mbErrCode ModbusTCPCore::sendDataFD( int fd, unsigned char* buf, size_t len )
{
ssize_t l = ::write(fd,buf,len);
if( l == len )
return erNoError;
return erHardwareError;
}
// -------------------------------------------------------------------------
...@@ -124,22 +124,25 @@ mbErrCode ModbusTCPMaster::query( ModbusAddr addr, ModbusMessage& msg, ...@@ -124,22 +124,25 @@ mbErrCode ModbusTCPMaster::query( ModbusAddr addr, ModbusMessage& msg,
if( dlog->is_info() ) if( dlog->is_info() )
{ {
dlog->info() << iaddr << "(ModbusTCPMaster::query): send tcp header(" << sizeof(mh) << "): "; dlog->info() << iaddr << "(ModbusTCPMaster::query): send tcp header(" << sizeof(mh) << "): ";
mbPrintMessage( dlog->info(), (ModbusByte*)(&mh), sizeof(mh)); mbPrintMessage( dlog->info(false), (ModbusByte*)(&mh), sizeof(mh));
dlog->info() << endl; dlog->info(false) << endl;
} }
for( unsigned int i = 0; i < 2; i++ ) for( unsigned int i = 0; i < 2; i++ )
{ {
(*tcp) << mh; if( tcp->isPending(ost::Socket::pendingOutput, timeout) )
{
tcp->writeData(&mh,sizeof(mh));
// send PDU // send PDU
mbErrCode res = send(msg); mbErrCode res = send(msg);
if( res != erNoError ) if( res != erNoError )
return res; return res;
if( tcp->isPending(ost::Socket::pendingOutput, timeout) ) if( tcp->isPending(ost::Socket::pendingOutput, timeout) )
break; break;
}
if( dlog->is_info() ) if( dlog->is_info() )
dlog->info() << "(ModbusTCPMaster::query): no write pending.. reconnnect.." << endl; dlog->info() << "(ModbusTCPMaster::query): no write pending.. reconnnect.." << endl;
...@@ -187,7 +190,7 @@ mbErrCode ModbusTCPMaster::query( ModbusAddr addr, ModbusMessage& msg, ...@@ -187,7 +190,7 @@ mbErrCode ModbusTCPMaster::query( ModbusAddr addr, ModbusMessage& msg,
if( dlog->is_info() ) if( dlog->is_info() )
{ {
dlog->info() << "(ModbusTCPMaster::query): recv tcp header(" << ret << "): "; dlog->info() << "(ModbusTCPMaster::query): recv tcp header(" << ret << "): ";
mbPrintMessage( dlog->info(), (ModbusByte*)(&rmh), sizeof(rmh)); mbPrintMessage( dlog->info(false), (ModbusByte*)(&rmh), sizeof(rmh));
dlog->info(false) << endl; dlog->info(false) << endl;
} }
...@@ -324,6 +327,7 @@ bool ModbusTCPMaster::checkConnection( const std::string& ip, int port, int time ...@@ -324,6 +327,7 @@ bool ModbusTCPMaster::checkConnection( const std::string& ip, int port, int time
UTCPStream t; UTCPStream t;
t.create(ip, port, true, timeout_msec); t.create(ip, port, true, timeout_msec);
t.setKeepAliveParams( (timeout_msec > 1000 ? timeout_msec / 1000 : 1), 1, 1); t.setKeepAliveParams( (timeout_msec > 1000 ? timeout_msec / 1000 : 1), 1, 1);
t.setNoDelay(true);
t.disconnect(); t.disconnect();
return true; return true;
} }
...@@ -351,6 +355,7 @@ void ModbusTCPMaster::reconnect() ...@@ -351,6 +355,7 @@ void ModbusTCPMaster::reconnect()
tcp->create(iaddr, port, true, 500); tcp->create(iaddr, port, true, 500);
tcp->setTimeout(replyTimeOut_ms); tcp->setTimeout(replyTimeOut_ms);
tcp->setKeepAliveParams((replyTimeOut_ms > 1000 ? replyTimeOut_ms / 1000 : 1)); tcp->setKeepAliveParams((replyTimeOut_ms > 1000 ? replyTimeOut_ms / 1000 : 1));
tcp->setNoDelay(true);
} }
catch( const std::exception& e ) catch( const std::exception& e )
{ {
...@@ -405,6 +410,7 @@ void ModbusTCPMaster::connect( ost::InetAddress addr, int _port ) ...@@ -405,6 +410,7 @@ void ModbusTCPMaster::connect( ost::InetAddress addr, int _port )
tcp->create(iaddr, port, true, 500); tcp->create(iaddr, port, true, 500);
tcp->setTimeout(replyTimeOut_ms); tcp->setTimeout(replyTimeOut_ms);
tcp->setKeepAliveParams((replyTimeOut_ms > 1000 ? replyTimeOut_ms / 1000 : 1)); tcp->setKeepAliveParams((replyTimeOut_ms > 1000 ? replyTimeOut_ms / 1000 : 1));
tcp->setNoDelay(true);
} }
catch( const std::exception& e ) catch( const std::exception& e )
{ {
......
...@@ -25,7 +25,6 @@ using namespace ModbusRTU; ...@@ -25,7 +25,6 @@ using namespace ModbusRTU;
using namespace UniSetTypes; using namespace UniSetTypes;
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
ModbusTCPServer::ModbusTCPServer( ost::InetAddress& ia, int _port ): ModbusTCPServer::ModbusTCPServer( ost::InetAddress& ia, int _port ):
TCPSocket(ia, _port),
port(_port), port(_port),
iaddr(ia), iaddr(ia),
ignoreAddr(false), ignoreAddr(false),
......
...@@ -73,6 +73,26 @@ void UTCPStream::forceDisconnect() ...@@ -73,6 +73,26 @@ void UTCPStream::forceDisconnect()
Socket::flags.linger = f; Socket::flags.linger = f;
} }
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
bool UTCPStream::setNoDelay(bool enable)
{
return ( TCPStream::setNoDelay(enable) == 0 );
}
// -------------------------------------------------------------------------
ssize_t UTCPStream::writeData(const void* buf, size_t len, timeout_t t)
{
return TCPStream::writeData(buf,len,t);
}
// -------------------------------------------------------------------------
ssize_t UTCPStream::readData(void* buf, size_t len, char separator, timeout_t t)
{
return TCPStream::readData(buf,len,separator,t);
}
// -------------------------------------------------------------------------
int UTCPStream::getSocket()
{
return TCPStream::so;
}
// -------------------------------------------------------------------------
void UTCPStream::create( const std::string& hname, int port, bool throwflag, timeout_t t ) void UTCPStream::create( const std::string& hname, int port, bool throwflag, timeout_t t )
{ {
family = ost::Socket::IPV4; family = ost::Socket::IPV4;
......
...@@ -140,6 +140,7 @@ extensions/ModbusSlave/tests/t.cc ...@@ -140,6 +140,7 @@ extensions/ModbusSlave/tests/t.cc
extensions/ModbusSlave/tests/test_mbslave.cc extensions/ModbusSlave/tests/test_mbslave.cc
extensions/ModbusSlave/tests/tests_individual_process.cc extensions/ModbusSlave/tests/tests_individual_process.cc
extensions/ModbusSlave/tests/tests_with_sm.cc extensions/ModbusSlave/tests/tests_with_sm.cc
extensions/ModbusSlave/epoll-test/mbslave-epoll.cc
extensions/ModbusSlave/libUniSet2MBSlave.pc.in extensions/ModbusSlave/libUniSet2MBSlave.pc.in
extensions/ModbusSlave/Makefile.am extensions/ModbusSlave/Makefile.am
extensions/ModbusSlave/MBSlave.cc extensions/ModbusSlave/MBSlave.cc
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment