Compare commits

...

4 Commits

Author SHA1 Message Date
c2c68123de Implement TurtleDevice 2023-10-25 20:56:57 +03:00
49341abd41 Fix tests 2023-10-25 20:12:40 +03:00
efb24c1d20 Uptade project configuration 2023-10-25 19:52:32 +03:00
f6040acd22 Update .gitignore 2023-10-25 19:52:19 +03:00
7 changed files with 97 additions and 12 deletions

2
.gitignore vendored
View File

@ -157,4 +157,4 @@ cython_debug/
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore # be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear # and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder. # option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/ .vscode/

View File

@ -1,12 +1,15 @@
from dataclasses import dataclass from dataclasses import dataclass
from typing import Optional, Collection, Any from typing import Optional, Collection, Any
from abc import abstractmethod from abc import ABC, abstractmethod
from enum import Enum
class DeviceLifecycleState: class DeviceLifecycleState(Enum):
pass # TODO(Homework #3) INIT = 1
OPEN = 2
CLOSE = 3
class DevaceError(Exception): class DeviceError(Exception):
pass pass
@ -33,8 +36,7 @@ class ActionDescriptor:
info: Optional[str] = None info: Optional[str] = None
class Device: class Device(ABC):
# TODO(Homework #3)
_state = DeviceLifecycleState.INIT _state = DeviceLifecycleState.INIT
@property @property
@ -44,9 +46,13 @@ class Device:
def close(self): def close(self):
self._state = DeviceLifecycleState.CLOSE self._state = DeviceLifecycleState.CLOSE
@property
@abstractmethod
def trait_descriptors(self) -> Collection[TraitDescriptor]: def trait_descriptors(self) -> Collection[TraitDescriptor]:
pass pass
@property
@abstractmethod
def action_descriptors(self) -> Collection[ActionDescriptor]: def action_descriptors(self) -> Collection[ActionDescriptor]:
pass pass
@ -79,4 +85,4 @@ class SynchronyDevice(Device):
@abstractmethod @abstractmethod
def invalidate(self, trait_name: str): def invalidate(self, trait_name: str):
"""Invalidate logical state of trait `trait_name`""" """Invalidate logical state of trait `trait_name`"""
pass pass

View File

@ -1,8 +1,81 @@
from turtle import Turtle from turtle import Turtle
from typing import Any, Collection, Optional
from types import FunctionType
from inspect import getfullargspec
from controls.device import SynchronyDevice from controls.device import SynchronyDevice
from controls.device import ActionDescriptor, TraitDescriptor
from controls.device import DeviceLifecycleState, DeviceError
class TurtleDevice(SynchronyDevice): class TurtleDevice(SynchronyDevice):
pass # TODO(Homework #3) def __init__(self) -> None:
super().__init__()
self._device: Optional[Turtle] = None
self._logical_traits: dict[str, Optional[Any]] = {}
self._actions: list[str] = []
def open(self) -> None:
"""Opens a new device and initializes all logical traits with `None`"""
if self.state == DeviceLifecycleState.OPEN:
return
super().open()
self._device = Turtle()
self._logical_traits = {k.name: None for k in self.trait_descriptors()}
self._actions = [a.name for a in self.action_descriptors()]
def close(self) -> None:
"""Closes the device and invalidates all logial traits"""
if self.state == DeviceLifecycleState.CLOSE:
return
self._logical_traits = {k: None for k in self._logical_traits.keys()}
self._device = None
super().close()
def action_descriptors(self) -> Collection[ActionDescriptor]:
def get_args(f: FunctionType) -> dict[str, type]:
spec = getfullargspec(f)
return {arg: spec.annotations.get(arg, object) for arg in spec.args}
actions = (dict(filter(lambda i: not i[0].startswith('_'),
filter(lambda i: isinstance(i[1], FunctionType),
self._device.__dict__.items())))) # lisp :3
return [ActionDescriptor(name=action[0],arguments=get_args(action[1])) for action in actions.items()]
def trait_descriptors(self) -> Collection[TraitDescriptor]:
traits = dict(filter(lambda i: not i[0].startswith('_'),
filter(lambda i: not isinstance(i[1], FunctionType),
self._device.__dict__.items())))
return [TraitDescriptor(name=trait) for trait in traits.keys()]
def read(self, trait_name: str) -> Any:
if self.state != DeviceLifecycleState.OPEN:
raise DeviceError("Device is not opened")
self._logical_traits[trait_name] = self._device.__dict__[trait_name]
return self._device.__dict__[trait_name]
def write(self, trait_name: str, value: Any) -> bool:
if self.state != DeviceLifecycleState.OPEN:
raise DeviceError("Device is not opened")
if trait_name not in self._logical_traits.keys():
raise KeyError(f"\"{trait_name}\" is not a trait of {self.__class__}")
return super().write(trait_name, value)
def execute(self, action_name: str, *args, **kwargs):
if self.state != DeviceLifecycleState.OPEN:
raise DeviceError("Device is not opened")
if action_name not in self._actions:
raise KeyError(f"\"{action_name}\" is not an action of {self.__class__}")
return self._device.__dict__[action_name](args, kwargs)
def invalidate(self, trait_name: str):
if trait_name in self._logical_traits:
self._logical_traits[trait_name] = None
else:
raise KeyError(f"\"{trait_name}\" is not a trait of {self.__class__}")
def __getitem__(self, trait_name: str) -> Optional[Any]:
if trait_name in self._logical_traits:
return self._logical_traits[trait_name]
else:
raise KeyError(f"\"{trait_name}\" is not a trait of {self.__class__}")

View File

@ -1,8 +1,14 @@
[build-system]
requires = ["setuptools", "setuptools-scm"]
build-backend = "setuptools.build_meta"
[project] [project]
name = "controls" name = "controls"
version = "0.0.1" version = "0.0.2"
license = {file = "LICENSE"} license = {file = "LICENSE"}
description = "Example SCADA System" description = "Example SCADA System"
authors = [{name = "teldufalsari et al (cockroaches)"}] authors = [{name = "teldufalsari et al (cockroaches)"}]
requires-python = ">=3.9" requires-python = ">=3.9"
[tool.setuptools]
packages = ["controls", "equipment"]

View File

View File

@ -9,4 +9,4 @@ class DeviceLifecycleStateTest(TestCase):
pass pass
def test_enum(self): def test_enum(self):
self.assertEqual(DeviceLifecycleStateTest["INIT"], DeviceLifecycleStateTest.INIT) self.assertEqual(DeviceLifecycleState["INIT"], DeviceLifecycleState.INIT)

View File