Skip to content
Snippets Groups Projects
Commit 7ebbcc8a authored by Dominik HAefner's avatar Dominik HAefner
Browse files

Merge branch 'master' of gitlab.utwente.nl:s1779397/cps_project_group13

parents 61920a75 747299f0
No related branches found
No related tags found
No related merge requests found
......@@ -4,6 +4,9 @@
*.iml
*.ipr
# Local files
data/
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
......
......@@ -8,70 +8,69 @@ print(GPIO.VERSION)
class Encoder:
def __init__(self, name, encoder_pin: int):
# Initialize some stuff
global pulse_count, start_time, time_passed, old_time_passed, old_pulse_count
"""Initialize some stuff"""
self.name = name
self.encoder_pin = encoder_pin
pulse_count = 0
start_time = time.time()
time_passed = 0
old_time_passed = 0
old_pulse_count = 0
self.pulse_count = 0
self.start_time = time.time()
self.time_passed = 0
self.old_time_passed = 0
self.old_pulse_count = 0
GPIO.setup(self.encoder_pin, GPIO.IN, GPIO.PUD_DOWN)
def callback_encoder(self, channel):
# Interrupt handler, increases pulse_count and updates time
global pulse_count, start_time, time_passed
time_passed = time.time() - start_time
pulse_count += 1
"""Interrupt handler, increases pulse_count and updates time"""
self.time_passed = time.time() - self.start_time
self.pulse_count += 1
def get_pulse_count(self) -> int:
# return pulse_count
global pulse_count
return pulse_count
"""return pulse_count"""
return self.pulse_count
def reset_pulse_count(self):
# reset pulse_count
global pulse_count
pulse_count = 0
"""reset pulse_count"""
self.pulse_count = 0
def reset_time(self):
# reset time_passed and start_time
global time_passed, start_time
time_passed = 0
start_time = time.time()
"""reset time_passed and start_time"""
self.time_passed = 0
self.start_time = time.time()
def get_current_pulse_speed(self) -> float:
# Get average speed since last call in pulses/s
global pulse_count, time_passed, old_time_passed, old_pulse_count
if (time_passed - old_time_passed):
speed = (pulse_count - old_pulse_count) / (time_passed - old_time_passed)
"""Get average speed since last call in pulses/s"""
if self.time_passed - self.old_time_passed != 0: # If the time has changed
speed = (self.pulse_count - self.old_pulse_count) / (self.time_passed - self.old_time_passed)
else:
speed = 0
old_pulse_count = pulse_count
old_time_passed = time_passed
self.old_pulse_count = self.pulse_count
self.old_time_passed = self.time_passed
return speed
def get_speed(self) -> float:
# Convert pulse speed to cm/s
"""Convert pulse speed to cm/s"""
pulse_per_second = self.get_current_pulse_speed()
return pulse_per_second * (6 * 2 * math.pi) / 40
def init_callback(self):
# Initialize the interrupt
"""Initialize the interrupt"""
GPIO.add_event_detect(self.encoder_pin, GPIO.BOTH, callback=self.callback_encoder, bouncetime=10)
# try:
# encoder1 = Encoder(name="encoder1", encoder_pin=4)
# encoder1.init_callback()
# while True:
# print(encoder1.get_pulse_count())
# print(encoder1.get_speed())
# time.sleep(1)
# except KeyboardInterrupt: # Exit by pressing CTRL + C
# GPIO.cleanup()
# print("Measurement stopped by User")
# finally:
# GPIO.cleanup()
if __name__ == '__main__':
try:
# encoder1 = Encoder(name="encoder1", encoder_pin=4)
encoder1 = Encoder(name='encoder1', encoder_pin=23)
encoder2 = Encoder(name='encoder2', encoder_pin=24)
encoder1.init_callback()
encoder2.init_callback()
while True:
print("%s pulses: %d" % (encoder1.name, encoder1.get_pulse_count()))
print("%s speed : %0.2f" % (encoder1.name, encoder1.get_speed()))
print("%s pulses: %d" % (encoder2.name, encoder2.get_pulse_count()))
print("%s speed : %0.2f" % (encoder2.name, encoder2.get_speed()))
time.sleep(1)
except KeyboardInterrupt: # Exit by pressing CTRL + C
print("Measurement stopped by User")
finally:
GPIO.cleanup()
# Libraries
import threading
import RPi.GPIO as GPIO
import time
from pathlib import Path
import numpy as np
# GPIO Mode (BOARD / BCM)
GPIO.setmode(GPIO.BCM)
print(GPIO.VERSION)
class UltrasoundStorage:
class UltrasoundSensor:
def __init__(self):
"""
Initialize a new storage object which holds the data points for all ultrasound sensors. It also allows the data
to be stored to the disk.
"""
self.data = list() # Initialize empty list which holds the datapoints
def get_nr_rows(self):
"""Get the number of data points in the currently stored data."""
return len(self.data)
def get_nr_cols(self):
"""Get the number of columns of the currently stored data (headers)."""
if self.get_nr_rows() != 0:
return len(self.data[0])
else:
return 0
def add_data_row(self, *args):
"""
Add a datapoint to the currently stored data. If there is already data present, the number of data points
provided should match the number of columns.
"""
if self.get_nr_rows() != 0 and len(args) != self.get_nr_cols():
raise ValueError(
"Wrong number of parameters provided. Expected %d, got %d. Current data is %d by %d in size." % (
self.get_nr_cols(), len(args), self.get_nr_rows(), self.get_nr_cols()))
new_data_line = list()
for arg in args:
new_data_line.append(arg)
self.data.append(new_data_line)
def add_data(self, timestamp, x, y, direction, sensor1=0, sensor2=0, sensor3=0):
"""Store data in predefined header columns (this forces you to store data in a specific way)."""
self.add_data_row(timestamp, x, y, direction, sensor1, sensor2, sensor3)
def save_to_disk(self, filename: str = 'ultrasound.txt', output_dir=None):
"""Save the data to the disk in the specified format."""
data_array = np.array(self.data)
if output_dir is None: # If no other path is given, use the data folder
output_dir = Path(__file__).parents[1] / 'data'
try:
extension = filename.split('.')[-1] # Get the last element after a dot (ususally extension)
except IndexError:
raise ValueError("No proper filename extension detected. Please add an extension (e.g. '.txt.' or '.csv').")
filepath = output_dir / filename
if extension == 'txt':
np.savetxt(filepath, data_array, fmt='%.3e')
elif extension == 'csv':
np.savetxt(filepath, data_array, delimiter=',', fmt='%.3e')
elif extension == 'npy':
np.save(filepath, data_array)
else:
raise ValueError("Extension '.%s' not supported." % extension)
def to_array(self):
"""Return the currently stored data as a numpy array."""
return np.asarray(self.data)
class UltrasoundManager:
def __init__(self, sensors: list, storage: UltrasoundStorage):
"""Initialize this ultrasound manager."""
self.sensors = sensors
self.storage = storage
def measure_and_store(self):
"""Get a measurement from all sensors and store that in the storage."""
measurements = self.measure_sequentially()
self.storage.add_data(time.time(), 0, 0, 0, *measurements)
def measure_sequentially(self):
"""Pulse all sensors once sequentially. This hopefully prevents the sensors from detecting stray pulses from
other sensors."""
result = list()
for sensor in self.sensors:
sensor.measurement()
result.append(sensor.get_distance())
return result
def print_sequentially(self):
"""Pulse all sensors once sequentially and print results. This hopefully prevents the sensors from detecting
stray pulses from other sensors."""
for sensor in self.sensors:
sensor.measurement()
print("Sensor %s: %0.3f m" % (sensor.name, sensor.get_distance()))
def print_distance_continuous(self, interval=1):
"""Print the distance continuously every interval seconds until the user interrupts it with ctrl-c."""
try:
while True:
time.sleep(interval)
self.print_sequentially()
except KeyboardInterrupt: # Exit by pressing CTRL + C
print("Measurement stopped by User. Stopping sensors...")
print("Sensors stopped.")
class UltrasoundSensor(threading.Thread):
SONIC_SPEED = 343 # Speed of sound in m/s
TRIGGER_DELAY = 10 / 1e6 # Time in s the trigger pin needs to be activated to send a pulse (default: 10 ms)
MEASUREMENT_TIMEOUT = 0.5 # Time delay after which the sensor times out and stops its measurement
def __init__(self, name, trigger_pin: int, echo_pin: int, polling_interval=0.5, daemon=True):
"""
Set up this sensor. This is a threaded sensor, be sure to call .start().
def __init__(self, name, trigger_pin: int, echo_pin: int):
:param name: Name of the sensor.
:param trigger_pin: Pin to which the trigger is connected
:param echo_pin: Pin to which the echo is located
:param polling_interval: Interval between measurements (if the thread is running)
:param daemon: Whether to make this thread exit automatically when all other threads are stopped (recommended)
"""
super().__init__(name=name, daemon=daemon)
self.name = name
self.trigger_pin = trigger_pin
self.echo_pin = echo_pin
self.polling_interval = polling_interval # Interval between measurements
self.distance_lock = threading.Lock() # Lock to prevent thread race conditions in read/write to self._distance
self.measurement_lock = threading.Lock() # Lock to prevent multiple threads taking measurement at same time
self._distance = 0 # Set initial distance to 0
self.interrupted = False # Flag to interrupt this thread
# Setup GPIO pins
GPIO.setup(self.trigger_pin, GPIO.OUT)
GPIO.setup(self.trigger_pin, GPIO.OUT) # Setup GPIO
GPIO.setup(self.echo_pin, GPIO.IN)
def get_distance(self) -> float:
GPIO.output(self.trigger_pin, True) # Fire ultrasound pulse
def measurement(self):
"""
Get the distance last recorded by this sensor. The ECHO pin turns HIGH for the exact amount of time it took
between sending and receiving the pulse. Therefore, we need to detect how long the ECHO pin is on exactly.
"""
with self.measurement_lock: # Make sure only one thread can take a measurement at a time
GPIO.output(self.trigger_pin, GPIO.HIGH) # set trigger to HIGH
time.sleep(self.TRIGGER_DELAY) # Keep trigger pin HIGH for 10 ms (time the sensor requires to send a pulse)
GPIO.output(self.trigger_pin, GPIO.LOW) # Set trigger to LOW again
rising_time = time.time() # Time at which echo pin turns from LOW to HIGH
falling_time = time.time() # Time at which echo pin turns from HIGH to LOW
timeout_time = time.time() + self.MEASUREMENT_TIMEOUT # Time at which the measurement times out and fails
# save exact time at which the ECHO pin turns from LOW to HIGH. When this loop is exited, that is the exact time
# echo pin turns HIGH
while GPIO.input(self.echo_pin) == GPIO.LOW:
rising_time = time.time() # The pin is still low, update the recorded time
if time.time() > timeout_time: # Check if the measurement is timed out
raise TimeoutError("Measurement timed out.")
# Store initial time
pulse_sent = time.time()
pulse_received = time.time()
# save exact time at which the ECHO pin turns from HIGH to LOW. When this loop is exited, that is the exact time
# echo pin turns LOW
while GPIO.input(self.echo_pin) == GPIO.HIGH:
falling_time = time.time() # The pin is still high, update the recorded time
if time.time() > timeout_time: # Check if the measurement is timed out
raise TimeoutError("Measurement timed out.")
time.sleep(0.00001) # Wait for a very small time
GPIO.output(self.trigger_pin, False) # Stop firing the pulse
pulse_time = falling_time - rising_time # time difference between rising and falling edge of echo pin
distance = (pulse_time * self.SONIC_SPEED) / 2 # Convert time to distance
with self.distance_lock: # Use the lock to store the distance internally without race conditions between threads
self._distance = distance
return distance # Return the recorded distance
while GPIO.input(self.echo_pin) == 0: # Record the exact time the pulse was sent
pulse_sent = time.time()
def run(self):
"""Overridden method for the running thread."""
while self.interrupted is not True:
self.measurement() # Start a measurement
time.sleep(self.polling_interval)
while GPIO.input(self.echo_pin) == 1: # Record the exact time the pulse was received
pulse_received = time.time()
def get_distance(self) -> float:
"""Get the distance last recorded by this sensor in meters."""
with self.distance_lock: # Only one thread may access self._distance at a time
return self._distance
time_elapsed = pulse_received - pulse_sent # time difference between sending and receiving pulse
return (time_elapsed * self.SONIC_SPEED) / 2 # We divide by two because of the reflection of the wave
def get_distance_cm(self) -> float:
"""Get the distance last recorded by this sensor in centimeters."""
with self.distance_lock: # Only one thread may access self._distance at a time
return self._distance * 100
def print_distance(self, unit='cm') -> None:
if unit == 'm':
dist = self.get_distance()
print("%12s measured %0.3f m" % (self.name, dist))
else: # Unit is cm or not recognised
dist = self.get_distance() * 100.0
print("%12s measured %0.1f cm" % (self.name, dist))
# class UltrasoundSensor(threading.Thread):
# SONIC_SPEED = 343 # Speed of sound in m/s
# TRIGGER_DELAY = 10 / 1e6 # Time in s the trigger pin needs to be activated to send a pulse (default: 10 ms)
#
# def __init__(self, name, trigger_pin: int, echo_pin: int, polling_interval=0.5, daemon=True):
# """
# Set up this sensor. This is a threaded sensor, be sure to call .start().
#
# :param name: Name of the sensor.
# :param trigger_pin: Pin to which the trigger is connected
# :param echo_pin: Pin to which the echo is located
# :param polling_interval: Interval between measurements (if the thread is running)
# :param daemon: Whether to make this thread exit automatically when all other threads are stopped (recommended)
# """
# super().__init__(name=name, daemon=daemon)
# self.name = name
# self.trigger_pin = trigger_pin
# self.echo_pin = echo_pin
# self.polling_interval = polling_interval # Interval between measurements
#
# self.lock = threading.RLock() # Lock object to prevent threads from interfering
# self._distance = 0 # Set initial distance to 0
# self.interrupted = False # Flag to interrupt this thread
# self.measurement_done = threading.Event() # Event which indicates a measurement is done
#
# GPIO.setup(self.trigger_pin, GPIO.OUT) # Setup GPIO
# GPIO.setup(self.echo_pin, GPIO.IN)
#
# def run(self):
# """Overridden method for the running thread."""
# while self.interrupted is not True:
# self.start_measurement() # Start a measurement
# time.sleep(self.polling_interval)
#
# def start_measurement(self):
# """
# Get the distance last recorded by this sensor. The ECHO pin turns HIGH for the exact amount of time it took
# between sending and receiving the pulse. Therefore, we need to detect how long the ECHO pin is on exactly.
# """
#
# def _pin_read(channel):
# """Callback function for echo pin RISING edge detection (low to high)."""
# nonlocal pulse_rising, pulse_falling # Get the variable from the one scope above
# if channel != self.echo_pin:
# print("Warning: echo pin received from channel %d, expected %d" % (channel, self.echo_pin))
#
# with self.lock:
# if GPIO.input(self.echo_pin) == GPIO.HIGH: # Rising edge detected
# pulse_rising = time.time()
# else: # Falling edge detected
# pulse_falling = time.time()
# if pulse_falling == 0:
# print("Warning: falling edge was detected before rising edge on channel %d!" % self.echo_pin)
# time_elapsed = pulse_falling - pulse_rising # time difference between sending and receiving pulse
# self._distance = time_elapsed * self.SONIC_SPEED / 2 # We divide by two because of the reflection of the wave
# GPIO.remove_event_detect(self.echo_pin)
# self.measurement_done.set() # Measurement is finished
#
# with self.lock: # Only one thread may use the sensor at a time
# self.measurement_done.clear() # Let any waiting threads know there is currently a measurement running
# pulse_rising = 0 # Set initial time for rising pulse to 0
# pulse_falling = 0 # Idem
# GPIO.add_event_detect(self.echo_pin, GPIO.BOTH, _pin_read) # Register the callback for rising edge
# # GPIO.add_event_detect(self.echo_pin, GPIO.FALLING, _echo_pin_falling) # Idem for falling edge
# GPIO.output(self.trigger_pin, GPIO.HIGH) # Fire ultrasound pulse
# time.sleep(self.TRIGGER_DELAY) # Wait for at least 10 us (delay required for the sensor to fire)
# GPIO.output(self.trigger_pin, GPIO.LOW) # Stop firing the pulse
#
# def get_distance(self):
# """Get the distance last recorded by this sensor."""
# with self.lock: # Only one thread may use the sensor at a time
# return self._distance
#
# def print_distance(self, unit='cm') -> None:
# if unit == 'm':
# dist = self.get_distance()
# print("%12s measured %0.3f m" % (self.name, dist))
# else: # Unit is cm or not recognised
# dist = self.get_distance() * 100.0
# print("%12s measured %0.1f cm" % (self.name, dist))
if __name__ == '__main__':
try:
storage = UltrasoundStorage()
# print(storage.data)
# storage.save_to_disk()
# storage.save_to_disk(filename='ultrasound.csv')
# storage.save_to_disk(filename='ultrasound.npy')
# GPIO Mode (BOARD / BCM)
GPIO.setmode(GPIO.BCM)
sensor1 = UltrasoundSensor(name="sensor1", trigger_pin=17, echo_pin=16)
sensor2 = UltrasoundSensor(name="sensor2", trigger_pin=27, echo_pin=20)
sensor3 = UltrasoundSensor(name="sensor3", trigger_pin=22, echo_pin=21)
while True:
sensor1.print_distance(unit='cm')
sensor2.print_distance(unit='cm')
sensor3.print_distance(unit='cm')
# sensor2 = UltrasoundSensor(name="sensor2", trigger_pin=27, echo_pin=20)
# sensor3 = UltrasoundSensor(name="sensor3", trigger_pin=22, echo_pin=21)
# manager = UltrasoundManager([sensor1, sensor2, sensor3], storage)
manager = UltrasoundManager([sensor1], storage)
stop_time = time.time() + 6
while time.time() < stop_time: # Until 10 seconds passed
manager.measure_and_store()
time.sleep(1)
x = storage.to_array()
np.set_printoptions(suppress=True, formatter={'float_kind': '{:0.3f}'.format})
print(x)
except KeyboardInterrupt: # Exit by pressing CTRL + C
print("Measurement stopped by User")
finally: # Always clean up the pins
finally: # Always clean up the pins after you are done with them
GPIO.cleanup()
print("Cleaned up.")
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment