import importlib import logging from time import sleep from command_parser import CommandParser # В идеале, тут должен быть какой-то класс собирающий данные, но не вижу смысла увеличивать количество кода сейчас response_data = { 'A': '10V', 'B': '5V', 'C': '15A' } response_template = '{sensor}_{value}' logging.basicConfig(format='%(asctime)s %(levelname)s:%(message)s', datefmt='%d-%m-%Y %H:%M:%S', level=logging.DEBUG) class DataProcessor: def __init__(self, config): self.config = config try: package = 'transport.{type}' full_package_name = package.format(type = self.config["transport"]["type"]) cls = getattr(importlib.import_module(full_package_name), 'Transport') self.transport = cls(self.config["transport"]) except Exception as err: logging.error("Can't create transport: ", err) raise err def run(self): logging.debug("Waiting for commands...") while True: command_string = self.transport.read_line() logging.debug('read: %s', command_string) if not command_string: sleep(1) # Все читатели блокирующиеся и, в теории, этот код не будет вызываться практически никогда. # Но, на практике, стоит добавить слип чтобы не жрал зря CPU continue response_str = None try: command = CommandParser.parse(command_string) except ValueError as err: logging.warning("Command parser error: %s", err) response_str = 'Failed to process command!' else: parsed_command_name = command.get('command') match parsed_command_name: case 'GET': sensor_name = command.get('value') response_value = response_data.get(sensor_name) if bool(response_value): response_str = response_template.format(sensor=command['value'], value=response_value) logging.info('response_str: %s', response_str) else: response_str = '00' logging.warning('No value for sensor: %s', sensor_name) case _: print('Unknown command: %s', parsed_command_name) response_str = 'Unknown command' try: self.transport.write_line(response_str) except Exception as err: logging.error('Cannot send message: %s', err)