📦WebSocket Broadcasting with hyperlane

WebSocket Broadcasting with hyperlane

The hyperlane framework natively supports the WebSocket protocol. Developers can handle WebSocket requests through a unified interface without dealing with protocol upgrades manually. This article demonstrates how to implement both point-to-point and broadcast messaging on the server side using hyperlane, along with a simple WebSocket client example.

Framework Feature Highlights

The hyperlane framework supports the WebSocket protocol with automatic server-side protocol upgrading. It also offers features such as request middleware, routing, and response middleware.

Note: WebSocket responses must be sent using the send_response_body method. Using send_response will cause client-side parsing to fail, as it does not format the response according to the WebSocket protocol.

Server: Point-to-Point Example

In this example, the server simply echoes back the data received from the client using WebSocket.

pub async fn handle(ctx: Context) {
    let request_body: Vec<u8> = ctx.get_request_body().await;
    let _ = ctx.send_response_body(request_body).await;
}

Server: Broadcast Example

In broadcast mode, multiple client connections share a single message channel. Messages sent by any client will be broadcasted to all connected clients.

Important Notes

  • Broadcasting is implemented using hyperlane-broadcast.
  • Use tokio::select to simultaneously listen for incoming client messages and new data on the broadcast channel.
  • If enable_inner_websocket_handle is not enabled, clients must send at least one message (even an empty one) after connecting to start receiving broadcasts.
  • When enabled, the connection is considered ready to receive broadcasts as soon as it’s established.

Example Code

static BROADCAST_CHANNEL: OnceLock<Broadcast<ResponseBody>> = OnceLock::new();

fn ensure_broadcast_channel() -> Broadcast<ResponseBody> {
    BROADCAST_CHANNEL
        .get_or_init(|| Broadcast::default())
        .clone()
}

pub async fn handle(ctx: Context) {
    if ctx.get_stream().await.is_none() {
        ctx.aborted().await;
        return;
    }
    let broadcast: Broadcast<ResponseBody> = ensure_broadcast_channel();
    let mut receiver: BroadcastReceiver<Vec<u8>> = broadcast.subscribe();
    loop {
        tokio::select! {
            request_res = ctx.websocket_request_from_stream(10000) => {
                if request_res.is_err() {
                    break;
                }
                let request = request_res.unwrap_or_default();
                let body: RequestBody = request.get_body().clone();
                if broadcast.send(body).is_err() {
                    break;
                }
            },
            msg_res = receiver.recv() => {
                if let Ok(msg) = msg_res {
                    if ctx.send_response_body(msg).await.is_err() || ctx.flush().await.is_err() {
                        break;
                    }
                }
           }
        }
    }
}

Client Example (JavaScript)

The following is a browser-based WebSocket client. It sends the current time to the server every second and logs broadcast messages received from the server.

const ws = new WebSocket('ws://localhost:60000/websocket');

ws.onopen = () => {
  console.log('WebSocket opened');
  setInterval(() => {
    ws.send(`Now time: ${new Date().toISOString()}`);
  }, 1000);
};

ws.onmessage = (event) => {
  console.log('Receive: ', event.data);
};

ws.onerror = (error) => {
  console.error('WebSocket error: ', error);
};

ws.onclose = () => {
  console.log('WebSocket closed');
};

Conclusion

With hyperlane, building real-time WebSocket-based services becomes straightforward. You don’t need to manage the handshake or protocol intricacies manually. The unified send_response_body interface allows handling WebSocket messages in the same way as HTTP responses, greatly simplifying the development process.

Leave a Reply