掘金 后端 ( ) • 2024-05-05 10:12

在我之前的文章 “Elasticsearch:对 Java 对象的 ES|QL 查询”,我详细介绍了如何使用 Java 来对 ES|QL 进行查询。对于不是很熟悉 Elasticsearch 的开发者来说,那篇文章里的例子还是不能单独来进行运行。在今天的这篇文章中,我来详细地介绍如何把那个例子跑起来。更多关于 ES|QL 的动手实践,请阅读文章 “Elasticsearch:ES|QL 查询展示”。

为了说明方便,我把所有的代码放在地址 GitHub - liu-xiao-guo/elasticsearch-java-esql 以方便大家学习。这是一个 Maven 的项目。我们可以使用如下的命令来进行克隆:

git clone https://github.com/liu-xiao-guo/elasticsearch-java-esql

准备工作

Elasticsearch 及 Kibana 安装

如果你还没有安装好自己的 Elasticsearch 及 Kibana,请参考如下的链接来进行安装:

在安装的时候,我们选择 Elastic Stack 8.x 来进行安装。特别值得指出的是:ES|QL 只在 Elastic Stack 8.11 及以后得版本中才有。你需要下载 Elastic Stack 8.11 及以后得版本来进行安装。

在首次启动 Elasticsearch 的时候,我们可以看到如下的输出:

我们记下这个密码在如下的配置中进行使用。

准备数据集

我们的数据集非常简单。我从之前的文章中下载了文章里的数据集,但是我发现数据集中字段和文章里的字段并不相同,而且那个 year 定义为 integer,但是下载数据集里的数据其实是一个 date 类型的数据。为了说明问题,我们也不需要那么多的数据。我从中挑出了10个数据,并把数据集置于链接

在我们克隆完项目的时候,我们可以看到:



1.  $ pwd
2.  /Users/liuxg/java/elasticsearch-java-esql
3.  $ ls 
4.  pom.xml     sample.csv  src


这里的 sample.csv 就是我们所需要的数据集。我们的一条数据是这样的。

为了方便我们把它的字段重新命令为:

title,description,authors,image,previewLink,publisher,year,infoLink,categories,ratings

如下是一条示例文档:

Its Only Art If Its Well Hung!,,['Julie Strain'],http://books.google.com/books/content?id=DykPAAAACAAJ&printsec=frontcover&img=1&zoom=1&source=gbs_api,http://books.google.nl/books?id=DykPAAAACAAJ&dq=Its+Only+Art+If+Its+Well+Hung!&hl=&cd=1&source=gbs_api,,1996,http://books.google.nl/books?id=DykPAAAACAAJ&dq=Its+Only+Art+If+Its+Well+Hung!&hl=&source=gbs_api,['Comics & Graphic Novels'],

配置项目

为了能够使得项目能够正常运行,我们必须配置如下的 application.conf 文件:



1.  $ pwd
2.  /Users/liuxg/java/elasticsearch-java-esql
3.  $ tree -L 10
4.  .
5.  ├── http_ca.crt
6.  ├── pom.xml
7.  ├── sample.csv
8.  └── src
9.      ├── main
10.      │   ├── java
11.      │   │   └── com
12.      │   │       └── example
13.      │   │           └── esql
14.      │   │               ├── Book.java
15.      │   │               └── EsqlArticle.java
16.      │   └── resources
17.      │       └── application.conf
18.      └── test
19.          └── java


application.conf



1.  server-url=https://localhost:9200
2.  api-key=NTdYSFBJOEJ6TnJzZHhPZ0xDcGQ6Y09hYTFzZDVRLUtSVHVVZWVaOEJKdw==
3.  csv-file=/Users/liuxg/java/elasticsearch-java-esql/sample.csv
4.  cert_path=/Users/liuxg/elastic/elasticsearch-8.13.2/config/certs/http_ca.crt


如上所示,我们需要根据自己的设置进行配置。我们需要填入 Elasticsearch 的访问地址,sample.csv 的路径及 Elasticsearch 的证书。我们需要申请一个 API key 来访问 Elasticsearch:

至此,我们的配置就基本完成了。

代码解读

写入文档

首先我们根据 csv 格式的字段创建了如下的一个 Book.java 类:

Book.java



1.  package com.example.esql;

3.  import java.util.Date;

5.  public record Book(
6.          String title,
7.          String description,
8.          String author,
9.          String image,
10.          String previewLink,
11.          String publisher,
12.          Integer year,
13.          String infoLink,
14.          String categories,
15.          Float ratings
16.          ) {
17.          }


它分别对应于 csv 示例文档中的各个字段。

接下来,我们来阅读 EsqlArticle.java 文件。我们首先读出在 application.conf 文件中的配置:

 1.         String dir = System.getProperty("user.dir");
2.          System.out.println(dir);

4.          Properties prop = new Properties();
5.          Path path = Paths.get(dir, "src", "main", "resources", "application" +
6.                  ".conf");
7.          prop.load(new FileInputStream(path.toString()));

9.          String serverUrl = prop.getProperty("server-url");
10.          String apiKey = prop.getProperty("api-key");
11.          String csvPath = prop.getProperty("csv-file");
12.          String certPath = prop.getProperty("cert_path");

14.          System.out.println("serverUrl:  " + serverUrl);
15.          System.out.println("apiKey:  " + apiKey);
16.          System.out.println("csvPath:  " + csvPath);
17.          System.out.println("certPath:  " + certPath);

输出结果:



1.  serverUrl:  https://localhost:9200
2.  apiKey:  NTdYSFBJOEJ6TnJzZHhPZ0xDcGQ6Y09hYTFzZDVRLUtSVHVVZWVaOEJKdw==
3.  csvPath:  /Users/liuxg/java/elasticsearch-java-esql/sample.csv
4.  certPath:  /Users/liuxg/elastic/elasticsearch-8.13.2/config/certs/http_ca.crt


我们接下来创建 Elasticsearch 访问客户端:

 1.          Path caCertificatePath = Paths.get(certPath);
2.          CertificateFactory factory =
3.                  CertificateFactory.getInstance("X.509");
4.          Certificate trustedCa;
5.          try (InputStream is = Files.newInputStream(caCertificatePath)) {
6.              trustedCa = factory.generateCertificate(is);
7.          }
8.          KeyStore trustStore = KeyStore.getInstance("pkcs12");
9.          trustStore.load(null, null);
10.          trustStore.setCertificateEntry("ca", trustedCa);
11.          SSLContextBuilder sslContextBuilder = SSLContexts.custom()
12.                  .loadTrustMaterial(trustStore, null);
13.          final SSLContext sslContext = sslContextBuilder.build();

15.          RestClient restClient = RestClient
16.                  .builder(HttpHost.create(serverUrl))
17.                  .setDefaultHeaders(new Header[]{
18.                          new BasicHeader("Authorization", "ApiKey " + apiKey)
19.                  })
20.                  .setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
21.                      @Override
22.                      public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpAsyncClientBuilder) {
23.                          return httpAsyncClientBuilder.setSSLContext(sslContext);
24.                      }
25.                  })
26.                  .build();

28.          System.out.println(restClient.isRunning());

30.          ObjectMapper mapper = JsonMapper.builder()
31.                  .build();

33.          JacksonJsonpMapper jsonpMapper = new JacksonJsonpMapper(mapper);

35.          ElasticsearchTransport transport = new RestClientTransport(
36.                  restClient, jsonpMapper);

38.          ElasticsearchClient client = new ElasticsearchClient(transport);

由于我们的部署是自签名的,我们需要使用 Elasticsearch 的证书。

我们接下来删除 books 索引,如果它已经存在的话:

 1.          final String INDEX_NAME = "books";
2.          // Delete the index if it exists
3.          if (client.indices().exists(ex -> ex.index(INDEX_NAME)).value()) {
4.              client.indices().delete(d -> d
5.                      .index(INDEX_NAME)
6.              );
7.          }

我们接下来创建 books 索引的 mappings:

 1.          if (!client.indices().exists(ex -> ex.index(INDEX_NAME)).value()) {
2.              client.indices()
3.                      .create(c -> c
4.                              .index(INDEX_NAME)
5.                              .mappings(mp -> mp
6.                                      .properties("title", p -> p.text(t -> t))
7.                                      .properties("description", p -> p.text(t -> t))
8.                                      .properties("author", p -> p.text(t -> t))
9.                                      .properties("image", p -> p.text(t -> t))
10.                                      .properties("previewLink", p -> p.text(t -> t))
11.                                      .properties("publisher", p -> p.text(t -> t))
12.                                      .properties("year", p -> p.short_(s -> s))
13.                                      .properties("infoLink", p -> p.text(t -> t))
14.                                      .properties("categories", p -> p.text(t -> t))
15.                                      .properties("ratings", p -> p.halfFloat(hf -> hf))
16.                              ));
17.          }

你可以看到 year 是 short 类型的数据,而 ratings 是一个浮点数。其它的均为 text 字段。

我们接下来使用 Jackson 的 CSV 映射器来读取该文件,所以让我们对其进行配置:

 1.          Instant start = Instant.now();
2.          System.out.println("Starting BulkIndexer... \n");

4.          CsvMapper csvMapper = new CsvMapper();
5.          CsvSchema schema = CsvSchema.builder()
6.                  .addColumn("title") // same order as in the csv
7.                  .addColumn("description")
8.                  .addColumn("author")
9.                  .addColumn("image")
10.                  .addColumn("previewLink")
11.                  .addColumn("publisher")
12.                  .addColumn("year")
13.                  .addColumn("infoLink")
14.                  .addColumn("categories")
15.                  .addColumn("ratings")
16.                  .setColumnSeparator(',')
17.                  .setSkipFirstDataRow(true)
18.                  .build();

20.          MappingIterator<Book> it = csvMapper
21.                  .readerFor(Book.class)
22.                  .with(schema)
23.                  .readValues(new FileReader(csvPath));

然后我们将逐行读取 csv 文件并使用 BulkIngester 优化摄取:

 1.          BulkIngester ingester = BulkIngester.of(bi -> bi
2.                  .client(client)
3.                  .maxConcurrentRequests(20)
4.                  .maxOperations(5000));

6.          boolean hasNext = true;

8.          int j = 0;
9.          while (hasNext) {
10.              try {
11.                  Book book = it.nextValue();
12.                  ingester.add(BulkOperation.of(b -> b
13.                          .index(i -> i
14.                                  .index(INDEX_NAME)
15.                                  .document(book))));
16.                  hasNext = it.hasNextValue();
17.              } catch (JsonParseException | InvalidFormatException e) {
18.                  // ignore malformed data
19.                  System.out.println("Something is wrong at: " + j);
20.              }
21.              j ++;
22.          }

24.          ingester.close();

由于我们使用的文档数非常之少,只有10个文档。索引的速度非常之快。

查询文档

现在是时候从书籍数据中提取一些信息了。假设我们想要找到 ['Julie Strain']。请注意,为了方便,我们在摄入文档的时候并没有针对 author 来进行任何的处理。它应该是一个数组。在这里我们为什么需要添加 [ 及 ] 符号呢?这是因为截止目前的 ES|QL 版本发布,所有的 text 字段都被当做为 keyword 字段。全文搜索还没有完全实现。

 1.          String queryAuthor =
2.                  """
3.                      from books
4.                      | where author == "['Julie Strain']"
5.                      | sort year desc
6.                      | limit 10
7.                  """;

9.          List<Book> queryRes = (List<Book>) client.esql().query(ObjectsEsqlAdapter.of(Book.class), queryAuthor);

11.          System.out.println("~~~\nObject result author:\n" + queryRes.stream().map(Book::title).collect(Collectors.joining("\n")));

13.          ResultSet resultSet = client.esql().query(ResultSetEsqlAdapter.INSTANCE, queryAuthor);

15.          System.out.println("~~~\nResultSet result author:");
16.          while (resultSet.next()) {
17.              System.out.println(resultSet.getString("title"));
18.          }

上面显示的结果是:



1.  ~~~
2.  Object result author:
3.  Its Only Art If Its Well Hung!

5.  ~~~
6.  ResultSet result author:
7.  Its Only Art If Its Well Hung!


感谢使用 Book.class 作为目标的 ObjectsEsqlAdapter,我们可以忽略 ES|QL 查询的 json 结果是什么,而只关注客户端自动返回的更熟悉的书籍列表。

对于那些习惯 SQL 查询和 JDBC 接口的人来说,客户端还提供了 ResultSetEsqlAdapter,可以以同样的方式使用它,而是返回一个 java.sql.ResultSet。



1.  ResultSet resultSet = esClient.esql()
2.      .query(ResultSetEsqlAdapter.INSTANCE,queryAuthor);


另一个例子,我们现在想要找出出版商为 Plympton PressIntl 中评分最高的书籍:

 1.          String queryPublisher =
2.                  """
3.                      from books
4.                      | where publisher == "Plympton PressIntl"
5.                      | sort ratings desc
6.                      | limit 10
7.                      | sort title asc
8.                  """;

10.          queryRes = (List<Book>) client.esql().query(ObjectsEsqlAdapter.of(Book.class), queryPublisher);
11.          System.out.println("~~~\nObject result publisher:\n" + queryRes.stream().map(Book::title).collect(Collectors.joining("\n")));

上面代码运行的结果为:



1.  Object result publisher:
2.  Rising Sons and Daughters: Life Among Japan's New Young


你可以在地址 GitHub - liu-xiao-guo/elasticsearch-java-esql 下载源码。