Compare commits

...

9 Commits
main ... main

27 changed files with 533 additions and 40 deletions

13
LW1/README.md Normal file
View File

@ -0,0 +1,13 @@
# Results
## Function execution time measurement
|Interpitator | Pure Python without type annotations | Pure Python with type annotations | Python + numpy|
| --- | --- | --- | --- |
|CPython 3.9 | 0.255727898 s | 0.250940687 s | 0.333585421 s|
|CPython 3.11 | 0.178182069 s | 0.18051334 s | 0.312025812 s |
|Latest PyPy for Python 3.9 | 0.039385466 s | 0.312025812 s | 1.51180428 s |
## Measuring the execution time of a function along with running the script and interpreter
|Interpitator | Pure Python without type annotations | Pure Python with type annotations | Python + numpy |
| --- | --- | --- | --- |
|CPython 3.9 | **real**: 0m0,260s; **user**: 0m0,260s; **sys**: 0m0,000s | **real**: 0m0,262s; **user**: 0m0,262s; **sys**: 0m0,000s | **real**: 0m0,458s; **user**: 0m0,644s; **sys**: 0m0,712s |
|CPython 3.11 | **real**: 0m0,191s; **user**: 0m0,191s; **sys**: 0m0,000s | **real**: 0m0,191s; **user**: 0m0,190s; **sys**: 0m0,000s | **real**: 0m0,426s; **user**: 0m0,533s; **sys**: 0m0,794s |
|Latest PyPy for Python 3.9 | **real**: 0m0,065s; **user**: 0m0,065s; **sys**: 0m0,000s | **real**: 0m0,064s; **user**: 0m0,064s; **sys**: 0m0,000s | **real**: 0m1,863s; **user**: 0m2,072s; **sys**: 0m0,687s |

24
LW1/python_and_numpy.py Normal file
View File

@ -0,0 +1,24 @@
import time
import numpy as np
def mandelbrot(pmin = -2.5, pmax = 1.5, qmin = -2, qmax = 2,
ppoints = 200, qpoints = 200, max_iterations = 300, infinity_border= 100):
image = np.zeros((ppoints, qpoints))
for ip, p in enumerate(np.linspace(pmin, pmax, ppoints)):
for iq, q in enumerate(np.linspace(qmin, qmax, qpoints)):
c = p + 1j * q
z = 0
for k in range(max_iterations):
z = z ** 2 + c
if abs(z) > infinity_border:
image[ip, iq] = 1
break
return image
tic = time.perf_counter_ns()
image = mandelbrot()
toc = time.perf_counter_ns()
print((toc - tic)/1_000_000_000, "s")

28
LW1/with_annotation.py Normal file
View File

@ -0,0 +1,28 @@
import time
def linspace(start, stop, n):
if n == 1:
yield stop
return
h = (stop - start) / (n - 1)
for i in range(n):
yield start + h * i
def mandelbrot(pmin: float = -2.5, pmax: float = 1.5, qmin: float = -2, qmax: float = 2,
ppoints: int = 200, qpoints: int = 200, max_iterations: int = 300, infinity_border: float = 100) -> list[list[int]]:
image: list[list[int]] = [[0 for i in range(qpoints)] for j in range(ppoints)]
for ip, p in enumerate(linspace(pmin, pmax, ppoints)):
for iq, q in enumerate(linspace(qmin, qmax, qpoints)):
c: complex = p + 1j * q
z: complex = 0
for k in range(max_iterations):
z = z ** 2 + c
if abs(z) > infinity_border:
image[ip][iq] = 1
break
return image
tic = time.perf_counter_ns()
image = mandelbrot()
toc = time.perf_counter_ns()
print((toc - tic)/1_000_000_000, "s")

27
LW1/without_annotation.py Normal file
View File

@ -0,0 +1,27 @@
import time
def linspace(start, stop, n):
if n == 1:
yield stop
return
h = (stop - start) / (n - 1)
for i in range(n):
yield start + h * i
def mandelbrot(pmin = -2.5, pmax= 1.5, qmin = -2, qmax= 2,
ppoints = 200, qpoints = 200, max_iterations = 300, infinity_border = 100):
image = [[0 for i in range(qpoints)] for j in range(ppoints)]
for ip, p in enumerate(linspace(pmin, pmax, ppoints)):
for iq, q in enumerate(linspace(qmin, qmax, qpoints)):
c = p + 1j * q
z = 0
for k in range(max_iterations):
z = z ** 2 + c
if abs(z) > infinity_border:
image[ip][iq] = 1
break
return image
tic = time.perf_counter_ns()
image = mandelbrot()
toc = time.perf_counter_ns()
print((toc - tic)/1_000_000_000, "s")

View File

@ -1,2 +1,31 @@
# advanced-python-homework-2023
## advanced-python-homework-2023
## Installation
1. Install `Python`
2. Install `python3-venv` by the following command: `apt install python3-venv`
3. Create and activate your virtual environment following the [instructions](https://docs.python.org/3/tutorial/venv.html)
4. Install the `requirements.txt` dependencies following the commands:
```
cd ./scada_system
pip3 install -r requirements.txt
```
5. Install the package in editable mode:
```
pip3 install -e .
```
6. To generate documentation in HTML format, you need to run the following commands:
```
cd ./docs
make html
```
## Tests
1. To run the tests, go to the `scada_system` directory and run the following command:
```
python3 -m unittest
```

View File

@ -1,8 +0,0 @@
from turtle import Turtle
from controls.device import SynchronyDevice
class TurtleDevice(SynchronyDevice):
pass # TODO(Homework #3)

21
scada_system/LICENSE Normal file
View File

@ -0,0 +1,21 @@
MIT License
Copyright (c) 2023 Vasilev Ilya
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

View File

@ -0,0 +1,9 @@
'''
Supervisory control and data acquisition (SCADA)
It is a control system architecture comprising computers,
networked data communications and graphical user interfaces for
high-level supervision of machines and processes. It also covers
sensors and other devices, such as programmable logic controllers,
which interface with process plant or machinery.
'''

View File

@ -1,9 +1,12 @@
from dataclasses import dataclass
from typing import Optional, Collection, Any
from abc import abstractmethod
from abc import ABC, abstractmethod
from enum import Enum
class DeviceLifecycleState:
pass # TODO(Homework #3)
class DeviceLifecycleState(Enum):
INIT = "INIT"
OPEN = "OPEN"
CLOSE = "CLOSE"
class DevaceError(Exception):
@ -33,8 +36,7 @@ class ActionDescriptor:
info: Optional[str] = None
class Device:
# TODO(Homework #3)
class Device(ABC):
_state = DeviceLifecycleState.INIT
@property
@ -44,9 +46,13 @@ class Device:
def close(self):
self._state = DeviceLifecycleState.CLOSE
@property
@abstractmethod
def trait_descriptors(self) -> Collection[TraitDescriptor]:
pass
@property
@abstractmethod
def action_descriptors(self) -> Collection[ActionDescriptor]:
pass

View File

@ -0,0 +1,20 @@
# Minimal makefile for Sphinx documentation
#
# You can set these variables from the command line, and also
# from the environment for the first two.
SPHINXOPTS ?=
SPHINXBUILD ?= sphinx-build
SOURCEDIR = source
BUILDDIR = build
# Put it first so that "make" without argument is like "make help".
help:
@$(SPHINXBUILD) -M help "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O)
.PHONY: help Makefile
# Catch-all target: route all unknown targets to Sphinx using the new
# "make mode" option. $(O) is meant as a shortcut for $(SPHINXOPTS).
%: Makefile
@$(SPHINXBUILD) -M $@ "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O)

View File

@ -0,0 +1,35 @@
@ECHO OFF
pushd %~dp0
REM Command file for Sphinx documentation
if "%SPHINXBUILD%" == "" (
set SPHINXBUILD=sphinx-build
)
set SOURCEDIR=source
set BUILDDIR=build
%SPHINXBUILD% >NUL 2>NUL
if errorlevel 9009 (
echo.
echo.The 'sphinx-build' command was not found. Make sure you have Sphinx
echo.installed, then set the SPHINXBUILD environment variable to point
echo.to the full path of the 'sphinx-build' executable. Alternatively you
echo.may add the Sphinx directory to PATH.
echo.
echo.If you don't have Sphinx installed, grab it from
echo.https://www.sphinx-doc.org/
exit /b 1
)
if "%1" == "" goto help
%SPHINXBUILD% -M %1 %SOURCEDIR% %BUILDDIR% %SPHINXOPTS% %O%
goto end
:help
%SPHINXBUILD% -M help %SOURCEDIR% %BUILDDIR% %SPHINXOPTS% %O%
:end
popd

View File

@ -0,0 +1,28 @@
# Configuration file for the Sphinx documentation builder.
#
# For the full list of built-in configuration values, see the documentation:
# https://www.sphinx-doc.org/en/master/usage/configuration.html
# -- Project information -----------------------------------------------------
# https://www.sphinx-doc.org/en/master/usage/configuration.html#project-information
project = 'SCADA'
copyright = '2023, ilyavasilev'
author = 'ilyavasilev'
release = '0.0.1'
# -- General configuration ---------------------------------------------------
# https://www.sphinx-doc.org/en/master/usage/configuration.html#general-configuration
extensions = []
templates_path = ['_templates']
exclude_patterns = []
# -- Options for HTML output -------------------------------------------------
# https://www.sphinx-doc.org/en/master/usage/configuration.html#options-for-html-output
html_theme = 'alabaster'
html_static_path = ['_static']

View File

@ -0,0 +1,20 @@
.. SCADA documentation master file, created by
sphinx-quickstart on Mon Sep 25 22:24:00 2023.
You can adapt this file completely to your liking, but it should at least
contain the root `toctree` directive.
Welcome to SCADA's documentation!
=================================
.. toctree::
:maxdepth: 2
:caption: Contents:
Indices and tables
==================
* :ref:`genindex`
* :ref:`modindex`
* :ref:`search`

View File

@ -0,0 +1,54 @@
from turtle import Turtle
from typing import Optional, Collection, Any
from controls.device import SynchronyDevice
from controls.device import TraitDescriptor
from controls.device import ActionDescriptor
import inspect
class TurtleDevice(SynchronyDevice):
def open(self):
self.turtle = Turtle()
super().open()
def close(self):
self.turtle.clear()
super().close()
def execute(self, action_name: str, *args, **kwargs):
getattr(self.turtle, action_name)(*args, **kwargs)
def read(self, trait_name: str):
"""Read physical state of trait `trait_name` from device."""
trait = self.trait_descriptors()[trait_name]
if not trait.readable : super().read(trait_name)
return trait.info
def write(self, trait_name: str, value: Any) -> bool:
"""Pass `value` to trait `trait_name` of device."""
if not self.trait_descriptors()[trait_name] : super().write(trait_name, value)
setattr(self.turtle, trait_name, value)
def invalidate(self, trait_name: str):
"""Invalidate logical state of trait `trait_name`"""
pass
def trait_descriptors(self):
traits = dict()
for name, value in inspect.getmembers(self.turtle):
if not callable(value) and not name.startswith('_'):
info = f"{value}"
traits[name] = TraitDescriptor(name, info)
return traits
def action_descriptors(self):
actions = dict()
for name, func in inspect.getmembers(self.turtle):
if callable(func) and not name.startswith('_'):
args = dict(inspect.signature(func).parameters)
info = inspect.getdoc(func)
actions[name] = ActionDescriptor(name, args, info)
return actions
def __getitem__(self, trait_name: str):
"""Return logical state of trait `trait_name`."""
pass

View File

@ -0,0 +1,66 @@
import cmd
import threading
from queue import Queue
from equipment.turtle_device import TurtleDevice
class TurtleDeviceThread(threading.Thread):
def __init__(self):
super().__init__()
self.device = TurtleDevice()
self.queue = Queue()
self.device.open()
def add_task(self, action: str, *args, **kwargs):
self.queue.put((action, args, kwargs))
def run(self):
while True:
action, args, kwargs = self.queue.get()
if action == "stop":
self.queue.task_done()
break
self.device.execute(action, *args, **kwargs)
self.queue.task_done()
class NoBlockingTurtleShell(cmd.Cmd):
intro = 'Welcome to the turtle shell. Type help or ? to list commands.\n'
prompt = '(turtle) '
file = None
def __init__(self, turtle_thread: TurtleDeviceThread):
super().__init__()
self.turtle_thread = turtle_thread
def do_execute(self, arg):
'Executes a command with the given arguments: EXECUTE forward 100'
command_and_args = parse(arg)
if len(command_and_args) == 1:
self.turtle_thread.add_task(command_and_args[0])
return None
self.turtle_thread.add_task(command_and_args[0], *command_and_args[1:])
def do_exit(self, arg):
'Stops the message processing thread: EXIT'
print('The message processing thread has stopped.')
self.turtle_thread.add_task('stop')
return True
def parse(arg):
'Convert a series of zero or more numbers to an argument tuple'
def perform_elem(elem: str):
return int(elem) if elem.isdecimal() else elem
return tuple(map(perform_elem, arg.split()))
if __name__ == '__main__':
turtle_thread = TurtleDeviceThread()
turtle_thread.daemon = True
thread_shell = threading.Thread(target=NoBlockingTurtleShell(turtle_thread).cmdloop)
thread_shell.start()
turtle_thread.run()
thread_shell.join()

View File

@ -0,0 +1,4 @@
[project]
name = "SCADA"
description = "Cистема управления для сбора, анализа и визуализации данных"
version = "0.0.1"

View File

@ -0,0 +1,36 @@
alabaster==0.7.13
astroid==2.15.7
Babel==2.12.1
certifi==2023.7.22
charset-normalizer==3.2.0
dill==0.3.7
docutils==0.20.1
idna==3.4
imagesize==1.4.1
importlib-metadata==6.8.0
isort==5.12.0
Jinja2==3.1.2
lazy-object-proxy==1.9.0
MarkupSafe==2.1.3
mccabe==0.7.0
mypy==1.5.1
mypy-extensions==1.0.0
packaging==23.1
platformdirs==3.10.0
Pygments==2.16.1
pylint==2.17.6
requests==2.31.0
snowballstemmer==2.2.0
Sphinx==7.2.6
sphinxcontrib-applehelp==1.0.7
sphinxcontrib-devhelp==1.0.5
sphinxcontrib-htmlhelp==2.0.4
sphinxcontrib-jsmath==1.0.1
sphinxcontrib-qthelp==1.0.6
sphinxcontrib-serializinghtml==1.1.9
tomli==2.0.1
tomlkit==0.12.1
typing_extensions==4.8.0
urllib3==2.0.5
wrapt==1.15.0
zipp==3.17.0

View File

View File

View File

@ -0,0 +1,14 @@
from unittest import TestCase
from controls.device import DeviceLifecycleState
class DeviceLifecycleStateTest(TestCase):
def setUp(self) -> None:
pass
def test_enum(self):
self.assertEqual(DeviceLifecycleState["INIT"], DeviceLifecycleState.INIT)
self.assertEqual(DeviceLifecycleState["OPEN"], DeviceLifecycleState.OPEN)
self.assertEqual(DeviceLifecycleState["CLOSE"], DeviceLifecycleState.CLOSE)

View File

View File

@ -0,0 +1,18 @@
from unittest import TestCase
from equipment.turtle_device import TurtleDevice
class TurtleDeviceTest(TestCase):
def setUp(self) -> None:
self.device = TurtleDevice()
def test_open(self):
self.device.open()
self.device.execute('forward', 100)
self.device.execute('left', 90)
self.device.execute('right', 90)
self.device.execute('color', 'red')
self.device.execute('circle', 120, 180)
self.device.execute('home')
self.device.close()

View File

@ -0,0 +1,74 @@
import cmd, sys
from turtle import *
class TurtleShell(cmd.Cmd):
intro = 'Welcome to the turtle shell. Type help or ? to list commands.\n'
prompt = '(turtle) '
file = None
# ----- basic turtle commands -----
def do_forward(self, arg):
'Move the turtle forward by the specified distance: FORWARD 10'
forward(*parse(arg))
def do_right(self, arg):
'Turn turtle right by given number of degrees: RIGHT 20'
right(*parse(arg))
def do_left(self, arg):
'Turn turtle left by given number of degrees: LEFT 90'
left(*parse(arg))
def do_goto(self, arg):
'Move turtle to an absolute position with changing orientation. GOTO 100 200'
goto(*parse(arg))
def do_home(self, arg):
'Return turtle to the home position: HOME'
home()
def do_circle(self, arg):
'Draw circle with given radius an options extent and steps: CIRCLE 50'
circle(*parse(arg))
def do_position(self, arg):
'Print the current turtle position: POSITION'
print('Current position is %d %d\n' % position())
def do_heading(self, arg):
'Print the current turtle heading in degrees: HEADING'
print('Current heading is %d\n' % (heading(),))
def do_color(self, arg):
'Set the color: COLOR BLUE'
color(arg.lower())
def do_undo(self, arg):
'Undo (repeatedly) the last turtle action(s): UNDO'
def do_reset(self, arg):
'Clear the screen and return turtle to center: RESET'
reset()
def do_bye(self, arg):
'Stop recording, close the turtle window, and exit: BYE'
print('Thank you for using Turtle')
self.close()
bye()
return True
# ----- record and playback -----
def do_record(self, arg):
'Save future commands to filename: RECORD rose.cmd'
self.file = open(arg, 'w')
def do_playback(self, arg):
'Playback commands from a file: PLAYBACK rose.cmd'
self.close()
with open(arg) as f:
self.cmdqueue.extend(f.read().splitlines())
def precmd(self, line):
line = line.lower()
if self.file and 'playback' not in line:
print(line, file=self.file)
return line
def close(self):
if self.file:
self.file.close()
self.file = None
def parse(arg):
'Convert a series of zero or more numbers to an argument tuple'
return tuple(map(int, arg.split()))
if __name__ == '__main__':
TurtleShell().cmdloop()

View File

@ -1,12 +0,0 @@
from unittest import TestCase
from controls.device import DeviceLifecycleState
class DeviceLifecycleStateTest(TestCase):
def setUp(self) -> None:
pass
def test_enum(self):
self.assertEqual(DeviceLifecycleStateTest["INIT"], DeviceLifecycleStateTest.INIT)

View File

@ -1,13 +0,0 @@
from unittest import TestCase
from equipment.turtle_device import TurtleDevice
class TurtleDeviceTest(TestCase):
def setUp(self) -> None:
self.device = TurtleDevice()
def test_open(self):
self.device.open()
self.device.close()