Newat and clean

This commit is contained in:
2023-05-02 11:58:53 +02:00
parent e1b201376f
commit e9fefee6f0
4 changed files with 35 additions and 67 deletions

View File

@ -47,7 +47,9 @@ class ledPulse(threading.Thread):
self.stop_event = threading.Event() self.stop_event = threading.Event()
# Method to stop thread # Method to stop thread
# Like def stop(): self.stop_event.set() # Like
# def stop():
# self.stop_event.set()
# But shorter ;) # But shorter ;)
self.stop = self.stop_event.set self.stop = self.stop_event.set
@ -58,11 +60,17 @@ class ledPulse(threading.Thread):
setattr(self, key, value) setattr(self, key, value)
def pwm_setup(self): def pwm_setup(self):
"""
We check if we are running on RPi.
If yes we set supported to True and set setValue method
to changeDutyCycle, else
we set supported to False and create a dummy setValue method
"""
self.supported = False self.supported = False
self.moel = sys.platform self.model = sys.platform
try: try:
with op en('/sys/firmware/devicetree/base/model', 'r') as m: with open('/sys/firmware/devicetree/base/model', 'r') as m:
print("OPN")
model = m.read() model = m.read()
if model.lower().startswith('raspberry pi'): if model.lower().startswith('raspberry pi'):
self.supported = True self.supported = True
@ -72,24 +80,24 @@ class ledPulse(threading.Thread):
if self.supported: if self.supported:
import RPi.GPIO as GPIO import RPi.GPIO as GPIO
GPIO.setwarnings(False) # disable warnings GPIO.setwarnings(False) # disable warnings
GPIO.setmode(GPIO.BCM) # set pin numbering system GPIO.setmode(GPIO.BCM) # set pin numbering system
# set GPIO for output GPIO.setup(self.gpio, GPIO.OUT) # set GPIO for output
GPIO.setup(self.gpio, GPIO.OUT)
# create PWM instance with frequency # create PWM instance with 120Hz frequency
self.pwm = GPIO.PWM(self.gpio, 120) self.pwm = GPIO.PWM(self.gpio, 120)
self.pwm.start(0) # start pwm with value 0, off
# setValue function
self.setValue = self.pwm.ChangeDutyCycle self.setValue = self.pwm.ChangeDutyCycle
self.pwm.start(0) # start pwm with value 0
else: else:
# Platform not supported, set # Platform not supported, set
# a dummy setValue function # a dummy setValue function
if self.log.level == logging.DEBUG: if self.log.level == logging.DEBUG:
self.setValue = print self.setValue = print
else: else:
self.setValue = lambda x: x self.setValue = lambda x: x
return (False, sys.platform) return (self.supported, sys.platform)
def set(self, **kwargs): def set(self, **kwargs):
@ -99,7 +107,7 @@ class ledPulse(threading.Thread):
else: else:
self.log.warning(f"set: unknown parameter '{key}'") self.log.warning(f"set: unknown parameter '{key}'")
self.log.debug("calculate values: " self.log.debug("calculating values: "
f"initialMethod:{self.initialMethod}, " f"initialMethod:{self.initialMethod}, "
f"loopMethod:{self.loopMethod}, " f"loopMethod:{self.loopMethod}, "
f"finalMethod:{self.finalMethod}, " f"finalMethod:{self.finalMethod}, "
@ -161,17 +169,17 @@ class ledPulse(threading.Thread):
# Get an item from the queue, # Get an item from the queue,
# if queue is empty, wait for a new item # if queue is empty, wait for a new item
# item is a tuple with (initialValues, loopValues, finalValues) # item is a tuple with (initialValues, loopValues, finalValues)
item = self.queue.get() (initialValues, loopValues, finalValues) = self.queue.get()
# Initial loop # Initial loop
self.log.debug('running initial steps') self.log.debug('running initial steps')
for value, delay in item[0]: for value, delay in initialValues:
self.setValue(value) self.setValue(value)
time.sleep(delay) time.sleep(delay)
self.log.debug('running main loop') self.log.debug('running main loop')
while True: while True:
for value, delay in item[1]: for value, delay in loopValues:
self.setValue(value) self.setValue(value)
time.sleep(delay) time.sleep(delay)
@ -181,7 +189,7 @@ class ledPulse(threading.Thread):
break break
self.log.debug('running final steps') self.log.debug('running final steps')
for value, delay in item[2]: for value, delay in finalValues:
self.setValue(value) self.setValue(value)
time.sleep(delay) time.sleep(delay)

View File

@ -1,46 +0,0 @@
import logging
import threading
import time
class ledWorker(threading.Thread):
"""
This is the thread that runs the main loop
and actually drives the LED
"""
def __init__(self, parent):
super().__init__(name=parent.name, daemon=True)
self.log = logging.getLogger(self.__class__.__name__)
self.parent = parent
# self.changed = self.parent.changed
self.stop_event = threading.Event()
def run(self):
while True:
item = self.parent.queue.get()
# Empty item, we exit
if item == ([], [], []):
break
# Initial loop
self.log.debug('running initial steps')
for value, delay in item[0]:
self.parent.setValue(value)
time.sleep(delay)
self.log.debug('running main loop')
while True:
for value, delay in item[1]:
self.parent.setValue(value)
time.sleep(delay)
if not self.parent.queue.empty():
break
self.log.debug('running final steps')
for value, delay in item[2]:
self.parent.setValue(value)
time.sleep(delay)

View File

@ -1,3 +1,6 @@
[build-system] [build-system]
requires = ["setuptools", "wheel"] requires = ["setuptools", "wheel"]
build-backend = "setuptools.build_meta" build-backend = "setuptools.build_meta"
[tool.pyright]
reportMissingImports = "information"

View File

@ -11,10 +11,13 @@ led = pulses.ledPulse(12)
led.start() led.start()
led.set(delay=1/100, loopMethod="sin", max=20) led.set(delay=1/100, loopMethod="sin", max=20)
time.sleep(4) time.sleep(4)
led.set(delay=1/100, loopMethod="on", initialMethod="sin", finalMethod=None, max=40) led.set(delay=1/100, loopMethod="on", initialMethod="sin",
finalMethod=None, max=40)
time.sleep(4) time.sleep(4)
led.set(delay=1/100, loopMethod="cos", initialMethod=None, finalMethod="sin", max=20) led.set(delay=1/100, loopMethod="cos", initialMethod=None,
finalMethod="sin", max=20)
time.sleep(4) time.sleep(4)
led.set(delay=1/100, loopMethod="on", initialMethod="sin", finalMethod="sin", max=40) led.set(delay=1/100, loopMethod="on", initialMethod="sin",
finalMethod="sin", max=40)
time.sleep(4) time.sleep(4)
led.stop() led.stop()