WebSocket Broadcasting with hyperlane
The hyperlane framework natively supports the WebSocket protocol. Developers can handle WebSocket requests through a unified interface without performing the protocol upgrade manually. This article demonstrates how to implement both point-to-point and broadcast messaging on the server side using hyperlane, along with a simple WebSocket client example.
Framework Feature Highlights
The hyperlane framework supports the WebSocket protocol with automatic server-side protocol upgrading. It also offers features such as request middleware, routing, and response middleware.
Note: WebSocket responses must be sent using the send_response_body method. Using send_response will cause client-side parsing to fail, as it does not format the response according to the WebSocket protocol.
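To make the note above concrete, here is a minimal sketch of the framing that the WebSocket protocol (RFC 6455) requires for server-to-client messages. It is not hyperlane's implementation: encode_server_frame and OPCODE_BINARY are hypothetical names, and the choice of the binary opcode is an assumption for illustration. It only shows why bytes written without this header cannot be parsed by a WebSocket client.

```rust
/// Opcode for a binary data frame (RFC 6455, section 5.2).
const OPCODE_BINARY: u8 = 0x2;

/// Hypothetical helper: wraps `payload` in a single, final, unmasked
/// WebSocket frame, which is the form a server sends to a client.
fn encode_server_frame(opcode: u8, payload: &[u8]) -> Vec<u8> {
    let mut frame = Vec::with_capacity(payload.len() + 10);
    // First byte: FIN bit set (0x80) plus the 4-bit opcode.
    frame.push(0x80 | (opcode & 0x0F));
    // Length field: 7 bits, or 126 followed by a u16, or 127 followed by a
    // u64, all big-endian. The mask bit stays 0 for server frames.
    match payload.len() {
        len if len <= 125 => frame.push(len as u8),
        len if len <= u16::MAX as usize => {
            frame.push(126);
            frame.extend_from_slice(&(len as u16).to_be_bytes());
        }
        len => {
            frame.push(127);
            frame.extend_from_slice(&(len as u64).to_be_bytes());
        }
    }
    // The payload follows the header unchanged.
    frame.extend_from_slice(payload);
    frame
}
```

For a two-byte payload the header is just two bytes (0x82, 0x02). A client that receives the raw payload without this header would interpret the first payload bytes as a frame header and fail.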
Server: Point-to-Point Example
In this example, the server simply echoes each message it receives from the client back over the WebSocket connection.
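pub async fn handle(ctx: Context) {
    // Read the payload of the incoming WebSocket message.
    let request_body: Vec<u8> = ctx.get_request_body().await;
    // Echo it back; send_response_body applies the WebSocket framing.
    let _ = ctx.send_response_body(request_body).await;
}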
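For context on the protocol details a handler like this never has to touch: RFC 6455 requires clients to mask every frame they send by XOR-ing the payload with a 4-byte key carried in the frame header. The sketch below uses only the standard library and is independent of hyperlane; apply_mask is a hypothetical helper, and the assumption is that the framework performs an equivalent step before the handler sees the request body.

```rust
/// Hypothetical helper: XORs `payload` in place with the 4-byte masking key
/// from the frame header. XOR is its own inverse, so the same function both
/// masks and unmasks.
fn apply_mask(payload: &mut [u8], key: [u8; 4]) {
    for (i, byte) in payload.iter_mut().enumerate() {
        // Byte i is combined with key byte i mod 4 (RFC 6455, section 5.3).
        *byte ^= key[i % 4];
    }
}
```

Applying the function twice with the same key returns the original bytes, which is how the receiving side recovers the message a browser sent.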
Server: Broadcast Example
In broadcast mode, multiple client connections share a single message channel. A message sent by any client is broadcast to all connected clients.
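The fan-out behaviour described above can be sketched with the standard library alone. This is not how hyperlane-broadcast is implemented; Hub is a hypothetical stand-in that gives every subscriber its own queue and copies each message into all of them, which is enough to show the semantics.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Arc, Mutex};

/// Hypothetical stand-in for a broadcast channel: every subscriber gets its
/// own queue, and `send` copies the message into each of them.
#[derive(Clone, Default)]
struct Hub {
    subscribers: Arc<Mutex<Vec<Sender<Vec<u8>>>>>,
}

impl Hub {
    /// Registers a new connection and returns the receiving end of its queue.
    fn subscribe(&self) -> Receiver<Vec<u8>> {
        let (tx, rx) = channel();
        self.subscribers.lock().unwrap().push(tx);
        rx
    }

    /// Delivers `msg` to every live subscriber and forgets disconnected ones.
    /// Returns how many subscribers received the message.
    fn send(&self, msg: &[u8]) -> usize {
        let mut subscribers = self.subscribers.lock().unwrap();
        // A failed send means the receiver was dropped, so remove that entry.
        subscribers.retain(|tx| tx.send(msg.to_vec()).is_ok());
        subscribers.len()
    }
}
```

In this sketch a connection that both subscribes and sends also receives its own messages, which matches the "all connected clients" behaviour described above.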
Important Notes
- Broadcasting is implemented using hyperlane-broadcast.
- Use tokio::select! to simultaneously listen for incoming client messages and new data on the broadcast channel.
- If enable_inner_websocket_handle is not enabled, clients must send at least one message (even an empty one) after connecting to start receiving broadcasts.
- When it is enabled, the connection is considered ready to receive broadcasts as soon as it is established.
Example Code
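use std::sync::OnceLock;
// Context, Broadcast, BroadcastReceiver, RequestBody and ResponseBody
// come from the hyperlane crates.

// Process-wide channel shared by every connection.
static BROADCAST_CHANNEL: OnceLock<Broadcast<ResponseBody>> = OnceLock::new();

// Returns a handle to the shared channel, creating it on first use.
fn ensure_broadcast_channel() -> Broadcast<ResponseBody> {
    BROADCAST_CHANNEL
        .get_or_init(|| Broadcast::default())
        .clone()
}

pub async fn handle(ctx: Context) {
    // Abort if there is no underlying stream to read from.
    if ctx.get_stream().await.is_none() {
        ctx.aborted().await;
        return;
    }
    let broadcast: Broadcast<ResponseBody> = ensure_broadcast_channel();
    // Each connection gets its own receiver on the shared channel.
    let mut receiver: BroadcastReceiver<Vec<u8>> = broadcast.subscribe();
    loop {
        tokio::select! {
            // A message arrived from this client: publish it to every subscriber.
            request_res = ctx.websocket_request_from_stream(10000) => {
                if request_res.is_err() {
                    break;
                }
                let request = request_res.unwrap_or_default();
                let body: RequestBody = request.get_body().clone();
                if broadcast.send(body).is_err() {
                    break;
                }
            },
            // A broadcast arrived: forward it to this client.
            msg_res = receiver.recv() => {
                if let Ok(msg) = msg_res {
                    if ctx.send_response_body(msg).await.is_err() || ctx.flush().await.is_err() {
                        break;
                    }
                }
            }
        }
    }
}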
Client Example (JavaScript)
The following is a browser-based WebSocket client. It sends the current time to the server every second and logs broadcast messages received from the server.
const ws = new WebSocket('ws://localhost:60000/websocket');
let timer;
ws.onopen = () => {
  console.log('WebSocket opened');
  // Send the current time once per second.
  timer = setInterval(() => {
    ws.send(`Now time: ${new Date().toISOString()}`);
  }, 1000);
};
ws.onmessage = (event) => {
  console.log('Receive: ', event.data);
};
ws.onerror = (error) => {
  console.error('WebSocket error: ', error);
};
ws.onclose = () => {
  // Stop the timer so nothing is sent on a closed socket.
  clearInterval(timer);
  console.log('WebSocket closed');
};
Conclusion
With hyperlane, building real-time WebSocket-based services becomes straightforward. You don’t need to manage the handshake or protocol intricacies manually. The unified send_response_body interface allows handling WebSocket messages in the same way as HTTP responses, greatly simplifying the development process.