serial-device/data_processor/__init__.py

76 lines
2.8 KiB
Python
Raw Permalink Normal View History

2023-10-01 22:41:21 +03:00
import importlib
import logging
from time import sleep
2023-10-01 22:41:21 +03:00
from command_parser import CommandParser
# В идеале, тут должен быть какой-то класс собирающий данные, но не вижу смысла увеличивать количество кода сейчас
response_data = {
'A': '10V',
'B': '5V',
'C': '15A'
}
response_template = '{sensor}_{value}'
logging.basicConfig(format='%(asctime)s %(levelname)s:%(message)s', datefmt='%d-%m-%Y %H:%M:%S', level=logging.DEBUG)
class DataProcessor:
def __init__(self, config):
self.config = config
try:
package = 'transport.{type}'
full_package_name = package.format(type = self.config["transport"]["type"])
cls = getattr(importlib.import_module(full_package_name), 'Transport')
self.transport = cls(self.config["transport"])
except Exception as err:
logging.error("Can't create transport: ", err)
raise err
2023-10-01 22:41:21 +03:00
def run(self):
logging.debug("Waiting for commands...")
while True:
command_string = self.transport.read_line()
2023-10-01 22:41:21 +03:00
logging.debug('read: %s', command_string)
if not command_string:
sleep(1) # Все читатели блокирующиеся и, в теории, этот код не будет вызываться практически никогда.
# Но, на практике, стоит добавить слип чтобы не жрал зря CPU
continue
response_str = None
2023-10-01 22:41:21 +03:00
try:
command = CommandParser.parse(command_string)
except ValueError as err:
logging.warning("Command parser error: %s", err)
response_str = 'Failed to process command!'
2023-10-01 22:41:21 +03:00
else:
parsed_command_name = command.get('command')
match parsed_command_name:
case 'GET':
sensor_name = command.get('value')
response_value = response_data.get(sensor_name)
if bool(response_value):
response_str = response_template.format(sensor=command['value'], value=response_value)
logging.info('response_str: %s', response_str)
else:
response_str = '00'
logging.warning('No value for sensor: %s', sensor_name)
case _:
print('Unknown command: %s', parsed_command_name)
response_str = 'Unknown command'
try:
self.transport.write_line(response_str)
except Exception as err:
logging.error('Cannot send message: %s', err)