Compare commits
No commits in common. "master" and "0.90" have entirely different histories.
134
README.md
134
README.md
|
@ -6,141 +6,15 @@ Pulses is a python module to drive LEDs on RPi using PWM (Pulse Width Modulation
|
|||
|
||||
1. Clone the repo
|
||||
|
||||
`https://gitea.mistrali.pw/musicalbox/pulses.git`
|
||||
`git clone https://web.mistrali.pw/gitea/musicalbox/pulses.git`
|
||||
|
||||
2. `apt-get install python3-venv`
|
||||
3. `pip install requirements-dev.txt`
|
||||
4. `poetry build`
|
||||
5. `pip install dist/pulses-<version>-py3-none-any.whl`
|
||||
|
||||
|
||||
### GPIO and PWM
|
||||
For more info on PWM refer to [PWM](https://en.wikipedia.org/wiki/Pulse-width_modulation) and for information on the different GPIO pin you can read [this](https://projects.raspberrypi.org/en/projects/physical-computing/1).
|
||||
4. `./build.sh`
|
||||
5. `pip install dist/pulses-0.90-py3-none-any.whl`
|
||||
|
||||
|
||||
### CLI tool
|
||||
|
||||
There is a CLI tool, called `pulses` that you can use to test loops and pulses.
|
||||
Run `pulses -h` to read the usage instructions.
|
||||
|
||||
### Work model
|
||||
|
||||
Each `ledPulse` object has some attributes that define the **pulse**, i.e. the light pattern of the managed LED.
|
||||
|
||||
The state of the LED is controlled by the following parameters:
|
||||
- **min**: minimum brightness value, I suggest to avoid using `0`, start from `2`;
|
||||
- **max**: maximum brightness;
|
||||
- **delay**: the base delay between each step of the pattern;
|
||||
- **initialMethod**: the method used to calculate the initial steps of the pattern;
|
||||
- **loopMethod**: the method used to calculate the steps of the main loop;
|
||||
- **finalMethod**: the method used to calculate the final steps of the loop;
|
||||
- **delayMethod**: the method used to calculate the delay between each step;
|
||||
|
||||
Each pattern starts running 50 steps of initial values, then goes into a repeteated loop, on exit it runs other 50 steps of final values.
|
||||
|
||||
To change the above parameters we use a FIFO queue, each time we `set` a new value of one of the above attributes, `ledPulse` will calculate a `tuple` of 3 values, each element of the tuple is in turn an array of 2-ples, each of these 2-ples has the first element as the `value` at a specific step, the second element is the `delay` at a specific step.
|
||||
|
||||
- `initialValues`: 50 elements, if an initialMethod is defined, empy otherwise. These are the values that define the pattern of the LED at the start of the new loop;
|
||||
- `loopValues`: 100 elements, never empy. These are the values that define the pattern of the LED at the core infinite loop;
|
||||
- `finalValues`: 50 elements, if a finalMethod is defined, empy otherwise. These are the values that define the pattern of the LED at the end of a terminating loop;
|
||||
|
||||
#### An example
|
||||
|
||||
Let's say we define a new pattern that has:
|
||||
- `initialMethod = linear`;
|
||||
- `loopMethod = cos`;
|
||||
- `finalMethod = linear`;
|
||||
- `delayMethod = constant`;
|
||||
- `delay = 0.01`;
|
||||
- `min = 2`;
|
||||
- `max = 50`;
|
||||
|
||||
The LED starts from brightness `0`, in 50 steps goes linearly to brightness `50`, i.e. the brightness will increate of 1 at each step, there is a delay of `0.01` seconds between each step.
|
||||
After these first 50 steps, there will be a repeating loop, looping on `cosine` values, so going from brightness 50 (value of max), down to brightness 2 (value of min), the up again to brightness 50, still with a constant delay of 0.01 between each step.
|
||||
The moment we will add a new tuple of values to the queue, using the `set()` method, the worker will exit from the repeating loop, run on 50 steps of the final values and then start with a new similar loop with the new values.
|
||||
|
||||
### Class
|
||||
|
||||
The only class defined is called `ledPulse`, it is derived from `threading.Thread`
|
||||
and takes care of managing the led connected to a PWM GPIO.
|
||||
|
||||
#### Methods
|
||||
`__init__`: quite simple, the only parameter is the GPIO pin we want to use. This method takes care of:
|
||||
- setting up logging;
|
||||
- setting up PWM;
|
||||
- installing the default methods for loops and delays;
|
||||
- set up the queue that we will use later on;
|
||||
|
||||
`set()`: change one or more parameter of the state. Bear in mind that each parameter is standalone, so if `min` is set to `2` and `max` is set to `20`, calling `led.set(max=40)` will only change the value of `max` parameter, leaving all the others at the previous value. After successfully setting a parameter, the values for `initial`, `loop`, `final` and `delay` are recalculated and a new tuple is added to the queue;
|
||||
|
||||
`run`: this method runs the main loop, that executes first a loop of 50 steps with `initialValues` if this is not empty, then loops on `loopValues` until there is a new tuple in the queue OR the `stop_event` event is set. If there is a new tuple in the queue, the infinite loop on `loopValues` is interrupted, another loop of 50 steps runs through `finalValues` (if not empty) and then we go to the main loop, pull the tuple from the queue and start all over, unless `stop_event` is set; if that's the case we bail out of the external loop and the thread is going to stop;
|
||||
|
||||
#### Plugins
|
||||
Pulses uses `plugin methods` to calculate the values and delays for each loop.
|
||||
These plugin methods are nothing more than python functions, following this prototypes:
|
||||
|
||||
- for value plugins:
|
||||
```python
|
||||
def value_methodname(obj, step):
|
||||
return <value for step>
|
||||
```
|
||||
|
||||
- for delay plugins:
|
||||
```python
|
||||
def delay_methodname(obj, step):
|
||||
return <delay value for step>
|
||||
```
|
||||
|
||||
`obj` is the LED object that uses the method, so you can refer to the attributes of if, like `obj.max` or `obj.min`.
|
||||
|
||||
Some examples, that are predefined in Pulses:
|
||||
|
||||
```python
|
||||
def delay_constant(obj, step):
|
||||
# Delay method: constant
|
||||
return obj.delay
|
||||
|
||||
def value_sin(obj, step):
|
||||
"""
|
||||
Value method: sin
|
||||
Sinusoidal values, 0-1-0 /\
|
||||
"""
|
||||
|
||||
delta = obj.max - obj.min
|
||||
radians = math.radians(1.8 * step)
|
||||
return delta * math.sin(radians) + obj.min
|
||||
|
||||
def value_on(obj, step):
|
||||
"""
|
||||
Value method: on
|
||||
Always on at "max" brightness
|
||||
"""
|
||||
|
||||
return obj.max
|
||||
```
|
||||
|
||||
You can write your own method plugins, trying to keep them quite simple possibly, then you have to register them before being able to use them.
|
||||
You can use two specific methods to register plugins based on their kind:
|
||||
|
||||
- `register_delay_method(methodName, function)`;
|
||||
- `register_value_method(methodName, function)`;
|
||||
|
||||
For example, we can write a function that returns a random value between `min` and `max`:
|
||||
|
||||
in `mymodule.py`:
|
||||
```python
|
||||
def value_random(obj, step):
|
||||
return random.randint(obj.min, obj.max)
|
||||
```
|
||||
|
||||
then we can register it
|
||||
|
||||
```python
|
||||
from pulses import ledPulse
|
||||
from mymodule import value_random
|
||||
|
||||
led = ledPulse(12)
|
||||
led.register_value_method('random', value_random)
|
||||
led.set(loopMethod='random')
|
||||
```
|
||||
|
||||
from now on we can call `led.set(loopValue='random')` and our LED will blink with a random value at each step.
|
||||
|
|
|
@ -0,0 +1,9 @@
|
|||
#!/bin/sh
|
||||
|
||||
if [ "$1" = "clean" ]; then
|
||||
rm -rf build
|
||||
rm -rf dist
|
||||
rm -rf *.egg-info
|
||||
fi
|
||||
|
||||
python -m build .
|
|
@ -1,21 +0,0 @@
|
|||
# This file is automatically @generated by Poetry 1.8.2 and should not be changed by hand.
|
||||
|
||||
[[package]]
|
||||
name = "rpi-gpio"
|
||||
version = "0.7.1"
|
||||
description = "A module to control Raspberry Pi GPIO channels"
|
||||
optional = false
|
||||
python-versions = "*"
|
||||
files = [
|
||||
{file = "RPi.GPIO-0.7.1-cp27-cp27mu-linux_armv6l.whl", hash = "sha256:b86b66dc02faa5461b443a1e1f0c1d209d64ab5229696f32fb3b0215e0600c8c"},
|
||||
{file = "RPi.GPIO-0.7.1-cp310-cp310-linux_armv6l.whl", hash = "sha256:57b6c044ef5375a78c8dda27cdfadf329e76aa6943cd6cffbbbd345a9adf9ca5"},
|
||||
{file = "RPi.GPIO-0.7.1-cp37-cp37m-linux_armv6l.whl", hash = "sha256:77afb817b81331ce3049a4b8f94a85e41b7c404d8e56b61ac0f1eb75c3120868"},
|
||||
{file = "RPi.GPIO-0.7.1-cp38-cp38-linux_armv6l.whl", hash = "sha256:29226823da8b5ccb9001d795a944f2e00924eeae583490f0bc7317581172c624"},
|
||||
{file = "RPi.GPIO-0.7.1-cp39-cp39-linux_armv6l.whl", hash = "sha256:15311d3b063b71dee738cd26570effc9985a952454d162937c34e08c0fc99902"},
|
||||
{file = "RPi.GPIO-0.7.1.tar.gz", hash = "sha256:cd61c4b03c37b62bba4a5acfea9862749c33c618e0295e7e90aa4713fb373b70"},
|
||||
]
|
||||
|
||||
[metadata]
|
||||
lock-version = "2.0"
|
||||
python-versions = ">=3.9,<4.0"
|
||||
content-hash = "5b3b5679f5c6b9e375be39f892efbaabceb43d1d7a346c78386ad876cfa286be"
|
|
@ -16,61 +16,62 @@ Options:
|
|||
-M --max=<max> Maximum value [default: 50]
|
||||
-D --delay-val=<delay> Base delay value [default: 0.01]
|
||||
|
||||
-V --verbose Verbose mode [default: False]
|
||||
-V --verbose Verbose mode [default: True]
|
||||
|
||||
-h --help Show help.
|
||||
-v --version Show version.
|
||||
|
||||
""" # NOQA
|
||||
""" # NOQA
|
||||
|
||||
import sys
|
||||
import time
|
||||
import logging
|
||||
import signal
|
||||
# from docopt import docopt
|
||||
from pulses.cli_parser import parser
|
||||
from docopt import docopt
|
||||
from pulses import VERSION, ledPulse
|
||||
|
||||
logging.basicConfig(
|
||||
format="%(asctime)-15s %(levelname)5s %(name)s[%(process)d] %(threadName)s: %(message)s")
|
||||
logging.basicConfig(format="%(name)s %(msg)s", stream=sys.stdout)
|
||||
|
||||
|
||||
def signal_handler(sig, frame):
|
||||
print('stopping pulse...', end="", flush=True)
|
||||
led.stop()
|
||||
led.join()
|
||||
print('done.\nBailing out')
|
||||
sys.exit(0)
|
||||
|
||||
|
||||
def main():
|
||||
def signal_handler(sig: int, frame):
|
||||
print('stopping pulse...', end="", flush=True)
|
||||
led.stop()
|
||||
led.join()
|
||||
print('done.\nBailing out')
|
||||
sys.exit(0)
|
||||
global led
|
||||
|
||||
args = parser.parse_args()
|
||||
args = docopt(__doc__, version=f'pulses {VERSION}')
|
||||
signal.signal(signal.SIGINT, signal_handler)
|
||||
|
||||
log = logging.getLogger()
|
||||
log.setLevel('WARNING')
|
||||
if args.verbose:
|
||||
if args.get('--verbose'):
|
||||
log.setLevel('DEBUG')
|
||||
log.info('setting verbose mode')
|
||||
|
||||
led = ledPulse(int(args.gpio))
|
||||
led = ledPulse(int(args.get('--gpio')))
|
||||
|
||||
params = {}
|
||||
for method in ['initial', 'loop', 'final']:
|
||||
methodName = getattr(args, method)
|
||||
methodName = args.get(f"--{method}")
|
||||
if methodName not in led.valueMethods:
|
||||
print(f"error: no value method '{methodName}' defined")
|
||||
sys.exit(1)
|
||||
params[f"{method}Method"] = methodName
|
||||
|
||||
methodName = args.delay
|
||||
methodName = args.get("--delay")
|
||||
if methodName not in led.delayMethods:
|
||||
print(f"error: no delay method '{methodName}' defined")
|
||||
sys.exit(1)
|
||||
params['delayMethod'] = methodName
|
||||
|
||||
params['min'] = args.min
|
||||
params['max'] = args.max
|
||||
params['delay'] = args.delay_val
|
||||
params['min'] = int(args.get('--min'))
|
||||
params['max'] = int(args.get('--max'))
|
||||
params['delay'] = float(args.get('--delay-val'))
|
||||
|
||||
print("-" * 20)
|
||||
print("pulses CLI test tool")
|
||||
|
|
|
@ -1,78 +0,0 @@
|
|||
"""
|
||||
Usage:
|
||||
pulses [--gpio <gpio>] [--initial <initialMethod>] [--loop <loopMethod>]
|
||||
[--final <finalMethod>] [--delay <delayMethod>]
|
||||
[--min <min>] [--max <max>] [--delay-val <delay>]
|
||||
[--verbose]
|
||||
|
||||
Options:
|
||||
-g --gpio=<gpio> GPIO to use [default: 12]
|
||||
-i --initial=<method> Initial method [default: linear]
|
||||
-l --loop=<method> Loop method [default: sin]
|
||||
-f --final=<method> Final method [default: linear]
|
||||
-d --delay=<method> Delay method [default: constant]
|
||||
|
||||
-m --min=<min> Minimum value [default: 2]
|
||||
-M --max=<max> Maximum value [default: 50]
|
||||
-D --delay-val=<delay> Base delay value [default: 0.01]
|
||||
|
||||
-V --verbose Verbose mode [default: True]
|
||||
|
||||
-h --help Show help.
|
||||
-v --version Show version.
|
||||
|
||||
""" # NOQA
|
||||
|
||||
import argparse
|
||||
|
||||
from pulses import VERSION
|
||||
|
||||
epilogue = """
|
||||
"""
|
||||
|
||||
|
||||
class Formatter(argparse.RawDescriptionHelpFormatter,
|
||||
argparse.ArgumentDefaultsHelpFormatter):
|
||||
"""
|
||||
We want to show the default values and show the description
|
||||
and epilogue not reformatted, so we create our own formatter class
|
||||
"""
|
||||
pass
|
||||
|
||||
|
||||
parser = argparse.ArgumentParser(prog="pulses",
|
||||
description="Pulse your LED(s)",
|
||||
epilog=epilogue,
|
||||
formatter_class=Formatter)
|
||||
|
||||
parser.add_argument('-g', '--gpio', type=int, default=12,
|
||||
metavar="<gpio>",
|
||||
help="GPIO to use")
|
||||
parser.add_argument('-i', '--initial', type=str, default='linear',
|
||||
metavar="<initial method>",
|
||||
help="Initial method")
|
||||
parser.add_argument('-l', '--loop', type=str, default='sin',
|
||||
metavar="<loop method>",
|
||||
help="Loop method")
|
||||
parser.add_argument('-f', '--final', type=str, default='linear',
|
||||
metavar="<final method>",
|
||||
help="Final method")
|
||||
parser.add_argument('-d', '--delay', type=str, default='constant',
|
||||
metavar="<delay method>",
|
||||
help="Delay method")
|
||||
|
||||
parser.add_argument('-m', '--min', type=int, default=2,
|
||||
metavar="<min>",
|
||||
help="Minimum value")
|
||||
parser.add_argument('-M', '--max', type=int, default=50,
|
||||
metavar="<max>",
|
||||
help="Max value")
|
||||
parser.add_argument('-D', '--delay-val', type=float, default=0.01,
|
||||
metavar="<delay value>",
|
||||
help="Base delay value")
|
||||
|
||||
parser.add_argument('-V', '--verbose', action='store_true', default=False,
|
||||
help="Verbose mode")
|
||||
|
||||
parser.add_argument('--version', action='version',
|
||||
version=f'%(prog)s {VERSION}')
|
|
@ -54,21 +54,9 @@ def value_linear(obj, step):
|
|||
return delta * (100-step)/50 + obj.min
|
||||
|
||||
|
||||
def value_vshape(obj, step):
|
||||
"""
|
||||
V-shape value, 1-0-1
|
||||
"""
|
||||
|
||||
delta = obj.max - obj.min
|
||||
if step < 50:
|
||||
return delta * (100-step)/50 + obj.min
|
||||
else:
|
||||
return delta*step/50 + obj.min
|
||||
|
||||
|
||||
def value_sin(obj, step):
|
||||
"""
|
||||
Sinusoidal values, 0-1-0
|
||||
Sinusoidal values, 0-1-0 /\
|
||||
"""
|
||||
|
||||
delta = obj.max - obj.min
|
||||
|
@ -78,7 +66,7 @@ def value_sin(obj, step):
|
|||
|
||||
def value_cos(obj, step):
|
||||
"""
|
||||
Absolute Cosinusoidal values, 1-0-1
|
||||
Absolute Cosinusoidal values, 1-0-1 \\//
|
||||
"""
|
||||
|
||||
delta = obj.max - obj.min
|
||||
|
|
|
@ -4,17 +4,16 @@ import time
|
|||
import threading
|
||||
|
||||
from queue import Queue
|
||||
from typing import Callable
|
||||
from .methods import * # NOQA
|
||||
|
||||
|
||||
class ledPulse(threading.Thread):
|
||||
|
||||
delayMethods: list = ['constant', 'sin', 'cos']
|
||||
valueMethods: list = ['on', 'off', 'linear', 'vshape', 'sin', 'cos']
|
||||
methods: dict = {'delay': {}, 'value': {}}
|
||||
delayMethods = ['constant', 'sin', 'cos']
|
||||
valueMethods = ['on', 'off', 'linear', 'sin', 'cos']
|
||||
methods = {'delay': {}, 'value': {}}
|
||||
|
||||
defaultSet: dict = {
|
||||
defaultSet = {
|
||||
'min': 2,
|
||||
'max': 50,
|
||||
'delay': 0.01,
|
||||
|
@ -23,22 +22,14 @@ class ledPulse(threading.Thread):
|
|||
'loopMethod': 'linear',
|
||||
'finalMethod': None
|
||||
}
|
||||
initialMethod: str
|
||||
loopMethod: str
|
||||
finalMethod: str
|
||||
delayMethod: str
|
||||
min: int
|
||||
max: int
|
||||
delay: float
|
||||
|
||||
def __init__(self, gpio: int, name: str = "led0") -> None:
|
||||
|
||||
super().__init__(name=name,
|
||||
daemon=True)
|
||||
def __init__(self, gpio, name="led0"):
|
||||
|
||||
self.log = logging.getLogger(self.__class__.__name__)
|
||||
self.gpio = gpio
|
||||
|
||||
super().__init__(name=name, daemon=True)
|
||||
|
||||
self.pwm_setup()
|
||||
|
||||
self.log.info(f"platform '{self.model}' " +
|
||||
|
@ -77,7 +68,7 @@ class ledPulse(threading.Thread):
|
|||
self.model = sys.platform
|
||||
try:
|
||||
with open('/sys/firmware/devicetree/base/model', 'r') as m:
|
||||
model = m.read()[:-1]
|
||||
model = m.read()
|
||||
if model.lower().startswith('raspberry pi'):
|
||||
self.supported = True
|
||||
self.model = model
|
||||
|
@ -85,7 +76,7 @@ class ledPulse(threading.Thread):
|
|||
pass
|
||||
|
||||
if self.supported:
|
||||
import RPi.GPIO as GPIO # type: ignore
|
||||
import RPi.GPIO as GPIO
|
||||
GPIO.setwarnings(False) # disable warnings
|
||||
GPIO.setmode(GPIO.BCM) # set pin numbering system
|
||||
GPIO.setup(self.gpio, GPIO.OUT) # set GPIO for output
|
||||
|
@ -105,7 +96,7 @@ class ledPulse(threading.Thread):
|
|||
|
||||
return (self.supported, sys.platform)
|
||||
|
||||
def set(self, **kwargs: dict):
|
||||
def set(self, **kwargs):
|
||||
|
||||
for key, value in kwargs.items():
|
||||
if key in self.defaultSet.keys():
|
||||
|
@ -206,13 +197,13 @@ class ledPulse(threading.Thread):
|
|||
# in the loop we are and is added as a method to self, so
|
||||
# it has access to all the properties of self: max, min, delay
|
||||
###
|
||||
def register_value_method(self, name: str, fn: Callable):
|
||||
def register_value_method(self, name, fn):
|
||||
self.register_method('value', name, fn)
|
||||
|
||||
def register_delay_method(self, name: str, fn: Callable):
|
||||
def register_delay_method(self, name, fn):
|
||||
self.register_method('delay', name, fn)
|
||||
|
||||
def register_method(self, kind: str, name: str, fn: Callable):
|
||||
def register_method(self, kind, name, fn):
|
||||
if kind not in ['delay', 'value']:
|
||||
self.log.warning(f"unknown kind {kind}")
|
||||
return
|
||||
|
|
|
@ -1,18 +1,6 @@
|
|||
[tool.poetry]
|
||||
name = "pulses"
|
||||
version = "1.0.2"
|
||||
description = "Pulse LEDs on RPi"
|
||||
authors = ["Andrea Mistrali <andrea@mistrali.pw>"]
|
||||
readme = "README.md"
|
||||
|
||||
[tool.poetry.dependencies]
|
||||
python = ">=3.9,<4.0"
|
||||
rpi-gpio = { version = "^0.7.1", platform = "linux" }
|
||||
|
||||
[tool.poetry.scripts]
|
||||
pulses = "pulses.cli:main"
|
||||
|
||||
|
||||
[build-system]
|
||||
requires = ["poetry-core"]
|
||||
build-backend = "poetry.core.masonry.api"
|
||||
requires = ["setuptools", "wheel"]
|
||||
build-backend = "setuptools.build_meta"
|
||||
|
||||
[tool.pyright]
|
||||
reportMissingImports = "information"
|
||||
|
|
|
@ -1,2 +1,2 @@
|
|||
poetry
|
||||
build
|
||||
ipython
|
||||
|
|
|
@ -0,0 +1,21 @@
|
|||
[metadata]
|
||||
name = pulses
|
||||
version = 0.90
|
||||
author = Andrea Mistrali
|
||||
author_email = akelge@gmail.com
|
||||
platform = linux
|
||||
description = RPI led(s) and display brightness
|
||||
long_description = file: README.md
|
||||
keywords = graylog, py3
|
||||
license = BSD 3-Clause License
|
||||
|
||||
[options]
|
||||
packages = pulses
|
||||
python_requires = >=3
|
||||
include_package_data = True
|
||||
install_requires =
|
||||
docopt
|
||||
|
||||
[options.entry_points]
|
||||
console_scripts =
|
||||
pulses = pulses.cli:main
|
Loading…
Reference in New Issue