Consume Events

For processing asynchronous events, the INVERS OneAPI supports push functionality via AMQP (Advanced Message Queuing Protocol). Events can be consumed using AMQP version 0.9.1.

Configuration

To subscribe to event topics, you need a set of configuration values, including:

  • hostname of the AMQP broker (api.invers.com)
  • connection port (5671)
  • virtual host (used to isolate ‘environments’, ensuring that your events are only visible to you)
  • username
  • password
  • queue name(s)​

The first two values (host and port) are always the same and can be found in the API reference. The others are specific to your subscription; INVERS provides you with the data required to consume your event topics. Once you have received all configuration values, you can start implementing your consuming application.
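As a minimal sketch of how these values fit together, the following C# snippet assembles them into an amqps:// connection URI. The helper BuildAmqpsUri and the environment variable name INVERS_CONSUMER_PASSWORD are hypothetical and only serve the illustration; the user name and virtual host are the placeholder values from the code example on this page. Percent-encoding the user name, password, and virtual host is a precaution for values that contain reserved characters such as '/' or '@'.

```csharp
using System;

// Placeholder values copied from the code example on this page; replace them
// with the data INVERS provided for your consumer.
string user = "f-FLT23-c-CNSM23A";
string vhost = "fleet-FLT23";

// Hypothetical environment variable name, used to keep the password out of the source code.
string pass = Environment.GetEnvironmentVariable("INVERS_CONSUMER_PASSWORD") ?? "placeholder-password";

Uri uri = BuildAmqpsUri("api.invers.com", 5671, vhost, user, pass);

// Print the URI without the credentials.
Console.WriteLine($"{uri.Scheme}://{uri.Host}:{uri.Port}{uri.AbsolutePath}");

// Hypothetical helper: percent-encodes the subscription-specific parts and
// combines them with the fixed host and port.
static Uri BuildAmqpsUri(string hostName, int port, string vhost, string user, string pass)
{
    string u = Uri.EscapeDataString(user);
    string p = Uri.EscapeDataString(pass);
    string v = Uri.EscapeDataString(vhost);
    return new Uri($"amqps://{u}:{p}@{hostName}:{port}/{v}");
}
```

The resulting Uri can be assigned to factory.Uri as in the code example further down this page.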

Multiple consumers

In many scenarios, one consuming application is sufficient to receive all events you need for your workflows. However, there may be cases where you want more than one consuming application. For example, you might have one application that handles your vehicles' master data and another that processes vehicle state updates, e.g. to produce custom alerts. Or you might want to give another company that does analytics for you access to specific event topics.

To achieve this, you can create a separate event consumer for each of your applications or use cases and subscribe it only to the topic(s) that application needs to consume. You can subscribe to any subset of the available topics, thereby limiting each application's access to just the necessary event topics. You can have up to 5 consumers.

Code example

Here is a code sample in C# that shows how to connect to our AMQP broker and receive events using the configuration parameters above.

using System;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

string user = "f-FLT23-c-CNSM23A"; // (1)!
string pass = "❰consumer password❱"; // (2)!
string hostName = "api.invers.com";
int port = 5671;
string vhost = "fleet-FLT23";  // (3)!
string queueName = "VehicleState_v1.CNSM23A";  // (4)!

ConnectionFactory factory = new ConnectionFactory();
factory.Uri = new Uri($"amqps://{user}:{pass}@{hostName}:{port}/{vhost}");

IConnection conn = factory.CreateConnection();
IModel channel = conn.CreateModel();

var consumer = new EventingBasicConsumer(channel);

consumer.Received += (ch, ea) =>
  {
    var body = ea.Body;
    try
    {
      // (5)
    }
    catch (Exception e)
    {
      Console.Error.WriteLine(e); // log the failure; the event is still acknowledged below
    }
    channel.BasicAck(ea.DeliveryTag, false);
  };

String consumerTag = channel.BasicConsume(queueName, false, consumer); // (6)
  1. Make sure to fill in the value for your consumer’s user_name.

  2. Make sure to fill in the value for your consumer’s password.

  3. Make sure to fill in the value for your consumer’s virtual_host.

  4. Make sure to fill in the value for your consumer’s queue_name.

  5. Here, you can parse the event’s body with JsonDocument.Parse(body) or JsonSerializer.Deserialize<…>(body).

    Use the header information first if there are multiple event types within a topic.

  6. Starts consumption of the events.

    consumerTag identifies the subscription when it has to be cancelled.

Consume

Once you are connected to our AMQP broker, your connection remains alive. In the default operation mode, new messages (events) pushed to the queue are delivered to your consuming application automatically.

  • If a topic queue is unused (no messages are consumed by your application) for at least three consecutive days, the corresponding consumer will automatically be unsubscribed from that topic queue. As a result, the topic queue and all messages (events) therein are deleted.
  • Upon consumption, your system has to acknowledge received events. Acknowledged events are removed from the queue. Unacknowledged consumed events will be returned to the queue in the same order.​
  • TLS is enforced for security reasons.
  • Permissions are set to basic.get and basic.consume only, so you cannot create new queues on the existing exchanges.​
  • It is recommended to set prefetch-count to 50. A value higher than 100 might lead to warnings because of a high number of unacknowledged messages.
  • If you consume messages with multiple threads, it is highly recommended to use a separate channel for each thread. All threads can share a single connection.
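The last two recommendations can be sketched as follows, assuming the same RabbitMQ .NET client API that the code example above uses (IConnection, IModel). The class ConsumerChannels and its method Open are hypothetical names chosen for this illustration. The idea is that each consuming thread calls the helper once, so that all threads share the connection but none share a channel.

```csharp
using RabbitMQ.Client;

// Hypothetical helper, not part of the INVERS API or the client library.
public static class ConsumerChannels
{
    // Opens a dedicated channel on the shared connection and limits the number
    // of unacknowledged deliveries on that channel to the given prefetch count.
    public static IModel Open(IConnection connection, ushort prefetchCount = 50)
    {
        IModel channel = connection.CreateModel();

        // Arguments: prefetch size (0 = no byte limit), prefetch count,
        // global (false = the limit applies per consumer, not per channel).
        channel.BasicQos(0, prefetchCount, false);
        return channel;
    }
}
```

The prefetch limit only takes effect when events are acknowledged manually, which is the case in the code example above (BasicConsume is called with autoAck set to false).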

Troubleshooting

Access Refused

If you get a 403 ACCESS_REFUSED error message when connecting:

  • Verify that your connection configuration values (username, password, queue name, …) are correct
  • Verify that you are not attempting to create the queue:
    • Don’t use assertQueue(). This doesn’t work because you don’t have the queue.declare permission.
    • If you want to check whether a queue exists, use checkQueue() instead (or simply consume()).
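The method names assertQueue() and checkQueue() correspond to the amqplib library for Node.js. With the RabbitMQ .NET client used in the code example on this page, a rough counterpart is QueueDeclare (which attempts to create the queue) versus QueueDeclarePassive (which only checks for it). The following is a sketch under that assumption; the class QueueProbe and its method Exists are hypothetical names.

```csharp
using RabbitMQ.Client;
using RabbitMQ.Client.Exceptions;

// Hypothetical helper, not part of the INVERS API or the client library.
public static class QueueProbe
{
    // Returns true if the broker confirms the queue, false if the passive
    // declare is rejected (for example because the queue does not exist).
    public static bool Exists(IConnection connection, string queueName)
    {
        // Use a throwaway channel: a failed passive declare closes the
        // channel it was issued on.
        using IModel probe = connection.CreateModel();
        try
        {
            probe.QueueDeclarePassive(queueName);
            return true;
        }
        catch (OperationInterruptedException)
        {
            return false;
        }
    }
}
```

Because the check runs on its own short-lived channel, a negative result does not affect the channel you consume on.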

Parsing events

When consuming events, you receive the event content and additional headers via AMQP messages.

Example

This example shows the message properties/headers and payload for an event published to the VehicleChange topic.

Message Properties

"Type":           "VehicleCreated"
"Content type":   "application/json;charset=utf-8"
"Message ID":     "01F1FHJ7SHQK5MGF69B8K2346E"
"Timestamp":      "2020-01-30T13:22:05Z"
"Correlation ID": "804f2161-a15c-420f-8f21-61a15c320f68"
Header["schema_major_version"]: 2

Message Payload

{
  "metadata": {
    "id": "01F1FHJ7SHQK5MGF69B8K2346E",
    "type": "VehicleCreated",
    "schema_major_version": 2,
    "occurred_at": "2020-01-30T13:22:05Z",
    "flow_id": "804f2161-a15c-420f-8f21-61a15c320f68"
  },
  "data": {
      // event's data content
  }
}

When you receive events from a topic, you often want to parse the event payload and, depending on your software stack, convert the event to a data representation like a class. Because event topics can publish events of different event types, it is recommended to use the message properties (part of the AMQP protocol) to determine the event type (see Type in the example above).

For example, the event topic VehicleChange publishes events with event types VehicleCreated, VehicleChanged, VehicleArchived, and VehicleDeleted. The Type message property allows you to determine which type of event you received, so you can parse it accordingly.
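A minimal sketch of this type-based dispatch, using System.Text.Json and the example message from above: the local function Describe is hypothetical, and the variables messageType and body stand in for what the consumer callback provides (with the client used in the code example, presumably ea.BasicProperties.Type and ea.Body). The data object is left empty here because its content depends on the event type.

```csharp
using System;
using System.Text;
using System.Text.Json;

// Stand-ins for the Type message property and the message body.
string messageType = "VehicleCreated";
byte[] body = Encoding.UTF8.GetBytes(@"{
  ""metadata"": {
    ""id"": ""01F1FHJ7SHQK5MGF69B8K2346E"",
    ""type"": ""VehicleCreated"",
    ""schema_major_version"": 2,
    ""occurred_at"": ""2020-01-30T13:22:05Z"",
    ""flow_id"": ""804f2161-a15c-420f-8f21-61a15c320f68""
  },
  ""data"": {}
}");

Console.WriteLine(Describe(messageType, body));
// prints: created (event 01F1FHJ7SHQK5MGF69B8K2346E, schema v2)

// Hypothetical dispatcher: picks the handling branch from the message type
// first, then reads what it needs from the payload.
static string Describe(string messageType, ReadOnlyMemory<byte> body)
{
    using JsonDocument doc = JsonDocument.Parse(body);
    JsonElement metadata = doc.RootElement.GetProperty("metadata");
    string id = metadata.GetProperty("id").GetString() ?? "";
    int major = metadata.GetProperty("schema_major_version").GetInt32();

    return messageType switch
    {
        "VehicleCreated"  => $"created (event {id}, schema v{major})",
        "VehicleChanged"  => $"changed (event {id}, schema v{major})",
        "VehicleArchived" => $"archived (event {id}, schema v{major})",
        "VehicleDeleted"  => $"deleted (event {id}, schema v{major})",
        _ => $"ignored unknown type '{messageType}'"
    };
}
```

In a real consumer, each branch would deserialize the data object into the class that matches the event type instead of returning a string.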

To find out more about the format of events, go to event format.