elasticsearch 8.5.0-alpha.1

Official Elasticsearch Rust client
Documentation
/*
 * Licensed to Elasticsearch B.V. under one or more contributor
 * license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright
 * ownership. Elasticsearch B.V. licenses this file to you under
 * the Apache License, Version 2.0 (the "License"); you may
 * not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *	http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
pub mod common;
use common::*;

use elasticsearch::{
    http::{
        headers::{
            HeaderMap, HeaderName, HeaderValue, ACCEPT, CONTENT_TYPE, DEFAULT_ACCEPT,
            DEFAULT_CONTENT_TYPE, X_OPAQUE_ID,
        },
        StatusCode,
    },
    indices::{IndicesCloseParts, IndicesCreateParts, IndicesDeleteParts},
    params::TrackTotalHits,
    SearchParts,
};

use crate::common::client::index_documents;
use bytes::Bytes;
use hyper::Method;
use serde_json::{json, Value};
use std::time::Duration;

#[tokio::test]
async fn default_user_agent_content_type_accept_headers() -> Result<(), failure::Error> {
    let server = server::http(move |req| async move {
        assert_eq!(req.headers()["user-agent"], DEFAULT_USER_AGENT);
        assert_eq!(req.headers()["content-type"], "application/json");
        assert_eq!(req.headers()["accept"], "application/json");
        http::Response::default()
    });

    let client = client::create_for_url(format!("http://{}", server.addr()).as_ref());
    let _response = client.ping().send().await?;

    Ok(())
}

#[tokio::test]
async fn default_header() -> Result<(), failure::Error> {
    let server = server::http(move |req| async move {
        assert_eq!(req.headers()["x-opaque-id"], "foo");
        http::Response::default()
    });

    let builder = client::create_builder(format!("http://{}", server.addr()).as_ref()).header(
        HeaderName::from_static(X_OPAQUE_ID),
        HeaderValue::from_static("foo"),
    );

    let client = client::create(builder);
    let _response = client.ping().send().await?;

    Ok(())
}

#[tokio::test]
async fn override_default_header() -> Result<(), failure::Error> {
    let server = server::http(move |req| async move {
        assert_eq!(req.headers()["x-opaque-id"], "bar");
        http::Response::default()
    });

    let builder = client::create_builder(format!("http://{}", server.addr()).as_ref()).header(
        HeaderName::from_static(X_OPAQUE_ID),
        HeaderValue::from_static("foo"),
    );

    let client = client::create(builder);
    let _response = client
        .ping()
        .header(
            HeaderName::from_static(X_OPAQUE_ID),
            HeaderValue::from_static("bar"),
        )
        .send()
        .await?;

    Ok(())
}

#[tokio::test]
async fn x_opaque_id_header() -> Result<(), failure::Error> {
    let server = server::http(move |req| async move {
        assert_eq!(req.headers()["x-opaque-id"], "foo");
        http::Response::default()
    });

    let client = client::create_for_url(format!("http://{}", server.addr()).as_ref());
    let _response = client
        .ping()
        .header(
            HeaderName::from_static(X_OPAQUE_ID),
            HeaderValue::from_static("foo"),
        )
        .send()
        .await?;

    Ok(())
}

#[tokio::test]
async fn uses_global_request_timeout() {
    let server = server::http(move |_| async move {
        std::thread::sleep(Duration::from_secs(1));
        http::Response::default()
    });

    let builder = client::create_builder(format!("http://{}", server.addr()).as_ref())
        .timeout(std::time::Duration::from_millis(500));

    let client = client::create(builder);
    let response = client.ping().send().await;

    match response {
        Ok(_) => assert!(false, "Expected timeout error, but response received"),
        Err(e) => assert!(e.is_timeout(), "Expected timeout error, but was {:?}", e),
    }
}

#[tokio::test]
async fn uses_call_request_timeout() {
    let server = server::http(move |_| async move {
        std::thread::sleep(Duration::from_secs(1));
        http::Response::default()
    });

    let builder = client::create_builder(format!("http://{}", server.addr()).as_ref())
        .timeout(std::time::Duration::from_secs(2));

    let client = client::create(builder);
    let response = client
        .ping()
        .request_timeout(Duration::from_millis(500))
        .send()
        .await;

    match response {
        Ok(_) => assert!(false, "Expected timeout error, but response received"),
        Err(e) => assert!(e.is_timeout(), "Expected timeout error, but was {:?}", e),
    }
}

#[tokio::test]
async fn call_request_timeout_supersedes_global_timeout() {
    let server = server::http(move |_| async move {
        std::thread::sleep(Duration::from_secs(1));
        http::Response::default()
    });

    let builder = client::create_builder(format!("http://{}", server.addr()).as_ref())
        .timeout(std::time::Duration::from_millis(500));

    let client = client::create(builder);
    let response = client
        .ping()
        .request_timeout(Duration::from_secs(2))
        .send()
        .await;

    match response {
        Ok(_) => (),
        Err(e) => assert!(e.is_timeout(), "Did not expect error, but was {:?}", e),
    }
}

#[tokio::test]
async fn deprecation_warning_headers() -> Result<(), failure::Error> {
    let client = client::create_default();
    let index = "deprecation-warnings";
    let _delete_response = client
        .indices()
        .delete(IndicesDeleteParts::Index(&[index]))
        .send()
        .await?;

    let _create_response = client
        .indices()
        .create(IndicesCreateParts::Index(index))
        .send()
        .await?;

    let response = client
        .indices()
        .close(IndicesCloseParts::Index(&[index]))
        .wait_for_active_shards("index-setting")
        .send()
        .await?;

    assert_eq!(response.status_code(), StatusCode::OK);
    let warnings = response.warning_headers().collect::<Vec<&str>>();
    assert!(warnings.len() > 0);
    assert!(
        warnings.iter().any(|&w| w.contains("unsupported")),
        "warnings= {:?}",
        &warnings
    );

    let _delete_response = client
        .indices()
        .delete(IndicesDeleteParts::Index(&[index]))
        .send()
        .await?;

    Ok(())
}

#[tokio::test]
async fn serialize_querystring() -> Result<(), failure::Error> {
    let server = server::http(move |req| async move {
        assert_eq!(req.method(), Method::GET);
        assert_eq!(req.uri().path(), "/_search");
        assert_eq!(
                req.uri().query(),
                Some("filter_path=took%2C_shards&pretty=true&q=title%3AElasticsearch&track_total_hits=100000")
            );
        http::Response::default()
    });

    let client = client::create_for_url(format!("http://{}", server.addr()).as_ref());
    let _response = client
        .search(SearchParts::None)
        .pretty(true)
        .filter_path(&["took", "_shards"])
        .track_total_hits(TrackTotalHits::Count(100_000))
        .q("title:Elasticsearch")
        .send()
        .await?;

    Ok(())
}

#[tokio::test]
async fn search_with_body() -> Result<(), failure::Error> {
    let client = client::create_default();
    let _ = index_documents(&client).await?;
    let response = client
        .search(SearchParts::None)
        .body(json!({
            "query": {
                "match_all": {}
            }
        }))
        .allow_no_indices(true)
        .send()
        .await?;

    let expected_url = {
        let mut addr = client::cluster_addr();
        if !addr.ends_with('/') {
            addr.push('/');
        }
        let mut url = url::Url::parse(addr.as_str())?;
        url.set_username("").unwrap();
        url.set_password(None).unwrap();
        url.join("_search?allow_no_indices=true")?
    };

    match response.content_length() {
        Some(c) => assert!(c > 0),
        None => (),
    };

    assert_eq!(response.url(), &expected_url);
    assert_eq!(response.status_code(), StatusCode::OK);
    assert_eq!(response.method(), elasticsearch::http::Method::Post);
    let debug = format!("{:?}", &response);
    assert!(debug.contains("method"));
    assert!(debug.contains("status_code"));
    assert!(debug.contains("headers"));
    let response_body = response.json::<Value>().await?;
    assert!(response_body["took"].as_i64().is_some());

    Ok(())
}

#[tokio::test]
async fn search_with_no_body() -> Result<(), failure::Error> {
    let client = client::create_default();
    let _ = index_documents(&client).await?;
    let response = client
        .search(SearchParts::None)
        .pretty(true)
        .q("title:Elasticsearch")
        .send()
        .await?;

    assert_eq!(response.status_code(), StatusCode::OK);
    assert_eq!(response.method(), elasticsearch::http::Method::Get);
    let response_body = response.json::<Value>().await?;
    assert!(response_body["took"].as_i64().is_some());

    for hit in response_body["hits"]["hits"].as_array().unwrap() {
        assert!(hit["_source"]["title"].as_str().is_some());
    }

    Ok(())
}

#[tokio::test]
async fn read_response_as_bytes() -> Result<(), failure::Error> {
    let client = client::create_default();
    let _ = index_documents(&client).await?;
    let response = client
        .search(SearchParts::None)
        .pretty(true)
        .q("title:Elasticsearch")
        .send()
        .await?;

    assert_eq!(response.status_code(), StatusCode::OK);
    assert_eq!(response.method(), elasticsearch::http::Method::Get);

    let response: Bytes = response.bytes().await?;
    let json: Value = serde_json::from_slice(&response).unwrap();

    assert!(json["took"].as_i64().is_some());

    for hit in json["hits"]["hits"].as_array().unwrap() {
        assert!(hit["_source"]["title"].as_str().is_some());
    }

    Ok(())
}

#[tokio::test]
async fn cat_health_format_json() -> Result<(), failure::Error> {
    let client = client::create_default();
    let response = client
        .cat()
        .health()
        .format("json")
        .pretty(true)
        .send()
        .await?;

    assert_eq!(response.status_code(), StatusCode::OK);
    assert!(response
        .headers()
        .get(CONTENT_TYPE)
        .unwrap()
        .to_str()
        .unwrap()
        .starts_with(DEFAULT_CONTENT_TYPE));
    let _response_body = response.json::<Value>().await?;

    Ok(())
}

#[tokio::test]
async fn cat_health_header_json() -> Result<(), failure::Error> {
    let client = client::create_default();
    let response = client
        .cat()
        .health()
        .header(ACCEPT, HeaderValue::from_static(DEFAULT_ACCEPT))
        .pretty(true)
        .send()
        .await?;

    assert_eq!(response.status_code(), StatusCode::OK);
    assert!(response
        .headers()
        .get(CONTENT_TYPE)
        .unwrap()
        .to_str()
        .unwrap()
        .starts_with(DEFAULT_CONTENT_TYPE));
    let _response_body = response.json::<Value>().await?;

    Ok(())
}

#[tokio::test]
async fn cat_health_text() -> Result<(), failure::Error> {
    let client = client::create_default();
    let response = client.cat().health().pretty(true).send().await?;

    assert_eq!(response.status_code(), StatusCode::OK);
    assert!(response
        .headers()
        .get(CONTENT_TYPE)
        .unwrap()
        .to_str()
        .unwrap()
        .starts_with("text/plain"));
    let _response_body = response.text().await?;

    Ok(())
}

#[tokio::test]
async fn clone_search_with_body() -> Result<(), failure::Error> {
    let client = client::create_default();
    let _ = index_documents(&client).await?;
    let base_request = client.search(SearchParts::None);

    let request_clone = base_request.clone().q("title:Elasticsearch").size(1);

    let _request = base_request
        .body(json!({
            "query": {
                "match_all": {}
            }
        }))
        .allow_no_indices(true);

    let response = request_clone.send().await?;

    assert_eq!(response.status_code(), StatusCode::OK);
    let response_body = response.json::<Value>().await?;

    assert_eq!(response_body["hits"]["hits"].as_array().unwrap().len(), 1);

    Ok(())
}

#[tokio::test]
async fn byte_slice_body() -> Result<(), failure::Error> {
    let client = client::create_default();
    let body = b"{\"query\":{\"match_all\":{}}}";

    let response = client
        .send(
            elasticsearch::http::Method::Post,
            SearchParts::None.url().as_ref(),
            HeaderMap::new(),
            Option::<&Value>::None,
            Some(body.as_ref()),
            None,
        )
        .await?;

    assert_eq!(response.status_code(), StatusCode::OK);
    let _response_body = response.json::<Value>().await?;

    Ok(())
}