мелкий рефакторинг транспорта
This commit is contained in:
parent
0b3e72109c
commit
1205a3b90d
8 changed files with 146 additions and 32 deletions
|
@ -1,6 +1,6 @@
|
||||||
import importlib
|
import importlib
|
||||||
import logging
|
import logging
|
||||||
import sys
|
from time import sleep
|
||||||
|
|
||||||
from command_parser import CommandParser
|
from command_parser import CommandParser
|
||||||
|
|
||||||
|
@ -27,26 +27,31 @@ class DataProcessor:
|
||||||
self.transport = cls(self.config["transport"])
|
self.transport = cls(self.config["transport"])
|
||||||
|
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
print("Can't create transport: ", err)
|
logging.error("Can't create transport: ", err)
|
||||||
# raise err
|
raise err
|
||||||
sys.exit(1)
|
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
logging.debug("Waiting for commands...")
|
logging.debug("Waiting for commands...")
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
command_string = self.transport.readline()
|
command_string = self.transport.read_line()
|
||||||
logging.debug('read: %s', command_string)
|
logging.debug('read: %s', command_string)
|
||||||
|
|
||||||
|
if not command_string:
|
||||||
|
sleep(1) # Все читатели блокирующиеся и, в теории, этот код не будет вызываться практически никогда.
|
||||||
|
# Но, на практике, стоит добавить слип чтобы не жрал зря CPU
|
||||||
|
continue
|
||||||
|
|
||||||
|
response_str = None
|
||||||
|
|
||||||
try:
|
try:
|
||||||
command = CommandParser.parse(command_string)
|
command = CommandParser.parse(command_string)
|
||||||
except ValueError as err:
|
except ValueError as err:
|
||||||
logging.warning("Command parser error: %s", err)
|
logging.warning("Command parser error: %s", err)
|
||||||
self.transport.writeline('Failed to process command!')
|
response_str = 'Failed to process command!'
|
||||||
else:
|
else:
|
||||||
parsed_command_name = command.get('command')
|
parsed_command_name = command.get('command')
|
||||||
|
|
||||||
response_str = None
|
|
||||||
match parsed_command_name:
|
match parsed_command_name:
|
||||||
case 'GET':
|
case 'GET':
|
||||||
sensor_name = command.get('value')
|
sensor_name = command.get('value')
|
||||||
|
@ -64,4 +69,7 @@ class DataProcessor:
|
||||||
print('Unknown command: %s', parsed_command_name)
|
print('Unknown command: %s', parsed_command_name)
|
||||||
response_str = 'Unknown command'
|
response_str = 'Unknown command'
|
||||||
|
|
||||||
self.transport.writeline(response_str)
|
try:
|
||||||
|
self.transport.write_line(response_str)
|
||||||
|
except Exception as err:
|
||||||
|
logging.error('Cannot send message: %s', err)
|
||||||
|
|
|
@ -27,5 +27,9 @@ with open(config_name) as f:
|
||||||
|
|
||||||
print("config: ", config)
|
print("config: ", config)
|
||||||
|
|
||||||
processor = DataProcessor(config)
|
try:
|
||||||
processor.run()
|
processor = DataProcessor(config)
|
||||||
|
processor.run()
|
||||||
|
except Exception as err:
|
||||||
|
print('Command processor serious error: ', err)
|
||||||
|
sys.exit(1)
|
||||||
|
|
81
tests/test_transport_tcp.py
Normal file
81
tests/test_transport_tcp.py
Normal file
|
@ -0,0 +1,81 @@
|
||||||
|
import unittest
|
||||||
|
import socket
|
||||||
|
from threading import Thread
|
||||||
|
from time import sleep
|
||||||
|
|
||||||
|
from transport.tcp import Transport
|
||||||
|
|
||||||
|
|
||||||
|
class ClientThread(Thread):
|
||||||
|
def __init__(self, write_line='test_line'):
|
||||||
|
Thread.__init__(self)
|
||||||
|
|
||||||
|
self.write_line = write_line
|
||||||
|
self.result = None
|
||||||
|
|
||||||
|
def run(self):
|
||||||
|
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as client:
|
||||||
|
client.connect(('127.0.0.1', 23456))
|
||||||
|
|
||||||
|
client.sendall(bytes(self.write_line + "\n", 'utf-8'))
|
||||||
|
|
||||||
|
self.result = client.recv(1024)
|
||||||
|
|
||||||
|
client.close()
|
||||||
|
|
||||||
|
|
||||||
|
class TransportThread(Thread):
|
||||||
|
def __init__(self, write_string='test'):
|
||||||
|
Thread.__init__(self)
|
||||||
|
|
||||||
|
self.write_string = write_string
|
||||||
|
self.result = None
|
||||||
|
|
||||||
|
def run(self):
|
||||||
|
transport = Transport({'host': '127.0.0.1', 'port': 23456})
|
||||||
|
|
||||||
|
self.result = transport.read_line()
|
||||||
|
|
||||||
|
transport.write_line(self.write_string)
|
||||||
|
|
||||||
|
sleep(0.5)
|
||||||
|
transport.close()
|
||||||
|
|
||||||
|
|
||||||
|
class MyTestCase(unittest.TestCase):
|
||||||
|
def test_write_line(self):
|
||||||
|
sent_string = 'test_sting'
|
||||||
|
|
||||||
|
transport_thread = TransportThread(sent_string)
|
||||||
|
transport_thread.start()
|
||||||
|
|
||||||
|
client_thread = ClientThread()
|
||||||
|
client_thread.start()
|
||||||
|
|
||||||
|
client_thread.join()
|
||||||
|
transport_thread.join()
|
||||||
|
|
||||||
|
data = client_thread.result
|
||||||
|
received_string = data.decode('utf-8')
|
||||||
|
|
||||||
|
self.assertEqual(sent_string+"\n", received_string) # add assertion here
|
||||||
|
|
||||||
|
def test_read_line(self):
|
||||||
|
sent_string = 'written_string'
|
||||||
|
|
||||||
|
transport_thread = TransportThread('test-test')
|
||||||
|
transport_thread.start()
|
||||||
|
|
||||||
|
client_thread = ClientThread(sent_string)
|
||||||
|
client_thread.start()
|
||||||
|
|
||||||
|
client_thread.join()
|
||||||
|
transport_thread.join()
|
||||||
|
|
||||||
|
received_string = transport_thread.result
|
||||||
|
|
||||||
|
self.assertEqual(sent_string, received_string) # add assertion here
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
unittest.main()
|
|
@ -2,10 +2,10 @@ class TransportBase:
|
||||||
def __init__(self, config):
|
def __init__(self, config):
|
||||||
self.config = config
|
self.config = config
|
||||||
|
|
||||||
def writeline(self, cmd_str):
|
def write_line(self, cmd_str):
|
||||||
raise Exception('Implement me!')
|
raise Exception('Implement me!')
|
||||||
|
|
||||||
def readline(self):
|
def read_line(self):
|
||||||
raise Exception('Implement me!')
|
raise Exception('Implement me!')
|
||||||
|
|
||||||
def close(self):
|
def close(self):
|
||||||
|
|
|
@ -16,11 +16,11 @@ class Transport(TransportBase):
|
||||||
|
|
||||||
self.addr = None
|
self.addr = None
|
||||||
|
|
||||||
def writeline(self, cmd_str):
|
def write_line(self, cmd_str):
|
||||||
self.ser.write(str.encode(cmd_str + "\n"))
|
self.ser.write(str.encode(cmd_str + "\n"))
|
||||||
return 1
|
return 1
|
||||||
|
|
||||||
def readline(self):
|
def read_line(self):
|
||||||
buffer = self.ser.readline()
|
buffer = self.ser.readline()
|
||||||
return buffer.decode('utf-8').rstrip()
|
return buffer.decode('utf-8').rstrip()
|
||||||
|
|
||||||
|
|
|
@ -7,45 +7,67 @@ class Transport(TransportBase):
|
||||||
def __init__(self, config):
|
def __init__(self, config):
|
||||||
super().__init__(config)
|
super().__init__(config)
|
||||||
|
|
||||||
self.listener = self.__connect(config['host'], config['port'])
|
if not self.config['host']:
|
||||||
|
self.config['host'] = '127.0.0.1'
|
||||||
|
|
||||||
|
if not self.config['port']:
|
||||||
|
self.config['port'] = '12345'
|
||||||
|
|
||||||
|
self.__connect()
|
||||||
self.buffer = bytearray()
|
self.buffer = bytearray()
|
||||||
|
|
||||||
def writeline(self, cmd_str):
|
def write_line(self, cmd_str):
|
||||||
response_string = cmd_str + "\n"
|
response_string = cmd_str + "\n"
|
||||||
self.conn.sendall(bytes(response_string, 'utf-8'))
|
try:
|
||||||
|
self.conn.sendall(bytes(response_string, 'utf-8'))
|
||||||
|
except Exception as err:
|
||||||
|
print('Can not write line: ', err)
|
||||||
|
self.__connect()
|
||||||
|
|
||||||
def readline(self):
|
def read_line(self):
|
||||||
data = self.conn.recv(16) # команды все короткие, нет смысла много читать из буфера
|
data = self.__read_buffer()
|
||||||
# print('data: ', data)
|
|
||||||
if not data:
|
|
||||||
return ''
|
|
||||||
|
|
||||||
line = self.__findLine(data)
|
line = self.__findLine(data)
|
||||||
|
|
||||||
return line
|
return line
|
||||||
|
|
||||||
def close(self):
|
def close(self):
|
||||||
|
self.conn.close()
|
||||||
self.listener.close()
|
self.listener.close()
|
||||||
|
|
||||||
def __connect(self, host='127.0.0.1', port=12345):
|
def __connect(self):
|
||||||
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as socket_listener:
|
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as socket_listener:
|
||||||
socket_listener.bind((host, port))
|
socket_listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
||||||
|
socket_listener.bind((self.config['host'], self.config['port']))
|
||||||
socket_listener.listen()
|
socket_listener.listen()
|
||||||
|
socket_listener.setblocking(True)
|
||||||
|
|
||||||
conn, addr = socket_listener.accept()
|
conn, addr = socket_listener.accept()
|
||||||
self.conn = conn
|
self.conn = conn
|
||||||
return socket_listener
|
|
||||||
|
self.listener = socket_listener
|
||||||
|
|
||||||
|
return 1
|
||||||
|
|
||||||
def __findLine(self, data):
|
def __findLine(self, data):
|
||||||
data_chunk = bytearray(self.buffer)
|
data_chunk = bytearray(self.buffer)
|
||||||
data_chunk.extend(data)
|
data_chunk.extend(data)
|
||||||
|
|
||||||
# print("line: ", data_chunk)
|
|
||||||
line_separator_index = data_chunk.find(b"\n")
|
line_separator_index = data_chunk.find(b"\n")
|
||||||
# print('n idx: ', line_separator_index)
|
|
||||||
|
|
||||||
if line_separator_index > 0:
|
if line_separator_index > 0:
|
||||||
line = data_chunk[0: line_separator_index]
|
line = data_chunk[0: line_separator_index]
|
||||||
self.buffer = data_chunk[line_separator_index+1:]
|
self.buffer = data_chunk[line_separator_index+1:]
|
||||||
|
|
||||||
return line.decode('utf-8')
|
return line.decode('utf-8')
|
||||||
|
|
||||||
|
def __read_buffer(self):
|
||||||
|
data = self.conn.recv(1024) # команды все короткие, нет смысла много читать из буфера
|
||||||
|
if not data:
|
||||||
|
self.__connect()
|
||||||
|
# Если данных нет, то клиент отвалился. Т.к. сокет у нас блокирующийся и recv будет
|
||||||
|
# висеть пока туда не придут хоть какие-то данные. В теории, тут можно словить бесконечную рекурсию,
|
||||||
|
# но сейчас и так сойдет
|
||||||
|
data = self.__read_buffer()
|
||||||
|
|
||||||
|
return data
|
||||||
|
|
|
@ -17,8 +17,8 @@ commands_list = ['GET_A', 'GET_B', 'GET_C', 'GET_NORESP', 'INVALID_CMD']
|
||||||
while (True):
|
while (True):
|
||||||
for cmd in commands_list:
|
for cmd in commands_list:
|
||||||
print('Write: ' + cmd)
|
print('Write: ' + cmd)
|
||||||
connector.writeline(cmd)
|
connector.write_line(cmd)
|
||||||
|
|
||||||
sleep(0.5)
|
sleep(0.5)
|
||||||
print("Response: "+connector.readline())
|
print("Response: " + connector.read_line())
|
||||||
sleep(1)
|
sleep(1)
|
||||||
|
|
|
@ -8,7 +8,6 @@ config_name = "config.yml"
|
||||||
with open(config_name) as f:
|
with open(config_name) as f:
|
||||||
config = yaml.load(f, Loader=yaml.FullLoader)
|
config = yaml.load(f, Loader=yaml.FullLoader)
|
||||||
|
|
||||||
|
|
||||||
commands_list = ['GET_A', 'GET_B', 'GET_C', 'GET_NORESP', 'INVALID_CMD']
|
commands_list = ['GET_A', 'GET_B', 'GET_C', 'GET_NORESP', 'INVALID_CMD']
|
||||||
|
|
||||||
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
|
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
|
||||||
|
@ -18,8 +17,8 @@ with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
|
||||||
for cmd in commands_list:
|
for cmd in commands_list:
|
||||||
print('Write: ' + cmd)
|
print('Write: ' + cmd)
|
||||||
s.sendall(bytes(cmd+"\n", 'utf-8'))
|
s.sendall(bytes(cmd+"\n", 'utf-8'))
|
||||||
# data = s.recv(1024)
|
data = s.recv(1024)
|
||||||
|
|
||||||
sleep(0.5)
|
sleep(0.5)
|
||||||
# print("Response: ", data.decode('utf-8'))
|
print("Response: ", data.decode('utf-8'))
|
||||||
sleep(1)
|
sleep(1)
|
||||||
|
|
Loading…
Reference in a new issue