掘金 后端 ( ) • 2024-05-03 10:21

作者:Laura Trotta

ES|QL 是 Elasticsearch 引入的一种新的查询语言,它将简化的语法与管道操作符结合起来,使用户能够直观地推断和操作数据。官方 Java 客户端的新版本 8.13.0 引入了对 ES|QL 查询的支持,提供了一个新的 API,允许轻松执行查询,并自动将结果翻译为 Java 对象。

先决条件

  • Elasticsearch 版本 >= 8.11.0
  • Java 版本 >= 17

摄取数据

在开始查询之前,我们需要有一些可用的数据:我们将使用 Java 客户端中提供的 BulkIngester 实用程序类将此 csv 文件存储到 Elasticsearch 中。 该 csv 列出了 Amazon Books Reviews 数据集中的书籍,并使用以下标题行对它们进行分类:

Title;Description;Author;Year;Publisher;Ratings

首先,我们必须创建索引以正确映射字段:



1.  if (!client.indices().exists(ex -> ex.index("books")).value()) {
2.      client.indices().create(c -> c
3.          .index("books")
4.          .mappings(mp -> mp
5.              .properties("title", p -> p.text(t -> t))
6.              .properties("description", p -> p.text(t -> t))
7.              .properties("author", p -> p.text(t -> t))
8.              .properties("year", p -> p.short_(s -> s))
9.              .properties("publisher", p -> p.text(t -> t))
10.              .properties("ratings", p -> p.halfFloat(hf -> hf))
11.          ));
12.  }


然后是书籍的 Java 类:



1.  public record Book(
2.      String title,
3.      String description,
4.      String author,
5.      Integer year,
6.      String publisher,
7.      Float ratings
8.  ){}


我们将使用 Jackson 的 CSV 映射器来读取该文件,所以让我们对其进行配置:



1.  CsvMapper csvMapper = new CsvMapper();
2.  CsvSchema schema = CsvSchema.builder()
3.      .addColumn("title") // same order as in the csv
4.      .addColumn("description")
5.      .addColumn("author")
6.      .addColumn("year")
7.      .addColumn("publisher")
8.      .addColumn("ratings")
9.      .setColumnSeparator(';')
10.      .setSkipFirstDataRow(true)
11.      .build();

13.  MappingIterator<Book> iter = csvMapper
14.      .readerFor(Book.class)
15.      .with(schema)
16.      .readValues(new FileReader("/path/to/file/books.csv"));


然后我们将逐行读取 csv 文件并使用 BulkIngester 优化摄取:



1.  BulkIngester ingester = BulkIngester.of(bi -> bi
2.      .client(client)
3.      .maxConcurrentRequests(20)
4.      .maxOperations(5000));

6.  boolean hasNext = true;
7.  while (hasNext) {
8.      try {
9.          Book book = iter.nextValue();
10.          ingester.add(BulkOperation.of(b -> b
11.              .index(i -> i
12.              .index("books")
13.              .document(book))));
14.          hasNext = iter.hasNextValue();
15.      } catch (JsonParseException | InvalidFormatException e) {
16.          // ignore malformed data
17.      }
18.  }

20.  ingester.close();


索引编制大约需要 15 秒,但完成后,我们的图书索引将包含约 80K 文档,可供查询。

ES|QL

现在是时候从书籍数据中提取一些信息了。 假设我们想要找到阿西莫夫作品的最新重印本:



1.  String queryAuthor =
2.      """
3.      from books
4.      | where author == "Isaac Asimov"
5.      | sort year desc
6.      | limit 10
7.      """;
8.  List<Book> queryRes = (List<Book>) client.esql()
9.      .query(ObjectsEsqlAdapter.of(Book.class),queryAuthor);


感谢使用 Book.class 作为目标的 ObjectsEsqlAdapter,我们可以忽略 ES|QL 查询的 json 结果是什么,而只关注客户端自动返回的更熟悉的书籍列表。

对于那些习惯 SQL 查询和 JDBC 接口的人来说,客户端还提供了 ResultSetEsqlAdapter,可以以同样的方式使用它,而是返回一个 java.sql.ResultSet。



1.  ResultSet resultSet = esClient.esql()
2.      .query(ResultSetEsqlAdapter.INSTANCE,queryAuthor);


另一个例子,我们现在想要找出企鹅图书中评分最高的书籍:



1.  String queryPublisher =
2.      """
3.      from books
4.      | where publisher == "Penguin"
5.      | sort ratings desc
6.      | limit 10
7.      | sort title asc
8.      """;

10.  queryRes = (List<Book>) client.esql()
11.      .query(ObjectsEsqlAdapter.of(Book.class), queryPublisher);


用于检索数据的 Java 代码保持不变,因为结果仍然是书籍列表。 当然也有例外,例如,如果查询使用 eval 命令添加新列,则应修改 Java 类以表示新结果。

本文的完整代码可以在官方客户端存储库中找到。 如有任何疑问或问题,请随时通过讨论联系。

准备好将 RAG 构建到您的应用程序中了吗? 想要尝试使用矢量数据库的不同 LLMs?
Github 上查看我们的 LangChain、Cohere 等示例 notebooks,并参加即将开始的 Elasticsearch 工程师培训

原文:ES|QL queries to Java objects — Elastic Search Labs