Skip to content
Snippets Groups Projects
Commit ed4c33e5 authored by jhierck laptop's avatar jhierck laptop
Browse files

Ultrasound detection not quite working yet. Probably need to revisit the way...

Ultrasound detection not quite working yet. Probably need to revisit the way the threads and events are set up.
parent acfb4dd2
No related branches found
No related tags found
No related merge requests found
......@@ -35,11 +35,9 @@ class UltrasoundStorage:
raise ValueError(
"Wrong number of parameters provided. Expected %d, got %d. Current data is %d by %d in size." % (
self.get_nr_cols(), len(args), self.get_nr_rows(), self.get_nr_cols()))
new_data_line = list()
for arg in args:
new_data_line.append(arg)
self.data.append(new_data_line)
def add_data(self, timestamp, x, y, direction, sensor1=0, sensor2=0, sensor3=0):
......@@ -75,9 +73,31 @@ class UltrasoundStorage:
class UltrasoundManager:
def __init__(self, sensors: list):
def __init__(self, sensors: list, storage: UltrasoundStorage):
"""Initialize this ultrasound manager."""
self.sensors = sensors
self.storage = storage
def measure_and_store(self):
"""Get a measurement from all sensors and store that in the storage."""
measurements = self.measure_sequentially()
self.storage.add_data(time.time(), 0, 0, 0, *measurements)
def measure_sequentially(self):
"""Pulse all sensors once sequentially. This prevents the sensors from detecting stray pulses from other
sensors."""
result = list()
for sensor in self.sensors:
sensor.start_measurement()
sensor.measurement_done.wait() # Wait until the measurement is done using a threading.Event
result.append(sensor.get_distance())
return result
def print_sequentially(self, unit='cm'):
for sensor in self.sensors:
sensor.start_measurement()
sensor.measurement_done.wait() # Wait until the measurement is done using a threading.Event
sensor.print_distance(unit=unit)
def print_distance(self, unit='cm'):
"""Print the recorded distances of all sensors."""
......@@ -99,22 +119,6 @@ class UltrasoundManager:
sensor.join()
print("Sensors stopped.")
def measure_sequentially(self):
"""Pulse all sensors once sequentially. This prevents the sensors from detecting stray pulses from other
sensors."""
result = list()
for sensor in self.sensors:
sensor.start_measurement()
sensor.measurement_done.wait() # Wait until the measurement is done using a threading.Event
result.append(sensor.get_distance())
return result
def print_sequentially(self, unit='cm'):
for sensor in self.sensors:
sensor.start_measurement()
sensor.measurement_done.wait() # Wait until the measurement is done using a threading.Event
sensor.print_distance(unit=unit)
class UltrasoundSensor(threading.Thread):
SONIC_SPEED = 343 # Speed of sound in m/s
......@@ -213,9 +217,16 @@ if __name__ == '__main__':
sensor2 = UltrasoundSensor(name="sensor2", trigger_pin=27, echo_pin=20)
sensor3 = UltrasoundSensor(name="sensor3", trigger_pin=22, echo_pin=21)
manager = UltrasoundManager([sensor1, sensor2, sensor3])
manager = UltrasoundManager([sensor1, sensor2, sensor3], storage)
# manager = UltrasoundManager([sensor1])
manager.print_distance_continuous(interval=1)
stop_time = time.time() + 6
while time.time() < stop_time: # Until 10 seconds passed
manager.measure_and_store()
time.sleep(1)
x = storage.to_array()
np.set_printoptions(suppress=True, formatter={'float_kind':'{:0.3f}'.format})
print(x)
finally: # Always clean up the pins after you are done with them
GPIO.cleanup()
print("Cleaned up.")
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment