Source code for pitop.pma.ultrasonic_sensor

from gpiozero import SmoothedInputDevice
from threading import Event, Lock

from pitopcommon.logger import PTLogger

from pitop.core.mixins import (
    Stateful,
    Recreatable,
)
from pitop.pma.common import get_pin_for_port


# Modified version of gpiozero's DistanceSensor class that only uses 1 pin
#
# Note: all private member variables are semi-private to follow upstream gpiozero convention
# and to override inherited functions
[docs]class UltrasonicSensor(Stateful, Recreatable, SmoothedInputDevice): ECHO_LOCK = Lock() def __init__( self, port_name, queue_len=9, max_distance=3, threshold_distance=0.3, partial=False, name="ultrasonic" ): self._pma_port = port_name self.name = name SmoothedInputDevice.__init__(self, get_pin_for_port(self._pma_port), pull_up=False, queue_len=queue_len, sample_wait=0.06, partial=partial, ignore=frozenset({None}), ) try: if max_distance <= 0: raise ValueError('invalid maximum distance (must be positive)') self._max_distance = max_distance self.threshold = threshold_distance / max_distance self.speed_of_sound = 343.26 # m/s self._echo = Event() self._echo_rise = None self._echo_fall = None self.pin.edges = 'both' self.pin.bounce = None self.pin.when_changed = self._echo_changed self._queue.start() except Exception: self.close() raise Stateful.__init__(self) Recreatable.__init__(self, {"port_name": port_name, "queue_len": queue_len, "partial": partial, "name": self.name, "max_distance": lambda: self.max_distance, "threshold_distance": lambda: self.threshold_distance}) @property def own_state(self): return { 'distance': self.distance, }
[docs] def close(self): """Shut down the device and release all associated resources. This method can be called on an already closed device without raising an exception. This method is primarily intended for interactive use at the command line. It disables the device and releases its pin(s) for use by another device. You can attempt to do this simply by deleting an object, but unless you've cleaned up all references to the object this may not work (even if you've cleaned up all references, there's still no guarantee the garbage collector will actually delete the object at that point). By contrast, the close method provides a means of ensuring that the object is shut down. For example, if you have a buzzer connected to port D4, but then wish to attach an LED instead: >>> from pitop import Buzzer, LED >>> bz = Buzzer("D4") >>> bz.on() >>> bz.off() >>> bz.close() >>> led = LED("D4") >>> led.blink() :class:`Device` descendents can also be used as context managers using the :keyword:`with` statement. For example: >>> from pitop import Buzzer, LED >>> with Buzzer("D4") as bz: ... bz.on() ... >>> with LED("D4") as led: ... led.on() ... """ try: super(UltrasonicSensor, self).close() except RuntimeError: PTLogger.debug(f"Ultrasonic Sensor on port {self._pma_port} - " "there was an error in closing the port!")
@property def max_distance(self): """The maximum distance that the sensor will measure in meters. This value is specified in the constructor and is used to provide the scaling for the :attr:`~SmoothedInputDevice.value` attribute. When :attr:`distance` is equal to :attr:`max_distance`, :attr:`~SmoothedInputDevice.value` will be 1. """ return self._max_distance @max_distance.setter def max_distance(self, value): if value <= 0: raise ValueError('invalid maximum distance (must be positive)') t = self.threshold_distance self._max_distance = value self.threshold_distance = t @property def threshold_distance(self): """ The distance, measured in meters, that will trigger the :attr:`when_in_range` and :attr:`when_out_of_range` events when crossed. This is simply a meter-scaled variant of the usual :attr:`~SmoothedInputDevice.threshold` attribute. """ return self.threshold * self.max_distance @threshold_distance.setter def threshold_distance(self, value): self.threshold = value / self.max_distance @property def distance(self): """Returns the current distance measured by the sensor in meters. Note that this property will have a value between 0 and :attr:`max_distance`. """ return self.value * self._max_distance @property def value(self): """Returns a value between 0, indicating that something is either touching the sensor or is sufficiently near that the sensor can't tell the difference, and 1, indicating that something is at or beyond the specified *max_distance*.""" return super(UltrasonicSensor, self).value @property def pin(self): """Returns the :class:`Pin` that the sensor is connected to. This is simply an alias for the usual :attr:`~GPIODevice.pin` attribute. """ return super(UltrasonicSensor, self).pin def _echo_changed(self, ticks, level): if level: self._echo_rise = ticks else: self._echo_fall = ticks self._echo.set() def _read(self): # Wait up to 50ms for the echo pin to fall to low (the maximum echo # pulse is 35ms so this gives some leeway); if it doesn't something is # horribly wrong (most likely at the hardware level) if self.pin.state: if not self._echo.wait(0.05): PTLogger.debug(f"Ultrasonic Sensor on port {self._pma_port} - " "no echo received, not using value") return None self._echo.clear() self._echo_fall = None self._echo_rise = None # Obtain the class-level ECHO_LOCK to ensure multiple distance sensors # don't listen for each other's "pings" with UltrasonicSensor.ECHO_LOCK: # Wait up to 200ms for the echo pin to rise and fall if self._echo.wait(0.2): if self._echo_fall is not None and self._echo_rise is not None: distance = ( self.pin_factory.ticks_diff( self._echo_fall, self._echo_rise) * self.speed_of_sound / 2.0) return min(1.0, distance / self._max_distance) else: # If we only saw the falling edge it means we missed # the echo because it was too fast return None else: # The echo pin never rose or fell - assume that distance is max PTLogger.debug(f"Ultrasonic Sensor on port {self._pma_port} - " "no echo received, using max distance ") return 1.0 @property def in_range(self): return not self.is_active
UltrasonicSensor.when_out_of_range = UltrasonicSensor.when_activated UltrasonicSensor.when_in_range = UltrasonicSensor.when_deactivated UltrasonicSensor.wait_for_out_of_range = UltrasonicSensor.wait_for_active UltrasonicSensor.wait_for_in_range = UltrasonicSensor.wait_for_inactive