diff --git a/data_processor/__init__.py b/data_processor/__init__.py index 35eb334..e0c72b8 100644 --- a/data_processor/__init__.py +++ b/data_processor/__init__.py @@ -1,6 +1,6 @@ import importlib import logging -import sys +from time import sleep from command_parser import CommandParser @@ -27,26 +27,31 @@ class DataProcessor: self.transport = cls(self.config["transport"]) except Exception as err: - print("Can't create transport: ", err) - # raise err - sys.exit(1) + logging.error("Can't create transport: ", err) + raise err def run(self): logging.debug("Waiting for commands...") while True: - command_string = self.transport.readline() + command_string = self.transport.read_line() logging.debug('read: %s', command_string) + if not command_string: + sleep(1) # Все читатели блокирующиеся и, в теории, этот код не будет вызываться практически никогда. + # Но, на практике, стоит добавить слип чтобы не жрал зря CPU + continue + + response_str = None + try: command = CommandParser.parse(command_string) except ValueError as err: logging.warning("Command parser error: %s", err) - self.transport.writeline('Failed to process command!') + response_str = 'Failed to process command!' else: parsed_command_name = command.get('command') - response_str = None match parsed_command_name: case 'GET': sensor_name = command.get('value') @@ -64,4 +69,7 @@ class DataProcessor: print('Unknown command: %s', parsed_command_name) response_str = 'Unknown command' - self.transport.writeline(response_str) + try: + self.transport.write_line(response_str) + except Exception as err: + logging.error('Cannot send message: %s', err) diff --git a/reader.py b/reader.py index 6d1c2b5..e2725fb 100644 --- a/reader.py +++ b/reader.py @@ -27,5 +27,9 @@ with open(config_name) as f: print("config: ", config) -processor = DataProcessor(config) -processor.run() +try: + processor = DataProcessor(config) + processor.run() +except Exception as err: + print('Command processor serious error: ', err) + sys.exit(1) diff --git a/tests/test_transport_tcp.py b/tests/test_transport_tcp.py new file mode 100644 index 0000000..a74d46a --- /dev/null +++ b/tests/test_transport_tcp.py @@ -0,0 +1,81 @@ +import unittest +import socket +from threading import Thread +from time import sleep + +from transport.tcp import Transport + + +class ClientThread(Thread): + def __init__(self, write_line='test_line'): + Thread.__init__(self) + + self.write_line = write_line + self.result = None + + def run(self): + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as client: + client.connect(('127.0.0.1', 23456)) + + client.sendall(bytes(self.write_line + "\n", 'utf-8')) + + self.result = client.recv(1024) + + client.close() + + +class TransportThread(Thread): + def __init__(self, write_string='test'): + Thread.__init__(self) + + self.write_string = write_string + self.result = None + + def run(self): + transport = Transport({'host': '127.0.0.1', 'port': 23456}) + + self.result = transport.read_line() + + transport.write_line(self.write_string) + + sleep(0.5) + transport.close() + + +class MyTestCase(unittest.TestCase): + def test_write_line(self): + sent_string = 'test_sting' + + transport_thread = TransportThread(sent_string) + transport_thread.start() + + client_thread = ClientThread() + client_thread.start() + + client_thread.join() + transport_thread.join() + + data = client_thread.result + received_string = data.decode('utf-8') + + self.assertEqual(sent_string+"\n", received_string) # add assertion here + + def test_read_line(self): + sent_string = 'written_string' + + transport_thread = TransportThread('test-test') + transport_thread.start() + + client_thread = ClientThread(sent_string) + client_thread.start() + + client_thread.join() + transport_thread.join() + + received_string = transport_thread.result + + self.assertEqual(sent_string, received_string) # add assertion here + + +if __name__ == '__main__': + unittest.main() diff --git a/transport/base/__init__.py b/transport/base/__init__.py index 230be76..218e0a7 100644 --- a/transport/base/__init__.py +++ b/transport/base/__init__.py @@ -2,10 +2,10 @@ class TransportBase: def __init__(self, config): self.config = config - def writeline(self, cmd_str): + def write_line(self, cmd_str): raise Exception('Implement me!') - def readline(self): + def read_line(self): raise Exception('Implement me!') def close(self): diff --git a/transport/serial/__init__.py b/transport/serial/__init__.py index 2ab000f..cfcd405 100644 --- a/transport/serial/__init__.py +++ b/transport/serial/__init__.py @@ -16,11 +16,11 @@ class Transport(TransportBase): self.addr = None - def writeline(self, cmd_str): + def write_line(self, cmd_str): self.ser.write(str.encode(cmd_str + "\n")) return 1 - def readline(self): + def read_line(self): buffer = self.ser.readline() return buffer.decode('utf-8').rstrip() diff --git a/transport/tcp/__init__.py b/transport/tcp/__init__.py index 835e178..f167f1a 100644 --- a/transport/tcp/__init__.py +++ b/transport/tcp/__init__.py @@ -7,45 +7,67 @@ class Transport(TransportBase): def __init__(self, config): super().__init__(config) - self.listener = self.__connect(config['host'], config['port']) + if not self.config['host']: + self.config['host'] = '127.0.0.1' + + if not self.config['port']: + self.config['port'] = '12345' + + self.__connect() self.buffer = bytearray() - def writeline(self, cmd_str): + def write_line(self, cmd_str): response_string = cmd_str + "\n" - self.conn.sendall(bytes(response_string, 'utf-8')) + try: + self.conn.sendall(bytes(response_string, 'utf-8')) + except Exception as err: + print('Can not write line: ', err) + self.__connect() - def readline(self): - data = self.conn.recv(16) # команды все короткие, нет смысла много читать из буфера - # print('data: ', data) - if not data: - return '' + def read_line(self): + data = self.__read_buffer() line = self.__findLine(data) return line def close(self): + self.conn.close() self.listener.close() - def __connect(self, host='127.0.0.1', port=12345): + def __connect(self): with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as socket_listener: - socket_listener.bind((host, port)) + socket_listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + socket_listener.bind((self.config['host'], self.config['port'])) socket_listener.listen() + socket_listener.setblocking(True) conn, addr = socket_listener.accept() self.conn = conn - return socket_listener + + self.listener = socket_listener + + return 1 def __findLine(self, data): data_chunk = bytearray(self.buffer) data_chunk.extend(data) - # print("line: ", data_chunk) line_separator_index = data_chunk.find(b"\n") - # print('n idx: ', line_separator_index) if line_separator_index > 0: line = data_chunk[0: line_separator_index] self.buffer = data_chunk[line_separator_index+1:] return line.decode('utf-8') + + def __read_buffer(self): + data = self.conn.recv(1024) # команды все короткие, нет смысла много читать из буфера + if not data: + self.__connect() + # Если данных нет, то клиент отвалился. Т.к. сокет у нас блокирующийся и recv будет + # висеть пока туда не придут хоть какие-то данные. В теории, тут можно словить бесконечную рекурсию, + # но сейчас и так сойдет + data = self.__read_buffer() + + return data diff --git a/writer.py b/writer.py index 9b6c486..a5f46ac 100644 --- a/writer.py +++ b/writer.py @@ -17,8 +17,8 @@ commands_list = ['GET_A', 'GET_B', 'GET_C', 'GET_NORESP', 'INVALID_CMD'] while (True): for cmd in commands_list: print('Write: ' + cmd) - connector.writeline(cmd) + connector.write_line(cmd) sleep(0.5) - print("Response: "+connector.readline()) + print("Response: " + connector.read_line()) sleep(1) diff --git a/writer_tcp.py b/writer_tcp.py index a73f712..03068e0 100644 --- a/writer_tcp.py +++ b/writer_tcp.py @@ -8,7 +8,6 @@ config_name = "config.yml" with open(config_name) as f: config = yaml.load(f, Loader=yaml.FullLoader) - commands_list = ['GET_A', 'GET_B', 'GET_C', 'GET_NORESP', 'INVALID_CMD'] with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: @@ -18,8 +17,8 @@ with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: for cmd in commands_list: print('Write: ' + cmd) s.sendall(bytes(cmd+"\n", 'utf-8')) - # data = s.recv(1024) + data = s.recv(1024) sleep(0.5) - # print("Response: ", data.decode('utf-8')) + print("Response: ", data.decode('utf-8')) sleep(1)