serial-device/tests/test_transport_tcp.py

81 lines
1.9 KiB
Python

import unittest
import socket
from threading import Thread
from time import sleep
from transport.tcp import Transport
class ClientThread(Thread):
def __init__(self, write_line='test_line'):
Thread.__init__(self)
self.write_line = write_line
self.result = None
def run(self):
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as client:
client.connect(('127.0.0.1', 23456))
client.sendall(bytes(self.write_line + "\n", 'utf-8'))
self.result = client.recv(1024)
client.close()
class TransportThread(Thread):
def __init__(self, write_string='test'):
Thread.__init__(self)
self.write_string = write_string
self.result = None
def run(self):
transport = Transport({'host': '127.0.0.1', 'port': 23456})
self.result = transport.read_line()
transport.write_line(self.write_string)
sleep(0.5)
transport.close()
class MyTestCase(unittest.TestCase):
def test_write_line(self):
sent_string = 'test_sting'
transport_thread = TransportThread(sent_string)
transport_thread.start()
client_thread = ClientThread()
client_thread.start()
client_thread.join()
transport_thread.join()
data = client_thread.result
received_string = data.decode('utf-8')
self.assertEqual(sent_string+"\n", received_string) # add assertion here
def test_read_line(self):
sent_string = 'written_string'
transport_thread = TransportThread('test-test')
transport_thread.start()
client_thread = ClientThread(sent_string)
client_thread.start()
client_thread.join()
transport_thread.join()
received_string = transport_thread.result
self.assertEqual(sent_string, received_string) # add assertion here
if __name__ == '__main__':
unittest.main()