Commit 970a5e03 authored by Ivan Mazhukin's avatar Ivan Mazhukin

just import…

just import https://gitlab.eterfund.ru/Gendalf4ever/co2/-/archive/co2-101/co2-co2-101.tar to co2-mt8057s subdir with rpmgs script
parents
import logging as log
import time
import hid
import sys
import socket
#from influxWriter import InfluxWriter
from influxConnection import InfluxConnection
CO2_USB_MANUFACTURER = 'Holtek'
CO2_USB_PRODUCT = 'USB-zyTemp'
# Ignore first 5 measurements during self-calibration after power-up
IGNORE_FIRST_N_MEASUREMENTS = 5
logger = log.getLogger('zytemp')
CO2MON_MAGIC_WORD = b'Htemp99e'
CO2MON_MAGIC_TABLE = (0, 0, 0, 0, 0, 0, 0, 0)
def list_to_longint(byte_list):
return sum([val << (i * 8) for i, val in enumerate(byte_list[::-1])])
def longint_to_list(integer_value):
return [(integer_value >> i) & 0xFF for i in (56, 48, 40, 32, 24, 16, 8, 0)]
class ZyTemp():
MEASUREMENTS = {
0x50: {
'name': 'CO2',
'unit': 'ppm',
'conversion': lambda value: value,
},
0x42: {
'name': 'Temperature',
'unit': '°C',
'conversion': lambda value: (value * 0.0625) - 273.15,
},
}
def __init__(self, hid_device, decrypt=True):
self.hid_device = hid_device
self.measurements_to_ignore = IGNORE_FIRST_N_MEASUREMENTS
self.sensor_values = {v['name']: None for v in ZyTemp.MEASUREMENTS.values()}
self.magic_word_bytes = [((word << 4) & 0xFF) | (word >> 4) for word in bytearray(CO2MON_MAGIC_WORD)]
self.magic_table = CO2MON_MAGIC_TABLE
self.magic_table_integer = list_to_longint(CO2MON_MAGIC_TABLE)
self.influx_connection = InfluxConnection() # Создаем объект для работы с InfluxDB
if decrypt:
self.hid_device.send_feature_report(self.magic_table)
else:
self.hid_device.send_feature_report(b'\xc4\xc6\xc0\x92\x40\x23\xdc\x96')
def __del__(self):
self.hid_device.close()
def update_sensor_value(self, measurement_key, measurement_value, sensor_values):
sensor_values[measurement_key] = measurement_value
if all(value is not None for value in sensor_values.values()):
sensor_id = socket.gethostname()
print(f"{sensor_id}: {sensor_values}")
# Проверка записи в InfluxDB
try:
self.influx_connection.write_data(sensor_id, sensor_values) # Записываем данные в InfluxDB
except Exception as e:
logger.error(f"Failed to write data into InfluxDB: {e}")
sys.exit(0)
def run_once(self, decrypt=False):
while True:
try:
report_data = self.hid_device.read(8, timeout_ms=5000)
except OSError as err:
logger.error(f'OS error: {err}')
return
if not report_data:
logger.error('Read error or timeout')
self.hid_device.close()
return
if decrypt:
message = list_to_longint([report_data[i] for i in [2, 4, 0, 7, 1, 6, 5, 3]])
result = message ^ self.magic_table_integer
result = (result >> 3) | ((result << 61) & 0xFFFFFFFFFFFFFFFF)
result = longint_to_list(result)
report_data = [(data - magic_word) & 0xFF for data, magic_word in zip(result, self.magic_word_bytes)]
if len(report_data) < 5 or report_data[4] != 0x0d:
logger.debug(f'Unexpected data from device: {report_data}\n')
continue
if report_data[3] != sum(report_data[0:3]) & 0xff:
continue
measurement_type = report_data[0]
measurement_value = report_data[1] << 8 | report_data[2]
try:
measurement = ZyTemp.MEASUREMENTS[measurement_type]
except KeyError:
logger.debug(f'Unknown key {measurement_type:02x}\n')
continue
measurement_name, measurement_unit, measurement_converted_value = (
measurement['name'],
measurement['unit'],
measurement['conversion'](measurement_value)
)
ignore_measurement = self.measurements_to_ignore > 0
logger.debug(f'{measurement_name}: {measurement_converted_value:g} {measurement_unit}' +
(' (ignored)' if ignore_measurement else '') + '\n')
if not ignore_measurement:
self.update_sensor_value(measurement_name, measurement_converted_value, self.sensor_values)
if self.measurements_to_ignore:
self.measurements_to_ignore -= 1
def get_hid_device():
hid_sensors = [
sensor for sensor in hid.enumerate()
if sensor['manufacturer_string'] == CO2_USB_MANUFACTURER
and sensor['product_string'] == CO2_USB_PRODUCT
]
sensor_paths = []
for sensor in hid_sensors:
interface_number, path, vendor_id, product_id = (sensor[key] for key in
('interface_number', 'path', 'vendor_id', 'product_id'))
path_str = path.decode('utf-8')
logger.info(f'Found CO2 sensor at interface {interface_number}, {path_str}, VID={vendor_id:04x}, PID={product_id:04x}')
sensor_paths.append(path)
if not sensor_paths:
logger.error('No device found')
return None
logger.info(f'Using device at {sensor_paths[0].decode("utf-8")}')
hid_device = hid.device()
hid_device.open_path(sensor_paths[0])
return hid_device
if __name__ == "__main__":
log.basicConfig(level=log.DEBUG)
hid_device = get_hid_device()
if hid_device is None:
sys.exit(1)
zytemp_sensor = ZyTemp(hid_device)
zytemp_sensor.run_once()
temperature = zytemp_sensor.sensor_values.get('Temperature', 'N/A')
co2_level = zytemp_sensor.sensor_values.get('CO2', 'N/A')
config.ini
*.pyc
__pycache__/
\ No newline at end of file
from influxdb import InfluxDBClient
import time
class InfluxConnection:
def __init__(self):
# Конфиги
self.INFLUXDB_URL = 'http://influxdb.k8s.eterfund.ru'
self.INFLUXDB_DB = ''
self.INFLUXDB_USER = ''
self.INFLUXDB_PASSWORD = ''
# Разделение URL на хост и порт
url_parts = self.INFLUXDB_URL.split('://')[1].split(':')
host = url_parts[0]
port = 80 # Использование порта 80 вместо стандартного 8086
# Подключение к InfluxDB
self.client = InfluxDBClient(
host=host,
port=port,
username=self.INFLUXDB_USER,
password=self.INFLUXDB_PASSWORD,
database=self.INFLUXDB_DB
)
def write_data(self, sensor_id, values):
json_body = [
{
"measurement": "sensor_data",
"tags": {
"sensor_id": sensor_id
},
"time": int(time.time())*1000000000, # Текущая метка времени в секундах
"fields": values
}
]
try:
self.client.write_points(json_body, database=self.INFLUXDB_DB)
print("Successfully wrote data to InfluxDB")
except Exception as e:
print(f"Failed to write data to InfluxDB: {e}")
def __del__(self):
self.client.close()
[Unit]
Description=co2 Service
After=network.target
[Service]
WorkingDirectory=/usr/lib/python3/site-packages/co2/
ExecStart=/usr/bin/python3 /usr/lib/python3/site-packages/co2/co2.py
Restart=on-failure
RestartSec=10
StandardOutput=journal
StandardError=journal
[Install]
WantedBy=multi-user.target
\ No newline at end of file
[Unit]
Description=co2 Service every minute
[Timer]
OnCalendar=*:0/1
Unit=co2.service
[Install]
WantedBy=timers.target
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment