- validation of methods, possibly using schema;
- cleanup
This commit is contained in:
Andrea Mistrali 2023-05-01 09:54:19 +02:00
parent ba50af7c5e
commit eac3e8c3da
No known key found for this signature in database
GPG Key ID: 6FB0A77F6D5DA9B2
3 changed files with 115 additions and 11 deletions

View File

@ -1,3 +1,5 @@
import math
"""
Predefined methods
"""
@ -45,10 +47,11 @@ def value_linear(obj, step):
Saw patterned value, 0-1-0
"""
delta = obj.max - obj.min
if step < 50:
return obj.delta*step/50 + obj.min
return delta*step/50 + obj.min
else:
return obj.delta * (100-step)/50 + obj.min
return delta * (100-step)/50 + obj.min
def value_sin(obj, step):
@ -56,13 +59,16 @@ def value_sin(obj, step):
Sinusoidal values, 0-1-0 /\
"""
delta = obj.max - obj.min
radians = math.radians(1.8 * step)
return obj.delta * math.sin(radians) + obj.min
return delta * math.sin(radians) + obj.min
def value_cos(obj, step):
"""
Absolute Cosinusoidal values, 1-0-1 \\//
"""
delta = obj.max - obj.min
radians = math.radians(1.8 * step)
return obj.delta * abs(math.cos(radians)) + obj.min
return delta * abs(math.cos(radians)) + obj.min

View File

@ -2,6 +2,10 @@ import logging
import sys
import time
import threading
from dataclasses import dataclass
from queue import Queue
from .worker import ledWorker
from .methods import * # NOQA
@ -68,7 +72,8 @@ class ledPulse:
self.register_value_method(vm, eval(f"value_{vm}"))
# Add thread placeholder
self.worker = None
self.worker = ledWorker(self)
self.queue = Queue()
# Set default values
for key, value in self.defaultSet.items():
@ -104,11 +109,13 @@ class ledPulse:
if not (self.delayMethod in self.methods['delay']
and self.valueMethod in self.methods['value']):
self.log.error(f"Invalid parameters, delayMethod:{self.delayMethod}, "
self.log.error(f"Invalid parameters, "
f"delayMethod:{self.delayMethod}, "
f"valueMethod:{self.valueMethod}")
return
self.log.debug(f"fill cycle values table, valueMethod:{self.valueMethod}, "
self.log.debug(f"fill cycle values table, "
f"valueMethod:{self.valueMethod}, "
f"delayMethod:{self.delayMethod}, min:{self.min}, "
f"max:{self.max}, delay:{self.delay}")
@ -117,13 +124,12 @@ class ledPulse:
try:
delay = self.methods['delay'][self.delayMethod](self, step)
except NameError:
self.log.error(f"delay function for '{self.delayMethod}' "
"not found")
self.log.error(f"delay method '{self.delayMethod}' not found")
return
try:
value = self.methods['value'][self.valueMethod](self, step)
except NameError:
self.log.error(f"value function for '{self.valueMethod}' not found")
self.log.error(f"value method '{self.valueMethod}' not found")
return
values.append((value, delay))
return values
@ -215,3 +221,54 @@ class ledPulse:
self.log.info(f"registered {kind} method '{name}'")
else:
self.log.warning(f"{kind} method '{name}' already defined")
@dataclass
class Item:
parent: ledPulse
initialMethod: str = None
loopMethod: str = 'linear'
finalMethod: str = None
delayMethod: str = 'constant'
min: int = 2
max: int = 50
delay: float = 0.01
def __post_init__(self):
self.log = logging.getLogger(self.__class__.__name__)
for method in ["initial", "loop", "final"]:
methodName = f"{method}Method"
methodValue = getattr(self, methodName)
if methodValue and methodValue not in self.parent.methods['value']:
self.log.error(f"Invalid method, "
f"{methodName}: '{methodValue}'")
return None
if self.delayMethod not in self.parent.methods['delay']:
self.log.error(f"Invalid method, delayMethod:'{self.delayMethod}'")
return None
self.initial = self.calcValues(0, 50, self.initialMethod)
self.loop = self.calcValues(0, 100, self.loopMethod)
self.final = self.calcValues(50, 100, self.finalMethod)
@property
def timings(self):
initialTime = sum(map(lambda x: x[1], self.initial))
loopTime = sum(map(lambda x: x[1], self.loop))
finalTime = sum(map(lambda x: x[1], self.final))
total = initialTime + loopTime + finalTime
return (initialTime, loopTime, finalTime, total)
def calcValues(self, initialStep, finalStep, valueMethod):
values = []
if valueMethod:
valueMethodFn = self.parent.methods['value'][valueMethod]
delayMethodFn = self.parent.methods['delay'][self.delayMethod]
for step in range(initialStep, finalStep):
delay = delayMethodFn(self, step)
value = valueMethodFn(self, step)
values.append((value, delay))
return values

View File

@ -1,3 +1,4 @@
import logging
import threading
@ -12,11 +13,51 @@ class ledWorker(threading.Thread):
self.log = logging.getLogger(self.__class__.__name__)
self.parent = parent
self.changed = self.parent.changed
# self.changed = self.parent.changed
self.stop_event = threading.Event()
def run(self):
while True:
item = self.parent.queue.get()
# Initial
if self.initialMode:
self.log.debug('running initial step')
vals = self.initialValues()
for value, delay in vals:
self.setValue(value)
time.sleep(delay)
print('end initial step')
self.log.debug('end initial step')
vals = self.cycleValues()
while True:
if self.worker.stop_event.is_set():
# self.worker._started.clear()
self.worker.stop_event.clear()
self.log.debug("bailing out from infinite loop")
return
if self.worker.changed.is_set():
self.changed.clear()
self.log.debug("parameters changed, recalculate values...")
break
for step in range(len(vals)):
value, delay = vals[step]
self.setValue(value)
time.sleep(delay)
# Final
if self.finalMode:
print('running final step')
self.log.debug('running final step')
vals = self.finalValues()
for value, delay in vals:
self.setValue(value)
time.sleep(delay)
print('end final step')
self.log.debug('end final step')
self.log.debug(f"running {self.name} main loop...")
self.parent.loop()