Skip to content
GitLab
Explore
Sign in
Primary navigation
Search or go to…
Project
A
AP-S 2020 - PyRaTk
Manage
Activity
Members
Labels
Plan
Issues
Issue boards
Milestones
Iterations
Requirements
Code
Merge requests
Repository
Branches
Commits
Tags
Repository graph
Compare revisions
Snippets
Locked files
Build
Pipelines
Jobs
Pipeline schedules
Test cases
Artifacts
Deploy
Releases
Package Registry
Container Registry
Model registry
Operate
Environments
Terraform modules
Monitor
Incidents
Service Desk
Analyze
Value stream analytics
Contributor analytics
CI/CD analytics
Repository analytics
Code review analytics
Issue analytics
Insights
Model experiments
Help
Help
Support
GitLab documentation
Compare GitLab plans
Community forum
Contribute to GitLab
Provide feedback
Keyboard shortcuts
?
Snippets
Groups
Projects
Show more breadcrumbs
DELTA
DELTA Low-Cost 5.8GHz FMCW Radar PCB
AP-S 2020 Software
AP-S 2020 - PyRaTk
Commits
9ce2c5e6
Commit
9ce2c5e6
authored
4 years ago
by
Merlo, Jason
Browse files
Options
Downloads
Patches
Plain Diff
Added new DAQ-specific modules
parent
61d54c8a
No related branches found
Branches containing commit
No related tags found
No related merge requests found
Changes
2
Hide whitespace changes
Inline
Side-by-side
Showing
2 changed files
pyratk/acquisition/mcdaq.py
+210
-0
210 additions, 0 deletions
pyratk/acquisition/mcdaq.py
pyratk/acquisition/nidaq.py
+157
-0
157 additions, 0 deletions
pyratk/acquisition/nidaq.py
with
367 additions
and
0 deletions
pyratk/acquisition/mcdaq.py
0 → 100644
+
210
−
0
View file @
9ce2c5e6
# -*- coding: utf-8 -*-
"""
Measurement Computing DAQ interface class.
Author: Jason Merlo
Maintainer: Jason Merlo (merlojas@msu.edu)
Dependencies: uldaq
"""
from
pyratk.acquisition.daq
import
DAQ
# Used for Measurement Computing hardware
from
uldaq
import
(
get_daq_device_inventory
,
DaqDevice
,
AInScanFlag
,
DaqEventType
,
ScanOption
,
InterfaceType
,
AiInputMode
,
create_float_buffer
,
ULException
,
EventCallbackArgs
)
from
collections
import
namedtuple
from
datetime
import
datetime
class
mcdaq
(
DAQ
):
"""
Measurement Computing DAQ Class.
"""
def
__init__
(
self
,
sample_rate
=
50000
,
sample_chunk_size
=
5000
,):
self
.
daq_type
=
"
MC-DAQ
"
self
.
_daq_device
=
None
self
.
_ai_device
=
None
self
.
_descriptor_index
=
0
self
.
_range_index
=
3
self
.
_interface_type
=
InterfaceType
.
USB
self
.
_low_channel
=
0
self
.
_high_channel
=
7
self
.
_event_types
=
(
DaqEventType
.
ON_DATA_AVAILABLE
|
DaqEventType
.
ON_END_OF_INPUT_SCAN
|
DaqEventType
.
ON_INPUT_SCAN_ERROR
)
self
.
_scan_params
=
namedtuple
(
'
scan_params
'
,
'
buffer high_chan low_chan descriptor status
'
)
self
.
_scan_options
=
ScanOption
.
CONTINUOUS
|
ScanOption
.
DEFAULTIO
self
.
_daq_flags
=
AInScanFlag
.
DEFAULT
# buffer_size = int(2**15 / (self._high_channel - self._low_channel + 1))
buffer_size
=
sample_chunk_size
# Display warning if buffer size < 2*chunk_size
if
buffer_size
<
2
*
sample_chunk_size
:
print
(
'
Warning: Buffer size < 2 * sample_chunk_size
'
)
channel_count
=
self
.
_high_channel
-
self
.
_low_channel
+
1
super
().
__init__
(
sample_rate
,
buffer_size
,
channel_count
)
# Initialize last sample
self
.
_last_index
=
0
# Create new sampling task
try
:
# Get descriptors for all of the available DAQ devices.
devices
=
get_daq_device_inventory
(
self
.
_interface_type
)
number_of_devices
=
len
(
devices
)
if
number_of_devices
==
0
:
raise
Exception
(
'
Error: No DAQ devices found
'
)
print
(
'
Found
'
,
number_of_devices
,
'
DAQ device(s):
'
)
for
i
in
range
(
number_of_devices
):
print
(
'
'
,
devices
[
i
].
product_name
,
'
(
'
,
devices
[
i
].
unique_id
,
'
)
'
,
sep
=
''
)
# Create the DAQ device object associated with the specified descriptor index.
self
.
_daq_device
=
DaqDevice
(
devices
[
self
.
_descriptor_index
])
# Get the AiDevice object and verify that it is valid.
self
.
_ai_device
=
self
.
_daq_device
.
get_ai_device
()
if
self
.
_ai_device
is
None
:
raise
Exception
(
'
Error: The DAQ device does not support analog input
'
)
# Verify that the specified device supports hardware pacing for analog input.
self
.
_ai_info
=
self
.
_ai_device
.
get_info
()
if
not
self
.
_ai_info
.
has_pacer
():
raise
Exception
(
'
\n
Error: The specified DAQ device does not support hardware paced analog input
'
)
# Establish a connection to the DAQ device.
self
.
_descriptor
=
self
.
_daq_device
.
get_descriptor
()
print
(
'
\n
Connecting to
'
,
self
.
_descriptor
.
dev_string
,
'
- please wait...
'
)
self
.
_daq_device
.
connect
()
# The default input mode is SINGLE_ENDED.
self
.
_input_mode
=
AiInputMode
.
SINGLE_ENDED
# If SINGLE_ENDED input mode is not supported, set to DIFFERENTIAL.
if
self
.
_ai_info
.
get_num_chans_by_mode
(
AiInputMode
.
SINGLE_ENDED
)
<=
0
:
self
.
_input_mode
=
AiInputMode
.
DIFFERENTIAL
# Get a list of supported ranges and validate the range index.
ranges
=
self
.
_ai_info
.
get_ranges
(
self
.
_input_mode
)
if
self
.
_range_index
>=
len
(
ranges
):
self
.
_range_index
=
len
(
ranges
)
-
1
self
.
_range
=
ranges
[
self
.
_range_index
]
# Allocate a buffer to receive the data
self
.
_data
=
create_float_buffer
(
self
.
num_channels
,
self
.
sample_chunk_size
)
# Store the user data for use in the callback function.
self
.
_scan_status
=
{
'
complete
'
:
False
,
'
error
'
:
False
}
self
.
_user_data
=
self
.
_scan_params
(
self
.
_data
,
self
.
_high_channel
,
self
.
_low_channel
,
self
.
_descriptor
,
self
.
_scan_status
)
print
(
'
\n
'
,
self
.
_descriptor
.
dev_string
,
'
ready
'
,
sep
=
''
)
print
(
'
Function demonstrated: self._ai_device.a_in_scan()
'
)
print
(
'
Channels:
'
,
self
.
_low_channel
,
'
-
'
,
self
.
_high_channel
)
print
(
'
Input mode:
'
,
self
.
_input_mode
.
name
)
print
(
'
Range:
'
,
self
.
_range
.
name
)
print
(
'
Samples per channel:
'
,
self
.
sample_chunk_size
)
print
(
'
Rate:
'
,
self
.
sample_rate
,
'
Hz
'
)
print
(
'
Scan options:
'
,
self
.
_display_scan_options
(
self
.
_scan_options
))
except
Exception
as
e
:
self
.
daq_type
=
"
FakeDAQ
"
print
(
"
=
"
*
80
)
print
(
"
Warning: Exception occurred opening DAQ. Using fake data.
"
)
print
(
e
)
print
(
"
=
"
*
80
)
def
run
(
self
):
# Enable the event to be notified every time samples are available.
print
(
'
sample_chunk_size:
'
,
self
.
sample_chunk_size
)
print
(
'
self.num_channels:
'
,
self
.
num_channels
)
available_sample_count
=
self
.
sample_chunk_size
print
(
'
available_sample_count:
'
,
available_sample_count
)
self
.
_daq_device
.
enable_event
(
self
.
_event_types
,
available_sample_count
,
self
.
_sample_callback
,
self
.
_user_data
)
# Start the acquisition.
try
:
rate
=
self
.
_ai_device
.
a_in_scan
(
self
.
_low_channel
,
self
.
_high_channel
,
self
.
_input_mode
,
self
.
_range
,
self
.
sample_chunk_size
,
self
.
sample_rate
,
self
.
_scan_options
,
self
.
_daq_flags
,
self
.
_data
)
# Initialize last sample
self
.
_last_index
=
0
self
.
_last_time
=
datetime
.
now
()
except
Exception
as
e
:
print
(
"
Error encountered while starting sampling:
"
,
e
)
def
_sample_callback
(
self
,
event_callback_args
):
"""
Read device sample buffers returning the specified sample size.
"""
event_type
=
DaqEventType
(
event_callback_args
.
event_type
)
scan_count
=
event_callback_args
.
event_data
user_data
=
event_callback_args
.
user_data
if
(
event_type
==
DaqEventType
.
ON_DATA_AVAILABLE
or
event_type
==
DaqEventType
.
ON_END_OF_INPUT_SCAN
):
# current_time = datetime.now()
# print('\n'*2, 'delta-time =', current_time - self._last_time)
# self._last_time = current_time
chan_count
=
user_data
.
high_chan
-
user_data
.
low_chan
+
1
total_samples
=
scan_count
*
chan_count
# print('scan_count: ', scan_count, '\n')
# print('actual scan rate = ', '{:.6f}'.format(self.sample_rate), 'Hz\n')
for
i
in
range
(
chan_count
):
self
.
data
[
i
,
:]
=
user_data
.
buffer
[
i
::
chan_count
]
if
event_type
==
DaqEventType
.
ON_INPUT_SCAN_ERROR
:
exception
=
ULException
(
event_data
)
print
(
exception
)
user_data
.
status
[
'
error
'
]
=
True
if
event_type
==
DaqEventType
.
ON_END_OF_INPUT_SCAN
:
print
(
'
\n
The scan is complete
\n
'
)
user_data
.
status
[
'
complete
'
]
=
True
def
start
(
self
):
self
.
run
()
def
close
(
self
):
if
self
.
_daq_device
:
if
self
.
_daq_device
.
is_connected
():
# Stop the acquisition if it is still running.
if
self
.
_ai_device
and
self
.
_ai_info
and
self
.
_ai_info
.
has_pacer
():
self
.
_ai_device
.
scan_stop
()
self
.
_daq_device
.
disable_event
(
self
.
_event_types
)
self
.
_daq_device
.
disconnect
()
self
.
_daq_device
.
release
()
super
().
close
()
def
_display_scan_options
(
self
,
bit_mask
):
options
=
[]
if
bit_mask
==
ScanOption
.
DEFAULTIO
:
options
.
append
(
ScanOption
.
DEFAULTIO
.
name
)
for
so
in
ScanOption
:
if
so
&
bit_mask
:
options
.
append
(
so
.
name
)
return
'
,
'
.
join
(
options
)
This diff is collapsed.
Click to expand it.
pyratk/acquisition/nidaq.py
0 → 100644
+
157
−
0
View file @
9ce2c5e6
# -*- coding: utf-8 -*-
"""
National Instruments DAQ interface class wrapper.
Author: Jason Merlo
Maintainer: Jason Merlo (merlojas@msu.edu)
Dependencies: uldaq
"""
from
pyratk.acquisition.daq
import
DAQ
try
:
import
nidaqmx
# Used for National Instruments hardware
except
ImportError
:
print
(
'
Warning: nidaqmx module not imported
'
)
class
nidaq
(
DAQ
):
"""
National Instruments DAQ Class.
"""
def
__init__
(
self
,
sample_rate
=
44100
,
sample_chunk_size
=
4096
,
ni_dev_string
=
"
Dev1/ai0:7
"
,
ni_sample_mode
=
nidaqmx
.
constants
.
AcquisitionType
.
FINITE
,):
"""
Initialize NI-DAQ class.
arguments:
sample_rate -- frequency in Hz to sample at (default: 44100)
sample_chunk_size -- size of chunk to read (default/max: 4095)
ni_dev_string -- device and ports to initialize (default:
"
Dev1/ai0:7
"
)
ni_sample_mode -- finite or continuous acquisition (default: finite)
"""
self
.
daq_type
=
'
NI-DAQ
'
self
.
ni_sample_mode
=
ni_sample_mode
self
.
ni_dev_string
=
ni_dev_string
# Get number of channels to sample
if
self
.
ni_dev_string
[
-
2
]
==
'
:
'
:
num_channels
=
int
(
self
.
ni_dev_string
[
-
1
])
-
int
(
self
.
ni_dev_string
[
-
3
])
+
1
else
:
num_channels
=
int
(
self
.
ni_dev_string
[
-
1
])
+
1
super
().
__init__
(
sample_rate
,
sample_chunk_size
,
num_channels
)
# Create new sampling task
try
:
# Try to create sampling task
self
.
task
=
nidaqmx
.
Task
()
self
.
task
.
ai_channels
.
add_ai_voltage_chan
(
ni_dev_string
)
self
.
task
.
timing
.
cfg_samp_clk_timing
(
sample_rate
,
ni_sample_mode
=
ni_sample_mode
,
samps_per_chan
=
sample_chunk_size
)
self
.
in_stream
=
\
stream_readers
.
AnalogMultiChannelReader
(
self
.
task
.
in_stream
)
except
nidaqmx
.
_lib
.
DaqNotFoundError
:
# On failure (ex. on mac/linux) generate random data for
# development purposes
# TODO: switch to PyDAQmx for mac/linux
# TODO: is there any reason to keep nidaqmx for windows?
# TODO: try performance comparison
self
.
daq_type
=
"
FakeDAQ
"
print
(
"
=
"
*
80
)
print
(
"
Warning: Using fake data. nidaqmx is not
"
"
supported on this platform.
"
)
print
(
"
=
"
*
80
)
except
nidaqmx
.
errors
.
DaqError
as
e
:
print
(
e
)
self
.
daq_type
=
"
FakeDAQ
"
print
(
"
=
"
*
80
)
print
(
"
Warning: Using fake data. DAQ could not be detected.
"
)
print
(
"
=
"
*
80
)
def
get_samples
(
self
):
"""
Read device sample buffers returning the specified sample size.
"""
try
:
read_all
=
nidaqmx
.
constants
.
READ_ALL_AVAILABLE
self
.
in_stream
.
read_many_sample
(
self
.
data
,
number_of_samples_per_channel
=
read_all
,
timeout
=
1.0
)
# print('received update')
except
nidaqmx
.
errors
.
DaqError
as
err
:
print
(
"
DAQ exception caught: {0}
\n
"
.
format
(
err
))
# === SAMPLING ======================================================
def
sample_loop
(
self
):
"""
Call get_samples forever.
"""
while
self
.
running
:
if
self
.
paused
:
# warning('(daq.py) daq paused...')
time
.
sleep
(
0.1
)
# sleep 100 ms
else
:
if
self
.
daq_type
==
"
FakeDAQ
"
:
self
.
get_fake_samples
()
else
:
self
.
get_samples
()
new_data
=
(
self
.
data
,
self
.
sample_num
)
# Set the update event to True once data is read in
self
.
data_available_signal
.
emit
(
new_data
)
self
.
ts_buffer
.
append
(
self
.
data
)
# Incriment sample number
self
.
sample_num
+=
1
print
(
"
Sampling thread stopped.
"
)
def
get_fake_samples
(
self
):
"""
Generate fake DAQ samples.
"""
sleep_time
=
self
.
sample_chunk_size
/
self
.
sample_rate
self
.
data
=
np
.
random
.
randn
(
self
.
num_channels
,
self
.
sample_chunk_size
)
*
0.001
+
\
np
.
random
.
randn
(
1
)
*
0.001
+
0.01
time
.
sleep
(
sleep_time
)
def
run
(
self
):
if
self
.
running
==
False
:
# Spawn sampling thread
self
.
running
=
True
self
.
t_sampling
=
threading
.
Thread
(
target
=
self
.
sample_loop
)
# Trigger DAQ to start running
self
.
start
()
try
:
if
not
self
.
t_sampling
.
is_alive
():
print
(
'
Staring sampling thread
'
)
self
.
t_sampling
.
start
()
self
.
paused
=
False
except
RuntimeError
as
e
:
print
(
'
Error starting sampling thread:
'
,
e
)
else
:
print
(
'
Warning: Not starting new sampling thread; sampling thread already running!
'
)
def
start
(
self
):
self
.
run
()
def
close
(
self
):
self
.
task
.
close
()
# Close nidaq gracefully
if
self
.
t_sampling
.
is_alive
():
print
(
"
Stopping sampling thread...
"
)
try
:
self
.
t_sampling
.
join
()
except
Exception
as
e
:
print
(
"
Error closing sampling thread:
"
,
e
)
super
().
close
()
This diff is collapsed.
Click to expand it.
Preview
0%
Loading
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Save comment
Cancel
Please
register
or
sign in
to comment