STOMP is the Simple (or Streaming) Text Orientated Messaging Protocol. It provides an interoperable wire format so that any STOMP client can communicate with any STOMP message broker, enabling messaging interoperability across many languages, platforms and brokers. For more details about STOMP, see https://stomp.github.io/index.html.
Vertx-Stomp is an implementation of a STOMP server and client. You can use the STOMP server with other clients and the STOMP client with other servers. Both the server and the client support versions 1.0, 1.1 and 1.2 of the STOMP protocol (see https://stomp.github.io/stomp-specification-1.2.html). The STOMP server can also be used as a bridge with the Vert.x event bus, or directly with web sockets (using StompJS).
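To make the wire format concrete, here is a minimal sketch of how a frame is laid out according to the STOMP 1.2 specification: a command line, one name:value line per header, a blank line, the body, and a terminating NUL octet. The SketchFrame class and encode function are hypothetical names used only for illustration; they are not part of the Vert.x API, and header escaping is deliberately left out.

```kotlin
// Hypothetical, simplified model of a STOMP frame (not the Vert.x Frame class).
data class SketchFrame(
    val command: String,
    val headers: Map<String, String> = emptyMap(),
    val body: String = ""
)

// Serializes a frame following the STOMP 1.2 layout:
// COMMAND, header lines, blank line, body, NUL octet.
// Assumption: header names and values need no escaping.
fun encode(frame: SketchFrame): String {
    val sb = StringBuilder()
    sb.append(frame.command).append('\n')
    for ((name, value) in frame.headers) {
        sb.append(name).append(':').append(value).append('\n')
    }
    sb.append('\n')          // blank line separating headers from body
    sb.append(frame.body)
    sb.append('\u0000')      // NUL octet terminating the frame
    return sb.toString()
}

fun main() {
    val frame = SketchFrame(
        command = "SEND",
        headers = mapOf("destination" to "/queue", "content-type" to "text/plain"),
        body = "Hello"
    )
    // Show the NUL terminator as ^@ so the output is readable.
    println(encode(frame).replace("\u0000", "^@"))
}
```

In practice the server and client described here build and parse frames for you; the sketch only shows what travels on the wire.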
To use the Vert.x Stomp server and client, add the following dependency to the dependencies section of your build descriptor:
Maven (in your pom.xml):
<dependency>
  <groupId>io.vertx</groupId>
  <artifactId>vertx-stomp</artifactId>
  <version>3.6.2</version>
</dependency>
Gradle (in your build.gradle file):
compile 'io.vertx:vertx-stomp:3.6.2'
The simplest way to create a STOMP server, using all default options, is as follows:
var server = StompServer.create(vertx).handler(StompServerHandler.create(vertx)).listen()
This creates a STOMP server listening on localhost:61613 that is compliant with the STOMP specification.
You can configure the port and host in the listen method:
var server = StompServer.create(vertx).handler(StompServerHandler.create(vertx)).listen(1234, "0.0.0.0")
If you pass -1 as the port, the TCP server is not started. This is useful when using the websocket bridge.
To be notified when the server is ready, use a handler as follows:
var server = StompServer.create(vertx).handler(StompServerHandler.create(vertx)).listen({ ar ->
  if (ar.failed()) {
    println("Failed to start the STOMP server: ${ar.cause().getMessage()}")
  } else {
    println("Ready to receive STOMP frames")
  }
})
The handler receives a reference to the StompServer.
You can also configure the host and port in StompServerOptions:
var server = StompServer.create(vertx, StompServerOptions(
port = 1234,
host = "0.0.0.0")).handler(StompServerHandler.create(vertx)).listen()
STOMP servers are closed as follows:
server.close({ ar ->
if (ar.succeeded()) {
println("The STOMP server has been closed")
} else {
println("The STOMP server failed to close : ${ar.cause().getMessage()}")
}
})
The StompServerOptions let you configure some aspects of the STOMP server.
First, the STOMP server is based on a NetServer, so you can configure the underlying NetServer from the StompServerOptions. Alternatively, you can pass the NetServer you want to use:
var server = StompServer.create(vertx, netServer).handler(StompServerHandler.create(vertx)).listen()
The StompServerOptions let you configure:
the host and port of the STOMP server - defaults to 0.0.0.0:61613
whether or not the STOMP server is secured - defaults to false
the maximum size of a STOMP frame body - defaults to 10 MB
the maximum number of headers accepted in a STOMP frame - defaults to 1000
the maximum length of a header line in a STOMP frame - defaults to 10240
the STOMP heartbeat time - defaults to 1000, 1000
the supported STOMP protocol versions - defaults to 1.0, 1.1 and 1.2
the maximum number of frames allowed in a transaction - defaults to 1000
the size of the transaction chunk - defaults to 1000 (see setTransactionChunkSize)
the maximum number of subscriptions a client can handle - defaults to 1000
The STOMP heartbeat is configured using a JSON object as follows:
var server = StompServer.create(vertx, StompServerOptions(
heartbeat = json {
obj(
"x" to 1000,
"y" to 1000
)
})).handler(StompServerHandler.create(vertx)).listen()
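The two heartbeat values follow the heart-beat header defined by the STOMP specification (1.1 and later): the first number is the smallest interval at which a party can send heart-beats, the second is the interval at which it would like to receive them, both in milliseconds, with 0 meaning "none". The sketch below illustrates the negotiation rule from the specification. It assumes that the "x" and "y" keys map to those two numbers; the Heartbeat class and negotiatedPeriod function are hypothetical names, and the actual Vert.x computation may differ in detail.

```kotlin
// One party's heart-beat declaration, in milliseconds.
// x: smallest interval at which it can send heart-beats (0 = cannot send).
// y: interval at which it wants to receive heart-beats (0 = does not want any).
data class Heartbeat(val x: Long, val y: Long)

// Period of the heart-beats flowing from `sender` to `receiver`, per the STOMP
// specification: none if either side opts out, otherwise the larger of the two values.
// A result of 0 means no heart-beats in that direction.
fun negotiatedPeriod(sender: Heartbeat, receiver: Heartbeat): Long =
    if (sender.x == 0L || receiver.y == 0L) 0L else maxOf(sender.x, receiver.y)

fun main() {
    val client = Heartbeat(x = 5000, y = 0)     // sends at most every 5 s, wants nothing back
    val server = Heartbeat(x = 1000, y = 1000)  // the defaults listed above
    println("client -> server: ${negotiatedPeriod(client, server)} ms")
    println("server -> client: ${negotiatedPeriod(server, client)} ms")
}
```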
Enabling security requires an additional AuthProvider handling the authentication requests:
var server = StompServer.create(vertx, StompServerOptions(
secured = true)).handler(StompServerHandler.create(vertx).authProvider(provider)).listen()
More information about AuthProvider is available in the Vert.x Auth Common documentation.
If a frame exceeds one of the size limits, the frame is rejected and the client receives an ERROR frame. As the specification requires, the client connection is closed immediately after the error has been sent. The same behavior applies to the other thresholds.
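As an illustration of the kind of check involved, the following sketch validates a frame against the three size limits listed earlier. SketchLimits and violation are hypothetical names, the body default assumes 10 MB means 10 * 1024 * 1024 bytes, and header line length is approximated as name + ':' + value; the real server performs its checks while parsing and may measure things differently.

```kotlin
// Hypothetical holder for the size limits; defaults mirror the values listed above.
data class SketchLimits(
    val maxBodyLength: Int = 10 * 1024 * 1024, // assumption: "10 MB" as a byte count
    val maxHeaders: Int = 1000,
    val maxHeaderLength: Int = 10240
)

// Returns a description of the first violated limit, or null if the frame is acceptable.
fun violation(
    headers: Map<String, String>,
    body: ByteArray,
    limits: SketchLimits = SketchLimits()
): String? = when {
    body.size > limits.maxBodyLength ->
        "body exceeds ${limits.maxBodyLength} bytes"
    headers.size > limits.maxHeaders ->
        "more than ${limits.maxHeaders} headers"
    headers.any { (name, value) -> name.length + 1 + value.length > limits.maxHeaderLength } ->
        "header line longer than ${limits.maxHeaderLength} characters"
    else -> null
}

fun main() {
    val tight = SketchLimits(maxBodyLength = 10)
    // A compliant server would answer such a frame with ERROR and close the connection.
    println(violation(mapOf("destination" to "/queue"), ByteArray(11), tight))
}
```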
The default STOMP server handles subscription destinations as opaque strings, so it does not impose any structure on them and destinations are not hierarchical.
By default, the STOMP server manages destinations as topics, so messages are dispatched to all subscribers. You can configure the server to use queues, or mix both types:
var server = StompServer.create(vertx).handler(StompServerHandler.create(vertx).destinationFactory({ v, name ->
  // The value of the if-expression is the result of the lambda
  if (name.startsWith("/queue")) {
    Destination.queue(vertx, name)
  } else {
    Destination.topic(vertx, name)
  }
})).listen()
In the last example, all destinations starting with /queue are queues while the others are topics. The destination is created when the first subscription on this destination is received.
A server can decide to reject the destination creation by returning null:
var server = StompServer.create(vertx).handler(StompServerHandler.create(vertx).destinationFactory({ v, name ->
  // The value of the if-expression is the result of the lambda
  if (name.startsWith("/forbidden")) {
    null
  } else if (name.startsWith("/queue")) {
    Destination.queue(vertx, name)
  } else {
    Destination.topic(vertx, name)
  }
})).listen()
In this case, the subscriber receives an ERROR frame.
Queues dispatch messages using a round-robin strategy.
The STOMP server deliberately does not implement any advanced features. If you need a more advanced dispatching policy, you can implement your own type of destination by providing a DestinationFactory returning your own Destination object.
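The difference between the two built-in dispatching policies can be sketched in a few lines. SketchTopic and SketchQueue are hypothetical stand-ins written for illustration only; they do not implement the Vert.x Destination interface and ignore concerns such as acknowledgements and unsubscription.

```kotlin
// Topic semantics: every subscriber receives every message.
class SketchTopic {
    private val subscribers = mutableListOf<(String) -> Unit>()
    fun subscribe(subscriber: (String) -> Unit) { subscribers += subscriber }
    fun dispatch(message: String) = subscribers.forEach { it(message) }
}

// Queue semantics: each message goes to exactly one subscriber, chosen round-robin.
class SketchQueue {
    private val subscribers = mutableListOf<(String) -> Unit>()
    private var next = 0
    fun subscribe(subscriber: (String) -> Unit) { subscribers += subscriber }
    fun dispatch(message: String) {
        if (subscribers.isEmpty()) return   // nobody to deliver to
        subscribers[next % subscribers.size](message)
        next = (next + 1) % subscribers.size
    }
}

fun main() {
    val queue = SketchQueue()
    queue.subscribe { println("subscriber A got $it") }
    queue.subscribe { println("subscriber B got $it") }
    listOf("m1", "m2", "m3").forEach { queue.dispatch(it) }   // A, B, then A again

    val topic = SketchTopic()
    topic.subscribe { println("subscriber C got $it") }
    topic.subscribe { println("subscriber D got $it") }
    topic.dispatch("m4")                                      // both C and D
}
```

A custom destination would replace the selection logic in dispatch with its own policy.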
By default, the STOMP server does nothing when a message is not acknowledged. You can customize this by providing your own Destination implementation.
The custom destination should call the onAck and onNack methods to let the StompServerHandler customize the behavior:
var server = StompServer.create(vertx).handler(StompServerHandler.create(vertx).onAckHandler({ acknowledgement ->
  // Action to execute when the frames (one in `client-individual` mode, several
  // in `client` mode) are acknowledged.
}).onNackHandler({ acknowledgement ->
  // Action to execute when the frames (one in `client-individual` mode, several
  // in `client` mode) are not acknowledged.
})).listen()
In addition to the handlers seen above, you can configure almost all aspects of the STOMP server, such as the actions taken when specific frames are received, or the ping sent to the client (to implement the heartbeat).
Here are some examples:
var server = StompServer.create(vertx).handler(StompServerHandler.create(vertx).closeHandler({ connection ->
// client connection closed
}).beginHandler({ frame ->
// transaction starts
}).commitHandler({ frame ->
// transaction committed
})).listen()
Be aware that changing the default behavior may break compliance with the STOMP specification, so look at the default implementations before overriding them.
STOMP clients connect to a STOMP server and can send and receive frames.
You create a StompClient instance with default options as follows:
var client = StompClient.create(vertx).connect({ ar ->
if (ar.succeeded()) {
var connection = ar.result()
} else {
println("Failed to connect to the STOMP server: ${ar.cause().toString()}")
}
})
The previous snippet creates a STOMP client connecting to "0.0.0.0:61613". Once connected, you get a StompClientConnection that lets you interact with the server. You can configure the host and port as follows:
var client = StompClient.create(vertx).connect(61613, "0.0.0.0", { ar ->
if (ar.succeeded()) {
var connection = ar.result()
} else {
println("Failed to connect to the STOMP server: ${ar.cause().toString()}")
}
})
To catch connection errors due to authentication issues, or any other error frame sent by the server during the connection negotiation, you can register an error handler on the STOMP client. All connections created with the client inherit the error handler (but can have their own):
var client = StompClient.create(vertx).errorFrameHandler({ frame ->
// Received the ERROR frame
}).connect(61613, "0.0.0.0", { ar ->
if (ar.succeeded()) {
var connection = ar.result()
} else {
println("Failed to connect to the STOMP server: ${ar.cause().toString()}")
}
})
You can also configure the host and port in the StompClientOptions:
var client = StompClient.create(vertx, StompClientOptions(
host = "localhost",
port = 1234)).connect({ ar ->
if (ar.succeeded()) {
var connection = ar.result()
} else {
println("Failed to connect to the STOMP server: ${ar.cause().toString()}")
}
})
You can close a STOMP client:
var client = StompClient.create(vertx, StompClientOptions(
host = "localhost",
port = 1234)).connect({ ar ->
if (ar.succeeded()) {
var connection = ar.result()
} else {
println("Failed to connect to the STOMP server: ${ar.cause().toString()}")
}
})
client.close()
However, closing the client this way does not notify the server of the disconnection. To close the connection cleanly, use the disconnect method:
var client = StompClient.create(vertx, StompClientOptions(
host = "localhost",
port = 1234)).connect({ ar ->
if (ar.succeeded()) {
var connection = ar.result()
connection.disconnect()
} else {
println("Failed to connect to the STOMP server: ${ar.cause().toString()}")
}
})
If the heartbeat is enabled and if the client did not detect server activity after the configured timeout, the connection is automatically closed.
On the StompClientConnection, you can register an error handler that receives the ERROR frames sent by the server. Notice that the server closes the connection with the client after having sent such a frame:
var client = StompClient.create(vertx, StompClientOptions(
host = "localhost",
port = 1234)).connect({ ar ->
if (ar.succeeded()) {
var connection = ar.result()
connection.errorHandler({ frame ->
println("ERROR frame received : ${frame}")
})
} else {
println("Failed to connect to the STOMP server: ${ar.cause().toString()}")
}
})
The client can also be notified when a connection drop has been detected. Connection failures are detected using the STOMP heartbeat mechanism. When the server has not sent a message in the heartbeat time window, the connection is closed and the connectionDroppedHandler is called (if set). To configure a connectionDroppedHandler, call connectionDroppedHandler. The handler can, for instance, try to reconnect to the server:
var client = StompClient.create(vertx).connect({ ar ->
if (ar.succeeded()) {
var connection = ar.result()
connection.connectionDroppedHandler({ con ->
// The connection has been lost
// You can reconnect or switch to another server.
})
connection.send("/queue", Buffer.buffer("Hello"), { frame ->
println("Message processed by the server")
})
} else {
println("Failed to connect to the STOMP server: ${ar.cause().toString()}")
}
})
You can configure various aspects by passing a StompClientOptions when creating the StompClient. As the STOMP client relies on a NetClient, you can configure the underlying NetClient from the StompClientOptions. Alternatively, you can pass the NetClient you want to use in the connect method:
var client = StompClient.create(vertx).connect(netClient, { ar ->
if (ar.succeeded()) {
var connection = ar.result()
connection.errorHandler({ frame ->
println("ERROR frame received : ${frame}")
})
} else {
println("Failed to connect to the STOMP server: ${ar.cause().toString()}")
}
})
The StompClientOptions let you configure:
the host and port of the STOMP server
the login and passcode to connect to the server
whether or not the content-length header should be added to the frame if not set explicitly (enabled by default)
whether or not the STOMP command should be used instead of the CONNECT command (disabled by default)
whether or not the host header should be ignored in the CONNECT frame (disabled by default)
the heartbeat configuration (1000, 1000 by default)
To subscribe to a destination, use:
var client = StompClient.create(vertx).connect({ ar ->
if (ar.succeeded()) {
var connection = ar.result()
connection.subscribe("/queue", { frame ->
println("Just received a frame from /queue : ${frame}")
})
} else {
println("Failed to connect to the STOMP server: ${ar.cause().toString()}")
}
})
To unsubscribe, use:
var client = StompClient.create(vertx).connect({ ar ->
if (ar.succeeded()) {
var connection = ar.result()
connection.subscribe("/queue", { frame ->
println("Just received a frame from /queue : ${frame}")
})
// ....
connection.unsubscribe("/queue")
} else {
println("Failed to connect to the STOMP server: ${ar.cause().toString()}")
}
})
To send a message, use:
var client = StompClient.create(vertx).connect({ ar ->
  if (ar.succeeded()) {
    var connection = ar.result()
    // STOMP header names and values are strings
    var headers = mutableMapOf<String, String>()
    headers["header1"] = "value1"
    connection.send("/queue", headers, Buffer.buffer("Hello"))
    // No headers:
    connection.send("/queue", Buffer.buffer("World"))
  } else {
    println("Failed to connect to the STOMP server: ${ar.cause().toString()}")
  }
})
Clients can send ACK and NACK frames:
var client = StompClient.create(vertx).connect({ ar ->
if (ar.succeeded()) {
var connection = ar.result()
connection.subscribe("/queue", { frame ->
connection.ack(frame.ack)
// OR
connection.nack(frame.ack)
})
} else {
println("Failed to connect to the STOMP server: ${ar.cause().toString()}")
}
})
Clients can also create transactions. ACK, NACK and SEND frames sent in the transaction are delivered only when the transaction is committed.
var client = StompClient.create(vertx).connect({ ar ->
  if (ar.succeeded()) {
    var connection = ar.result()
    // The "transaction" header attaches each frame to the transaction
    var headers = mutableMapOf<String, String>()
    headers["transaction"] = "my-transaction"
    connection.beginTX("my-transaction")
    connection.send("/queue", headers, Buffer.buffer("Hello"))
    connection.send("/queue", headers, Buffer.buffer("World"))
    connection.send("/queue", headers, Buffer.buffer("!!!"))
    connection.commit("my-transaction")
    // OR
    connection.abort("my-transaction")
  } else {
    println("Failed to connect to the STOMP server: ${ar.cause().toString()}")
  }
})
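The "delivered only on commit" behavior amounts to buffering frames per transaction id. The sketch below shows that idea in isolation; SketchTransactions is a hypothetical class written for illustration, not the server's actual implementation, and it ignores limits such as the maximum number of frames per transaction.

```kotlin
// Buffers messages per transaction id and hands them to `deliver` only on commit.
class SketchTransactions(private val deliver: (String) -> Unit) {
    private val pending = mutableMapOf<String, MutableList<String>>()

    fun begin(id: String) { pending[id] = mutableListOf() }

    fun send(id: String, message: String) {
        val frames = pending[id] ?: error("Unknown transaction: $id")
        frames += message               // held back until commit
    }

    // Delivers the buffered messages in order and forgets the transaction.
    fun commit(id: String) { pending.remove(id)?.forEach(deliver) }

    // Drops the buffered messages without delivering them.
    fun abort(id: String) { pending.remove(id) }
}

fun main() {
    val tx = SketchTransactions { println("delivered: $it") }
    tx.begin("my-transaction")
    tx.send("my-transaction", "Hello")
    tx.send("my-transaction", "World")
    println("nothing delivered yet")
    tx.commit("my-transaction")         // both messages are delivered now
}
```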
Each sent command can have a receipt handler, which is notified when the server has processed the message:
var client = StompClient.create(vertx).connect({ ar ->
if (ar.succeeded()) {
var connection = ar.result()
connection.send("/queue", Buffer.buffer("Hello"), { frame ->
println("Message processed by the server")
})
} else {
println("Failed to connect to the STOMP server: ${ar.cause().toString()}")
}
})
The STOMP server can be used as a bridge to the Vert.x Event Bus. The bridge is bi-directional, meaning that STOMP frames are translated to Event Bus messages and Event Bus messages are translated to STOMP frames.
To enable the bridge you need to configure the inbound and outbound addresses. Inbound addresses are STOMP destinations that are transferred to the event bus. The STOMP destination is used as the event bus address. Outbound addresses are event bus addresses that are transferred to STOMP.
var server = StompServer.create(vertx).handler(StompServerHandler.create(vertx).bridge(BridgeOptions(
inboundPermitteds = listOf(PermittedOptions(
address = "/toBus")),
outboundPermitteds = listOf(PermittedOptions(
address = "/toStomp"))))).listen()
By default, the bridge uses a publish/subscribe delivery (topic). You can configure it to use a point-to-point delivery where only one STOMP client or Event Bus consumer is invoked:
var server = StompServer.create(vertx).handler(StompServerHandler.create(vertx).bridge(BridgeOptions(
inboundPermitteds = listOf(PermittedOptions(
address = "/toBus")),
outboundPermitteds = listOf(PermittedOptions(
address = "/toStomp")),
pointToPoint = true))).listen()
The permitted options can also be expressed as a "regex" or with a match. A match is a structure that the message payload must meet. For instance, in the next example, the payload must contain the field "foo" set to "bar". Structure matching only supports JSON objects.
var server = StompServer.create(vertx).handler(StompServerHandler.create(vertx).bridge(BridgeOptions(
inboundPermitteds = listOf(PermittedOptions(
address = "/toBus",
match = json {
obj("foo" to "bar")
})),
outboundPermitteds = listOf(PermittedOptions(
address = "/toStomp")),
pointToPoint = true))).listen()
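To illustrate what a structure match means, here is a sketch of the check on plain Kotlin maps: every field of the match must be present in the payload with an equal value, while extra payload fields are ignored. The structureMatches function is a hypothetical helper; it only handles flat objects and is an assumption about the semantics, not the bridge's actual code.

```kotlin
// Returns true when every entry of `match` is present in `payload` with an equal value.
// Assumption: flat objects only; nested structures are not handled in this sketch.
fun structureMatches(match: Map<String, Any?>, payload: Map<String, Any?>): Boolean =
    match.all { (key, expected) -> payload.containsKey(key) && payload[key] == expected }

fun main() {
    val match = mapOf("foo" to "bar")
    println(structureMatches(match, mapOf("foo" to "bar", "count" to 1)))  // extra fields are fine
    println(structureMatches(match, mapOf("foo" to "baz")))                // wrong value
    println(structureMatches(match, mapOf("other" to "bar")))              // missing field
}
```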
If you want to connect a JavaScript client (node.js or a browser) directly with the STOMP server, you can use a web socket. The STOMP protocol has been adapted to work over web sockets in StompJS. The JavaScript client connects directly to the STOMP server and sends STOMP frames on the web socket. It also receives STOMP frames directly on the web socket.
To configure the server to use StompJS, you need to:
Enable the web socket bridge and configure the path of the listening web socket (/stomp by default).
Import StompJS in your application (as a script on an HTML page, or as an npm module, see https://www.npmjs.com/package/stompjs).
Connect to the server.
To achieve the first step, you need an HTTP server, and you pass the result of the STOMP server's webSocketHandler method to the HTTP server's websocketHandler method:
var server = StompServer.create(vertx, StompServerOptions(
port = -1,
websocketBridge = true,
websocketPath = "/stomp")).handler(StompServerHandler.create(vertx))
var http = vertx.createHttpServer(HttpServerOptions(
websocketSubProtocols = "v10.stomp, v11.stomp")).websocketHandler(server.webSocketHandler()).listen(8080)
Don’t forget to declare the supported sub-protocols. Without this, the connection will be rejected.
Then follow the instructions from the StompJS documentation to connect to the server. Here is a simple example:
var url = "ws://localhost:8080/stomp";
var client = Stomp.client(url);
var callback = function(frame) {
console.log(frame);
};
client.connect({}, function() {
var subscription = client.subscribe("foo", callback);
});
STOMP clients, client connections and server handlers support registering a received Frame handler that is notified every time a frame is received from the wire. It lets you log the frames or implement custom behavior. The handler is also called for PING frames and for illegal or unknown frames:
var server = StompServer.create(vertx).handler(StompServerHandler.create(vertx).receivedFrameHandler({ sf ->
println(sf.frame())
})).listen()
var client = StompClient.create(vertx).receivedFrameHandler({ frame ->
println(frame)
})
The handler is called before the frame is processed, so you can also modify the frame.
Frames not using a valid STOMP command use the UNKNOWN command. The original command is written in the headers using the Frame.STOMP_FRAME_COMMAND key.
You can also register a handler to be notified when a frame is going to be sent (written to the wire):
var server = StompServer.create(vertx).handler(StompServerHandler.create(vertx)).writingFrameHandler({ sf ->
println(sf.frame())
}).listen()
var client = StompClient.create(vertx).writingFrameHandler({ frame ->
println(frame)
})