Should work

This commit is contained in:
Andrea Mistrali 2023-05-01 12:09:37 +02:00
parent eac3e8c3da
commit 945e816b17
No known key found for this signature in database
GPG Key ID: 6FB0A77F6D5DA9B2
1 changed files with 51 additions and 125 deletions

View File

@ -31,9 +31,9 @@ class ledPulse:
'max': 100, 'max': 100,
'delay': 0.01, 'delay': 0.01,
'delayMethod': 'constant', 'delayMethod': 'constant',
'initialMode': None, 'initialMethod': None,
'valueMethod': 'linear', 'loopMethod': 'linear',
'finalMode': None 'finalMethod': None
} }
def __init__(self, gpio, name="led0"): def __init__(self, gpio, name="led0"):
@ -42,8 +42,6 @@ class ledPulse:
self.name = name self.name = name
self.changed = threading.Event() self.changed = threading.Event()
self.initialStep = 0
self._set = False
if sys.platform == "linux": if sys.platform == "linux":
GPIO.setwarnings(False) # disable warnings GPIO.setwarnings(False) # disable warnings
@ -97,51 +95,51 @@ class ledPulse:
self._set = True self._set = True
else: else:
self.log.warning(f"set: unknown parameter '{key}'") self.log.warning(f"set: unknown parameter '{key}'")
self.delta = self.max - self.min
if self._set: self.log.debug(f"fill values table, "
if self.worker and self.worker.is_alive(): f"loopMethod:{self.loopMethod}, "
print("worker running, notify change")
self.worker.changed.set()
# VALUES CALCULATION
def calcValues(self, initialStep, finalStep):
if not (self.delayMethod in self.methods['delay']
and self.valueMethod in self.methods['value']):
self.log.error(f"Invalid parameters, "
f"delayMethod:{self.delayMethod}, "
f"valueMethod:{self.valueMethod}")
return
self.log.debug(f"fill cycle values table, "
f"valueMethod:{self.valueMethod}, "
f"delayMethod:{self.delayMethod}, min:{self.min}, " f"delayMethod:{self.delayMethod}, min:{self.min}, "
f"max:{self.max}, delay:{self.delay}") f"max:{self.max}, delay:{self.delay}")
values = [] delayValues = []
for step in range(initialStep, finalStep): for step in range(0, 100):
try: try:
delay = self.methods['delay'][self.delayMethod](self, step) delay = self.methods['delay'][self.delayMethod](self, step)
except NameError: except NameError:
self.log.error(f"delay method '{self.delayMethod}' not found") self.log.error(f"delay method '{self.delayMethod}' not found")
return return
delayValues.append(delay)
initialValues = []
if self.initialMethod:
for step in range(0, 50):
try: try:
value = self.methods['value'][self.valueMethod](self, step) value = self.methods['value'][self.initialMethod](self, step)
except NameError: except KeyError:
self.log.error(f"value method '{self.valueMethod}' not found") self.log.error(f"value method '{self.initialMethod}' not found")
return return
values.append((value, delay)) initialValues.append((value, delayValues[step]))
return values
def cycleValues(self): loopValues = []
return self.calcValues(0, 100) for step in range(0, 100):
try:
value = self.methods['value'][self.loopMethod](self, step)
except KeyError:
self.log.error(f"value method '{self.loopMethod}' not found")
return
loopValues.append((value, delayValues[step]))
def initialValues(self): finalValues = []
return self.calcValues(0, 50) if self.finalMethod:
for step in range(50, 100):
try:
value = self.methods['value'][self.finalMethod](self, step)
except KeyError:
self.log.error(f"value method '{self.finalMethod}' not found")
return
finalValues.append((value, delayValues[step]))
def finalValues(self): self.queue.put((initialValues, loopValues, finalValues))
return self.calcValues(50, 100)
def start(self): def start(self):
if self.supported: if self.supported:
@ -158,49 +156,28 @@ class ledPulse:
def loop(self): def loop(self):
if not self._set:
self.log.error("led modes not set, use .set() first")
return
while True: while True:
item = self.queue.get()
# Initial # Initial loop
if self.initialMode: self.log.debug('running initial steps')
self.log.debug('running initial step') for value, delay in item[0]:
vals = self.initialValues()
for value, delay in vals:
self.setValue(value) self.setValue(value)
time.sleep(delay) time.sleep(delay)
print('end initial step')
self.log.debug('end initial step')
vals = self.cycleValues() self.log.debug('running main loop')
while True: while True:
if self.worker.stop_event.is_set(): for value, delay in item[1]:
# self.worker._started.clear() self.setValue(value)
self.worker.stop_event.clear() time.sleep(delay)
self.log.debug("bailing out from infinite loop") if not self.queue.empty()
return
if self.worker.changed.is_set():
self.changed.clear()
self.log.debug("parameters changed, recalculate values...")
break break
for step in range(len(vals)):
value, delay = vals[step] self.log.debug('running final steps')
for value, delay in item[2]:
self.setValue(value) self.setValue(value)
time.sleep(delay) time.sleep(delay)
# Final
if self.finalMode:
print('running final step')
self.log.debug('running final step')
vals = self.finalValues()
for value, delay in vals:
self.setValue(value)
time.sleep(delay)
print('end final step')
self.log.debug('end final step')
### ###
# PLUGINS METHODS # PLUGINS METHODS
@ -221,54 +198,3 @@ class ledPulse:
self.log.info(f"registered {kind} method '{name}'") self.log.info(f"registered {kind} method '{name}'")
else: else:
self.log.warning(f"{kind} method '{name}' already defined") self.log.warning(f"{kind} method '{name}' already defined")
@dataclass
class Item:
parent: ledPulse
initialMethod: str = None
loopMethod: str = 'linear'
finalMethod: str = None
delayMethod: str = 'constant'
min: int = 2
max: int = 50
delay: float = 0.01
def __post_init__(self):
self.log = logging.getLogger(self.__class__.__name__)
for method in ["initial", "loop", "final"]:
methodName = f"{method}Method"
methodValue = getattr(self, methodName)
if methodValue and methodValue not in self.parent.methods['value']:
self.log.error(f"Invalid method, "
f"{methodName}: '{methodValue}'")
return None
if self.delayMethod not in self.parent.methods['delay']:
self.log.error(f"Invalid method, delayMethod:'{self.delayMethod}'")
return None
self.initial = self.calcValues(0, 50, self.initialMethod)
self.loop = self.calcValues(0, 100, self.loopMethod)
self.final = self.calcValues(50, 100, self.finalMethod)
@property
def timings(self):
initialTime = sum(map(lambda x: x[1], self.initial))
loopTime = sum(map(lambda x: x[1], self.loop))
finalTime = sum(map(lambda x: x[1], self.final))
total = initialTime + loopTime + finalTime
return (initialTime, loopTime, finalTime, total)
def calcValues(self, initialStep, finalStep, valueMethod):
values = []
if valueMethod:
valueMethodFn = self.parent.methods['value'][valueMethod]
delayMethodFn = self.parent.methods['delay'][self.delayMethod]
for step in range(initialStep, finalStep):
delay = delayMethodFn(self, step)
value = valueMethodFn(self, step)
values.append((value, delay))
return values