API Authentication - Rust examples

Below are complete examples for authenticating with the Data Streams API in Rust. Each example shows how to properly generate the required headers and make a request.

To learn more about the Data Streams API authentication, see the Data Streams Authentication page.

Note: The Data Streams SDKs handle authentication automatically. If you're using the Go SDK, Rust SDK, or TypeScript SDK, you don't need to implement the authentication logic manually.

API Authentication Example

Requirements

  • Rust (v1.70 or later recommended)
  • API credentials from Chainlink Data Streams

Running the Example

  1. Create a Cargo.toml file:

    [package]
    name = "chainlink-streams-direct-auth"
    version = "0.1.0"
    edition = "2021"
    
    [dependencies]
    reqwest = { version = "0.11", features = ["json", "blocking"] }
    serde = { version = "1.0", features = ["derive"] }
    serde_json = "1.0"
    tokio = { version = "1.32", features = ["full"] }
    hmac = "0.12"
    sha2 = "0.10"
    hex = "0.4"
    chrono = "0.4"
    
  2. Create a src/main.rs file:

    use hmac::{Hmac, Mac};
    use reqwest::header::{HeaderMap, HeaderValue};
    use serde::{Deserialize, Serialize};
    use sha2::{Digest, Sha256};
    use std::env;
    use std::error::Error;
    use std::time::{SystemTime, UNIX_EPOCH};
    
    type HmacSha256 = Hmac<Sha256>;
    
    // SingleReport represents a data feed report structure
    #[derive(Debug, Deserialize, Serialize)]
    struct SingleReport {
        #[serde(rename = "feedID")]
        feed_id: String,
        #[serde(rename = "validFromTimestamp")]
        valid_from_timestamp: u32,
        #[serde(rename = "observationsTimestamp")]
        observations_timestamp: u32,
        #[serde(rename = "fullReport")]
        full_report: String,
    }
    
    // SingleReportResponse is the response structure for a single report
    #[derive(Debug, Deserialize, Serialize)]
    struct SingleReportResponse {
        report: SingleReport,
    }
    
    // Generate HMAC signature for API authentication
    fn generate_hmac(
        method: &str,
        path: &str,
        body: &[u8],
        api_key: &str,
        api_secret: &str
    ) -> Result<(String, u128), Box<dyn Error>> {
        // Generate timestamp (milliseconds since Unix epoch)
        let timestamp = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .expect("Time went backwards")
            .as_millis();
    
        // Generate body hash
        let mut hasher = Sha256::new();
        hasher.update(body);
        let body_hash = hex::encode(hasher.finalize());
    
        // Create string to sign
        let string_to_sign = format!("{} {} {} {} {}", method, path, body_hash, api_key, timestamp);
    
        // Generate HMAC-SHA256 signature
        let mut mac = HmacSha256::new_from_slice(api_secret.as_bytes())?;
        mac.update(string_to_sign.as_bytes());
        let signature = hex::encode(mac.finalize().into_bytes());
    
        Ok((signature, timestamp))
    }
    
    // Generate authentication headers for API requests
    fn generate_auth_headers(
        method: &str,
        path: &str,
        api_key: &str,
        api_secret: &str
    ) -> Result<HeaderMap, Box<dyn Error>> {
        let (signature, timestamp) = generate_hmac(method, path, &[], api_key, api_secret)?;
    
        let mut headers = HeaderMap::new();
        headers.insert("Authorization", HeaderValue::from_str(api_key)?);
        headers.insert(
            "X-Authorization-Timestamp",
            HeaderValue::from_str(&timestamp.to_string())?
        );
        headers.insert(
            "X-Authorization-Signature-SHA256",
            HeaderValue::from_str(&signature)?
        );
    
        Ok(headers)
    }
    
    // Fetch a single report for a specific feed
    async fn fetch_single_report(feed_id: &str) -> Result<SingleReport, Box<dyn Error>> {
        // Get API credentials from environment variables
        let api_key = env::var("STREAMS_API_KEY")
            .map_err(|_| "API credentials not set. Please set STREAMS_API_KEY environment variable")?;
        let api_secret = env::var("STREAMS_API_SECRET")
            .map_err(|_| "API credentials not set. Please set STREAMS_API_SECRET environment variable")?;
    
        // API connection details
        let method = "GET";
        let host = "api.testnet-dataengine.chain.link";
        let path = "/api/v1/reports/latest";
        let full_path = format!("{}?feedID={}", path, feed_id);
    
        // Create headers with authentication
        let headers = generate_auth_headers(method, &full_path, &api_key, &api_secret)?;
    
        // Create and execute the request
        let url = format!("https://{}{}", host, full_path);
        let client = reqwest::Client::new();
        let response = client
            .get(&url)
            .headers(headers)
            .send()
            .await?;
    
        // Check for non-success status code
        if !response.status().is_success() {
            let status = response.status();
            let error_text = response.text().await?;
            return Err(format!("API error (status code {}): {}", status, error_text).into());
        }
    
        // Parse the response
        let report_resp: SingleReportResponse = response.json().await?;
        Ok(report_resp.report)
    }
    
    #[tokio::main]
    async fn main() -> Result<(), Box<dyn Error>> {
        // Example feed ID (ETH/USD)
        let feed_id = "0x000359843a543ee2fe414dc14c7e7920ef10f4372990b79d6361cdc0dd1ba782";
    
        println!("Fetching latest report for feed ID: {}", feed_id);
    
        // Fetch the report
        let report = fetch_single_report(feed_id).await?;
    
        // Display the report
        println!("Successfully retrieved report:");
        println!("  Feed ID: {}", report.feed_id);
        println!("  Valid From: {}", report.valid_from_timestamp);
        println!("  Observations Timestamp: {}", report.observations_timestamp);
    
        // Display the full report with truncation for readability
        let report_preview = if report.full_report.len() > 40 {
            format!("{}...", &report.full_report[..40])
        } else {
            report.full_report.clone()
        };
        println!("  Full Report: {}", report_preview);
    
        Ok(())
    }
    
  3. Set your API credentials as environment variables:

    export STREAMS_API_KEY="your-api-key"
    export STREAMS_API_SECRET="your-api-secret"
    
  4. Run with cargo run

Expected output:

Fetching latest report for feed ID: 0x000359843a543ee2fe414dc14c7e7920ef10f4372990b79d6361cdc0dd1ba782
Successfully retrieved report:
  Feed ID: 0x000359843a543ee2fe414dc14c7e7920ef10f4372990b79d6361cdc0dd1ba782
  Valid From: 1747933113
  Observations Timestamp: 1747933113
  Full Report: 0x00090d9e8d96765a0c49e03a6ae05c82e8f8de...

Production Considerations

While this example demonstrates the authentication mechanism, production applications should consider:

  • Connection resilience: Implement retry logic with exponential backoff for network failures
  • Error handling: Use custom error types instead of string errors for better error management
  • Logging: Replace println! with structured logging (e.g., tracing, env_logger)
  • Configuration: Make API endpoints configurable through environment variables
  • Resource management: Implement graceful shutdown for long-running connections
  • Testing: Add unit tests for HMAC generation and integration tests for API calls

For production use, consider using the Rust SDK which handles authentication automatically and provides built-in fault tolerance.

WebSocket Authentication Example

Requirements

  • Rust (v1.70 or later recommended)
  • API credentials from Chainlink Data Streams

Running the Example

  1. Create a Cargo.toml file:

    [package]
    name = "chainlink-streams-direct-auth"
    version = "0.1.0"
    edition = "2021"
    
    [dependencies]
    tokio = { version = "1.32", features = ["full"] }
    tokio-tungstenite = { version = "0.20", features = ["native-tls"] }
    futures-util = "0.3"
    hmac = "0.12"
    sha2 = "0.10"
    hex = "0.4"
    chrono = "0.4"
    url = "2.4"
    serde = { version = "1.0", features = ["derive"] }
    serde_json = "1.0"
    
  2. Create a src/main.rs file:

    use hmac::{ Hmac, Mac };
    use sha2::{ Digest, Sha256 };
    use std::{ env, error::Error, time::{ SystemTime, UNIX_EPOCH } };
    use tokio_tungstenite::{
        connect_async,
        tungstenite::client::IntoClientRequest,
        tungstenite::protocol::Message,
    };
    use futures_util::{ StreamExt, SinkExt };
    use serde::{ Deserialize, Serialize };
    
    type HmacSha256 = Hmac<Sha256>;
    
    // Report structure for deserializing WebSocket messages
    #[derive(Debug, Deserialize, Serialize)]
    struct ReportWrapper {
        report: Report,
    }
    
    #[derive(Debug, Deserialize, Serialize)]
    struct Report {
        #[serde(rename = "feedID")]
        feed_id: String,
    
        #[serde(rename = "fullReport")]
        full_report: String,
    
        #[serde(rename = "validFromTimestamp")]
        valid_from_timestamp: u64,
    
        #[serde(rename = "observationsTimestamp")]
        observations_timestamp: u64,
    }
    
    // Generate HMAC signature for API authentication
    fn generate_hmac(
        method: &str,
        path: &str,
        body: &[u8],
        api_key: &str,
        api_secret: &str
    ) -> Result<(String, u128), Box<dyn Error>> {
        // Generate timestamp (milliseconds since Unix epoch)
        let timestamp = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .expect("Time went backwards")
            .as_millis();
    
        // Generate body hash
        let mut hasher = Sha256::new();
        hasher.update(body);
        let body_hash = hex::encode(hasher.finalize());
    
        // Create string to sign
        let string_to_sign = format!("{} {} {} {} {}", method, path, body_hash, api_key, timestamp);
    
        // Generate HMAC-SHA256 signature
        let mut mac = HmacSha256::new_from_slice(api_secret.as_bytes())?;
        mac.update(string_to_sign.as_bytes());
        let signature = hex::encode(mac.finalize().into_bytes());
    
        Ok((signature, timestamp))
    }
    
    // Set up WebSocket connection with proper authentication
    async fn setup_websocket_connection(
        api_key: &str,
        api_secret: &str,
        feed_ids: &[&str]
    ) -> Result<
        tokio_tungstenite::WebSocketStream<tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>>,
        Box<dyn Error>
    > {
        // Validate feed IDs
        if feed_ids.is_empty() {
            return Err("No feed ID(s) provided".into());
        }
    
        // WebSocket connection details
        let host = "ws.testnet-dataengine.chain.link";
        let path = "/api/v1/ws";
        let feed_ids_joined = feed_ids.join(",");
        let full_path = format!("{}?feedIDs={}", path, feed_ids_joined);
    
        // Generate authentication signature and timestamp
        let (signature, timestamp) = generate_hmac("GET", &full_path, &[], api_key, api_secret)?;
    
        // Create WebSocket URL
        let ws_url = format!("wss://{}{}", host, full_path);
        println!("Connecting to: {}", ws_url);
    
        // Create request with auth headers
        let mut request = ws_url.into_client_request()?;
        request.headers_mut().insert("Authorization", api_key.parse()?);
        request.headers_mut().insert("X-Authorization-Timestamp", timestamp.to_string().parse()?);
        request.headers_mut().insert("X-Authorization-Signature-SHA256", signature.parse()?);
    
        // Connect to WebSocket server
        let (ws_stream, _) = connect_async(request).await?;
        println!("WebSocket connection established");
    
        Ok(ws_stream)
    }
    
    // Handle incoming WebSocket messages
    async fn handle_messages(
        mut ws_stream: tokio_tungstenite::WebSocketStream<tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>>
    ) {
        println!("Waiting for incoming messages... (press Ctrl+C to exit)");
    
        // Process messages as they arrive
        while let Some(msg) = ws_stream.next().await {
            match msg {
                Ok(msg) => {
                    match msg {
                        Message::Text(text) => {
                            // Try to parse JSON
                            if let Ok(report_wrapper) = serde_json::from_str::<ReportWrapper>(&text) {
                                let report = &report_wrapper.report;
                                println!("\nReceived new report:");
                                println!("  Feed ID: {}", report.feed_id);
                                println!("  Valid From: {}", report.valid_from_timestamp);
                                println!("  Observations Timestamp: {}", report.observations_timestamp);
    
                                // Display the full report with truncation for readability
                                let report_preview = if report.full_report.len() > 40 {
                                    format!("{}...", &report.full_report[..40])
                                } else {
                                    report.full_report.clone()
                                };
                                println!("  Full Report: {}", report_preview);
                            } else {
                                println!("Received text message: {}", text);
                            }
                        }
                        Message::Binary(data) => {
                            // Try to parse binary as JSON
                            if let Ok(text) = String::from_utf8(data.clone()) {
                                if
                                    let Ok(report_wrapper) = serde_json::from_str::<ReportWrapper>(
                                        &text
                                    )
                                {
                                    let report = &report_wrapper.report;
                                    println!("\nReceived new report:");
                                    println!("  Feed ID: {}", report.feed_id);
                                    println!("  Valid From: {}", report.valid_from_timestamp);
                                    println!(
                                        "  Observations Timestamp: {}",
                                        report.observations_timestamp
                                    );
    
                                    // Display the full report with truncation
                                    let report_preview = if report.full_report.len() > 40 {
                                        format!("{}...", &report.full_report[..40])
                                    } else {
                                        report.full_report.clone()
                                    };
                                    println!("  Full Report: {}", report_preview);
                                } else {
                                    println!("Received binary message: {} bytes", data.len());
                                }
                            } else {
                                println!("Received binary message (not UTF-8): {} bytes", data.len());
                            }
                        }
                        Message::Ping(ping_data) => {
                            println!("Received ping, sending pong response");
                            // Send a pong with the same data to keep the connection alive
                            if let Err(e) = ws_stream.send(Message::Pong(ping_data)).await {
                                eprintln!("Error sending pong: {}", e);
                            }
                        }
                        Message::Pong(_) => println!("Received pong"),
                        Message::Close(_) => {
                            println!("Received close message");
                            break;
                        }
                        Message::Frame(_) => println!("Received raw frame"),
                    }
                }
                Err(e) => {
                    eprintln!("Error receiving message: {}", e);
                    break;
                }
            }
        }
    }
    
    #[tokio::main]
    async fn main() -> Result<(), Box<dyn Error>> {
        // Get API credentials from environment variables
        let api_key = env
            ::var("STREAMS_API_KEY")
            .map_err(|_| "API credentials not set. Please set STREAMS_API_KEY environment variable")?;
        let api_secret = env
            ::var("STREAMS_API_SECRET")
            .map_err(
                |_| "API credentials not set. Please set STREAMS_API_SECRET environment variable"
            )?;
    
        // Example feed IDs (ETH/USD)
        let feed_ids = vec!["0x000359843a543ee2fe414dc14c7e7920ef10f4372990b79d6361cdc0dd1ba782"];
    
        // Set up WebSocket connection
        let ws_stream = setup_websocket_connection(&api_key, &api_secret, &feed_ids).await?;
    
        // Set up a task to handle WebSocket communication
        let ws_task = tokio::spawn(handle_messages(ws_stream));
    
        // Wait for user to press Ctrl+C
        tokio::signal::ctrl_c().await?;
        println!("Shutting down...");
    
        // Clean up resources
        let _ = ws_task.abort();
    
        Ok(())
    }
    
  3. Set your API credentials as environment variables:

    export STREAMS_API_KEY="your-api-key"
    export STREAMS_API_SECRET="your-api-secret"
    
  4. Run with cargo run

Expected output:

Connecting to: wss://ws.testnet-dataengine.chain.link/api/v1/ws?feedIDs=0x000359843a543ee2fe414dc14c7e7920ef10f4372990b79d6361cdc0dd1ba782
WebSocket connection established
Waiting for incoming messages... (press Ctrl+C to exit)
Received ping, sending pong response

Received new report:
  Feed ID: 0x000359843a543ee2fe414dc14c7e7920ef10f4372990b79d6361cdc0dd1ba782
  Valid From: 1747934358
  Observations Timestamp: 1747934358
  Full Report: 0x00090d9e8d96765a0c49e03a6ae05c82e8f8de...

Received new report:
  Feed ID: 0x000359843a543ee2fe414dc14c7e7920ef10f4372990b79d6361cdc0dd1ba782
  Valid From: 1747934359
  Observations Timestamp: 1747934359
  Full Report: 0x00090d9e8d96765a0c49e03a6ae05c82e8f8de...

^CShutting down...

Production Considerations

While this example demonstrates WebSocket authentication, production applications should consider:

  • Connection resilience: Implement automatic reconnection with exponential backoff
  • Heartbeat mechanism: Send periodic pings to detect stale connections
  • Message buffering: Queue messages during reconnection attempts
  • Error handling: Use custom error types for better error categorization
  • Logging: Replace println! with structured logging (e.g., tracing, env_logger)
  • Configuration: Make WebSocket endpoints and timeouts configurable
  • Graceful shutdown: Properly close WebSocket connections with close frames
  • Testing: Add tests for connection handling and message parsing

For production use, consider using the Rust SDK which handles authentication automatically and provides built-in fault tolerance.

Get the latest Chainlink content straight to your inbox.