Changed to thread

This commit is contained in:
Andrea Mistrali 2023-05-02 06:46:41 +02:00
parent 0ade9119b0
commit 62b97f3f01
No known key found for this signature in database
GPG Key ID: 6FB0A77F6D5DA9B2
1 changed files with 82 additions and 57 deletions

View File

@ -3,23 +3,14 @@ import sys
import time
import threading
from queue import Queue
from .worker import ledWorker
from .methods import * # NOQA
if sys.platform == "linux":
import RPi.GPIO as GPIO
# if sys.platform == "linux":
# import RPi.GPIO as GPIO
class ledPulse:
"""
There are 4 states of pulsing:
- on: steady light at value;
- off: turn off led;
- normal: pulse with delay;
- fast: pulse with delay/2;
"""
class ledPulse(threading.Thread):
delayMethods = ['constant', 'sin', 'cos']
valueMethods = ['on', 'off', 'linear', 'sin', 'cos']
@ -27,7 +18,7 @@ class ledPulse:
defaultSet = {
'min': 2,
'max': 100,
'max': 50,
'delay': 0.01,
'delayMethod': 'constant',
'initialMethod': None,
@ -36,28 +27,12 @@ class ledPulse:
}
def __init__(self, gpio, name="led0"):
self.log = logging.getLogger(self.__class__.__name__)
self.gpio = gpio
self.name = name
super().__init__(name=name, daemon=False)
self.changed = threading.Event()
if sys.platform == "linux":
GPIO.setwarnings(False) # disable warnings
GPIO.setmode(GPIO.BCM) # set pin numbering system
# set GPIO for output
GPIO.setup(self.gpio, GPIO.OUT)
# create PWM instance with frequency
self.pwm = GPIO.PWM(self.gpio, 120)
# self.setValue = self.pwm.ChangeDutyCycle
self.supported = True
# self.pwm.start(0) # start pwm with value 0
else:
# Not supported, skip setValue
# self.setValue = print
self.supported = False
# self.setValue = lambda x: x
self.pwm_setup()
self.log.info(f"platform '{sys.platform}' " +
f"support: {sys.platform == 'linux'}")
@ -68,35 +43,56 @@ class ledPulse:
for vm in self.valueMethods:
self.register_value_method(vm, eval(f"value_{vm}"))
# Add thread placeholder
self.worker = ledWorker(self)
self.stop_event = threading.Event()
# Method to stop thread
self.stop = self.stop_event.set
self.queue = Queue()
# Set default values
for key, value in self.defaultSet.items():
setattr(self, key, value)
def setValue(self, value):
def pwm_setup(self):
try:
with open('/sys/firmware/devicetree/base/model', 'r') as m:
model = m.read()
if model.lower().startswith('raspberry pi'):
self.supported = True
self.model = model
except Exception:
self.supported = False
self.model = sys.platform
if self.supported:
self.pwm.ChangeDutyCycle(value)
import RPi.GPIO as GPIO
GPIO.setwarnings(False) # disable warnings
GPIO.setmode(GPIO.BCM) # set pin numbering system
# set GPIO for output
GPIO.setup(self.gpio, GPIO.OUT)
# create PWM instance with frequency
self.pwm = GPIO.PWM(self.gpio, 120)
self.setValue = self.pwm.ChangeDutyCycle
self.pwm.start(0) # start pwm with value 0
else:
# on unsupported platforms, we just print the value
print(value)
# Not supported, skip setValue
# self.setValue = lambda x: x
self.setValue = print
return (False, sys.platform)
def set(self, **kwargs):
# for key, value in self.defaultSet.items():
# setattr(self, key, value)
for key, value in kwargs.items():
if key in self.defaultSet.keys():
setattr(self, key, value)
self._set = True
else:
self.log.warning(f"set: unknown parameter '{key}'")
self.log.debug(f"fill values table, "
self.log.debug("calculate values: "
f"initialMethod:{self.initialMethod}, "
f"loopMethod:{self.loopMethod}, "
f"finalMethod:{self.finalMethod}, "
f"delayMethod:{self.delayMethod}, min:{self.min}, "
f"max:{self.max}, delay:{self.delay}")
@ -113,9 +109,11 @@ class ledPulse:
if self.initialMethod:
for step in range(0, 50):
try:
value = self.methods['value'][self.initialMethod](self, step)
value = self.methods['value'][self.initialMethod](self,
step)
except KeyError:
self.log.error(f"value method '{self.initialMethod}' not found")
self.log.error(f"value method '{self.initialMethod}' "
"not found")
return
initialValues.append((value, delayValues[step]))
@ -134,28 +132,55 @@ class ledPulse:
try:
value = self.methods['value'][self.finalMethod](self, step)
except KeyError:
self.log.error(f"value method '{self.finalMethod}' not found")
self.log.error(f"value method '{self.finalMethod}' "
"not found")
return
finalValues.append((value, delayValues[step]))
self.queue.put((initialValues, loopValues, finalValues))
def start(self):
if self.supported:
self.pwm.start(0)
self.worker = ledWorker(self)
self.worker.start()
def run(self):
def stop(self):
self.queue.put(([], [], []))
self.worker.join()
while True:
if self.stop_event.is_set():
self.log.debug("break from main loop, bye")
if self.supported:
self.pwm.stop()
self.worker = None
break
# Get an item from the queue,
# if queue is empty, wait for a new item
# item is a tuple with (initialValues, loopValues, finalValues)
item = self.queue.get()
# Initial loop
self.log.debug('running initial steps')
for value, delay in item[0]:
self.setValue(value)
time.sleep(delay)
self.log.debug('running main loop')
while True:
for value, delay in item[1]:
self.setValue(value)
time.sleep(delay)
# After each loop we check if there is something waiting in
# queue or if stop has been raised
if not self.queue.empty() or self.stop_event.is_set():
break
self.log.debug('running final steps')
for value, delay in item[2]:
self.setValue(value)
time.sleep(delay)
###
# PLUGINS METHODS
# We register new functions for delay or value
# Function must get `step` variable, specifing at which step
# in the loop we are and is added as a method to self, so
# it has access to all the properties of self: max, min, delay
###
def register_value_method(self, name, fn):
self.register_method('value', name, fn)