Skip to content
Snippets Groups Projects
Commit 291e42f6 authored by Robert Roos's avatar Robert Roos
Browse files

Merged in feature/serial_cleanup (pull request #1)

Feature/serial cleanup
parents 2f5bf739 e5241e51
No related branches found
No related tags found
No related merge requests found
from enum import Enum
import struct
from typing import Iterator, Tuple
class Mode(Enum):
......@@ -20,76 +21,104 @@ class PlotDecoder:
Then follow the floats (4 bytes each): 01 01 01 01 ...
"""
HEADER_BYTES = bytearray(b'\x7f\xff\xbf')
def __init__(self, *args, **kwargs):
"""Constructor."""
self.header_bytes = [b'\x7f', b'\xff', b'\xbf']
self.buffer = bytearray() # Store bytes while receiving
self.mode = Mode.HEADER
self.bytes_count = 0 # Number of bytes in current mode
# Keep internals of the last received data
self.channel_size = 0 # Number of channels
self.time = 0 # Read microtime
self.data = [] # Received floats
self._buffer = bytearray() # Store bytes while receiving
self._mode = Mode.HEADER
self._bytes_count = 0 # Number of bytes in current mode
def set_state(self, new_mode: Mode):
"""Simple wrapper to change mode."""
self._channel_size = 0 # Number of channels
self._time = 0 # Read microtime
@property
def mode(self):
return self._mode
self.mode = new_mode
self.bytes_count = 0
self.buffer = bytearray()
@mode.setter
def mode(self, new_mode: Mode):
"""Simple wrapper to change mode."""
if new_mode == Mode.DATA:
self.data = []
self._mode = new_mode
self._bytes_count = 0
self._buffer = bytearray()
def receive_byte(self, byte: bytearray) -> bool:
"""Precess new incoming byte.
def receive_bytes(self, new_bytes: bytearray) -> Iterator[Tuple]:
"""Precess a set of incoming bytes.
Return true when a complete package was received
Multiple packages can be received at once. The data stream can be distributed
over any number of method calls.
This method acts as a generator, yielding a list of tuples
like (n_channels, time, data).
"""
if self.mode == Mode.HEADER: # If header not passed
i = 0 # Index inside set of bytes
new_size = len(new_bytes)
while i < new_size:
if self._mode == Mode.HEADER: # If header not passed
# Increment header
if new_bytes[i] == self.HEADER_BYTES[self._bytes_count]:
self._bytes_count += 1
else:
self._bytes_count = 0 # Reset, header failed
if self._bytes_count >= 3: # Header completed
self.mode = Mode.SIZE
i += 1
continue
if self._mode == Mode.SIZE:
self._channel_size = new_bytes[i]
self.mode = Mode.TIME
# Increment header
if byte == self.header_bytes[self.bytes_count]:
self.bytes_count += 1
else:
self.bytes_count = 0 # Reset, header failed
i += 1
continue
if self.bytes_count >= 3: # Header completed
self.set_state(Mode.SIZE)
if self._mode == Mode.TIME:
return False
# Copy until buffer contains 4 bytes or until new segment is depleted
copy_bytes = min(4 - len(self._buffer), new_size - i)
self._buffer.extend(new_bytes[i:(i + copy_bytes)])
if self.mode == Mode.SIZE:
self.channel_size = int.from_bytes(byte, byteorder='big', signed=False)
self.set_state(Mode.TIME)
if len(self._buffer) == 4:
self._time = int.from_bytes(self._buffer, byteorder='big', signed=True)
self.mode = Mode.DATA
elif len(self._buffer) > 4:
self.mode = Mode.HEADER # Something went wrong
return False
i += copy_bytes
continue
if self.mode == Mode.TIME:
self.buffer.append(byte[0])
if self._mode == Mode.DATA:
if len(self.buffer) == 4:
# print(''.join('{:02x}'.format(x) for x in self.buffer))
# Copy until buffer contains 4 bytes or until new segment is depleted
copy_bytes = min(self._channel_size * 4 - len(self._buffer), new_size - i)
self.time = int.from_bytes(self.buffer, byteorder='big', signed=True)
self.set_state(Mode.DATA)
self._buffer.extend(new_bytes[i:(i + copy_bytes)])
return False
# If enough bytes are collected for all nch floats
if len(self._buffer) == self._channel_size * 4:
if self.mode == Mode.DATA:
self.buffer.append(byte[0])
# Decode at once
values = struct.unpack(self._channel_size * 'f', self._buffer)
data = list(values) # Tuple to list
if len(self.buffer) == 4:
value = struct.unpack('f', self.buffer)
self.buffer = bytearray()
self.data.append(value)
# Yield new package
yield self._channel_size, self._time, data
if len(self.data) == self.channel_size:
self.set_state(Mode.HEADER)
return True
self.mode = Mode.HEADER
return False
elif len(self._buffer) > self._channel_size * 4:
self.mode = Mode.HEADER # Something went wrong
return False
i += copy_bytes
continue
......@@ -208,10 +208,8 @@ class MainWindow(QMainWindow):
new_bytes = self.serial.readAll()
for byte in new_bytes:
if self.decoder.receive_byte(byte):
self.update_data(self.decoder.channel_size, self.decoder.time,
self.decoder.data)
for block in self.decoder.receive_bytes(bytearray(new_bytes)):
self.update_data(*block)
@pyqtSlot(QAction)
def on_save(self, action: QAction):
......@@ -331,7 +329,7 @@ class MainWindow(QMainWindow):
col = np.array(new_data, dtype=float)
self.data = np.roll(self.data, -1, axis=1) # Rotate backwards
self.data[:, -1] = col[:, 0] # Set new column at the end
self.data[:, -1] = col[:] # Set new column at the end
self.time = np.roll(self.time, -1) # Rotate backwards
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment