"""Non-blocking turtle shell.

A ``cmd.Cmd`` shell reads commands on the calling thread and hands them to a
worker thread, which runs them on a ``TurtleDevice`` one at a time.
"""

import cmd
import threading
import tkinter
import turtle
from queue import Queue

from equipment.turtle_device import TurtleDevice


class TurtleDeviceThread(threading.Thread):
    """Worker thread that executes queued actions on a ``TurtleDevice``.

    Tasks are ``(action, args, kwargs)`` tuples placed on ``self.queue`` via
    :meth:`add_task` and executed in FIFO order by :meth:`run`.
    """

    # TODO(Homework 4)

    # Unique marker placed on the queue by stop() to end the run() loop.
    _STOP = object()

    def __init__(self):
        super().__init__()
        self.device = TurtleDevice()
        self.queue: Queue = Queue()
        self.device.open()
        self.device.execute("speed", 1)

    def run(self):
        """Execute queued tasks until the stop marker is received."""
        while True:
            task = self.queue.get()
            if task is self._STOP:
                self.queue.task_done()
                return
            action, args, kwargs = task
            try:
                self.device.execute(action, *args, **kwargs)
            except Exception as exc:
                # Thread top-level boundary: the action name comes straight
                # from user input, so a failing task is reported instead of
                # silently killing the worker (which would leave every later
                # task queued forever).
                print(f"*** {action} failed: {exc!r}")
            finally:
                # Always balance get() so Queue.join() callers cannot hang.
                self.queue.task_done()

    def add_task(self, action, *args, **kwargs):
        """Queue ``device.execute(action, *args, **kwargs)`` for the worker."""
        self.queue.put((action, args, kwargs))

    def stop(self):
        """Ask the worker to exit once all previously queued tasks have run."""
        self.queue.put(self._STOP)


class NoBlockingTurtleShell(cmd.Cmd):
    """Interactive shell that forwards commands to a ``TurtleDeviceThread``."""

    intro = "Welcome to the turtle shell. Type help or ? to list commands.\n"
    prompt = "(turtle) "
    file = None  # optional open file that precmd() records commands to

    def __init__(self, turtle_thread: TurtleDeviceThread):
        super().__init__()
        self.turtle_thread = turtle_thread

    @property
    def turtle_device(self):
        """The device owned by the worker thread."""
        return self.turtle_thread.device

    def do_execute(self, arg):
        """Queue a turtle action:  EXECUTE <action> <non-negative integer>"""
        print(arg)
        parts = arg.split()
        # Validate explicitly: `assert` is stripped under -O, and a wrong
        # token count would otherwise crash the shell with ValueError.
        if len(parts) != 2 or not parts[1].isdecimal():
            print("*** usage: execute <action> <non-negative integer>")
            return
        command, number = parts
        self.turtle_thread.add_task(command, int(number))

    def do_exit(self, arg):
        """Finish queued actions, close the device and leave the shell:  EXIT"""
        # Let the worker drain its queue before the device is closed, so no
        # pending task runs against a closed device.
        self.turtle_thread.stop()
        if self.turtle_thread.is_alive():
            self.turtle_thread.join()
        self.turtle_device.close()
        self.close()
        # A true return value is what tells cmd.Cmd.cmdloop() to stop.
        return True

    def precmd(self, line):
        """Lower-case the input line and record it to ``self.file`` if set."""
        line = line.lower()
        if self.file and "playback" not in line:
            print(line, file=self.file)
        return line

    def close(self):
        """Close the recording file, if one is open."""
        if self.file:
            self.file.close()
            self.file = None


if __name__ == "__main__":
    turtle_thread = TurtleDeviceThread()
    # TODO(Homework 4: Correct start thread)
    # Daemon, so an abnormal exit of the shell never leaves the process
    # hanging on the worker.
    turtle_thread.daemon = True
    turtle_thread.start()
    NoBlockingTurtleShell(turtle_thread).cmdloop()
    # run() only returns after a stop request; without one join() would
    # block forever. A second stop marker after do_exit is harmless.
    turtle_thread.stop()
    turtle_thread.join()