serial-device/data_processor/__init__.py
2023-10-01 22:41:21 +03:00

67 lines
2.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import importlib
import logging
import sys
from command_parser import CommandParser
# Root logger setup for the whole module: timestamped records, DEBUG and above.
logging.basicConfig(
    format='%(asctime)s %(levelname)s:%(message)s',
    datefmt='%d-%m-%Y %H:%M:%S',
    level=logging.DEBUG,
)

# Ideally there would be some class collecting this data, but there is no
# point in growing the amount of code right now.
# Maps a sensor name to the canned reading returned for it.
response_data = dict(A='10V', B='5V', C='15A')

# Layout of a successful reply; filled with ``sensor`` and ``value`` fields.
response_template = '{sensor}_{value}'
class DataProcessor:
def __init__(self, config):
self.config = config
try:
package = 'transport.{type}'
full_package_name = package.format(type = self.config["transport"]["type"])
cls = getattr(importlib.import_module(full_package_name), 'Transport')
self.transport = cls(self.config["transport"])
except Exception as err:
print("Can't create transport: ", err)
# raise err
sys.exit(1)
def run(self):
logging.debug("Waiting for commands...")
while True:
command_string = self.transport.readline()
logging.debug('read: %s', command_string)
try:
command = CommandParser.parse(command_string)
except ValueError as err:
logging.warning("Command parser error: %s", err)
self.transport.writeline('Failed to process command!')
else:
parsed_command_name = command.get('command')
response_str = None
match parsed_command_name:
case 'GET':
sensor_name = command.get('value')
response_value = response_data.get(sensor_name)
if bool(response_value):
response_str = response_template.format(sensor=command['value'], value=response_value)
logging.info('response_str: %s', response_str)
else:
response_str = '00'
logging.warning('No value for sensor: %s', sensor_name)
case _:
print('Unknown command: %s', parsed_command_name)
response_str = 'Unknown command'
self.transport.writeline(response_str)