import cmd
import threading
from threading import Thread
from threading import Event
import argparse
import time
from queue import Queue

from equipment.turtle_device import TurtleDevice


class TurtleDeviceThread(threading.Thread):
    """Own a ``TurtleDevice`` and feed it commands taken from a queue.

    Producers put argument tuples on ``queue``; a background daemon thread
    (started by ``__message_thread__``) pops them one at a time and passes
    them to ``device.execute``.  Setting ``event`` asks the worker to stop.
    """

    def __init__(self):
        super().__init__()
        self.device = TurtleDevice()
        # Pending commands; each item is a tuple of positional arguments for
        # ``device.execute``, or ``None`` as a wake-up marker (see below).
        self.queue = Queue()
        # Set to request that the worker loop terminates.
        self.event = Event()

    # TODO(Homework 4) \begin
    def __message_processing__(self) -> None:
        """Consume queued commands until the stop event is set.

        Each item is unpacked into ``device.execute``.  A ``None`` item is
        never executed: it only wakes the blocking ``queue.get()`` so that a
        stop request is noticed even when no command is pending.
        """
        while True:
            new_message = self.queue.get()
            try:
                if new_message is not None:
                    # presumably the first token names the action and the
                    # rest are its arguments -- confirm against TurtleDevice
                    self.device.execute(*new_message)
            except Exception as exc:
                # A single bad command must not kill the worker; otherwise
                # every later command would sit in the queue forever.
                print(f"Command {new_message!r} failed: {exc}")
            finally:
                # Always balance get() so a Queue.join() caller cannot hang.
                self.queue.task_done()
            if self.event.is_set():
                break

    def __message_thread__(self) -> None:
        """Start the command-processing loop on a daemon thread."""
        worker = Thread(target=self.__message_processing__, daemon=True)
        worker.start()
    # TODO(Homework 4) \end


class NoBlockingTurtleShell(cmd.Cmd):
    """Interactive shell that hands commands to a ``TurtleDeviceThread``.

    Commands are only enqueued here; they are executed by the worker thread,
    so the prompt returns immediately.
    """

    intro = 'Welcome to the turtle shell. Type help or ? to list commands.\n'
    prompt = '(turtle) '
    file = None

    def __init__(self, turtle_thread: TurtleDeviceThread):
        # TODO(Homework 4) \begin
        super().__init__()
        # Keep our own reference instead of relying on a module-level global
        # that only exists when this file is run as a script.
        self.turtle_thread = turtle_thread
        self.turtle_thread.__message_thread__()
        # TODO(Homework 4) \end

    def do_execute(self, arg):
        """Queue a device command: execute <action> [args ...]"""
        # TODO(Homework 4) \begin
        self.turtle_thread.queue.put(parse(arg))
        # TODO(Homework 4) \end

    def do_exit(self, arg):
        """Stop the device worker and leave the shell."""
        # TODO(Homework 4) \begin
        self.turtle_thread.event.set()
        # The worker may be blocked in queue.get(); wake it so it sees the
        # stop event without waiting for another command.
        self.turtle_thread.queue.put(None)
        # A true return value is what makes cmd.Cmd.cmdloop() terminate.
        return True
        # TODO(Homework 4) \end


def parse(arg):
    """Split *arg* on whitespace and return the tokens as a tuple of strings.

    No numeric conversion is performed; an empty or blank string yields an
    empty tuple.
    """
    return tuple(arg.split())


if __name__ == '__main__':
    turtle_thread = TurtleDeviceThread()
    # TODO(Homework 4: Correct start thread)
    # The shell constructor starts the worker via __message_thread__().
    NoBlockingTurtleShell(turtle_thread).cmdloop()