68 lines
2.4 KiB
Python
68 lines
2.4 KiB
Python
|
import importlib
|
|||
|
import logging
|
|||
|
import sys
|
|||
|
|
|||
|
from command_parser import CommandParser
|
|||
|
|
|||
|
# В идеале, тут должен быть какой-то класс собирающий данные, но не вижу смысла увеличивать количество кода сейчас
|
|||
|
response_data = {
|
|||
|
'A': '10V',
|
|||
|
'B': '5V',
|
|||
|
'C': '15A'
|
|||
|
}
|
|||
|
|
|||
|
response_template = '{sensor}_{value}'
|
|||
|
|
|||
|
logging.basicConfig(format='%(asctime)s %(levelname)s:%(message)s', datefmt='%d-%m-%Y %H:%M:%S', level=logging.DEBUG)
|
|||
|
|
|||
|
|
|||
|
class DataProcessor:
|
|||
|
def __init__(self, config):
|
|||
|
self.config = config
|
|||
|
try:
|
|||
|
package = 'transport.{type}'
|
|||
|
full_package_name = package.format(type = self.config["transport"]["type"])
|
|||
|
cls = getattr(importlib.import_module(full_package_name), 'Transport')
|
|||
|
|
|||
|
self.transport = cls(self.config["transport"])
|
|||
|
|
|||
|
except Exception as err:
|
|||
|
print("Can't create transport: ", err)
|
|||
|
# raise err
|
|||
|
sys.exit(1)
|
|||
|
|
|||
|
def run(self):
|
|||
|
logging.debug("Waiting for commands...")
|
|||
|
|
|||
|
while True:
|
|||
|
command_string = self.transport.readline()
|
|||
|
logging.debug('read: %s', command_string)
|
|||
|
|
|||
|
try:
|
|||
|
command = CommandParser.parse(command_string)
|
|||
|
except ValueError as err:
|
|||
|
logging.warning("Command parser error: %s", err)
|
|||
|
self.transport.writeline('Failed to process command!')
|
|||
|
else:
|
|||
|
parsed_command_name = command.get('command')
|
|||
|
|
|||
|
response_str = None
|
|||
|
match parsed_command_name:
|
|||
|
case 'GET':
|
|||
|
sensor_name = command.get('value')
|
|||
|
response_value = response_data.get(sensor_name)
|
|||
|
|
|||
|
if bool(response_value):
|
|||
|
response_str = response_template.format(sensor=command['value'], value=response_value)
|
|||
|
logging.info('response_str: %s', response_str)
|
|||
|
|
|||
|
else:
|
|||
|
response_str = '00'
|
|||
|
logging.warning('No value for sensor: %s', sensor_name)
|
|||
|
|
|||
|
case _:
|
|||
|
print('Unknown command: %s', parsed_command_name)
|
|||
|
response_str = 'Unknown command'
|
|||
|
|
|||
|
self.transport.writeline(response_str)
|