"""Interactive, non-blocking command shell for a turtle device.

Commands typed into the shell are placed on a queue and consumed by
``TurtleDeviceThread.run``, so the prompt never waits for the device to
finish the previous action.
"""

import cmd
import threading
from queue import Queue

from equipment.turtle_device import TurtleDevice


class TurtleDeviceThread(threading.Thread):
    """Consumer that forwards queued actions to a ``TurtleDevice``.

    Tasks are ``(action, args, kwargs)`` tuples.  The special action
    ``"exit"`` terminates the consumer loop instead of being sent to the
    device.
    """

    # TODO(Homework 4)
    def __init__(self):
        """Create the device, open it and set its initial speed."""
        super().__init__()
        self.device = TurtleDevice()
        self.queue = Queue[tuple]()
        self.device.open()
        self.device.execute("speed", 1)

    def run(self):
        """Process queued tasks until an ``"exit"`` task is received.

        Every dequeued task is acknowledged with ``task_done()``, even when
        the device raises, so a pending ``queue.join()`` cannot hang on a
        failed task.  The exception itself is not swallowed.
        """
        while True:
            action, args, kwargs = self.queue.get()
            try:
                if action == "exit":
                    break
                self.device.execute(action, *args, **kwargs)
            finally:
                self.queue.task_done()

    def add_task(self, action, *args, **kwargs):
        """Enqueue ``action`` with its positional and keyword arguments."""
        self.queue.put((action, args, kwargs))


def _parse_arg(token):
    """Return ``token`` as an ``int`` if it is an (optionally signed) integer.

    Anything else is returned unchanged as a string.
    """
    digits = token[1:] if token[:1] in ("+", "-") else token
    return int(token) if digits.isdecimal() else token


class NoBlockingTurtleShell(cmd.Cmd):
    """Command shell that queues turtle actions instead of running them inline.

    Note: ``cmd.Cmd`` uses the docstrings of the ``do_*`` methods as the
    text displayed by the ``help`` command.
    """

    intro = "Welcome to the turtle shell. Type help or ? to list commands.\n"
    prompt = "(turtle) "
    file = None

    def __init__(self, turtle_thread: TurtleDeviceThread):
        """Bind the shell to the consumer that will execute its commands."""
        super().__init__()
        self.turtle_thread = turtle_thread

    @property
    def turtle_device(self):
        """The device object owned by the bound consumer thread."""
        return self.turtle_thread.device

    def do_execute(self, arg):
        """Queue a device command: execute <command> [args...]"""
        tokens = arg.split()
        if not tokens:
            # Nothing to queue; previously this raised IndexError.
            self.stdout.write("Usage: execute <command> [args...]\n")
            return
        command, *raw_args = tokens
        # A command without arguments simply yields an empty argument tuple;
        # no special case is needed (the old one used `command` before it
        # was assigned and raised UnboundLocalError).
        self.turtle_thread.add_task(command, *map(_parse_arg, raw_args))

    def do_fw(self, arg):
        """Shortcut for: execute forward 100"""
        self.do_execute("forward 100")

    def do_le(self, arg):
        """Shortcut for: execute left 1000"""
        self.do_execute("left 1000")

    def do_exit(self, arg):
        """Stop the device consumer and leave the shell."""
        self.turtle_thread.add_task("exit")
        # A true return value tells cmd.Cmd.cmdloop() to stop.
        return True


if __name__ == "__main__":
    turtle_thread = TurtleDeviceThread()
    # TODO(Homework 4: Correct start thread)
    turtle_thread.daemon = True
    # The shell reads input on its own thread ...
    thread_shell = threading.Thread(target=NoBlockingTurtleShell(turtle_thread).cmdloop)
    thread_shell.start()
    # ... while run() is called directly, so the consumer loop executes on
    # the main thread (the Thread object is never start()ed, which also
    # means the `daemon` flag above currently has no effect).
    # NOTE(review): presumably deliberate because GUI/turtle back ends
    # usually must be driven from the main thread - confirm before
    # replacing this with start().
    turtle_thread.run()
    thread_shell.join()