转载

使用 Spring、Elasticsearch 及 Logstash 构建企业级数据搜索和分析平台

数据是企业应用的核心。企业应用的功能都围绕数据展开,包括数据的收集、存储、查询、展示、搜索和分析等若干不同的功能。有不同的软件程序负责对数据进行不同的处理。比如关系型数据库和 NoSQL 数据库可以进行数据存储和查询。数据展示可以通过不同的用户界面框架来完成。本文中要介绍的 Elasticsearch 和 Logstash 可以完成对数据的收集、搜索和分析。

Elasticsearch 介绍

Elasticsearch 是一个可伸缩的开源全文搜索和分析引擎。它可以快速地存储、搜索和分析海量数据。它通常用来支撑有复杂的数据搜索需求的企业级应用。Elasticsearch 基于 Apache Lucene 构建,是一个非常流行的开源软件,同时有相应的公司提供商业支持,使之非常适合企业用户使用。在使用 Elasticsearch 之前,首先介绍其中的一些基本概念。

集群

集群是一个或多个节点的集合,用来保存应用的全部数据并提供基于全部节点的集成式索引和搜索功能。每个集群都需要有一个惟一的名称。

节点

节点是一个集群中的单台服务器,用来保存数据并参与整个集群的索引和搜索操作。每个节点也可以配置其名称。节点会加入指定名称的集群中。

索引

索引是相似文档的集合。索引中的内容与应用本身的业务相关。比如电子商务应用可以使用索引来保存产品数据、订单数据和客户数据等。每个索引都有一个名称,通过该名称可以对索引中包含的文档进行添加、更新、删除和搜索等操作。

类型

类型是对一个索引中包含的文档的进一步细分。一般根据文档的公共属性来进行划分。比如在电子商务应用的产品数据索引中,可以根据产品的特征划分成不同的类型,如一般产品、虚拟产品、数字产品等。

文档

文档是进行索引的基本单位,与索引中的一个类型相对应。比如产品数据索引中一般产品类型中的每个具体的产品可以有一个文档与之对应。文档使用 JSON 格式来表示。

分片和副本

企业应用需要存储的数据量一般比较巨大,超出单个节点所能处理的范围。Elasticsearch 允许把索引划分成多个分片(shard)来存储索引的部分数据。Elasticsearch 会负责处理分片的分配和聚合。从可靠性的角度出发,对于一个分片中的数据,应该有至少一个副本(replica)。Elasticsearch 中每个索引可以划分成多个分片,而且有多个副本。Elasticsearch 会自动管理集群中节点的分片和副本,对开发人员是透明的。


Elasticsearch 基本用法

Elasticsearch 的安装比较简单,只需要下载之后解压启动即可。其官方网站上提供了针对不同平台的详细安装文档说明。当使用默认配置时,Elasticsearch 会使用默认集群名 elasticsearch,并在 9200 端口提供 HTTP REST API 接口。当安装完成之后,只需要通过浏览器访问 http://localhost:9200 就可以查看当前 Elasticsearch 集群和版本信息。本文中使用的是 Elasticsearch 1.7.1 版本。

Elasticsearch 提供了丰富的 HTTP REST API 接口,通过浏览器或 Postman 这样的工具可以直接访问 Elasticsearch 提供的各种功能。应用开发中使用 REST API 可以快速地查看集群状态、浏览索引的文档、测试搜索结果等。

在安装 Elasticsearch 服务器之后,应用程序需要通过客户端来连接服务器并进行相应的操作。Elasticsearch 对于不同的语言提供了相应的开发库。本文中使用的是 Java 客户端。首先需要创建一个 Elasticsearch 的 Java 客户端,即 org.elasticsearch.client.Client 接口的对象。有两种方式可以创建 Client 接口的对象。第一种方式是创建一个内嵌式的 Elasticsearch 节点并加入集群,再从表示节点的 org.elasticsearch.node.Node 接口中获取 Client 对象。这种做法的好处是性能比较好,可以避免不必要的查询。第二种做法是创建一个 org.elasticsearch.client.transport.TransportClient 类的对象。TransportClient 并不加入集群,而是通过 Elasticsearch 的传输模块来连接到服务器。

代码清单 1 给出了通过内嵌式节点来创建 Client 对象的方法。通过 org.elasticsearch.node.NodeBuilder.nodeBuilder 静态方法来创建 Node 对象,再通过 client 方法来获取到 Client 对象。在创建 Node 对象时,通过 client(true) 把该节点设置成仅作为客户端,而并不存储数据。另外通过 http.enabled 配置来禁用该节点的 HTTP 服务。

清单 1. 通过内嵌式节点创建 Client 对象
Node node = nodeBuilder()        .client(true)        .settings(ImmutableSettings.settingsBuilder().put("http.enabled", false))        .node(); Client client = node.client();  try {    ClusterStatsResponse stats = client.admin().cluster().prepareClusterStats().execute().get();    System.out.println(stats); } catch (Exception e) {    e.printStackTrace(); }  node.close();

代码清单 1 中使用 Client 对象调用 Elasticsearch 的集群 API 来获取集群的状态。从输出的 JSON 中可以发现集群中有 2 个节点,一个是 Elasticsearch 服务器的主节点,另外一个是客户端对应的节点。

代码清单 2 给出了创建 TransportClient 对象的方法。在创建时需要指定 Elasticsearch 传输模块的主机名和端口号。连接本机的 Elasticsearch 服务器时使用 localhost 的 9300 端口。

清单 2. 创建 TransportClient 对象的示例
Client client = new TransportClient()        .addTransportAddress(new InetSocketTransportAddress("localhost", 9300));  try {    ClusterStatsResponse stats = client.admin().cluster().prepareClusterStats().execute().get();    System.out.println(stats); } catch (Exception e) {    e.printStackTrace(); }  client.close();

当同样使用集群 API 来查看集群状态时,会发现集群中只有一个节点,即 Elasticsearch 服务器的主节点。这是由于 TransportClient 对象并不会加入集群。


应用场景

本文以一个电子商务网站作为示例应用场景。该应用中的模型层包括产品、用户和订单等实体。不同的实体通过 Elasticsearch 中的不同索引来存储。

在示例应用中,程序并不是使用 Elasticsearch 的 Java 客户端来直接访问 Elasticsearch 服务器,而是通过 Spring Data 项目的 Elasticsearch 子项目来与 Elasticsearch 交互。Spring Data 的基本概念是仓库(repository)。对于 Elasticsearch 来说,每一个仓库对应一个索引中的类型。Spring Data Elasticsearch 已经提供了对于索引中类型的基本操作,包括索引新文档、更新索引、删除索引和搜索等。示例应用基于 Spring Boot 来创建,由 Spring Boot 对 Elasticsearch 进行自动配置。


数据索引

在进行数据索引之前,需要定义应用中的模型层。以产品为例,代码清单 3 中给出了对应的 Java 类 Product。

清单 3. 表示产品的 Product 类
@Document(indexName = "product", type = "simple") public class Product {     private Long id;     @Field(type = FieldType.String, index = FieldIndex.not_analyzed)    private String sku;     private String name;     private String description;     public Long getId() {        return id;    }     public void setId(Long id) {        this.id = id;    }     public String getSku() {        return sku;    }     public void setSku(String sku) {        this.sku = sku;    }     public String getName() {        return name;    }     public void setName(String name) {        this.name = name;    }     public String getDescription() {        return description;    }     public void setDescription(String description) {        this.description = description;    } }

代码清单 3 中,使用 Spring Data Elasticsearch 的注解 @Document(indexName = "product", type = "simple") 声明 Product 类为需要被索引的文档,属性 indexName 声明了索引的名称,而属性 type 声明了索引中文档的类型。需要注意的是 Product 类中域 sku 的注解 @Field(type = FieldType.String, index = FieldIndex.not_analyzed)。注解 @Field 声明了域 sku 的数据类型为 String,同时对该域的值是不做分析的。不做分析的域在搜索时是进行完全匹配的。域 name 和 description 没有添加 @Field 注解,因此会根据其 Java 类型自动确定数据类型,同时对域的值进行文本分析,因此可以支持全文检索。

在定义应用的模型层之后,创建相应的仓库来与 Elasticsearch 进行交互。Spring Data 在很大程度上简化了与数据源的交互方式。只需要通过接口的方式声明所要执行的操作即可,具体的实现由 Spring Data 自动来生成。Spring Data 提供了对常见的创建、读取、更新和删除操作(CRUD),以及分页和排序的支持,使得开发人员要编写的代码数量极大减少。

清单 4. 操作 Product 类的仓库接口
public interface ProductRepository extends CrudRepository<Product, Long> {  }

代码清单 4 中的 ProductRepository 接口继承自 Spring Data 的 CrudRepository 接口。CrudRepository 接口提供了 save、findOne、findAll 和 delete 等方法来对 Product 对象进行操作。当需要索引一个新的 Product 文档时,只需要调用 save 方法即可,如代码清单 5 所示。

清单 5. 在 REST 控制器中使用 ProductRepository 接口
@Controller @RequestMapping("/product") public class ProductController {     @Autowired    private ProductRepository productRepository;     @RequestMapping(value = "", method = RequestMethod.POST)    @ResponseBody    public Product add(@RequestBody Product product) {        return productRepository.save(product);    } }

代码清单 5 所示,当客户端发送 HTTP POST 请求到/product 时,直接通过 ProductRepository 的 save 方法来保存该对象,其作用是在 Elasticsearch 中索引对应的文档。


更新和删除文档

在使用 ProductRepository 保存文档之后,可以对文档进行更新和删除。代码清单 6 给出了更新和删除文档的示例。

清单 6. 更新和删除文档
@RequestMapping(value = "/{productId}", method = RequestMethod.PUT) @ResponseBody public Product update(@PathVariable("productId") Long productId, @RequestBody Product product) {    product.setId(productId);    productRepository.save(product);    return product; }  @RequestMapping(value = "/{productId}", method = RequestMethod.DELETE) public void delete(@PathVariable("productId") Long productId) {    productRepository.delete(productId); }

代码清单 6 所示,使用用 ProductRepository 的 save 方法来更新文档,使用 delete 方法来删除文档。


数据搜索

Elasticsearch 的核心功能就是搜索。通过 Elasticsearch 进行全文搜索是非常容易的。Product 对象的域 name 和 description 是支持全文搜索的。在进行搜索时,可以根据输入的关键词对 name 和 description 进行搜索。

清单 7. 在 ProductRepository 中添加搜索方法
public interface ProductRepository extends CrudRepository<Product, Long> {    @Query("{/"bool/" : {/"should/" : [{/"term/" : {/"name/" : /"?0/"}},                                         {/"term/" : {/"description/" : /"?0/"}}]}}}")    List<Product> findByNameOrDescription(String query); }

代码清单 7 所示,在 ProductRepository 接口中添加了一个方法 findByNameOrDescription,参数是查询的关键词。在方法上通过 @Query 注解声明了搜索时用的查询。声明查询使用的是 Elasticsearch 的查询语法,代码清单 7 中的查询的含义是在 name 和 description 中根据关键词进行搜索,只需要有一个域可以匹配即可。


数据分析

对数据进行分析处理是应用中的重要一环。示例应用中包含了订单相关的数据。从业务相关的角度出发,会需要对订单数据进行分析,得到可供决策使用的分析数据。

代码清单 8 给出了表示订单的 Java 类 Order。Order 类中声明了订单的创建时间和数额。

清单 8. 表示订单的 Order 类
@Document(indexName = "order") public class Order {    private Long id;     private Date createdAt;     private BigDecimal amount;     public Long getId() {        return id;    }     public void setId(Long id) {        this.id = id;    }     public Date getCreatedAt() {        return createdAt;    }     public void setCreatedAt(Date createdAt) {        this.createdAt = createdAt;    }     public BigDecimal getAmount() {        return amount;    }     public void setAmount(BigDecimal amount) {        this.amount = amount;    } }

对于订单数据要做的分析操作之一是按月统计订单数额的总数。这需要用到 Elasticsearch 提供的数据聚合功能。数据聚合是对一个给定的文档集中的全部文档执行特定的操作。聚合操作的文档集由当前的执行上下文来确定,如把搜索的结果作为聚合的文档集。

Elasticsearch 中有两类不同的聚合操作,分别是划分和指标。

划分

划分(bucketing)用来创建不同的分组。每个分组都有一个标识符和文档应该满足的条件。在进行聚合操作时,当前执行上下文的文档集中的每一个文档都会用来匹配所有分组中的文档条件。当条件满足时,该文档会被划分到对应的分组中。当聚合操作完成时,每个分组中包含的是满足对应条件的文档集合。

指标

指标(metric)从文档集中计算出某些指标,如最大值、最小值和平均值等。

聚合操作是可以嵌套的。比如对于划分聚合结果中的每个分组中的文档集可以再次进行聚合操作。

代码清单 9 中给出了按月计算订单数额总数的聚合操作的示例。首先通过 Client 对象的 prepareSearch 方法指定对 order 索引进行聚合操作,order 索引中的全部文档是聚合操作的文档集。所执行的第一个层次的聚合操作是使用 dateHistogram 方法对文档的 createdAt 域进行按月统计,即根据 createdAt 域的值得出的月份相同的文档会被聚合到同一个分组中,而分组的标识符是每个月的第一天的日期时间值。在第一个层次的聚合操作之上,对于每个分组中的文档集再由 sum 方法对 amount 域进行求和计算,所得到的结果即是每个月份的全部订单的数额的和。

清单 9. 按月计算订单数额总数的聚合操作的示例
@RequestMapping(value = "/summary", method = RequestMethod.GET) @ResponseBody public Map<DateTime, Double> orderSummary() throws Exception {    SearchResponse searchResponse = client.prepareSearch("order")            .addAggregation(AggregationBuilders.dateHistogram("monthly").field(                                         "createdAt").interval(DateHistogram.Interval.MONTH)                    .subAggregation(AggregationBuilders.sum("total").field("amount")))    .execute().get();    Map<DateTime, Double> result = new HashMap<>();    DateHistogram dateHistogram = searchResponse.getAggregations().get("monthly");    for (DateHistogram.Bucket entry : dateHistogram.getBuckets()) {        DateTime keyAsDate = entry.getKeyAsDate();        Sum sum = entry.getAggregations().get("total");        result.put(keyAsDate, sum.getValue());    }    return result; }

代码清单 10 给出了代码清单 9 中的聚合操作的结果。

清单 10. 订单数据按月聚合的结果
{  "2015-08-01T00:00:00.000Z": 200.5,  "2015-09-01T00:00:00.000Z": 501 }

已有数据集成

在一个企业系统中可能有多个不同的系统产生相关的数据。这些数据也需要集成到 Elasticsearch 中。Logstash 提供了一个简单的方式将已有数据集成到 Elasticsearch 中。Logstash 是一个数据收集引擎,可以把来自不同源的数据进行收集和处理之后,输出到不同的目的地。Logstash 最早是用来收集日志的,现在已经演化成一个通用的数据收集和处理工具。通过不同的插件,Logstash 可以对不同类型的数据进行收集和处理。

Logstash 的核心是事件处理流水线,由 3 个阶段组成:输入、过滤器和输出。输入是 Logstash 数据的来源,如文件、syslog 和 redis 等。过滤器对输入的事件进行处理,如 grok、mutate 和 geoip 等。输出是接收事件的目的地,如文件、elasticsearch 和 graphite 等。编码器是一种特殊类型的过滤器,用来对数据进行序列化处理,如 json、plain 和 multiline 等。每个不同的阶段都是通过插件来实现的,社区贡献了 200 多种不同的插件。

例如在示例应用中,有另外一个系统也产生订单数据。这些订单数据需要集成到 Elasticsearch 中。该系统产生的数据采用的是 CSV 格式,如代码清单 11 所示。

清单 11. 第三方系统产生的 CSV 订单数据
 1,1438698376000,150  2,1438698386000,50  3,1438698396000,100

代码清单 12 给出了集成订单数据的 Logstash 配置文件。配置文件中包含三个部分:input、filter 和 output,分别对应事件处理流水线的 3 个阶段。input 表示的是事件的输入,来自文件系统的 CSV 文件,并指定从文件开头开始读取;filter 表示的是事件的处理,由两个步骤组成:第一个步骤使用 csv 插件把 CSV 文件中的每一行的不同列转换成事件对象中的属性值,第二个步骤使用插件 mutate 转换事件对象中的属性值的类型;output 表示的是事件的输出,目的地是 Elasticsearch,并指定 Elasticsearch 中对应的索引的名称和文档类型。

清单 12. 进行数据集成的 Logstash 配置文件
input {   file {    path => "/tmp/orders.csv"    start_position => "beginning"  } }  filter {  csv {    columns => ["id", "createdAt", "amount"]  }   mutate {    convert => {       "createdAt" => "integer"      "amount" => "float"    }  } }  output {  elasticsearch {     host => localhost     port => 9300    index => "order"    document_type => "order"  } }

当 CSV 文件被更新时,其中包含的订单数据会被自动添加到 Elasticsearch 中 order 索引中。

小结
Elasticsearch 作为数据搜索领域的佼佼者,可以为企业应用提供强大的全文检索功能。Logstash 则可以对第三方系统产生的数据进行整合。这两者结合起来,可以实现完善的数据集成和搜索系统。本文首先对 Elasticsearch 做了详细的介绍,包括如何索引文档、更新和删除文档,以及如何对数据进行聚合分析处理,最后介绍了如何使用 Logstash 来整合已有数据到 Elasticsearch 中。


下载

描述名字大小
示例代码sample_code.zip10k

正文到此结束
Loading...