Commit f58a8a6e authored by Oleg Nikulin's avatar Oleg Nikulin

First commit

parents
import subprocess
import time
import serial
import os
import argparse
from argparse import RawTextHelpFormatter
import datetime
import threading
POLL_SLEEP_TIME = 0.05
RECONNECT_SLEEP_TIME = 1
TIMEOUT_TIME = 3
SERIAL_SPEEDS = [300, 1200, 2400, 4800, 9600, 19200, 38400, 57600, 74880, 115200, 230400, 250000, 500000, 1000000, 2000000]
PORT:str = None
SPEED:int = None
temp = '50'
connected = False
input_thread_running = False
def temp_input():
global temp
while True:
temp_input = input('Enter the temperature value: ')
try:
int(temp_input)
temp = temp_input
except ValueError:
print('Temperature value must be a number')
parser = argparse.ArgumentParser(
description = 'This script sends PC disk temperature via a serial port',
formatter_class=RawTextHelpFormatter
)
parser.add_argument('-l', '--list',
required = False,
action='store_true',
help = 'show a list of available serial devices',
)
parser.add_argument('-m', '--manual',
required = False,
action='store_true',
help = 'send temperature from user input instead of hddtemp',
)
parser.add_argument('device',
type = str,
nargs = '?',
help = 'serial device to commuicate with. Example: ttyUSB0',
metavar = 'SERIAL'
)
parser.add_argument('speed',
type = int,
nargs = '?',
help = 'serial port baud rate. Supported rates: ' + str(SERIAL_SPEEDS)[1:-1],
metavar = 'SPEED'
)
args = parser.parse_args()
#получаем список serial устройств
dev_contents = os.listdir('/dev/')
serial_devices:str = []
for file in dev_contents:
if file.startswith('ttyUSB'):
serial_devices.append(file)
if args.list != False: #вывод списка serial устройств
if len(serial_devices) == 0:
print('No serial devices found')
else:
print('Available serial devices:')
for device in serial_devices:
print(device)
exit()
#проверка наличия аргументов
if args.device == None:
print('Serial device was not specified')
exit()
if args.speed == None:
print('Serial port baud rate was not specified')
exit()
if args.manual:
input_thread = threading.Thread(target = temp_input)
#ищем указанное serial устройство
if args.device in serial_devices:
PORT = args.device
else:
print('Serial device "{dev}" was not found. Use --list option to see a list of available serial devices'.format(dev = args.device))
exit()
#ищем указанную скорость
if args.speed in SERIAL_SPEEDS:
SPEED = args.speed
else:
print('Specified baud rate is unsupported. Use -h option to see a list of supported rates')
exit()
try: #пытаемся открыть serial порт
serial_port = serial.Serial(
port = '/dev/' + PORT,
baudrate = SPEED,
parity = serial.PARITY_NONE,
stopbits=serial.STOPBITS_ONE,
)
except OSError:
print('An error has occured while opening the serial port')
exit()
print('Connecting to the serial device...')
while True:
try:
if serial_port.in_waiting > 0:
if not connected:
print('Connected. Starting transmission')
connected = True
if args.manual and not input_thread_running:
input_thread.start()
input_thread_running = True
if serial_port.read().decode() == '1':
last_serial_input_time = datetime.datetime.now().timestamp()
if args.manual:
serial_port.write('a'.encode())
serial_port.write(temp.encode()) #отправка значения
else:
output = str(subprocess.Popen(['netcat', 'localhost', '7634'], stdout = subprocess.PIPE, stderr = subprocess.STDOUT).communicate()[0]) #получаем output от hddtemp
#b'|/dev/sda2|ST340014A|41|C|'
if output == "b''":
print('Hddtemp output is empty, make sure it is running. If not, launch it with $sudo hddtemp -d')
exit()
#вычленяем значение температуры:
temp_pos_end = output.find('|C|')
temp_pos_start = output.rfind('|', 0, temp_pos_end) + 1
temp = output[temp_pos_start:temp_pos_end]
try:
int(temp)
#print(temp + ' C')
serial_port.write('a'.encode())
serial_port.write(temp.encode()) #отправка значения
except ValueError:
print('Error: Failed to get a temperature value from hddtemp output')
else:
print('Unknown query from serial device')
except OSError:
if connected:
print('Serial divice was disconnected. Trying to reconnect...')
connected = False
try: #пытаемся снова открыть serial порт
serial_port = serial.Serial(
port = '/dev/' + PORT,
baudrate = SPEED
)
connected = True
except OSError: pass
time.sleep(RECONNECT_SLEEP_TIME)
if connected and datetime.datetime.now().timestamp() - last_serial_input_time > TIMEOUT_TIME:
print('Serial device is not responding')
connected = False
time.sleep(POLL_SLEEP_TIME)
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment