From e9fefee6f09903c532566bba3e150435d6b53455 Mon Sep 17 00:00:00 2001 From: Andrea Mistrali Date: Tue, 2 May 2023 11:58:53 +0200 Subject: [PATCH] Newat and clean --- pulses/pulses.py | 44 ++++++++++++++++++++++++++------------------ pulses/worker.py | 46 ---------------------------------------------- pyproject.toml | 3 +++ test.py | 9 ++++++--- 4 files changed, 35 insertions(+), 67 deletions(-) delete mode 100644 pulses/worker.py diff --git a/pulses/pulses.py b/pulses/pulses.py index 3632dd6..888e7b2 100644 --- a/pulses/pulses.py +++ b/pulses/pulses.py @@ -47,7 +47,9 @@ class ledPulse(threading.Thread): self.stop_event = threading.Event() # Method to stop thread - # Like def stop(): self.stop_event.set() + # Like + # def stop(): + # self.stop_event.set() # But shorter ;) self.stop = self.stop_event.set @@ -58,11 +60,17 @@ class ledPulse(threading.Thread): setattr(self, key, value) def pwm_setup(self): + """ + We check if we are running on RPi. + If yes we set supported to True and set setValue method + to changeDutyCycle, else + we set supported to False and create a dummy setValue method + """ + self.supported = False - self.moel = sys.platform + self.model = sys.platform try: - with op en('/sys/firmware/devicetree/base/model', 'r') as m: - print("OPN") + with open('/sys/firmware/devicetree/base/model', 'r') as m: model = m.read() if model.lower().startswith('raspberry pi'): self.supported = True @@ -72,24 +80,24 @@ class ledPulse(threading.Thread): if self.supported: import RPi.GPIO as GPIO - GPIO.setwarnings(False) # disable warnings - GPIO.setmode(GPIO.BCM) # set pin numbering system - # set GPIO for output - GPIO.setup(self.gpio, GPIO.OUT) + GPIO.setwarnings(False) # disable warnings + GPIO.setmode(GPIO.BCM) # set pin numbering system + GPIO.setup(self.gpio, GPIO.OUT) # set GPIO for output - # create PWM instance with frequency + # create PWM instance with 120Hz frequency self.pwm = GPIO.PWM(self.gpio, 120) + self.pwm.start(0) # start pwm with value 0, off + # setValue function self.setValue = self.pwm.ChangeDutyCycle - self.pwm.start(0) # start pwm with value 0 else: # Platform not supported, set # a dummy setValue function if self.log.level == logging.DEBUG: - self.setValue = print + self.setValue = print else: - self.setValue = lambda x: x + self.setValue = lambda x: x - return (False, sys.platform) + return (self.supported, sys.platform) def set(self, **kwargs): @@ -99,7 +107,7 @@ class ledPulse(threading.Thread): else: self.log.warning(f"set: unknown parameter '{key}'") - self.log.debug("calculate values: " + self.log.debug("calculating values: " f"initialMethod:{self.initialMethod}, " f"loopMethod:{self.loopMethod}, " f"finalMethod:{self.finalMethod}, " @@ -161,17 +169,17 @@ class ledPulse(threading.Thread): # Get an item from the queue, # if queue is empty, wait for a new item # item is a tuple with (initialValues, loopValues, finalValues) - item = self.queue.get() + (initialValues, loopValues, finalValues) = self.queue.get() # Initial loop self.log.debug('running initial steps') - for value, delay in item[0]: + for value, delay in initialValues: self.setValue(value) time.sleep(delay) self.log.debug('running main loop') while True: - for value, delay in item[1]: + for value, delay in loopValues: self.setValue(value) time.sleep(delay) @@ -181,7 +189,7 @@ class ledPulse(threading.Thread): break self.log.debug('running final steps') - for value, delay in item[2]: + for value, delay in finalValues: self.setValue(value) time.sleep(delay) diff --git a/pulses/worker.py b/pulses/worker.py deleted file mode 100644 index 078e698..0000000 --- a/pulses/worker.py +++ /dev/null @@ -1,46 +0,0 @@ -import logging -import threading -import time - - -class ledWorker(threading.Thread): - """ - This is the thread that runs the main loop - and actually drives the LED - """ - - def __init__(self, parent): - super().__init__(name=parent.name, daemon=True) - self.log = logging.getLogger(self.__class__.__name__) - self.parent = parent - - # self.changed = self.parent.changed - - self.stop_event = threading.Event() - - def run(self): - - while True: - item = self.parent.queue.get() - # Empty item, we exit - if item == ([], [], []): - break - - # Initial loop - self.log.debug('running initial steps') - for value, delay in item[0]: - self.parent.setValue(value) - time.sleep(delay) - - self.log.debug('running main loop') - while True: - for value, delay in item[1]: - self.parent.setValue(value) - time.sleep(delay) - if not self.parent.queue.empty(): - break - - self.log.debug('running final steps') - for value, delay in item[2]: - self.parent.setValue(value) - time.sleep(delay) diff --git a/pyproject.toml b/pyproject.toml index 9787c3b..9a05d02 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,3 +1,6 @@ [build-system] requires = ["setuptools", "wheel"] build-backend = "setuptools.build_meta" + +[tool.pyright] +reportMissingImports = "information" diff --git a/test.py b/test.py index 84c4335..20aa51b 100644 --- a/test.py +++ b/test.py @@ -11,10 +11,13 @@ led = pulses.ledPulse(12) led.start() led.set(delay=1/100, loopMethod="sin", max=20) time.sleep(4) -led.set(delay=1/100, loopMethod="on", initialMethod="sin", finalMethod=None, max=40) +led.set(delay=1/100, loopMethod="on", initialMethod="sin", + finalMethod=None, max=40) time.sleep(4) -led.set(delay=1/100, loopMethod="cos", initialMethod=None, finalMethod="sin", max=20) +led.set(delay=1/100, loopMethod="cos", initialMethod=None, + finalMethod="sin", max=20) time.sleep(4) -led.set(delay=1/100, loopMethod="on", initialMethod="sin", finalMethod="sin", max=40) +led.set(delay=1/100, loopMethod="on", initialMethod="sin", + finalMethod="sin", max=40) time.sleep(4) led.stop()