Compare commits

..

No commits in common. "1a6e43dfdd6f3ed855ef588439bf1cbbac597ce7" and "8a621ebbe3a26d5b9878e849686741a6611401d9" have entirely different histories.

5 changed files with 109 additions and 125 deletions

View File

@ -1,16 +1,15 @@
from dataclasses import dataclass from dataclasses import dataclass
from typing import Optional, Collection, Any from typing import Optional, Collection, Any
from abc import ABC, abstractmethod, abstractproperty from abc import ABC, abstractmethod
from enum import Enum from enum import Enum, auto
class DeviceLifecycleState(Enum): class DeviceLifecycleState(Enum):
INIT = 0 INIT = "init"
OPEN = 1 OPEN = "open"
CLOSE = 2 CLOSE = "close"
class DeviceError(Exception): class DevaceError(Exception):
pass pass
@ -38,6 +37,7 @@ class ActionDescriptor:
class Device(ABC): class Device(ABC):
# TODO(Homework #3)
_state = DeviceLifecycleState.INIT _state = DeviceLifecycleState.INIT
@property @property
@ -47,11 +47,13 @@ class Device(ABC):
def close(self): def close(self):
self._state = DeviceLifecycleState.CLOSE self._state = DeviceLifecycleState.CLOSE
@abstractproperty @property
@abstractmethod
def trait_descriptors(self) -> Collection[TraitDescriptor]: def trait_descriptors(self) -> Collection[TraitDescriptor]:
pass pass
@abstractproperty @property
@abstractmethod
def action_descriptors(self) -> Collection[ActionDescriptor]: def action_descriptors(self) -> Collection[ActionDescriptor]:
pass pass
@ -69,7 +71,7 @@ class SynchronyDevice(Device):
@abstractmethod @abstractmethod
def execute(self, action_name: str, *args, **kwargs): def execute(self, action_name: str, *args, **kwargs):
"""Execute action `action_name`, using `args` and `kwargs` as action argument.""" """Execute action `action_name`, using `args` and `kwargs` as action argument."""
raise DeviceError pass
@abstractmethod @abstractmethod
def read(self, trait_name: str) -> Any: def read(self, trait_name: str) -> Any:
@ -85,3 +87,6 @@ class SynchronyDevice(Device):
def invalidate(self, trait_name: str): def invalidate(self, trait_name: str):
"""Invalidate logical state of trait `trait_name`""" """Invalidate logical state of trait `trait_name`"""
pass pass
def close(self):
return super().close()

View File

@ -1,80 +1,73 @@
import time
from turtle import Turtle from turtle import Turtle
from typing import Optional, Collection, Any from equipment.device import SynchronyDevice
from controls.device import * from equipment.device import Device
import inspect import inspect
from typing import Optional, Collection, Any
class TurtleDevice(SynchronyDevice): class TurtleDevice(SynchronyDevice):
def open(self): def open(self):
self.turtle = Turtle()
self.descriptors = dict(traits=dict(), actions=dict())
super().open() super().open()
self.turtle = Turtle()
def close (self): def close (self):
self.turtle.reset()
super().close() super().close()
del self.turtle
def trait_descriptors(self) -> Collection[TraitDescriptor]: @property
return self.get_descriptors()["traits"] def trait_descriptors(self):
l = list()
for i in inspect.getmembers(TurtleDevice):
if not (i[0].startswith("__") or i[0].startswith("_")) and not inspect.isfunction(i[1]):
l.append(i[0])
return l
def action_descriptors(self) -> Collection[ActionDescriptor]: @property
return self.get_descriptors()["actions"] def action_descriptors(self):
l = list()
def __getitem__(self, trait_name: str) -> Optional[Any]: #print(inspect.getmembers(TurtleDevice))
"""Return logical state of trait `trait_name`.""" #for i in dir(TurtleDevice):
return self.descriptors[trait_name] # # print(type(getattr(TurtleDevice(), i))=='method')
# # if not i.startswith("__") and type(getattr(TurtleDevice(), i)) == types.MethodType:
def read_descriptors(self, arg): # if not i.startswith("__") and inspect.isroutine(inspect.getattr_static(TurtleDevice, i)):
self.descriptors = self.get_descriptors() # sig = inspect.signature(getattr(TurtleDevice, i))
return self.descriptors # l.append(i + " " + str(sig))
#return l
def read(self, trait_name: str) -> Any: for i in inspect.getmembers(TurtleDevice):
"""Read physical state of trait `trait_name` from device.""" # print(type(getattr(TurtleDevice, i)))
self.read_descriptors() # print(type(getattr(TurtleDevice(), i))=='method')
return self.descriptors[trait_name] if not (i[0].startswith("__") or i[0].startswith("_")) and inspect.isfunction(i[1]):
sig = inspect.signature(getattr(TurtleDevice, i[0]))
def write(self, trait_name: str, value: Any) -> bool: l.append(i[0]+str(sig))
"""Turtle traits are not writable""" return l
# trait_desc = self.trait_descriptors.get(trait_name)
# if trait_desc.writable:
# return getattr(self.turtle, trait_name)(value)
super().write()
def execute(self, action_name: str, *args, **kwargs): def execute(self, action_name: str, *args, **kwargs):
"""Execute action `action_name`, using `args` and `kwargs` as action argument.""" """Execute action `action_name`, using `args` and `kwargs` as action argument."""
action = self.get_descriptors()["actions"].get(action_name) if self.state == 1:
if action: f = getattr(self.turtle, action_name)
getattr(self.turtle, action_name)(*args, **kwargs) f(int(*args), **kwargs)
return
else: else:
raise DeviceError("No such action: `{}`".format(action_name)) print("It's not opened")
self.open()
#time.sleep(30)
f = getattr(self.turtle, action_name)
f(int(*args), **kwargs)
def read(self, trait_name: str):
"""Read physical state of trait `trait_name` from device."""
return getattr(self.turtle, trait_name)
def write(self, trait_name: str, value: Any) -> bool:
"""Pass `value` to trait `trait_name` of device."""
setattr(self.turtle, trait_name, value)
def invalidate(self, trait_name: str): def invalidate(self, trait_name: str):
"""Invalidate logical state of trait `trait_name`""" """Invalidate logical state of trait `trait_name`"""
if self.trait_descriptors.get(trait_name): if self.read(trait_name) == self.__getitem__(trait_name):
self.trait_descriptors[trait_name] = None print("Everything's okey")
raise DeviceError()
def get_descriptors(self):
descriptors = dict(actions=dict(), traits=dict())
for m_name, member in inspect.getmembers(Turtle):
if m_name.startswith("_"):
continue
if m_name.lower() != m_name:
continue
doc = inspect.getdoc(member)
if doc is None:
continue
if not inspect.isfunction(member):
descriptors["traits"][m_name] = TraitDescriptor(
m_name, doc, readable=True, writable=False
)
else: else:
sig = inspect.signature(member) print(f"Something went wrong with {trait_name}.")
params_dict = dict(sig.parameters)
descriptors["actions"][m_name] = ActionDescriptor( def __getitem__(self, trait_name: str) -> Optional[Any]:
m_name, arguments=params_dict, info=doc """Return logical state of trait `trait_name`."""
) return getattr(self, trait_name)
return descriptors

View File

@ -1,76 +1,62 @@
import cmd import cmd
import threading import threading
from queue import Queue from queue import Empty, Queue
from turtle import forward
from equipment.turtle_device import TurtleDevice from equipment.turtle_device import TurtleDevice
import argparse
import time
class TurtleDeviceThread(threading.Thread): class TurtleDeviceThread(threading.Thread):
# TODO(Homework 4)
def __init__(self): def __init__(self):
super().__init__() super().__init__()
self.device = TurtleDevice() self.device = TurtleDevice()
self.queue = Queue()
self.queue = Queue[tuple]() self.event = Event()
self.device.open()
self.device.execute("speed", 1)
def run(self): def run(self):
while True: while True:
action, args, kwargs = self.queue.get() try:
if action == "exit": item = self.queue.get()
self.queue.task_done() except self.queue.Empty:
continue
else:
if (item == 'exit'):
break break
self.device.execute(action, *args, **kwargs) self.device.execute(item[0], item[1:])
self.queue.task_done() self.queue.task_done()
def __message_processing__(self):
def add_task(self, action, *args, **kwargs): while True:
self.queue.put((action, args, kwargs)) new_message = self.queue.get()
#print(type(new_message))
#print(new_message)
#print("Name and args of action are {}".format(new_message))
self.device.execute(*new_message)
#time.sleep(30)
self.queue.task_done()
if self.event.is_set():
break
def __message_thread__(self):
thread = Thread(target = self.__message_processing__, daemon=True)
thread.start()
class NoBlockingTurtleShell(cmd.Cmd): class NoBlockingTurtleShell(cmd.Cmd):
intro = "Welcome to the turtle shell. Type help or ? to list commands.\n" intro = 'Welcome to the turtle shell. Type help or ? to list commands.\n'
prompt = "(turtle) " prompt = '(turtle) '
file = None file = None
def __init__(self, turtle_thread: TurtleDeviceThread): def __init__(self, turtle_thread: TurtleDeviceThread):
super().__init__() super(NoBlockingTurtleShell, self).__init__()
self.turtle_thread = turtle_thread turtle_thread.__message_thread__()
@property
def turtle_device(self):
return self.turtle_thread.device
def do_execute(self, arg): def do_execute(self, arg):
command_and_args = arg.split() turtle_thread.queue.put(parse(arg))
if len(command_and_args) == 1:
self.turtle_thread.add_task(command) def do_exit(self, arg):
return turtle_thread.event.set()
command = command_and_args[0]
args = tuple(int(c) if c.isdecimal() else c for c in command_and_args[1:])
self.turtle_thread.add_task(command, *args)
def do_forward(self, arg): if __name__ == '__main__':
'Move the turtle forward by the specified distance: FORWARD 10'
self.do_execute("forward 100")
def do_right(self, arg):
'Turn turtle right by given number of degrees: RIGHT 20'
self.do_execute("right 100")
def do_left(self, arg):
'Turn turtle left by given number of degrees: LEFT 90'
self.do_execute("left 100")
def do_bye(self, arg):
'Stop recording, close the turtle window, and exit: BYE'
print('Thank you for using Turtle')
self.turtle_thread.add_task("exit")
return True
if __name__ == "__main__":
turtle_thread = TurtleDeviceThread() turtle_thread = TurtleDeviceThread()
turtle_thread.daemon = True turtle_thread.start()
thread_shell = threading.Thread(target=NoBlockingTurtleShell(turtle_thread).cmdloop) NoBlockingTurtleShell(turtle_thread).cmdloop()
thread_shell.start()
turtle_thread.run()
thread_shell.join()