import cmd import threading from queue import Queue from equipment.turtle_device import TurtleDevice class TurtleDeviceThread(threading.Thread): def __init__(self): super().__init__() self.device = TurtleDevice() self.queue = Queue() self.device.open() def add_task(self, action: str, *args, **kwargs): self.queue.put((action, args, kwargs)) def run(self): while True: action, args, kwargs = self.queue.get() if action == "stop": self.queue.task_done() break self.device.execute(action, *args, **kwargs) self.queue.task_done() class NoBlockingTurtleShell(cmd.Cmd): intro = 'Welcome to the turtle shell. Type help or ? to list commands.\n' prompt = '(turtle) ' file = None def __init__(self, turtle_thread: TurtleDeviceThread): super().__init__() self.turtle_thread = turtle_thread def do_execute(self, arg): 'Executes a command with the given arguments: EXECUTE forward 100' command_and_args = parse(arg) if len(command_and_args) == 1: self.turtle_thread.add_task(command_and_args[0]) return None self.turtle_thread.add_task(command_and_args[0], *command_and_args[1:]) def do_exit(self, arg): 'Stops the message processing thread: EXIT' print('The message processing thread has stopped.') self.turtle_thread.add_task('stop') return True def parse(arg): 'Convert a series of zero or more numbers to an argument tuple' def perform_elem(elem: str): return int(elem) if elem.isdecimal() else elem return tuple(map(perform_elem, arg.split())) if __name__ == '__main__': turtle_thread = TurtleDeviceThread() turtle_thread.daemon = True thread_shell = threading.Thread(target=NoBlockingTurtleShell(turtle_thread).cmdloop) thread_shell.start() turtle_thread.run() thread_shell.join()