serial-device/transport/tcp/__init__.py

73 lines
2.3 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import socket
from transport.base import TransportBase
class Transport(TransportBase):
def __init__(self, config):
super().__init__(config)
if not self.config['host']:
self.config['host'] = '127.0.0.1'
if not self.config['port']:
self.config['port'] = '12345'
self.__connect()
self.buffer = bytearray()
def write_line(self, cmd_str):
response_string = cmd_str + "\n"
try:
self.conn.sendall(bytes(response_string, 'utf-8'))
except Exception as err:
print('Can not write line: ', err)
self.__connect()
def read_line(self):
data = self.__read_buffer()
line = self.__findLine(data)
return line
def close(self):
self.conn.close()
self.listener.close()
def __connect(self):
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as socket_listener:
socket_listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
socket_listener.bind((self.config['host'], self.config['port']))
socket_listener.listen()
socket_listener.setblocking(True)
conn, addr = socket_listener.accept()
self.conn = conn
self.listener = socket_listener
return 1
def __findLine(self, data):
data_chunk = bytearray(self.buffer)
data_chunk.extend(data)
line_separator_index = data_chunk.find(b"\n")
if line_separator_index > 0:
line = data_chunk[0: line_separator_index]
self.buffer = data_chunk[line_separator_index+1:]
return line.decode('utf-8')
def __read_buffer(self):
data = self.conn.recv(1024) # команды все короткие, нет смысла много читать из буфера
if not data:
self.__connect()
# Если данных нет, то клиент отвалился. Т.к. сокет у нас блокирующийся и recv будет
# висеть пока туда не придут хоть какие-то данные. В теории, тут можно словить бесконечную рекурсию,
# но сейчас и так сойдет
data = self.__read_buffer()
return data