Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

example of websocket proxy #269

Open
GopherJ opened this issue Mar 10, 2020 · 6 comments
Open

example of websocket proxy #269

GopherJ opened this issue Mar 10, 2020 · 6 comments
Labels
new-example Request for a new example

Comments

@GopherJ
Copy link

GopherJ commented Mar 10, 2020

No description provided.

@antonok-edm
Copy link

For what it's worth - this example seems to work great as a WebSocket proxy, as long as the timeout is increased (otherwise the server will bail with status 500 after just 5 seconds):

     let forwarded_req = client
         .request_from(new_url.as_str(), req.head())
+        .timeout(std::time::Duration::from_secs(60))
         .no_decompress();

@robjtede robjtede added the new-example Request for a new example label Mar 6, 2022
@mbround18
Copy link

@GopherJ and @antonok-edm I tried the timeout suggestion but it doesnt seem to proxy the websockets. Did yall make some changes outside the example to allow websocket connections?

@antonok-edm
Copy link

@mbround18 I don't think it works anymore, unfortunately. If I recall correctly, it worked with actix-web 1.0 and 2.0 but not after upgrading to 3.0.

It'd be great to have an official sample implementation and/or test cases for a websocket proxy.

@mbround18
Copy link

mbround18 commented Jun 6, 2022

@antonok-edm got some great feedback on discord!

@fakeshadow#3395 on discord is awesome and recommended (not a working example but a great route to go down)

use actix_web::{
    get,
    web::{BytesMut, Payload},
    Error, HttpRequest, HttpResponse,
};

use futures::{channel::mpsc::unbounded, sink::SinkExt, stream::StreamExt};
use tokio::io::{AsyncReadExt, AsyncWriteExt};

#[get("/")]
async fn index(req: HttpRequest, mut payload: Payload) -> HttpResponse {
    // collect proxy info from request.

    // forward request and get response
    let (res, socket) = awc::Client::new()
        .ws("ws://foo.bar")
        .connect()
        .await
        .unwrap();

    // check if response is switching protocol and continue.
    assert_eq!(res.status().as_u16(), 101);

    // take the websocket io only so we can transfer raw binary data between source and dest.
    let mut io = socket.into_parts().io;

    // a channel for push response body to stream.
    let (mut tx, rx) = unbounded();

    // a buffer to read from dest and proxy it back to source.
    let mut buf = BytesMut::new();

    // spawn a task read payload stream and forward to websocket connection.
    actix_web::rt::spawn(async move {
        loop {
            tokio::select! {
                // body from source.
                res = payload.next() => {
                    match res {
                        None => return,
                        Some(body) => {
                            let body = body.unwrap();
                            io.write_all(&body).await.unwrap();
                        }
                    }
                }

                // body from dest.
                res = io.read_buf(&mut buf) => {
                    let size = res.unwrap();
                    let bytes = buf.split_to(size).freeze();
                    tx.send(Ok::<_, Error>(bytes)).await.unwrap();
                }
            }
        }
    });

    // return response.
    HttpResponse::SwitchingProtocols().streaming(rx)
}

They also mentioned combining awc and actix to accept the incoming websocket then offload it to the proxied websocket and passing that connection context back to the client.

@jwalton
Copy link

jwalton commented Apr 27, 2023

Thanks @mbround18! This solution got me going in the right direction.

One disadvantage to @mbround18's solution above is that when you call client.ws().connect(), the client will always set the Sec-WebSocket-Version header to 13 in the request to the backend. There doesn't seem to be a way to prevent this, and it seems like awc has made various bits private that you'd need to reimplement this. If the client is really connection with version 13 (which, unless a new version of websocket protocol gets released or someone is using a REALLY old browser, it will be), then everything will work just fine, but if the client is using a different version, then we'll send the wrong websocket version to the backend, and it may or may not have problems reading the bytes we're forwarding.

This is my solution, replacing awc with reqwest. This forwards all of the headers from the client through to the backend, and vice versa, so the proxy doesn't care what version of websocket is being used so long as the backend understands it:

pub async fn proxy_ws_request(
    client_req: &HttpRequest,
    client_stream: Payload,
    mut target_url: url::Url,
) -> Result<HttpResponse, Box<dyn std::error::Error>> {
    // Force "http" or else the reqwest client will complain.
    target_url.set_scheme("http").unwrap();

    // Forward the request.
    let mut req = reqwest::ClientBuilder::new().build().unwrap().get(target_url);
    for (key, value) in client_req.headers() {
        req = req.header(key, value);
    }
    let target_response = req.send().await.unwrap();

    // Make sure the server is willing to accept the websocket.
    let status = target_response.status().as_u16();
    if status != 101 {
        return Err(Box::new(std::io::Error::new(
            std::io::ErrorKind::ConnectionRefused,
            "Target did not reply with 101 upgrade",
        )));
    }

    // Copy headers from the target back to the client.
    let mut client_response = HttpResponse::SwitchingProtocols();
    client_response.upgrade("websocket");
    for (header, value) in target_response.headers() {
        client_response.insert_header((header.to_owned(), value.to_owned()));
    }

    let target_upgrade = target_response.upgrade().await?;
    let (target_rx, mut target_tx) = tokio::io::split(target_upgrade);

    // Copy byte stream from the client to the target.
    rt::spawn(async move {
        let mut client_stream = client_stream.map(|result| {
            result.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err))
        });
        let mut client_read = tokio_util::io::StreamReader::new(&mut client_stream);
        let result = tokio::io::copy(&mut client_read, &mut target_tx).await;
        if let Err(err) = result {
            println!("Error proxying websocket client bytes to target: {err}")
        }
    });

    // Copy byte stream from the target back to the client.
    let target_stream = tokio_util::io::ReaderStream::new(target_rx);
    Ok(client_response.streaming(target_stream))
}

@stillinbeta
Copy link

I've written a small crate, though it uses awc so @jwalton's remarks about websocket version hold.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
new-example Request for a new example
Projects
None yet
Development

No branches or pull requests

6 participants