About
The Vert.x event bus is the main tool for different verticles to communicate through asynchronous message passing.
Articles Related
Example
For instance suppose that we have a verticle for dealing with HTTP requests, and a verticle for managing access to the database. The event bus allows the HTTP verticle to send a request to the database verticle that performs a SQL query, and responds back to the HTTP verticle
Features
Event-Bus:
- The Vert.x event bus is the main tool for different verticles to communicate through asynchronous message passing.
- destinations are free-form strings.
Communication Pattern
The event bus supports the following communication patterns:
- point-to-point messaging, and
- request-response messaging and
- publish / subscribe for broadcasting messages.
The event bus allows verticles to transparently communicate not just within the same JVM process:
- when network clustering is activated, the event bus is distributed so that messages can be sent to verticles running on other application nodes,
- the event-bus can be accessed through a simple TCP protocol for third-party applications to communicate,
- the event-bus can also be exposed over general-purpose messaging bridges (e.g, AMQP, Stomp),
- a SockJS bridge allows web applications to seamlessly communicate over the event bus from JavaScript running in the browser by receiving and publishing messages just like any verticle would do.
Management
Get
The vertx object. gives access to the event bus. See vertx.EventBus()
Publisher/Consumer
Send a request message (publisher)
String queue = 'queueName'; // The consumer Address
JsonObject message = new JsonObject().put("pageId", 1);
DeliveryOptions options = new DeliveryOptions().addHeader("action", "get-ip");
vertx.eventBus().request(queue , message, options, reply -> {
if (reply.succeeded()) {
// The data from the db
JsonObject body = (JsonObject) reply.result().body();
// Use the json object
boolean success = body.getBoolean("success");
..........
} else {
context.fail(reply.cause());
}
});
where:
Receive a message (consumer)
String address = 'queueName';
vertx.eventBus().consumer(address, this::onMessage);
where:
- address is where this consumer can be called
- this::onMessage defines the function to be called when a message is received
where the message handler check the header and takes actions accordingly. For instance:
public void onMessage(Message<JsonObject> message) {
if (!message.headers().contains("action")) {
LOGGER.error("No action header specified for message with headers {} and body {}",
message.headers(), message.body().encodePrettily());
message.fail(ErrorCodes.NO_ACTION_SPECIFIED.ordinal(), "No action header specified");
return;
}
String action = message.headers().get("action");
switch (action) {
case "get-ip":
fetchIp(message);
break;
default:
message.fail(ErrorCodes.BAD_ACTION.ordinal(), "Bad action: " + action);
}
}