Initial refactor

This commit is contained in:
Andrea Mistrali 2023-05-01 07:20:24 +02:00
parent 20c5d9c4ba
commit ba50af7c5e
No known key found for this signature in database
GPG Key ID: 6FB0A77F6D5DA9B2
6 changed files with 131 additions and 132 deletions

68
pulses/methods.py Normal file
View File

@ -0,0 +1,68 @@
"""
Predefined methods
"""
"""
Delay methods
"""
def delay_sin(obj, step):
return math.sin(math.radians(1.8*step)) * obj.delay
def delay_cos(obj, step):
return abs(math.cos(math.radians(1.8*step))) * obj.delay
def delay_constant(obj, step):
return obj.delay
"""
Values methods
"""
def value_on(obj, step):
"""
Always on at "max" brightness
"""
return obj.max
def value_off(obj, step):
"""
Always off
"""
return 0
def value_linear(obj, step):
"""
Saw patterned value, 0-1-0
"""
if step < 50:
return obj.delta*step/50 + obj.min
else:
return obj.delta * (100-step)/50 + obj.min
def value_sin(obj, step):
"""
Sinusoidal values, 0-1-0 /\
"""
radians = math.radians(1.8 * step)
return obj.delta * math.sin(radians) + obj.min
def value_cos(obj, step):
"""
Absolute Cosinusoidal values, 1-0-1 \\//
"""
radians = math.radians(1.8 * step)
return obj.delta * abs(math.cos(radians)) + obj.min

View File

@ -1,37 +1,14 @@
import logging
import sys
import math
import time
import threading
from .worker import ledWorker
from .methods import * # NOQA
if sys.platform == "linux":
import RPi.GPIO as GPIO
class ledWorker(threading.Thread):
"""
This is the thread that runs the main loop
and actually drives the LED
"""
def __init__(self, parent):
super().__init__(name=parent.name, daemon=True)
self.log = logging.getLogger(self.__class__.__name__)
self.parent = parent
self.changed = self.parent.changed
self.stop_event = threading.Event()
def run(self):
self.log.debug(f"running {self.name} main loop...")
self.parent.loop()
def stop(self):
self.stop_event.set()
self.log.debug(f"stopping {self.name}...")
class ledPulse:
"""
There are 4 states of pulsing:
@ -41,17 +18,17 @@ class ledPulse:
- fast: pulse with delay/2;
"""
delayModes = ['constant', 'sin', 'cos']
valueModes = ['on', 'off', 'linear', 'sin', 'cos']
modes = {'delay': {}, 'value': {}}
delayMethods = ['constant', 'sin', 'cos']
valueMethods = ['on', 'off', 'linear', 'sin', 'cos']
methods = {'delay': {}, 'value': {}}
defaultSet = {
'min': 2,
'max': 100,
'delay': 0.01,
'delayMode': 'constant',
'delayMethod': 'constant',
'initialMode': None,
'valueMode': 'linear',
'valueMethod': 'linear',
'finalMode': None
}
@ -84,10 +61,10 @@ class ledPulse:
self.log.info(f"platform '{sys.platform}' " +
f"support: {sys.platform == 'linux'}")
for dm in self.delayModes:
for dm in self.delayMethods:
self.register_delay_method(dm, eval(f"delay_{dm}"))
for vm in self.valueModes:
for vm in self.valueMethods:
self.register_value_method(vm, eval(f"value_{vm}"))
# Add thread placeholder
@ -104,11 +81,6 @@ class ledPulse:
# on unsupported platforms, we just print the value
print(value)
# def set(self, min=2, max=50, delay=1,
# delayMode='constant',
# valueMode='linear',
# initialMode=None,
# finalMode=None):
def set(self, **kwargs):
# for key, value in self.defaultSet.items():
@ -122,45 +94,36 @@ class ledPulse:
self.log.warning(f"set: unknown parameter '{key}'")
self.delta = self.max - self.min
# self.min = min
# self.max = max
# self.delay = delay
# self.delayMode = delayMode
# self.valueMode = valueMode
# self.initialMode = initialMode
# self.finalMode = finalMode
if self._set:
if self.worker and self.worker.is_alive():
print("worker running, notify change")
self.worker.changed.set()
# VALUES CALCULATION
def calcValues(self, initialStep, finalStep):
if not (self.delayMode in self.modes['delay']
and self.valueMode in self.modes['value']):
self.log.error(f"Invalid parameters, delayMode:{self.delayMode}, "
f"valueMode:{self.valueMode}")
if not (self.delayMethod in self.methods['delay']
and self.valueMethod in self.methods['value']):
self.log.error(f"Invalid parameters, delayMethod:{self.delayMethod}, "
f"valueMethod:{self.valueMethod}")
return
self.log.debug(f"fill cycle values table, valueMode:{self.valueMode}, "
f"delayMode:{self.delayMode}, min:{self.min}, "
self.log.debug(f"fill cycle values table, valueMethod:{self.valueMethod}, "
f"delayMethod:{self.delayMethod}, min:{self.min}, "
f"max:{self.max}, delay:{self.delay}")
values = []
for step in range(initialStep, finalStep):
try:
delay = self.modes['delay'][self.delayMode](self, step)
delay = self.methods['delay'][self.delayMethod](self, step)
except NameError:
self.log.error(f"delay function for '{self.delayMode}' "
self.log.error(f"delay function for '{self.delayMethod}' "
"not found")
return
try:
value = self.modes['value'][self.valueMode](self, step)
value = self.methods['value'][self.valueMethod](self, step)
except NameError:
self.log.error(f"value function for '{self.valueMode}' not found")
self.log.error(f"value function for '{self.valueMethod}' not found")
return
values.append((value, delay))
return values
@ -247,72 +210,8 @@ class ledPulse:
self.log.warning(f"unknown kind {kind}")
return
if name not in self.modes[kind]:
self.modes[kind][name] = fn
if name not in self.methods[kind]:
self.methods[kind][name] = fn
self.log.info(f"registered {kind} method '{name}'")
else:
self.log.warning(f"{kind} method '{name}' already defined")
####
# Predefined functions
###
# Delays
###
def delay_sin(obj, step):
return math.sin(math.radians(1.8*step)) * obj.delay
def delay_cos(obj, step):
return abs(math.cos(math.radians(1.8*step))) * obj.delay
def delay_constant(obj, step):
return obj.delay
###
# Values
###
def value_on(obj, step):
"""
Always on at "max" brightness
"""
return obj.max
def value_off(obj, step):
"""
Always off
"""
return 0
def value_linear(obj, step):
"""
Saw patterned value, 0-1-0
"""
if step < 50:
return obj.delta*step/50 + obj.min
else:
return obj.delta * (100-step)/50 + obj.min
def value_sin(obj, step):
"""
Sinusoidal values, 0-1-0 /\
"""
radians = math.radians(1.8 * step)
return obj.delta * math.sin(radians) + obj.min
def value_cos(obj, step):
"""
Absolute Cosinusoidal values, 1-0-1 \\//
"""
radians = math.radians(1.8 * step)
return obj.delta * abs(math.cos(radians)) + obj.min

25
pulses/worker.py Normal file
View File

@ -0,0 +1,25 @@
import threading
class ledWorker(threading.Thread):
"""
This is the thread that runs the main loop
and actually drives the LED
"""
def __init__(self, parent):
super().__init__(name=parent.name, daemon=True)
self.log = logging.getLogger(self.__class__.__name__)
self.parent = parent
self.changed = self.parent.changed
self.stop_event = threading.Event()
def run(self):
self.log.debug(f"running {self.name} main loop...")
self.parent.loop()
def stop(self):
self.stop_event.set()
self.log.debug(f"stopping {self.name}...")

View File

@ -1,3 +0,0 @@
build
PyYAML
rpi-backlight

View File

@ -1,5 +1,5 @@
[metadata]
name = mblight
name = pulses
version = 0.90
author = Andrea Mistrali
author_email = akelge@gmail.com
@ -10,12 +10,12 @@ keywords = graylog, py3
license = BSD 3-Clause License
[options]
packages = mblight
packages = pulses
python_requires = >=3
include_package_data = True
install_requires =
PyYAML
# install_requires =
# PyYAML
[options.package_data]
* =
config.yaml.example
# [options.package_data]
# * =
# config.yaml.example

10
test.py Normal file
View File

@ -0,0 +1,10 @@
import logging
from pulses import pulses
logging.basicConfig(format='%(name)s %(threadName)s %(msg)s')
log = logging.getLogger()
log.setLevel('DEBUG')
led = pulses.ledPulse(12)
led.set(delay=1/100, valueMode="sin", max=20)