2023-10-13 22:33:07 +03:00
|
|
|
from dataclasses import dataclass
|
|
|
|
from typing import Optional, Collection, Any
|
2023-10-23 13:44:07 +03:00
|
|
|
from abc import ABC, abstractmethod, abstractproperty
|
|
|
|
from enum import Enum
|
2023-10-13 22:33:07 +03:00
|
|
|
|
|
|
|
|
2023-10-23 13:44:07 +03:00
|
|
|
class DeviceLifecycleState(Enum):
|
|
|
|
INIT = 0
|
|
|
|
OPEN = 1
|
|
|
|
CLOSE = 2
|
2023-10-13 22:33:07 +03:00
|
|
|
|
2023-10-23 13:44:07 +03:00
|
|
|
|
|
|
|
class DeviceError(Exception):
|
2023-10-13 22:33:07 +03:00
|
|
|
pass
|
|
|
|
|
|
|
|
|
|
|
|
class NonReadableTrait(Exception):
|
|
|
|
pass
|
|
|
|
|
|
|
|
|
|
|
|
class NonWritableTrait(Exception):
|
|
|
|
pass
|
|
|
|
|
|
|
|
|
|
|
|
@dataclass
|
|
|
|
class TraitDescriptor:
|
|
|
|
name: str
|
|
|
|
info: Optional[str] = None
|
|
|
|
readable: bool = True
|
|
|
|
writable: bool = False
|
|
|
|
|
|
|
|
|
|
|
|
@dataclass
|
|
|
|
class ActionDescriptor:
|
|
|
|
name: str
|
|
|
|
arguments: dict[str, type]
|
|
|
|
info: Optional[str] = None
|
|
|
|
|
|
|
|
|
2023-10-23 13:44:07 +03:00
|
|
|
class Device(ABC):
|
2023-10-13 22:33:07 +03:00
|
|
|
_state = DeviceLifecycleState.INIT
|
|
|
|
|
|
|
|
@property
|
|
|
|
def state(self) -> DeviceLifecycleState:
|
|
|
|
return self._state
|
|
|
|
|
|
|
|
def close(self):
|
|
|
|
self._state = DeviceLifecycleState.CLOSE
|
|
|
|
|
2023-10-23 13:44:07 +03:00
|
|
|
@abstractproperty
|
2023-10-13 22:33:07 +03:00
|
|
|
def trait_descriptors(self) -> Collection[TraitDescriptor]:
|
|
|
|
pass
|
|
|
|
|
2023-10-23 13:44:07 +03:00
|
|
|
@abstractproperty
|
2023-10-13 22:33:07 +03:00
|
|
|
def action_descriptors(self) -> Collection[ActionDescriptor]:
|
|
|
|
pass
|
|
|
|
|
|
|
|
@abstractmethod
|
|
|
|
def __getitem__(self, trait_name: str) -> Optional[Any]:
|
|
|
|
"""Return logical state of trait `trait_name`."""
|
|
|
|
pass
|
|
|
|
|
|
|
|
|
|
|
|
class SynchronyDevice(Device):
|
|
|
|
|
|
|
|
def open(self):
|
|
|
|
self._state = DeviceLifecycleState.OPEN
|
|
|
|
|
|
|
|
@abstractmethod
|
|
|
|
def execute(self, action_name: str, *args, **kwargs):
|
|
|
|
"""Execute action `action_name`, using `args` and `kwargs` as action argument."""
|
2023-11-16 21:21:10 +03:00
|
|
|
raise DeviceError
|
2023-10-13 22:33:07 +03:00
|
|
|
|
|
|
|
@abstractmethod
|
|
|
|
def read(self, trait_name: str) -> Any:
|
|
|
|
"""Read physical state of trait `trait_name` from device."""
|
|
|
|
raise NonReadableTrait
|
|
|
|
|
|
|
|
@abstractmethod
|
|
|
|
def write(self, trait_name: str, value: Any) -> bool:
|
|
|
|
"""Pass `value` to trait `trait_name` of device."""
|
|
|
|
raise NonWritableTrait
|
|
|
|
|
|
|
|
@abstractmethod
|
|
|
|
def invalidate(self, trait_name: str):
|
|
|
|
"""Invalidate logical state of trait `trait_name`"""
|
2023-10-23 13:44:07 +03:00
|
|
|
pass
|