Newer
Older
# -*- coding: utf-8 -*-
"""
Virtual DAQ Class.

Description:
DAQ emulation class to play back recorded DAQ datasets in a transparent manner.

Author: Jason Merlo
Maintainer: Jason Merlo (merlojas@msu.edu)
"""
from pyratk.acquisition import daq # Extension of DAQ object
import threading # Used for creating thread and sync events
"""Emulate DAQ using HDF5 dataset data."""
def __init__(self):
    """Create virtual DAQ object to play back recording (hdf5 dataset).

    Every playback attribute starts out unset and the object begins in the
    paused state, because no dataset has been selected yet.
    """
    # NOTE(review): no base-class initializer call is visible in this
    # method -- confirm whether one is required.

    # Acquisition parameters, unknown until a dataset is selected
    self.sample_chunk_size = None
    self.daq_type = None
    self.num_channels = None

    # Start paused if no dataset is selected
    self.paused = True

    # Fake sampler period
    self.sample_period = None

    # Data member to store samples, and the dataset they are read from
    self.data = None
    self.ds = None

    # Member data to store trajectory samples for ground truth
    self.ts = None

    # Current time index of recording
    self.sample_index = 0

    # Thread targeting the sampling loop; it is only created here, not
    # started.
    self.t_sampling = threading.Thread(target=self.sample_loop)
def load_dataset(self, ds):
    """Select dataset to read from and load its attributes.

    Reads ``sample_rate``, ``sample_size``, ``daq_type`` and
    ``num_channels`` from ``ds.attrs`` and derives the playback period of
    one chunk from the chunk size and the sample rate.

    Args:
        ds: h5py dataset holding the recording.

    Raises:
        TypeError: If ``ds`` is not an h5py dataset.
    """
    if not isinstance(ds, h5py.Dataset):
        # Raise a real exception instance so the message reaches the caller
        raise TypeError(
            "load_dataset expects a h5py dataset type, got {}".format(
                type(ds)))
    self.ds = ds

    # Load attributes
    self.sample_rate = ds.attrs["sample_rate"]
    self.sample_chunk_size = ds.attrs["sample_size"]
    self.num_channels = ds.attrs["num_channels"]

    # Byte-string attributes need decoding; attributes that are already
    # returned as str must be left alone (str has no decode()).
    daq_type = ds.attrs["daq_type"]
    if isinstance(daq_type, bytes):
        daq_type = daq_type.decode('utf-8')
    self.daq_type = daq_type

    # Playback period of one chunk: chunk size divided by sample rate
    self.sample_period = self.sample_chunk_size / self.sample_rate

    # NOTE(review): this shape is not consumed by any visible statement;
    # presumably a sample buffer is meant to be built from it -- confirm.
    shape = (self.num_channels, self.sample_chunk_size)
def load_trajectory(self, ts):
    """Load trajectory dataset.

    Stores ``ts`` as the trajectory source and creates a fresh trajectory
    buffer for it.
    """
    # Keep a handle on the trajectory dataset
    self.ts = ts

    # Trajectory buffer: length 4096, state matrix shape (3, 3)
    self.ts_trajectory = TimeSeries(4096, (3, 3))
def get_samples(self, stride=1, loop=-1, playback_speed=1.0):
    """Read sample from dataset at sampled speed, or one-by-one.

    Reads the dataset entry at the current sample index (and the matching
    trajectory state when a trajectory dataset is loaded), emits it through
    ``data_available_signal``, appends it to ``ts_buffer`` and advances the
    sample index by ``stride``, wrapping to 0 at the end of the dataset.

    Args:
        stride: Number of dataset entries to advance after this read.
        loop: ``-1`` or ``1`` sleeps for
            ``sample_period * playback_speed`` before emitting; ``0``
            prints the step size and does not sleep.
        playback_speed: Factor multiplied into the sample period before
            sleeping.

    Returns:
        The value of ``(sample_index + stride) % ds.shape[0] / stride < 1.0``
        computed after the index has been advanced. For a positive stride
        this is True when one more stride from the updated index would
        reach or pass the end of the dataset.

    Raises:
        RuntimeError: If no dataset has been set.
        ValueError: If ``loop`` is not -1, 0, or 1.
    """
    if not self.ds:
        raise RuntimeError(
            "(VirtualDAQ) Dataset source must be set to get samples")

    # Read in samples from dataset; on a bad index the previous data is kept
    try:
        self.data = self.ds[self.sample_index]
    except IndexError:
        print("Invalid sample index:", self.sample_index)

    if self.ts:
        try:
            # Byte-string attributes need decoding; str values are used
            # as they are (str has no decode()).
            coordinate_type = self.ts.attrs['coordinate_type']
            if isinstance(coordinate_type, bytes):
                coordinate_type = coordinate_type.decode('utf-8')
            # The trajectory is indexed along its last axis at
            # sample_index * sample_chunk_size.
            state = StateMatrix(
                self.ts[..., self.sample_index * self.sample_chunk_size],
                coordinate_type=coordinate_type)
            self.trajectory_data = state.get_state().q
        except IndexError:
            print("Invalid trajectory sample index:", self.sample_index)

    # NOTE(review): the period is multiplied by playback_speed, so a larger
    # value lengthens the delay -- confirm this is the intended meaning.
    if loop in (-1, 1):
        time.sleep(self.sample_period * playback_speed)
    elif loop == 0:
        print('Stepped:', stride)
    else:
        raise ValueError("Value must be -1, 0, or 1.")

    # Append trajectory before emitting new data signal
    if self.ts:
        self.ts_trajectory.append(self.trajectory_data)

    new_data = (self.data, self.sample_index)
    self.data_available_signal.emit(new_data)
    self.ts_buffer.append(self.data)

    # Increment time index and loop around at end of dataset
    next_index = self.sample_index + stride
    if next_index < self.ds.shape[0]:
        self.sample_index = next_index
    else:
        self.sample_index = 0
        if self.ts:
            # Restart the trajectory buffer from the entry at index 0
            self.ts_trajectory.clear()
            self.ts_trajectory.append(self.ts[..., 0])
        self.reset_signal.emit()

    # NOTE(review): the original comment read "Return True if more data";
    # the expression is kept unchanged -- see Returns above for what it
    # evaluates to.
    return (self.sample_index + stride) % self.ds.shape[0] / stride < 1.0
def reset(self):
"""Reset all data to beginning of data file and begin playing.

The statements visible here call ``close()`` and, when a trajectory
dataset is loaded, clear the trajectory buffer and re-seed it with the
trajectory entry at index 0 along the last axis.
"""
# NOTE(review): no statement visible here rewinds sample_index or restarts
# playback -- confirm whether that happens in close() or elsewhere.
self.close()
if self.ts:
# Start the trajectory buffer over from the entry at index 0
self.ts_trajectory.clear()
self.ts_trajectory.append(self.ts[..., 0])