Investigating Messages using Groups with Variable Length Fields
There are cases in which a variable length data field is needed within a group within a message. For example, if we are transmitting instrument information, we may have a group of instruments, each with a variable length instruments name.
We will create another schema, schema-03.xml, to demonstrate this.
<sbe:messageSchema xmlns:sbe="http://fixprotocol.io/2016/sbe"
package="com.shaunlaurens.pa.schema3"
id="1002"
version="1"
semanticVersion="pa0.1"
description="Schema 1 for the PA samples, version 0.1">
<types>
<composite name="messageHeader"
description="Message identifiers and length of message root">
<type name="blockLength" primitiveType="uint16"/>
<type name="templateId" primitiveType="uint16"/>
<type name="schemaId" primitiveType="uint16"/>
<type name="version" primitiveType="uint16"/>
</composite>
<composite name="varStringEncoding">
<type name="length" primitiveType="uint32" maxValue="1073741824"/>
<type name="varData" primitiveType="uint8" length="0"
characterEncoding="UTF-8"/>
</composite>
<composite name="groupSizeEncoding"
description="Repeating group dimensions."> <!-- 1 (1) -->
<type name="blockLength" primitiveType="uint16"/>
<type name="numInGroup" primitiveType="uint16"/>
</composite>
</types>
<sbe:message name="MessageType3" id="3"
description="Message Type with a repeating group">
<field name="field1" id="1" type="int64"/>
<group name="group1" id="10"
dimensionType="groupSizeEncoding"> <!-- 2 (2) -->
<field name="groupField1" id="11" type="int64"/>
<data name="groupField2" id="12" type="varStringEncoding"/>
<data name="groupField3" id="13" type="varStringEncoding"/>
</group>
<data name="field2" id="2" type="varStringEncoding"/>
</sbe:message>
</sbe:messageSchema>
- The group size encoding composite type is used to provide the block length and number of groups in the repeating group. This is a common pattern for repeating groups in SBE messages.
- We add a repeating group after the fixed length fields, but before the variable length fields.
We are again going to focus on the MessageType3 encoder and decoder generated by the SBE tool.
MessageType3 Encoder¶
public void wrap(final MutableDirectBuffer buffer, final int count)
{
if (count < 0 || count > 65534)
{
throw new
IllegalArgumentException("count outside allowed range: count="
+ count);
}
if (buffer != this.buffer)
{
this.buffer = buffer;
}
index = 0;
this.count = count;
final int limit = parentMessage.limit();
initialLimit = limit;
parentMessage.limit(limit + HEADER_SIZE);
buffer.putShort(limit + 0, (short)8, BYTE_ORDER); // block length (1)
buffer.putShort(limit + 2, (short)count, BYTE_ORDER); // numInGroup (2)
}
- The block length is set to 8 bytes for the repeating group. This is the sum of the length of the fixed length fields in the group.
- The number of items in the repeating group is set.
The wrap method is used to set up the buffer for encoding the repeating group.
The wrap method is called by the parent message encoder to set up the buffer for the repeating group.
The blockLength and numInGroup fields are defined in the groupSizeEncoding composite type in the schema.
Encoding the Repeating Group¶
public Group1Encoder groupField1(final long value)
{
buffer.putLong(offset + 0, value, BYTE_ORDER);
return this;
}
...
public Group1Encoder groupField2(final String value)
{
final byte[] bytes = (null == value || value.isEmpty()) ?
org.agrona.collections.ArrayUtil.EMPTY_BYTE_ARRAY :
value.getBytes(java.nio.charset.StandardCharsets.UTF_8);
final int length = bytes.length;
if (length > 1073741824)
{
throw new IllegalStateException("length > maxValue for type: "
+ length);
}
final int headerLength = 4;
final int limit = parentMessage.limit();
parentMessage.limit(limit + headerLength + length);
buffer.putInt(limit, length, BYTE_ORDER);
buffer.putBytes(limit + headerLength, bytes, 0, length);
return this;
}
...
public Group1Encoder groupField3(final String value)
{
final byte[] bytes = (null == value || value.isEmpty()) ?
org.agrona.collections.ArrayUtil.EMPTY_BYTE_ARRAY :
value.getBytes(java.nio.charset.StandardCharsets.UTF_8);
final int length = bytes.length;
if (length > 1073741824)
{
throw new IllegalStateException("length > maxValue for type: "
+ length);
}
final int headerLength = 4;
final int limit = parentMessage.limit();
parentMessage.limit(limit + headerLength + length);
buffer.putInt(limit, length, BYTE_ORDER);
buffer.putBytes(limit + headerLength, bytes, 0, length);
return this;
}
The groupField1 field is fixed length and uses the offset from which this particular group starts at.
This is consistent with the SBE pattern for encoding fixed length fields.
In a similar way, the groupField2 and groupField3 fields are variable length fields that follow the patterns from the simple variable length messages.
They, too, make use of the parentMessage.limit() to hold state about position, and are also subject to the similar issues when applying out of order writes and reads.
Encoding the Variable Length Field after the repeating group¶
public MessageType3Encoder field2(final String value)
{
final byte[] bytes = (null == value || value.isEmpty()) ?
org.agrona.collections.ArrayUtil.EMPTY_BYTE_ARRAY :
value.getBytes(java.nio.charset.StandardCharsets.UTF_8);
final int length = bytes.length;
if (length > 1073741824)
{
throw new IllegalStateException("length > maxValue for type: " +
length);
}
final int headerLength = 4;
final int limit = parentMessage.limit();
parentMessage.limit(limit + headerLength + length);
buffer.putInt(limit, length, BYTE_ORDER);
buffer.putBytes(limit + headerLength, bytes, 0, length);
return this;
}
The field2 method once again follows the same pattern of using the limit() value for state.
MessageType3 Decoder¶
As can be seen from the encoder, the decoder for MessageType3 is in line with the other decoders we have already seen, except for the repeating group.