Control Functions
- HIL.task_start(task, clock, frequency, num_samples)
Starts a task running. The task runs at the sampling frequency specified and processes the number of samples specified. If the number of samples is set to INFINITE, then the task runs until task_stop is called.
- Parameters
task (handle) – A handle to the task, as returned by one of the task creation functions.
clock (int) – The clock used to time the operation. Note that some clocks allow faster sampling rates than others.
frequency (double) – The frequency in Hertz at which to process samples. For example, if frequency is set to 1000, then the task_start function causes the task to process one sample every millisecond.
num_samples (int) – The total number of samples to process. Each “sample” consists of all the channels specified when the task was created. For example, if frequency is set to 1000 and num_samples is set to 5000, then the task will run for 5 seconds, processing 5000 samples. If the number of samples is set to INFINITE, then the task will run until task_stop is called.
- Raises
HILError – On non-zero return code. A suitable error message may be retrieved using get_error_message.
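The relationship between frequency, num_samples, and run time described for task_start can be checked with a little arithmetic. The sketch below is plain Python and does not call the HIL API; the helper name task_timing and the INFINITE_SAMPLES stand-in are hypothetical and used only for illustration.

```python
import math

# Hypothetical stand-in for the library's INFINITE constant (illustration only).
INFINITE_SAMPLES = None


def task_timing(frequency, num_samples):
    """Return (sample_period_s, duration_s) for a task.

    duration_s is math.inf when num_samples is the INFINITE stand-in,
    mirroring a task that runs until task_stop is called.
    """
    if frequency <= 0:
        raise ValueError("frequency must be positive")
    period = 1.0 / frequency
    if num_samples is INFINITE_SAMPLES:
        return period, math.inf
    return period, num_samples / frequency


period, duration = task_timing(1000.0, 5000)
print(period, duration)  # 0.001 5.0
```

At 1 kHz the task processes one sample every millisecond, so 5000 samples take 5 seconds, matching the parameter description.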
Examples
Start a task to read 5000 samples at 1 kHz from the first four analog input channels using SYSTEM_CLOCK_1.
Using array:
>>> from array import array
>>> from quanser.hardware import HIL, Clock
>>> card = HIL("q8_usb", "0")
>>> try:
...     channels = array('I', [0, 1, 2, 3])
...     num_channels = len(channels)
...     buffer = array('d', [0.0] * num_channels)
...     frequency = 1000.0
...     samples = 5000
...     samples_in_buffer = int(frequency)
...     task = card.task_create_analog_reader(samples_in_buffer, channels, num_channels)
...     card.task_start(task, Clock.SYSTEM_CLOCK_1, frequency, samples)
...     # ...
... finally:
...     card.close()
Using numpy:
>>> import numpy as np
>>> from quanser.hardware import HIL, Clock
>>> card = HIL("q8_usb", "0")
>>> try:
...     channels = np.array([0, 1, 2, 3], dtype=np.uint32)
...     num_channels = len(channels)
...     buffer = np.zeros(num_channels, dtype=np.float64)
...     frequency = 1000.0
...     samples = 5000
...     samples_in_buffer = int(frequency)
...     task = card.task_create_analog_reader(samples_in_buffer, channels, num_channels)
...     card.task_start(task, Clock.SYSTEM_CLOCK_1, frequency, samples)
...     # ...
... finally:
...     card.close()
- HIL.task_stop(task)
Stops a task that is running. A task may be stopped before it has processed all of its samples. Tasks may also be restarted using the task_start function.
- Parameters
task (handle) – A handle to the task, as returned by one of the task creation functions.
- Raises
HILError – On non-zero return code. A suitable error message may be retrieved using get_error_message.
Examples
Stop the specified task after processing.
Using array:
>>> from array import array
>>> from quanser.hardware import HIL, Clock
>>> card = HIL("q8_usb", "0")
>>> try:
...     channels = array('I', [0, 1, 2, 3])
...     num_channels = len(channels)
...     frequency = 1000.0
...     samples = 5000
...     samples_in_buffer = int(frequency)
...     samples_to_read = 1
...     buffer = array('d', [0.0] * num_channels)
...     task = card.task_create_analog_reader(samples_in_buffer, channels, num_channels)
...     card.task_start(task, Clock.SYSTEM_CLOCK_1, frequency, samples)
...     for index in range(samples):
...         card.task_read_analog(task, samples_to_read, buffer)
...         # ...
...     card.task_stop(task)
...     card.task_delete(task)
... finally:
...     card.close()
Using numpy:
>>> import numpy as np
>>> from quanser.hardware import HIL, Clock
>>> card = HIL("q8_usb", "0")
>>> try:
...     channels = np.array([0, 1, 2, 3], dtype=np.uint32)
...     num_channels = len(channels)
...     frequency = 1000.0
...     samples = 5000
...     samples_in_buffer = int(frequency)
...     samples_to_read = 1
...     buffer = np.zeros(num_channels, dtype=np.float64)
...     task = card.task_create_analog_reader(samples_in_buffer, channels, num_channels)
...     card.task_start(task, Clock.SYSTEM_CLOCK_1, frequency, samples)
...     for index in range(samples):
...         card.task_read_analog(task, samples_to_read, buffer)
...         # ...
...     card.task_stop(task)
...     card.task_delete(task)
... finally:
...     card.close()
- HIL.task_stop_all()
Stops all running tasks that are associated with the given card. Tasks may be stopped before they have processed all of their samples. Tasks may also be restarted using the task_start function. This function is useful in exception handling where a specific task handle may be lost.
- Raises
HILError – On non-zero return code. A suitable error message may be retrieved using get_error_message.
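Because task_stop_all needs no task handle, it fits naturally into generic cleanup code. The following is a minimal sketch of that idea as a context manager. It relies only on the method names documented in this section (task_stop_all, task_delete_all, close); the helper cleaned_up and the StubCard class are hypothetical, and the stub exists only so the sketch runs without hardware.

```python
from contextlib import contextmanager


@contextmanager
def cleaned_up(card):
    """Yield card; stop and delete every task on error, and always close the card."""
    try:
        try:
            yield card
        except Exception:
            # Task handles created inside the block may be out of reach here,
            # so stop and delete everything that belongs to the card.
            card.task_stop_all()
            card.task_delete_all()
            raise
    finally:
        card.close()


class StubCard:
    """Hypothetical stand-in that records calls instead of touching hardware."""

    def __init__(self):
        self.calls = []

    def task_stop_all(self):
        self.calls.append("task_stop_all")

    def task_delete_all(self):
        self.calls.append("task_delete_all")

    def close(self):
        self.calls.append("close")


stub = StubCard()
try:
    with cleaned_up(stub):
        raise RuntimeError("simulated failure while tasks are running")
except RuntimeError:
    pass
print(stub.calls)  # ['task_stop_all', 'task_delete_all', 'close']
```

With a real card, the same pattern would wrap the task creation and read loop; whether to catch every exception or only HILError is a design choice left to the application.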
Examples
Stop all tasks associated with the board.
Using array:
>>> from array import array
>>> from quanser.hardware import HIL, HILError, Clock
>>> card = HIL("q8_usb", "0")
>>> try:
...     analog_channels = array('I', [0, 1, 2, 3])
...     encoder_channels = array('I', [0, 1])
...     num_analog_channels = len(analog_channels)
...     num_encoder_channels = len(encoder_channels)
...     frequency = 1000.0
...     samples = 5000
...     samples_in_buffer = int(frequency)
...     samples_to_read = 1
...     analog_buffer = array('d', [0.0] * num_analog_channels)
...     encoder_buffer = array('i', [0] * num_encoder_channels)
...     analog_task = card.task_create_analog_reader(samples_in_buffer, analog_channels, num_analog_channels)
...     encoder_task = card.task_create_encoder_reader(samples_in_buffer, encoder_channels, num_encoder_channels)
...     try:
...         card.task_start(analog_task, Clock.SYSTEM_CLOCK_1, frequency, samples)
...         card.task_start(encoder_task, Clock.SYSTEM_CLOCK_1, frequency, samples)
...         for index in range(samples):
...             card.task_read_analog(analog_task, samples_to_read, analog_buffer)
...             card.task_read_encoder(encoder_task, samples_to_read, encoder_buffer)
...             # ...
...     except HILError:
...         # Stop every task on the card without needing individual handles
...         card.task_stop_all()
...         card.task_delete_all()
... finally:
...     card.close()
Using numpy:
>>> import numpy as np
>>> from quanser.hardware import HIL, HILError, Clock
>>> card = HIL("q8_usb", "0")
>>> try:
...     analog_channels = np.array([0, 1, 2, 3], dtype=np.uint32)
...     encoder_channels = np.array([0, 1], dtype=np.uint32)
...     num_analog_channels = len(analog_channels)
...     num_encoder_channels = len(encoder_channels)
...     frequency = 1000.0
...     samples = 5000
...     samples_in_buffer = int(frequency)
...     samples_to_read = 1
...     analog_buffer = np.zeros(num_analog_channels, dtype=np.float64)
...     encoder_buffer = np.zeros(num_encoder_channels, dtype=np.int32)
...     analog_task = card.task_create_analog_reader(samples_in_buffer, analog_channels, num_analog_channels)
...     encoder_task = card.task_create_encoder_reader(samples_in_buffer, encoder_channels, num_encoder_channels)
...     try:
...         card.task_start(analog_task, Clock.SYSTEM_CLOCK_1, frequency, samples)
...         card.task_start(encoder_task, Clock.SYSTEM_CLOCK_1, frequency, samples)
...         for index in range(samples):
...             card.task_read_analog(analog_task, samples_to_read, analog_buffer)
...             card.task_read_encoder(encoder_task, samples_to_read, encoder_buffer)
...             # ...
...     except HILError:
...         # Stop every task on the card without needing individual handles
...         card.task_stop_all()
...         card.task_delete_all()
... finally:
...     card.close()
- HIL.task_flush(task)
Flushes the task buffer for a writer or read-writer task. This function has no effect on reader tasks. Flushing the task buffer ensures that all the data written to the task buffer has been transferred to the actual hardware. This function does not return until all the data in the task buffer has been flushed to the hardware or the task is stopped.
- Parameters
task (handle) – A handle to the task, as returned by one of the task creation functions.
- Raises
HILError – On non-zero return code. A suitable error message may be retrieved using get_error_message.
Examples
Writes 5000 samples at 1 kHz to the first four analog output channels using SYSTEM_CLOCK_1. Flushes the buffer to ensure that all data is written to the hardware.
Using array:
>>> from array import array
>>> from quanser.hardware import HIL, Clock
>>> card = HIL("q8_usb", "0")
>>> try:
...     channels = array('I', [0, 1, 2, 3])
...     num_channels = len(channels)
...     frequency = 1000.0
...     samples = 5000
...     samples_in_buffer = int(frequency)
...     samples_to_write = 100
...     buffer = array('d', [0.0] * samples_to_write * num_channels)
...     task = card.task_create_analog_writer(samples_in_buffer, channels, num_channels)
...     card.task_start(task, Clock.SYSTEM_CLOCK_1, frequency, samples)
...     for index in range(0, samples, samples_to_write):
...         # Fill the buffer: one value per channel for each sample
...         buffer[:] = array('d', [float(x) for x in range(samples_to_write * num_channels)])
...         # Does not wait for data to be written to the hardware;
...         # it only waits for space in the task buffer.
...         card.task_write_analog(task, samples_to_write, buffer)
...         # ...
...     card.task_flush(task)  # Ensure all data has been written to the hardware
...     card.task_stop(task)
...     card.task_delete(task)
... finally:
...     card.close()
Using numpy:
>>> import numpy as np
>>> from quanser.hardware import HIL, Clock
>>> card = HIL("q8_usb", "0")
>>> try:
...     channels = np.array([0, 1, 2, 3], dtype=np.uint32)
...     num_channels = len(channels)
...     frequency = 1000.0
...     samples = 5000
...     samples_in_buffer = int(frequency)
...     samples_to_write = 100
...     buffer = np.zeros(samples_to_write * num_channels, dtype=np.float64)
...     task = card.task_create_analog_writer(samples_in_buffer, channels, num_channels)
...     card.task_start(task, Clock.SYSTEM_CLOCK_1, frequency, samples)
...     for index in range(0, samples, samples_to_write):
...         # Fill the buffer: one value per channel for each sample
...         buffer[:] = np.arange(samples_to_write * num_channels, dtype=np.float64)
...         # Does not wait for data to be written to the hardware;
...         # it only waits for space in the task buffer.
...         card.task_write_analog(task, samples_to_write, buffer)
...         # ...
...     card.task_flush(task)  # Ensure all data has been written to the hardware
...     card.task_stop(task)
...     card.task_delete(task)
... finally:
...     card.close()
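The write buffer in the task_flush examples holds samples_to_write * num_channels values. The sketch below shows one way to build such a buffer from per-channel waveforms. It assumes the values are stored sample by sample, with all channels of one sample adjacent; that is consistent with the statement that each sample consists of all the channels of the task, but the layout should be verified against the task_write_analog documentation. The helper name interleave_channels is hypothetical.

```python
from array import array


def interleave_channels(per_channel):
    """Flatten per-channel sequences into one sample-major buffer.

    per_channel[c][s] is the value for channel index c at sample s.
    The result holds sample 0 of every channel, then sample 1, and so on.
    """
    num_samples = len(per_channel[0])
    if any(len(ch) != num_samples for ch in per_channel):
        raise ValueError("all channels must have the same number of samples")
    buffer = array('d')
    for sample in zip(*per_channel):
        buffer.extend(sample)
    return buffer


ch0 = [0.0, 0.1, 0.2]
ch1 = [1.0, 1.1, 1.2]
buf = interleave_channels([ch0, ch1])
print(list(buf))  # [0.0, 1.0, 0.1, 1.1, 0.2, 1.2]
```

The resulting length is always the number of samples times the number of channels, which is the size the examples allocate for the write buffer.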
- HIL.task_delete(task)
Deletes a task, freeing up any system resources and hardware resources used by the task. If the task is running, then it is stopped prior to being deleted. Once the task has been deleted, the task handle becomes invalid and the task may no longer be used.
- Parameters
task (handle) – A handle to the task, as returned by one of the task creation functions.
- Raises
HILError – On non-zero return code. A suitable error message may be retrieved using get_error_message.
Examples
Delete the specified task after use.
Using array:
>>> from array import array
>>> from quanser.hardware import HIL, Clock
>>> card = HIL("q8_usb", "0")
>>> try:
...     channels = array('I', [0, 1, 2, 3])
...     num_channels = len(channels)
...     frequency = 1000.0
...     samples = 5000
...     samples_in_buffer = int(frequency)
...     samples_to_read = 1
...     buffer = array('d', [0.0] * num_channels)
...     task = card.task_create_analog_reader(samples_in_buffer, channels, num_channels)
...     card.task_start(task, Clock.SYSTEM_CLOCK_1, frequency, samples)
...     for index in range(samples):
...         card.task_read_analog(task, samples_to_read, buffer)
...         # ...
...     card.task_stop(task)
...     card.task_delete(task)
... finally:
...     card.close()
Using numpy:
>>> import numpy as np
>>> from quanser.hardware import HIL, Clock
>>> card = HIL("q8_usb", "0")
>>> try:
...     channels = np.array([0, 1, 2, 3], dtype=np.uint32)
...     num_channels = len(channels)
...     frequency = 1000.0
...     samples = 5000
...     samples_in_buffer = int(frequency)
...     samples_to_read = 1
...     buffer = np.zeros(num_channels, dtype=np.float64)
...     task = card.task_create_analog_reader(samples_in_buffer, channels, num_channels)
...     card.task_start(task, Clock.SYSTEM_CLOCK_1, frequency, samples)
...     for index in range(samples):
...         card.task_read_analog(task, samples_to_read, buffer)
...         # ...
...     card.task_stop(task)
...     card.task_delete(task)
... finally:
...     card.close()
- HIL.task_delete_all()
Deletes all tasks that are associated with the given card. Any tasks that are running will be stopped prior to being deleted. All task handles associated with the card become invalid and may no longer be used.
- Raises
HILError – On non-zero return code. A suitable error message may be retrieved using get_error_message.
Examples
Deletes all tasks associated with the board.
Using array:
>>> from array import array
>>> from quanser.hardware import HIL, HILError, Clock
>>> card = HIL("q8_usb", "0")
>>> try:
...     analog_channels = array('I', [0, 1, 2, 3])
...     encoder_channels = array('I', [0, 1])
...     num_analog_channels = len(analog_channels)
...     num_encoder_channels = len(encoder_channels)
...     frequency = 1000.0
...     samples = 5000
...     samples_in_buffer = int(frequency)
...     samples_to_read = 1
...     analog_buffer = array('d', [0.0] * num_analog_channels)
...     encoder_buffer = array('i', [0] * num_encoder_channels)
...     analog_task = card.task_create_analog_reader(samples_in_buffer, analog_channels, num_analog_channels)
...     encoder_task = card.task_create_encoder_reader(samples_in_buffer, encoder_channels, num_encoder_channels)
...     try:
...         card.task_start(analog_task, Clock.SYSTEM_CLOCK_1, frequency, samples)
...         card.task_start(encoder_task, Clock.SYSTEM_CLOCK_1, frequency, samples)
...         for index in range(samples):
...             card.task_read_analog(analog_task, samples_to_read, analog_buffer)
...             card.task_read_encoder(encoder_task, samples_to_read, encoder_buffer)
...             # ...
...     except HILError:
...         card.task_stop_all()
...         # Delete every task on the card; all task handles become invalid
...         card.task_delete_all()
... finally:
...     card.close()
Using numpy:
>>> import numpy as np
>>> from quanser.hardware import HIL, HILError, Clock
>>> card = HIL("q8_usb", "0")
>>> try:
...     analog_channels = np.array([0, 1, 2, 3], dtype=np.uint32)
...     encoder_channels = np.array([0, 1], dtype=np.uint32)
...     num_analog_channels = len(analog_channels)
...     num_encoder_channels = len(encoder_channels)
...     frequency = 1000.0
...     samples = 5000
...     samples_in_buffer = int(frequency)
...     samples_to_read = 1
...     analog_buffer = np.zeros(num_analog_channels, dtype=np.float64)
...     encoder_buffer = np.zeros(num_encoder_channels, dtype=np.int32)
...     analog_task = card.task_create_analog_reader(samples_in_buffer, analog_channels, num_analog_channels)
...     encoder_task = card.task_create_encoder_reader(samples_in_buffer, encoder_channels, num_encoder_channels)
...     try:
...         card.task_start(analog_task, Clock.SYSTEM_CLOCK_1, frequency, samples)
...         card.task_start(encoder_task, Clock.SYSTEM_CLOCK_1, frequency, samples)
...         for index in range(samples):
...             card.task_read_analog(analog_task, samples_to_read, analog_buffer)
...             card.task_read_encoder(encoder_task, samples_to_read, encoder_buffer)
...             # ...
...     except HILError:
...         card.task_stop_all()
...         # Delete every task on the card; all task handles become invalid
...         card.task_delete_all()
... finally:
...     card.close()
- HIL.task_set_buffer_overflow_mode(task, mode)
Determines how buffer overflows are handled for a task.
The task buffer overflows when samples are not being read fast enough by task_read_xxx to keep up with the data being read by the card at its inputs, or when samples are not being written fast enough by task_write_xxx to stay ahead of the data being written by the card to its outputs. Buffering is used to handle cases where the application is momentarily interrupted and the size of the buffer determines how long an interruption is permitted. If increasing the buffer size does not prevent buffer overflow then the application is simply not capable of keeping up with real time.
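The link between buffer size and the length of interruption a task can absorb is simple arithmetic. The sketch below is an idealized estimate, not a HIL API call: it assumes a reader task whose buffer is empty when the interruption begins, and the helper names are hypothetical.

```python
import math


def max_interruption_seconds(samples_in_buffer, frequency):
    """Longest pause before a buffer of samples_in_buffer samples overflows."""
    if frequency <= 0:
        raise ValueError("frequency must be positive")
    return samples_in_buffer / frequency


def samples_needed(interruption_seconds, frequency):
    """Smallest buffer size (in samples) that covers the given pause."""
    return math.ceil(interruption_seconds * frequency)


# The examples in this section use samples_in_buffer = int(frequency),
# which corresponds to one second of buffering at any sampling rate.
print(max_interruption_seconds(1000, 1000.0))  # 1.0
print(samples_needed(0.25, 1000.0))            # 250
```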
By default, an error will be returned by the next task_read_xxx or task_write_xxx call when a buffer overflow occurs and the task will need to be stopped. The reason this is the default behaviour is that the HIL API is intended for real-time applications where missing samples is regarded as a fatal error.
However, for applications in which missed samples are acceptable, the buffer overflow handling can be changed using the task_set_buffer_overflow_mode function (if the card supports it). There are five possible modes, the last two of which apply only to simulated cards:
BufferOverflowMode.ERROR_ON_OVERFLOW:
This is the default mode in which buffer overflows cause a QERR_BUFFER_OVERFLOW error to be returned by the next task_read_xxx or task_write_xxx call. The task should be stopped at that point.
BufferOverflowMode.OVERWRITE_ON_OVERFLOW:
In this mode, old samples in the buffer are discarded to make room for new samples if the buffer overflows. For writer tasks, this means that there may be unexpected discontinuities in the output waveforms. For reader tasks, it means that samples may be lost and only the more recent samples will be read.
BufferOverflowMode.DISCARD_ON_OVERFLOW:
In this mode, new samples are discarded if there is no room in the buffer. For writer tasks, this means that there may be unexpected discontinuities in the output waveforms. For reader tasks, it means that samples may be lost and only the oldest samples will be read.
BufferOverflowMode.WAIT_ON_OVERFLOW:
In this mode, the task waits on buffer overflow for space to become available. This mode is not supported by hardware cards; it is intended only for simulated cards.
BufferOverflowMode.SYNCHRONIZE:
In this mode, the task provides complete buffer synchronization. This mode is not supported by hardware cards; it is intended only for simulated cards.
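The difference between the first three modes is easiest to see on a toy model. The sketch below is plain Python and is not how the HIL library is implemented: ToyTaskBuffer, its mode strings, and BufferOverflowError are hypothetical names. As a simplification, the toy raises as soon as the overflow happens, whereas the real API reports the error on the next task_read_xxx or task_write_xxx call.

```python
from collections import deque


class BufferOverflowError(Exception):
    """Toy stand-in for the QERR_BUFFER_OVERFLOW error."""


class ToyTaskBuffer:
    """Fixed-capacity sample queue with a selectable overflow policy."""

    def __init__(self, capacity, mode):
        if mode not in ("error", "overwrite", "discard"):
            raise ValueError("unknown mode: " + mode)
        self.capacity = capacity
        self.mode = mode
        self.samples = deque()
        self.overflows = 0

    def push(self, sample):
        if len(self.samples) < self.capacity:
            self.samples.append(sample)
            return
        self.overflows += 1
        if self.mode == "error":
            raise BufferOverflowError("buffer overflow")
        if self.mode == "overwrite":
            # Drop the oldest sample to make room for the new one.
            self.samples.popleft()
            self.samples.append(sample)
        # "discard": the new sample is dropped and the buffer is unchanged.


def fill(mode, capacity=3, produced=5):
    """Push samples 0..produced-1 without reading; return (contents, overflows)."""
    buf = ToyTaskBuffer(capacity, mode)
    for sample in range(produced):
        buf.push(sample)
    return list(buf.samples), buf.overflows


print(fill("overwrite"))  # ([2, 3, 4], 2)
print(fill("discard"))    # ([0, 1, 2], 2)
```

In the overwrite case only the most recent samples survive, and in the discard case only the oldest, which matches the descriptions of OVERWRITE_ON_OVERFLOW and DISCARD_ON_OVERFLOW for reader tasks.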
- Parameters
task (handle) – A handle to the task, as returned by one of the task creation functions.
mode (BufferOverflowMode) – The buffer overflow mode to use, as a BufferOverflowMode enumeration value.
- Raises
HILError – On non-zero return code. A suitable error message may be retrieved using get_error_message.
Examples
Sets the buffer overflow mode for a task to overwrite old samples on overflow.
Using array:
>>> from array import array
>>> from quanser.hardware import HIL, Clock, BufferOverflowMode
>>> card = HIL("q8_usb", "0")
>>> try:
...     channels = array('I', [0, 1, 2, 3])
...     num_channels = len(channels)
...     buffer = array('d', [0.0] * num_channels)
...     frequency = 1000.0
...     samples = 5000
...     samples_in_buffer = int(frequency)
...     task = card.task_create_analog_reader(samples_in_buffer, channels, num_channels)
...     # Set the overflow mode before starting the task
...     card.task_set_buffer_overflow_mode(task, BufferOverflowMode.OVERWRITE_ON_OVERFLOW)
...     card.task_start(task, Clock.SYSTEM_CLOCK_1, frequency, samples)
...     # ...
... finally:
...     card.close()
Using numpy:
>>> import numpy as np
>>> from quanser.hardware import HIL, Clock, BufferOverflowMode
>>> card = HIL("q8_usb", "0")
>>> try:
...     channels = np.array([0, 1, 2, 3], dtype=np.uint32)
...     num_channels = len(channels)
...     buffer = np.zeros(num_channels, dtype=np.float64)
...     frequency = 1000.0
...     samples = 5000
...     samples_in_buffer = int(frequency)
...     task = card.task_create_analog_reader(samples_in_buffer, channels, num_channels)
...     # Set the overflow mode before starting the task
...     card.task_set_buffer_overflow_mode(task, BufferOverflowMode.OVERWRITE_ON_OVERFLOW)
...     card.task_start(task, Clock.SYSTEM_CLOCK_1, frequency, samples)
...     # ...
... finally:
...     card.close()
- HIL.task_get_buffer_overflows(task)
Returns the number of buffer overflows that have occurred since the task was started.
This function is only relevant when the buffer overflow mode has been changed from the default, i.e., when the overflow mode is not BufferOverflowMode.ERROR_ON_OVERFLOW.
- Parameters
task (handle) – A handle to the task, as returned by one of the task creation functions.
- Returns
The number of buffer overflows that have occurred for the task.
- Return type
int
- Raises
HILError – On non-zero return code. A suitable error message may be retrieved using get_error_message.
Examples
Gets the number of buffer overflows for a task.
Using array:
>>> from array import array
>>> from quanser.hardware import HIL, Clock, BufferOverflowMode
>>> card = HIL("q8_usb", "0")
>>> try:
...     channels = array('I', [0, 1, 2, 3])
...     num_channels = len(channels)
...     buffer = array('d', [0.0] * num_channels)
...     frequency = 1000.0
...     samples = 5000
...     samples_in_buffer = int(frequency)
...     task = card.task_create_analog_reader(samples_in_buffer, channels, num_channels)
...     card.task_set_buffer_overflow_mode(task, BufferOverflowMode.OVERWRITE_ON_OVERFLOW)
...     card.task_start(task, Clock.SYSTEM_CLOCK_1, frequency, samples)
...     # ...
...     num_overflows = card.task_get_buffer_overflows(task)
... finally:
...     card.close()
Using numpy:
>>> import numpy as np
>>> from quanser.hardware import HIL, Clock, BufferOverflowMode
>>> card = HIL("q8_usb", "0")
>>> try:
...     channels = np.array([0, 1, 2, 3], dtype=np.uint32)
...     num_channels = len(channels)
...     buffer = np.zeros(num_channels, dtype=np.float64)
...     frequency = 1000.0
...     samples = 5000
...     samples_in_buffer = int(frequency)
...     task = card.task_create_analog_reader(samples_in_buffer, channels, num_channels)
...     card.task_set_buffer_overflow_mode(task, BufferOverflowMode.OVERWRITE_ON_OVERFLOW)
...     card.task_start(task, Clock.SYSTEM_CLOCK_1, frequency, samples)
...     # ...
...     num_overflows = card.task_get_buffer_overflows(task)
... finally:
...     card.close()