Consume Events
To process asynchronous events, the INVERS OneAPI supports push delivery via AMQP (Advanced Message Queuing Protocol). Events can be consumed using AMQP version 0.9.1.
Configuration
If you would like to subscribe to event topics, you need some configuration values, including:
- hostname of the AMQP broker (api.invers.com)
- connection port (5671)
- virtual host (used to isolate ‘environments’, ensuring that your events are only visible to you)
- username
- password
- queue name(s)
The first two values (host and port) are always the same. The other ones are specific to your subscription. You can find them in the consumer’s configuration within FleetControl under Administration > Event Consumers (RabbitMQ). For a detailed tutorial on how to use Event Consumers, refer to our Help Center. Once you have received all configuration values, you can start implementing your consuming application. For further information, visit our API reference.
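As a rough illustration of how these values fit together, the sketch below assembles them into an amqps:// connection URI of the kind many AMQP client libraries accept. It is written in Python using only the standard library. The function name build_amqps_uri and the placeholder credentials are made up for this example, and the amqps scheme is an assumption based on the TLS port. The queue name is not part of the URI; it is supplied later, when consumption starts.

```python
from urllib.parse import quote

BROKER_HOST = "api.invers.com"  # same for every consumer
BROKER_PORT = 5671              # same for every consumer (TLS)


def build_amqps_uri(username: str, password: str, virtual_host: str) -> str:
    """Return an amqps:// URI with credentials and vhost percent-encoded.

    Hypothetical helper for illustration; not part of any INVERS SDK.
    """
    user = quote(username, safe="")
    secret = quote(password, safe="")
    vhost = quote(virtual_host, safe="")  # a "/" in the vhost must be escaped
    return f"amqps://{user}:{secret}@{BROKER_HOST}:{BROKER_PORT}/{vhost}"


# Placeholder values only; use the ones shown for your event consumer.
uri = build_amqps_uri("my-consumer", "s3cr/et", "my/vhost")
print(uri)
```

Percent-encoding matters mainly when a password or virtual host contains characters such as "/" or "@", which would otherwise change how the URI is parsed.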
Multiple consumers
In many scenarios, it is sufficient to have one consuming application and to use that application to receive all events you need for your workflows. However, there might also be cases where you want to have more than one consuming application. For example, you might have one application that handles your vehicle master data, and another application that processes vehicle state updates, e.g. to produce custom alerts. Or you might want to provide access to specific event topics to another company that does analytics for you.
To achieve this, you can create a separate event consumer for each of your applications or use cases, and subscribe each one only to the topic(s) that application needs to consume. You can subscribe to any subset of the available topics, thereby limiting the access of each of your applications to just the necessary event topics. You can have up to 5 consumers.
Code example
Here is a code sample in C# that shows how to connect to our AMQP broker and receive events using the configuration parameters above.
- Make sure to fill in the value for your consumer’s user_name.
- Make sure to fill in the value for your consumer’s password.
- Make sure to fill in the value for your consumer’s virtual_host.
- Make sure to fill in the value for your consumer’s queue_name.
- Here, you can parse the event’s body with JsonDocument.Parse(body) or JsonSerializer.Deserialize<…>(body). Use the header information first if there are multiple event types within a topic.
- Starts consumption of the events. consumerTag identifies the subscription when it has to be cancelled.
Consume
Once you are connected to our AMQP Broker, your connection will remain alive. In default operation mode, new events pushed to the queue are received by your consuming application automatically.
- If a topic queue is unused (no events are consumed by your application) for at least three consecutive days, the corresponding consumer will automatically be unsubscribed from that topic queue. As a result, the topic queue and all events therein are deleted.
- Upon consumption, your system has to acknowledge received events. Acknowledged events are removed from the queue. Unacknowledged consumed events will be returned to the queue in the same order.
- TLS is enforced for security reasons.
- Permissions are set to basic.get and basic.consume only, so you cannot create new queues on the existing exchanges.
- It is recommended to set prefetch-count to 50. A value higher than 100 might lead to warnings because of a high number of unacknowledged events.
- When consuming events with multiple threads, it is highly recommended to use a separate channel for each thread. Sharing a single connection across all threads is no problem.
Important
Vehicle events are delivered with “at-least-once” semantics. That means sometimes the same event can be delivered multiple times and your system should be able to handle such cases.
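One common way to cope with repeated deliveries is to remember recently seen message IDs and skip events that were already processed. The Python sketch below shows that idea with a bounded in-memory store. The class name SeenMessages and its capacity are invented for this example, and it assumes that a redelivered event carries the same Message ID as the original delivery. A production system would more likely keep this state in a database or make the handlers themselves idempotent.

```python
from collections import OrderedDict


class SeenMessages:
    """Remembers the most recent message IDs, evicting the oldest first.

    Hypothetical helper for illustration only.
    """

    def __init__(self, capacity: int = 10_000) -> None:
        self._capacity = capacity
        self._ids = OrderedDict()  # insertion order doubles as age order

    def first_time(self, message_id: str) -> bool:
        """Return True for an unseen ID (and record it), False for a repeat."""
        if message_id in self._ids:
            return False
        self._ids[message_id] = None
        if len(self._ids) > self._capacity:
            self._ids.popitem(last=False)  # forget the oldest ID
        return True


seen = SeenMessages()
for message_id in ["01F1FHJ7SHQK5MGF69B8K2346E", "01F1FHJ7SHQK5MGF69B8K2346E"]:
    if seen.first_time(message_id):
        print("processing", message_id)
    else:
        print("skipping duplicate", message_id)
```

Duplicates should still be acknowledged after being skipped, since unacknowledged events are returned to the queue.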
Troubleshooting
Access Refused
If you get a 403 ACCESS_REFUSED error message when connecting:
- Verify that your connection configuration values (username, password, queue name, …) are correct
- Verify that you are not attempting to create the queue:
  - Don’t use assertQueue(). This doesn’t work because you don’t have the queue.declare permission.
  - If you want to check whether a queue exists, use checkQueue() instead (or simply consume()).
Parsing events
When consuming events, you receive the event content and additional headers via AMQP messages.
Example
This example shows the message properties/headers and payload for an event published to the VehicleChange topic.
Message Properties
"Type": "VehicleCreated"
"Content type": "application/json;charset=utf-8"
"Message ID": "01F1FHJ7SHQK5MGF69B8K2346E"
"Timestamp": "2020-01-30T13:22:05Z"
"Correlation ID": "804f2161-a15c-420f-8f21-61a15c320f68"
Header["schema_major_version"]: 2
Message Payload
{
  "metadata": {
    "id": "01F1FHJ7SHQK5MGF69B8K2346E",
    "type": "VehicleCreated",
    "schema_major_version": 2,
    "occurred_at": "2020-01-30T13:22:05Z",
    "flow_id": "804f2161-a15c-420f-8f21-61a15c320f68"
  },
  "data": {
    // event's data content
  }
}
When you receive events from a topic, you often want to parse the event payload and, depending on your software stack, convert the event to a data representation like a class. Because event topics can publish events of different event types, it is recommended to use the message properties (part of the AMQP protocol) to determine the event type (see Type in the example above).
For example, the event topic VehicleChange publishes events with event types VehicleCreated, VehicleChanged, VehicleArchived, and VehicleDeleted. The Type message property allows you to determine which type of event you received, so you can parse it accordingly.
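The type-based routing described above could look roughly like the following Python sketch, which selects a handler from the Type message property before touching the payload. The handler functions, the HANDLERS table and the dispatch function are hypothetical names for this example, and the sample body uses an empty data object because the actual content depends on the event type.

```python
import json
from typing import Callable, Dict


def on_vehicle_created(event: dict) -> str:
    # Placeholder handler: a real one would map event["data"] to a model.
    return f"created:{event['metadata']['id']}"


def on_vehicle_deleted(event: dict) -> str:
    return f"deleted:{event['metadata']['id']}"


# Maps the AMQP "Type" message property to a handler (illustrative subset).
HANDLERS: Dict[str, Callable[[dict], str]] = {
    "VehicleCreated": on_vehicle_created,
    "VehicleDeleted": on_vehicle_deleted,
}


def dispatch(message_type: str, body: bytes) -> str:
    """Pick a handler by message type, then parse the JSON payload."""
    handler = HANDLERS.get(message_type)
    if handler is None:
        return f"unhandled:{message_type}"  # unknown types are not parsed
    return handler(json.loads(body))


body = json.dumps({
    "metadata": {
        "id": "01F1FHJ7SHQK5MGF69B8K2346E",
        "type": "VehicleCreated",
        "schema_major_version": 2,
    },
    "data": {},
}).encode("utf-8")

print(dispatch("VehicleCreated", body))
```

Looking up the handler before parsing means event types your application does not care about are never deserialized.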
To find out more about the format of events, go to event format.