Stream Peek

Stream.peek_begin(prefetch=0)

Prepares the t_stream_peek_state to begin peeking for data. “Peeking” involves reading ahead without removing the data from the stream buffer. It is particularly useful for parsing the input from the stream because it allows the next bytes to be tested without preventing the user from receiving those bytes later. It may also be used to read a sequence of data types as a single atomic unit, much like the xxxx_array functions. The “peek state” is used to keep track of how far ahead the lookahead operations have reached so far.

As an example of using the peek capabilities to read a sequence of data types as a single atomic unit using non-blocking I/O:

>>> from array import array
>>> from quanser.communications import Stream, PollFlag
>>> buffer_size = 64
>>> int_buf = array("i", [0])
>>> double_buf = array("d", [0.0])
>>> prefetch = 0  # read-ahead is enabled by default, so no prefetch is needed
>>> stream = Stream()
>>> stream.connect("tcpip://localhost:5000", True, buffer_size, buffer_size)
>>> try:
>>>     while stream.poll(None, PollFlag.RECEIVE) > 0:
>>>         result, state = stream.peek_begin(prefetch)
>>>         if result > 0:
>>>             result = stream.peek_int_array(state, int_buf, len(int_buf))
>>>         if result > 0:
>>>             result = stream.peek_double_array(state, double_buf, len(double_buf))
>>>         result = stream.peek_end(state, result)
>>>         if result >= 0:
>>>             break
...
>>>     stream.shutdown()
>>> finally:
>>>     stream.close()

In this example, the code peeks ahead in the input stream for an integer and a double. If at any time the operation would block, then result is -ErrorCode.WOULD_BLOCK. Since result is negative, peek_end will not advance the stream pointer and the data is not removed from the input stream. Hence, the next time the code is called the same data is read again.

However, if the peek_int_array and peek_double_array calls succeed then peek_end does advance the stream pointer and returns 1. Hence, the data is removed from the input stream and subsequent reads/peeks from the stream will return new data.

Thus, the integer and double are read as one atomic unit from the input stream, because if not all the data is available, then nothing is removed from the input stream. Only if both quantities are read successfully is the data actually removed from the input stream.

Note that the peek_xxxx functions may read data from the underlying communication channel, but the contents of the stream buffer are never overwritten until peek_end is called to indicate that the peeked data may be removed from the input stream.
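
The all-or-nothing behaviour described above can be sketched with a small in-memory model. This is only an illustration, not the Quanser implementation: ToyPeekBuffer, its feed method, the read_int_and_double helper and the WOULD_BLOCK placeholder value are hypothetical names, and the little-endian struct formats are an assumption.

```python
import struct

# Placeholder standing in for -ErrorCode.WOULD_BLOCK; the real numeric value is not assumed.
WOULD_BLOCK = -1


class ToyPeekBuffer:
    """In-memory stand-in for a stream receive buffer (hypothetical, not the Stream API)."""

    def __init__(self):
        self.data = bytearray()

    def feed(self, chunk):
        # Simulates bytes arriving from the underlying communication channel.
        self.data.extend(chunk)

    def peek_begin(self):
        # The "peek state" is just an offset recording how far the lookahead has reached.
        return 1, {"offset": 0}

    def peek(self, state, fmt):
        size = struct.calcsize(fmt)
        if len(self.data) - state["offset"] < size:
            return WOULD_BLOCK, None  # not enough data yet; nothing is consumed
        (value,) = struct.unpack_from(fmt, self.data, state["offset"])
        state["offset"] += size
        return 1, value

    def peek_end(self, state, status):
        if status > 0:
            del self.data[:state["offset"]]  # remove everything that was peeked
            return 1
        return status  # buffer is left untouched


def read_int_and_double(buf):
    number = value = None
    result, state = buf.peek_begin()
    if result > 0:
        result, number = buf.peek(state, "<i")
    if result > 0:
        result, value = buf.peek(state, "<d")
    return buf.peek_end(state, result), number, value


buf = ToyPeekBuffer()
buf.feed(struct.pack("<i", 7))        # only the integer has arrived
first = read_int_and_double(buf)      # would block; the 4 bytes stay buffered
buffered_after_first = len(buf.data)
buf.feed(struct.pack("<d", 2.5))      # the double arrives
second = read_int_and_double(buf)     # both values are removed together
```

In this model the first attempt leaves the integer in the buffer, so the second attempt peeks it again and then consumes the integer and the double in one step.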

This function assumes that the stream is valid. If the prefetch is zero then it does not block because it does not access the underlying communication channel.

However, if the prefetch is non-zero then it may access the underlying communication channel. Normally, the stream will read as much as it can on any receive operation and try to fill the stream’s receive buffer. Hence, no prefetching of data is necessary because the stream will do so automatically on any receive operation. However, if read-ahead has been disabled by setting the BooleanProperty.NO_READ_AHEAD property then the stream only reads as much data as is required to satisfy the receive operation requested. Some protocols, like I2C, enforce this property and do not allow read-ahead. The prefetch parameter of peek_begin causes the stream to attempt to ensure that at least prefetch bytes are present in the stream receive buffer before the rest of the peek_xxxx operations are performed, thereby reducing the number of calls to the underlying stream to receive data when read-ahead has been disabled.

If prefetch is not required (because read-ahead is enabled on the stream - the default situation) then the prefetch parameter may be zero. If prefetch is desired, then it should be set to no more than the number of bytes expected to be peeked in subsequent peek_xxxx calls.
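
One way to obtain a prefetch value that matches the data to be peeked is to compute it from the field layout. The following is a minimal sketch, assuming the fields are packed back to back with no padding. The prefetch_bytes helper is hypothetical and not part of the Stream API; the format characters are those of Python's struct module.

```python
import struct


def prefetch_bytes(fmt):
    """Number of bytes occupied by the fields named in a struct format string."""
    # "<" selects standard sizes without alignment padding, so the result does
    # not depend on the native C type widths of the platform.
    return struct.calcsize("<" + fmt)


int_then_double = prefetch_bytes("id")  # one 32-bit integer, one 64-bit double
three_shorts = prefetch_bytes("3h")     # three 16-bit integers
```

The resulting counts could then be passed as the prefetch argument when read-ahead has been disabled.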

Parameters

prefetch (int) – The number of bytes to prefetch even if read-ahead is disabled.

Returns

  • result (int) – If the stream is closed gracefully during a prefetch operation and there are not enough bytes left in the receive buffer for the prefetch specified then it will return zero. It returns 1 on success.

  • state (t_stream_peek_state) – The initialized peek state.

Raises

StreamError – On a negative return code. A suitable error message may be retrieved using get_error_message.

Example

Suppose the following binary structure is being received:

struct {
    t_int8 temperature;
    t_uint8 status;
    t_int16 gyro_rates[3];
}

This structure is 1 + 1 + 3*2 = 8 bytes long. Hence, the code to read this structure as one atomic unit, while accounting for potential byte swapping due to byte order differences between server and client would be:

>>> from array import array
>>> from quanser.communications import Stream, PollFlag
>>> buffer_size = 64
>>> temperature = array("b", [0])
>>> status = array("B", [0])
>>> gyro_rates = array("h", [0, 0, 0])
>>> stream = Stream()
>>> stream.connect("tcpip://localhost:5000", True, buffer_size, buffer_size)
>>> # Total size of the structure in bytes: 1 + 1 + 3*2 = 8
>>> prefetch = len(temperature) + len(status) + len(gyro_rates)*2
>>> try:
>>>     while stream.poll(None, PollFlag.RECEIVE) > 0:
>>>         result, state = stream.peek_begin(prefetch)
>>>         if result > 0:
>>>             result = stream.peek_byte_array(state, temperature, len(temperature))
>>>         if result > 0:
>>>             result = stream.peek_byte_array(state, status, len(status))
>>>         if result > 0:
>>>             result = stream.peek_short_array(state, gyro_rates, len(gyro_rates))
>>>         result = stream.peek_end(state, result)
>>>         if result >= 0:
>>>             break
...
>>>     stream.shutdown()
>>> finally:
>>>     stream.close()
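
To make the byte-order concern concrete, the same 8-byte structure can be decoded offline with Python's struct module. This sketch does not use the Stream API. The decode helper and the sample field values are hypothetical, and it assumes the fields are packed without padding.

```python
import struct

# Field layout of the structure: t_int8, t_uint8, three t_int16 values.
LAYOUT = "bB3h"


def decode(payload, big_endian=False):
    """Unpack the 8-byte record using an explicitly chosen byte order."""
    prefix = ">" if big_endian else "<"
    temperature, status, *gyro_rates = struct.unpack(prefix + LAYOUT, payload)
    return temperature, status, gyro_rates


# A record as a big-endian sender would transmit it (made-up sample values).
payload = struct.pack(">" + LAYOUT, -5, 1, 100, -200, 300)

correct = decode(payload, big_endian=True)   # byte order matches the sender
misread = decode(payload, big_endian=False)  # byte order does not match
```

The single-byte fields decode identically either way, while the 16-bit gyro rates come out wrong when the byte order does not match the sender's, which is the case byte swapping is meant to handle.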

Stream.peek_end(state, status)

Finishes a “peek” operation by removing all peeked data from the input stream. It must be called after a sequence of peek_xxxx operations to complete the peek. If this function is not called before another peek_begin then the data is left in the stream buffer.

This function assumes that the stream is valid. It does not block because it does not access the underlying communication channel.

Parameters
  • state (t_stream_peek_state) – The “peek state” indicating how much data to remove from the input stream.

  • status (int) – This value is typically the result from the last peek_xxxx operation. If it is greater than zero then the peek is completed and the stream pointer advanced. If it is 0 because the stream closed before finishing the peek operations then this function does not advance the stream pointer and returns 0. If it is negative then the stream pointer is not advanced and the status is returned as the result.

Returns

result – If the status argument is less than or equal to zero then it simply returns status. Otherwise, it returns 1 to indicate the operation completed successfully.

Return type

int

Raises

StreamError – On a negative return code. A suitable error message may be retrieved using get_error_message.

Example

>>> from array import array
>>> from quanser.communications import Stream, PollFlag
>>> buffer_size = 64
>>> byte_buf = array("b", [0, 0, 0])
>>> short_buf = array("h", [0, 0, 0])
>>> stream = Stream()
>>> stream.connect("tcpip://localhost:5000", True, buffer_size, buffer_size)
>>> try:
>>>     prefetch = len(byte_buf) + 2 * len(short_buf)  # 3 bytes + 3 16-bit integers
>>>     while stream.poll(None, PollFlag.RECEIVE) > 0:
>>>         result, state = stream.peek_begin(prefetch)
>>>         if result > 0:
>>>             result = stream.peek_byte_array(state, byte_buf, len(byte_buf))
>>>         if result > 0:
>>>             result = stream.peek_short_array(state, short_buf, len(short_buf))
>>>         result = stream.peek_end(state, result)
>>>         if result >= 0:
>>>             break
...
>>>     stream.shutdown()
>>> finally:
>>>     stream.close()
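
The chain of "if result > 0" checks in the example, and the way peek_end treats its status argument, can be modelled without a stream. This is a sketch under stated assumptions: run_peek_steps, peek_end_result, the two stand-in step functions and the WOULD_BLOCK placeholder value are all hypothetical and only mirror the rules documented above.

```python
# Placeholder standing in for -ErrorCode.WOULD_BLOCK; the real numeric value is not assumed.
WOULD_BLOCK = -1


def run_peek_steps(begin_result, steps):
    """Run peek steps in order, stopping at the first non-positive result.

    Each step is a zero-argument callable standing in for one peek_xxxx call.
    The returned value is the status that would be handed to peek_end.
    """
    result = begin_result
    for step in steps:
        if result <= 0:
            break
        result = step()
    return result


def peek_end_result(status):
    """Model of the documented rule: a positive status yields 1, anything else is returned as is."""
    return 1 if status > 0 else status


calls = []


def peek_ok():
    calls.append("ok")
    return 1


def peek_blocked():
    calls.append("blocked")
    return WOULD_BLOCK


# The third step is never attempted because the second one would block.
status = run_peek_steps(1, [peek_ok, peek_blocked, peek_ok])
```

Passing the last result straight through to peek_end is what keeps the sequence atomic: a single failed step prevents the stream pointer from advancing.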

Stream.peek_byte_array(state, elements, num_elements)

Peeks ahead in the input stream to read an array of bytes from a client stream. It either peeks all of the array or none of it.

The peek_begin function must be called to initialize the peek state. No data peeked will be removed from the input stream until peek_end is called. If peek_end is not called before the next peek_begin then the peeked data is read again.

See the peek_begin function for more information and an example.

The size of the stream receive buffer must be at least num_elements bytes in length.

If listen or connect was called with the non-blocking flag set to False (0), then this function blocks until all the data is peeked. Blocking may occur because data may be read from the underlying communication channel. However, the contents of the stream buffer are never overwritten until peek_end is called to indicate that the peeked data may be removed from the input stream.

If listen or connect was called with the non-blocking flag set to True (1), then this function does not block. If fewer bytes are available than the size of the entire array then it returns -ErrorCode.WOULD_BLOCK; otherwise it returns 1. When it returns -ErrorCode.WOULD_BLOCK, the poll function may be used with the PollFlag.RECEIVE flag to determine when data becomes available.

If the connection has been closed gracefully then it returns 0 only if there are fewer bytes left to peek than the size of the entire array; otherwise it returns 1. While the connection remains open, a non-blocking stream returns -ErrorCode.WOULD_BLOCK when fewer bytes are available than the size of the entire array. If an error occurs, then it raises an exception.
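
The outcomes described above for a non-blocking stream can be summarised as a small decision function. This is a sketch of the documented behaviour only, not of the library internals: expected_peek_result, its parameters and the WOULD_BLOCK placeholder value are hypothetical.

```python
# Placeholder standing in for -ErrorCode.WOULD_BLOCK; the real numeric value is not assumed.
WOULD_BLOCK = -1


def expected_peek_result(available, needed, closed):
    """Result a non-blocking peek is documented to give.

    available: bytes that can currently be peeked
    needed:    size of the entire array in bytes
    closed:    True if the connection has been closed gracefully
    """
    if available >= needed:
        return 1            # the whole array can be peeked
    if closed:
        return 0            # not enough data left and none will arrive
    return WOULD_BLOCK      # more data may still arrive; poll and retry


enough_data = expected_peek_result(available=3, needed=3, closed=False)
still_open = expected_peek_result(available=2, needed=3, closed=False)
closed_short = expected_peek_result(available=2, needed=3, closed=True)
closed_full = expected_peek_result(available=3, needed=3, closed=True)
```

Note that a gracefully closed connection still yields 1 as long as the remaining data covers the entire array.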

This function does not support two threads peeking data at the same time. However, data may be sent or the stream flushed by another thread at the same time as data is being peeked.
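
If several threads need to peek from the same stream, one possible approach is to serialize whole peek sequences with a lock. The sketch below uses a stand-in function instead of a real stream. The names peek_lock, peek_sequence and guarded_peek are hypothetical, and the counters exist only to show that no two sequences overlap.

```python
import threading
import time

peek_lock = threading.Lock()
active = 0       # number of peek sequences currently running
max_active = 0   # highest number observed at any moment


def peek_sequence():
    """Stand-in for one peek_begin / peek_xxxx / peek_end sequence."""
    global active, max_active
    active += 1
    max_active = max(max_active, active)
    time.sleep(0.01)  # pretend to wait for data
    active -= 1
    return 1


def guarded_peek():
    # Hold the lock for the entire sequence, not just for a single peek call.
    with peek_lock:
        return peek_sequence()


threads = [threading.Thread(target=guarded_peek) for _ in range(4)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()
```

Sending or flushing from another thread would not need to take this lock, since the text above states that those operations may run concurrently with a peek.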

The BSD socket API has no equivalent to this function.

Parameters
  • state (t_stream_peek_state) – The peek state.

  • elements (array_like) – A buffer of at least num_elements bytes in which the received data will be stored.

  • num_elements (int) – The number of bytes available in the buffer.

Returns

result – Returns 1 on success. If no more data is available and the connection has been closed gracefully, then it returns 0. If the stream is non-blocking and the method would block, then it returns -ErrorCode.WOULD_BLOCK. If an error occurs, then an exception is raised.

Return type

int

Raises

StreamError – On a negative return code. A suitable error message may be retrieved using get_error_message.

Example

>>> from array import array
>>> from quanser.communications import Stream, PollFlag
>>> buffer_size = 64
>>> byte_buf = array("b", [0, 0, 0])
>>> short_buf = array("h", [0, 0, 0])
>>> stream = Stream()
>>> stream.connect("tcpip://localhost:5000", True, buffer_size, buffer_size)
>>> try:
>>>     prefetch = len(byte_buf) + 2 * len(short_buf)  # 3 bytes + 3 16-bit integers
>>>     while stream.poll(None, PollFlag.RECEIVE) > 0:
>>>         result, state = stream.peek_begin(prefetch)
>>>         if result > 0:
>>>             result = stream.peek_byte_array(state, byte_buf, len(byte_buf))
>>>         if result > 0:
>>>             result = stream.peek_short_array(state, short_buf, len(short_buf))
>>>         result = stream.peek_end(state, result)
>>>         if result >= 0:
>>>             break
...
>>>     stream.shutdown()
>>> finally:
>>>     stream.close()

Stream.peek_short_array(state, elements, num_elements)

This function peeks ahead in the input stream to read an array of short (16-bit) integers from a client stream. It either peeks all of the array or none of it. If the stream has been configured to swap bytes using set_swap_bytes then this function will swap the order of the bytes within each element that it peeks before storing them in the given buffer.
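
What swapping the bytes within each 16-bit element does to the stored values can be illustrated with the byteswap method of Python's array type. This only demonstrates the effect of a byte swap on element values; it does not call set_swap_bytes or any other Stream method, and the sample values are made up.

```python
from array import array

# Three 16-bit values as they might appear if the sender used the opposite byte order.
received = array("h", [1, 256, -2])

# Swap the two bytes of every element; the original array is left untouched.
swapped = array("h", received)
swapped.byteswap()

# Swapping twice restores the original values.
restored = array("h", swapped)
restored.byteswap()
```

Each element is swapped independently, so the order of the elements within the array does not change.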

The peek_begin function must be called to initialize the peek state. No data peeked will be removed from the input stream until peek_end is called. If peek_end is not called before the next peek_begin then the peeked data is read again.

See the peek_begin function for more information and an example.

The size of the stream receive buffer must be at least num_elements 16-bit integers in length.

If listen or connect was called with the non-blocking flag set to False (0), then this function blocks until all the data is peeked. Blocking may occur because data may be read from the underlying communication channel. However, the contents of the stream buffer are never overwritten until peek_end is called to indicate that the peeked data may be removed from the input stream.

If listen or connect was called with the non-blocking flag set to True (1), then this function does not block. If fewer bytes are available than the size of the entire array then it returns -ErrorCode.WOULD_BLOCK; otherwise it returns 1. When it returns -ErrorCode.WOULD_BLOCK, the poll function may be used with the PollFlag.RECEIVE flag to determine when data becomes available.

If the connection has been closed gracefully then it returns 0 only if there are fewer 16-bit integers left to peek than the size of the entire array; otherwise it returns 1. If an error occurs, then it raises an exception.

This function does not support two threads peeking data at the same time. However, data may be sent or the stream flushed by another thread at the same time as data is being peeked.

The BSD socket API has no equivalent to this function.

Parameters
  • state (t_stream_peek_state) – The peek state.

  • elements (array_like) – A buffer of at least num_elements 16-bit integers in which the received data will be stored.

  • num_elements (int) – The number of 16-bit integers available in the buffer.

Returns

result – Returns 1 on success. If no more data is available and the connection has been closed gracefully, then it returns 0. If the stream is non-blocking and the method would block, then it returns -ErrorCode.WOULD_BLOCK. If an error occurs, then an exception is raised.

Return type

int

Raises

StreamError – On a negative return code. A suitable error message may be retrieved using get_error_message.

Example

>>> from array import array
>>> from quanser.communications import Stream, PollFlag
>>> buffer_size = 64
>>> byte_buf = array("b", [0, 0, 0])
>>> short_buf = array("h", [0, 0, 0])
>>> stream = Stream()
>>> stream.connect("tcpip://localhost:5000", True, buffer_size, buffer_size)
>>> try:
>>>     prefetch = len(byte_buf) + 2 * len(short_buf)  # 3 bytes + 3 16-bit integers
>>>     while stream.poll(None, PollFlag.RECEIVE) > 0:
>>>         result, state = stream.peek_begin(prefetch)
>>>         if result > 0:
>>>             result = stream.peek_byte_array(state, byte_buf, len(byte_buf))
>>>         if result > 0:
>>>             result = stream.peek_short_array(state, short_buf, len(short_buf))
>>>         result = stream.peek_end(state, result)
>>>         if result >= 0:
>>>             break
...
>>>     stream.shutdown()
>>> finally:
>>>     stream.close()

Stream.peek_int_array(state, elements, num_elements)

This function peeks ahead in the input stream to read an array of 32-bit integers from a client stream. It either peeks all of the array or none of it. If the stream has been configured to swap bytes using set_swap_bytes then this function will swap the order of the bytes within each element that it peeks before storing them in the given buffer.

The peek_begin function must be called to initialize the peek state. No data peeked will be removed from the input stream until peek_end is called. If peek_end is not called before the next peek_begin then the peeked data is read again.

See the peek_begin function for more information and an example.

The size of the stream receive buffer must be at least num_elements 32-bit integers in length.
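
The buffer-size requirement amounts to a simple byte count. A minimal sketch, assuming the receive buffer size given to connect or listen is expressed in bytes; fits_in_receive_buffer is a hypothetical helper, not part of the Stream API.

```python
INT32_BYTES = 4  # width of one 32-bit integer


def fits_in_receive_buffer(num_elements, element_bytes, receive_buffer_bytes):
    """True if an array of num_elements elements can be peeked in one call."""
    return num_elements * element_bytes <= receive_buffer_bytes


buffer_size = 64
largest_int_array = buffer_size // INT32_BYTES  # most 32-bit integers that fit

fits = fits_in_receive_buffer(largest_int_array, INT32_BYTES, buffer_size)
too_big = fits_in_receive_buffer(largest_int_array + 1, INT32_BYTES, buffer_size)
```

The same check applies to the other peek_xxxx_array functions with the element width changed accordingly.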

If listen or connect was called with the non-blocking flag set to False (0), then this function blocks until all the data is peeked. Blocking may occur because data may be read from the underlying communication channel. However, the contents of the stream buffer are never overwritten until peek_end is called to indicate that the peeked data may be removed from the input stream.

If listen or connect was called with the non-blocking flag set to True (1), then this function does not block. If fewer bytes are available than the size of the entire array then it returns -ErrorCode.WOULD_BLOCK; otherwise it returns 1. When it returns -ErrorCode.WOULD_BLOCK, the poll function may be used with the PollFlag.RECEIVE flag to determine when data becomes available.

If the connection has been closed gracefully then it returns 0 only if there are fewer 32-bit integers left to peek than the size of the entire array; otherwise it returns 1. If an error occurs, then it raises an exception.

This function does not support two threads peeking data at the same time. However, data may be sent or the stream flushed by another thread at the same time as data is being peeked.

The BSD socket API has no equivalent to this function.

Parameters
  • state (t_stream_peek_state) – The peek state.

  • elements (array_like) – A buffer of at least num_elements 32-bit integers in which the received data will be stored.

  • num_elements (int) – The number of 32-bit integers available in the buffer.

Returns

result – Returns 1 on success. If no more data is available and the connection has been closed gracefully, then it returns 0. If the stream is non-blocking and the method would block, then it returns -ErrorCode.WOULD_BLOCK. If an error occurs, then an exception is raised.

Return type

int

Raises

StreamError – On a negative return code. A suitable error message may be retrieved using get_error_message.

Example

>>> from array import array
>>> from quanser.communications import Stream, PollFlag
>>> buffer_size = 64
>>> byte_buf = array("b", [0, 0, 0])
>>> # "i" is 4 bytes on common platforms, whereas "l" may be 8 bytes
>>> int_buf = array("i", [0, 0, 0])
>>> stream = Stream()
>>> stream.connect("tcpip://localhost:5000", True, buffer_size, buffer_size)
>>> try:
>>>     prefetch = len(byte_buf) + 4 * len(int_buf)  # 3 bytes + 3 32-bit integers
>>>     while stream.poll(None, PollFlag.RECEIVE) > 0:
>>>         result, state = stream.peek_begin(prefetch)
>>>         if result > 0:
>>>             result = stream.peek_byte_array(state, byte_buf, len(byte_buf))
>>>         if result > 0:
>>>             result = stream.peek_int_array(state, int_buf, len(int_buf))
>>>         result = stream.peek_end(state, result)
>>>         if result >= 0:
>>>             break
...
>>>     stream.shutdown()
>>> finally:
>>>     stream.close()

Stream.peek_long_array(state, elements, num_elements)

This function peeks ahead in the input stream to read an array of long (64-bit) integers from a client stream. It either peeks all of the array or none of it. If the stream has been configured to swap bytes using set_swap_bytes then this function will swap the order of the bytes within each element that it peeks before storing them in the given buffer.
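
When the buffer is a Python array, its typecode determines the element width, and some typecodes (such as "l") vary in size between platforms. The sketch below picks a typecode by checking its itemsize; typecode_with_width is a hypothetical helper, not part of the Stream API.

```python
from array import array


def typecode_with_width(width_bytes, candidates):
    """Return the first array typecode in candidates whose items are width_bytes wide."""
    for code in candidates:
        if array(code).itemsize == width_bytes:
            return code
    raise ValueError(f"no typecode of {width_bytes} bytes among {candidates!r}")


# Prefer "q" (signed long long) and fall back to "l" only if it is 8 bytes wide here.
int64_code = typecode_with_width(8, "ql")
long_buf = array(int64_code, [0, 0, 0])

# Number of bytes the three elements occupy in the stream.
long_buf_bytes = len(long_buf) * long_buf.itemsize
```

Checking itemsize up front avoids a mismatch between the buffer's element width and the 64-bit elements being peeked.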

The peek_begin function must be called to initialize the peek state. No data peeked will be removed from the input stream until peek_end is called. If peek_end is not called before the next peek_begin then the peeked data is read again.

See the peek_begin function for more information and an example.

The size of the stream receive buffer must be at least num_elements 64-bit integers in length.

If listen or connect was called with the non-blocking flag set to False (0), then this function blocks until all the data is peeked. Blocking may occur because data may be read from the underlying communication channel. However, the contents of the stream buffer are never overwritten until peek_end is called to indicate that the peeked data may be removed from the input stream.

If listen or connect was called with the non-blocking flag set to True (1), then this function does not block. If fewer bytes are available than the size of the entire array then it returns -ErrorCode.WOULD_BLOCK; otherwise it returns 1. When it returns -ErrorCode.WOULD_BLOCK, the poll function may be used with the PollFlag.RECEIVE flag to determine when data becomes available.

If the connection has been closed gracefully then it returns 0 only if there are fewer 64-bit integers left to peek than the size of the entire array; otherwise it returns 1. If an error occurs, then it raises an exception.

This function does not support two threads peeking data at the same time. However, data may be sent or the stream flushed by another thread at the same time as data is being peeked.

The BSD socket API has no equivalent to this function.

Parameters
  • state (t_stream_peek_state) – The peek state.

  • elements (array_like) – A buffer of at least num_elements 64-bit integers in which the received data will be stored.

  • num_elements (int) – The number of 64-bit integers available in the buffer.

Returns

result – Returns 1 on success. If no more data is available and the connection has been closed gracefully, then it returns 0. If the stream is non-blocking and the method would block, then it returns -ErrorCode.WOULD_BLOCK. If an error occurs, then an exception is raised.

Return type

int

Raises

StreamError – On a negative return code. A suitable error message may be retrieved using get_error_message.

Example

>>> from array import array
>>> from quanser.communications import Stream, PollFlag
>>> buffer_size = 64
>>> byte_buf = array("b", [0, 0, 0])
>>> long_buf = array("q", [0, 0, 0])
>>> stream = Stream()
>>> stream.connect("tcpip://localhost:5000", True, buffer_size, buffer_size)
>>> try:
>>>     prefetch = len(byte_buf) + 8 * len(long_buf)  # 3 bytes + 3 64-bit integers
>>>     while stream.poll(None, PollFlag.RECEIVE) > 0:
>>>         result, state = stream.peek_begin(prefetch)
>>>         if result > 0:
>>>             result = stream.peek_byte_array(state, byte_buf, len(byte_buf))
>>>         if result > 0:
>>>             result = stream.peek_long_array(state, long_buf, len(long_buf))
>>>         result = stream.peek_end(state, result)
>>>         if result >= 0:
>>>             break
...
>>>     stream.shutdown()
>>> finally:
>>>     stream.close()

Stream.peek_single_array(state, elements, num_elements)

This function peeks ahead in the input stream to read an array of single-precision (32-bit) floats from a client stream. It either peeks all of the array or none of it. If the stream has been configured to swap bytes using set_swap_bytes then this function will swap the order of the bytes within each element that it peeks before storing them in the given buffer.
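
Because each element is only 32 bits wide, values held in a single-precision buffer are rounded relative to Python's own 64-bit floats. The sketch below shows this with the standard array and struct modules only. The to_single helper is hypothetical and the sample values are made up.

```python
import struct
from array import array


def to_single(value):
    """Round a Python float to the nearest single-precision value."""
    return struct.unpack("<f", struct.pack("<f", value))[0]


# An "f" array stores 32-bit floats, matching the element width peeked by this function.
single_buf = array("f", [0.1, 0.5])

rounded = single_buf[0]  # 0.1 is not exactly representable, so it is rounded
exact = single_buf[1]    # 0.5 is a power of two and is stored exactly
```

Comparisons against values peeked into such a buffer should therefore allow for single-precision rounding.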

The peek_begin function must be called to initialize the peek state. No data peeked will be removed from the input stream until peek_end is called. If peek_end is not called before the next peek_begin then the peeked data is read again.

See the peek_begin function for more information and an example.

The size of the stream receive buffer must be at least num_elements 32-bit single-precision floats in length.

If listen or connect was called with the non-blocking flag set to False (0), then this function blocks until all the data is peeked. Blocking may occur because data may be read from the underlying communication channel. However, the contents of the stream buffer are never overwritten until peek_end is called to indicate that the peeked data may be removed from the input stream.

If listen or connect was called with the non-blocking flag set to True (1), then this function does not block. If fewer bytes are available than the size of the entire array then it returns -ErrorCode.WOULD_BLOCK; otherwise it returns 1. When it returns -ErrorCode.WOULD_BLOCK, the poll function may be used with the PollFlag.RECEIVE flag to determine when data becomes available.

If the connection has been closed gracefully then it returns 0 only if there are fewer 32-bit single-precision floats left to peek than the size of the entire array; otherwise it returns 1. If an error occurs, then it raises an exception.

This function does not support two threads peeking data at the same time. However, data may be sent or the stream flushed by another thread at the same time as data is being peeked.

The BSD socket API has no equivalent to this function.

Parameters
  • state (t_stream_peek_state) – The peek state.

  • elements (array_like) – A buffer of at least num_elements 32-bit single-precision floats in which the received data will be stored.

  • num_elements (int) – The number of 32-bit single-precision floats available in the buffer.

Returns

result – Returns 1 on success. If no more data is available and the connection has been closed gracefully, then it returns 0. If the stream is non-blocking and the method would block, then it returns -ErrorCode.WOULD_BLOCK. If an error occurs, then an exception is raised.

Return type

int

Raises

StreamError – On a negative return code. A suitable error message may be retrieved using get_error_message.

Example

>>> from array import array
>>> from quanser.communications import Stream, PollFlag
>>> buffer_size = 64
>>> byte_buf = array("b", [0, 0, 0])
>>> single_buf = array("f", [0.0, 0.0, 0.0])
>>> stream = Stream()
>>> stream.connect("tcpip://localhost:5000", True, buffer_size, buffer_size)
>>> try:
>>>     prefetch = len(byte_buf) + 4 * len(single_buf)  # 3 bytes + 3 32-bit floats
>>>     while stream.poll(None, PollFlag.RECEIVE) > 0:
>>>         result, state = stream.peek_begin(prefetch)
>>>         if result > 0:
>>>             result = stream.peek_byte_array(state, byte_buf, len(byte_buf))
>>>         if result > 0:
>>>             result = stream.peek_single_array(state, single_buf, len(single_buf))
>>>         result = stream.peek_end(state, result)
>>>         if result >= 0:
>>>             break
...
>>>     stream.shutdown()
>>> finally:
>>>     stream.close()

Stream.peek_double_array(state, elements, num_elements)

This function peeks ahead in the input stream to read an array of double-precision (64-bit) floats from a client stream. It either peeks all of the array or none of it. If the stream has been configured to swap bytes using set_swap_bytes then this function will swap the order of the bytes within each element that it peeks before storing them in the given buffer.

The peek_begin function must be called to initialize the peek state. No data peeked will be removed from the input stream until peek_end is called. If peek_end is not called before the next peek_begin then the peeked data is read again.

See the peek_begin function for more information and an example.

The size of the stream receive buffer must be at least num_elements 64-bit double-precision floats in length.

If listen or connect was called with the non-blocking flag set to False (0), then this function blocks until all the data is peeked. Blocking may occur because data may be read from the underlying communication channel. However, the contents of the stream buffer are never overwritten until peek_end is called to indicate that the peeked data may be removed from the input stream.

If listen or connect was called with the non-blocking flag set to True (1), then this function does not block. If fewer bytes are available than the size of the entire array then it returns -ErrorCode.WOULD_BLOCK; otherwise it returns 1. When it returns -ErrorCode.WOULD_BLOCK, the poll function may be used with the PollFlag.RECEIVE flag to determine when data becomes available.

If the connection has been closed gracefully then it returns 0 only if there are fewer 64-bit double-precision floats left to peek than the size of the entire array; otherwise it returns 1. If an error occurs, then it raises an exception.

This function does not support two threads peeking data at the same time. However, data may be sent or the stream flushed by another thread at the same time as data is being peeked.

The BSD socket API has no equivalent to this function.

Parameters
  • state (t_stream_peek_state) – The peek state.

  • elements (array_like) – A buffer of at least num_elements 64-bit double-precision floats in which the received data will be stored.

  • num_elements (int) – The number of 64-bit double-precision floats available in the buffer.

Returns

result – Returns 1 on success. If no more data is available and the connection has been closed gracefully, then it returns 0. If the stream is non-blocking and the method would block, then it returns -ErrorCode.WOULD_BLOCK. If an error occurs, then an exception is raised.

Return type

int

Raises

StreamError – On a negative return code. A suitable error message may be retrieved using get_error_message.

Example

>>> from array import array
>>> from quanser.communications import Stream, PollFlag
>>> buffer_size = 64
>>> byte_buf = array("b", [0, 0, 0])
>>> double_buf = array("d", [0.0, 0.0, 0.0])
>>> stream = Stream()
>>> stream.connect("tcpip://localhost:5000", True, buffer_size, buffer_size)
>>> try:
>>>     prefetch = len(byte_buf) + 8 * len(double_buf)  # 3 bytes + 3 64-bit floats
>>>     while stream.poll(None, PollFlag.RECEIVE) > 0:
>>>         result, state = stream.peek_begin(prefetch)
>>>         if result > 0:
>>>             result = stream.peek_byte_array(state, byte_buf, len(byte_buf))
>>>         if result > 0:
>>>             result = stream.peek_double_array(state, double_buf, len(double_buf))
>>>         result = stream.peek_end(state, result)
>>>         if result >= 0:
>>>             break
...
>>>     stream.shutdown()
>>> finally:
>>>     stream.close()